DataHub完全兼容kafka

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
实时计算 Flink 版,5000CU*H 3个月
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: DataHub目前已全面支持kafka producer、kafka consumer

通过Kafka协议访问DataHub

基本概念

Kafka是目前主流的消息队列之一,Appache顶级开源项目。DataHub服务基于飞天平台的自研消息队列产品,集团内的用户可能很多对DataHub并不是很熟悉,但是对TimeTunnel(简称TT)会更了解一些,DataHub和TT现在底层是同一套服务,只是集团内和集团外的应用场景不同,所以部署模式不同,目前DataHub主要服务于公有云和专有云,集团内业务较少,TT主要服务于集团内的用户。

DataHub现在已经全面兼容Kafka的读写,用户使用原生的Kafka SDK即可直接通过Kafka协议访问DataHub,对于一直使用Kafka的用户,仅需要修改客户端配置参数即可,实现最小成本的迁移上云。

Datahub和Kafka的区别

DataHub和Kafka的功能基本上一致,这里不针对功能上的区别做展开,主要介绍上底层的区别。

区别1 协议不同

Kafka基于TCP的定制了一套协议,而DataHub的协议是基于HTTP的restful风格协议。

区别2 Kafka客户端直连Broker,而DataHub通过Frontend转发

DataHub:

Kafka:

如上图所示,红色虚线上方表示客户端,下方表示服务端。Kafka在读写数据时,会与Broker直接建立TCP连接,这也就要求客户端必须可以直接访问服务端的机器,这也在一定程度上降低了服务端整体的安全性。对DataHub而言,用户是无法直接访问服务端的broker地址的,所有的用户请求,都需要经过VIP分发到各个Frontend机器上,然后再转发到Broker上,相对于Kafka,集群的安全性更高。

Kafka客户端访问模式的不同就决定了,Kafka必须是用户自建或者云上的实例型产品,每个Kafka实例用户独享,而DataHub的定位就是云上服务型产品,所有的用户共享同一个集群的物理资源,并在Frontend做一系列的权限控制达到资源隔离的效果。所以决定了 所以在稳定性、弹性等方面DataHub都是远远高于Kafka的。

区别3 数据存储模式不同

Kafka和DataHub都是利用顺序读写和Page cache等特点实现的高吞吐,主要区别是存储方式不同。如上图所示,虚线框内的部分表示一台机器,Kafka leader接收到数据之后,会直接写入本地存储中,然后再由follower读取数据并分别写入各自的存储来保证分布式一致性,而DataHub的broker则是会随机的写入集群中N台机器的存储中,所以Kafka和DataHub在Broker层面都存在一定程度的通讯开销,Kafka的内部网络开销为follower和leader的数据一致性上,而DataHub的broker内部通讯开销主要在broker写其他机器上,但是DataHub可以实现非常重要的一个特点——计算存储分离

使用Kafka协议访问datahub实践

示例一: 原生Kafka迁移到DataHub的Kafka

对于使用自建Kafka的用户,只需要修改部分参数配置即可直接将业务迁移到DataHub上。具体的配置可以参考下表:

C=Consumer, P=Producer

参数 C/P 可选配置 是否必须 描述
bootstrap.servers * 参考Kafka域名列表
security.protocol * SASL_SSL 为了保证数据传输的安全性,Kafka写入DataHub默认使用SSL加密传输
sasl.mechanism * PLAIN AK认证方式,仅支持PLAIN
compression.type P LZ4 是否开启压缩传输,目前仅支持LZ4
group.id C project.topic:subId 必须和订阅的topic保持一致,否则无法读取数据
session.timeout.ms C [60000, 180000] kafka默认为10000, 但是因为DataHub限制最小为60000,所以这里默认会变为60000
heartbeat.interval.ms C 建议session.timeout.ms的 2/3 Kafka默认为3000,但是因为session.timeout.ms会被默认修改为60000,所以这里建议显示设置为40000,否则heartbeat请求会过于频繁

Producer示例:

生成kafka_client_producer_jaas.conf文件

创建文件kafka_client_producer_jaas.conf,保存到任意路径,文件内容如下。

KafkaClient{org.apache.kafka.common.security.plain.PlainLoginModulerequiredusername="accessId"password="accessKey";};
maven依赖

Kafka-client版本至少大于等于0.10.0.0,推荐2.4.0

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.4.0</version></dependency>
示例代码
publicclassProducerExample {
static {
System.setProperty("java.security.auth.login.config", "src/main/resources/kafka_client_producer_jaas.conf");
    }
publicstaticvoidmain(String[] args) {
Propertiesproperties=newProperties();
properties.put("bootstrap.servers", "dh-cn-hangzhou.aliyuncs.com:9092");
properties.put("security.protocol", "SASL_SSL");
properties.put("sasl.mechanism", "PLAIN");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("compression.type", "lz4");
StringKafkaTopicName="test_project.test_topic";
Producer<String, String>producer=newKafkaProducer<String, String>(properties);
try {
List<Header>headers=newArrayList<>();
RecordHeaderheader1=newRecordHeader("key1", "value1".getBytes());
RecordHeaderheader2=newRecordHeader("key2", "value2".getBytes());
headers.add(header1);
headers.add(header2);
ProducerRecord<String, String>record=newProducerRecord<>(KafkaTopicName, 0, "key", "Hello DataHub!", headers);
// sync sendproducer.send(record).get();
        } catch (InterruptedExceptione) {
e.printStackTrace();
        } catch (ExecutionExceptione) {
e.printStackTrace();
        } finally {
producer.close();
        }
    }
}
运行结果

 

运行结果

运行成功之后,可以再DataHub抽样一下,确认是否正常DataHub。

Consumer示例

生成kafka_client_producer_jaas.conf文件和maven依赖参考Producer示例

示例代码

新加入的consumer需要1分钟左右分配shard,分配完成后即可消费。

publicclassConsumerExample {
static {
System.setProperty("java.security.auth.login.config", "src/main/resources/kafka_client_producer_jaas.conf");
    }
publicstaticvoidmain(String[] args) {
Propertiesproperties=newProperties();
properties.put("bootstrap.servers", "dh-cn-hangzhou.aliyuncs.com:9092");
properties.put("security.protocol", "SASL_SSL");
properties.put("sasl.mechanism", "PLAIN");
properties.put("group.id", "test_project.test_topic:1611039998153N71KM");
properties.put("auto.offset.reset", "earliest");
properties.put("session.timeout.ms", "60000");
properties.put("heartbeat.interval.ms", "40000");
properties.put("ssl.endpoint.identification.algorithm", "");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String>kafkaConsumer=newKafkaConsumer<String, String>(properties);
kafkaConsumer.subscribe(Collections.singletonList("test_project.test_topic"));
while (true) {
ConsumerRecords<String, String>records=kafkaConsumer.poll(Duration.ofSeconds(5));
for (ConsumerRecord<String, String>record : records) {
System.out.println(record.toString());
            }
        }
    }
}
运行结果

运行成功之后,便可以在终端看到读取到的数据。

ConsumerRecord(topic = test_project.test_topic, partition =0, leaderEpoch =0, offset =0, LogAppendTime =1611040892661, serialized key size =3, serialized value size =14, headers = RecordHeaders(headers = [RecordHeader(key = key1, value = [118, 97, 108, 117, 101, 49]), RecordHeader(key = key2, value = [118, 97, 108, 117, 101, 50])], isReadOnly =false), key = key, value = Hello DataHub!)

注意:这里同一个请求返回的数据的LogAppendTime是相同的,是该请求返回所有的数据的写入DataHub时间的最大值

示例二: Canal采集MySQL增量数据到DataHub(Kafka)并用Flink读取

1.配置MySQL

Canal采集MySQL要求源库开启binlog,并要求配置主库地址,从库无法采集,这里不再赘述。

本文以表orders为例介绍后续的配置流程,表结构如下所示:

mysql> desc orders;

+-------+--------------+------+-----+---------+-------+

| Field | Type         | Null | Key | Default | Extra |

+-------+--------------+------+-----+---------+-------+

| oid   | int(11)      | YES  |     | 5       |       |

| pid   | int(11)      | YES  |     | NULL    |       |

| num   | int(11)      | YES  |     | NULL    |       |

+-------+--------------+------+-----+---------+-------+

2. 创建DataHub Topic

如果想要使用Kafka协议访问DataHub,在创建Topic时必须选择为shard扩展模式,表示topic的扩容方式为添加shard。

//TODO 添加截图

测试所用的project、topic、订阅id分别为: test_project、test_topic、xxxxxx

3.配置Canal

Canal可以采集MySQL的binlog并写入Kafka,但是因为开源Canal的一些限制导致Canal无法通过Kafka协议访问DataHub,所以DataHub团队对开源Canal做了一些改造,使得用户可以使用Canal直接将增量数据写入DataHub(Kafka),具体的改动内容可以参考DataHub官方文档。用户可以直接下载改造好的Canal

将canal.deployer 复制到固定目录并解压

mkdir -p /usr/local/canal

tar -zxvf canal.deployer-1.1.5-SNAPSHOT.tar.gz -C /usr/local/canal

修改instance配置 conf/example/instance.properties

#  按需修改成自己的数据库信息#################################################...canal.instance.master.address=192.168.1.20:3306
# username/password,数据库的用户名和密码...canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
...# mq configcanal.mq.topic=example
# 针对库名或者表名发送动态topic#canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\\..*,.*\\..*canal.mq.partition=0
# hash partition config#canal.mq.partitionsNum=3#库名.表名: 唯一主键,多个表之间用逗号分隔#canal.mq.partitionHash=mytest.person:id,mytest.role:id#################################################

对应ip 地址的MySQL 数据库需进行相关初始化与设置, 可参考 Canal QuickStart。针对库名的动态TopicName和根据主键哈希的设置,可参考mq参数说明

修改canal配置文件 conf/canal.properties

# ...canal.serverMode = kafka
# 这里以杭州互联网地址为例kafka.bootstrap.servers = dh-cn-hangzhou.aliyuncs.com:9092
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0
kafka.security.protocol = SASL_SSL
kafka.sasl.mechanism = PLAIN

其中必须配置参数为canal.serverMode、kafka.bootstrap.servers、kafka.security.protocol、kafka.sasl.mechanism,其他参数用户可根据实际情况自主进行调优。

修改jass配置文件 conf/kafka_client_producer_jaas.conf

KafkaClient {

 org.apache.kafka.common.security.plain.PlainLoginModule required

 username="accessId"

 password="accessKey";

};

启动

cd /usr/local/canal/

sh bin/startup.sh

4.配置Flink

1). Flink源表

阿里云云上的Flink访问Kafka必须使用VPC Kafka域名地址,其他Flink根据实际情况选择。

createtable kafka_source(  `oid` BIGINT,  `pid` BIGINT,  `num` BIGINT) with ('connector'='kafka','topic'='test_project.test_topic1','properties.bootstrap.servers'='dh-cn-hangzhou-int-vpc.aliyuncs.com:9094','properties.group.id'='test_project.test_topic1:subId','format'='canal-json','scan.startup.mode'='earliest-offset','properties.sasl.mechanism'='PLAIN','properties.security.protocol'='SASL_SSL','properties.sasl.jaas.config'='org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="AccessId" password="AccessKey";');
2). Flink结果表

为了方便展示结果,本例以print类型作为结果表。

CREATETABLE print_table (  oid BIGINT,  pid BIGINT,  num BIGINT) WITH ('connector'='print','logger'='true');
3). Flink sql
INSERTINTO print_canal_table select    `oid BIGINT`,    `pid BIGINT`,    `num BIGINT`
FROM kafka_canal_source;

结果展示

在MySQL中执行一条sql语句mysql> insert into orders values(1,2,3);

1). canal日志

查看 logs/canal/canal.logvi logs/canal/canal.log

2013-02-0522:45:27.967[main]INFOcom.alibaba.otter.canal.deployer.CanalLauncher-## start the canal server.2013-02-0522:45:28.113[main]INFOcom.alibaba.otter.canal.deployer.CanalController-## start the canal server[10.1.29.120:11111]2013-02-0522:45:28.210[main]INFOcom.alibaba.otter.canal.deployer.CanalLauncher-## the canal server is running now ......

查看instance的日志:vi logs/example/example.log

2013-02-0522:50:45.636[main]INFOc.a.o.c.i.spring.support.PropertyPlaceholderConfigurer-Loadingpropertiesfilefromclasspathresource[canal.properties]2013-02-0522:50:45.641[main]INFOc.a.o.c.i.spring.support.PropertyPlaceholderConfigurer-Loadingpropertiesfilefromclasspathresource[example/instance.properties]2013-02-0522:50:45.803[main]INFOc.a.otter.canal.instance.spring.CanalInstanceWithSpring-startCannalInstancefor1-example2013-02-0522:50:45.810[main]INFOc.a.otter.canal.instance.spring.CanalInstanceWithSpring-startsuccessful....
2). DataHub写入结果
{"data":[{"oid":"1","pid":"2","num":"3"}],"database":"ggtt","es":1591092305000,"id":2,"isDdl":false,"mysqlType":{"oid":"int(11)","pid":"int(11)","num":"int(11)"},"old":null,"pkNames":null,"sql":"","sqlType":{"oid":4,"pid":4,"num":4},"table":"orders","ts":1591092305813,"type":"INSERT"}
3). Flink输出

在Flink日志中会有如下输出

2021-02-19 19:54:57,638 INFO  org.apache.flink.api.common.functions.util.PrintSinkOutputWriter [] - +I(1,2,3)

针对原生Kafka的优劣势

优势

  • DataHub的Kafka属于服务型产品,大集群共享物理资源,弹性更大,稳定性更强
  • DataHub可与众多云上产品无缝衔接,提升易用性
  • 节省大量运维成本,开箱即用,用户只需关注业务相关内容,DataHub官方保证可用性不低于99.9%
  • 按量付费,写入不收费,同步其他云产品不收费,目前仅收取存储、读取以及shard租用费,成本相对更低

劣势

  • 因为产品的差异性所以导致无法对Kafka接口完全兼容,目前DataHub只兼容Kafka读写相关接口
  • DataHub的Kafka接口目前不支持幂等性、事务性等功能

总结

目前DataHub已经全面兼容Kafka读写接口,对于一直在使用Kafka的用户,无需再投入精力学习一套其他接口,可以直接使用原生Kafka客户端实现业务的全面上云。

如果您在使用中有任何问题,可以参考官方文档兼容kafka,或者加入钉钉群咨询,群内值班同学会耐心解决您的问题。


相关文章
|
6天前
|
消息中间件 分布式计算 DataWorks
DataWorks常见问题之kafka数据导入datahub失败如何解决
DataWorks是阿里云提供的一站式大数据开发与管理平台,支持数据集成、数据开发、数据治理等功能;在本汇总中,我们梳理了DataWorks产品在使用过程中经常遇到的问题及解答,以助用户在数据处理和分析工作中提高效率,降低难度。
|
消息中间件 Kafka
使用Kafka Producer写入数据到Datahub
DataHub已经兼容Kafka Producer协议,用户可以使用原生Kafka客户端将数据写入DataHub。
1574 0
使用Kafka Producer写入数据到Datahub
|
6天前
|
消息中间件 JSON druid
Druid:通过 Kafka 加载流数据
Druid:通过 Kafka 加载流数据
44 0
|
6天前
|
消息中间件 存储 算法
深入了解Kafka的数据持久化机制
深入了解Kafka的数据持久化机制
45 0
|
4天前
|
消息中间件 关系型数据库 Kafka
实时计算 Flink版产品使用合集之想要加快消费 Kafka 数据的速度,该怎么配置参数
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
17 2
|
6天前
|
消息中间件 Kafka Apache
Flink 提供了与 Kafka 集成的官方 Connector,使得 Flink 能够消费 Kafka 数据
【2月更文挑战第6天】Flink 提供了与 Kafka 集成的官方 Connector,使得 Flink 能够消费 Kafka 数据
76 2
|
2天前
|
消息中间件 关系型数据库 MySQL
实时计算 Flink版操作报错合集之使用 Event Time Temporal Join 关联多个 HBase 后,Kafka 数据的某个字段变为 null 是什么原因导致的
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
21 0
|
2天前
|
SQL 消息中间件 Kafka
实时计算 Flink版操作报错合集之使用 Event Time Temporal Join 关联多个 HBase 后,Kafka 数据的某个字段变为 null 是什么原因导致的
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
17 0
|
2天前
|
网络安全 流计算 Python
实时计算 Flink版操作报错合集之Flink sql-client 针对kafka的protobuf格式数据建表,报错:java.lang.ClassNotFoundException 如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
15 1
|
4天前
|
消息中间件 关系型数据库 网络安全
实时计算 Flink版操作报错合集之Flink sql-client 针对kafka的protobuf格式数据建表,报错:java.lang.ClassNotFoundException 如何解决
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
14 1

热门文章

最新文章