网上关于此方面的内容较少,而且比较零散,正好项目中需要对ES7做预研,所以整理出了这篇文章。
ElasticSearch版本为7.13.2
分如下几个主题:
1.Datax的es导入插件elasticsearchwriter
1.1.从datax的github地址(https://github.com/alibaba/DataX)下载源码工程(Datax-master)
工程内容很多,如果在idea中构建,需要的时间比较长,需要耐心等待。如果不需要构建,那直接在文件夹中打开文件,进行第2步的操作。
1.2.修改父工程的pom.xml,配置modules模块,按需保留elasticsearchwriter模块
<modules>
<module>common</module>
<module>core</module>
<module>transformer</module>
<module>elasticsearchwriter</module>
<module>plugin-rdbms-util</module>
<module>plugin-unstructured-storage-util</module>
<module>hbase20xsqlreader</module>
<module>hbase20xsqlwriter</module>
<module>kuduwriter</module>
</modules>
1.3.执行打包命令,idea或者命令行均可
mvn clean install -Dmaven.test.skip=true
1.4.将编译后的插件安装到 datax中
找到编译后的插件,目录为:
DataX-master/elasticsearchwriter/target/datax/plugin/writer/elasticsearchwriter
这个插件我会提供下载,你可以直接用我的,也可以自己打包。
下载地址:elasticsearchwriter.zip_elasticsearchwriter-其它文档类资源-CSDN下载
将这个文件夹拷贝到datax的plugin的writer目录下
2.Datax配置es导入job
任务类型选择Datax任务
3.Job的JSON文件编写
要配置导入导出数据源
{
"job": {
"setting": {
"speed": {
"channel": 3,
"byte": 1048576
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [
{
"reader": {
"name": "oraclereader",
"parameter": {
"username": "oracle数据库账号",
"password": "oracle数据库密码",
"connection": [
{
"querySql": [
"select id,name from 数据库表名"
],
"jdbcUrl": [
"jdbc:oracle:thin:@//127.0.0.1:1521/orcl"
]
}
]
}
},
"writer": {
"name": "elasticsearchwriter",
"parameter": {
"endpoint": "http://127.0.0.1:9200",
"accessId": "elastic",
"accessKey": "XXXXXX",
"index": "test_index",
"type": "_doc",
"cleanup": false,
"settings": {
"index": {
"number_of_shards": 1,
"number_of_replicas": 0
}
},
"discovery": false,
"batchSize": 10000,
"splitter": ",",
"column": [
{
"name": "id",
"type": "long"
},
{
"name": "name",
"type": "text"
}
]
}
}
}
]
}
}
4.需要注意的地方
4.1.querySql查询的字段要与column一一对应,不能多也不能少,顺序最好也一样
4.2. endpoint地址是es的http访问地址,端口为9200,不要配置成9300
4.3.accessId和accessKey必须要配,如果es没有密码,那随便填,但必须要有
4.4.其他注意事项
4.4.1.ES日期字段创建需指定格式 yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis
{
"name": "CREATE_DATE",
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss"
}
4.4.2.日期数据导入时,text写入为日期格式,long写入为时间戳
4.4.3.注意时区问题 写入时指定时区 或 对UTC时间戳进行转换
指定:“2019-03-12T12:12:12.123+0800”
转换:东八区时间戳 = 3600000*8 + UTC时间戳
4.5.json文件格式不对,数组越界错误
sql和column字段匹配不上就会数组越界
4.6.使用数据库id作为es中记录的_id
{"name": "pk", "type": "id"},
name指定为id,type也指定为id,这样就会把数据库的id作为es中的id了。
不需要再指定name为id的字段了,不然会报错
5.参数描述
endpoint 描述:ElasticSearch的连接地址 必选:是 默认值:无
accessId 描述:http auth中的user 必选:否 默认值:空
accessKey 描述:http auth中的password 必选:否 默认值:空
index 描述:elasticsearch中的index名 必选:是 默认值:无
type 描述:elasticsearch中index的type名 必选:否 默认值:index名
cleanup 描述:是否删除原表 必选:否 默认值:false
batchSize 描述:每次批量数据的条数 必选:否 默认值:1000
trySize 描述:失败后重试的次数 必选:否 默认值:30
timeout 描述:客户端超时时间 必选:否 默认值:600000
discovery 描述:启用节点发现将(轮询)并定期更新客户机中的服务器列表。 必选:否 默认值:false
compression 描述:http请求,开启压缩 必选:否 默认值:true
multiThread 描述:http请求,是否有多线程 必选:否 默认值:true
ignoreWriteError 描述:忽略写入错误,不重试,继续写入 必选:否 默认值:false
ignoreParseError 描述:忽略解析数据格式错误,继续写入 必选:否 默认值:true
alias 描述:数据导入完成后写入别名 必选:否 默认值:无
aliasMode 描述:数据导入完成后增加别名的模式,append(增加模式), exclusive(只留这一个) 必选:否 默认值:append
settings 描述:创建index时候的settings, 与elasticsearch官方相同 必选:否 默认值:无
splitter 描述:如果插入数据是array,就使用指定分隔符 必选:否 默认值:-,-
column 描述:elasticsearch所支持的字段类型,样例中包含了全部 必选:是
dynamic 描述: 不使用datax的mappings,使用es自己的自动mappings 必选: 否 默认值: false
六、使用dbeaver 配置 jdbc 连接 es
报错 current license is non-compliant for [jdbc]
修改成30天试用版,https://www.elastic.co/guide/en/elasticsearch/reference/master/start-trial.html
POST "localhost:9200/_license/start_trial?acknowledge=true&pretty
查看服务器es的license信息
GET http://localhost:9200/_license
可以愉快使用了
七、使用es的动态模板
datax从mysql同步数据到elasticsearch(使用es的动态模板)
"writer": {
"name": "elasticsearchwriter",
"parameter": {
"endpoint": "http://xxx:9200",
"index": "myindex",
"type": "data",
"cleanup": true, #true表示插入前清空,即覆盖同步;false则追加同步
"dynamic": true, #这里一定要指定为true,否则使用的是datax的模板(就是下面定义的字段类型),而不会使用es的模板
"settings": {"index" :{"number_of_shards": 2, "number_of_replicas": 1}},
"batchSize": 10000,
"splitter": ",",
"column": [
{"name": "pk", "type": "id"},#指定第一个字段为rowkey
{"name": "province", "type": "text"},
{ "name": "city", "type": "text"},
{ "name": "area", "type": "text"},
{ "name": "longitude","type":"double" },
{ "name": "latitude","type": "double" },
{ "name": "location","type": "geo_point" }
]
}
}
参考文档:
通过datax导入数据到elasticsearch - 简书
通过DataX同步数据至Elasticsearch - 奋斗的一线码农 - 博客园
GitHub - alibaba/DataX: DataX是阿里云DataWorks数据集成的开源版本。
通过datax导入数据到elasticsearch - 简书
通过DataX同步数据至Elasticsearch - 走看看
current license is non-compliant for [jdbc]_数据库人生的博客-CSDN博客
datax从mysql同步数据到elasticsearch(使用es的动态模板)_ASN_forever的博客-CSDN博客