Elastic实战:canal同步mysql到es之父子表数据同步|对象型数组同步|nested数组同步

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
云数据库 RDS PostgreSQL,集群系列 2核4GB
简介: 最近在做mysql到es的数据同步,涉及到父子表数据同步,特此记录,以供后续参考

0. 引言

最近在做mysql到es的数据同步,涉及到父子表数据同步,特此记录,以供后续参考

关于mysql同步到es的操作明细可参考我之前的博客:
Elastic实战:通过canal1.1.5实现mysql8.0数据增量/全量同步到elasticsearch7.x

1.环境

canal 1.1.5
elasticsearch7.13
mysql 8.0

2. 基础类型数组同步

相关配置实际上在官方文档中都有示例,以下也是基于这些示例来实现的

这种方式针对的是数组中的数据为基础类型,比如List,List等

2.1 sql配置说明

sql支持多表关联自由组合, 但是有一定的限制:

1、主表不能为子查询语句

2、只能使用left outer join即最左表一定要是主表

3、关联从表如果是子查询不能有多张表

4、主sql中不能有where查询条件(从表子查询中可以有where条件但是不推荐, 可能会造成数据同步的不一致, 比如修改了where条件中的字段内容)

5、关联条件只允许主外键的'='操作不能出现其他常量判断比如: on a.role_id=b.id and b.statues=1

6、关联条件必须要有一个字段出现在主查询语句中比如: on a.role_id=b.id 其中的 a.role_id 或者 b.id 必须出现在主select语句中

7、Elastic Search的mapping 属性与sql的查询值将一一对应(不支持 select *), 比如: select a.id as _id, a.name, a.email as _email from user, 其中name将映射到es mapping的name field, _email将 映射到mapping的_email field, 这里以别名(如果有别名)作为最终的映射字段. 这里的_id可以填写到配置文件的 _id: _id映射

2.2 配置步骤

es mappings(已剔除部分字段)

{
  "service_comment_driver" : {
    "mappings" : {
      "properties" : {
        "id" : {
          "type" : "keyword"
        },
        "avg" : {
          "type" : "double"
        },
        "comment" : {
          "type" : "text"
        },
        "createTime" : {
          "type" : "date"
        },
        "labels" : {
          "type" : "text",
          "analyzer" : "ik_smart"
        }
      }
    }
  }
}

1、sql
将子表数据通过left join关联,并且将要查询的字段通过group_concat函数拼接起来,group_concat函数的作用是将group by产生的同一个分组中的值连接起来,返回一个字符串结果,并且不同行之间用separator指定的符号隔离

select
          t.id as _id,
          t.avg as avg, 
          t.create_time as createTime,
          t.comment as comment,
          l.labels
 from
          t_service_comment_driver t
 left join 
           (select bussiness_id,group_concat(label order by id desc separator ';') as labels from t_service_comment_label 
           where type=0 group by bussiness_id) l
 on 
         t.id = l.bussiness_id

2、adapter配置文件中添加配置

 objFields:
    labels: array:;           # 数组属性, array:; 代表字段以;分隔的

整体的canal-adapter/conf/es7中的配置文件:comment.yml

dataSourceKey: duola_bussness # 这里的key与上述application.yml中配置的数据源保持一致
outerAdapterKey: esKey # 与上述application.yml中配置的outerAdapters.key一直
destination: example # 默认为example,与application.yml中配置的instance保持一致
groupId:
esMapping:
  _index: service_comment_driver
  _type: _doc
  _id: _id
  sql: "select
          t.id as _id,
          t.avg as avg, 
          t.create_time as createTime,
          t.comment as comment,
          l.labels
        from
          t_service_comment_driver t
        left join 
           (select bussiness_id,group_concat(label order by id desc separator ';') as labels from t_service_comment_label 
           where type=0 group by bussiness_id) l
        on t.id = l.bussiness_id"
  objFields:
    labels: array:;           # 数组或者对象属性, array:; 代表以;字段里面是以;分隔的
  #etlCondition: "where t.create_time>='{0}'"
  commitBatch: 3000

3、启动adapter

./bin/startup.sh

4、修改对应的数据库表中的数据,然后查看日志

cat logs/adapter/adapter.log

发现已经有更新数据了
在这里插入图片描述
5、查看es中的数据

GET service_comment_driver/_search 

发现labels中的数据已经同步更新了,并且是数组形式,修改子表数据后也会同步更新
在这里插入图片描述

2.3 常见报错

1. Unknown column '_v._id' in 'where clause'

将配置文件中的_id映射调整为_id即可,注意sql中的别名一样要为_id。

_id: _id

sql

select t.id as _id ...

3. 对象型数组同步

3.1 思路

这种方式针对的是数组中是自定义对象的数据,比如List<Object>
对比到es中的结构就是 List<Nested>

针对这一类型的同步,官方没有明确的示例说明能够支持,但是观察官方文档会发现官方提供了一个对象型字段的同步

objFields:
  <field>: object

虽然官方的描述这一类型更针对的是一对一的json型字符串,但是不妨尝试一下,看看是否能够支持json型数组

canal中object是识别的json型字符串,所以我们的思路就是将子表数据转换为json字符串,然后通过object

3.2 配置步骤

1、es mapping

{
  "service_comment_owner" : {
    "mappings" : {
      "properties" : {
        "avg" : {
          "type" : "double"
        },
        "comment" : {
          "type" : "text"
        }, 
        "createTime" : {
          "type" : "date"
        }, 
        "id" : {
          "type" : "keyword"
        }, 
        "labels" : {
          "type" : "nested",
          "properties" : {
            "id" : {
              "type" : "long"
            },
            "label" : {
              "type" : "text",
              "analyzer" : "ik_smart"
            },
            "type" : {
              "type" : "integer"
            }
          }
        }
      }
    }
  }
}

2、sql

select
    t.id as _id, 
    t.avg as avg, 
    t.create_time as createTime, 
    t.comment as comment,
    CONCAT('[',l.labels,']') as labels
from
    t_service_comment_owner t
left join 
    (select bussiness_id,group_concat(json_object('id',id,'type',type,'label',label)) as labels from t_service_comment_label where type=1 group by bussiness_id) l 
on 
    t.id=l.bussiness_id

3、adapter配置文件

 objFields:
    labels: object

4、整体配置文件

dataSourceKey: duola_bussness # 这里的key与上述application.yml中配置的数据源保持一致
outerAdapterKey: esKey # 与上述application.yml中配置的outerAdapters.key一直
destination: example # 默认为example,与application.yml中配置的instance保持一致
groupId:
esMapping:
  _index: service_comment_owner
  _type: _doc
  _id: _id
  sql: "select
    t.id as _id, 
    t.avg as avg, 
    t.create_time as createTime, 
    t.comment as comment,
    CONCAT('[',l.labels,']') as labels
from
    t_service_comment_owner t
left join 
    (select bussiness_id,group_concat(json_object('id',id,'type',type,'label',label)) as labels from t_service_comment_label where type=1 group by bussiness_id) l 
on 
    t.id=l.bussiness_id"
  #etlCondition: "where t.update_time>='{0}'"
  commitBatch: 3000
  objFields:
    labels: object           # 数组或者对象属性

5、启动adapter

./bin/startup.sh

6、修改对应的数据库表中的数据,然后查看日志,会发现日志中有数据输出

cat logs/adapter/adapter.log

7、查询索引数据,注意因为是nested结构,所以使用nested查询

GET service_comment_owner/_search
{
  "query": {
    "nested": {
      "path": "labels",
      "query": {
        "match": {
          "labels.label": "信息"
        }
      }
    }
  }
}

会发现刚刚修改的信息已经更新上去了
在这里插入图片描述

3.3 常见报错

1. RuntimeException: com.alibaba.fastjson.JSONException: not close json text, token : ,

这个错误是因为json识别缺少必要符号导致的,因为我们上述的做法是将对象型数组转换为json数组,json数组需要在有[]符号,将这两个符号添加上就可以了

CONCAT('[',l.labels,']')

4. join型数据同步

4.1 join类型应用场景

所谓join型是指es中的join数据类型,这种类型适用于以下条件的场景
1、父子表结构的数据
2、子表数据明显多于父表数据

join类型不能像关系型数据库中的表连接那样去用,无论是has_child或者has_parent查询都会对索引的查询性能有严重的负面影响,并且会触发global ordinals。所以join类型不能遇到父子表结构就使用,先考虑上述两种方式,当子表数据远超父表数据时再考虑。

4.2 配置步骤

(因暂无应用需求,以下配置说明根据官方文档给出,后续持续更新)
1、es mappings

{
  "mappings":{
    "_doc":{
      "properties":{
        "id": {
          "type": "long"
        },
        "name": {
          "type": "text"
        },
        "email": {
          "type": "text"
        },
        "order_id": {
          "type": "long"
        },
        "order_serial": {
          "type": "text"
        },
        "order_time": {
          "type": "date"
        },
        "customer_order":{
          "type":"join",
          "relations":{
            "customer":"order"
          }
        }
      }
    }
  }
}

2、adapter/es7/customer.yml

esMapping:
  _index: customer
  _type: _doc
  _id: id
  relations:
    customer_order:
      name: customer
  sql: "select t.id, t.name, t.email from customer t"

3、adapter/es7/order.yml配置文件

esMapping:
  _index: customer
  _type: _doc
  _id: _id
  relations:
    customer_order:
      name: order
      parent: customer_id
  sql: "select concat('oid_', t.id) as _id,
        t.customer_id,
        t.id as order_id,
        t.serial_code as order_serial,
        t.c_time as order_time
        from biz_order t"
  skips:
    - customer_id

4、启动服务

./bin/startup.sh
相关实践学习
如何快速连接云数据库RDS MySQL
本场景介绍如何通过阿里云数据管理服务DMS快速连接云数据库RDS MySQL,然后进行数据表的CRUD操作。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
2月前
|
缓存 关系型数据库 MySQL
MySQL索引策略与查询性能调优实战
在实际应用中,需要根据具体的业务需求和查询模式,综合运用索引策略和查询性能调优方法,不断地测试和优化,以提高MySQL数据库的查询性能。
227 66
|
2月前
|
监控 关系型数据库 MySQL
数据库优化:MySQL索引策略与查询性能调优实战
【10月更文挑战第27天】本文深入探讨了MySQL的索引策略和查询性能调优技巧。通过介绍B-Tree索引、哈希索引和全文索引等不同类型,以及如何创建和维护索引,结合实战案例分析查询执行计划,帮助读者掌握提升查询性能的方法。定期优化索引和调整查询语句是提高数据库性能的关键。
415 1
|
3月前
|
NoSQL 关系型数据库 MySQL
MySQL与Redis协同作战:优化百万数据查询的实战经验
【10月更文挑战第13天】 在处理大规模数据集时,传统的关系型数据库如MySQL可能会遇到性能瓶颈。为了提升数据处理的效率,我们可以结合使用MySQL和Redis,利用两者的优势来优化数据查询。本文将分享一次实战经验,探讨如何通过MySQL与Redis的协同工作来优化百万级数据统计。
134 5
|
3月前
|
架构师 关系型数据库 MySQL
MySQL最左前缀优化原则:深入解析与实战应用
【10月更文挑战第12天】在数据库架构设计与优化中,索引的使用是提升查询性能的关键手段之一。其中,MySQL的最左前缀优化原则(Leftmost Prefix Principle)是复合索引(Composite Index)应用中的核心策略。作为资深架构师,深入理解并掌握这一原则,对于平衡数据库性能与维护成本至关重要。本文将详细解读最左前缀优化原则的功能特点、业务场景、优缺点、底层原理,并通过Java示例展示其实现方式。
149 1
|
2月前
|
监控 关系型数据库 MySQL
数据库优化:MySQL索引策略与查询性能调优实战
【10月更文挑战第26天】数据库作为现代应用系统的核心组件,其性能优化至关重要。本文主要探讨MySQL的索引策略与查询性能调优。通过合理创建索引(如B-Tree、复合索引)和优化查询语句(如使用EXPLAIN、优化分页查询),可以显著提升数据库的响应速度和稳定性。实践中还需定期审查慢查询日志,持续优化性能。
191 0
|
3月前
|
消息中间件 NoSQL 关系型数据库
一文彻底搞定Redis与MySQL的数据同步
【10月更文挑战第21天】本文介绍了 Redis 与 MySQL 数据同步的原因及实现方式。同步的主要目的是为了优化性能和保持数据一致性。实现方式包括基于数据库触发器、应用层双写和使用消息队列。每种方式都有其优缺点,需根据具体场景选择合适的方法。此外,文章还强调了数据同步时需要注意的数据一致性、性能优化和异常处理等问题。
917 0
|
21天前
|
存储 Oracle 关系型数据库
数据库传奇:MySQL创世之父的两千金My、Maria
《数据库传奇:MySQL创世之父的两千金My、Maria》介绍了MySQL的发展历程及其分支MariaDB。MySQL由Michael Widenius等人于1994年创建,现归Oracle所有,广泛应用于阿里巴巴、腾讯等企业。2009年,Widenius因担心Oracle收购影响MySQL的开源性,创建了MariaDB,提供额外功能和改进。维基百科、Google等已逐步替换为MariaDB,以确保更好的性能和社区支持。掌握MariaDB作为备用方案,对未来发展至关重要。
47 3
|
21天前
|
安全 关系型数据库 MySQL
MySQL崩溃保险箱:探秘Redo/Undo日志确保数据库安全无忧!
《MySQL崩溃保险箱:探秘Redo/Undo日志确保数据库安全无忧!》介绍了MySQL中的三种关键日志:二进制日志(Binary Log)、重做日志(Redo Log)和撤销日志(Undo Log)。这些日志确保了数据库的ACID特性,即原子性、一致性、隔离性和持久性。Redo Log记录数据页的物理修改,保证事务持久性;Undo Log记录事务的逆操作,支持回滚和多版本并发控制(MVCC)。文章还详细对比了InnoDB和MyISAM存储引擎在事务支持、锁定机制、并发性等方面的差异,强调了InnoDB在高并发和事务处理中的优势。通过这些机制,MySQL能够在事务执行、崩溃和恢复过程中保持
54 3
|
21天前
|
SQL 关系型数据库 MySQL
数据库灾难应对:MySQL误删除数据的救赎之道,技巧get起来!之binlog
《数据库灾难应对:MySQL误删除数据的救赎之道,技巧get起来!之binlog》介绍了如何利用MySQL的二进制日志(Binlog)恢复误删除的数据。主要内容包括: 1. **启用二进制日志**:在`my.cnf`中配置`log-bin`并重启MySQL服务。 2. **查看二进制日志文件**:使用`SHOW VARIABLES LIKE &#39;log_%&#39;;`和`SHOW MASTER STATUS;`命令获取当前日志文件及位置。 3. **创建数据备份**:确保在恢复前已有备份,以防意外。 4. **导出二进制日志为SQL语句**:使用`mysqlbinlog`
72 2
|
1月前
|
关系型数据库 MySQL 数据库
Python处理数据库:MySQL与SQLite详解 | python小知识
本文详细介绍了如何使用Python操作MySQL和SQLite数据库,包括安装必要的库、连接数据库、执行增删改查等基本操作,适合初学者快速上手。
227 15