高并发向量语义检索加速:5种方法性能排名

2026-06-12阅读 0热度 0
人工智能

当我们需要同时处理多个查询——无论是批量问答、RAG 多路召回,还是多用户并发搜索——最简单的做法是一条一条地跑,但结果就是整体耗时随查询数量线性增长,效率大打折扣。换个思路呢?把这些查询丢出去并发执行。你会发现,总耗时从 N 倍的单次延迟,直接逼近到一次延迟的水平,吞吐量的提升可以说是立竿见影。

并发这事儿,做对了是吞吐利器,做砸了就是限流元凶。下面从两个维度展开:CLI 并发SDK 并发。它们各自擅长不同的战场:

  • 批量语义搜索:一次扔进去一堆查询文本,快速拿到所有结果。
  • RAG 多路召回:同一个请求下,从不同角度同时发起检索,端到端延迟能降下来。
  • 多模态批量检索:文本、图片、视频……不同模态的向量数据一起上,也扛得住。

并发方式怎么选?先给一个判断框架

方式适用场景特点
CLI 并发运维脚本、一次性批量检索、快速验证(不用写代码)输入文本自动 Embedding;不用管向量维度;Shell 脚本就能实现
SDK 并发业务服务集成、需要精细控制(过滤条件、后处理)、高性能后端直接调用 API,过滤条件随意设;复用客户端连接;Python 和 Go 都支持

一句话总结:手里有一批查询文本,想最快拿到结果又不想写代码 → 走 CLI 并发。正在开发业务服务,需要把向量检索嵌进应用逻辑 → 走 SDK 并发

注意一个区别:CLI 内置了 Embedding 模型调用,输入文本就行;SDK 需要你传进去已经生成好的向量,适合那些已经有 Embedding 流程的场景。

CLI 并发检索:从简单到灵活

CLI 并发的思路很直接:启动多个 oss-vectors-embed query 进程并行跑。下面给出三种方式,复杂度从低到高,按需取用。

动手之前,确认几件事:

  • 已经装好 OSS Vectors Embed CLI。
  • 环境变量 OSS_ACCESS_KEY_IDOSS_ACCESS_KEY_SECRETDASHSCOPE_API_KEY 都配好了。
  • 向量 Bucket 和索引已经存在,且索引维度跟 Embedding 模型输出维度一致。

把下面这些占位符换成你自己的实际值:

占位符说明
阿里云账号 ID
向量 Bucket 名称
向量索引名称

xargs 快速并发:一行命令搞定

不需要复杂流程控制?xargs -P 是最简单的路子。

cat queries.txt | xargs -P 5 -I {} 
oss-vectors-embed 
--account-id "" 
--vectors-region cn-hangzhou 
query 
--vector-bucket-name "" 
--index-name "" 
--model-id text-embedding-v4 
--text-value "{}" 
--top-k 10

-P 5 意思是最多 5 个进程并行跑。结果直接打到终端,适合快速验证。想保存?重定向到文件就行。

Shell 后台并发:适合查询数不多的情况

通过 & 把多个查询扔到后台,再用 wait 等全部结束。10 条以内的查询,这个方式很干净。

#!/bin/bash

ACCOUNT_ID=""
REGION="cn-hangzhou"
BUCKET=""
INDEX=""
MODEL="text-embedding-v4"

queries=(
  "如何配置生命周期规则"
  "对象存储有哪些存储类型"
  "如何设置跨区域复制"
)

mkdir -p ./query-results

for i in "${!queries[@]}"; do
  oss-vectors-embed 
    --account-id "$ACCOUNT_ID" 
    --vectors-region "$REGION" 
    query 
    --vector-bucket-name "$BUCKET" 
    --index-name "$INDEX" 
    --model-id "$MODEL" 
    --text-value "${queries[$i]}" 
    --top-k 10 
    --return-metadata 
    > "./query-results/result_${i}.json" 2>&1 &
done

wait
echo "全部查询完成,结果保存在 ./query-results/"

每个结果都是独立的 JSON 文件,用 cat ./query-results/result_0.json | python3 -m json.tool 就能看。

控制并发数的 Shell 脚本:查询量大时必备

如果查询有几十上百条,就得限制同时跑的进程数,不然 API 配额扛不住。下面这个脚本从文件逐行读查询,最多同时跑 5 个。

#!/bin/bash

ACCOUNT_ID=""
REGION="cn-hangzhou"
BUCKET=""
INDEX=""
MODEL="text-embedding-v4"

MAX_CONCURRENT=5
QUERY_FILE="./queries.txt"

mkdir -p ./query-results

run_query() {
  local idx=$1
  local text=$2
  oss-vectors-embed 
    --account-id "$ACCOUNT_ID" 
    --vectors-region "$REGION" 
    query 
    --vector-bucket-name "$BUCKET" 
    --index-name "$INDEX" 
    --model-id "$MODEL" 
    --text-value "$text" 
    --top-k 10 
    > "./query-results/result_${idx}.json" 2>&1
}

idx=0
while IFS= read -r query_text; do
  run_query "$idx" "$query_text" &
  idx=$((idx + 1))

  if (( $(jobs -rp | wc -l) >= MAX_CONCURRENT )); then
    wait -n
  fi
done < "$QUERY_FILE"

wait
echo "全部 $idx 条查询完成"

运行前准备 queries.txt,每行一条查询文本:

如何配置生命周期规则
对象存储有哪些存储类型
如何设置跨区域复制
Bucket 标签的使用限制
如何启用版本控制

输出:全部 5 条查询完成

Python 封装 CLI 并发:需要后处理时

如果返回的结果要做解析、汇总、统计,用 Python 的 asyncio 封装一下 CLI 调用更顺手。

import asyncio
import json
from pathlib import Path

ACCOUNT_ID = ""
REGION = "cn-hangzhou"
BUCKET = ""
INDEX = ""
MODEL = "text-embedding-v4"
MAX_CONCURRENT = 5

async def run_query(semaphore: asyncio.Semaphore, query_text: str, query_id: int):
    async with semaphore:
        cmd = [
            "oss-vectors-embed",
            "--account-id", ACCOUNT_ID,
            "--vectors-region", REGION,
            "query",
            "--vector-bucket-name", BUCKET,
            "--index-name", INDEX,
            "--model-id", MODEL,
            "--text-value", query_text,
            "--top-k", "10",
            "--return-metadata",
        ]
        proc = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, stderr = await proc.communicate()
        if proc.returncode == 0:
            result = json.loads(stdout.decode())
            print(f"查询 {query_id} 完成,返回 {len(result.get('results', []))} 条结果")
            return {"query_id": query_id, "query_text": query_text, "result": result}
        else:
            print(f"查询 {query_id} 失败: {stderr.decode()}")
            return {"query_id": query_id, "query_text": query_text, "error": stderr.decode()}

async def batch_query(queries: list[str]):
    semaphore = asyncio.Semaphore(MAX_CONCURRENT)
    tasks = [run_query(semaphore, text, idx) for idx, text in enumerate(queries)]
    results = await asyncio.gather(*tasks)
    output_path = Path("./query-results/batch_results.json")
    output_path.parent.mkdir(parents=True, exist_ok=True)
    output_path.write_text(json.dumps(results, ensure_ascii=False, indent=2))
    print(f"汇总结果已保存到 {output_path}")
    return results

if __name__ == "__main__":
    queries = [
        "如何配置生命周期规则",
        "对象存储有哪些存储类型",
        "如何设置跨区域复制",
        "Bucket 标签的使用限制",
        "如何启用版本控制",
    ]
    asyncio.run(batch_query(queries))

输出:

查询 0 完成,返回 10 条结果
查询 1 完成,返回 10 条结果
查询 2 完成,返回 10 条结果
查询 3 完成,返回 10 条结果
查询 4 完成,返回 10 条结果
汇总结果已保存到 query-results/batch_results.json

SDK 并发检索:业务集成的正确姿势

SDK 并发不走外部进程,直接调用 query_vectors API。当你需要控制过滤条件、或者把检索集成到服务中时,这条路更干净。

同样的提醒:SDK 方式需要传已经生成的查询向量(比如 float32 数组),而不是原始文本。如果从文本出发,建议先用 Embedding 模型生成向量,或者直接用上面的 CLI 方式。

Python SDK 并发检索

alibabacloud-oss-v2 的线程池并发调用 query_vectors。先装 SDK:

pip install alibabacloud-oss-v2

环境变量 OSS_ACCESS_KEY_IDOSS_ACCESS_KEY_SECRET 要配好,向量 Bucket 和索引也要已创建。

import json
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
import alibabacloud_oss_v2 as oss
import alibabacloud_oss_v2.vectors as oss_vectors

ACCOUNT_ID = ""
REGION = "cn-hangzhou"
BUCKET = ""
INDEX = ""
MAX_CONCURRENT = 5

def create_vector_client():
    credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider()
    cfg = oss.config.load_default()
    cfg.credentials_provider = credentials_provider
    cfg.region = REGION
    cfg.account_id = ACCOUNT_ID
    return oss_vectors.Client(cfg)

def run_query(client, query_vector, query_id, query_filter=None):
    request = oss_vectors.models.QueryVectorsRequest(
        bucket=BUCKET,
        index_name=INDEX,
        query_vector=query_vector,
        filter=query_filter,
        return_distance=True,
        return_metadata=True,
        top_k=10,
    )
    result = client.query_vectors(request)
    print(f"查询 {query_id} 完成,status code: {result.status_code}")
    return {
        "query_id": query_id,
        "status_code": result.status_code,
        "vectors": [str(v) for v in result.vectors] if result.vectors else [],
    }

def batch_query(query_vectors):
    client = create_vector_client()
    results = []
    with ThreadPoolExecutor(max_workers=MAX_CONCURRENT) as executor:
        futures = {
            executor.submit(run_query, client, qv, idx): idx
            for idx, qv in enumerate(query_vectors)
        }
        for future in as_completed(futures):
            idx = futures[future]
            try:
                results.append(future.result())
            except Exception as e:
                print(f"查询 {idx} 失败: {e}")
                results.append({"query_id": idx, "error": str(e)})
    results.sort(key=lambda x: x["query_id"])
    output_path = Path("./query-results/sdk_batch_results.json")
    output_path.parent.mkdir(parents=True, exist_ok=True)
    output_path.write_text(json.dumps(results, ensure_ascii=False, indent=2))
    print(f"汇总结果已保存到 {output_path}")
    return results

if __name__ == "__main__":
    query_vectors = [
        {"float32": [0.1] * 128},
        {"float32": [0.2] * 128},
        {"float32": [0.3] * 128},
        {"float32": [0.4] * 128},
        {"float32": [0.5] * 128},
    ]
    batch_query(query_vectors)

输出(注意顺序可能乱,但最终会按 query_id 排好):

查询 0 完成,status code: 200
查询 2 完成,status code: 200
查询 1 完成,status code: 200
查询 4 完成,status code: 200
查询 3 完成,status code: 200
汇总结果已保存到 query-results/sdk_batch_results.json

带过滤条件的并发检索

实际业务中,不同查询可能需要不同的过滤条件。下面这个例子每条查询都配了自己的 filter

import json
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
import alibabacloud_oss_v2 as oss
import alibabacloud_oss_v2.vectors as oss_vectors

ACCOUNT_ID = ""
REGION = "cn-hangzhou"
BUCKET = ""
INDEX = ""
MAX_CONCURRENT = 5

def create_vector_client():
    credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider()
    cfg = oss.config.load_default()
    cfg.credentials_provider = credentials_provider
    cfg.region = REGION
    cfg.account_id = ACCOUNT_ID
    return oss_vectors.Client(cfg)

def run_query(client, query_vector, query_id, query_filter=None):
    request = oss_vectors.models.QueryVectorsRequest(
        bucket=BUCKET,
        index_name=INDEX,
        query_vector=query_vector,
        filter=query_filter,
        return_distance=True,
        return_metadata=True,
        top_k=10,
    )
    result = client.query_vectors(request)
    print(f"查询 {query_id} 完成,status code: {result.status_code}")
    return {
        "query_id": query_id,
        "status_code": result.status_code,
        "vectors": [str(v) for v in result.vectors] if result.vectors else [],
    }

def batch_query(query_vectors):
    client = create_vector_client()
    results = []
    with ThreadPoolExecutor(max_workers=MAX_CONCURRENT) as executor:
        futures = {
            executor.submit(run_query, client, qv, idx): idx
            for idx, qv in enumerate(query_vectors)
        }
        for future in as_completed(futures):
            idx = futures[future]
            try:
                results.append(future.result())
            except Exception as e:
                print(f"查询 {idx} 失败: {e}")
                results.append({"query_id": idx, "error": str(e)})
    results.sort(key=lambda x: x["query_id"])
    output_path = Path("./query-results/sdk_batch_results.json")
    output_path.parent.mkdir(parents=True, exist_ok=True)
    output_path.write_text(json.dumps(results, ensure_ascii=False, indent=2))
    print(f"汇总结果已保存到 {output_path}")
    return results

if __name__ == "__main__":
    tasks = [
        {
            "vector": {"float32": [0.1] * 128},
            "filter": {"$and": [{"type": {"$in": ["tutorial"]}}]},
        },
        {
            "vector": {"float32": [0.2] * 128},
            "filter": {"$and": [{"type": {"$nin": ["comedy", "documentary"]}}]},
        },
        {
            "vector": {"float32": [0.3] * 128},
            "filter": None,
        },
    ]
    client = create_vector_client()
    results = []
    with ThreadPoolExecutor(max_workers=MAX_CONCURRENT) as executor:
        futures = {
            executor.submit(run_query, client, t["vector"], idx, t["filter"]): idx
            for idx, t in enumerate(tasks)
        }
        for future in as_completed(futures):
            try:
                results.append(future.result())
            except Exception as e:
                print(f"查询失败: {e}")
    for r in sorted(results, key=lambda x: x["query_id"]):
        print(f"查询 {r['query_id']}: 返回 {len(r.get('vectors', []))} 条结果")

输出:

查询 0 完成,status code: 200
查询 1 完成,status code: 200
查询 2 完成,status code: 200
查询 0: 返回 10 条结果
查询 1: 返回 10 条结果
查询 2: 返回 10 条结果

Go SDK 并发检索

alibabacloud-oss-go-sdk-v2 的 goroutine + 信号量来控制并发。先装 SDK:

go get github.com/aliyun/alibabacloud-oss-go-sdk-v2

环境变量配好,Bucket 和索引已创建。

package main

import (
    "context"
    "fmt"
    "log"
    "sync"
    "github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss"
    "github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss/credentials"
    "github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss/vectors"
)

const (
    region        = "cn-hangzhou"
    bucketName    = ""
    accountId     = ""
    indexName     = ""
    maxConcurrent = 5
)

func main() {
    cfg := oss.LoadDefaultConfig().
        WithCredentialsProvider(credentials.NewEnvironmentVariableCredentialsProvider()).
        WithRegion(region).
        WithAccountId(accountId)
    client := vectors.NewVectorsClient(cfg)

    queryVectors := []map[string]any{
        {"float32": []float32{0.1}},
        {"float32": []float32{0.2}},
        {"float32": []float32{0.3}},
        {"float32": []float32{0.4}},
        {"float32": []float32{0.5}},
    }

    var wg sync.WaitGroup
    sem := make(chan struct{}, maxConcurrent)

    for i, qv := range queryVectors {
        wg.Add(1)
        sem <- struct{}{}
        go func(idx int, queryVector map[string]any) {
            defer wg.Done()
            defer func() { <-sem }()
            request := &vectors.QueryVectorsRequest{
                Bucket:         oss.Ptr(bucketName),
                IndexName:      oss.Ptr(indexName),
                QueryVector:    queryVector,
                ReturnMetadata: oss.Ptr(true),
                ReturnDistance: oss.Ptr(true),
                TopK:           oss.Ptr(10),
            }
            result, err := client.QueryVectors(context.TODO(), request)
            if err != nil {
                log.Printf("查询 %d 失败: %v", idx, err)
                return
            }
            fmt.Printf("查询 %d 完成,status code: %d\n", idx, result.StatusCode)
        }(i, qv)
    }
    wg.Wait()
    fmt.Println("全部查询完成")
}

输出:

查询 0 完成,status code: 200
查询 2 完成,status code: 200
查询 1 完成,status code: 200
查询 3 完成,status code: 200
查询 4 完成,status code: 200
全部查询完成

带过滤条件的 Go SDK 并发检索

package main

import (
    "context"
    "fmt"
    "log"
    "sync"
    "github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss"
    "github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss/credentials"
    "github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss/vectors"
)

type queryTask struct {
    vector map[string]any
    filter map[string]any
}

const (
    region        = "cn-hangzhou"
    bucketName    = ""
    accountId     = ""
    indexName     = ""
    maxConcurrent = 5
)

func main() {
    cfg := oss.LoadDefaultConfig().
        WithCredentialsProvider(credentials.NewEnvironmentVariableCredentialsProvider()).
        WithRegion(region).
        WithAccountId(accountId)
    client := vectors.NewVectorsClient(cfg)

    tasks := []queryTask{
        {
            vector: map[string]any{"float32": []float32{0.1}},
            filter: map[string]any{
                "$and": []map[string]any{
                    {"type": map[string]any{"$in": []string{"tutorial"}}},
                },
            },
        },
        {
            vector: map[string]any{"float32": []float32{0.2}},
            filter: map[string]any{
                "$and": []map[string]any{
                    {"type": map[string]any{"$nin": []string{"comedy", "documentary"}}},
                },
            },
        },
        {
            vector: map[string]any{"float32": []float32{0.3}},
            filter: nil,
        },
    }

    var wg sync.WaitGroup
    sem := make(chan struct{}, maxConcurrent)

    for i, task := range tasks {
        wg.Add(1)
        sem <- struct{}{}
        go func(idx int, t queryTask) {
            defer wg.Done()
            defer func() { <-sem }()
            request := &vectors.QueryVectorsRequest{
                Bucket:         oss.Ptr(bucketName),
                IndexName:      oss.Ptr(indexName),
                QueryVector:    t.vector,
                ReturnMetadata: oss.Ptr(true),
                ReturnDistance: oss.Ptr(true),
                TopK:           oss.Ptr(10),
            }
            if t.filter != nil {
                request.Filter = t.filter
            }
            result, err := client.QueryVectors(context.TODO(), request)
            if err != nil {
                log.Printf("查询 %d 失败: %v", idx, err)
                return
            }
            fmt.Printf("查询 %d 完成,status code: %d\n", idx, result.StatusCode)
        }(i, task)
    }
    wg.Wait()
    fmt.Println("全部查询完成")
}

输出:

查询 0 完成,status code: 200
查询 1 完成,status code: 200
查询 2 完成,status code: 200
全部查询完成

并发性能调优:几个关键点

调优项建议说明
并发数3~5query 并发别超过 5,跟 put 命令的最大并发一致,避免触发限流
top_k按需设置返回结果越多,单次请求延迟越高,只拿业务需要的数量就好
错误重试间隔 1~2 秒并发时可能遇到 HTTP 429,捕获到限流错误后等一等再重试
CLI 结果输出重定向到文件多个进程同时往终端输出内容会交错,每条结果写进独立文件更清爽
SDK 客户端复用复用同一个实例不要每条查询都 new 一个 Client,重复创建增加连接建立和认证开销

相关文档

  • 使用OSS Vectors Embed CLI工具写入和检索向量数据
  • OSS向量Bucket最佳实践:快速构建多模态图片语义检索
免责声明

本网站新闻资讯均来自公开渠道,力求准确但不保证绝对无误,内容观点仅代表作者本人,与本站无关。若涉及侵权,请联系我们处理。本站保留对声明的修改权,最终解释权归本站所有。

相关阅读

更多
欢迎回来 登录或注册后,可保存提示词和历史记录
登录后可同步收藏、历史记录和常用模板
注册即表示同意服务条款与隐私政策