基于 Docker 结合 Canal 实现 MySQL 实时增量数据传输

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
数据传输服务 DTS,数据迁移 small 3个月
推荐场景:
MySQL数据库上云
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介: 基于 Docker 结合 Canal 实现 MySQL 实时增量数据传输

canal的介绍

canal的历史由来

在早期的时候,阿里巴巴公司因为杭州和美国两个地方的机房都部署了数据库实例,但因为跨机房同步数据的业务需求 ,便孕育而生出了canal,主要是基于trigger(触发器)的方式获取增量变更。从 2010 年开始,阿里巴巴公司开始逐步尝试数据库日志解析,获取增量变更的数据进行同步,由此衍生出了增量订阅和消费业务。

当前的 canal 支持的数据源端Mysql版本包括( 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x)

canal的应用场景

目前普遍基于日志增量订阅和消费的业务,主要包括

  • 基于数据库增量日志解析,提供增量数据订阅和消费
  • 数据库镜像
  • 数据库实时备份
  • 索引构建和实时维护(拆分异构索引、倒排索引等)
  • 业务 cache 刷新
  • 带业务逻辑的增量数据处理

canal的工作原理

在介绍canal的原理之前,我们先来了解下MySQL主从复制的原理

MySQL主从复制原理

084cc60905975a6c5f6d1d6ce8ec4ae.png

  • MySQL master 将数据变更的操作写入二进制日志binary log中, 其中记录的内容叫做二进制日志事件binary log events,可以通过show binlog events命令进行查看
  • MySQL slave 会将 master 的binary log中的binary log events 拷贝到它的中继日志relay log
  • MySQL slave 重读并执行relay log 中的事件,将数据变更映射到它自己的数据库表中

了解了MySQL的工作原理,我们可以大致猜想到Canal应该也是采用类似的逻辑去实现增量数据订阅的功能,那么接下来我们看看实际上Canal的工作原理是怎样的?

canal工作原理

4ccf0d7f6d93ab639121986742ca187.png

canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向MySQL master 发送dump 协议

MySQL master 收到 dump 请求,开始推送 binary log 给 slave (也就是 canal )

canal 解析 binary log 对象(数据为byte流)

基于这样的原理与方式,便可以完成数据库增量日志的获取解析,提供增量数据订阅和消费,实现mysql实时增量数据传输的功能。

既然canal是这样的一个框架,又是纯Java语言编写而成,那么我们接下来就开始学习怎么使用它并把它用到我们的实际工作中。

canal的Docker环境准备

因为目前容器化技术的火热,本文通过使用Docker来快速搭建开发环境,而传统方式的环境搭建,在我们学会了Docker容器环境搭建后,也能自行依葫芦画瓢搭建成功。由于本篇主要讲解canal,所以关于Docker的内容不会涉及太多,主要会介绍Docker的基本概念和命令使用。

什么是Docker

相信绝大多数人都使用过虚拟机Vmware,在使用Vmware进行环境搭建的时候,只需提供了一个普通的系统镜像并成功安装,剩下的软件环境与应用配置还是如我们在本机操作一样在虚拟机里也操作一遍,而且Vmware占用宿主机的资源较多,容易造成宿主机卡顿,而且系统镜像本身也占用过多空间。

为了便于大家快速理解Docker,便与Vmware做对比来做介绍,docker 提供了一个开始,打包,运行app的平台,把app(应用)和底层infrastructure(基础设施)隔离开来。Docker中最主要的两个概念就是镜像(类似Vmware的系统镜像)与容器(类似Vmware里安装的系统)

什么是Image(镜像)

  • 文件和meta data的集合(root filesystem)
  • 分层的,并且每一层都可以添加改变删除文件,成为一个新的image
  • 不同的image可以共享相同的layer
  • Image本身是read-only的

fbbf09c8760dc1354e119dfba06c618.png

什么是Container(容器)

  • 通过Image创建(copy)
  • 在Image layer 之上建立一个container layer(可读写)
  • 类比面向对象:类和实例
  • Image负责app的存储和分发,Container负责运行app
  • 096a980a5fb626a66c43e93335950c8.png

Docker的网络介绍

Docker的网络类型有三种:

bridge:桥接网络

默认情况下启动的Docker容器,都是使用 bridge,Docker安装时创建的桥接网络,
每次Docker容器重启时,会按照顺序获取对应的IP地址,
这个就导致重启下,Docker的IP地址就变了

none:无指定网络

使用 --network=none ,docker 容器就不会分配局域网的IP

host:主机网络

使用 --network=host,此时,Docker 容器的网络会附属在主机上,两者是互通的。
例如,在容器中运行一个Web服务,监听8080端口,则主机的8080端口就会自动映射到容器中。

创建自定义网络:(设置固定IP)

docker network create --subnet=172.18.0.0/16 mynetwork

查看存在的网络类型docker network ls

ThahadeMacBook-Pro:~ hana$ docker network ls
NETWORK ID  NAME  DRIVER  SCOPE 
547d75277780  bridge  bridge  local 
cd57bd432a9b  host  host  local 
82354ec0c7d2  mvnetwork bridge  local 
270f35f46b8f  none  null  local 

搭建canal环境

附上Docker的下载安装地址==> Docker Download

下载canal镜像docker pull canal/canal-server

rootedefault:"# docker pull canal/canal-server Using default taq: latest
latest: Pulling from canal/canal-server
1c8f9aa56c90:Downloading 2.161 MB/69.8 MB c5e21c824d1c: Dowmloading 2.047 MB/50.09 MB4ba7edb60123:Waiting5160a1c76c08:Waiting adbb838a0064:Waiting
ab50c0db3511:Waiting  

下载mysql镜像docker pull mysql,下载过的则如下图

hahadeMacBook-Pro:~ haha$ docker pull mysql Using default tag: latest
latest: Pulling from library/mysgl
Digest:sha256:415ac63da0ae6725d5aefc9669a1c02f39a00c574fdbc478dfd08 db1e97c8f1b
Status: Image is up to date for mysgl:latest  

查看已经下载好的镜像docker images

hahadeMacBook-Pro:~ haha$ docker images
REPOSITORY  TAG IMAGE ID  CR  REATED  SIZE  
3
rabbitmq  3.7.15-management 1482b87815ec  weeks ago 192MB 
mysql latest  c7109f74d339  weeks ago 443MB 
6
kibana  7.1.1 67f17df6ca3e  weeks ago 746MB 
elasticsearch 7.1.1 b0e9f9f047e6  6 weeks ago 894MB 
3
canal/canal-server  latest  b9c3d95520a5  months ago  831MB 
zookeeper 3.4.13  4ebfb9474e72  months ago  150MB 
mobz/elasticsearch-head 5-alpine  e2a76963bc18  2 years ago 78.9MB  

接下来通过镜像生成mysql容器与canal-server容器

##生成mysql容器
docker run -d --name mysql --net mynetwork --ip 172.18.0.6 -p 3306:3306 -e MYSQL_ROOT_PASSWORD=root mysql
##生成canal-server容器
docker run -d --name canal-server --net mynetwork --ip 172.18.0.4 -p 11111:11111 canal/canal-server
## 命令介绍
--net mynetwork #使用自定义网络
--ip   #指定分配ip

查看docker中运行的容器docker ps

hahadeMacBook-Pro:~ haha$ docker ps
CONTAINER ID  IMAGE COMMAND CREATED STATUS  PORTS NAM 
ES
8254af09f19d  mysql "docker-entry point.s." 21 hours ago  Up 11 hours 0.0.0.0:3306->33  06/tcp,33060/tcp  mys 
91
4d5bf7a14209  mobz/elasticsearch-head:5-alpine  "/bin/sh -c node_mo..." 21 hours ago  Up 11 hours 0.0.0.0:9100->91  00/tcp  ela 
sticsearch-head
ca198845f55c  canal/canal-server  "/alidata/bin /main.s..." 21 hours ago  Up 32 minutes 2222/tcp,8000/t cp,11112/tcp,0.0.0.0:11111->11111/tcp can 
al-server
sticsesren  b5c1e3a581f4  elasticsearch:7.1.1 "/usr/local/b in/dock.."  21 hours ago  Up 11 hours 0.0.0.0:9200->92  00/tcp,0.0.0.0:9300->9300/tcp 

MySQL的配置修改

以上只是初步准备好了基础的环境,但是怎么让canal伪装成salve并正确获取mysql中的binary log呢?

对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,通过修改mysql配置文件来开启bin_log,使用 find / -name my.cnf 查找my.cnf, 修改文件内容如下

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

进入mysql容器docker exec -it mysql bash

创建链接MySQL的账号canal并授予作为 MySQL slave 的权限, 如果已有账户可直接 GRANT

mysql -uroot -proot
# 创建账号
CREATE USER canal IDENTIFIED BY 'canal'; 
# 授予权限
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
# 刷新并应用
FLUSH PRIVILEGES;

数据库重启后, 简单测试 my.cnf 配置是否生效

mysgl> show variables like 'log bin';
----------
Variable name |Value |
----  .+--- --- 
log_bin ION 
--  ----- ----- 
1 row in set (0.0l sec)
mysgl> show variables like 'binlog format';
------  --- 
Variable name /Value 
---
binlog format |ROW
- ------- --- 
1 row in set (0.00 sec)
mysgl> show master status;
---
| File  Position|Binlog Do DB | Binlog Ignore DB | Executed Gtid Set l  
--  --- 
---
mysql-bin.0000046160/
1 row in set (0.00 sec)
mysgl> show master status\G;
*************************** 1. roW *********************
File:mysql-bin.000004 Position:6160 Binlog_Do DB: Binlog Ignore DB: Executed Gtid Set:
I row in set (0.00 sec) 
show variables like 'log_bin';
show variables like 'log_bin';
show master status;

canal-server的配置修改

进入canal-server容器

docker exec -it canal-server bash

编辑canal-server的配置

vi canal-server/conf/example/instance.properties
#################################################
mvsal servertd_ v1 a 26+ will autoGen canal.instance.mvsgl.slaveId=1234
# enable gtid use true/false canal.instance.gtidon=false
改成自己的数据库信息
# nosition info
canal.instance.master.address=172.18.0.6:3306 canal.instance.master.iournal.name= canal.instance.master.position= canal.instance.master.timestamp= canal.instance.master.gtid=
# rds oss binlog
canal.instance.rds.accesskey= canal.instance.rds.secretkey= canal.instance.rds.instanceId=
# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysq1://127.0.0.13306/canal_tsdb#canal.instance.tsdb.dbUsername=canal#canal.instance.tsdb.dbPassword=canal
#canal.instance.standby.address =
#canal.instance.standbv.iournal.name =#canal.instance.standby.position =#canal.instance.standby.timestamp =#canal.instance.standby.gtid=
# username/password 改成自己的数据库信息  
canal.instance.dbUsername=canal canal.instance.dbPassword=canal
#canal.instance.defaultDatabaseName=studentdb canal.instance.connectionCharset =UTF-8# enable druid Decrypt database password
canal.instance.enableDruid=false  @程十掘金技术社区 
#canal.instance.pwdPublicKey=MFWWDQYJKoZIhVCNAQEBBQADSWAWSAJBALK4BUxdD1tRRE5/zXpVEVPUgunvscYFtEip3pmL1hrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfi

更多配置请参考==>canal配置说明

重启canal-server容器docker restart canal-server进入容器查看启动日志

docker exec -it canal-server bash
tail -100f canal-server/logs/example/example.log
019-07-09 17:33:09.620 [destination = example address = /172.18.0.6:3306 EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEve ntParserProxy----> find start position su essfully,EntryPosition[included=falseiournalName=binlog.000004,pos ition=1899,serverId=1,gtid=,timestamp=1562655804000] cost :120ms the next step is binlog dump019-07-09 21:26:32.816 [main] INFO c.a.o.c.i.spring.support.Propert PlaceholderConfigurer -Loading properties file from class path res ource [canal.properties]
019-07-09 21:26:32.822 [main]INFO c.a.o.c.i.spring.support.Propert PlaceholderConfigurer -Loading properties file from class path res ource [example/instance.properties]
019-07-09 21:26:33.304 [main] WARN o.s.beans.GenericTypeAwareProper tyDescriptor-Invalid JavaBean property 'connectionCharset' being a ccessed! Ambiguous write methods found next o actually used [public void com.alibaba.otter.canal.parse.inbound.m sql.AbstractMysqlEventParser.setConnectionCharset(java.lang.String)]: [public void com.alibaba.otter.canal.pan inbound.msa1.AbstractMvsqEventParser.setConnectionCharsetiava.ni o.charset.charset)]
019-07-09 21:26:33.429 [main] INFO c.a.o.c.i.spring.support.Property yPlaceholderConfigurer - Loading properties file from class path reso ource [canal.properties]
919-07-09 21:26:33.430 [main] INFO c.a.o.c.i.spring.support.property paceholderConfiaurer -Loading properties file from class path reso ource [example/instance.properties919-07-09 21:26:33.986main ERROR com.alibaba.druid.pool.DruidData Source-testWhileIdle is true, validationQuery not set
019-07-09 21:26:34.729  [main] INFO c.a.otter.canal.instance.spring.  CanalInstanceWithSpring - start CannalInstance for 1-example  
019-07-09 21:26:34.752 [main] WARN  c.a.o.canal.parse.inbound.mysgl.l dbsync.LogEventConvert ---> init table filter :^.*\..*$ 
019-07-09 21:26:34.753  [main] WARN c.a.o.canal.parse.inbound.mysgl.  dbsync.LogEventConvert ---> init table black filter : 
019-07-09 21:26:34.794 [main] INFO  c.a.otter.canal.instance.core.Ab: stractCanalInstance - start successful. 
019-07-09 21:26:35.107 [destination = example   address = /172.18.0.  6:3306  EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEver ntParserProxy - ---> begin to find start po 
tion, it will be long time for reset or first position  

至此,我们的环境工作准备完成!!!

拉取数据并同步保存到ElasticSearch

本文的ElasticSearch也是基于Docker环境搭建,所以读者可执行如下命令

# 下载对镜像
docker pull elasticsearch:7.1.1
docker pull mobz/elasticsearch-head:5-alpine
# 创建容器并运行
docker run -d --name elasticsearch --net mynetwork --ip 172.18.0.2 -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" elasticsearch:7.1.1
docker run -d --name elasticsearch-head --net mynetwork --ip 172.18.0.5 -p 9100:9100 mobz/elasticsearch-head:5-alpine

环境已经准备好了,现在就要开始我们的编码实战部分了,怎么通过应用程序去获取canal解析后的binlog数据。首先我们基于spring boot搭建一个canal demo应用。结构如下图所示

cb3da1287ac93e1770e6e229ddf9156.png

Student.java

package com.example.canal.study.pojo;
import lombok.Data;
import java.io.Serializable;
/**
 * 普通的实体domain对象
 * @Data 用户生产getter、setter方法
  */
@Data
public class Student implements Serializable {
    private String id;
    private String name;
    private int age;
    private String sex;
    private String city;
}

CanalConfig.java

package com.example.canal.study.common;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.net.InetSocketAddress;
/**
 * 配置一些跟canal相关到配置与公共bean
 * @author haha
 */
@Configuration
public class CanalConfig {
    // @Value 获取 application.properties配置中端内容
    @Value("${canal.server.ip}")
    private String canalIp;
    @Value("${canal.server.port}")
    private Integer canalPort;
    @Value("${canal.destination}")
    private String destination;
    @Value("${elasticSearch.server.ip}")
    private String elasticSearchIp;
    @Value("${elasticSearch.server.port}")
    private Integer elasticSearchPort;
    @Value("${zookeeper.server.ip}")
    private String zkServerIp;
    /**
     * 获取简单canal-server连接
      */
    @Bean
    public CanalConnector canalSimpleConnector() {
        CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalIp, canalPort), destination, "", "");
        return canalConnector;
    }
    /**
     * 通过连接zookeeper获取canal-server连接
      */
    @Bean
    public CanalConnector canalHaConnector() {
        CanalConnector canalConnector = CanalConnectors.newClusterConnector(zkServerIp, destination, "", "");
        return canalConnector;
    }
    /**
     * elasticsearch 7.x客户端
      */
    @Bean
    public RestHighLevelClient restHighLevelClient() {
        RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(new HttpHost(elasticSearchIp, elasticSearchPort))
        );
        return client;
    }
}

CanalDataParser.java

由于这个类的代码较多,文中则摘出其中比较重要的部分,其它部分代码可从github上获取

/**
     * 元祖类型的对象定义
     * @param <A>
     * @param <B>
     */
    public static class TwoTuple<A, B> {
        public final A eventType;
        public final B columnMap;
        public TwoTuple(A a, B b) {
            eventType = a;
            columnMap = b;
        }
    }
    /**
     * 解析canal中的message对象内容
     * @param entrys
     * @return
     */
    public static List<TwoTuple<EventType, Map>> printEntry(List<Entry> entrys) {
        List<TwoTuple<EventType, Map>> rows = new ArrayList<>();
        for (Entry entry : entrys) {
            // binlog event的事件事件
            long executeTime = entry.getHeader().getExecuteTime();
            // 当前应用获取到该binlog锁延迟的时间
            long delayTime = System.currentTimeMillis() - executeTime;
            Date date = new Date(entry.getHeader().getExecuteTime());
            SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            // 当前的entry(binary log event)的条目类型属于事务
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN) {
                    TransactionBegin begin = null;
                    try {
                        begin = TransactionBegin.parseFrom(entry.getStoreValue());
                    } catch (InvalidProtocolBufferException e) {
                        throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
                    }
                    // 打印事务头信息,执行的线程id,事务耗时
                    logger.info(transaction_format,
                            new Object[]{entry.getHeader().getLogfileName(),
                                    String.valueOf(entry.getHeader().getLogfileOffset()),
                                    String.valueOf(entry.getHeader().getExecuteTime()),
                                    simpleDateFormat.format(date),
                                    entry.getHeader().getGtid(),
                                    String.valueOf(delayTime)});
                    logger.info(" BEGIN ----> Thread id: {}", begin.getThreadId());
                    printXAInfo(begin.getPropsList());
                } else if (entry.getEntryType() == EntryType.TRANSACTIONEND) {
                    TransactionEnd end = null;
                    try {
                        end = TransactionEnd.parseFrom(entry.getStoreValue());
                    } catch (InvalidProtocolBufferException e) {
                        throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
                    }
                    // 打印事务提交信息,事务id
                    logger.info("----------------\n");
                    logger.info(" END ----> transaction id: {}", end.getTransactionId());
                    printXAInfo(end.getPropsList());
                    logger.info(transaction_format,
                            new Object[]{entry.getHeader().getLogfileName(),
                                    String.valueOf(entry.getHeader().getLogfileOffset()),
                                    String.valueOf(entry.getHeader().getExecuteTime()), simpleDateFormat.format(date),
                                    entry.getHeader().getGtid(), String.valueOf(delayTime)});
                }
                continue;
            }
            // 当前entry(binary log event)的条目类型属于原始数据
            if (entry.getEntryType() == EntryType.ROWDATA) {
                RowChange rowChage = null;
                try {
                    // 获取储存的内容
                    rowChage = RowChange.parseFrom(entry.getStoreValue());
                } catch (Exception e) {
                    throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
                }
                // 获取当前内容的事件类型
                EventType eventType = rowChage.getEventType();
                logger.info(row_format,
                        new Object[]{entry.getHeader().getLogfileName(),
                                String.valueOf(entry.getHeader().getLogfileOffset()), entry.getHeader().getSchemaName(),
                                entry.getHeader().getTableName(), eventType,
                                String.valueOf(entry.getHeader().getExecuteTime()), simpleDateFormat.format(date),
                                entry.getHeader().getGtid(), String.valueOf(delayTime)});
                // 事件类型是query或数据定义语言DDL直接打印sql语句,跳出继续下一次循环
                if (eventType == EventType.QUERY || rowChage.getIsDdl()) {
                    logger.info(" sql ----> " + rowChage.getSql() + SEP);
                    continue;
                }
                printXAInfo(rowChage.getPropsList());
                // 循环当前内容条目的具体数据
                for (RowData rowData : rowChage.getRowDatasList()) {
                    List<CanalEntry.Column> columns;
                    // 事件类型是delete返回删除前的列内容,否则返回改变后列的内容
                    if (eventType == CanalEntry.EventType.DELETE) {
                        columns = rowData.getBeforeColumnsList();
                    } else {
                        columns = rowData.getAfterColumnsList();
                    }
                    HashMap<String, Object> map = new HashMap<>(16);
                    // 循环把列的name与value放入map中
                    for (Column column: columns){
                        map.put(column.getName(), column.getValue());
                    }
                    rows.add(new TwoTuple<>(eventType, map));
                }
            }
        }
        return rows;
    }

ElasticUtils.java

package com.example.canal.study.common;
import com.alibaba.fastjson.JSON;
import com.example.canal.study.pojo.Student;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.common.xcontent.XContentType;
import java.util.Map;
/**
 * es的crud工具类
 * @author haha
 */
@Slf4j
@Component
public class ElasticUtils {
    @Autowired
    private  RestHighLevelClient restHighLevelClient;
    /**
     * 新增
     * @param student
     * @param index 索引
     */
    public  void saveEs(Student student, String index) {
        IndexRequest indexRequest = new IndexRequest(index)
                .id(student.getId())
                .source(JSON.toJSONString(student), XContentType.JSON)
                .opType(DocWriteRequest.OpType.CREATE);
        try {
            IndexResponse response = restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
            log.info("保存数据至ElasticSearch成功:{}", response.getId());
        } catch (Exception e) {
            log.error("保存数据至elasticSearch失败: {}", e);
        }
    }
    /**
     * 查看
     * @param index 索引
     * @param id _id
     * @throws Exception
     */
    public  void getEs(String index, String id) {
        GetRequest getRequest = new GetRequest(index, id);
        GetResponse response = null;
        try {
            response = restHighLevelClient.get(getRequest, RequestOptions.DEFAULT);
            Map<String, Object> fields = response.getSource();
            for (Map.Entry<String, Object> entry : fields.entrySet()) {
                System.out.println(entry.getKey() + ":" + entry.getValue());
            }
        } catch (Exception e) {
            log.error("从elasticSearch获取数据失败: {}", e);
        }
    }
    /**
     * 更新
     * @param student
     * @param index 索引
     * @throws Exception
     */
    public  void updateEs(Student student, String index)  {
        UpdateRequest updateRequest = new UpdateRequest(index, student.getId());
        updateRequest.doc(JSON.toJSONString(student), XContentType.JSON);
        UpdateResponse response = null;
        try {
            response = restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT);
            log.info("更新数据至ElasticSearch成功:{}", response.getId());
        } catch (Exception e) {
            log.error("更新数据至elasticSearch失败: {}", e);
        }
    }
    /**
     * 根据id删除数据
     * @param index 索引
     * @param id _id
     * @throws Exception
     */
    public  void DeleteEs(String index, String id) {
        DeleteRequest deleteRequest = new DeleteRequest(index, id);
        DeleteResponse response = null;
        try {
            response = restHighLevelClient.delete(deleteRequest, RequestOptions.DEFAULT);
            log.info("从elasticSearch删除数据成功:{}", response.getId());
        } catch (Exception e) {
            log.error("从elasticSearch删除数据失败: {}", e);
        }
    }
}

BinLogElasticSearch.java

package com.example.canal.study.action;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.example.canal.study.common.CanalDataParser;
import com.example.canal.study.common.ElasticUtils;
import com.example.canal.study.pojo.Student;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.List;
import java.util.Map;
/**
 * 获取binlog数据并发送到es中
 *
 * @author haha
 */
@Slf4j
@Component
public class BinLogElasticSearch {
    @Autowired
    private CanalConnector canalSimpleConnector;
    @Autowired
    private ElasticUtils elasticUtils;
    //@Qualifier("canalHaConnector")使用名为canalHaConnector的bean
    @Autowired
    @Qualifier("canalHaConnector")
    private CanalConnector canalHaConnector;
    public void binLogToElasticSearch() throws IOException {
        openCanalConnector(canalSimpleConnector);
        // 轮询拉取数据
        Integer batchSize = 5 * 1024;
        while (true) {
//            Message message = canalHaConnector.getWithoutAck(batchSize);
            Message message = canalSimpleConnector.getWithoutAck(batchSize);
            long id = message.getId();
            int size = message.getEntries().size();
            log.info("当前监控到binLog消息数量{}", size);
            if (id == -1 || size == 0) {
                try {
                    // 等待4秒
                    Thread.sleep(4000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } else {
                //1. 解析message对象
                List<CanalEntry.Entry> entries = message.getEntries();
                List<CanalDataParser.TwoTuple<CanalEntry.EventType, Map>> rows = CanalDataParser.printEntry(entries);
                for (CanalDataParser.TwoTuple<CanalEntry.EventType, Map> tuple : rows) {
                    if (tuple.eventType == CanalEntry.EventType.INSERT) {
                        Student student = createStudent(tuple);
                        // 2。将解析出的对象同步到elasticSearch中
                        elasticUtils.saveEs(student, "student_index");
                        // 3.消息确认已处理
                        canalSimpleConnector.ack(id);
//                        canalHaConnector.ack(id);
                    }
                    if (tuple.eventType == CanalEntry.EventType.UPDATE) {
                        Student student = createStudent(tuple);
                        elasticUtils.updateEs(student, "student_index");
                        // 3.消息确认已处理
                        canalSimpleConnector.ack(id);
//                        canalHaConnector.ack(id);
                    }
                    if (tuple.eventType == CanalEntry.EventType.DELETE) {
                        elasticUtils.DeleteEs("student_index", tuple.columnMap.get("id").toString());
                        canalSimpleConnector.ack(id);
//                        canalHaConnector.ack(id);
                    }
                }
            }
        }
    }
    /**
     * 封装数据至Student对象中
     *
     * @param tuple
     * @return
     */
    private Student createStudent(CanalDataParser.TwoTuple<CanalEntry.EventType, Map> tuple) {
        Student student = new Student();
        student.setId(tuple.columnMap.get("id").toString());
        student.setAge(Integer.parseInt(tuple.columnMap.get("age").toString()));
        student.setName(tuple.columnMap.get("name").toString());
        student.setSex(tuple.columnMap.get("sex").toString());
        student.setCity(tuple.columnMap.get("city").toString());
        return student;
    }
    /**
     * 打开canal连接
     *
     * @param canalConnector
     */
    private void openCanalConnector(CanalConnector canalConnector) {
        //连接CanalServer
        canalConnector.connect();
        // 订阅destination
        canalConnector.subscribe();
    }
    /**
     * 关闭canal连接
     *
     * @param canalConnector
     */
    private void closeCanalConnector(CanalConnector canalConnector) {
        //关闭连接CanalServer
        canalConnector.disconnect();
        // 注销订阅destination
        canalConnector.unsubscribe();
    }
}

CanalDemoApplication.java(spring boot 启动类)

package com.example.canal.study;
import com.example.canal.study.action.BinLogElasticSearch;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
 * 应用的启动类
 * @author haha
 */
@SpringBootApplication
public class CanalDemoApplication implements ApplicationRunner {
    @Autowired
    private BinLogElasticSearch binLogElasticSearch;
    public static void main(String[] args) {
        SpringApplication.run(CanalDemoApplication.class, args);
    }
    // 程序启动则执行run方法
    @Override
    public void run(ApplicationArguments args) throws Exception {
        binLogElasticSearch.binLogToElasticSearch();
    }
}

application.properties

server.port=8081
spring.application.name = canal-demo
canal.server.ip = localhost
canal.server.port = 11111
canal.destination = example
zookeeper.server.ip = localhost:2181
zookeeper.sasl.client = false
elasticSearch.server.ip = localhost
elasticSearch.server.port = 9200

canal集群高可用的搭建

通过上面的学习,我们知道了单机直连方式的canala应用。在当今互联网时代,单实例模式逐渐被集群高可用模式取代,那么canal的多实例集群方式如何搭建呢!

基于zookeeper获取canal实例

准备zookeeper的docker镜像与容器

docker pull zookeeper
docker run -d --name zookeeper --net mynetwork --ip 172.18.0.3 -p 2181:2181 zookeeper
docker run -d --name canal-server2 --net mynetwork --ip 172.18.0.8 -p 11113:11113 canal/canal-server

最终效果如图

hahadeMacBook-Pro:~ haha$ docker ps
CONTAINER ID  IMAGE COMMAND CREATED STATUS  PORTS 
NAMES
3f920cfac809  zookeeper zookeeper "/docker-entr rypoint..." 13 hours ago  Up 13 hours 2888/tcp,3888/t ccp,0.0.0.0:2181->2181/tcp,8080/tcp 
75f69fbd93a9  canal-server2:1.0 "/alidata/bin /main.s..." 22 hours ago  Up 12 hours 2222/tcp,8000/t tcp,11111-11112/tcp,  0.0.0.0:11113->11113/tcp  
canal-server2
8254af09f19d  mysql "docker-entry ypoint.s..."  47 hours ago  Up 13 hours 0.0.0.0:3306->33  306/tcp,33060/tcp 
mysql
4d5bf7a14209  mobz/elasticsearch-head:5-alpine  "/bin/sh -c node_mo..." 47 hours ago  Up 13 hours 0.0.0.0:9100->91  100/tcp 
elasticsearch-head
ca198845f55c  canal/canal-server  "/alidata/bin /main.s..." 47 hours ago  Up 2 hours  2222/tcp,8000/t tcp,11112/tcp,  0.0.0.0:11111->11111/tcp  
b5c1e3a581f4  elasticsearch:7.1.1 "/usr/local/b in/dock..." 47 hours ago  Up 13 hours 0.0.0.0:9200->92  200/tcp,0.0.0.0:9300->9300/tcp
  1. 机器准备
  • 运行canal的容器ip: 172.18.0.4 , 172.18.0.8
  • zookeeper容器ip:172.18.0.3:2181
  • mysql容器ip:172.18.0.6:3306
  1. 按照部署和配置,在单台机器上各自完成配置,演示时instance name为example
  2. 修改canal.properties,加上zookeeper配置并修改canal端口
canal.port=11113
canal.zkServers=172.18.0.3:2181
canal.instance.global.spring.xml = classpath:spring/default-instance.xml
  1. 创建example目录,并修改instance.properties
canal.instance.mysql.slaveId = 1235 
#之前的canal slaveId是1234,保证slaveId不重复即可
canal.instance.master.address = 172.18.0.6:3306

注意: 两台机器上的instance目录的名字需要保证完全一致,HA模式是依赖于instance name进行管理,同时必须都选择default-instance.xml配置

启动两个不同容器的canal,启动后,可以通过tail -100f logs/example/example.log查看启动日志,只会看到一台机器上出现了启动成功的日志。

比如我这里启动成功的是 172.18.0.4

2019-07-09 21:26:36.215 [destination = example , address = /172.18.0.6:3306,EventParser WARN c.a.o.c.p.inbound.mvsal.rds.RdsBinloaE entParserProxy - ---> find start position suc cessfully,EntryPosition[included=falsejournalName=binlog.000004,po osition=2098,serverId=1,gtid=,timestamp=1562672817000] cost :1085ms, the next step is binlog dump2019-07-09 22:21:27.458Thread-7INFO c.a.otter.canal.instance.co ore.AbstractCanalInstance - stop CannalInstance for null-example2019-07-09 22:21:27.586Thread-7] INFO c.a.otter.canal.instance.co are.AbstractCanalInstance - stop successful....
2019-07-09 22:57:58.581 [main] INFO c.a.o.c.i.spring.support.Proper tyPlaceholderConfigurer-Loading properties file from class path re  esource [canal.properties]  
2019-07-09 22:57:58.602 [main] INFO c.a.o.c.i.spring.support.Proper tvplaceholderConfigurer -Loading properties file from class path re source [example/instance.properties]  
2019-07-09 22:57:59.278 [main] WARN o.s.beans.GenericTypeAwarePrope ertyDescriptor -Invalid JavaBean property 'connectionCharset' being accessed! Ambiguous write methods found next to actually used [public void com.alibaba.otter.canal.parse.inbound. mysq1.AbstractMysqlEventParser.setConnectionCharset(java.lang.String)]: [public void com.alibaba.otter.canal.pars e.inbound.mvsa1.AbstractMvsaEventParser.setConnectionCharsetiava.n io.charset.Charset)]
2019-07-09 22:57:59.366 [main] INFO c.a.o.c.i.spring.support.Proper rtyPlaceholderConfigurer - Loading properties file from class path re source [canal.properties] 
2019-07-09 22:57:59.367 [main] INFO c.a.o.c.i.spring.support.proper tyPlaceholderConfigurer -Loading properties file from class path re source [example/instance.properties]  
2019-07-09 22:57:59.836[main] ERROR com.alibaba.druid.pool.DruidDat aSource - testWhileIdle is true, validationQuerv not set
2019-07-09 22:58:00.497 [main] INFO c.a.otter.canal.instance.spring Cana InstanceWithSpring - start Canna instance for 1-example  
2019-07-09 22:58:00.527 [main] WARN c.a.o.canal.parse.inbound.mysq] dbsvnc.ogEventConvert ---> init table filter:^,*\.,*$ 
2019-07-09 22:58:00.527 [main] WARN c.a.o.canal.parse.inbound.mysq] adbsync.LogEventConvert - --> init table black filter : 
2019-07-09 22:58:00.569 [main] INFO c.a.otter.canal.instance.core.A ostractCanalInstance -start successful....  
2019-07-09 22:58:00.894 [destination = example , address = /172.18.0.6:3306EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEv entParserProxy - ---> begin to find start pos ition, it will be long time for reset or first position
2019-07-09 22:58:00.900 [destination = example address=/172.18.0.6:3306EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEv entParserProxy - prepare to find start positi
on just show master status

查看一下zookeeper中的节点信息,也可以知道当前工作的节点为172.18.0.4:11111

[zk: localhost:2181(CONNECTED) 15] get /otter/canal/destinations/example/running  
{"active":true,"address":"172.18.0.4:11111","cid":1}

客户端链接, 消费数据

可以通过指定zookeeper地址和canal的instance name,canal client会自动从zookeeper中的running节点,获取当前服务的工作节点,然后与其建立链接:

[zk: localhost:2181(CONNECTED) 0] get /otter/canal/destinations/example/running
{"active":true,"address":"172.18.0.4:11111","cid":1}

对应的客户端编码可以使用如下形式,上文中的CanalConfig.java中的canalHaConnector就是一个HA连接

CanalConnector connector = CanalConnectors.newClusterConnector("172.18.0.3:2181", "example", "", "");

链接成功后,canal server会记录当前正在工作的canal client信息,比如客户端ip,链接的端口信息等 (聪明的你,应该也可以发现,canal client也可以支持HA功能)

[zk: localhost:2181(CONNECTED) 4] get /otter/canal/destinations/example/1001/running
{"active":true,"address":"192.168.124.5:59887","clientId":1001}

数据消费成功后,canal server会在zookeeper中记录下当前最后一次消费成功的binlog位点.  (下次你重启client时,会从这最后一个位点继续进行消费)

[zk: localhost:2181(CONNECTED) 5] get /otter/canal/destinations/example/1001/cursor
{"@type":"com.alibaba.otter.canal.protocol.position.LogPosition","identity":{"slaveId":-1,"sourceAddress":{"address":"mysql.mynetwork","port":3306}},"postion":{"included":false,"journalName":"binlog.000004","position":2169,"timestamp":1562672817000}}

停止正在工作的172.18.0.4的canal server

docker exec -it canal-server bash
cd canal-server/bin
sh stop.sh

这时172.18.0.8会立马启动example instance,提供新的数据服务

[zk: localhost:2181(CONNECTED) 19] get /otter/canal/destinations/example/running
{"active":true,"address":"172.18.0.8:11111","cid":1}

与此同时,客户端也会随着canal server的切换,通过获取zookeeper中的最新地址,与新的canal server建立链接,继续消费数据,整个过程自动完成

异常与总结

elasticsearch-head无法访问elasticsearch

es与es-head是两个独立的进程,当es-head访问es服务时,会存在一个跨域问题。所以我们需要修改es的配置文件,增加一些配置项来解决这个问题,如下

[root@localhost /usr/local/elasticsearch-head-master]# cd ../elasticsearch-5.5.2/config/
[root@localhost /usr/local/elasticsearch-5.5.2/config]# vim elasticsearch.yml  
# 文件末尾加上如下配置
http.cors.enabled: true
http.cors.allow-origin: "*"

修改完配置文件后需重启es服务

elasticsearch-head查询报406 Not Acceptable

▼ General
RequestURL: http://127.0.0.1:9200/student index/ search Request Method: POST
Status Code:  406 Not Acceptable  
Remote Address: 127.0.0.1:9200
Referrer Policy: no-referrer-when-downgrade
▼Response Headers
access-control-allow-origin: *
content-encoding:qzip
解决方法:
1、进入head安装目录;
2、cd _site/
3、编辑vendor.js  共有两处
     #6886行   contentType: "application/x-www-form-urlencoded
    改成 contentType: "application/json;charset=UTF-8"
     #7574行 var inspectData = s.contentType === "application/x-www-form-urlencoded" &&
    改成 var inspectData = s.contentType === "application/json;charset=UTF-8" &&

使用elasticsearch-rest-high-level-client报org.elasticsearch.action.index.IndexRequest.ifSeqNo

#pom中除了加入依赖
<dependency>
   <groupId>org.elasticsearch.client</groupId>
   <artifactId>elasticsearch-rest-high-level-client</artifactId>
   <version>7.1.1</version>
</dependency>
#还需加入
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>7.1.1</version>
</dependency>

相关参考 git hub issues

为什么ElasticSearch要在7.X版本不能使用type?

参考:为什么ElasticSearch要在7.X版本去掉type?

使用spring-data-elasticsearch.jar报org.elasticsearch.client.transport.NoNodeAvailableException

由于本文使用的是elasticsearch7.x以上的版本,目前spring-data-elasticsearch底层采用es官方TransportClient,而es官方计划放弃TransportClient,工具以es官方推荐的RestHighLevelClient进行调用请求。 可参考RestHighLevelClient API

设置docker容器开启启动

如果创建时未指定 --restart=always ,可通过update 命令
docker update --restart=always [containerID]

docker for Mac network host 模式不生效

host 模式是为了性能,但是这却对 docker 的隔离性造成了破坏,导致安全性降低。 在性能场景下,可以用 --netwokr host 开启 Host 模式,但需要注意的是,如果你用 Windows 或 Mac 本地启动容器的话,会遇到 host 模式失效的问题。原因是 host 模式只支持 Linux 宿主机。

参见官方文档:docs.docker.com/network/hos…

客户端连接zookeeper报authenticate using SASL(unknow error)

Opening socket connection to server 192.168.124.5/192.168.124.5:2181. Will not attempt to authenticate using SASL (unknown error):Session 0x0 for server null, unexpected error, closing soc ket connection and attempting reconnect
  • zookeeper.jar与dokcer中的zookeeper版本不一致
  • zookeeper.jar 使用了3.4.6之前的版本

出现这个错的意思是zookeeper作为外部应用需要向系统申请资源,申请资源的时候需要通过认证,而sasl是一种认证方式,我们想办法来绕过sasl认证。避免等待,来提高效率。

在项目代码中加入

System.setProperty("zookeeper.sasl.client", "false");

,如果是spring boot 项目可以在application.properties中加入

zookeeper.sasl.client=false

参考:Increased CPU usage by unnecessary SASL checks

如果更换canal.client.jar中依赖的zookeeper.jar的版本

把canal的官方源码下载到本机git clone https://github.com/alibaba/canal.git,然后修改client模块下pom.xml文件中关于zookeeper的内容,然后重新mvn install

<artifactId>netty-all</artifactId></dependency><!-- zk --><dependency>
<groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.5.5</version></dependency><dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId></dependency>
<!-- external --><dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId></dependency><dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId></dependency><dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId></dependency><dependency>
<groupId>com.alibaba</groupId>  

把自己项目依赖的包替换为刚刚mvn install生产的包

<version>7.1.1</version></dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency>
<dependency>
<groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId> k!_-<version>1.1.3</version>-
<version>1.1.4-SNAPSH0T</version></dependency>
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId><version>2.6</version></dependency>
project >dependencies >dependency >artifactld

zookeeper返回的是docker容器中的ip,而宿主机ip与容器ip不是同一个网段,无法ping通

修改hosts文件只可以实现域名到ip的映射(域名重定向),iptables可以实现端口的重定向,但是这个问题是要通过ip到ip的重定向可以解决,但是研究了一下没找到怎么设置(windows、mac),所以我们修改canal的官方源码来达到我们想要的目的。修改ClusterCanalConnector.java中的connect()方法。

以下是修改后内容对比图

es = 0; Erue){
currentConnector=new SimpleCanalConnector( address: null, username,password,(
@Override
public SocketAddress getNextAddress() {
return accessStrategy.nextNode();
};
currentConnector.setSoTimeout(soTimeout);
currentConnector.setIdleTimeout(idleTimeout); if (filter != null){
currentConnector.setFilter(filter);
@Override
public SocketAddress getNextAddress() {
SocketAddress address = accessStrategy.nextNode(); if (address instanceof InetSocketAddress){
String hostName =((InetSocketAddress)address).getHostName(); int port =((InetSocketAddress) address).getPort();
// 截取zookeeper上canalServer的ip前3位与宿主机ip前3位做比较
if(hostName !=null&&!hostName.substring(0,3).equals(AddressUti
// 如果不是同一网段则使用宿主机IP
address=newInetSocketAddress(AddressUtils.getHostIp(),port}
return address;
return address

关于选型的取舍

a1c771adc8ab9f0983989f68aa4529e.png

本文示例项目源代码==>canal-elasticsearch-sync



相关实践学习
通过Ingress进行灰度发布
本场景您将运行一个简单的应用,部署一个新的应用用于新的发布,并通过Ingress能力实现灰度发布。
容器应用与集群管理
欢迎来到《容器应用与集群管理》课程,本课程是“云原生容器Clouder认证“系列中的第二阶段。课程将向您介绍与容器集群相关的概念和技术,这些概念和技术可以帮助您了解阿里云容器服务ACK/ACK Serverless的使用。同时,本课程也会向您介绍可以采取的工具、方法和可操作步骤,以帮助您了解如何基于容器服务ACK Serverless构建和管理企业级应用。 学习完本课程后,您将能够: 掌握容器集群、容器编排的基本概念 掌握Kubernetes的基础概念及核心思想 掌握阿里云容器服务ACK/ACK Serverless概念及使用方法 基于容器服务ACK Serverless搭建和管理企业级网站应用
相关文章
|
5月前
|
canal 关系型数据库 MySQL
实时计算 Flink版产品使用问题之在进行整库同步MySQL数据到StarRocks时,遇到全量数据可以同步,但增量数据无法同步,是什么导致的
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
2月前
|
canal 消息中间件 关系型数据库
Canal作为一款高效、可靠的数据同步工具,凭借其基于MySQL binlog的增量同步机制,在数据同步领域展现了强大的应用价值
【9月更文挑战第1天】Canal作为一款高效、可靠的数据同步工具,凭借其基于MySQL binlog的增量同步机制,在数据同步领域展现了强大的应用价值
474 4
|
3月前
|
canal 关系型数据库 MySQL
"揭秘阿里数据同步黑科技Canal:从原理到实战,手把手教你玩转MySQL数据秒级同步,让你的数据处理能力瞬间飙升,成为技术界的新晋网红!"
【8月更文挑战第18天】Canal是一款由阿里巴巴开源的高性能数据同步系统,它通过解析MySQL的增量日志(Binlog),提供低延迟、可靠的数据订阅和消费功能。Canal模拟MySQL Slave与Master间的交互协议来接收并解析Binary Log,支持数据的增量同步。配置简单直观,包括Server和Instance两层配置。在实战中,Canal可用于数据库镜像、实时备份等多种场景,通过集成Canal Client可实现数据的消费和处理,如更新缓存或写入消息队列。
651 0
|
4月前
|
关系型数据库 MySQL 数据库
关系型数据库mysql数据增量恢复
【7月更文挑战第3天】
159 2
|
5月前
|
分布式计算 大数据 关系型数据库
MaxCompute产品使用问题之如何实现MySQL的实时增量同步
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
5月前
|
canal 关系型数据库 MySQL
蓝易云 - 详解canal同步MySQL增量数据到ES
以上就是使用Canal同步MySQL增量数据到Elasticsearch的基本步骤。在实际操作中,可能还需要根据具体的业务需求和环境进行一些额外的配置和优化。
151 2
|
18天前
|
存储 SQL 关系型数据库
Mysql学习笔记(二):数据库命令行代码总结
这篇文章是关于MySQL数据库命令行操作的总结,包括登录、退出、查看时间与版本、数据库和数据表的基本操作(如创建、删除、查看)、数据的增删改查等。它还涉及了如何通过SQL语句进行条件查询、模糊查询、范围查询和限制查询,以及如何进行表结构的修改。这些内容对于初学者来说非常实用,是学习MySQL数据库管理的基础。
74 6
|
15天前
|
存储 关系型数据库 MySQL
Mysql(4)—数据库索引
数据库索引是用于提高数据检索效率的数据结构,类似于书籍中的索引。它允许用户快速找到数据,而无需扫描整个表。MySQL中的索引可以显著提升查询速度,使数据库操作更加高效。索引的发展经历了从无索引、简单索引到B-树、哈希索引、位图索引、全文索引等多个阶段。
50 3
Mysql(4)—数据库索引
|
18天前
|
SQL Ubuntu 关系型数据库
Mysql学习笔记(一):数据库详细介绍以及Navicat简单使用
本文为MySQL学习笔记,介绍了数据库的基本概念,包括行、列、主键等,并解释了C/S和B/S架构以及SQL语言的分类。接着,指导如何在Windows和Ubuntu系统上安装MySQL,并提供了启动、停止和重启服务的命令。文章还涵盖了Navicat的使用,包括安装、登录和新建表格等步骤。最后,介绍了MySQL中的数据类型和字段约束,如主键、外键、非空和唯一等。
56 3
Mysql学习笔记(一):数据库详细介绍以及Navicat简单使用
|
4天前
|
存储 关系型数据库 MySQL
MySQL vs. PostgreSQL:选择适合你的开源数据库
在众多开源数据库中,MySQL和PostgreSQL无疑是最受欢迎的两个。它们都有着强大的功能、广泛的社区支持和丰富的生态系统。然而,它们在设计理念、性能特点、功能特性等方面存在着显著的差异。本文将从这三个方面对MySQL和PostgreSQL进行比较,以帮助您选择更适合您需求的开源数据库。
18 4

热门文章

最新文章