DataHub完全兼容kafka

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
云数据库 RDS MySQL Serverless,0.5-2RCU 50GB
简介: DataHub目前已全面支持kafka producer、kafka consumer

通过Kafka协议访问DataHub

基本概念

Kafka是目前主流的消息队列之一,Appache顶级开源项目。DataHub服务基于飞天平台的自研消息队列产品,集团内的用户可能很多对DataHub并不是很熟悉,但是对TimeTunnel(简称TT)会更了解一些,DataHub和TT现在底层是同一套服务,只是集团内和集团外的应用场景不同,所以部署模式不同,目前DataHub主要服务于公有云和专有云,集团内业务较少,TT主要服务于集团内的用户。

DataHub现在已经全面兼容Kafka的读写,用户使用原生的Kafka SDK即可直接通过Kafka协议访问DataHub,对于一直使用Kafka的用户,仅需要修改客户端配置参数即可,实现最小成本的迁移上云。

Datahub和Kafka的区别

DataHub和Kafka的功能基本上一致,这里不针对功能上的区别做展开,主要介绍上底层的区别。

区别1 协议不同

Kafka基于TCP的定制了一套协议,而DataHub的协议是基于HTTP的restful风格协议。

区别2 Kafka客户端直连Broker,而DataHub通过Frontend转发

DataHub:

Kafka:

如上图所示,红色虚线上方表示客户端,下方表示服务端。Kafka在读写数据时,会与Broker直接建立TCP连接,这也就要求客户端必须可以直接访问服务端的机器,这也在一定程度上降低了服务端整体的安全性。对DataHub而言,用户是无法直接访问服务端的broker地址的,所有的用户请求,都需要经过VIP分发到各个Frontend机器上,然后再转发到Broker上,相对于Kafka,集群的安全性更高。

Kafka客户端访问模式的不同就决定了,Kafka必须是用户自建或者云上的实例型产品,每个Kafka实例用户独享,而DataHub的定位就是云上服务型产品,所有的用户共享同一个集群的物理资源,并在Frontend做一系列的权限控制达到资源隔离的效果。所以决定了 所以在稳定性、弹性等方面DataHub都是远远高于Kafka的。

区别3 数据存储模式不同

Kafka和DataHub都是利用顺序读写和Page cache等特点实现的高吞吐,主要区别是存储方式不同。如上图所示,虚线框内的部分表示一台机器,Kafka leader接收到数据之后,会直接写入本地存储中,然后再由follower读取数据并分别写入各自的存储来保证分布式一致性,而DataHub的broker则是会随机的写入集群中N台机器的存储中,所以Kafka和DataHub在Broker层面都存在一定程度的通讯开销,Kafka的内部网络开销为follower和leader的数据一致性上,而DataHub的broker内部通讯开销主要在broker写其他机器上,但是DataHub可以实现非常重要的一个特点——计算存储分离

使用Kafka协议访问datahub实践

示例一: 原生Kafka迁移到DataHub的Kafka

对于使用自建Kafka的用户,只需要修改部分参数配置即可直接将业务迁移到DataHub上。具体的配置可以参考下表:

C=Consumer, P=Producer

参数 C/P 可选配置 是否必须 描述
bootstrap.servers * 参考Kafka域名列表
security.protocol * SASL_SSL 为了保证数据传输的安全性,Kafka写入DataHub默认使用SSL加密传输
sasl.mechanism * PLAIN AK认证方式,仅支持PLAIN
compression.type P LZ4 是否开启压缩传输,目前仅支持LZ4
group.id C project.topic:subId 必须和订阅的topic保持一致,否则无法读取数据
session.timeout.ms C [60000, 180000] kafka默认为10000, 但是因为DataHub限制最小为60000,所以这里默认会变为60000
heartbeat.interval.ms C 建议session.timeout.ms的 2/3 Kafka默认为3000,但是因为session.timeout.ms会被默认修改为60000,所以这里建议显示设置为40000,否则heartbeat请求会过于频繁

Producer示例:

生成kafka_client_producer_jaas.conf文件

创建文件kafka_client_producer_jaas.conf,保存到任意路径,文件内容如下。

KafkaClient{org.apache.kafka.common.security.plain.PlainLoginModulerequiredusername="accessId"password="accessKey";};
maven依赖

Kafka-client版本至少大于等于0.10.0.0,推荐2.4.0

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.4.0</version></dependency>
示例代码
publicclassProducerExample {
static {
System.setProperty("java.security.auth.login.config", "src/main/resources/kafka_client_producer_jaas.conf");
    }
publicstaticvoidmain(String[] args) {
Propertiesproperties=newProperties();
properties.put("bootstrap.servers", "dh-cn-hangzhou.aliyuncs.com:9092");
properties.put("security.protocol", "SASL_SSL");
properties.put("sasl.mechanism", "PLAIN");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("compression.type", "lz4");
StringKafkaTopicName="test_project.test_topic";
Producer<String, String>producer=newKafkaProducer<String, String>(properties);
try {
List<Header>headers=newArrayList<>();
RecordHeaderheader1=newRecordHeader("key1", "value1".getBytes());
RecordHeaderheader2=newRecordHeader("key2", "value2".getBytes());
headers.add(header1);
headers.add(header2);
ProducerRecord<String, String>record=newProducerRecord<>(KafkaTopicName, 0, "key", "Hello DataHub!", headers);
// sync sendproducer.send(record).get();
        } catch (InterruptedExceptione) {
e.printStackTrace();
        } catch (ExecutionExceptione) {
e.printStackTrace();
        } finally {
producer.close();
        }
    }
}
运行结果

 

运行结果

运行成功之后,可以再DataHub抽样一下,确认是否正常DataHub。

Consumer示例

生成kafka_client_producer_jaas.conf文件和maven依赖参考Producer示例

示例代码

新加入的consumer需要1分钟左右分配shard,分配完成后即可消费。

publicclassConsumerExample {
static {
System.setProperty("java.security.auth.login.config", "src/main/resources/kafka_client_producer_jaas.conf");
    }
publicstaticvoidmain(String[] args) {
Propertiesproperties=newProperties();
properties.put("bootstrap.servers", "dh-cn-hangzhou.aliyuncs.com:9092");
properties.put("security.protocol", "SASL_SSL");
properties.put("sasl.mechanism", "PLAIN");
properties.put("group.id", "test_project.test_topic:1611039998153N71KM");
properties.put("auto.offset.reset", "earliest");
properties.put("session.timeout.ms", "60000");
properties.put("heartbeat.interval.ms", "40000");
properties.put("ssl.endpoint.identification.algorithm", "");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String>kafkaConsumer=newKafkaConsumer<String, String>(properties);
kafkaConsumer.subscribe(Collections.singletonList("test_project.test_topic"));
while (true) {
ConsumerRecords<String, String>records=kafkaConsumer.poll(Duration.ofSeconds(5));
for (ConsumerRecord<String, String>record : records) {
System.out.println(record.toString());
            }
        }
    }
}
运行结果

运行成功之后,便可以在终端看到读取到的数据。

ConsumerRecord(topic = test_project.test_topic, partition =0, leaderEpoch =0, offset =0, LogAppendTime =1611040892661, serialized key size =3, serialized value size =14, headers = RecordHeaders(headers = [RecordHeader(key = key1, value = [118, 97, 108, 117, 101, 49]), RecordHeader(key = key2, value = [118, 97, 108, 117, 101, 50])], isReadOnly =false), key = key, value = Hello DataHub!)

注意:这里同一个请求返回的数据的LogAppendTime是相同的,是该请求返回所有的数据的写入DataHub时间的最大值

示例二: Canal采集MySQL增量数据到DataHub(Kafka)并用Flink读取

1.配置MySQL

Canal采集MySQL要求源库开启binlog,并要求配置主库地址,从库无法采集,这里不再赘述。

本文以表orders为例介绍后续的配置流程,表结构如下所示:

mysql> desc orders;

+-------+--------------+------+-----+---------+-------+

| Field | Type         | Null | Key | Default | Extra |

+-------+--------------+------+-----+---------+-------+

| oid   | int(11)      | YES  |     | 5       |       |

| pid   | int(11)      | YES  |     | NULL    |       |

| num   | int(11)      | YES  |     | NULL    |       |

+-------+--------------+------+-----+---------+-------+

2. 创建DataHub Topic

如果想要使用Kafka协议访问DataHub,在创建Topic时必须选择为shard扩展模式,表示topic的扩容方式为添加shard。

//TODO 添加截图

测试所用的project、topic、订阅id分别为: test_project、test_topic、xxxxxx

3.配置Canal

Canal可以采集MySQL的binlog并写入Kafka,但是因为开源Canal的一些限制导致Canal无法通过Kafka协议访问DataHub,所以DataHub团队对开源Canal做了一些改造,使得用户可以使用Canal直接将增量数据写入DataHub(Kafka),具体的改动内容可以参考DataHub官方文档。用户可以直接下载改造好的Canal

将canal.deployer 复制到固定目录并解压

mkdir -p /usr/local/canal

tar -zxvf canal.deployer-1.1.5-SNAPSHOT.tar.gz -C /usr/local/canal

修改instance配置 conf/example/instance.properties

#  按需修改成自己的数据库信息#################################################...canal.instance.master.address=192.168.1.20:3306
# username/password,数据库的用户名和密码...canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
...# mq configcanal.mq.topic=example
# 针对库名或者表名发送动态topic#canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\\..*,.*\\..*canal.mq.partition=0
# hash partition config#canal.mq.partitionsNum=3#库名.表名: 唯一主键,多个表之间用逗号分隔#canal.mq.partitionHash=mytest.person:id,mytest.role:id#################################################

对应ip 地址的MySQL 数据库需进行相关初始化与设置, 可参考 Canal QuickStart。针对库名的动态TopicName和根据主键哈希的设置,可参考mq参数说明

修改canal配置文件 conf/canal.properties

# ...canal.serverMode = kafka
# 这里以杭州互联网地址为例kafka.bootstrap.servers = dh-cn-hangzhou.aliyuncs.com:9092
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0
kafka.security.protocol = SASL_SSL
kafka.sasl.mechanism = PLAIN

其中必须配置参数为canal.serverMode、kafka.bootstrap.servers、kafka.security.protocol、kafka.sasl.mechanism,其他参数用户可根据实际情况自主进行调优。

修改jass配置文件 conf/kafka_client_producer_jaas.conf

KafkaClient {

 org.apache.kafka.common.security.plain.PlainLoginModule required

 username="accessId"

 password="accessKey";

};

启动

cd /usr/local/canal/

sh bin/startup.sh

4.配置Flink

1). Flink源表

阿里云云上的Flink访问Kafka必须使用VPC Kafka域名地址,其他Flink根据实际情况选择。

createtable kafka_source(  `oid` BIGINT,  `pid` BIGINT,  `num` BIGINT) with ('connector'='kafka','topic'='test_project.test_topic1','properties.bootstrap.servers'='dh-cn-hangzhou-int-vpc.aliyuncs.com:9094','properties.group.id'='test_project.test_topic1:subId','format'='canal-json','scan.startup.mode'='earliest-offset','properties.sasl.mechanism'='PLAIN','properties.security.protocol'='SASL_SSL','properties.sasl.jaas.config'='org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="AccessId" password="AccessKey";');
2). Flink结果表

为了方便展示结果,本例以print类型作为结果表。

CREATETABLE print_table (  oid BIGINT,  pid BIGINT,  num BIGINT) WITH ('connector'='print','logger'='true');
3). Flink sql
INSERTINTO print_canal_table select    `oid BIGINT`,    `pid BIGINT`,    `num BIGINT`
FROM kafka_canal_source;

结果展示

在MySQL中执行一条sql语句mysql> insert into orders values(1,2,3);

1). canal日志

查看 logs/canal/canal.logvi logs/canal/canal.log

2013-02-0522:45:27.967[main]INFOcom.alibaba.otter.canal.deployer.CanalLauncher-## start the canal server.2013-02-0522:45:28.113[main]INFOcom.alibaba.otter.canal.deployer.CanalController-## start the canal server[10.1.29.120:11111]2013-02-0522:45:28.210[main]INFOcom.alibaba.otter.canal.deployer.CanalLauncher-## the canal server is running now ......

查看instance的日志:vi logs/example/example.log

2013-02-0522:50:45.636[main]INFOc.a.o.c.i.spring.support.PropertyPlaceholderConfigurer-Loadingpropertiesfilefromclasspathresource[canal.properties]2013-02-0522:50:45.641[main]INFOc.a.o.c.i.spring.support.PropertyPlaceholderConfigurer-Loadingpropertiesfilefromclasspathresource[example/instance.properties]2013-02-0522:50:45.803[main]INFOc.a.otter.canal.instance.spring.CanalInstanceWithSpring-startCannalInstancefor1-example2013-02-0522:50:45.810[main]INFOc.a.otter.canal.instance.spring.CanalInstanceWithSpring-startsuccessful....
2). DataHub写入结果
{"data":[{"oid":"1","pid":"2","num":"3"}],"database":"ggtt","es":1591092305000,"id":2,"isDdl":false,"mysqlType":{"oid":"int(11)","pid":"int(11)","num":"int(11)"},"old":null,"pkNames":null,"sql":"","sqlType":{"oid":4,"pid":4,"num":4},"table":"orders","ts":1591092305813,"type":"INSERT"}
3). Flink输出

在Flink日志中会有如下输出

2021-02-19 19:54:57,638 INFO  org.apache.flink.api.common.functions.util.PrintSinkOutputWriter [] - +I(1,2,3)

针对原生Kafka的优劣势

优势

  • DataHub的Kafka属于服务型产品,大集群共享物理资源,弹性更大,稳定性更强
  • DataHub可与众多云上产品无缝衔接,提升易用性
  • 节省大量运维成本,开箱即用,用户只需关注业务相关内容,DataHub官方保证可用性不低于99.9%
  • 按量付费,写入不收费,同步其他云产品不收费,目前仅收取存储、读取以及shard租用费,成本相对更低

劣势

  • 因为产品的差异性所以导致无法对Kafka接口完全兼容,目前DataHub只兼容Kafka读写相关接口
  • DataHub的Kafka接口目前不支持幂等性、事务性等功能

总结

目前DataHub已经全面兼容Kafka读写接口,对于一直在使用Kafka的用户,无需再投入精力学习一套其他接口,可以直接使用原生Kafka客户端实现业务的全面上云。

如果您在使用中有任何问题,可以参考官方文档兼容kafka,或者加入钉钉群咨询,群内值班同学会耐心解决您的问题。


相关文章
|
2月前
|
消息中间件 存储 缓存
玩转Kafka—Kafka高性能原因分析
玩转Kafka—Kafka高性能原因分析
28 0
|
4月前
|
消息中间件 关系型数据库 Kafka
Flink CDC可以从Kafka消费数据并写入到Doris中
Flink CDC可以从Kafka消费数据并写入到Doris中
260 2
|
2月前
|
消息中间件 Kafka Apache
Flink 提供了与 Kafka 集成的官方 Connector,使得 Flink 能够消费 Kafka 数据
【2月更文挑战第6天】Flink 提供了与 Kafka 集成的官方 Connector,使得 Flink 能够消费 Kafka 数据
72 2
|
18天前
|
消息中间件 存储 大数据
【Kafka】Kafka为什么不支持读写分离?
【4月更文挑战第7天】【Kafka】Kafka为什么不支持读写分离?
|
2月前
|
消息中间件 分布式计算 Java
|
4月前
|
消息中间件 关系型数据库 MySQL
2021年最新Flink读写Kafka数据——Flink数据写入Kafka+从Kafka存入Mysql(二)
2021年最新Flink读写Kafka数据——Flink数据写入Kafka+从Kafka存入Mysql(二)
50 0
|
5月前
|
消息中间件 Kafka Serverless
ffc的kafka触发器和kafka的connector的主要区别
c的kafka触发器和kafka的connector的主要区别
32 1
|
消息中间件 弹性计算 分布式计算
Kafka 数据如何同步到 MaxCompute | 学习笔记(一)
快速学习 Kafka 数据如何同步到 MaxCompute
348 0
Kafka 数据如何同步到 MaxCompute | 学习笔记(一)
|
消息中间件 SQL JSON
Kafka 数据如何同步到 MaxCompute | 学习笔记(二)
快速学习 Kafka 数据如何同步到 MaxCompute
234 0
|
消息中间件 弹性计算 分布式计算
Kafka 数据如何同步到 MaxCompute | 学习笔记
快速学习 Kafka 数据如何同步到 MaxCompute,介绍了 Kafka 数据如何同步到 MaxCompute系统机制, 以及在实际应用过程中如何使用。
146 0
Kafka 数据如何同步到 MaxCompute | 学习笔记