cancal 同步mysql数据到es中

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: cancal 同步mysql数据到es中

1.环境:

windocs service2012 、 jdk版本1.8 、canal版本1.5、mysql版本5.7、

注意:canal版本1.5需要的jdk是1.8   如果你下载的是canal1.6,jdk是1.8,那样会报错。

下载地址  Releases · alibaba/canal · GitHub  下载并上传到服务器

三个文件  

canal.adapter   客户端

canal.admin   后台web端

canal.deployer 服务端

2.mysql配置:

mysq 开启日志  mysql配置添加

user=mysql
slow_query_log = 1
log_error = /home/data/mysql57/data/mysql.err
slow_query_log  = ON     #开启慢查询
long_query_time  =4     #设置慢查询时间 超过一秒的记录
server_id = 57
log-bin=mysql-bin
log-bin-index=master-bin.index
expire_logs_days = 7 
binlog_format=row
slave_skip_errors=1062
log_slave_updates=1
max_connections = 1000
wait_timeout=864000 
interactive_timeout=864000

添加用户权限

GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' IDENTIFIED BY 'Canal@123456' WITH GRANT OPTION;    
flush privileges;

创建数据库

create database bigdata default charset utf8;

3.配置服务器端  canal.deployer

在 conf/example/instance.properties

## mysql serverId , v1.0.26+ will autoGen
## v1.0.26版本后会自动生成slaveId,所以可以不用配置
# canal.instance.mysql.slaveId=0
# 数据库地址
canal.instance.master.address=127.0.0.1:3306
# binlog日志名称
canal.instance.master.journal.name=mysql-bin.000001
# mysql主库链接时起始的binlog偏移量
canal.instance.master.position=154
# mysql主库链接时起始的binlog的时间戳
canal.instance.master.timestamp=
canal.instance.master.gtid=
# username/password
# 在MySQL服务器授权的账号密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=Canal@123456
# 字符集
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
# table regex .*\\..*表示监听所有表 也可以写具体的表名,用,隔开
canal.instance.filter.regex=.*\\..*
# mysql 数据解析表的黑名单,多个表用,隔开
canal.instance.filter.black.regex=

我只配了两个地方

启动:进入 canal.deployer的目录 ./bin/startup.sh

4.配置客户端 canal.adapter

配置:配置客户端

vi application.yml  配置两个地方,一个是要同步的mysql数据库,一个是es配置

这是配置

server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null
canal.conf:
  mode: tcp #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    canal.tcp.server.host: 127.0.0.1:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://127.0.0.1:3306/bigdata?serverTimezone=GMT%2B8&useUnicode=true&characterEncoding=utf8&allowMultiQueries=true&useSSL=false
      username: canal
      password: canal
    #defaultDS2:
       #url: jdbc:mysql://127.0.0.1:3306/test?serverTimezone=GMT%2B8&useUnicode=true&characterEncoding=utf8&useSSL=false&allowMultiQueries=true
       #username: canal
       #password: 123456        
  canalAdapters:
   # canal instance Name or mq topic name
  - instance: example
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
      - name: es7
        hosts: 127.0.0.1:9200
        properties:
          mode: rest # or transport 
          security.auth: 
          cluster.name: my-application

这是同步的测试脚本

全量/增量更新 es7/mytest-user.yml 文件内容

dataSourceKey: defaultDS
destination: example
groupId: g1
esMapping:
  _index: detail202304
  _id: _id
#  upsert: true
#  pk: id
  sql: "select a.id as _id,a.id from t_data_order_detail2020 a"
#  objFields:
#    _labels: array:;
  etlCondition: "where a.id ={}"
  commitBatch: 3000

配置完启动客户端:进到安装目录

查看启动日志

全量更新为向Adapter发送POST更新

- postMan发送请求

curl  http://127.0.0.1:8081/etl/es7/mytest-user.yml
curl -X POST  http://127.0.0.1:8081/etl/es7/mytest-user.yml

额添加成功。剩下的就是java的事了。

可以通过jps查看

5.遇到的问题 Task not found

报错:

[root@hbyc ~]# curl -X POST  http://127.0.0.1:8081/etl/es7/t_data_order_detail202304.yml

{"succeeded":false,"errorMessage":"Task not found"}

解决:

canal的客户端 canal.adapter 在修改完配置需要重新启动客户端,要不然找不到配置文件。

每次修改配置文件需要重启客户端!!!

每次修改配置文件需要重启客户端!!!

每次修改配置文件需要重启客户端!!!

剩下的就是查看添加索引了

es添加索引命令行和浏览器添加索引--图文详解_舰长115的博客-CSDN博客

相关实践学习
基于CentOS快速搭建LAMP环境
本教程介绍如何搭建LAMP环境,其中LAMP分别代表Linux、Apache、MySQL和PHP。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助     相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
4天前
|
关系型数据库 MySQL API
实时计算 Flink版产品使用合集之可以通过mysql-cdc动态监听MySQL数据库的数据变动吗
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
79 0
|
3天前
|
关系型数据库 MySQL OLAP
实时计算 Flink版产品使用合集之可以支持 MySQL 数据源的增量同步到 Hudi 吗
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
28 4
|
1天前
|
自然语言处理 监控 关系型数据库
mysql造数据占用临时表空间
【5月更文挑战第20天】MySQL在处理复杂查询时可能使用临时表,可能导致性能下降。临时表用于排序、分组和连接操作。常见问题包括内存限制、未优化的查询、数据类型不当和临时表清理。避免过度占用的策略包括优化查询、调整系统参数、优化数据类型和事务管理。使用并行查询、分区表和监控工具也能帮助管理临时表空间。通过智能问答工具如通义灵码,可实时续写SQL和获取优化建议。注意监控`Created_tmp_tables`和`Created_tmp_disk_tables`以了解临时表使用状况。
113 5
|
2天前
|
Oracle 关系型数据库 MySQL
实时计算 Flink版操作报错合集之用CTAS从mysql同步数据到hologres,改了字段长度,报错提示需要全部重新同步如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
45 8
|
3天前
|
SQL 资源调度 关系型数据库
实时计算 Flink版产品使用合集之在抓取 MySQL binlog 数据时,datetime 字段会被自动转换为时间戳形式如何解决
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
12 2
|
4天前
|
消息中间件 关系型数据库 MySQL
实时计算 Flink版产品使用合集之2.2.1版本同步mysql数据写入doris2.0 ,同步完了之后增量的数据延迟能达到20分钟甚至一直不写入如何解决
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
14 1
|
6天前
|
关系型数据库 MySQL 数据库
docker MySQL删除数据库时的错误(errno: 39)
docker MySQL删除数据库时的错误(errno: 39)
60 0
|
6天前
|
Java 关系型数据库 MySQL
【MySQL × SpringBoot 突发奇想】全面实现流程 · xlsx文件,Excel表格导入数据库的接口(下)
【MySQL × SpringBoot 突发奇想】全面实现流程 · xlsx文件,Excel表格导入数据库的接口
43 0
|
6天前
|
Java 关系型数据库 MySQL
【MySQL × SpringBoot 突发奇想】全面实现流程 · xlsx文件,Excel表格导入数据库的接口(上)
【MySQL × SpringBoot 突发奇想】全面实现流程 · xlsx文件,Excel表格导入数据库的接口
47 0
|
6天前
|
前端开发 关系型数据库 MySQL
【MySQL × SpringBoot 突发奇想】全面实现流程 · 数据库导出Excel表格文件的接口
【MySQL × SpringBoot 突发奇想】全面实现流程 · 数据库导出Excel表格文件的接口
37 0