免费体验阿里云高性能向量检索服务:https://www.aliyun.com/product/ai/dashvector
本文档演示如何从Milvus将Collection数据全量导出,并适配迁移至DashVector。方案的主要流程包括:
- 首先,升级Milvus版本,目前Milvus只有在最新版本(v2.3.x)中支持全量导出
- 其次,将Milvus Collection的Schema信息和数据信息导出到具体的文件中
- 最后,以导出的文件作为输入来构建DashVector Collection并数据导入
下面,将详细阐述迁移方案的具体操作细节。
1. Milvus升级2.3.x版本
本文中,我们将借助Milvus的query_iterator来全量导出数据(query接口无法导出完整数据),由于该接口目前只在v2.3.x版本中支持,所以在导出数据前,需要先将Milvus版本升级到该版本。Milvus版本升级的详细操作参考Milvus用户文档。
注意:在进行Milvus Upgrade时需要注意数据的备份安全问题。
2. Milvus全量数据导出
数据的导出包含Schema以及数据记录,Schema主要用于完备地定义Collection,数据记录对应于每个Partition下的全量数据,这两部分涵盖了需要导出的全部数据。下文展示如何将单个Milvus Collection全量导出。
2.1. Schema导出
DashVector和Milvus在Schema的设计上有一些区别,DashVector向用户透出的接口非常简单,Milvus则更加详尽。从Milvus迁移DashVector时会涉及到部分Schema参数的删除(例如Collection的index_param参数),只会保留DashVector构建Collection的必要参数,以下为一个Schema转换的简单示例(其中,Collection已有的数据参考Milvus示例代码写入)。
from pymilvus import (
    connections,
    utility,
    Collection,
    DataType,
)
import os
import json
from pathlib import Path

fmt = "\n=== {:30} ===\n"
print(fmt.format("start connecting to Milvus"))
host = os.environ.get('MILVUS_HOST', "localhost")
print(fmt.format(f"Milvus host: {host}"))
connections.connect("default", host=host, port="19530")

# Milvus metric type -> DashVector metric name.
metrics_map = {
    'COSINE': 'cosine',
    'L2': 'euclidean',
    'IP': 'dotproduct',
}

# Milvus scalar field dtype -> DashVector field type name.
dtype_map = {
    DataType.BOOL: 'bool',
    DataType.INT8: 'int',
    DataType.INT16: 'int',
    DataType.INT32: 'int',
    DataType.INT64: 'int',
    DataType.FLOAT: 'float',
    DataType.DOUBLE: 'float',
    DataType.STRING: 'str',
    DataType.VARCHAR: 'str',
}


def load_collection(collection_name: str) -> Collection:
    """Load a Milvus collection into memory.

    Returns None when the collection does not exist.
    """
    has = utility.has_collection(collection_name)
    print(f"Does collection hello_milvus exist in Milvus: {has}")
    if not has:
        return None
    collection = Collection(collection_name)
    collection.load()
    return collection


def export_collection_schema(collection, file: str):
    """Translate a Milvus collection schema into a DashVector schema JSON file.

    Only the parameters DashVector needs are kept: metric, vector dtype and
    dimension, and the non-primary scalar fields. Raises Exception when a
    Milvus metric or dtype has no DashVector equivalent.
    """
    schema = collection.schema.to_dict()
    index = collection.indexes[0].to_dict()
    export_schema = dict()
    milvus_metric_type = index['index_param']['metric_type']
    try:
        export_schema['metrics'] = metrics_map[milvus_metric_type]
    # BUGFIX: narrow the bare `except:` to the lookup failure it handles.
    except KeyError:
        raise Exception(f"milvus metrics_type{milvus_metric_type} not supported")
    export_schema['fields_schema'] = {}
    for field in schema['fields']:
        # The primary key is implicit in DashVector; skip it.
        if 'is_primary' in field and field['is_primary']:
            continue
        if field['name'] == index['field']:
            # vector
            if field['type'] == DataType.FLOAT_VECTOR:
                export_schema['dtype'] = 'float'
                export_schema['dimension'] = field['params']['dim']
            else:
                raise Exception(f"milvus dtype{field['type']} not supported yet")
        else:
            try:
                # non-vector
                export_schema['fields_schema'][field['name']] = dtype_map[field['type']]
            # BUGFIX: narrow the bare `except:` to the lookup failure it handles.
            except KeyError:
                raise Exception(f"milvus dtype{field['type']} not supported yet")
    # BUGFIX: use a distinct name for the handle so the `file` parameter
    # (the path string) is not shadowed.
    with open(file, 'w') as fp:
        json.dump(export_schema, fp, indent=4)


if __name__ == "__main__":
    collection_name = "YOUR_MILVUS_COLLECTION_NAME"
    collection = load_collection(collection_name)
    dump_path_str = collection_name + '.dump'
    dump_path = Path(dump_path_str)
    dump_path.mkdir(parents=True, exist_ok=True)
    schema_file = dump_path_str + "/schema.json"
    export_collection_schema(collection, schema_file)
以下是一个可用于创建DashVector Collection的schema文件示例。
{
    "metrics": "euclidean",
    "fields_schema": {
        "random": "float",
        "var": "str"
    },
    "dtype": "float",
    "dimension": 8
}
2.2. Data导出
DashVector和Milvus在设计上都有Partition的概念,所以向量以及其他数据进行导出时,需要注意按照Partition粒度进行导出。此外,DashVector的主键类型为str,而Milvus设计其为自定义类型,所以在导出时需要考虑主键类型的转换。以下为一个基于query_iterator接口导出的简单代码示例:
frompymilvusimport ( connections, utility, Collection, DataType) importosimportjsonimportnumpyasnpfrompathlibimportPathfmt="\n=== {:30} ===\n"print(fmt.format("start connecting to Milvus")) host=os.environ.get('MILVUS_HOST', "localhost") print(fmt.format(f"Milvus host: {host}")) connections.connect("default", host=host, port="19530") pk="pk"vector_field_name="vector"defload_collection(collection_name: str) ->Collection: has=utility.has_collection(collection_name) print(f"Does collection hello_milvus exist in Milvus: {has}") ifnothas: returnNonecollection=Collection(collection_name) collection.load() returncollectiondefexport_partition_data(collection, partition_name, file: str): batch_size=10output_fields=["pk", "random", "var", "embeddings"] query_iter=collection.query_iterator( batch_size=batch_size, output_fields=output_fields, partition_names=[partition_name] ) export_file=open(file, 'w') whileTrue: docs=query_iter.next() iflen(docs) ==0: # close the iteratorquery_iter.close() breakfordocindocs: new_doc= {} new_doc_fields= {} fork, vindoc.items(): ifk==pk: # primary keynew_doc['pk'] =str(v) elifk==vector_field_name: new_doc['vector'] = [float(k) forkinv] else: new_doc_fields[k] =vnew_doc['fields'] =new_doc_fieldsjson.dump(new_doc, export_file) export_file.write('\n') export_file.close() if__name__=="__main__": collection_name="YOUR_MILVUS_COLLECTION_NAME"collection=load_collection(collection_name) pk=collection.schema.primary_field.namevector_field_name=collection.indexes[0].field_namedump_path_str=collection_name+'.dump'dump_path=Path(dump_path_str) dump_path.mkdir(parents=True, exist_ok=True) forpartitionincollection.partitions: partition_name=partition.nameifpartition_name=='_default': export_path=dump_path_str+'/default.txt'else: export_path=dump_path_str+'/'+partition_name+".txt"export_partition_data(collection, partition_name, export_path)
上述示例代码会将Milvus Collection的各个Partition分别进行数据导出,导出后的文件结构如下图所示:
# collection_name = hello_milvus
hello_milvus.dump/
├── default.txt
└── schema.json
3. 将数据导入DashVector
3.1. 创建Cluster
参考DashVector官方用户手册构建Cluster。
3.2. 创建Collection
根据2.1章节中导出的Schema信息以及参考DashVector官方用户手册来创建Collection。下面的示例代码会根据2.1章节中导出的schema.json来创建一个DashVector的Collection。
fromdashvectorimportClient, DashVectorExceptionfrompydanticimportBaseModelfromtypingimportDict, Typeimportjsondtype_convert= { 'int': int, 'float': float, 'bool': bool, 'str': str} classSchema(BaseModel): metrics: strdtype: Typedimension: intfields_schema: Dict[str, Type] deffrom_dict(cls, json_data): metrics=json_data['metrics'] dtype=dtype_convert[json_data['dtype']] dimension=json_data['dimension'] fields_schema= {k: dtype_convert[v] fork, vinjson_data['fields_schema'].items()} returncls(metrics=metrics, dtype=dtype, dimension=dimension, fields_schema=fields_schema) defread_schema(schema_path) ->Schema: withopen(schema_path) asfile: json_data=json.loads(file.read()) returnSchema.from_dict(json_data) if__name__=="__main__": milvus_dump_path=f"{YOUR_MILVUS_COLLECTION_NAME}.dump"milvus_dump_scheme_path=milvus_dump_path+"/schema.json"schema=read_schema(milvus_dump_scheme_path) client=dashvector.Client( api_key='YOUR_API_KEY', endpoint='YOUR_CLUSTER_ENDPOINT' ) # create collectionrsp=client.create(name="YOUR_DASHVECTOR_COLLECTION_NAME", dimension=schema.dimension, metric=schema.metrics, dtype=schema.dtype, fields_schema=schema.fields_schema) ifnotrsp: raiseDashVectorException(rsp.code, reason=rsp.message)
3.3. 导入Data
根据2.2章节中导出的数据以及参考DashVector官方用户手册来批量插入Doc。下面的示例代码会依次解析各个Partition导出的数据,然后依次创建DashVector下的Partition并导入数据。
fromdashvectorimportClient, DashVectorException, DocfrompydanticimportBaseModelfromtypingimportDict, TypeimportjsonimportglobfrompathlibimportPathdefinsert_data(collection, partition_name, partition_file): ifpartition_name!='default': rsp=collection.create_partition(partition_name) ifnotrsp: raiseDashVectorException(rsp.code, reason=rsp.message) withopen(partition_file) asf: forlineinf: ifline.strip(): json_data=json.loads(line) rsp=collection.insert( [ Doc(id=json_data['pk'], vector=json_data['vector'], fields=json_data['fields']) ] ) ifnotrsp: raiseDashVectorException(rsp.code, reason=rsp.message) if__name__=="__main__": milvus_dump_path=f"{YOUR_MILVUS_COLLECTION_NAME}.dump"client=dashvector.Client( api_key='YOUR_API_KEY', endpoint='YOUR_CLUSTER_ENDPOINT' ) # create collectioncollection=client.get("YOUR_DASHVECTOR_COLLECTION_NAME") partition_files=glob.glob(milvus_dump_path+'/*.txt', recursive=False) forpartition_fileinpartition_files: # create partitionpartition_name=Path(partition_file).steminsert_data(collection, partition_name, partition_file)
免费体验阿里云高性能向量检索服务:https://www.aliyun.com/product/ai/dashvector