datax-elasticsearch 同步踩坑记录

本文涉及的产品
实时数仓Hologres,5000CU*H 100GB 3个月
智能开放搜索 OpenSearch行业算法版,1GB 20LCU 1个月
实时计算 Flink 版,5000CU*H 3个月
简介: 为了用datax同步es数据到其他地方,踩了不少坑.记录一下.

datax-elasticsearch 同步踩坑记录

为了用datax同步es数据到其他地方,踩了不少坑.记录一下.

1 打jar包

1,官方的datax没有es的reader插件,只有es的writer插件.
2,git上有大佬写了一个

https://github.com/Kestrong/datax-elasticsearch/blob/master/elasticsearchreader/doc/elasticsearchreader.md

3,问题在打包.
**
1,将官方datax的src下载下来;
2,将git上面的reader放到合适的位置.这块没啥好说的.
3,整体打包.这块遇到的坑特别多.
3.1 datax很多pom的依赖都out of date了,需要自行替换成新版本
3.2 涉及${relative_path}的,这种一般是因为缺少properties,手动补全即可
3.3 涉及Assembly的报错,需要修改package.xml
3.4 涉及 surefire 和 junit的报错,需要在pom中增加如下代码


            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.12.4</version>
                <configuration>
                    <!-- 包含哪些测试用例 -->
                    <includes>
                        <include>**/*Test.java</include>
                    </includes>
                    <!-- 不包含哪些测试用例 -->
                    <excludes>
                    </excludes>
                    <testFailureIgnore>true</testFailureIgnore>
                    <skipTests>true</skipTests>
                </configuration>
            </plugin>

**

2.打包后的配置

1,打包好了之后,将jar包扔到线上环境的plugin目录下面,具体的lib,还有其他的配套文件照猫画虎配置好即可.没啥坑.比较简单.

2,关键的是配置.这块有点坑.

2.1 首先按照大佬给的配置

{
  "core": {
    "container": {
      "job": {
        "reportInterval": 10000
      },
      "taskGroup": {
        "channel": 5
      },
      "trace": {
        "enable": "true"
      }
    }
  },
  "job": {
    "setting": {
      "speed": {
        "byte": 10485760
      },
      "errorLimit": {
        "record": 0,
        "percentage": 0.02
      }
    },
    "content": [
      {
        "reader": {
          "name": "elasticsearchreader",
          "parameter": {
            "endpoint": "http://192.168.17.190:9200",
            "accessId": "xxxx",
            "accessKey": "xxxx",
            "index": "test-datax",
            "type": "default",
            "searchType": "dfs_query_then_fetch",
            "headers": {
            },
            "scroll": "3m",
            "search": [
              {
                "size": 5,
                "query": {
                  "bool": {
                    "must": [
                      {
                        "match": {
                          "_id": "590000001878"
                        }
                      }
                    ]
                  }
                }
              }
            ],
            "table":{
              "name": "TACHE",
              "filter": "pk != null",
              "nameCase": "UPPERCASE",
              "column": [
                {
                  "name": "flow_id",
                  "alias": "pk", 
                },
                {
                  "name": "taches",
                  "child": [
                    {
                      "name": "tch_id"
                    },
                    {
                      "name": "tch_mod"
                    },
                    {
                      "name": "flow_id"
                    }
                  ]
                }
              ]
            }
          }
        },
        "writer": {
          "name": "streamwriter",
          "parameter": {
            "print": true,
            "encoding": "UTF-8"
          }
        }
      }
    ]
  }
}

是可以跑的通的.但是问题是没有数据

2.2 我魔改了一下,找了一些其他资料,很幸运看到下面的链接

https://blog.csdn.net/select_where/article/details/121019465

这里面的配置让我灵光一闪

{
    "job": {
        "setting": {
            "speed": {
                "channel": 7
            }
        },
        "content": [{
            "reader": {
                "name": "elasticsearchreader",
                "parameter": {
                    "endpoint": "http://XXX.XXX.XXX.XXX:9200",
                    "accessId": "XXXXXXX*",
                    "accessKey": "XXXXXXXXXXX",
                    "index": "XXXXXX-*",
                    "type": "_doc",
                    "scroll": "3m",
                    "headers": {
                    },
                    "search": [{
                            "query": {
                                "bool": {
                                        "filter":[
                                                   {
                                                        "range":{
                                                                "createdTime":{
                                                                        "boost":1,
                                                                        "from": "${st}", ,
                                                                        "include_lower":true,
                                                                        "include_upper":true,
                                                                        "to": "${et}"
                                                                }
                                                        }
                                                }
                                                ]
                                }
                            },
                            "size": 10
                        }],
                    "table": {
                        "column": [                            
                            {"name" : "clueId"},
                            {"name" : "brandId"},
                            {"name" : "clueEstype"}
                            ]
                    }
                }
            },
            "writer": {
                "name": "hdfswriter",
                "parameter": {
                        "defaultFS": "hdfs://${hdfs}",
                        "fileType": "ORC",
                        "path": "/user/hive/warehouse/ods.db/pr_es_test_orc",
                    "fileName": "aaaaaa",
                    "column": [            
                                        {"name" : "clueId", "type": "STRING"},
                                        {"name" : "brandId", "type": "STRING"},
                                        {"name" : "clueEstype", "type": "STRING"}

                    ],
                    "writeMode": "append",
                    "fieldDelimiter": "|",
                    "compress": "NONE"
                }
            }
        }]
    }
}

结合上面两个配置,把"type": "_doc"配上就行了.
最后测试通过.数据也正式落到目的地了.

关于整个流程,有问题可以私信我.

相关文章
|
2月前
|
存储 自然语言处理 关系型数据库
ElasticSearch基础3——聚合、补全、集群。黑马旅游检索高亮+自定义分词器+自动补全+前后端消息同步
聚合、补全、RabbitMQ消息同步、集群、脑裂问题、集群分布式存储、黑马旅游实现过滤和搜索补全功能
ElasticSearch基础3——聚合、补全、集群。黑马旅游检索高亮+自定义分词器+自动补全+前后端消息同步
|
3月前
|
SQL 分布式计算 DataWorks
DataWorks操作报错合集之如何解决datax同步任务时报错ODPS-0410042:Invalid signature value
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
4月前
|
分布式计算 DataWorks 调度
DataWorks产品使用合集之在使用MaxCompute进行数据集成同步到OSS时,出现表名和OSS文件名不一致且多了后缀,该如何处理
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
5月前
|
DataWorks Java 调度
DataWorks产品使用合集之进行离线同步时,如何使用DataX的Reader插件来实现源端过滤
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
71 0
DataWorks产品使用合集之进行离线同步时,如何使用DataX的Reader插件来实现源端过滤
|
5月前
|
存储 DataWorks 关系型数据库
DataWorks产品使用合集之在使用数据集成中的同步任务从mysql同步表到oss,存储为csv时,最终生成的文件中没有表头,这个属性可以在哪里配置么
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
5月前
|
分布式计算 DataWorks 关系型数据库
DataWorks操作报错合集之数据源同步时,使用脚本模式采集mysql数据到odps中,使用querySql方式采集数据,在脚本中删除了Reader中的column,但是datax还是报错OriginalConfPretreatmentUtil - 您的配置有误。如何解决
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
5月前
|
JSON DataWorks 关系型数据库
DataWorks操作报错合集之同步Elasticsearch数据报错:Cat response did not contain a JSON Array,是什么导致的
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
6月前
|
Kubernetes 关系型数据库 MySQL
实时计算 Flink版产品使用合集之在Kubernetes(k8s)中同步MySQL变更到Elasticsearch该怎么操作
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
6月前
|
SQL 监控 API
实时计算 Flink版产品使用合集之可以用来同步数据到 Elasticsearch(ES)吗
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
实时计算 Flink版产品使用合集之可以用来同步数据到 Elasticsearch(ES)吗
|
5月前
|
分布式计算 DataWorks API
DataWorks操作报错合集之数据集成同步到本地数据库时,出现报错,如何解决
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。

相关产品

  • 检索分析服务 Elasticsearch版