Kafka源表的实现来源于自社区的kafka版本实现。
注意:本文档只适合独享模式下使用。
Kafka需要定义的DDL如下。
create table kafka_stream(
messageKey VARBINARY,
`message` VARBINARY,
topic varchar,
`partition` int,
`offset` bigint
) with (
type ='kafka010',
topic = 'xxx',
`group.id` = 'xxx',
bootstrap.servers = 'ip:端口,ip:端口,ip:端口'
);
注意:以上表中的五个字段顺序务必保持一致。
WITH参数
通用配置
参数 | 注释说明 | 备注 |
---|---|---|
type | Kafka对应版本 | 推荐使用KAFKA010 |
topic | 读取的单个topic | topic名称 |
必选配置
(1)kafka08必选配置:
参数 | 注释说明 | 备注 |
---|---|---|
group.id | 无 | 消费组id |
zookeeper.connect | zk链接地址 | zk连接id |
(2)kafka09/kafka010/kafka011必选配置:
参数 | 注释说明 | 备注 |
---|---|---|
group.id | 无 | 消费组id |
bootstrap.servers | kafka集群地址 | kafka集群地址 |
Kafka集群地址:
如果您的kafka是阿里云商业版,请参考kafka商业版准备配置文档。
如果您的kafka是阿里云公测版,请参考kafka公测版准备配置文档。
可选配置
"consumer.id","socket.timeout.ms","fetch.message.max.bytes","num.consumer.fetchers","auto.commit.enable","auto.commit.interval.ms","queued.max.message.chunks", "rebalance.max.retries","fetch.min.bytes","fetch.wait.max.ms","rebalance.backoff.ms","refresh.leader.backoff.ms","auto.offset.reset","consumer.timeout.ms","exclude.internal.topics","partition.assignment.strategy","client.id","zookeeper.session.timeout.ms","zookeeper.connection.timeout.ms","zookeeper.sync.time.ms","offsets.storage","offsets.channel.backoff.ms","offsets.channel.socket.timeout.ms","offsets.commit.max.retries","dual.commit.enabled","partition.assignment.strategy","socket.receive.buffer.bytes","fetch.min.bytes"
注意:其它可选配置项参考kafka官方文档:
Kafka09
https://kafka.apache.org/0110/documentation.html#consumerconfigs
Kafka010
https://kafka.apache.org/090/documentation.html#newconsumerconfigs
Kafka011
https://kafka.apache.org/0102/documentation.html#newconsumerconfigs
kafka版本对应关系
Type | Kafka 版本 |
---|---|
Kafka08 | 0.8.22 |
Kafka09 | 0.9.0.1 |
Kafka010 | 0.10.2.1 |
Kafka011 | 0.11.0.2 |
Kafka消息解析
默认Kafka读到的消息:
messageKey varbianry,
message varbianry,
topic varchar,
partition int,
offset bigint
这样一个五元组,如果您希望在source阶段把数据parser成特定的其它格式,可以按照下面实践进行。
参数 | 注释说明 | 备注 |
---|---|---|
parserUdtf | 自定义解析函数 | 用于解析从kafka读到的消息映射到ddl具体对应的类型 |
如何写一个parserUdtf参见自定义表值函数(UDTF)。
自建kafka
与阿里云Kafka消息队列一样,DDL定义相同。
示例:
create table kafka_stream(
messageKey VARBINARY,
`message` VARBINARY,
topic varchar,
`partition` int,
`offset` bigint
) with (
type ='kafka011',
topic = 'kafka_01',
`group.id` = 'CID_blink',
bootstrap.servers = '192.168.0.251:9092'
);
WITH参数
关于自建Kafka的with参数,请参考本文档Kafka创建时DDL的with参数说明。需要注意的是 bootstrap.servers
参数需要填写自建的地址和端口号。
注意:无论是阿里云Kafka还是自建Kafka,目前实时计算均无Tps、Rps等指标信息。在作业上线之后,运维界面暂时不支持显示指标信息。
- 实时计算
阿里云实时计算(Alibaba Cloud Realtime Compute)是一套基于Apache Flink构建的一站式、高性能实时大数据处理平台,广泛适用于流式数据处理、离线数据处理、DataLake计算等多种场景。阿里云实时计算产品彻底规避繁重的底层流式处理逻辑开发工作,助力中国企业向实时化、智能化大数据计算升级转型。
- MaxCompute
大数据计算服务(MaxCompute,原名ODPS)是一种快速、完全托管的TB/PB级数据仓库解决方案。MaxCompute向用户提供了完善的数据导入方案以及多种经典的分布式计算模型,能够更快速的解决用户海量数据计算问题,有效降低企业成本,并保障数据安全。
- DataWorks(数据工场)
数据工场DataWorks (原大数据开发套件Data IDE) 是基
本文转自实时计算——创建消息队列(Kafka)源表