springcloud如何使用canal监听mysql数据库操作 canal是阿里巴巴旗下的一款开源项目,纯Java开发。基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了MySQL(也支持mariaDB)。
演示的是canal项目单独作为一个微服务
配置
安装与解压
- 下载地址:https://github.com/alibaba/canal/releases/tag/canal-1.1.4
- 将压缩包放入linux的
/usr/local/soft
文件夹中
- 解压到指定文件夹
tar zxvf canal.deployer-1.1.4.tar.gz -C /usr/local/canal
- 解压完成后,进入 /usr/local/canal 目录,可以看到如下结构
配置
数据库配置
- 对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下
vi /etc/my.cnf
- 找到以下
[mysql]
部分,加入以下代码
[mysqld] log-bin=mysql-bin # 开启 binlog binlog-format=ROW # 选择 ROW 模式 server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复 1234
- 进入mysql(使用navicat连接也行),输入以下代码
- 授权 canal 链接 MySQL 账号(演示的账号为canal)具有作为 MySQL slave 的权限, 如果已有账户可直接 grant
- 这里建立的账号和密码,要和canal配置里信息对应
CREATE USER canal IDENTIFIED BY 'T19980220t-'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; FLUSH PRIVILEGES; 123
canal配置
- 配置修改
vi conf/example/instance.properties
- 修改以下两处地方,账号密码则是对应mysql创建的拥有权限的那个账号
- ip地址填写 canal监听的mysql数据库所在的ip地址(一般canal和mysql放在同一个ip比较好)
- 启动canal服务(必须改完配置以后再启动,若先启动后才改配置,则需要重新启动)
sh bin/startup.sh
- 测试端口,看到11111端口则代表启动成功
netstat -ntpl
- 重启mysql服务(细节)
systemctl restart mysqld
pom.xml
<dependency> <groupId>top.javatool</groupId> <artifactId>canal-spring-boot-starter</artifactId> <version>1.2.1-RELEASE</version> </dependency>
bootstrap.yml
- Canal服务部署的地址 填写安装了canal的linux服务器上的ip地址
- 在服务器上的canal的配置文件中 配置要监听哪个ip地址的mysql数据库,详细至canal组件配置
spring: application: name: canal-service cloud: nacos: config: server-addr: 192.168.8.12:8848 #nacos中心地址 file-extension: yaml # 配置文件格式 shared-configs: - data-id: nacos-discovery-config-dev.yaml - data-id: redis-config-dev.yaml profiles: active: dev # 环境标识
canal-service-dev.yaml
server: port: 8021 canal: server: 192.168.8.12:11111 destination: example logging: level: root: info top: javatool: canal: client: client: AbstractCanalClient: error
实体类及其对应数据库表
- 数据库表的结构
- 实体类
@Setter @Getter @Table(name = "t_order_info") public class OrderInfo implements Serializable { public static final Integer STATUS_ARREARAGE = 0;//未付款 public static final Integer STATUS_ACCOUNT_PAID = 1;//已付款 public static final Integer STATUS_CANCEL = 2;//手动取消订单 public static final Integer STATUS_TIMEOUT = 3;//超时取消订单 public static final Integer STATUS_REFUND = 4;//已退款 public static final int PAYTYPE_ONLINE = 0;//在线支付 public static final int PAYTYPE_INTERGRAL = 1;//积分支付 @Column(name = "order_no") private String orderNo;//订单编号 @Column(name = "user_id") private Long userId;//用户ID @Column(name = "product_id") private Long productId;//商品ID @Column(name = "delivery_addr_id") private Long deliveryAddrId;//收货地址 @Column(name = "product_name") private String productName;//商品名称 @Column(name = "product_img") private String productImg;//商品图片 @Column(name = "product_count") private Integer productCount;//商品总数 @Column(name = "product_price") private BigDecimal productPrice;//商品原价 @Column(name = "seckill_price") private BigDecimal seckillPrice;//秒杀价格 @Column(name = "intergral") private Long intergral;//消耗积分 @Column(name = "status") private Integer status = STATUS_ARREARAGE;//订单状态 @Column(name = "create_date") private Date createDate;//订单创建时间 @Column(name = "pay_date") private Date payDate;//订单支付时间 @Column(name = "pay_type") private Integer payType;//支付方式 1-在线支付 2-积分支付 @Column(name = "seckill_date") private Date seckillDate;//秒杀的日期 @Column(name = "seckill_time") private Integer seckillTime;// 秒杀场次 @Column(name = "seckill_id") private Long seckillId;//秒杀商品ID }
监听类
- 服务器上已经配置好监听的是哪个ip的mysql数据库
- @CanalTable(“t_order_info”)定义用于监听哪个表
- EntryHandler ,
<>
中写入监听数据库表对应的实体类 - 重写接口不同的方法,来监听不同的操作,具体如下
@Component @CanalTable("t_order_info") public class OrderInfoHandler implements EntryHandler<OrderInfo> { @Autowired private StringRedisTemplate redisTemplate; @Override public void insert(OrderInfo orderInfo) { System.out.println("新增操作"); } @Override public void update(OrderInfo before, OrderInfo after) { System.out.println("编辑操作"); // before 修改之前 只有修改的那列有值 // 修改之后的 修改之后的对象 所有值都有 System.out.println(before); System.out.println(after); } @Override public void delete(OrderInfo orderInfo) { System.out.println("删除操作"); } } / 修改之后的 修改之后的对象 所有值都有 System.out.println(before); System.out.println(after); } @Override public void delete(OrderInfo orderInfo) { System.out.println("删除操作"); } }