如何从Milvus迁移至DashVector?

简介: 本文档演示如何从Milvus将Collection数据全量导出,并适配迁移至DashVector。

免费体验阿里云高性能向量检索服务https://www.aliyun.com/product/ai/dashvector


本文档演示如何从Milvus将Collection数据全量导出,并适配迁移至DashVector。方案的主要流程包括:

  1. 首先,升级Milvus版本,目前Milvus只有在最新版本(v.2.3.x)中支持全量导出
  2. 其次,将Milvus Collection的Schema信息和数据信息导出到具体的文件中
  3. 最后,以导出的文件作为输入来构建DashVector Collection并数据导入

下面,将详细阐述迁移方案的具体操作细节。


1. Milvus升级2.3.x版本

本文中,我们将借助Milvus的query_iterator来全量导出数据(query接口无法导出完整数据),由于该接口目前只在v2.3.x版本中支持,所以在导出数据前,需要先将Milvus版本升级到该版本。Milvus版本升级的详细操作参考Milvus用户文档


注意:在进行Milvus Upgrade时需要注意数据的备份安全问题。


2. Milvus全量数据导出

数据的导出包含Schema以及数据记录,Schema主要用于完备地定义Collection,数据记录对应于每个Partition下的全量数据,这两部分涵盖了需要导出的全部数据。下文展示如何将单个Milvus Collection全量导出。


2.1. Schema导出

DashVector和Milvus在Schema的设计上有一些区别,DashVector向用户透出的接口非常简单,Milvus则更加详尽。从Milvus迁移DashVector时会涉及到部分Schema参数的删除(例如Collection的index_param参数),只会保留DashVector构建Collection的必要参数,以下为一个Schema转换的简单示例(其中,Collection已有的数据参考Milvus示例代码写入)。

frompymilvusimport (
connections,
utility,
Collection,
DataType)
importosimportjsonfrompathlibimportPathfmt="\n=== {:30} ===\n"print(fmt.format("start connecting to Milvus"))
host=os.environ.get('MILVUS_HOST', "localhost")
print(fmt.format(f"Milvus host: {host}"))
connections.connect("default", host=host, port="19530")
metrics_map= {
'COSINE': 'cosine',
'L2': 'euclidean',
'IP': 'dotproduct',
}
dtype_map= {
DataType.BOOL: 'bool',
DataType.INT8: 'int',
DataType.INT16: 'int',
DataType.INT32: 'int',
DataType.INT64: 'int',
DataType.FLOAT: 'float',
DataType.DOUBLE: 'float',
DataType.STRING: 'str',
DataType.VARCHAR: 'str',
}
defload_collection(collection_name: str) ->Collection:
has=utility.has_collection(collection_name)
print(f"Does collection hello_milvus exist in Milvus: {has}")
ifnothas:
returnNonecollection=Collection(collection_name)      
collection.load()
returncollectiondefexport_collection_schema(collection, file: str):
schema=collection.schema.to_dict()
index=collection.indexes[0].to_dict()
export_schema=dict()
milvus_metric_type=index['index_param']['metric_type']
try:
export_schema['metrics'] =metrics_map[milvus_metric_type]
except:
raiseException(f"milvus metrics_type{milvus_metric_type} not supported")
export_schema['fields_schema'] = {}
forfieldinschema['fields']:
if'is_primary'infieldandfield['is_primary']:
continueiffield['name'] ==index['field']:
# vectoriffield['type'] ==DataType.FLOAT_VECTOR:
export_schema['dtype'] ='float'export_schema['dimension'] =field['params']['dim']
else:
raiseException(f"milvus dtype{field['type']} not supported yet")
else:
try:
# non-vectorexport_schema['fields_schema'][field['name']] =dtype_map[field['type']]
except:
raiseException(f"milvus dtype{field['type']} not supported yet")
withopen(file, 'w') asfile:
json.dump(export_schema, file, indent=4)  
if__name__=="__main__":
collection_name="YOUR_MILVUS_COLLECTION_NAME"collection=load_collection(collection_name)
dump_path_str=collection_name+'.dump'dump_path=Path(dump_path_str)
dump_path.mkdir(parents=True, exist_ok=True)
schema_file=dump_path_str+"/schema.json"export_collection_schema(collection, schema_file)

以下是一个可用于创建DashVector Collection的schema文件示例。

{
"metrics": "euclidean",
"fields_schema": {
"random": "float",
"var": "str"    },
"dtype": "float",
"dimension": 8}

2.2. Data导出

DashVector和Milvus在设计上都有Partition的概念,所以向量以及其他数据进行导出时,需要注意按照Partition粒度进行导出。此外,DashVector的主键类型为str,而Milvus设计其为自定义类型,所以在导出时需要考虑主键类型的转换。以下为一个基于query_iterator接口导出的简单代码示例:

frompymilvusimport (
connections,
utility,
Collection,
DataType)
importosimportjsonimportnumpyasnpfrompathlibimportPathfmt="\n=== {:30} ===\n"print(fmt.format("start connecting to Milvus"))
host=os.environ.get('MILVUS_HOST', "localhost")
print(fmt.format(f"Milvus host: {host}"))
connections.connect("default", host=host, port="19530")
pk="pk"vector_field_name="vector"defload_collection(collection_name: str) ->Collection:
has=utility.has_collection(collection_name)
print(f"Does collection hello_milvus exist in Milvus: {has}")
ifnothas:
returnNonecollection=Collection(collection_name)      
collection.load()
returncollectiondefexport_partition_data(collection, partition_name, file: str):
batch_size=10output_fields=["pk", "random", "var", "embeddings"]
query_iter=collection.query_iterator(
batch_size=batch_size,
output_fields=output_fields,
partition_names=[partition_name]
  )
export_file=open(file, 'w')
whileTrue:
docs=query_iter.next()
iflen(docs) ==0:
# close the iteratorquery_iter.close()
breakfordocindocs:
new_doc= {}
new_doc_fields= {}
fork, vindoc.items():
ifk==pk:
# primary keynew_doc['pk'] =str(v)
elifk==vector_field_name:
new_doc['vector'] = [float(k) forkinv]
else:
new_doc_fields[k] =vnew_doc['fields'] =new_doc_fieldsjson.dump(new_doc, export_file)
export_file.write('\n')
export_file.close()
if__name__=="__main__":
collection_name="YOUR_MILVUS_COLLECTION_NAME"collection=load_collection(collection_name)
pk=collection.schema.primary_field.namevector_field_name=collection.indexes[0].field_namedump_path_str=collection_name+'.dump'dump_path=Path(dump_path_str)
dump_path.mkdir(parents=True, exist_ok=True)
forpartitionincollection.partitions:
partition_name=partition.nameifpartition_name=='_default':
export_path=dump_path_str+'/default.txt'else:
export_path=dump_path_str+'/'+partition_name+".txt"export_partition_data(collection, partition_name, export_path)

上述示例代码会将Milvus Collection的各个Partition分别进行数据导出,导出后的文件结构如下图所示:

# collection_name = hello_milvus
hello_milvus.dump/
├── default.txt
└── schema.json


3. 将数据导入DashVector


3.1. 创建Cluster

参考DashVector官方用户手册构建Cluster。


3.2. 创建Collection

根据2.1章节中导出的Schema信息以及参考Dashvector官方用户手册来创建Collection。下面的示例代码会根据2.1章节中导出的schema.json来创建一个DashVector的Collection。

fromdashvectorimportClient, DashVectorExceptionfrompydanticimportBaseModelfromtypingimportDict, Typeimportjsondtype_convert= {
'int': int,
'float': float,
'bool': bool,
'str': str}
classSchema(BaseModel):
metrics: strdtype: Typedimension: intfields_schema: Dict[str, Type]
@classmethoddeffrom_dict(cls, json_data):
metrics=json_data['metrics']
dtype=dtype_convert[json_data['dtype']]
dimension=json_data['dimension']
fields_schema= {k: dtype_convert[v] fork, vinjson_data['fields_schema'].items()}
returncls(metrics=metrics, dtype=dtype, dimension=dimension, fields_schema=fields_schema)
defread_schema(schema_path) ->Schema:
withopen(schema_path) asfile:
json_data=json.loads(file.read())
returnSchema.from_dict(json_data)
if__name__=="__main__":
milvus_dump_path=f"{YOUR_MILVUS_COLLECTION_NAME}.dump"milvus_dump_scheme_path=milvus_dump_path+"/schema.json"schema=read_schema(milvus_dump_scheme_path)
client=dashvector.Client(
api_key='YOUR_API_KEY',
endpoint='YOUR_CLUSTER_ENDPOINT'  )
# create collectionrsp=client.create(name="YOUR_DASHVECTOR_COLLECTION_NAME", 
dimension=schema.dimension, 
metric=schema.metrics, 
dtype=schema.dtype,
fields_schema=schema.fields_schema)
ifnotrsp:
raiseDashVectorException(rsp.code, reason=rsp.message)

3.3. 导入Data

根据2.2章节中导出的数据以及参考DashVector官方用户手册来批量插入Doc。下面的示例代码会依次解析各个Partition导出的数据,然后依次创建DashVector下的Partition并导入数据。

fromdashvectorimportClient, DashVectorException, DocfrompydanticimportBaseModelfromtypingimportDict, TypeimportjsonimportglobfrompathlibimportPathdefinsert_data(collection, partition_name, partition_file):
ifpartition_name!='default':
rsp=collection.create_partition(partition_name)
ifnotrsp:
raiseDashVectorException(rsp.code, reason=rsp.message)
withopen(partition_file) asf:
forlineinf:
ifline.strip():
json_data=json.loads(line)
rsp=collection.insert(
          [
Doc(id=json_data['pk'], vector=json_data['vector'], fields=json_data['fields'])
          ]
        )
ifnotrsp:
raiseDashVectorException(rsp.code, reason=rsp.message)  
if__name__=="__main__":
milvus_dump_path=f"{YOUR_MILVUS_COLLECTION_NAME}.dump"client=dashvector.Client(
api_key='YOUR_API_KEY',
endpoint='YOUR_CLUSTER_ENDPOINT'  )
# create collectioncollection=client.get("YOUR_DASHVECTOR_COLLECTION_NAME")
partition_files=glob.glob(milvus_dump_path+'/*.txt', recursive=False)
forpartition_fileinpartition_files:
# create partitionpartition_name=Path(partition_file).steminsert_data(collection, partition_name, partition_file)

免费体验阿里云高性能向量检索服务https://www.aliyun.com/product/ai/dashvector

相关文章
|
8天前
|
人工智能 Cloud Native API
向量检索服务DashVector的体验
向量检索服务DashVector的体验
112 2
|
6月前
|
消息中间件 Kubernetes 数据安全/隐私保护
milvus本地集群部署(非k8s)
milvus本地集群部署(非k8s)
236 0
|
8天前
|
算法 数据库 Docker
大模型必备向量数据库-Milvus的安装过程
大模型必备向量数据库-Milvus的安装过程
23 0
|
8天前
|
存储 机器学习/深度学习 API
开源向量数据库比较:Chroma, Milvus, Faiss,Weaviate
该文探讨了向量数据库在语义搜索和RAG中的核心作用,并介绍了四个开源向量数据库:Chroma、Milvus、Faiss和Weaviate。这些数据库用于存储高维向量,支持基于相似性的快速搜索,改变了传统的精确匹配方法。文章详细比较了它们的特性,如Chroma的易用性,Milvus的存储效率,Faiss的GPU加速,和Weaviate的图数据模型。选择合适的数据库取决于具体需求,如数据类型、性能和使用场景。
140 0
|
8天前
|
人工智能 Java API
如何快速使用向量检索服务DashVector?
本文将介绍如何快速上手使用向量检索服务DashVector。
|
8天前
|
数据库 Python
DashVector + ModelScope 检索
DashVector + ModelScope 检索
34 0
|
8天前
|
自然语言处理 分布式计算 算法
通过OpenSearch向量检索版进行混合检索的最佳实践
本文介绍如何通过OpenSearch向量检索版,使用稀疏-稠密向量进行混合检索,获得更好的搜索效果。
1237 0
|
8天前
|
Java
DashVector实践记录
DashVector内测期间,在业务场景中实践落地了向量检索。
|
8天前
|
存储 人工智能 对象存储
DashVector + DashScope升级多模态检索
本教程在前述教程(DashVector + ModelScope玩转多模态检索)的基础之上,基于DashScope上新推出的ONE-PEACE通用多模态表征模型结合向量检索服务DashVector来对多模态检索进行升级,接下拉我们将展示更丰富的多模态检索能力。
|
8天前
|
关系型数据库 数据库 PostgreSQL
PostgreSQL【应用 01】使用Vector插件实现向量相似度查询(Docker部署的PostgreSQL安装pgvector插件说明)和Milvus向量库对比
PostgreSQL【应用 01】使用Vector插件实现向量相似度查询(Docker部署的PostgreSQL安装pgvector插件说明)和Milvus向量库对比
219 1

热门文章

最新文章