需求描述
MaxCompute 表中有一列以字符串形式存储 JSON 数组(jsonArray),需要通过数据集成的 Elasticsearch Writer 插件将其写入 ES 索引中类型为 nested(嵌套)的字段,如图
需求实现
1.建源表
-- MaxCompute source table holding the test data for the nested-field sync.
-- nested_data stores a JSON array serialized as a STRING.
-- id is declared BIGINT rather than INT: BIGINT is available without enabling
-- the MaxCompute 2.0 data type edition, and it lines up with the "long" type
-- used for id in the target index mapping.
CREATE TABLE IF NOT EXISTS es_nested
(
    id          BIGINT,
    nested_data STRING
);

-- Seed one test row. INSERT OVERWRITE replaces any existing rows, so this
-- script can be re-run safely. No column list is given because MaxCompute
-- only accepts a column list on INSERT INTO, not on INSERT OVERWRITE.
-- The JSON is single-quoted so the inner double quotes need no escaping.
INSERT OVERWRITE TABLE es_nested
VALUES
    (1, '[{ "storeId": 30, "userId": 12790, "bizTime": 1640351395000 }, { "storeId": 156, "userId": 12790, "bizTime": 1640521467000 }]');
2.建目标索引
-- 创建目标 ES 索引(nested_data 字段的类型为 nested)
PUT /hhh_nested3
{
  "settings": {
    "number_of_shards": 5,
    "number_of_replicas": 1
  },
  "mappings": {
    "hhh_nested3": {
      "properties": {
        "id": { "type": "long" },
        "nested_data": {
          "type": "nested",
          "properties": {
            "storeId": { "type": "long" },
            "userId": { "type": "long" },
            "bizTime": { "type": "long" }
          }
        }
      }
    }
  }
}
-- 查看es结构
GET /hhh_nested3/hhh_nested3/_mapping
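-- 返回(注意:下方 nested_data 的 properties 中有一个名为 """{"bizTime":...}""" 的属性,它并不在上面的建索引语句中,疑似此前某次写入经动态映射产生的残留,请自行确认;建索引语句只定义了 storeId、userId、bizTime 三个子字段)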
{
"hhh_nested3" : {
"mappings" : {
"hhh_nested3" : {
"properties" : {
"id" : {
"type" : "long"
},
"nested_data" : {
"type" : "nested",
"properties" : {
"bizTime" : {
"type" : "long"
},
"storeId" : {
"type" : "long"
},
"userId" : {
"type" : "long"
},
"""{"bizTime":1640351395000,"storeId":30,"userId":12790}""" : {
"properties" : {
"bizTime" : {
"type" : "long"
},
"storeId" : {
"type" : "long"
},
"userId" : {
"type" : "long"
}
}
}
}
}
}
}
}
}
}
3.配置同步脚本:在 Writer 的 column 中将 nested_data 的类型配置为 nested。下面先给出该列的关键配置,随后是完整的同步脚本
{
  "name": "nested_data",
  "type": "nested"
}
{
  "type": "job",
  "version": "2.0",
  "steps": [
    {
      "stepType": "odps",
      "parameter": {
        "partition": [],
        "datasource": "odps_first",
        "envType": 0,
        "column": [
          "id",
          "nested_data"
        ],
        "table": "es_nested"
      },
      "name": "Reader",
      "category": "reader"
    },
    {
      "stepType": "elasticsearch",
      "parameter": {
        "column": [
          {
            "name": "id",
            "type": "long"
          },
          {
            "name": "nested_data",
            "type": "nested"
          }
        ],
        "index": "hhh_nested3",
        "type": "hhh_nested3",
        "splitter": ",",
        "actionType": "index",
        "indexType": "",
        "cleanup": false,
        "datasource": "onaliyun_elasticsearch01",
        "envType": 0,
        "discovery": false,
        "primaryKeyInfo": {
          "fieldDelimiterOrigin": ",",
          "column": [
            "id"
          ],
          "type": "specific",
          "fieldDelimiter": ","
        },
        "dynamic": false,
        "batchSize": 1024
      },
      "name": "Writer",
      "category": "writer"
    }
  ],
  "setting": {
    "errorLimit": {
      "record": "0"
    },
    "locale": "zh_CN",
    "speed": {
      "throttle": false,
      "concurrent": 2
    }
  },
  "order": {
    "hops": [
      {
        "from": "Reader",
        "to": "Writer"
      }
    ]
  }
}
4.运行同步任务,然后查询索引查看写入结果
GET /hhh_nested3/hhh_nested3/_search
{
  "query": {
    "match": { "id": 1 }
  }
}
返回
{
"took" : 33,
"timed_out" : false,
"_shards" : {
"total" : 5,
"successful" : 5,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : 1,
"max_score" : 1.0,
"hits" : [
{
"_index" : "hhh_nested3",
"_type" : "hhh_nested3",
"_id" : "1",
"_score" : 1.0,
"_source" : {
"id" : 1,
"nested_data" : [
{
"bizTime" : 1640351395000,
"storeId" : 30,
"userId" : 12790
},
{
"bizTime" : 1640521467000,
"storeId" : 156,
"userId" : 12790
}
]
}
}
]
}
}