数据的异构实战(一) 基于canal进行日志的订阅和转换

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
日志服务 SLS,月写入数据量 50GB 1个月
简介: 数据的异构实战(一) 基于canal进行日志的订阅和转换

什么是数据的异构处理。简单说就是为了满足我们业务的扩展性,将数据从某种特定的格式转换到新的数据格式中来。


为什么会有这种需求出现呢?


传统的企业中,主要都是将数据存储在了关系型数据库中,例如说MySQL这种数据库,但是为了满足需求的扩展,查询的维度会不断地增加,那么这个时候我们就需要做数据的异构处理了。


常见的数据异构有哪些?


例如MySQL数据转储到Redis,MySQL数据转储到es等等,也是因为这种数据异构的场景开始出现,陆陆续续有了很多中间件在市场中冒出,例如说rocketMq,kafka,canal这种组件。


下边有一张通俗易懂的数据异构过程图:


网络异常,图片无法展示
|


canal进行数据同步


首先,我们需要正确地打开canal服务器去订阅binlog日志。


关于binlog日志查看常用的几条命令如下:


是否启用了日志
mysql>show variables like 'log_bin';
怎样知道当前的日志
mysql> show master status;
查看mysql binlog模式
show variables like 'binlog_format';
获取binlog文件列表
show binary logs;
查看当前正在写入的binlog文件
show master status\G
查看指定binlog文件的内容
show binlog events in 'mysql-bin.000002';
复制代码


注意binlog日志格式要求为row格式:


ROW格式日志的特点


记录sql语句和每个字段变动的前后情况,能够清楚每行数据的变化历史,占用较多的空间,不会记录对数据没有影响的sql,例如说select语句就不会记录。可以使用mysqlbinlog工具去查看内部信息。


STATEMENT模式的日志内容


STATEMENT格式的日志就和它本身的命名有点类似,只是单独地记录了sql的内容,但是没有记录上下文信息,在数据会UI福的时候可能会导致数据丢失。


MIX模式模式的日志内容


这种模式的日志内容比较灵活,当遇到了表结构变更的时候,就会记录为statement模式,如果遇到了数据修改的话就会变为row模式。


如何配置canal的相关信息?


比较简单,首先通过下载好canal的安装包,然后我们需要在canal的配置文件上边做一些手脚:


canal的example文件夹下边的properties文件
canal.instance.master.address=**.***.***.**:3306
# 日志的文件名称
canal.instance.master.journal.name=master-96-bin.000009
canal.instance.dbUsername=****
canal.instance.dbPassword=****
复制代码


启动我们的canal程序,然后查看日志,如果显示下边这些内容就表示启动成功了:


2019-10-13 16:00:30.072 [main] ERROR com.alibaba.druid.pool.DruidDataSource - testWhileIdle is true, validationQuery not set
2019-10-13 16:00:30.734 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example 
2019-10-13 16:00:30.783 [main] INFO  c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
复制代码


ps:关于canal入门安装的教程网上有很多,这里我就不做过多的阐述了。


canal服务器搭建起来之后,我们便进入了java端的程序编码部分:


接着再来查看我们的客户端代码,客户端中我们需要通过java程序获取canal服务器的连接,然后进入监听binlog日志的状态。


可以参考下边的程序代码:


package com.sise.client.simple;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import com.sise.common.dto.TypeDTO;
import com.sise.common.handle.CanalDataHandler;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.stream.Collectors;
/**
 * 简单版本的canal监听客户端
 *
 * @author idea
 * @date 2019/10/12
 */
public class SImpleCanalClient {
    private static String SERVER_ADDRESS = "127.0.0.1";
    private static Integer PORT = 11111;
    private static String DESTINATION = "example";
    private static String USERNAME = "";
    private static String PASSWORD = "";
    public static void main(String[] args) throws InterruptedException {
        CanalConnector canalConnector = CanalConnectors.newSingleConnector(
                new InetSocketAddress(SERVER_ADDRESS, PORT), DESTINATION, USERNAME, PASSWORD);
        canalConnector.connect();
        canalConnector.subscribe(".*\\..*");
        canalConnector.rollback();
        for (; ; ) {
            Message message = canalConnector.getWithoutAck(100);
            long batchId = message.getId();
            if(batchId!=-1){
//                System.out.println(message.getEntries());
                System.out.println(batchId);
                printEntity(message.getEntries());
            }
        }
    }
    public static void printEntity(List<CanalEntry.Entry> entries){
        for (CanalEntry.Entry entry : entries) {
            if (entry.getEntryType()!=CanalEntry.EntryType.ROWDATA){
                continue;
            }
            try {
                CanalEntry.RowChange rowChange=CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                    System.out.println(rowChange.getEventType());
                    switch (rowChange.getEventType()){
                    //如果希望监听多种事件,可以手动增加case
                        case INSERT:
                            String tableName = entry.getHeader().getTableName();
                            //测试选用t_type这张表进行映射处理
                            if ("t_type".equals(tableName)) {
                                TypeDTO typeDTO = CanalDataHandler.convertToBean(rowData.getAfterColumnsList(), TypeDTO.class);
                                System.out.println(typeDTO);
                            }
                            System.out.println("this is INSERT");
                            break;
                        default:
                            break;
                    }
                }
            } catch (InvalidProtocolBufferException e) {
                e.printStackTrace();
            }
        }
    }
    /**
     * 打印内容
     *
     * @param columns
     */
    private static void printColums(List<CanalEntry.Column> columns){
        String line=columns.stream().map(column -> column.getName()+"="+column.getValue())
                .collect(Collectors.joining(","));
        System.out.println(line);
    }
}
复制代码


本地监听到了canal的example文件夹中配置的监听的日志信息之后,就会自动将该日志里面记录的数据进行打印读取。


那么这个时候我们还需要做多一步处理,那就是将坚听到的数据转换为可识别的对象,然后进行对象转移处理。


其实光是链接获取到canal的binlog日志并不困难,接着我们还需要将binlog日志进行统一的封装处理,需要编写一个特定的处理器将日志的内容转换为我们常用的DTO类:

下边这个工具类可以借鉴一下:


package com.sise.common.handle;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.sise.common.dto.CourseDetailDTO;
import lombok.extern.slf4j.Slf4j;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * 基于canal的数据处理器
 *
 * @author idea
 * @data 2019/10/13
 */
@Slf4j
public  class CanalDataHandler extends TypeConvertHandler {
    /**
     * 将binlog的记录解析为一个bean对象
     *
     * @param columnList
     * @param clazz
     * @param <T>
     * @return
     */
    public static <T> T convertToBean(List<CanalEntry.Column> columnList, Class<T> clazz) {
        T bean = null;
        try {
            bean = clazz.newInstance();
            Field[] fields = clazz.getDeclaredFields();
            Field.setAccessible(fields, true);
            Map<String, Field> fieldMap = new HashMap<>(fields.length);
            for (Field field : fields) {
                fieldMap.put(field.getName().toLowerCase(), field);
            }
            if (fieldMap.containsKey("serialVersionUID")) {
                fieldMap.remove("serialVersionUID".toLowerCase());
            }
            System.out.println(fieldMap.toString());
            for (CanalEntry.Column column : columnList) {
                String columnName = column.getName();
                String columnValue = column.getValue();
                System.out.println(columnName);
                if (fieldMap.containsKey(columnName)) {
                    //基础类型转换不了
                    Field field = fieldMap.get(columnName);
                    Class<?> type = field.getType();
                    if(BEAN_FIELD_TYPE.containsKey(type)){
                        switch (BEAN_FIELD_TYPE.get(type)) {
                            case "Integer":
                                field.set(bean, parseToInteger(columnValue));
                                break;
                            case "Long":
                                field.set(bean, parseToLong(columnValue));
                                break;
                            case "Double":
                                field.set(bean, parseToDouble(columnValue));
                                break;
                            case "String":
                                field.set(bean, columnValue);
                                break;
                            case "java.handle.Date":
                                field.set(bean, parseToDate(columnValue));
                                break;
                            case "java.sql.Date":
                                field.set(bean, parseToSqlDate(columnValue));
                                break;
                            case "java.sql.Timestamp":
                                field.set(bean, parseToTimestamp(columnValue));
                                break;
                            case "java.sql.Time":
                                field.set(bean, parseToSqlTime(columnValue));
                                break;
                        }
                    }else{
                        field.set(bean, parseObj(columnValue));
                    }
                }
            }
        } catch (InstantiationException | IllegalAccessException e) {
            log.error("[CanalDataHandler]convertToBean,初始化对象出现异常,对象无法被实例化,异常为{}", e);
        }
        return bean;
    }
    public static void main(String[] args) throws IllegalAccessException {
        CourseDetailDTO courseDetailDTO = new CourseDetailDTO();
        Class clazz = courseDetailDTO.getClass();
        Field[] fields = clazz.getDeclaredFields();
        Field.setAccessible(fields, true);
        System.out.println(courseDetailDTO);
        for (Field field : fields) {
            if ("java.lang.String".equals(field.getType().getName())) {
                field.set(courseDetailDTO, "name");
            }
        }
        System.out.println(courseDetailDTO);
    }
    /**
     * 其他类型自定义处理
     *
     * @param source
     * @return
     */
    public static Object parseObj(String source){
        return null;
    }
}
复制代码


接着是canal的核心处理器,主要的目的是将binlog转换为我们所希望的实体类对象,该类目前主要考虑兼容的数据类型为目前8种,比较有限,如果读者后续在实际开发中还遇到某些特殊的数据类型可以手动添加到map中。


package com.sise.common.handle;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
/**
 * 类型转换器
 *
 * @author idea
 * @data 2019/10/13
 */
public class TypeConvertHandler {
    public static final Map<Class, String> BEAN_FIELD_TYPE;
    static {
        BEAN_FIELD_TYPE = new HashMap<>(8);
        BEAN_FIELD_TYPE.put(Integer.class, "Integer");
        BEAN_FIELD_TYPE.put(Long.class, "Long");
        BEAN_FIELD_TYPE.put(Double.class, "Double");
        BEAN_FIELD_TYPE.put(String.class, "String");
        BEAN_FIELD_TYPE.put(Date.class, "java.handle.Date");
        BEAN_FIELD_TYPE.put(java.sql.Date.class, "java.sql.Date");
        BEAN_FIELD_TYPE.put(java.sql.Timestamp.class, "java.sql.Timestamp");
        BEAN_FIELD_TYPE.put(java.sql.Time.class, "java.sql.Time");
    }
    protected static final Integer parseToInteger(String source) {
        if (isSourceNull(source)) {
            return null;
        }
        return Integer.valueOf(source);
    }
    protected static final Long parseToLong(String source) {
        if (isSourceNull(source)) {
            return null;
        }
        return Long.valueOf(source);
    }
    protected static final Double parseToDouble(String source) {
        if (isSourceNull(source)) {
            return null;
        }
        return Double.valueOf(source);
    }
    protected static final Date parseToDate(String source) {
        if (isSourceNull(source)) {
            return null;
        }
        if (source.length() == 10) {
            source = source + " 00:00:00";
        }
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Date date;
        try {
            date = sdf.parse(source);
        } catch (ParseException e) {
            return null;
        }
        return date;
    }
    protected static final java.sql.Date parseToSqlDate(String source) {
        if (isSourceNull(source)) {
            return null;
        }
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
        java.sql.Date sqlDate;
        Date utilDate;
        try {
            utilDate = sdf.parse(source);
        } catch (ParseException e) {
            return null;
        }
        sqlDate = new java.sql.Date(utilDate.getTime());
        return sqlDate;
    }
    protected static final java.sql.Timestamp parseToTimestamp(String source) {
        if (isSourceNull(source)) {
            return null;
        }
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Date date;
        java.sql.Timestamp timestamp;
        try {
            date = sdf.parse(source);
        } catch (ParseException e) {
            return null;
        }
        timestamp = new java.sql.Timestamp(date.getTime());
        return timestamp;
    }
    protected static final java.sql.Time parseToSqlTime(String source) {
        if (isSourceNull(source)) {
            return null;
        }
        SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
        Date date;
        java.sql.Time time;
        try {
            date = sdf.parse(source);
        } catch (ParseException e) {
            return null;
        }
        time = new java.sql.Time(date.getTime());
        return time;
    }
    private static boolean isSourceNull(String source) {
        if (source == "" || source == null) {
            return true;
        }
        return false;
    }
}
复制代码


ps: t_type表是一张我们用于做测试时候使用的表,这里我们可以根据自己实际的业务需要定制不同的实体类对象


现在我们已经可以通过binlog转换为实体类了,那么接下来就是如何将实体类做额外的传输和处理了。数据的传输我们通常会借助mq这类型的中间件来进行操作,关于这部分的内容我会在后续的文章中做详细的输出。

相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
目录
相关文章
|
1月前
|
SQL 人工智能 运维
在阿里云日志服务轻松落地您的AI模型服务——让您的数据更容易产生洞见和实现价值
您有大量的数据,数据的存储和管理消耗您大量的成本,您知道这些数据隐藏着巨大的价值,但是您总觉得还没有把数据的价值变现出来,对吗?来吧,我们用一系列的案例帮您轻松落地AI模型服务,实现数据价值的变现......
139 3
|
1月前
|
SQL 安全 数据库
基于SQL Server事务日志的数据库恢复技术及实战代码详解
基于事务日志的数据库恢复技术是SQL Server中一个非常强大的功能,它能够帮助数据库管理员在数据丢失或损坏的情况下,有效地恢复数据。通过定期备份数据库和事务日志,并在需要时按照正确的步骤恢复,可以最大限度地减少数据丢失的风险。需要注意的是,恢复数据是一个需要谨慎操作的过程,建议在执行恢复操作之前,详细了解相关的操作步骤和注意事项,以确保数据的安全和完整。
69 0
|
2月前
|
数据库 Java 监控
Struts 2 日志管理化身神秘魔法师,洞察应用运行乾坤,演绎奇幻篇章!
【8月更文挑战第31天】在软件开发中,了解应用运行状况至关重要。日志管理作为 Struts 2 应用的关键组件,记录着每个动作和决策,如同监控摄像头,帮助我们迅速定位问题、分析性能和使用情况,为优化提供依据。Struts 2 支持多种日志框架(如 Log4j、Logback),便于配置日志级别、格式和输出位置。通过在 Action 类中添加日志记录,我们能在开发过程中获取详细信息,及时发现并解决问题。合理配置日志不仅有助于调试,还能分析用户行为,提升应用性能和稳定性。
40 0
|
2月前
|
开发者 前端开发 编解码
Vaadin解锁移动适配新境界:一招制胜,让你的应用征服所有屏幕!
【8月更文挑战第31天】在移动互联网时代,跨平台应用开发备受青睐。作为一款基于Java的Web应用框架,Vaadin凭借其组件化设计和强大的服务器端渲染能力,助力开发者轻松构建多设备适应的Web应用。本文探讨Vaadin与移动设备的适配策略,包括响应式布局、CSS媒体查询、TouchKit插件及服务器端优化,帮助开发者打造美观且实用的移动端体验。通过这些工具和策略的应用,可有效应对屏幕尺寸、分辨率及操作系统的多样性挑战,满足广大移动用户的使用需求。
40 0
|
2月前
|
存储 运维 监控
Entity Framework Core 实现审计日志记录超棒!多种方法助你跟踪数据变化、监控操作,超实用!
【8月更文挑战第31天】在软件开发中,审计日志记录对于跟踪数据变化、监控用户操作及故障排查至关重要。Entity Framework Core (EF Core) 作为强大的对象关系映射框架,提供了多种实现审计日志记录的方法。例如,可以使用 EF Core 的拦截器在数据库操作前后执行自定义逻辑,记录操作类型、时间和执行用户等信息。此外,也可通过在实体类中添加审计属性(如 `CreatedBy`、`CreatedDate` 等),并在保存实体时更新这些属性来记录审计信息。这两种方法都能有效帮助我们追踪数据变更并满足合规性和安全性需求。
27 0
|
2月前
|
Kubernetes Ubuntu Windows
【Azure K8S | AKS】分享从AKS集群的Node中查看日志的方法(/var/log)
【Azure K8S | AKS】分享从AKS集群的Node中查看日志的方法(/var/log)
|
29天前
|
Java
日志框架log4j打印异常堆栈信息携带traceId,方便接口异常排查
日常项目运行日志,异常栈打印是不带traceId,导致排查问题查找异常栈很麻烦。
|
1月前
|
存储 监控 数据可视化
SLS 虽然不是直接使用 OSS 作为底层存储,但它凭借自身独特的存储架构和功能,为用户提供了一种专业、高效的日志服务解决方案。
【9月更文挑战第2天】SLS 虽然不是直接使用 OSS 作为底层存储,但它凭借自身独特的存储架构和功能,为用户提供了一种专业、高效的日志服务解决方案。
80 9
|
2月前
|
开发框架 .NET Docker
【Azure 应用服务】App Service .NET Core项目在Program.cs中自定义添加的logger.LogInformation,部署到App Service上后日志不显示Log Stream中的问题
【Azure 应用服务】App Service .NET Core项目在Program.cs中自定义添加的logger.LogInformation,部署到App Service上后日志不显示Log Stream中的问题
|
2月前
|
存储 监控 安全