Maxwell - 增量数据同步工具(1)https://developer.aliyun.com/article/1532368
3.3、Maxwell 进程启动
3.3.1、使用命令行参数启动
bin/maxwell --user='maxwell' --password='123456' --host='hadoop102' --producer=stdout
- user:连接 mysql 的用户(上面我们已经创建了)
- password:连接 mysql 的用户密码
- host: mysql 安装的主机名
- producer:生产者模式(stdout:控制台,kafka:kafka 集群)
查看 maxwell 输出:
我们可以看到,我们的 insert 语句被 maxwell 转为了 json 被输出到了控制台。
3.3.2、定制化配置文件启动
编辑配置文件 config.properties:
log_level=info producer=stdout kafka.bootstrap.servers=localhost:9092 # mysql login info host=hadoop102 user=maxwell password=123456
启动 Maxwell:
4、Maxwell 使用
4.1、监控 MySQL 数据并在控制台打印
上面我们已经演示过了,也就是首先在 mysql 的配置文件中指定需要打印 binlog 的数据库,然后通过命令或者配置文件来开启 Maxwell 进程:
bin/maxwell --user='maxwell' --password='123456' --host='hadoop102' --producer=stdout
接着我们对被监听的数据库的操作信息就会以 json 的格式打印出来:
4.2、监控 MySQL 数据并输出到 Kafka
监控 MySQL 数据并输出到 Kafka,这种需求是我们实际工作用的最多的,这也是我们离线数仓和实时数仓所必须的。
4.2.1、实例
1)启动 Zookeeper 和 Kafka
2)启动 Maxwell 监控 binlog
bin/maxwell --user='maxwell' --password='123456' --host='hadoop102' --producer=kafka --kafka.bootstrap.servers=hadoop102:9092 --kafka_topic=maxwell
注:这里我们制定了 Kafka 集群地址为 hadoop102:9092 ,主题为 maxwell,虽然主题并不存在,但是 kafka 会自动创建我们指定的主题
3)在 hadoop103 消费 maxwell 主题
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic maxwell
4)在 MySQL 中增加一条数据再删除:
可以看到我们的数据成功被 Kafka 消费,说明业务数据日志成功传输到 Kafka 。
4.2.2、Kafak 主题数据的分区控制
上面的案例中,我们不论 MySQL 的那个数据库,哪张表,都被保存到了 Kafka 的 maxwell 主题,这样显然很乱。所以这里我们需要解决一下:
在生产环境中,我们一般都会用 maxwell 监控多个 mysql 库的数据,然后将这些数据发往 Kafka 的一个主题,并且这个主题肯定是多分区的,为了提高并发度。那么如何控制这些数据的分区问题,那就变得至关重要,实现步骤如下:
(1)修改 maxwell 的配置文件,定制化启动 maxwell 进程
log_level=info producer=kafka kafka.bootstrap.servers=hadoop102:9092 # mysql login info host=hadoop102 user=maxwell password=123456 # kafka 主题 kafka_topic=maxwell3 # 默认配好的 kafka.compression.type=snappy kafka.retries=0 kafka.acks=1 #producer_partition_by=database # [database, table, primary_key, transaction_id, thread_id, column] # 常用的按照库分区、按照表分区,这里我们测试按照库分区 producer_partition_by=database
(2)手动创建 Kafka 多分区主题
kafka-topics.sh --bootstrap-server hadoop102:9092,hadoop103:9092,hadoop104:9092 --create --replication-factor 2 --partitions 3 --topic maxwell3
查看主题信息:
这样,我们不同数据库的操作记录就会保存到 Kafka 不同的分区当中, 分区可以使得集群负载均衡,还可以提高提高读写效率。
4.3、监控 MySQL 指定表输出控制台
(1)运行 maxwell 来监控 mysql 指定表数据更新
监控 test 库下的 staff 表
bin/maxwell --user='maxwell' --password='123456' --host='hadoop102' --producer=stdout --filter 'exclude: *.*,include:test.staff'
(2)向 staff 表中插入数据,查看 maxwell 控制台输出
(3)向 ws2 表中插入一条数据,查看
注:表 ws2 并不是我们指定监控的表
结果:maxwell 控制台并没有任何输出
注:还可以设置 include.test.* 来表示监控 test 库下的所有表
4.4、监控 MySQL 指定表全量输出到控制台
Maxwell 进程默认只能监控 mysql 的 binlog 日志的新增以及变化的,但是 Maxwell 是支持数据初始化的,可以通过修改 Maxwell 的元数据来对 MySQL 中的某张表进行数据初始化,也就是我们常说的全量同步。具体操作如下:
需求:将 test 库下的 staff 表中的数据全量导入到 maxwell 控制台打印。
我们可以在数据库 maxwell (这个数据库是我们初始化 maxwell 元数据库时创建的)中查看元数据表 positions 的内容:
这里的 server_id 是我们在 mysql 的配置文件 /etc/my.cnf 中配置的;这里的 binlog_file 代表的是最新的 binlog 文件 mysql-bin.000003 ;这里的 binlog_position 的值为 2802 代表目前已经读到了第 2802 个字节的位置,意思就说我们现在是从这个偏移量之后开始监听的。
注:也可以通过 SQL:show master status; 来查看当前的 binlog 状态。
1. 向元数据表 maxwell.bootstrap 中插入数据
insert into maxwell.bootstrap(database_name,table_name) values('test','staff');
这就相当于告诉 maxwell,下一次启动 maxwell 作业时,就会调用一个初始化作业,把我们指定的这个 test 数据库下的 staff 表进行一次增量同步。
我们可以看到,通过 bootstrap 表插入数据来完成初始化,这样插入的数据是 json 中 type 字段的值是 "bootstrap-insert" 而不是 "insert"。
再次查看 bootstrap 表:
我们可以发现,is_complete 字段的值从默认值 0 变成了 1,inserted_rows 从 0 变成了 6,后面的 started_at 和 completed_at 字段的值也不再为空了。
2. 使用 maxwell-bootstrap 脚本工具
除了上面的直接向 maxwell 元数据库中的 bootstrap 表插入数据,也可以使用 maxwell 提供的 maxwell-bootstrap 脚本,本质其实是一样的,具体看你觉得哪个省事。
bin/maxwell-bootstrap --database gmall --table user_info --config ./config.properties
这样,这个初始化作业就算完成了,当我们下次打开 maxwell 进程时,它并不会再次执行这个初始化任务,因为它已经完成过了。
总结
至此,Maxwell 的学习也已经结束了,待会吧离线数仓项目里的 maxwell 部分完成。总体来说,这个工具还是很好用且简单的,希望有一天自己也可以用屎山堆砌出这么一个自己的框架!