因为这个功能,产品刚从医院出来,但我想再送他回去

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
日志服务 SLS,月写入数据量 50GB 1个月
简介: 因为这个功能,产品刚从医院出来,但我想再送他回去

前段时间遇到这么个事

产品和我说项目里的人员搜索功能他看着有点不对劲,说罢,用需要加需求的眼神看着我...

哥们虎躯一震...


搞了半天,原来是因为他在人员搜索框内填写部门名称,结果搜索器觉着他是傻子,没理他,所以他边哭喊着爸爸,边来找我

我说那地方本来就是填人名的呀,正常人都填人名...


他回复道,钉钉有啊,钉钉有的我们也要有,如果我们没有,那就是对像他这种有追求的产品职业操守上的侮辱...

我像看弱智一样看了他一会,然后道,“好吧 就为了你的节操...”


后来我查了下源码和数据发现,搜索的数据存储在ES里,但ES里的部门数据只有ID没有名称,所以面临的问题可能需要同步数据,从mysql同步到ES

可是问题是,如果只是同步数据的话还简单,但是同步数据之后需要维护的事情还有很多

就比如,如果当前人换部门了或者部门名称变了,在变更这些数据的时候同时都要考虑ES要怎么更新,这样会越来越繁琐

万一之后的需求有一项极复杂的骚修改...那真是恶心它儿子回家

这倒让我想起了之前的一件恶心事,也是和ES数据同步有关


倒不是问题有多恶心,而是没太看懂前人的代码,他的代码里没有ES的数据更新,但是在修改完mysql数据后,神奇的事情却发生了

我反复走了好几遍代码,拦截器也翻看了好几遍,都没有操作ES的地方,这确实让我挺懵的...


好巧不巧,那二货中午的时候给我发过来一个并夕夕友尽链接,以此为要挟下才只知道原来果然是魔法!

用的是阿里的一个中间件canal,功能确实比较神奇,它会伪装成mysql集群里的一个子节点,当主节点向子节点同步binlog日志的时候,canal可以解析binlog日志,然后发送一条消息到消息队列来同步es数据

具体来说呢,如此操作

Mysql

首先要有个mysql服务器,肯定有集群才有master和slave

然后在MySQL中需要创建一个用户,并授权

// 使用命令登录:mysql -u root -p
// 创建用户 用户名:canal 密码:Canal@123456
create user 'canal'@'%' identified by 'Canal@123456';
// 授权 *.*表示所有库
grant SELECT, REPLICATION SLAVE, REPLICATION CLIENT on *.* to 'canal'@'%' identified by 'Canal@123456';

下一步在MySQL配置文件my.cnf设置如下信息

[mysqld]
# 打开binlog
log-bin=mysql-bin
# 选择ROW(行)模式
binlog-format=ROW
# 配置MySQL replaction需要定义,不要和canal的slaveId重复
server_id=1

改了配置文件之后,重启MySQL,使用命令查看是否打开binlog模式:

show variables like 'log_bin'
//查看bin日志文件列表
show binary logs
//查看当前正在写入的binlog文件
show master status

canal

去官网下载页面进行下载:https://github.com/alibaba/canal/releases

解压canal.deployer-1.1.4.tar.gz,我们可以看到里面有四个文件夹 bin conf lib logs

在bin目录下找到startup.bat启动就可以了

java客户端操作

引入maven依赖

<dependency>
   <groupId>com.alibaba.otter</groupId>
   <artifactId>canal.client</artifactId>
   <version>1.1.4</version>
</dependency>

在CannalClient类使用Spring Bean的生命周期函数afterPropertiesSet():

@Component
public class CannalClient implements InitializingBean {
   private final static int BATCH_SIZE = 1000;
   @Override
   public void afterPropertiesSet() throws Exception {
       // 创建链接
       CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111), "example", "", "");
       try {
           //打开链接
           connector.connect();
           //订阅全部表
           connector.subscribe(".*\\..*");
           //回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿
           connector.rollback();
           while (true) {
               // 获取指定数量的数据
               Message message = connector.getWithoutAck(BATCH_SIZE);
               //获取批量ID
               long batchId = message.getId();
               //获取批量的数量
               int size = message.getEntries().size();
               //如果没有数据
               if (batchId == -1 || size == 0) {
                   try {
                       //线程休眠2秒
                       Thread.sleep(2000);
                  } catch (InterruptedException e) {
                       e.printStackTrace();
                  }
              } else {
                   //如果有数据,处理数据
                   printEntry(message.getEntries());
              }
               //进行 batch id 的确认。确认之后,小于等于此 batchId 的 Message 都会被确认。
               connector.ack(batchId);
          }
      } catch (Exception e) {
           e.printStackTrace();
      } finally {
           connector.disconnect();
      }
  }
   /**
    * 打印canal server解析binlog获得的实体类信息
    */
   private static void printEntry(List<Entry> entrys) {
       for (Entry entry : entrys) {
           if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
               //开启/关闭事务的实体类型,跳过
               continue;
          }
           //RowChange对象,包含了一行数据变化的所有特征
           //比如isDdl 是否是ddl变更操作 sql 具体的ddl sql beforeColumns afterColumns 变更前后的数据字段等等
           RowChange rowChage;
           try {
               rowChage = RowChange.parseFrom(entry.getStoreValue());
          } catch (Exception e) {
               throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
          }
           //获取操作类型:insert/update/delete类型
           EventType eventType = rowChage.getEventType();
           //打印Header信息
           System.out.println(String.format("================》; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                   entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                   entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                   eventType));
           //判断是否是DDL语句
           if (rowChage.getIsDdl()) {
               System.out.println("================》;isDdl: true,sql:" + rowChage.getSql());
          }
           //获取RowChange对象里的每一行数据,打印出来
           for (RowData rowData : rowChage.getRowDatasList()) {
               //删除语句
               if (eventType == EventType.DELETE) {
                   printColumn(rowData.getBeforeColumnsList());
                   //新增语句
              } else if (eventType == EventType.INSERT) {
                   printColumn(rowData.getAfterColumnsList());
                   //更新的语句
              } else {
                   //变更前的数据
                   System.out.println("------->; before");
                   printColumn(rowData.getBeforeColumnsList());
                   //变更后的数据
                   System.out.println("------->; after");
                   printColumn(rowData.getAfterColumnsList());
              }
          }
      }
  }
   private static void printColumn(List<Column> columns) {
       for (Column column : columns) {
           System.out.println(column.getName() + " : " + column.getValue() + "   update=" + column.getUpdated());
      }
  }
}

这样执行就可以了!

canal的好处在于对业务代码没有侵入,因为是基于监听binlog日志去进行同步数据的。实时性也能做到准实时,是很多企业一种比较常见的数据同步的方案

以上

相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
24天前
|
监控 安全 物联网
不会代码,中小企业管理员如何实现巡逻巡更系统
在现代安全管理中,园区、物业和工厂的巡逻巡更至关重要。本文对比了草料二维码、诺怀云系统和邦巡系统三款主流巡逻巡更系统,分析了它们的技术集成、系统复杂度、移动办公与物联网支持、价格与成本等方面的特点,帮助用户选择最适合的解决方案。草料二维码适合低成本、快速部署的中小企业;诺怀云系统适合需要全面移动协同办公的物业管理公司;邦巡系统则适合需要全场景、多业务、高效率巡检协作的企业。
|
3月前
|
存储 算法 C++
【C/C++】C/C++ KTV点歌系统设计与实现(源码+数据+报告)【独一无二】
【C/C++】C/C++ KTV点歌系统设计与实现(源码+数据+报告)【独一无二】
|
6月前
希望阿里的小伙伴在控制台的易用性多上点心,每次问客服好像都是外包人员,啥也不会
希望阿里的小伙伴在控制台的易用性多上点心,每次问客服好像都是外包人员,啥也不会
144 2
|
6月前
|
小程序 JavaScript Java
铁路订票平台小程序|基于微信小程序的铁路订票平台小程序设计与实现(源码+数据库+文档)
铁路订票平台小程序|基于微信小程序的铁路订票平台小程序设计与实现(源码+数据库+文档)
59 0
|
6月前
|
小程序 JavaScript Java
口腔助手|口腔挂号预约小程序|基于微信小程序的口腔门诊预约系统的设计与实现(源码+数据库+文档)
口腔助手|口腔挂号预约小程序|基于微信小程序的口腔门诊预约系统的设计与实现(源码+数据库+文档)
92 0
|
供应链 数据可视化
只用2个小时,我把公司的进销存流程全部搬到了线上!
只用2个小时,我把公司的进销存流程全部搬到了线上!
115 0
|
6月前
|
开发框架 移动开发 小程序
医院智慧导诊系统(源码):让你告别不知道挂什么科的烦恼!
医院智慧导诊系统(源码):让你告别不知道挂什么科的烦恼!
86 1
如何开发自主体育直播足球竞猜系统?说难不难,做好这三步就行了
随着网络技术的发展,体育直播已经成为人们观看体育比赛的主要方式之一。对于想要开发自主体育直播系统的企业或个人来说,以下三步是必须要做的。
|
新零售 人工智能 开发框架
东郊到家预约上门系统开发成熟技术丨方案项目丨案例介绍丨开发功能丨源码版
  新零售就是企业借助互联网,通过大数据、人工智能等一些手段,对产品的生产、流通以及销售的过程俩进行升级改造,从而可以把线上服务、线下服务以及现代的物流进行深度的融合的新零售模式。
|
安全 数据管理 测试技术
同城预约上门理疗推拿按摩系统功能开发实例源码规则解析
同城预约上门理疗推拿按摩系统功能开发实例源码规则解析