数据的异构实战(二)手写迷你版同步工程

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
日志服务 SLS,月写入数据量 50GB 1个月
简介: 数据的异构实战(二)手写迷你版同步工程

上一期讲到了通过canal订阅mysql的binlog日志并且转换为对象,那么这一次我们将订阅来的对象通过RocketMQ发送消息,接收方接受消息之后同时存储到其他类型的数据源当中,完成一个简单的数据异构的过程。


image.png



什么是Java消息服务?


两个应用程序之间进行异步通信的API,它为标准消息协议和消息服务提供了一组通用接口,包括创建、发送、读取消息等,用于支持JAVA应用程序开发。


在J2EE中,当两个应用程序使用JMS进行通信时,它们之间并不是直接相连的,而是通过一个共同的消息收发服务连接起来,可以达到解耦的效果,我们将会在接下来的教程中详细介绍。


jms的消息传送模型


常见的消息传送模型有以下两种:


点对点消息传送模型


在点对点消息传送模型中,应用程序由消息队列,发送者,接收者组成。每一个消息发送给一个特殊的消息队列,该队列保存了所有发送给它的消息(除了被接收者消费掉的和过期的消息)。如下图所示:


image.png


发布订阅消息传送模型


在发布订阅模型中,消费者需要订阅相关的topic才能接收到生产者的信息。生产者会将信息传输到topic中,然后消费者只需要从topic中获取数据即可。如下图所示:


image.png


RocketMQ消息队列使用


这次使用的消息中间件为RocketMQ的使用场景。RocketMQ是阿里巴巴在2012年开源的分布式消息中间件,目前已经捐赠给Apache基金会,并于2016年11月成为 Apache 孵化项目。


RocketMQ在使用之前,需要我们引入相关的依赖配置:


<!-- 整合RocketMq -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>${rocketmq.version}</version>
        </dependency>


关于RocketMQ的安装在这里就不做过多的讲解了。


通过mq的方式来进行数据异构通常是比较简单的方案,首先我们需要在项目里面独立一个模块专门用于监听mysql的binlog日志,这个模块我暂且称之为datahandle-core模块


image.png


整个工程采用了springboot的结构来构建,主要的核心也是在core工程中。


首先是监听canal的日志状态模块了,采用了上一节中讲解到的客户端代码进行数据监听,并且将其转换为对象然后发送往mq中:


package com.sise.datahandle.core;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.Message;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import java.net.InetSocketAddress;
import static com.sise.datahandle.constants.CanalConstants.*;
/**
 * @author idea
 * @date 2019/10/20
 */
@Component
@Slf4j
public class CanalListener implements CommandLineRunner {
    @Autowired
    private CanalClient canalClient;
    @Override
    public void run(String... args) throws Exception {
      log.info("=============canal监听器开启===============");
        CanalConnector canalConnector = CanalConnectors.newSingleConnector(
                new InetSocketAddress(SERVER_ADDRESS, PORT), DESTINATION, USERNAME, PASSWORD);
        canalConnector.connect();
        canalConnector.subscribe(".*\\..*");
        canalConnector.rollback();
        for (; ; ) {
            Message message = canalConnector.getWithoutAck(100);
            long batchId = message.getId();
            if (batchId != -1) {
                canalClient.entityHandle(message.getEntries());
            }
        }
    }
}


ps:这里面的CanalClient代码主要来自上一篇的canal客户端代码,文末会有完整项目代码链接,需要的读者可以前往查看。


在CanalClient里面,有一个函数是专门用于处理将订阅的数据发送到mq消息队列中:


package com.sise.datahandle.core;
import com.alibaba.fastjson.JSON;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.google.protobuf.InvalidProtocolBufferException;
import com.sise.datahandle.handler.CanalDataHandler;
import com.sise.datahandle.model.TypeDTO;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
/**
 * canal监听客户端变化
 *
 * @author idea
 * @date 2019/10/12
 */
@Slf4j
@Service
public class CanalClient {
    @Autowired
    private DefaultMQProducer rocketMqProducer;
    /**
     * 处理binlog日志的监听
     *
     * @param entries
     */
    public void entityHandle(List<CanalEntry.Entry> entries) {
        for (CanalEntry.Entry entry : entries) {
            if (entry.getEntryType() != CanalEntry.EntryType.ROWDATA) {
                continue;
            }
            try {
                CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                    switch (rowChange.getEventType()) {
                        case INSERT:
                            String tableName = entry.getHeader().getTableName();
                            //测试选用t_type这张表进行映射处理
                            if ("t_type".equals(tableName)) {
                                TypeDTO typeDTO = CanalDataHandler.convertToBean(rowData.getAfterColumnsList(), TypeDTO.class);
                                org.apache.rocketmq.common.message.Message message = new org.apache.rocketmq.common.message.Message();
                                message.setTopic("canal-test-topic");
                                message.setTags("canal-test-tag");
                                String json = JSON.toJSONString(typeDTO);
                                message.setBody(json.getBytes());
                                SendResult sendResult = rocketMqProducer.send(message);
                                log.info("[mq消息发送结果]----" + sendResult);
                            }
                            break;
                        default:
                            break;
                    }
                }
            } catch (InvalidProtocolBufferException e) {
                log.error("[CanalClient]监听数据过程出现异常,异常信息为{}", e);
            } catch (InterruptedException | RemotingException | MQClientException | MQBrokerException e) {
                log.error("[CanalClient] mq发送信息出现异常:{}", e);
            }
        }
    }
}


这里面主要是监听binlog记录为插入数据事件的时候做发送mq操作。


接下来便是常见的mq配置了,本工程主要是一个模拟的简单案例,因此我将consumer和producer都放在了一起方便测试。


image.png


通过springboot自身的properties文件对mq进行参数初始化配置之后便可以构建一个基本的consumer和producer了。这里我们一个TypeDto类来进行树异构的测试,consumer端的核心代码为:


package com.sise.datahandle.mq.rocketmq.consumer;
import com.sise.datahandle.model.TypeDTO;
import com.sise.datahandle.mq.rocketmq.producer.RocketMqMsgHandle;
import com.sise.datahandle.redis.RedisService;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.util.List;
/**
 * @author idea
 * @date 2019/10/20
 */
@Component
@Slf4j
public class RocketMqConsumeMsgListenerProcessor implements MessageListenerConcurrently {
    @Autowired
    private RedisService redisService;
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        if(CollectionUtils.isEmpty(msgs)){
            log.info("接受到的消息为空,不处理,直接返回成功");
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
        MessageExt messageExt = msgs.get(0);
        System.out.println("接受到的消息为:"+messageExt.toString());
        if("canal-test-topic".equals(messageExt.getTopic())){
            if("canal-test-tag".equals(messageExt.getTags())){
                int reconsume = messageExt.getReconsumeTimes();
                if(reconsume ==3){//消息已经重试了3次,如果不需要再次消费,则返回成功
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
                TypeDTO typeDTO = RocketMqMsgHandle.parseMessage(messageExt,TypeDTO.class);
                //存储进入redis中
                redisService.setObject("typeDTO-"+System.currentTimeMillis(),typeDTO);
            }
        }
        // 如果没有return success ,consumer会重新消费该消息,直到return success
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}


通过订阅mq的信息,读取相关的数据再次写入到redis里面,完成一个简单过程的数据异构。


整个迷你工程写下来,比较核心的地方就在于对binlog日志的解析器部分,如何将日志订阅之后转换为相应的对象进行处理。


通常采用mq的方式进行数据异构会相对简单,实际上是在监听binlog为写DB的同时去写一次MQ,但是这种方式不能够保证数据一致性,就是不能保证跨资源的事务。注:调用第三方远程RPC的操作一定不要放到事务中。


完整案例的代码链接如下(点击阅读原文直达):


https://gitee.com/IdeaHome_admin/wfw


相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
XML 数据库 数据格式
嵌入式工作流程开发!工作流 Activiti 框架中子流程的使用指南
本篇文章对工作流Activiti框架中的子流程进行的详尽的分析和说明,在工作流Activiti集成到项目中开发时,可以嵌入子流程进行使用。子流程包括了事件子流程,事务子流程以及调用活动子流程。通过对内嵌子流程的方式的学习,可以帮助我们将工作流框架很好地应用在继承式建模的流程场景下。
1061 0
嵌入式工作流程开发!工作流 Activiti 框架中子流程的使用指南
|
1月前
|
存储 C++ UED
【实战指南】4步实现C++插件化编程,轻松实现功能定制与扩展
本文介绍了如何通过四步实现C++插件化编程,实现功能定制与扩展。主要内容包括引言、概述、需求分析、设计方案、详细设计、验证和总结。通过动态加载功能模块,实现软件的高度灵活性和可扩展性,支持快速定制和市场变化响应。具体步骤涉及配置文件构建、模块编译、动态库入口实现和主程序加载。验证部分展示了模块加载成功的日志和配置信息。总结中强调了插件化编程的优势及其在多个方面的应用。
256 64
|
6月前
|
缓存 测试技术 Android开发
构建高效的Android应用:从设计到实现
【5月更文挑战第2天】 在移动设备日益普及的今天,打造一个既快速又流畅的Android应用对于开发者而言至关重要。本文将深入探讨如何优化Android应用的性能,涵盖UI设计的最佳实践、代码层面的性能提升技巧以及利用最新的Android框架和工具进行应用开发的策略。我们将通过实例分析,揭示那些影响应用响应速度和稳定性的关键因素,并提出切实可行的解决方案,帮助开发者构建出色的用户体验。
|
6月前
|
算法 测试技术 数据处理
【C++ 设计思路】优化C++项目:高效解耦库接口的实战指南
【C++ 设计思路】优化C++项目:高效解耦库接口的实战指南
182 5
|
数据库
易搭工作流引擎用是什么开源 还是阿里自研产品,零代码平台场景页面映射数据库表是动态创建,采用什么框架处理,怎么让系统产生高并发能力。易搭权限有没有了解,求解。
易搭工作流引擎用是什么开源 还是阿里自研产品,零代码平台场景页面映射数据库表是动态创建,采用什么框架处理,怎么让系统产生高并发能力。易搭权限有没有了解,求解。
|
负载均衡
如何手写一个简单的分布式系统框架?
如何手写一个简单的分布式系统框架?
141 0
|
SQL 数据采集 数据库连接
kettle基础概念理解
kettle基础概念理解
134 0
|
SQL Oracle 关系型数据库
「集成架构」2020年最好的15个ETL工具(第二部)
「集成架构」2020年最好的15个ETL工具(第二部)
|
测试技术 Python
【第五篇-完结篇】XiaoZaiMultiAutoAiDevices之改造扩展
在前面系列文章中有讲到,使用configparser,ini格式的文件作为配置文件,在新增或者删除其中的值时,会丢失所有注释,所以在框架源码注释中我有写到,如果对这方面比较介意或者是有需求的话,可以进行更改配置文件。
130 0
|
消息中间件 canal 存储
数据的异构实战(二)手写迷你版同步工程
数据的异构实战(二)手写迷你版同步工程
159 0
下一篇
无影云桌面