在做 RAG 或者语义搜索的时候,向量检索系统通常要面对两个比较棘手的问题。
一个是多租户隔离——比如你是一个SaaS服务商,手里几十上百个企业客户,每个客户的知识库必须天然隔离,谁的也不能串到谁家去。另一个就是数据规模太大,单索引的量级一旦到了千万甚至亿级,检索延迟就很难压下来,实时性往往跟不上。
针对这两个场景,OSS 向量 Bucket 提供了一种挺实用的思路:支持在同一账号、同一地域下创建大量的向量索引(Index)。通过将数据按租户或业务维度拆分到不同的索引中,隔离和性能的问题都能兼顾着解决。
多索引架构的优势
- 数据隔离:让不同租户的数据放在各自独立的索引里,从物理层面杜绝了跨租户泄露的风险。
- 检索提速:大表拆成多张小表之后,单次检索的范围立刻变小了。如果再配合并发检索多个索引再合并结果,总体响应时延还能进一步降下来。
- 运维灵活:每个索引可以独立配置维度、模型和相似度算法。删某个租户的数据也很省事,直接删掉对应的索引就行,不用逐条过滤删除。
通过 CLI 按租户导入数据
oss-vectors-embed 这个 CLI 工具可以指定文件写入到指定的索引里,天然就支持按租户或业务做定向导入。
开始之前,需要确保下面几个条件已经满足:
- 配置好环境变量
OSS_ACCESS_KEY_ID、OSS_ACCESS_KEY_SECRET 和 DASHSCOPE_API_KEY。
- 已经创建了向量 Bucket,以及每个租户对应的向量索引。
把下面示例里的占位符换成实际值就行:
| 占位符 |
说明 |
| 阿里云账号 ID |
| 向量 Bucket 名称 |
按租户写入不同索引
不同租户的数据写入各自独立的索引,数据隔离自然就实现了。
# 将租户 A 的文档写入租户 A 的索引
oss-vectors-embed --account-id "" --vectors-region cn-hangzhou put --vector-bucket-name "" --index-name "tenantcompanya" --model-id text-embedding-v4 --text-value "租户A的知识库文档内容" --key "doc_001" --metadata '{"tenant": "company_a", "category": "faq"}'
# 将租户 B 的文档写入租户 B 的索引
oss-vectors-embed --account-id "" --vectors-region cn-hangzhou put --vector-bucket-name "" --index-name "tenantcompanyb" --model-id text-embedding-v4 --text-value "租户B的知识库文档内容" --key "doc_001" --metadata '{"tenant": "company_b", "category": "manual"}'
按租户定向检索
检索时只查目标租户的索引,数据隔离天然生效。
# 仅在租户 A 的索引中检索
oss-vectors-embed --account-id "" --vectors-region cn-hangzhou query --vector-bucket-name "" --index-name "tenantcompanya" --model-id text-embedding-v4 --text-value "常见问题" --top-k 5 --return-metadata
通过 SDK 构建多索引架构
Python SDK
开始前记得安装 alibabacloud-oss-v2 SDK:
pip install alibabacloud-oss-v2
环境变量 OSS_ACCESS_KEY_ID 和 OSS_ACCESS_KEY_SECRET 也要配置好。
创建多租户索引
以租户 ID 为后缀命名索引,批量创建独立的向量索引。
import alibabacloud_oss_v2 as oss
import alibabacloud_oss_v2.vectors as oss_vectors
ACCOUNT_ID = ""
REGION = "cn-hangzhou"
BUCKET = ""
def create_vector_client():
credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider()
cfg = oss.config.load_default()
cfg.credentials_provider = credentials_provider
cfg.region = REGION
cfg.account_id = ACCOUNT_ID
return oss_vectors.Client(cfg)
client = create_vector_client()
# 批量为租户创建索引
tenant_ids = ["companya", "companyb", "companyc"]
for tenant_id in tenant_ids:
index_name = f"tenant{tenant_id}"
result = client.put_vector_index(oss_vectors.models.PutVectorIndexRequest(
bucket=BUCKET,
index_name=index_name,
dimension=1024,
data_type="float32",
distance_metric="cosine",
))
print(f"索引 {index_name} 创建完成,status_code={result.status_code}")
运行后会看到:
索引 tenantcompanya 创建完成,status_code=200
索引 tenantcompanyb 创建完成,status_code=200
索引 tenantcompanyc 创建完成,status_code=200
按租户写入数据
不同租户的数据写入各自对应的索引。
import alibabacloud_oss_v2 as oss
import alibabacloud_oss_v2.vectors as oss_vectors
ACCOUNT_ID = ""
REGION = "cn-hangzhou"
BUCKET = ""
def create_vector_client():
credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider()
cfg = oss.config.load_default()
cfg.credentials_provider = credentials_provider
cfg.region = REGION
cfg.account_id = ACCOUNT_ID
return oss_vectors.Client(cfg)
client = create_vector_client()
# 向租户 A 的索引写入数据
result = client.put_vectors(oss_vectors.models.PutVectorsRequest(
bucket=BUCKET,
index_name="tenantcompanya",
vectors=[{
"key": "faq_001",
"data": {"float32": [0.1] * 1024},
"metadata": {"tenant": "company_a", "category": "faq"}
}]
))
print(f"租户 A 写入完成,status_code={result.status_code}")
# 向租户 B 的索引写入数据
result = client.put_vectors(oss_vectors.models.PutVectorsRequest(
bucket=BUCKET,
index_name="tenantcompanyb",
vectors=[{
"key": "manual_001",
"data": {"float32": [0.2] * 1024},
"metadata": {"tenant": "company_b", "category": "manual"}
}]
))
print(f"租户 B 写入完成,status_code={result.status_code}")
运行后输出:
租户 A 写入完成,status_code=200
租户 B 写入完成,status_code=200
并发检索多个索引并合并结果
大表拆分之后,用并发检索多张小表再合并排序的方式,确实可以显著降低总响应时延。
from concurrent.futures import ThreadPoolExecutor, as_completed
import alibabacloud_oss_v2 as oss
import alibabacloud_oss_v2.vectors as oss_vectors
ACCOUNT_ID = ""
REGION = "cn-hangzhou"
BUCKET = ""
def create_vector_client():
credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider()
cfg = oss.config.load_default()
cfg.credentials_provider = credentials_provider
cfg.region = REGION
cfg.account_id = ACCOUNT_ID
return oss_vectors.Client(cfg)
def search_index(client, index_name, query_vector, top_k=10):
"""检索单个索引"""
result = client.query_vectors(oss_vectors.models.QueryVectorsRequest(
bucket=BUCKET,
index_name=index_name,
query_vector=query_vector,
return_metadata=True,
return_distance=True,
top_k=top_k,
))
return {"index": index_name, "status_code": result.status_code, "vectors": result.vectors or []}
def parallel_search(index_names, query_vector, top_k=10):
"""并发检索多个索引并合并结果"""
client = create_vector_client()
all_vectors = []
with ThreadPoolExecutor(max_workers=len(index_names)) as executor:
futures = {executor.submit(search_index, client, idx, query_vector, top_k): idx for idx in index_names}
for future in as_completed(futures):
result = future.result()
print(f"索引 {result['index']} 返回 {len(result['vectors'])} 条结果")
all_vectors.extend(result["vectors"])
# 按 distance 升序排序,取全局 TopK
all_vectors.sort(key=lambda v: v.get("distance", float("inf")))
return all_vectors[:top_k]
# 并发检索 3 个分表索引
indices = ["tenantcompanya", "tenantcompanyb", "tenantcompanyc"]
query_vec = {"float32": [0.1] * 1024}
results = parallel_search(indices, query_vec, top_k=5)
print(f"\n合并后全局 Top5:")
for v in results:
print(f"key={v.get('key')}, distance={v.get('distance')}, metadata={v.get('metadata')}")
运行后输出:
索引 tenantcompanya 返回 1 条结果
索引 tenantcompanyb 返回 1 条结果
索引 tenantcompanyc 返回 0 条结果
合并后全局 Top5:
key=faq_001, distance=0.0, metadata={'tenant': 'company_a', 'category': 'faq'}
key=manual_001, distance=0.19999998807907104, metadata={'tenant': 'company_b', 'category': 'manual'}
这里有个说明:并发检索多个索引之后,在客户端按 distance 排序合并就行了。如果对精度有更高要求,可以引入 Rerank 模型再做一次二次精排。
Go SDK
开始前安装 SDK:
go get github.com/aliyun/alibabacloud-oss-go-sdk-v2
同时确保已经配置了 OSS_ACCESS_KEY_ID 和 OSS_ACCESS_KEY_SECRET 环境变量。
创建多租户索引
package main
import (
"context"
"fmt"
"log"
"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss"
"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss/credentials"
"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss/vectors"
)
const (
region = "cn-hangzhou"
bucketName = ""
accountId = ""
)
func main() {
cfg := oss.LoadDefaultConfig().
WithCredentialsProvider(credentials.NewEnvironmentVariableCredentialsProvider()).
WithRegion(region).
WithAccountId(accountId)
client := vectors.NewVectorsClient(cfg)
tenantIDs := []string{"companya", "companyb", "companyc"}
for _, tenantID := range tenantIDs {
indexName := fmt.Sprintf("tenant%s", tenantID)
result, err := client.PutVectorIndex(context.TODO(), &vectors.PutVectorIndexRequest{
Bucket: oss.Ptr(bucketName),
IndexName: oss.Ptr(indexName),
Dimension: oss.Ptr(1024),
DataType: oss.Ptr("float32"),
DistanceMetric: oss.Ptr("cosine"),
})
if err != nil {
log.Printf("索引 %s 创建失败: %v", indexName, err)
continue
}
fmt.Printf("索引 %s 创建完成,status_code=%d\n", indexName, result.StatusCode)
}
}
运行后输出:
索引 tenantcompanya 创建完成,status_code=200
索引 tenantcompanyb 创建完成,status_code=200
索引 tenantcompanyc 创建完成,status_code=200
并发检索多个索引并合并结果
package main
import (
"context"
"fmt"
"log"
"sort"
"sync"
"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss"
"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss/credentials"
"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss/vectors"
)
const (
region = "cn-hangzhou"
bucketName = ""
accountId = ""
dimension = 1024
)
func makeVector(val float32, dim int) []float32 {
v := make([]float32, dim)
for i := range v {
v[i] = val
}
return v
}
func main() {
cfg := oss.LoadDefaultConfig().
WithCredentialsProvider(credentials.NewEnvironmentVariableCredentialsProvider()).
WithRegion(region).
WithAccountId(accountId)
client := vectors.NewVectorsClient(cfg)
indices := []string{"tenantcompanya", "tenantcompanyb", "tenantcompanyc"}
queryVector := map[string]any{"float32": makeVector(0.1, dimension)}
var mu sync.Mutex
var allVectors []map[string]any
var wg sync.WaitGroup
for _, indexName := range indices {
wg.Add(1)
go func(idx string) {
defer wg.Done()
result, err := client.QueryVectors(context.TODO(), &vectors.QueryVectorsRequest{
Bucket: oss.Ptr(bucketName),
IndexName: oss.Ptr(idx),
QueryVector: queryVector,
ReturnMetadata: oss.Ptr(true),
ReturnDistance: oss.Ptr(true),
TopK: oss.Ptr(10),
})
if err != nil {
log.Printf("索引 %s 检索失败: %v", idx, err)
return
}
fmt.Printf("索引 %s 返回 %d 条结果\n", idx, len(result.Vectors))
mu.Lock()
allVectors = append(allVectors, result.Vectors...)
mu.Unlock()
}(indexName)
}
wg.Wait()
sort.Slice(allVectors, func(i, j int) bool {
di, _ := allVectors[i]["distance"].(float64)
dj, _ := allVectors[j]["distance"].(float64)
return di < dj
})
topK := 5
if len(allVectors) < topK {
topK = len(allVectors)
}
fmt.Printf("\n合并后全局 Top%d:\n", topK)
for _, v := range allVectors[:topK] {
fmt.Printf("key=%v, distance=%v, metadata=%v\n", v["key"], v["distance"], v["metadata"])
}
}
运行后输出:
索引 tenantcompanya 返回 1 条结果
索引 tenantcompanyc 返回 0 条结果
索引 tenantcompanyb 返回 1 条结果
合并后全局 Top2:
key=faq_001, distance=0, metadata=map[category:faq tenant:company_a]
key=manual_001, distance=0.19999998807907104, metadata=map[category:manual tenant:company_b]
最佳实践
- 索引命名规范:用租户 ID 或业务维度作为索引名称的后缀(像
tenant{tenantid} 这种格式)。需要注意索引名只支持小写字母和数字,不支持下划线和连字符。
- 租户数比较多的时候:直接利用索引名称做逻辑隔离。OSS 向量索引的创建是秒级的,管理成本几乎可以忽略。
- 追求极低延迟:一旦单索引的数据量超过千万级别,可以按业务逻辑(比如时间、类别)做水平拆分,再用并发检索多个索引再合并结果的方式来处理。
- 结果重排(Rerank):多索引表的结果合并之后,可以按 distance 相似度做简单排序,也可以引入专门的 Rerank 模型来做二次排序。
- 索引清理:删除某个租户或者业务的数据,直接调用
DeleteVectorIndex 删除对应索引就行,省去了逐条过滤删除的麻烦。