通过Kafka协议访问DataHub
基本概念
Kafka是目前主流的消息队列之一,Appache顶级开源项目。DataHub服务基于飞天平台的自研消息队列产品,集团内的用户可能很多对DataHub并不是很熟悉,但是对TimeTunnel(简称TT)会更了解一些,DataHub和TT现在底层是同一套服务,只是集团内和集团外的应用场景不同,所以部署模式不同,目前DataHub主要服务于公有云和专有云,集团内业务较少,TT主要服务于集团内的用户。
DataHub现在已经全面兼容Kafka的读写,用户使用原生的Kafka SDK即可直接通过Kafka协议访问DataHub,对于一直使用Kafka的用户,仅需要修改客户端配置参数即可,实现最小成本的迁移上云。
Datahub和Kafka的区别
DataHub和Kafka的功能基本上一致,这里不针对功能上的区别做展开,主要介绍上底层的区别。
区别1 协议不同
Kafka基于TCP的定制了一套协议,而DataHub的协议是基于HTTP的restful风格协议。
区别2 Kafka客户端直连Broker,而DataHub通过Frontend转发
DataHub:
Kafka:
如上图所示,红色虚线上方表示客户端,下方表示服务端。Kafka在读写数据时,会与Broker直接建立TCP连接,这也就要求客户端必须可以直接访问服务端的机器,这也在一定程度上降低了服务端整体的安全性。对DataHub而言,用户是无法直接访问服务端的broker地址的,所有的用户请求,都需要经过VIP分发到各个Frontend机器上,然后再转发到Broker上,相对于Kafka,集群的安全性更高。
Kafka客户端访问模式的不同就决定了,Kafka必须是用户自建或者云上的实例型产品,每个Kafka实例用户独享,而DataHub的定位就是云上服务型产品,所有的用户共享同一个集群的物理资源,并在Frontend做一系列的权限控制达到资源隔离的效果。所以决定了 所以在稳定性、弹性等方面DataHub都是远远高于Kafka的。
区别3 数据存储模式不同
Kafka和DataHub都是利用顺序读写和Page cache等特点实现的高吞吐,主要区别是存储方式不同。如上图所示,虚线框内的部分表示一台机器,Kafka leader接收到数据之后,会直接写入本地存储中,然后再由follower读取数据并分别写入各自的存储来保证分布式一致性,而DataHub的broker则是会随机的写入集群中N台机器的存储中,所以Kafka和DataHub在Broker层面都存在一定程度的通讯开销,Kafka的内部网络开销为follower和leader的数据一致性上,而DataHub的broker内部通讯开销主要在broker写其他机器上,但是DataHub可以实现非常重要的一个特点——计算存储分离。
使用Kafka协议访问datahub实践
示例一: 原生Kafka迁移到DataHub的Kafka
对于使用自建Kafka的用户,只需要修改部分参数配置即可直接将业务迁移到DataHub上。具体的配置可以参考下表:
C=Consumer, P=Producer
参数 | C/P | 可选配置 | 是否必须 | 描述 |
bootstrap.servers | * | 参考Kafka域名列表 | 是 | |
security.protocol | * | SASL_SSL | 是 | 为了保证数据传输的安全性,Kafka写入DataHub默认使用SSL加密传输 |
sasl.mechanism | * | PLAIN | 是 | AK认证方式,仅支持PLAIN |
compression.type | P | LZ4 | 否 | 是否开启压缩传输,目前仅支持LZ4 |
group.id | C | project.topic:subId | 是 | 必须和订阅的topic保持一致,否则无法读取数据 |
session.timeout.ms | C | [60000, 180000] | 否 | kafka默认为10000, 但是因为DataHub限制最小为60000,所以这里默认会变为60000 |
heartbeat.interval.ms | C | 建议session.timeout.ms的 2/3 | 否 | Kafka默认为3000,但是因为session.timeout.ms 会被默认修改为60000,所以这里建议显示设置为40000,否则heartbeat请求会过于频繁 |
Producer示例:
生成kafka_client_producer_jaas.conf文件
创建文件kafka_client_producer_jaas.conf,保存到任意路径,文件内容如下。
KafkaClient{org.apache.kafka.common.security.plain.PlainLoginModulerequiredusername="accessId"password="accessKey";};
maven依赖
Kafka-client版本至少大于等于0.10.0.0,推荐2.4.0
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.4.0</version></dependency>
示例代码
publicclassProducerExample { static { System.setProperty("java.security.auth.login.config", "src/main/resources/kafka_client_producer_jaas.conf"); } publicstaticvoidmain(String[] args) { Propertiesproperties=newProperties(); properties.put("bootstrap.servers", "dh-cn-hangzhou.aliyuncs.com:9092"); properties.put("security.protocol", "SASL_SSL"); properties.put("sasl.mechanism", "PLAIN"); properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); properties.put("compression.type", "lz4"); StringKafkaTopicName="test_project.test_topic"; Producer<String, String>producer=newKafkaProducer<String, String>(properties); try { List<Header>headers=newArrayList<>(); RecordHeaderheader1=newRecordHeader("key1", "value1".getBytes()); RecordHeaderheader2=newRecordHeader("key2", "value2".getBytes()); headers.add(header1); headers.add(header2); ProducerRecord<String, String>record=newProducerRecord<>(KafkaTopicName, 0, "key", "Hello DataHub!", headers); // sync sendproducer.send(record).get(); } catch (InterruptedExceptione) { e.printStackTrace(); } catch (ExecutionExceptione) { e.printStackTrace(); } finally { producer.close(); } } } 运行结果
运行结果
运行成功之后,可以再DataHub抽样一下,确认是否正常DataHub。
Consumer示例
生成kafka_client_producer_jaas.conf文件和maven依赖参考Producer示例
示例代码
新加入的consumer需要1分钟左右分配shard,分配完成后即可消费。
publicclassConsumerExample { static { System.setProperty("java.security.auth.login.config", "src/main/resources/kafka_client_producer_jaas.conf"); } publicstaticvoidmain(String[] args) { Propertiesproperties=newProperties(); properties.put("bootstrap.servers", "dh-cn-hangzhou.aliyuncs.com:9092"); properties.put("security.protocol", "SASL_SSL"); properties.put("sasl.mechanism", "PLAIN"); properties.put("group.id", "test_project.test_topic:1611039998153N71KM"); properties.put("auto.offset.reset", "earliest"); properties.put("session.timeout.ms", "60000"); properties.put("heartbeat.interval.ms", "40000"); properties.put("ssl.endpoint.identification.algorithm", ""); properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String>kafkaConsumer=newKafkaConsumer<String, String>(properties); kafkaConsumer.subscribe(Collections.singletonList("test_project.test_topic")); while (true) { ConsumerRecords<String, String>records=kafkaConsumer.poll(Duration.ofSeconds(5)); for (ConsumerRecord<String, String>record : records) { System.out.println(record.toString()); } } } }
运行结果
运行成功之后,便可以在终端看到读取到的数据。
ConsumerRecord(topic = test_project.test_topic, partition =0, leaderEpoch =0, offset =0, LogAppendTime =1611040892661, serialized key size =3, serialized value size =14, headers = RecordHeaders(headers = [RecordHeader(key = key1, value = [118, 97, 108, 117, 101, 49]), RecordHeader(key = key2, value = [118, 97, 108, 117, 101, 50])], isReadOnly =false), key = key, value = Hello DataHub!)
注意:这里同一个请求返回的数据的LogAppendTime是相同的,是该请求返回所有的数据的写入DataHub时间的最大值
示例二: Canal采集MySQL增量数据到DataHub(Kafka)并用Flink读取
1.配置MySQL
Canal采集MySQL要求源库开启binlog,并要求配置主库地址,从库无法采集,这里不再赘述。
本文以表orders为例介绍后续的配置流程,表结构如下所示:
mysql> desc orders;
+-------+--------------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+-------+--------------+------+-----+---------+-------+
| oid | int(11) | YES | | 5 | |
| pid | int(11) | YES | | NULL | |
| num | int(11) | YES | | NULL | |
+-------+--------------+------+-----+---------+-------+
2. 创建DataHub Topic
如果想要使用Kafka协议访问DataHub,在创建Topic时必须选择为shard扩展模式,表示topic的扩容方式为添加shard。
//TODO 添加截图
测试所用的project、topic、订阅id分别为: test_project、test_topic、xxxxxx
3.配置Canal
Canal可以采集MySQL的binlog并写入Kafka,但是因为开源Canal的一些限制导致Canal无法通过Kafka协议访问DataHub,所以DataHub团队对开源Canal做了一些改造,使得用户可以使用Canal直接将增量数据写入DataHub(Kafka),具体的改动内容可以参考DataHub官方文档。用户可以直接下载改造好的Canal。
将canal.deployer 复制到固定目录并解压
mkdir -p /usr/local/canal
tar -zxvf canal.deployer-1.1.5-SNAPSHOT.tar.gz -C /usr/local/canal
修改instance配置 conf/example/instance.properties
# 按需修改成自己的数据库信息#################################################...canal.instance.master.address=192.168.1.20:3306 # username/password,数据库的用户名和密码...canal.instance.dbUsername = canal canal.instance.dbPassword = canal ...# mq configcanal.mq.topic=example # 针对库名或者表名发送动态topic#canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\\..*,.*\\..*canal.mq.partition=0 # hash partition config#canal.mq.partitionsNum=3#库名.表名: 唯一主键,多个表之间用逗号分隔#canal.mq.partitionHash=mytest.person:id,mytest.role:id#################################################
对应ip 地址的MySQL 数据库需进行相关初始化与设置, 可参考 Canal QuickStart。针对库名的动态TopicName和根据主键哈希的设置,可参考mq参数说明。
修改canal配置文件 conf/canal.properties
# ...canal.serverMode = kafka # 这里以杭州互联网地址为例kafka.bootstrap.servers = dh-cn-hangzhou.aliyuncs.com:9092 kafka.acks = all kafka.compression.type = none kafka.batch.size = 16384 kafka.linger.ms = 1 kafka.max.request.size = 1048576 kafka.buffer.memory = 33554432 kafka.max.in.flight.requests.per.connection = 1 kafka.retries = 0 kafka.security.protocol = SASL_SSL kafka.sasl.mechanism = PLAIN
其中必须配置参数为canal.serverMode、kafka.bootstrap.servers、kafka.security.protocol、kafka.sasl.mechanism
,其他参数用户可根据实际情况自主进行调优。
修改jass配置文件 conf/kafka_client_producer_jaas.conf
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="accessId"
password="accessKey";
};
启动
cd /usr/local/canal/
sh bin/startup.sh
4.配置Flink
1). Flink源表
阿里云云上的Flink访问Kafka必须使用VPC Kafka域名地址,其他Flink根据实际情况选择。
createtable kafka_source( `oid` BIGINT, `pid` BIGINT, `num` BIGINT) with ('connector'='kafka','topic'='test_project.test_topic1','properties.bootstrap.servers'='dh-cn-hangzhou-int-vpc.aliyuncs.com:9094','properties.group.id'='test_project.test_topic1:subId','format'='canal-json','scan.startup.mode'='earliest-offset','properties.sasl.mechanism'='PLAIN','properties.security.protocol'='SASL_SSL','properties.sasl.jaas.config'='org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="AccessId" password="AccessKey";');
2). Flink结果表
为了方便展示结果,本例以print类型作为结果表。
CREATETABLE print_table ( oid BIGINT, pid BIGINT, num BIGINT) WITH ('connector'='print','logger'='true');
3). Flink sql
INSERTINTO print_canal_table select `oid BIGINT`, `pid BIGINT`, `num BIGINT` FROM kafka_canal_source;
结果展示
在MySQL中执行一条sql语句mysql> insert into orders values(1,2,3);
1). canal日志
查看 logs/canal/canal.logvi logs/canal/canal.log
2013-02-0522:45:27.967[main]INFOcom.alibaba.otter.canal.deployer.CanalLauncher-## start the canal server.2013-02-0522:45:28.113[main]INFOcom.alibaba.otter.canal.deployer.CanalController-## start the canal server[10.1.29.120:11111]2013-02-0522:45:28.210[main]INFOcom.alibaba.otter.canal.deployer.CanalLauncher-## the canal server is running now ......
查看instance的日志:vi logs/example/example.log
2013-02-0522:50:45.636[main]INFOc.a.o.c.i.spring.support.PropertyPlaceholderConfigurer-Loadingpropertiesfilefromclasspathresource[canal.properties]2013-02-0522:50:45.641[main]INFOc.a.o.c.i.spring.support.PropertyPlaceholderConfigurer-Loadingpropertiesfilefromclasspathresource[example/instance.properties]2013-02-0522:50:45.803[main]INFOc.a.otter.canal.instance.spring.CanalInstanceWithSpring-startCannalInstancefor1-example2013-02-0522:50:45.810[main]INFOc.a.otter.canal.instance.spring.CanalInstanceWithSpring-startsuccessful....
2). DataHub写入结果
{"data":[{"oid":"1","pid":"2","num":"3"}],"database":"ggtt","es":1591092305000,"id":2,"isDdl":false,"mysqlType":{"oid":"int(11)","pid":"int(11)","num":"int(11)"},"old":null,"pkNames":null,"sql":"","sqlType":{"oid":4,"pid":4,"num":4},"table":"orders","ts":1591092305813,"type":"INSERT"}
3). Flink输出
在Flink日志中会有如下输出
2021-02-19 19:54:57,638 INFO org.apache.flink.api.common.functions.util.PrintSinkOutputWriter [] - +I(1,2,3)
针对原生Kafka的优劣势
优势
- DataHub的Kafka属于服务型产品,大集群共享物理资源,弹性更大,稳定性更强
- DataHub可与众多云上产品无缝衔接,提升易用性
- 节省大量运维成本,开箱即用,用户只需关注业务相关内容,DataHub官方保证可用性不低于99.9%
- 按量付费,写入不收费,同步其他云产品不收费,目前仅收取存储、读取以及shard租用费,成本相对更低
劣势
- 因为产品的差异性所以导致无法对Kafka接口完全兼容,目前DataHub只兼容Kafka读写相关接口
- DataHub的Kafka接口目前不支持幂等性、事务性等功能
总结
目前DataHub已经全面兼容Kafka读写接口,对于一直在使用Kafka的用户,无需再投入精力学习一套其他接口,可以直接使用原生Kafka客户端实现业务的全面上云。
如果您在使用中有任何问题,可以参考官方文档兼容kafka,或者加入钉钉群咨询,群内值班同学会耐心解决您的问题。