datax-elasticsearch sync: pitfall notes
I hit quite a few pitfalls while using DataX to sync Elasticsearch data to other systems. Writing them down here.
1 Building the jar
1. The official DataX only ships an Elasticsearch writer plugin; there is no Elasticsearch reader.
2. Someone on GitHub has already written one:
https://github.com/Kestrong/datax-elasticsearch/blob/master/elasticsearchreader/doc/elasticsearchreader.md
3. The real trouble is the packaging.
1. Download the official DataX source;
2. Put the reader from GitHub into the right place in the source tree. Nothing special to say here.
3. Build the whole project. This is where most of the pitfalls live (the full build command I used is shown after this list).
3.1 Many dependencies in the DataX poms are out of date and have to be bumped to newer versions yourself.
3.2 Errors involving ${relative_path} are usually caused by missing properties; fill them in by hand, as in the snippet below.
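For example (a minimal sketch: the property name comes from the error message, but the value here is an assumption, so use whatever your module actually expects), declare the missing placeholder in the <properties> block of the pom that fails:
<properties>
    <!-- assumed value: point this at whatever relative path the build really needs -->
    <relative_path>..</relative_path>
</properties>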
3.3 Assembly-related errors mean package.xml has to be edited; see the sketch below.
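What that edit usually amounts to (a sketch assuming the stock DataX package.xml layout, where every plugin gets its own fileSet) is adding an entry for the new reader so the assembly picks it up:
<fileSet>
    <!-- copy the reader's build output into the distribution, same as the other plugins -->
    <directory>elasticsearchreader/target/datax/</directory>
    <includes>
        <include>**/*.*</include>
    </includes>
    <outputDirectory>datax</outputDirectory>
</fileSet>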
3.4 For surefire/junit-related errors, add the following to the pom:
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.12.4</version>
<configuration>
<!-- which test cases to include -->
<includes>
<include>**/*Test.java</include>
</includes>
<!-- which test cases to exclude -->
<excludes>
</excludes>
<testFailureIgnore>true</testFailureIgnore>
<skipTests>true</skipTests>
</configuration>
</plugin>
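With those fixes in place, the full-package build itself is the usual DataX one, skipping tests:
mvn -U clean package assembly:assembly -Dmaven.test.skip=true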
2 Configuration after packaging
1. Once the build is done, drop the jar into the plugin directory of the DataX install on the target machine, and set up the lib directory and the other supporting files by copying what the existing plugins do. No pitfalls here; it's straightforward.
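The reader ends up laid out like any other plugin, roughly as below (file names are examples from my build; yours may differ):
plugin/reader/elasticsearchreader/
├── plugin.json
├── plugin_job_template.json
├── elasticsearchreader-0.0.1-SNAPSHOT.jar
└── libs/          (dependency jars go here)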
2. The tricky part is the job configuration. There are a few pitfalls here.
2.1 First, with the config the plugin author provides:
{
"core": {
"container": {
"job": {
"reportInterval": 10000
},
"taskGroup": {
"channel": 5
},
"trace": {
"enable": "true"
}
}
},
"job": {
"setting": {
"speed": {
"byte": 10485760
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [
{
"reader": {
"name": "elasticsearchreader",
"parameter": {
"endpoint": "http://192.168.17.190:9200",
"accessId": "xxxx",
"accessKey": "xxxx",
"index": "test-datax",
"type": "default",
"searchType": "dfs_query_then_fetch",
"headers": {
},
"scroll": "3m",
"search": [
{
"size": 5,
"query": {
"bool": {
"must": [
{
"match": {
"_id": "590000001878"
}
}
]
}
}
}
],
"table":{
"name": "TACHE",
"filter": "pk != null",
"nameCase": "UPPERCASE",
"column": [
{
"name": "flow_id",
"alias": "pk",
},
{
"name": "taches",
"child": [
{
"name": "tch_id"
},
{
"name": "tch_mod"
},
{
"name": "flow_id"
}
]
}
]
}
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"print": true,
"encoding": "UTF-8"
}
}
}
]
}
}
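To try it out, I launch the job the standard DataX way (the job file path is just an example):
python bin/datax.py job/es_to_stream.json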
It does run, but the problem is that no data comes out.
2.2 I tinkered with it and dug through some other material, and was lucky enough to come across this link:
https://blog.csdn.net/select_where/article/details/121019465
The config in that post gave me a flash of insight:
{
"job": {
"setting": {
"speed": {
"channel": 7
}
},
"content": [{
"reader": {
"name": "elasticsearchreader",
"parameter": {
"endpoint": "http://XXX.XXX.XXX.XXX:9200",
"accessId": "XXXXXXX*",
"accessKey": "XXXXXXXXXXX",
"index": "XXXXXX-*",
"type": "_doc",
"scroll": "3m",
"headers": {
},
"search": [{
"query": {
"bool": {
"filter":[
{
"range":{
"createdTime":{
"boost":1,
"from": "${st}", ,
"include_lower":true,
"include_upper":true,
"to": "${et}"
}
}
}
]
}
},
"size": 10
}],
"table": {
"column": [
{"name" : "clueId"},
{"name" : "brandId"},
{"name" : "clueEstype"}
]
}
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"defaultFS": "hdfs://${hdfs}",
"fileType": "ORC",
"path": "/user/hive/warehouse/ods.db/pr_es_test_orc",
"fileName": "aaaaaa",
"column": [
{"name" : "clueId", "type": "STRING"},
{"name" : "brandId", "type": "STRING"},
{"name" : "clueEstype", "type": "STRING"}
],
"writeMode": "append",
"fieldDelimiter": "|",
"compress": "NONE"
}
}
}]
}
}
Combining the two configs above, all it took was setting "type": "_doc".
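For completeness, the reader in my working job ended up looking roughly like this: it is just the first config's reader with "type" switched to "_doc" (columns trimmed here for brevity):
"reader": {
    "name": "elasticsearchreader",
    "parameter": {
        "endpoint": "http://192.168.17.190:9200",
        "accessId": "xxxx",
        "accessKey": "xxxx",
        "index": "test-datax",
        "type": "_doc",
        "searchType": "dfs_query_then_fetch",
        "scroll": "3m",
        "headers": {},
        "search": [
            {
                "size": 5,
                "query": {
                    "bool": {
                        "must": [
                            { "match": { "_id": "590000001878" } }
                        ]
                    }
                }
            }
        ],
        "table": {
            "name": "TACHE",
            "filter": "pk != null",
            "nameCase": "UPPERCASE",
            "column": [
                { "name": "flow_id", "alias": "pk" }
            ]
        }
    }
}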
The final test passed, and the data actually landed at the destination.
If you have questions about any part of the process, feel free to message me.