DataWorks怎么读取ElasticSearch索引中的_id?
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
在DataWorks中,您可以使用ODPS writer来读取Elasticsearch索引中的_id。具体来说,您需要在ODPS writer的配置中指定Elasticsearch索引的名称和字段名称,以便ODPS writer能够正确地读取Elasticsearch索引中的_id。例如,您可以在ODPS writer的配置中添加以下内容:
{
  "access_key_id": "your_access_key_id",
  "access_key_secret": "your_access_key_secret",
  "project_name": "your_project_name",
  "table_name": "your_table_name",
  "endpoint": "http://your_endpoint",
  "index_name": "your_index_name",
  "type_name": "your_type_name",
  "field_names": [
    "id",
    "name",
    "age"
  ]
}
在上面的配置中,"index_name"参数指定了Elasticsearch索引的名称,"type_name"参数指定了Elasticsearch索引中的数据类型,"field_names"参数指定了需要读取的Elasticsearch字段的名称。在ODPS writer读取Elasticsearch索引中的数据时,它会将_id字段解析为ODPS表中的"ID"字段,将其他字段解析为ODPS表中对应的字段。总之,您可以使用ODPS writer来读取Elasticsearch索引中的_id,只需要在ODPS writer的配置中指定Elasticsearch索引的名称和字段名称即可。
Elasticsearch Reader会获取Server端shard信息用于数据同步,需要确保在任务同步中Server端的shards处于存活状态,否则会存在数据不一致风险。
基本配置:
{
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    },
    "setting":{
        "errorLimit":{
            "record":"0" //错误记录数。
        },
        "jvmOption":"",
        "speed":{
            "concurrent":3,//并发数
            "throttle":true,//
                     "mbps":"12",//限流
        }
    },
    "steps":[
        {
            "category":"reader",
            "name":"Reader",
            "parameter":{
                "column":[ //读取列。
                    "id",
                    "name"
                ],
                "endpoint":"", //服务地址。
                "index":"",  //索引。
                "password":"",  //密码。
                "scroll":"",  //scroll标志。
                "search":"",  //查询query参数,与Elasticsearch的query内容相同,使用_search api,重命名为search。
                "type":"default",
                "username":""  //用户名。
            },
            "stepType":"elasticsearch"
        },
        {
            "stepType": "elasticsearch",
            "parameter": {
                "column": [ //写入列
                    {
                        "name": "id",
                        "type": "integer"
                    },
                    {
                        "name": "name",
                        "type": "text"
                    }
                ],
                "index": "test",   //写入索引
                 "indexType": "",   //写入索引类型,es7不填
                "actionType": "index",  //写入方式
                "cleanup": false,         //是否重建索引
                "datasource": "test",   //数据源名称
                "primaryKeyInfo": {     //主键取值方式
                    "fieldDelimiterOrigin": ",",
                    "column": [
                        "id"
                    ],
                    "type": "specific",
                    "fieldDelimiter": ","
                },
                "dynamic": false,  //动态映射
                "batchSize": 1024   //批量写文档数
            },
            "name": "Writer",
            "category": "writer"
        }
    ],
    "type":"job",
    "version":"2.0" //版本号。
}
DataWorks基于MaxCompute/Hologres/EMR/CDP等大数据引擎,为数据仓库/数据湖/湖仓一体等解决方案提供统一的全链路大数据开发治理平台。