基于 Docker 结合 Canal 实现 MySQL 实时增量数据传输

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
数据传输服务 DTS,数据同步 small 3个月
推荐场景:
数据库上云
简介: 基于 Docker 结合 Canal 实现 MySQL 实时增量数据传输

canal的介绍

canal的历史由来

在早期的时候,阿里巴巴公司因为杭州和美国两个地方的机房都部署了数据库实例,但因为跨机房同步数据的业务需求 ,便孕育而生出了canal,主要是基于trigger(触发器)的方式获取增量变更。从 2010 年开始,阿里巴巴公司开始逐步尝试数据库日志解析,获取增量变更的数据进行同步,由此衍生出了增量订阅和消费业务。

当前的 canal 支持的数据源端Mysql版本包括( 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x)

canal的应用场景

目前普遍基于日志增量订阅和消费的业务,主要包括

  • 基于数据库增量日志解析,提供增量数据订阅和消费
  • 数据库镜像
  • 数据库实时备份
  • 索引构建和实时维护(拆分异构索引、倒排索引等)
  • 业务 cache 刷新
  • 带业务逻辑的增量数据处理

canal的工作原理

在介绍canal的原理之前,我们先来了解下MySQL主从复制的原理

MySQL主从复制原理

084cc60905975a6c5f6d1d6ce8ec4ae.png

  • MySQL master 将数据变更的操作写入二进制日志binary log中, 其中记录的内容叫做二进制日志事件binary log events,可以通过show binlog events命令进行查看
  • MySQL slave 会将 master 的binary log中的binary log events 拷贝到它的中继日志relay log
  • MySQL slave 重读并执行relay log 中的事件,将数据变更映射到它自己的数据库表中

了解了MySQL的工作原理,我们可以大致猜想到Canal应该也是采用类似的逻辑去实现增量数据订阅的功能,那么接下来我们看看实际上Canal的工作原理是怎样的?

canal工作原理

4ccf0d7f6d93ab639121986742ca187.png

canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向MySQL master 发送dump 协议

MySQL master 收到 dump 请求,开始推送 binary log 给 slave (也就是 canal )

canal 解析 binary log 对象(数据为byte流)

基于这样的原理与方式,便可以完成数据库增量日志的获取解析,提供增量数据订阅和消费,实现mysql实时增量数据传输的功能。

既然canal是这样的一个框架,又是纯Java语言编写而成,那么我们接下来就开始学习怎么使用它并把它用到我们的实际工作中。

canal的Docker环境准备

因为目前容器化技术的火热,本文通过使用Docker来快速搭建开发环境,而传统方式的环境搭建,在我们学会了Docker容器环境搭建后,也能自行依葫芦画瓢搭建成功。由于本篇主要讲解canal,所以关于Docker的内容不会涉及太多,主要会介绍Docker的基本概念和命令使用。

什么是Docker

相信绝大多数人都使用过虚拟机Vmware,在使用Vmware进行环境搭建的时候,只需提供了一个普通的系统镜像并成功安装,剩下的软件环境与应用配置还是如我们在本机操作一样在虚拟机里也操作一遍,而且Vmware占用宿主机的资源较多,容易造成宿主机卡顿,而且系统镜像本身也占用过多空间。

为了便于大家快速理解Docker,便与Vmware做对比来做介绍,docker 提供了一个开始,打包,运行app的平台,把app(应用)和底层infrastructure(基础设施)隔离开来。Docker中最主要的两个概念就是镜像(类似Vmware的系统镜像)与容器(类似Vmware里安装的系统)

什么是Image(镜像)

  • 文件和meta data的集合(root filesystem)
  • 分层的,并且每一层都可以添加改变删除文件,成为一个新的image
  • 不同的image可以共享相同的layer
  • Image本身是read-only的

fbbf09c8760dc1354e119dfba06c618.png

什么是Container(容器)

  • 通过Image创建(copy)
  • 在Image layer 之上建立一个container layer(可读写)
  • 类比面向对象:类和实例
  • Image负责app的存储和分发,Container负责运行app
  • 096a980a5fb626a66c43e93335950c8.png

Docker的网络介绍

Docker的网络类型有三种:

bridge:桥接网络

默认情况下启动的Docker容器,都是使用 bridge,Docker安装时创建的桥接网络,
每次Docker容器重启时,会按照顺序获取对应的IP地址,
这个就导致重启下,Docker的IP地址就变了

none:无指定网络

使用 --network=none ,docker 容器就不会分配局域网的IP

host:主机网络

使用 --network=host,此时,Docker 容器的网络会附属在主机上,两者是互通的。
例如,在容器中运行一个Web服务,监听8080端口,则主机的8080端口就会自动映射到容器中。

创建自定义网络:(设置固定IP)

docker network create --subnet=172.18.0.0/16 mynetwork

查看存在的网络类型docker network ls

ThahadeMacBook-Pro:~ hana$ docker network ls
NETWORK ID  NAME  DRIVER  SCOPE 
547d75277780  bridge  bridge  local 
cd57bd432a9b  host  host  local 
82354ec0c7d2  mvnetwork bridge  local 
270f35f46b8f  none  null  local 

搭建canal环境

附上Docker的下载安装地址==> Docker Download

下载canal镜像docker pull canal/canal-server

rootedefault:"# docker pull canal/canal-server Using default taq: latest
latest: Pulling from canal/canal-server
1c8f9aa56c90:Downloading 2.161 MB/69.8 MB c5e21c824d1c: Dowmloading 2.047 MB/50.09 MB4ba7edb60123:Waiting5160a1c76c08:Waiting adbb838a0064:Waiting
ab50c0db3511:Waiting  

下载mysql镜像docker pull mysql,下载过的则如下图

hahadeMacBook-Pro:~ haha$ docker pull mysql Using default tag: latest
latest: Pulling from library/mysgl
Digest:sha256:415ac63da0ae6725d5aefc9669a1c02f39a00c574fdbc478dfd08 db1e97c8f1b
Status: Image is up to date for mysgl:latest  

查看已经下载好的镜像docker images

hahadeMacBook-Pro:~ haha$ docker images
REPOSITORY  TAG IMAGE ID  CR  REATED  SIZE  
3
rabbitmq  3.7.15-management 1482b87815ec  weeks ago 192MB 
mysql latest  c7109f74d339  weeks ago 443MB 
6
kibana  7.1.1 67f17df6ca3e  weeks ago 746MB 
elasticsearch 7.1.1 b0e9f9f047e6  6 weeks ago 894MB 
3
canal/canal-server  latest  b9c3d95520a5  months ago  831MB 
zookeeper 3.4.13  4ebfb9474e72  months ago  150MB 
mobz/elasticsearch-head 5-alpine  e2a76963bc18  2 years ago 78.9MB  

接下来通过镜像生成mysql容器与canal-server容器

##生成mysql容器
docker run -d --name mysql --net mynetwork --ip 172.18.0.6 -p 3306:3306 -e MYSQL_ROOT_PASSWORD=root mysql
##生成canal-server容器
docker run -d --name canal-server --net mynetwork --ip 172.18.0.4 -p 11111:11111 canal/canal-server
## 命令介绍
--net mynetwork #使用自定义网络
--ip   #指定分配ip

查看docker中运行的容器docker ps

hahadeMacBook-Pro:~ haha$ docker ps
CONTAINER ID  IMAGE COMMAND CREATED STATUS  PORTS NAM 
ES
8254af09f19d  mysql "docker-entry point.s." 21 hours ago  Up 11 hours 0.0.0.0:3306->33  06/tcp,33060/tcp  mys 
91
4d5bf7a14209  mobz/elasticsearch-head:5-alpine  "/bin/sh -c node_mo..." 21 hours ago  Up 11 hours 0.0.0.0:9100->91  00/tcp  ela 
sticsearch-head
ca198845f55c  canal/canal-server  "/alidata/bin /main.s..." 21 hours ago  Up 32 minutes 2222/tcp,8000/t cp,11112/tcp,0.0.0.0:11111->11111/tcp can 
al-server
sticsesren  b5c1e3a581f4  elasticsearch:7.1.1 "/usr/local/b in/dock.."  21 hours ago  Up 11 hours 0.0.0.0:9200->92  00/tcp,0.0.0.0:9300->9300/tcp 

MySQL的配置修改

以上只是初步准备好了基础的环境,但是怎么让canal伪装成salve并正确获取mysql中的binary log呢?

对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,通过修改mysql配置文件来开启bin_log,使用 find / -name my.cnf 查找my.cnf, 修改文件内容如下

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

进入mysql容器docker exec -it mysql bash

创建链接MySQL的账号canal并授予作为 MySQL slave 的权限, 如果已有账户可直接 GRANT

mysql -uroot -proot
# 创建账号
CREATE USER canal IDENTIFIED BY 'canal'; 
# 授予权限
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
# 刷新并应用
FLUSH PRIVILEGES;

数据库重启后, 简单测试 my.cnf 配置是否生效

mysgl> show variables like 'log bin';
----------
Variable name |Value |
----  .+--- --- 
log_bin ION 
--  ----- ----- 
1 row in set (0.0l sec)
mysgl> show variables like 'binlog format';
------  --- 
Variable name /Value 
---
binlog format |ROW
- ------- --- 
1 row in set (0.00 sec)
mysgl> show master status;
---
| File  Position|Binlog Do DB | Binlog Ignore DB | Executed Gtid Set l  
--  --- 
---
mysql-bin.0000046160/
1 row in set (0.00 sec)
mysgl> show master status\G;
*************************** 1. roW *********************
File:mysql-bin.000004 Position:6160 Binlog_Do DB: Binlog Ignore DB: Executed Gtid Set:
I row in set (0.00 sec) 
show variables like 'log_bin';
show variables like 'log_bin';
show master status;

canal-server的配置修改

进入canal-server容器

docker exec -it canal-server bash

编辑canal-server的配置

vi canal-server/conf/example/instance.properties
#################################################
mvsal servertd_ v1 a 26+ will autoGen canal.instance.mvsgl.slaveId=1234
# enable gtid use true/false canal.instance.gtidon=false
改成自己的数据库信息
# nosition info
canal.instance.master.address=172.18.0.6:3306 canal.instance.master.iournal.name= canal.instance.master.position= canal.instance.master.timestamp= canal.instance.master.gtid=
# rds oss binlog
canal.instance.rds.accesskey= canal.instance.rds.secretkey= canal.instance.rds.instanceId=
# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysq1://127.0.0.13306/canal_tsdb#canal.instance.tsdb.dbUsername=canal#canal.instance.tsdb.dbPassword=canal
#canal.instance.standby.address =
#canal.instance.standbv.iournal.name =#canal.instance.standby.position =#canal.instance.standby.timestamp =#canal.instance.standby.gtid=
# username/password 改成自己的数据库信息  
canal.instance.dbUsername=canal canal.instance.dbPassword=canal
#canal.instance.defaultDatabaseName=studentdb canal.instance.connectionCharset =UTF-8# enable druid Decrypt database password
canal.instance.enableDruid=false  @程十掘金技术社区 
#canal.instance.pwdPublicKey=MFWWDQYJKoZIhVCNAQEBBQADSWAWSAJBALK4BUxdD1tRRE5/zXpVEVPUgunvscYFtEip3pmL1hrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfi

更多配置请参考==>canal配置说明

重启canal-server容器docker restart canal-server进入容器查看启动日志

docker exec -it canal-server bash
tail -100f canal-server/logs/example/example.log
019-07-09 17:33:09.620 [destination = example address = /172.18.0.6:3306 EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEve ntParserProxy----> find start position su essfully,EntryPosition[included=falseiournalName=binlog.000004,pos ition=1899,serverId=1,gtid=,timestamp=1562655804000] cost :120ms the next step is binlog dump019-07-09 21:26:32.816 [main] INFO c.a.o.c.i.spring.support.Propert PlaceholderConfigurer -Loading properties file from class path res ource [canal.properties]
019-07-09 21:26:32.822 [main]INFO c.a.o.c.i.spring.support.Propert PlaceholderConfigurer -Loading properties file from class path res ource [example/instance.properties]
019-07-09 21:26:33.304 [main] WARN o.s.beans.GenericTypeAwareProper tyDescriptor-Invalid JavaBean property 'connectionCharset' being a ccessed! Ambiguous write methods found next o actually used [public void com.alibaba.otter.canal.parse.inbound.m sql.AbstractMysqlEventParser.setConnectionCharset(java.lang.String)]: [public void com.alibaba.otter.canal.pan inbound.msa1.AbstractMvsqEventParser.setConnectionCharsetiava.ni o.charset.charset)]
019-07-09 21:26:33.429 [main] INFO c.a.o.c.i.spring.support.Property yPlaceholderConfigurer - Loading properties file from class path reso ource [canal.properties]
919-07-09 21:26:33.430 [main] INFO c.a.o.c.i.spring.support.property paceholderConfiaurer -Loading properties file from class path reso ource [example/instance.properties919-07-09 21:26:33.986main ERROR com.alibaba.druid.pool.DruidData Source-testWhileIdle is true, validationQuery not set
019-07-09 21:26:34.729  [main] INFO c.a.otter.canal.instance.spring.  CanalInstanceWithSpring - start CannalInstance for 1-example  
019-07-09 21:26:34.752 [main] WARN  c.a.o.canal.parse.inbound.mysgl.l dbsync.LogEventConvert ---> init table filter :^.*\..*$ 
019-07-09 21:26:34.753  [main] WARN c.a.o.canal.parse.inbound.mysgl.  dbsync.LogEventConvert ---> init table black filter : 
019-07-09 21:26:34.794 [main] INFO  c.a.otter.canal.instance.core.Ab: stractCanalInstance - start successful. 
019-07-09 21:26:35.107 [destination = example   address = /172.18.0.  6:3306  EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEver ntParserProxy - ---> begin to find start po 
tion, it will be long time for reset or first position  

至此,我们的环境工作准备完成!!!

拉取数据并同步保存到ElasticSearch

本文的ElasticSearch也是基于Docker环境搭建,所以读者可执行如下命令

# 下载对镜像
docker pull elasticsearch:7.1.1
docker pull mobz/elasticsearch-head:5-alpine
# 创建容器并运行
docker run -d --name elasticsearch --net mynetwork --ip 172.18.0.2 -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" elasticsearch:7.1.1
docker run -d --name elasticsearch-head --net mynetwork --ip 172.18.0.5 -p 9100:9100 mobz/elasticsearch-head:5-alpine

环境已经准备好了,现在就要开始我们的编码实战部分了,怎么通过应用程序去获取canal解析后的binlog数据。首先我们基于spring boot搭建一个canal demo应用。结构如下图所示

cb3da1287ac93e1770e6e229ddf9156.png

Student.java

package com.example.canal.study.pojo;
import lombok.Data;
import java.io.Serializable;
/**
 * 普通的实体domain对象
 * @Data 用户生产getter、setter方法
  */
@Data
public class Student implements Serializable {
    private String id;
    private String name;
    private int age;
    private String sex;
    private String city;
}

CanalConfig.java

package com.example.canal.study.common;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.net.InetSocketAddress;
/**
 * 配置一些跟canal相关到配置与公共bean
 * @author haha
 */
@Configuration
public class CanalConfig {
    // @Value 获取 application.properties配置中端内容
    @Value("${canal.server.ip}")
    private String canalIp;
    @Value("${canal.server.port}")
    private Integer canalPort;
    @Value("${canal.destination}")
    private String destination;
    @Value("${elasticSearch.server.ip}")
    private String elasticSearchIp;
    @Value("${elasticSearch.server.port}")
    private Integer elasticSearchPort;
    @Value("${zookeeper.server.ip}")
    private String zkServerIp;
    /**
     * 获取简单canal-server连接
      */
    @Bean
    public CanalConnector canalSimpleConnector() {
        CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalIp, canalPort), destination, "", "");
        return canalConnector;
    }
    /**
     * 通过连接zookeeper获取canal-server连接
      */
    @Bean
    public CanalConnector canalHaConnector() {
        CanalConnector canalConnector = CanalConnectors.newClusterConnector(zkServerIp, destination, "", "");
        return canalConnector;
    }
    /**
     * elasticsearch 7.x客户端
      */
    @Bean
    public RestHighLevelClient restHighLevelClient() {
        RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(new HttpHost(elasticSearchIp, elasticSearchPort))
        );
        return client;
    }
}

CanalDataParser.java

由于这个类的代码较多,文中则摘出其中比较重要的部分,其它部分代码可从github上获取

/**
     * 元祖类型的对象定义
     * @param <A>
     * @param <B>
     */
    public static class TwoTuple<A, B> {
        public final A eventType;
        public final B columnMap;
        public TwoTuple(A a, B b) {
            eventType = a;
            columnMap = b;
        }
    }
    /**
     * 解析canal中的message对象内容
     * @param entrys
     * @return
     */
    public static List<TwoTuple<EventType, Map>> printEntry(List<Entry> entrys) {
        List<TwoTuple<EventType, Map>> rows = new ArrayList<>();
        for (Entry entry : entrys) {
            // binlog event的事件事件
            long executeTime = entry.getHeader().getExecuteTime();
            // 当前应用获取到该binlog锁延迟的时间
            long delayTime = System.currentTimeMillis() - executeTime;
            Date date = new Date(entry.getHeader().getExecuteTime());
            SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            // 当前的entry(binary log event)的条目类型属于事务
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN) {
                    TransactionBegin begin = null;
                    try {
                        begin = TransactionBegin.parseFrom(entry.getStoreValue());
                    } catch (InvalidProtocolBufferException e) {
                        throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
                    }
                    // 打印事务头信息,执行的线程id,事务耗时
                    logger.info(transaction_format,
                            new Object[]{entry.getHeader().getLogfileName(),
                                    String.valueOf(entry.getHeader().getLogfileOffset()),
                                    String.valueOf(entry.getHeader().getExecuteTime()),
                                    simpleDateFormat.format(date),
                                    entry.getHeader().getGtid(),
                                    String.valueOf(delayTime)});
                    logger.info(" BEGIN ----> Thread id: {}", begin.getThreadId());
                    printXAInfo(begin.getPropsList());
                } else if (entry.getEntryType() == EntryType.TRANSACTIONEND) {
                    TransactionEnd end = null;
                    try {
                        end = TransactionEnd.parseFrom(entry.getStoreValue());
                    } catch (InvalidProtocolBufferException e) {
                        throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
                    }
                    // 打印事务提交信息,事务id
                    logger.info("----------------\n");
                    logger.info(" END ----> transaction id: {}", end.getTransactionId());
                    printXAInfo(end.getPropsList());
                    logger.info(transaction_format,
                            new Object[]{entry.getHeader().getLogfileName(),
                                    String.valueOf(entry.getHeader().getLogfileOffset()),
                                    String.valueOf(entry.getHeader().getExecuteTime()), simpleDateFormat.format(date),
                                    entry.getHeader().getGtid(), String.valueOf(delayTime)});
                }
                continue;
            }
            // 当前entry(binary log event)的条目类型属于原始数据
            if (entry.getEntryType() == EntryType.ROWDATA) {
                RowChange rowChage = null;
                try {
                    // 获取储存的内容
                    rowChage = RowChange.parseFrom(entry.getStoreValue());
                } catch (Exception e) {
                    throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
                }
                // 获取当前内容的事件类型
                EventType eventType = rowChage.getEventType();
                logger.info(row_format,
                        new Object[]{entry.getHeader().getLogfileName(),
                                String.valueOf(entry.getHeader().getLogfileOffset()), entry.getHeader().getSchemaName(),
                                entry.getHeader().getTableName(), eventType,
                                String.valueOf(entry.getHeader().getExecuteTime()), simpleDateFormat.format(date),
                                entry.getHeader().getGtid(), String.valueOf(delayTime)});
                // 事件类型是query或数据定义语言DDL直接打印sql语句,跳出继续下一次循环
                if (eventType == EventType.QUERY || rowChage.getIsDdl()) {
                    logger.info(" sql ----> " + rowChage.getSql() + SEP);
                    continue;
                }
                printXAInfo(rowChage.getPropsList());
                // 循环当前内容条目的具体数据
                for (RowData rowData : rowChage.getRowDatasList()) {
                    List<CanalEntry.Column> columns;
                    // 事件类型是delete返回删除前的列内容,否则返回改变后列的内容
                    if (eventType == CanalEntry.EventType.DELETE) {
                        columns = rowData.getBeforeColumnsList();
                    } else {
                        columns = rowData.getAfterColumnsList();
                    }
                    HashMap<String, Object> map = new HashMap<>(16);
                    // 循环把列的name与value放入map中
                    for (Column column: columns){
                        map.put(column.getName(), column.getValue());
                    }
                    rows.add(new TwoTuple<>(eventType, map));
                }
            }
        }
        return rows;
    }

ElasticUtils.java

package com.example.canal.study.common;
import com.alibaba.fastjson.JSON;
import com.example.canal.study.pojo.Student;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.common.xcontent.XContentType;
import java.util.Map;
/**
 * es的crud工具类
 * @author haha
 */
@Slf4j
@Component
public class ElasticUtils {
    @Autowired
    private  RestHighLevelClient restHighLevelClient;
    /**
     * 新增
     * @param student
     * @param index 索引
     */
    public  void saveEs(Student student, String index) {
        IndexRequest indexRequest = new IndexRequest(index)
                .id(student.getId())
                .source(JSON.toJSONString(student), XContentType.JSON)
                .opType(DocWriteRequest.OpType.CREATE);
        try {
            IndexResponse response = restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
            log.info("保存数据至ElasticSearch成功:{}", response.getId());
        } catch (Exception e) {
            log.error("保存数据至elasticSearch失败: {}", e);
        }
    }
    /**
     * 查看
     * @param index 索引
     * @param id _id
     * @throws Exception
     */
    public  void getEs(String index, String id) {
        GetRequest getRequest = new GetRequest(index, id);
        GetResponse response = null;
        try {
            response = restHighLevelClient.get(getRequest, RequestOptions.DEFAULT);
            Map<String, Object> fields = response.getSource();
            for (Map.Entry<String, Object> entry : fields.entrySet()) {
                System.out.println(entry.getKey() + ":" + entry.getValue());
            }
        } catch (Exception e) {
            log.error("从elasticSearch获取数据失败: {}", e);
        }
    }
    /**
     * 更新
     * @param student
     * @param index 索引
     * @throws Exception
     */
    public  void updateEs(Student student, String index)  {
        UpdateRequest updateRequest = new UpdateRequest(index, student.getId());
        updateRequest.doc(JSON.toJSONString(student), XContentType.JSON);
        UpdateResponse response = null;
        try {
            response = restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT);
            log.info("更新数据至ElasticSearch成功:{}", response.getId());
        } catch (Exception e) {
            log.error("更新数据至elasticSearch失败: {}", e);
        }
    }
    /**
     * 根据id删除数据
     * @param index 索引
     * @param id _id
     * @throws Exception
     */
    public  void DeleteEs(String index, String id) {
        DeleteRequest deleteRequest = new DeleteRequest(index, id);
        DeleteResponse response = null;
        try {
            response = restHighLevelClient.delete(deleteRequest, RequestOptions.DEFAULT);
            log.info("从elasticSearch删除数据成功:{}", response.getId());
        } catch (Exception e) {
            log.error("从elasticSearch删除数据失败: {}", e);
        }
    }
}

BinLogElasticSearch.java

package com.example.canal.study.action;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.example.canal.study.common.CanalDataParser;
import com.example.canal.study.common.ElasticUtils;
import com.example.canal.study.pojo.Student;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.List;
import java.util.Map;
/**
 * 获取binlog数据并发送到es中
 *
 * @author haha
 */
@Slf4j
@Component
public class BinLogElasticSearch {
    @Autowired
    private CanalConnector canalSimpleConnector;
    @Autowired
    private ElasticUtils elasticUtils;
    //@Qualifier("canalHaConnector")使用名为canalHaConnector的bean
    @Autowired
    @Qualifier("canalHaConnector")
    private CanalConnector canalHaConnector;
    public void binLogToElasticSearch() throws IOException {
        openCanalConnector(canalSimpleConnector);
        // 轮询拉取数据
        Integer batchSize = 5 * 1024;
        while (true) {
//            Message message = canalHaConnector.getWithoutAck(batchSize);
            Message message = canalSimpleConnector.getWithoutAck(batchSize);
            long id = message.getId();
            int size = message.getEntries().size();
            log.info("当前监控到binLog消息数量{}", size);
            if (id == -1 || size == 0) {
                try {
                    // 等待4秒
                    Thread.sleep(4000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } else {
                //1. 解析message对象
                List<CanalEntry.Entry> entries = message.getEntries();
                List<CanalDataParser.TwoTuple<CanalEntry.EventType, Map>> rows = CanalDataParser.printEntry(entries);
                for (CanalDataParser.TwoTuple<CanalEntry.EventType, Map> tuple : rows) {
                    if (tuple.eventType == CanalEntry.EventType.INSERT) {
                        Student student = createStudent(tuple);
                        // 2。将解析出的对象同步到elasticSearch中
                        elasticUtils.saveEs(student, "student_index");
                        // 3.消息确认已处理
                        canalSimpleConnector.ack(id);
//                        canalHaConnector.ack(id);
                    }
                    if (tuple.eventType == CanalEntry.EventType.UPDATE) {
                        Student student = createStudent(tuple);
                        elasticUtils.updateEs(student, "student_index");
                        // 3.消息确认已处理
                        canalSimpleConnector.ack(id);
//                        canalHaConnector.ack(id);
                    }
                    if (tuple.eventType == CanalEntry.EventType.DELETE) {
                        elasticUtils.DeleteEs("student_index", tuple.columnMap.get("id").toString());
                        canalSimpleConnector.ack(id);
//                        canalHaConnector.ack(id);
                    }
                }
            }
        }
    }
    /**
     * 封装数据至Student对象中
     *
     * @param tuple
     * @return
     */
    private Student createStudent(CanalDataParser.TwoTuple<CanalEntry.EventType, Map> tuple) {
        Student student = new Student();
        student.setId(tuple.columnMap.get("id").toString());
        student.setAge(Integer.parseInt(tuple.columnMap.get("age").toString()));
        student.setName(tuple.columnMap.get("name").toString());
        student.setSex(tuple.columnMap.get("sex").toString());
        student.setCity(tuple.columnMap.get("city").toString());
        return student;
    }
    /**
     * 打开canal连接
     *
     * @param canalConnector
     */
    private void openCanalConnector(CanalConnector canalConnector) {
        //连接CanalServer
        canalConnector.connect();
        // 订阅destination
        canalConnector.subscribe();
    }
    /**
     * 关闭canal连接
     *
     * @param canalConnector
     */
    private void closeCanalConnector(CanalConnector canalConnector) {
        //关闭连接CanalServer
        canalConnector.disconnect();
        // 注销订阅destination
        canalConnector.unsubscribe();
    }
}

CanalDemoApplication.java(spring boot 启动类)

package com.example.canal.study;
import com.example.canal.study.action.BinLogElasticSearch;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
 * 应用的启动类
 * @author haha
 */
@SpringBootApplication
public class CanalDemoApplication implements ApplicationRunner {
    @Autowired
    private BinLogElasticSearch binLogElasticSearch;
    public static void main(String[] args) {
        SpringApplication.run(CanalDemoApplication.class, args);
    }
    // 程序启动则执行run方法
    @Override
    public void run(ApplicationArguments args) throws Exception {
        binLogElasticSearch.binLogToElasticSearch();
    }
}

application.properties

server.port=8081
spring.application.name = canal-demo
canal.server.ip = localhost
canal.server.port = 11111
canal.destination = example
zookeeper.server.ip = localhost:2181
zookeeper.sasl.client = false
elasticSearch.server.ip = localhost
elasticSearch.server.port = 9200

canal集群高可用的搭建

通过上面的学习,我们知道了单机直连方式的canala应用。在当今互联网时代,单实例模式逐渐被集群高可用模式取代,那么canal的多实例集群方式如何搭建呢!

基于zookeeper获取canal实例

准备zookeeper的docker镜像与容器

docker pull zookeeper
docker run -d --name zookeeper --net mynetwork --ip 172.18.0.3 -p 2181:2181 zookeeper
docker run -d --name canal-server2 --net mynetwork --ip 172.18.0.8 -p 11113:11113 canal/canal-server

最终效果如图

hahadeMacBook-Pro:~ haha$ docker ps
CONTAINER ID  IMAGE COMMAND CREATED STATUS  PORTS 
NAMES
3f920cfac809  zookeeper zookeeper "/docker-entr rypoint..." 13 hours ago  Up 13 hours 2888/tcp,3888/t ccp,0.0.0.0:2181->2181/tcp,8080/tcp 
75f69fbd93a9  canal-server2:1.0 "/alidata/bin /main.s..." 22 hours ago  Up 12 hours 2222/tcp,8000/t tcp,11111-11112/tcp,  0.0.0.0:11113->11113/tcp  
canal-server2
8254af09f19d  mysql "docker-entry ypoint.s..."  47 hours ago  Up 13 hours 0.0.0.0:3306->33  306/tcp,33060/tcp 
mysql
4d5bf7a14209  mobz/elasticsearch-head:5-alpine  "/bin/sh -c node_mo..." 47 hours ago  Up 13 hours 0.0.0.0:9100->91  100/tcp 
elasticsearch-head
ca198845f55c  canal/canal-server  "/alidata/bin /main.s..." 47 hours ago  Up 2 hours  2222/tcp,8000/t tcp,11112/tcp,  0.0.0.0:11111->11111/tcp  
b5c1e3a581f4  elasticsearch:7.1.1 "/usr/local/b in/dock..." 47 hours ago  Up 13 hours 0.0.0.0:9200->92  200/tcp,0.0.0.0:9300->9300/tcp
  1. 机器准备
  • 运行canal的容器ip: 172.18.0.4 , 172.18.0.8
  • zookeeper容器ip:172.18.0.3:2181
  • mysql容器ip:172.18.0.6:3306
  1. 按照部署和配置,在单台机器上各自完成配置,演示时instance name为example
  2. 修改canal.properties,加上zookeeper配置并修改canal端口
canal.port=11113
canal.zkServers=172.18.0.3:2181
canal.instance.global.spring.xml = classpath:spring/default-instance.xml
  1. 创建example目录,并修改instance.properties
canal.instance.mysql.slaveId = 1235 
#之前的canal slaveId是1234,保证slaveId不重复即可
canal.instance.master.address = 172.18.0.6:3306

注意: 两台机器上的instance目录的名字需要保证完全一致,HA模式是依赖于instance name进行管理,同时必须都选择default-instance.xml配置

启动两个不同容器的canal,启动后,可以通过tail -100f logs/example/example.log查看启动日志,只会看到一台机器上出现了启动成功的日志。

比如我这里启动成功的是 172.18.0.4

2019-07-09 21:26:36.215 [destination = example , address = /172.18.0.6:3306,EventParser WARN c.a.o.c.p.inbound.mvsal.rds.RdsBinloaE entParserProxy - ---> find start position suc cessfully,EntryPosition[included=falsejournalName=binlog.000004,po osition=2098,serverId=1,gtid=,timestamp=1562672817000] cost :1085ms, the next step is binlog dump2019-07-09 22:21:27.458Thread-7INFO c.a.otter.canal.instance.co ore.AbstractCanalInstance - stop CannalInstance for null-example2019-07-09 22:21:27.586Thread-7] INFO c.a.otter.canal.instance.co are.AbstractCanalInstance - stop successful....
2019-07-09 22:57:58.581 [main] INFO c.a.o.c.i.spring.support.Proper tyPlaceholderConfigurer-Loading properties file from class path re  esource [canal.properties]  
2019-07-09 22:57:58.602 [main] INFO c.a.o.c.i.spring.support.Proper tvplaceholderConfigurer -Loading properties file from class path re source [example/instance.properties]  
2019-07-09 22:57:59.278 [main] WARN o.s.beans.GenericTypeAwarePrope ertyDescriptor -Invalid JavaBean property 'connectionCharset' being accessed! Ambiguous write methods found next to actually used [public void com.alibaba.otter.canal.parse.inbound. mysq1.AbstractMysqlEventParser.setConnectionCharset(java.lang.String)]: [public void com.alibaba.otter.canal.pars e.inbound.mvsa1.AbstractMvsaEventParser.setConnectionCharsetiava.n io.charset.Charset)]
2019-07-09 22:57:59.366 [main] INFO c.a.o.c.i.spring.support.Proper rtyPlaceholderConfigurer - Loading properties file from class path re source [canal.properties] 
2019-07-09 22:57:59.367 [main] INFO c.a.o.c.i.spring.support.proper tyPlaceholderConfigurer -Loading properties file from class path re source [example/instance.properties]  
2019-07-09 22:57:59.836[main] ERROR com.alibaba.druid.pool.DruidDat aSource - testWhileIdle is true, validationQuerv not set
2019-07-09 22:58:00.497 [main] INFO c.a.otter.canal.instance.spring Cana InstanceWithSpring - start Canna instance for 1-example  
2019-07-09 22:58:00.527 [main] WARN c.a.o.canal.parse.inbound.mysq] dbsvnc.ogEventConvert ---> init table filter:^,*\.,*$ 
2019-07-09 22:58:00.527 [main] WARN c.a.o.canal.parse.inbound.mysq] adbsync.LogEventConvert - --> init table black filter : 
2019-07-09 22:58:00.569 [main] INFO c.a.otter.canal.instance.core.A ostractCanalInstance -start successful....  
2019-07-09 22:58:00.894 [destination = example , address = /172.18.0.6:3306EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEv entParserProxy - ---> begin to find start pos ition, it will be long time for reset or first position
2019-07-09 22:58:00.900 [destination = example address=/172.18.0.6:3306EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEv entParserProxy - prepare to find start positi
on just show master status

查看一下zookeeper中的节点信息,也可以知道当前工作的节点为172.18.0.4:11111

[zk: localhost:2181(CONNECTED) 15] get /otter/canal/destinations/example/running  
{"active":true,"address":"172.18.0.4:11111","cid":1}

客户端链接, 消费数据

可以通过指定zookeeper地址和canal的instance name,canal client会自动从zookeeper中的running节点,获取当前服务的工作节点,然后与其建立链接:

[zk: localhost:2181(CONNECTED) 0] get /otter/canal/destinations/example/running
{"active":true,"address":"172.18.0.4:11111","cid":1}

对应的客户端编码可以使用如下形式,上文中的CanalConfig.java中的canalHaConnector就是一个HA连接

CanalConnector connector = CanalConnectors.newClusterConnector("172.18.0.3:2181", "example", "", "");

链接成功后,canal server会记录当前正在工作的canal client信息,比如客户端ip,链接的端口信息等 (聪明的你,应该也可以发现,canal client也可以支持HA功能)

[zk: localhost:2181(CONNECTED) 4] get /otter/canal/destinations/example/1001/running
{"active":true,"address":"192.168.124.5:59887","clientId":1001}

数据消费成功后,canal server会在zookeeper中记录下当前最后一次消费成功的binlog位点.  (下次你重启client时,会从这最后一个位点继续进行消费)

[zk: localhost:2181(CONNECTED) 5] get /otter/canal/destinations/example/1001/cursor
{"@type":"com.alibaba.otter.canal.protocol.position.LogPosition","identity":{"slaveId":-1,"sourceAddress":{"address":"mysql.mynetwork","port":3306}},"postion":{"included":false,"journalName":"binlog.000004","position":2169,"timestamp":1562672817000}}

停止正在工作的172.18.0.4的canal server

docker exec -it canal-server bash
cd canal-server/bin
sh stop.sh

这时172.18.0.8会立马启动example instance,提供新的数据服务

[zk: localhost:2181(CONNECTED) 19] get /otter/canal/destinations/example/running
{"active":true,"address":"172.18.0.8:11111","cid":1}

与此同时,客户端也会随着canal server的切换,通过获取zookeeper中的最新地址,与新的canal server建立链接,继续消费数据,整个过程自动完成

异常与总结

elasticsearch-head无法访问elasticsearch

es与es-head是两个独立的进程,当es-head访问es服务时,会存在一个跨域问题。所以我们需要修改es的配置文件,增加一些配置项来解决这个问题,如下

[root@localhost /usr/local/elasticsearch-head-master]# cd ../elasticsearch-5.5.2/config/
[root@localhost /usr/local/elasticsearch-5.5.2/config]# vim elasticsearch.yml  
# 文件末尾加上如下配置
http.cors.enabled: true
http.cors.allow-origin: "*"

修改完配置文件后需重启es服务

elasticsearch-head查询报406 Not Acceptable

▼ General
RequestURL: http://127.0.0.1:9200/student index/ search Request Method: POST
Status Code:  406 Not Acceptable  
Remote Address: 127.0.0.1:9200
Referrer Policy: no-referrer-when-downgrade
▼Response Headers
access-control-allow-origin: *
content-encoding:qzip
解决方法:
1、进入head安装目录;
2、cd _site/
3、编辑vendor.js  共有两处
     #6886行   contentType: "application/x-www-form-urlencoded
    改成 contentType: "application/json;charset=UTF-8"
     #7574行 var inspectData = s.contentType === "application/x-www-form-urlencoded" &&
    改成 var inspectData = s.contentType === "application/json;charset=UTF-8" &&

使用elasticsearch-rest-high-level-client报org.elasticsearch.action.index.IndexRequest.ifSeqNo

#pom中除了加入依赖
<dependency>
   <groupId>org.elasticsearch.client</groupId>
   <artifactId>elasticsearch-rest-high-level-client</artifactId>
   <version>7.1.1</version>
</dependency>
#还需加入
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>7.1.1</version>
</dependency>

相关参考 git hub issues

为什么ElasticSearch要在7.X版本不能使用type?

参考:为什么ElasticSearch要在7.X版本去掉type?

使用spring-data-elasticsearch.jar报org.elasticsearch.client.transport.NoNodeAvailableException

由于本文使用的是elasticsearch7.x以上的版本,目前spring-data-elasticsearch底层采用es官方TransportClient,而es官方计划放弃TransportClient,工具以es官方推荐的RestHighLevelClient进行调用请求。 可参考RestHighLevelClient API

设置docker容器开启启动

如果创建时未指定 --restart=always ,可通过update 命令
docker update --restart=always [containerID]

docker for Mac network host 模式不生效

host 模式是为了性能,但是这却对 docker 的隔离性造成了破坏,导致安全性降低。 在性能场景下,可以用 --netwokr host 开启 Host 模式,但需要注意的是,如果你用 Windows 或 Mac 本地启动容器的话,会遇到 host 模式失效的问题。原因是 host 模式只支持 Linux 宿主机。

参见官方文档:docs.docker.com/network/hos…

客户端连接zookeeper报authenticate using SASL(unknow error)

Opening socket connection to server 192.168.124.5/192.168.124.5:2181. Will not attempt to authenticate using SASL (unknown error):Session 0x0 for server null, unexpected error, closing soc ket connection and attempting reconnect
  • zookeeper.jar与dokcer中的zookeeper版本不一致
  • zookeeper.jar 使用了3.4.6之前的版本

出现这个错的意思是zookeeper作为外部应用需要向系统申请资源,申请资源的时候需要通过认证,而sasl是一种认证方式,我们想办法来绕过sasl认证。避免等待,来提高效率。

在项目代码中加入

System.setProperty("zookeeper.sasl.client", "false");

,如果是spring boot 项目可以在application.properties中加入

zookeeper.sasl.client=false

参考:Increased CPU usage by unnecessary SASL checks

如果更换canal.client.jar中依赖的zookeeper.jar的版本

把canal的官方源码下载到本机git clone https://github.com/alibaba/canal.git,然后修改client模块下pom.xml文件中关于zookeeper的内容,然后重新mvn install

<artifactId>netty-all</artifactId></dependency><!-- zk --><dependency>
<groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.5.5</version></dependency><dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId></dependency>
<!-- external --><dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId></dependency><dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId></dependency><dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId></dependency><dependency>
<groupId>com.alibaba</groupId>  

把自己项目依赖的包替换为刚刚mvn install生产的包

<version>7.1.1</version></dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency>
<dependency>
<groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId> k!_-<version>1.1.3</version>-
<version>1.1.4-SNAPSH0T</version></dependency>
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId><version>2.6</version></dependency>
project >dependencies >dependency >artifactld

zookeeper返回的是docker容器中的ip,而宿主机ip与容器ip不是同一个网段,无法ping通

修改hosts文件只可以实现域名到ip的映射(域名重定向),iptables可以实现端口的重定向,但是这个问题是要通过ip到ip的重定向可以解决,但是研究了一下没找到怎么设置(windows、mac),所以我们修改canal的官方源码来达到我们想要的目的。修改ClusterCanalConnector.java中的connect()方法。

以下是修改后内容对比图

es = 0; Erue){
currentConnector=new SimpleCanalConnector( address: null, username,password,(
@Override
public SocketAddress getNextAddress() {
return accessStrategy.nextNode();
};
currentConnector.setSoTimeout(soTimeout);
currentConnector.setIdleTimeout(idleTimeout); if (filter != null){
currentConnector.setFilter(filter);
@Override
public SocketAddress getNextAddress() {
SocketAddress address = accessStrategy.nextNode(); if (address instanceof InetSocketAddress){
String hostName =((InetSocketAddress)address).getHostName(); int port =((InetSocketAddress) address).getPort();
// 截取zookeeper上canalServer的ip前3位与宿主机ip前3位做比较
if(hostName !=null&&!hostName.substring(0,3).equals(AddressUti
// 如果不是同一网段则使用宿主机IP
address=newInetSocketAddress(AddressUtils.getHostIp(),port}
return address;
return address

关于选型的取舍

a1c771adc8ab9f0983989f68aa4529e.png

本文示例项目源代码==>canal-elasticsearch-sync



相关实践学习
通过Ingress进行灰度发布
本场景您将运行一个简单的应用,部署一个新的应用用于新的发布,并通过Ingress能力实现灰度发布。
容器应用与集群管理
欢迎来到《容器应用与集群管理》课程,本课程是“云原生容器Clouder认证“系列中的第二阶段。课程将向您介绍与容器集群相关的概念和技术,这些概念和技术可以帮助您了解阿里云容器服务ACK/ACK Serverless的使用。同时,本课程也会向您介绍可以采取的工具、方法和可操作步骤,以帮助您了解如何基于容器服务ACK Serverless构建和管理企业级应用。 学习完本课程后,您将能够: 掌握容器集群、容器编排的基本概念 掌握Kubernetes的基础概念及核心思想 掌握阿里云容器服务ACK/ACK Serverless概念及使用方法 基于容器服务ACK Serverless搭建和管理企业级网站应用
相关文章
|
1月前
|
关系型数据库 MySQL Linux
Docker安装Mysql5.7,解决无法访问DockerHub问题
当 Docker Hub 无法访问时,可以通过配置国内镜像加速来解决应用安装失败和镜像拉取超时的问题。本文介绍了如何在 CentOS 上一键配置国内镜像加速,并成功拉取 MySQL 5.7 镜像。
281 2
Docker安装Mysql5.7,解决无法访问DockerHub问题
|
15天前
|
关系型数据库 MySQL Docker
docker环境下mysql镜像启动后权限更改问题的解决
在Docker环境下运行MySQL容器时,权限问题是一个常见的困扰。通过正确设置目录和文件的权限,可以确保MySQL容器顺利启动并正常运行。本文提供了多种解决方案,包括在主机上设置正确的权限、使用Dockerfile和Docker Compose进行配置、在容器启动后手动更改权限以及使用 `init`脚本自动更改权限。根据实际情况选择合适的方法,可以有效解决MySQL容器启动后的权限问题。希望本文对您在Docker环境下运行MySQL容器有所帮助。
29 1
|
1月前
|
关系型数据库 MySQL 数据库
使用Docker部署的MySQL数据库,数据表里的中文读取之后变成问号,如何处理?
【10月更文挑战第1天】使用Docker部署的MySQL数据库,数据表里的中文读取之后变成问号,如何处理?
59 3
|
1月前
|
关系型数据库 MySQL 数据库
使用Docker部署的MySQL数据库如何设置忽略表名大小写?
【10月更文挑战第1天】使用Docker部署的MySQL数据库如何设置忽略表名大小写?
137 1
|
1月前
|
弹性计算 关系型数据库 MySQL
Docker安装MySQL
这篇文章详细介绍了如何使用Docker安装MySQL数据库服务,包括拉取镜像、配置数据卷以及启动容器的步骤。
286 0
Docker安装MySQL
|
1月前
|
关系型数据库 MySQL 数据库
如何使用Docker部署MySQL数据库?
【10月更文挑战第1天】如何使用Docker部署MySQL数据库?
164 0
|
1月前
|
关系型数据库 MySQL 数据库
docker mysql表名和数据库名不区分大小写
docker mysql表名和数据库名不区分大小写
18 0
|
6月前
|
SQL 分布式计算 监控
在数据传输服务(DTS)中,要查看每个小时源端产生了多少条数据
【2月更文挑战第32天】在数据传输服务(DTS)中,要查看每个小时源端产生了多少条数据
66 6
|
6月前
|
存储 SQL NoSQL
数据传输DTS同步问题之同步失败如何解决
数据传输服务(DTS)是一项专注于数据迁移和同步的云服务,在使用过程中可能遇到多种问题,本合集精选常见的DTS数据传输问题及其答疑解惑,以助用户顺利实现数据流转。
|
6月前
|
Cloud Native NoSQL 关系型数据库
数据传输DTS校验问题之校验报错如何解决
数据传输服务(DTS)是一项专注于数据迁移和同步的云服务,在使用过程中可能遇到多种问题,本合集精选常见的DTS数据传输问题及其答疑解惑,以助用户顺利实现数据流转。