Elastic实战:通过pipeline实现mysql同步数据到es的数据预处理

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
云数据库 RDS MySQL,高可用系列 2核4GB
简介: 首先canal是支持自定义客户端的,需要引入如下依赖,这种方式适合数据转换规则比较复杂,具有强定制性的场景,但是考虑到我这里还要做logstash的数据同步,因此需要一个比较通用的方式来实现数据转换处理,因此我用到了es的pipeline来做预处理

0. 引言

最近在将公司的一部分mysql数据同步到es中,采用了logstash-input-jdbc实现全量同步canal实现增量同步,但是还有一个问题就是es中的数据结构需要重新设计,也就导致部分mysql字段需要经过转换,然后同步到es中

首先canal是支持自定义客户端的,需要引入如下依赖,这种方式适合数据转换规则比较复杂,具有强定制性的场景,但是考虑到我这里还要做logstash的数据同步,因此需要一个比较通用的方式来实现数据转换处理,因此我用到了es的pipeline来做预处理

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.4</version>
</dependency>
AI 代码解读

1. pipeline实现数据预处理

首先pipeline的作用就是在数据进入索引之前进行预处理,而且其也支持类java的painless语法,可以满足我们当前的业务需求。

下面我以用户表的处理来举例示范。为方便演示和脱敏,已经剔除掉部分数据

1.1 mysql中user结构

mysql8.0

id: Long
code: varchar
real_name: varchar
role_id: varchar ,多个id用逗号隔开
dept_id: varchar ,多个id用逗号隔开
post_id: varchar ,多个id用逗号隔开
create_time: datetime
AI 代码解读

1.2 es中的user结构

以下演示基于es7.13.0

PUT user
{
  "mappings": {
    "properties": {
      "code": {
        "type": "keyword"
      }, 
      "realName": {
        "type": "text",
        "analyzer": "ik_smart"
      },   
      "roleId": {
        "type": "long"
      },
      "deptId": {
        "type": "keyword"
      },
      "postId": {
        "type": "long"
      },  
      "userSource": {
        "type": "integer"
      }
    }
  }
}
AI 代码解读

1.3 目标

我们需要进行的处理包括:
1、将role_id、dept_id、post_id由字符串转换为数组
2、因为还涉及到要从另外一张微信用户表数据同步到es中,为了区分是来自微信还是pc,通过nickName字段来判定,因为nickName是微信用户表独有的字段。当它存在时说明用户来自于微信表,将userSource标注为1,否则标注为0

1.4 书写pipeline

可以看到直接通过split函数实现字符串转数组,通过自定义脚本来标注userSource的值

更多关于pipeline的使用,可以参考官方文档:ingest pipeline

关于painless语法的使用,也可参考官方文档:painless guide

如果对于pipeline或者自定义脚本的书写有疑惑的,可以留言讨论

PUT _ingest/pipeline/user_mysql_pipeline
{
  "description": "用户数据mysql导入转换为es结构",
  "processors": [
    {
      "split": {
        "field": "roleId",
        "separator": ","
      }
    },
    {
      "split": {
        "field": "deptId",
        "separator": ","
      }
    },
    {
      "split": {
        "field": "postId",
        "separator": ","
      }
    },
    {
      "script": {
        "lang": "painless", 
        "source": """ 
          if(ctx.containsKey('nickName')){
            ctx.name = ctx.nickName;
            ctx.remove('nickName');
            ctx.userSource = 1;
          }
        """
      }
    }
  ]
}
AI 代码解读

1.5 调用pipeline

1、使用pipeline需要在es中添加ignest角色,修改es配置文件

node.roles: [ignest]
AI 代码解读

2、在user的settings中指定pipeline

PUT user
{
  "mappings": {
    "properties": {
      "code": {
        "type": "keyword"
      },
      "userType": {
        "type": "long"
      },
      "account": {
        "type": "text",
        "analyzer": "ik_smart"
      },
      "realName": {
        "type": "text",
        "analyzer": "ik_smart"
      },
      "email": {
        "type": "text",
        "fields": {
          "keyword": {
            "type": "keyword"
          }
        }
      },
      "phone": {
        "type": "keyword"
      },
      "sex": {
        "type": "integer"
      },
      "roleIds": {
        "type": "long"
      },
      "deptIds": {
        "type": "keyword"
      },
      "postIds": {
        "type": "long"
      },
      "parentDeptIds": {
        "type": "keyword"
      },
      "thirdPlatformUserId": {
        "type": "keyword"
      },
      "tenantUserId": {
        "type": "long"
      },
      "userSource": {
        "type": "integer"
      },
      "tenantId": {
        "type": "keyword"
      },
      "createUser": {
        "type": "long"
      },
      "createDept": {
        "type": "keyword"
      },
      "createTime": {
        "type": "date"
      }
    }
  },
  "settings": {
    "default_pipeline": "user_mysql_pipeline",
    "number_of_replicas": 0,  // 因为我测试用的单节点,所以将副本分片设置为0
    "number_of_shards": 1
  }
}
AI 代码解读

或者还可以在插入数据的时候指定pipeline,这里因为是自动同步,所以这种方式不适用

PUT user/_doc/1?pipeline=user_mysql_pipeline
{
   ...
}
AI 代码解读

3、将上述语句在kibana或者其他es客户端中执行后,再启动canal,logstash同步数据,es就会对数据进行预处理了

4、测试,可以看到数据转换成功

GET user/_search?size=100
AI 代码解读

相关实践学习
如何快速连接云数据库RDS MySQL
本场景介绍如何通过阿里云数据管理服务DMS快速连接云数据库RDS MySQL,然后进行数据表的CRUD操作。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
打赏
0
0
0
0
428
分享
相关文章
MySQL索引策略与查询性能调优实战
在实际应用中,需要根据具体的业务需求和查询模式,综合运用索引策略和查询性能调优方法,不断地测试和优化,以提高MySQL数据库的查询性能。
265 66
Flink CDC MySQL同步MySQL错误记录
在使用Flink CDC同步MySQL数据时,常见的错误包括连接错误、权限错误、表结构变化、数据类型不匹配、主键冲突和
135 16
MySQL中的模糊匹配技巧:无需ES的高效实现
在数据库应用中,模糊匹配是一个常见的需求,尤其在处理搜索功能时。虽然Elasticsearch(ES)等搜索引擎在处理文本搜索方面表现出色,但在一些场景下,直接使用MySQL数据库实现模糊匹配也是一个经济且高效的选择。本文将分享如何在不引入ES的情况下,利用MySQL实现模糊匹配的五大步骤和十个实战案例。
230 1
MySQL模糊匹配技巧:无需ES的高效实现
在数据库应用中,模糊匹配是一个常见的需求,尤其是在不引入Elasticsearch(ES)等外部搜索引擎的情况下。MySQL作为强大的关系型数据库,提供了多种实现模糊匹配的方法。本文将分享如何在MySQL中实现模糊匹配,并提供五大步骤和十个实战案例,帮助你提升查询效率和性能。
372 1
数据库优化:MySQL索引策略与查询性能调优实战
【10月更文挑战第27天】本文深入探讨了MySQL的索引策略和查询性能调优技巧。通过介绍B-Tree索引、哈希索引和全文索引等不同类型,以及如何创建和维护索引,结合实战案例分析查询执行计划,帮助读者掌握提升查询性能的方法。定期优化索引和调整查询语句是提高数据库性能的关键。
507 1
MySQL与Redis协同作战:优化百万数据查询的实战经验
【10月更文挑战第13天】 在处理大规模数据集时,传统的关系型数据库如MySQL可能会遇到性能瓶颈。为了提升数据处理的效率,我们可以结合使用MySQL和Redis,利用两者的优势来优化数据查询。本文将分享一次实战经验,探讨如何通过MySQL与Redis的协同工作来优化百万级数据统计。
163 5
数据库优化:MySQL索引策略与查询性能调优实战
【10月更文挑战第26天】数据库作为现代应用系统的核心组件,其性能优化至关重要。本文主要探讨MySQL的索引策略与查询性能调优。通过合理创建索引(如B-Tree、复合索引)和优化查询语句(如使用EXPLAIN、优化分页查询),可以显著提升数据库的响应速度和稳定性。实践中还需定期审查慢查询日志,持续优化性能。
278 0
【深入了解MySQL】优化查询性能与数据库设计的深度总结
本文详细介绍了MySQL查询优化和数据库设计技巧,涵盖基础优化、高级技巧及性能监控。
91 0
数据库传奇:MySQL创世之父的两千金My、Maria
《数据库传奇:MySQL创世之父的两千金My、Maria》介绍了MySQL的发展历程及其分支MariaDB。MySQL由Michael Widenius等人于1994年创建,现归Oracle所有,广泛应用于阿里巴巴、腾讯等企业。2009年,Widenius因担心Oracle收购影响MySQL的开源性,创建了MariaDB,提供额外功能和改进。维基百科、Google等已逐步替换为MariaDB,以确保更好的性能和社区支持。掌握MariaDB作为备用方案,对未来发展至关重要。
64 3

热门文章

最新文章

AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等