Elastic实战:通过pipeline实现mysql同步数据到es的数据预处理

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群版 2核4GB 100GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: 首先canal是支持自定义客户端的,需要引入如下依赖,这种方式适合数据转换规则比较复杂,具有强定制性的场景,但是考虑到我这里还要做logstash的数据同步,因此需要一个比较通用的方式来实现数据转换处理,因此我用到了es的pipeline来做预处理

0. 引言

最近在将公司的一部分mysql数据同步到es中,采用了logstash-input-jdbc实现全量同步canal实现增量同步,但是还有一个问题就是es中的数据结构需要重新设计,也就导致部分mysql字段需要经过转换,然后同步到es中

首先canal是支持自定义客户端的,需要引入如下依赖,这种方式适合数据转换规则比较复杂,具有强定制性的场景,但是考虑到我这里还要做logstash的数据同步,因此需要一个比较通用的方式来实现数据转换处理,因此我用到了es的pipeline来做预处理

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.4</version>
</dependency>

1. pipeline实现数据预处理

首先pipeline的作用就是在数据进入索引之前进行预处理,而且其也支持类java的painless语法,可以满足我们当前的业务需求。

下面我以用户表的处理来举例示范。为方便演示和脱敏,已经剔除掉部分数据

1.1 mysql中user结构

mysql8.0

id: Long
code: varchar
real_name: varchar
role_id: varchar ,多个id用逗号隔开
dept_id: varchar ,多个id用逗号隔开
post_id: varchar ,多个id用逗号隔开
create_time: datetime

1.2 es中的user结构

以下演示基于es7.13.0

PUT user
{
  "mappings": {
    "properties": {
      "code": {
        "type": "keyword"
      }, 
      "realName": {
        "type": "text",
        "analyzer": "ik_smart"
      },   
      "roleId": {
        "type": "long"
      },
      "deptId": {
        "type": "keyword"
      },
      "postId": {
        "type": "long"
      },  
      "userSource": {
        "type": "integer"
      }
    }
  }
}

1.3 目标

我们需要进行的处理包括:
1、将role_id、dept_id、post_id由字符串转换为数组
2、因为还涉及到要从另外一张微信用户表数据同步到es中,为了区分是来自微信还是pc,通过nickName字段来判定,因为nickName是微信用户表独有的字段。当它存在时说明用户来自于微信表,将userSource标注为1,否则标注为0

1.4 书写pipeline

可以看到直接通过split函数实现字符串转数组,通过自定义脚本来标注userSource的值

更多关于pipeline的使用,可以参考官方文档:ingest pipeline

关于painless语法的使用,也可参考官方文档:painless guide

如果对于pipeline或者自定义脚本的书写有疑惑的,可以留言讨论

PUT _ingest/pipeline/user_mysql_pipeline
{
  "description": "用户数据mysql导入转换为es结构",
  "processors": [
    {
      "split": {
        "field": "roleId",
        "separator": ","
      }
    },
    {
      "split": {
        "field": "deptId",
        "separator": ","
      }
    },
    {
      "split": {
        "field": "postId",
        "separator": ","
      }
    },
    {
      "script": {
        "lang": "painless", 
        "source": """ 
          if(ctx.containsKey('nickName')){
            ctx.name = ctx.nickName;
            ctx.remove('nickName');
            ctx.userSource = 1;
          }
        """
      }
    }
  ]
}

1.5 调用pipeline

1、使用pipeline需要在es中添加ignest角色,修改es配置文件

node.roles: [ignest]

2、在user的settings中指定pipeline

PUT user
{
  "mappings": {
    "properties": {
      "code": {
        "type": "keyword"
      },
      "userType": {
        "type": "long"
      },
      "account": {
        "type": "text",
        "analyzer": "ik_smart"
      },
      "realName": {
        "type": "text",
        "analyzer": "ik_smart"
      },
      "email": {
        "type": "text",
        "fields": {
          "keyword": {
            "type": "keyword"
          }
        }
      },
      "phone": {
        "type": "keyword"
      },
      "sex": {
        "type": "integer"
      },
      "roleIds": {
        "type": "long"
      },
      "deptIds": {
        "type": "keyword"
      },
      "postIds": {
        "type": "long"
      },
      "parentDeptIds": {
        "type": "keyword"
      },
      "thirdPlatformUserId": {
        "type": "keyword"
      },
      "tenantUserId": {
        "type": "long"
      },
      "userSource": {
        "type": "integer"
      },
      "tenantId": {
        "type": "keyword"
      },
      "createUser": {
        "type": "long"
      },
      "createDept": {
        "type": "keyword"
      },
      "createTime": {
        "type": "date"
      }
    }
  },
  "settings": {
    "default_pipeline": "user_mysql_pipeline",
    "number_of_replicas": 0,  // 因为我测试用的单节点,所以将副本分片设置为0
    "number_of_shards": 1
  }
}

或者还可以在插入数据的时候指定pipeline,这里因为是自动同步,所以这种方式不适用

PUT user/_doc/1?pipeline=user_mysql_pipeline
{
   ...
}

3、将上述语句在kibana或者其他es客户端中执行后,再启动canal,logstash同步数据,es就会对数据进行预处理了

4、测试,可以看到数据转换成功

GET user/_search?size=100

相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
11天前
|
关系型数据库 MySQL Linux
【一键解锁神秘力量!】CentOS 7 通过编译源码方式安装 MySQL 数据库 —— 从零到英雄的数据库安装实战秘籍!
【8月更文挑战第9天】随着业务增长,对数据库的需求日益提高。在 CentOS 7 中,通过编译源码安装 MySQL 可提供更高定制性和灵活性。本文详细介绍从准备环境、下载源码、配置编译参数到安装 MySQL 的全过程,并对比 RPM 包安装方法,帮助读者根据需求选择合适方案。实践时需注意备份数据、选择合适版本、确保安全性和调优性能等要点。
53 1
|
1天前
|
canal 关系型数据库 MySQL
"揭秘阿里数据同步黑科技Canal:从原理到实战,手把手教你玩转MySQL数据秒级同步,让你的数据处理能力瞬间飙升,成为技术界的新晋网红!"
【8月更文挑战第18天】Canal是一款由阿里巴巴开源的高性能数据同步系统,它通过解析MySQL的增量日志(Binlog),提供低延迟、可靠的数据订阅和消费功能。Canal模拟MySQL Slave与Master间的交互协议来接收并解析Binary Log,支持数据的增量同步。配置简单直观,包括Server和Instance两层配置。在实战中,Canal可用于数据库镜像、实时备份等多种场景,通过集成Canal Client可实现数据的消费和处理,如更新缓存或写入消息队列。
28 0
|
4天前
|
存储 关系型数据库 MySQL
Mysql表结构同步存储过程(适用于模版表)
Mysql表结构同步存储过程(适用于模版表)
13 0
|
8天前
|
存储 关系型数据库 MySQL
MySQL中的DISTINCT与GROUP BY:效率之争与实战应用
【8月更文挑战第12天】在数据库查询优化中,DISTINCT和GROUP BY常常被用来去重或聚合数据,但它们在实现方式和性能表现上却各有千秋。本文将深入探讨两者在MySQL中的效率差异,结合工作学习中的实际案例,为您呈现一场技术干货分享。
44 0
|
6天前
|
存储 关系型数据库 MySQL
MySQL——数据库备份上传到阿里云OSS存储
MySQL——数据库备份上传到阿里云OSS存储
23 0
|
1天前
|
SQL 存储 关系型数据库
数据库-MySQL-01(一)
数据库-MySQL-01(一)
13 4
|
6天前
|
缓存 NoSQL Redis
一天五道Java面试题----第九天(简述MySQL中索引类型对数据库的性能的影响--------->缓存雪崩、缓存穿透、缓存击穿)
这篇文章是关于Java面试中可能会遇到的五个问题,包括MySQL索引类型及其对数据库性能的影响、Redis的RDB和AOF持久化机制、Redis的过期键删除策略、Redis的单线程模型为何高效,以及缓存雪崩、缓存穿透和缓存击穿的概念及其解决方案。
|
8天前
|
Oracle 关系型数据库 MySQL
Mysql和Oracle数据库死锁查看以及解决
【8月更文挑战第11天】本文介绍了解决MySQL与Oracle数据库死锁的方法。MySQL可通过`SHOW ENGINE INNODB STATUS`查看死锁详情,并自动回滚一个事务解除死锁;也可手动KILL事务。Oracle则通过查询V$LOCK与V$SESSION视图定位死锁,并用`ALTER SYSTEM KILL SESSION`命令终止相关会话。预防措施包括遵循ACID原则、优化索引及拆分大型事务。
|
1天前
|
SQL 存储 关系型数据库
数据库-MySQL-03
数据库-MySQL-03
7 0
|
1天前
|
SQL 数据库
数据库-MySQL-02(二)
数据库-MySQL-02(二)
4 0