创建消息队列(Kafka)源表

简介:

Kafka源表的实现来源于自社区的kafka版本实现。

注意:本文档只适合独享模式下使用。

Kafka需要定义的DDL如下。

 
  
  1. create table kafka_stream(
  2. messageKey VARBINARY,
  3. `message` VARBINARY,
  4. topic varchar,
  5. `partition` int,
  6. `offset` bigint
  7. ) with (
  8. type ='kafka010',
  9. topic = 'xxx',
  10. `group.id` = 'xxx',
  11. bootstrap.servers = 'ip:端口,ip:端口,ip:端口'
  12. );

注意:以上表中的五个字段顺序务必保持一致。

WITH参数

通用配置

参数 注释说明 备注
type Kafka对应版本 推荐使用KAFKA010
topic 读取的单个topic topic名称

必选配置

(1)kafka08必选配置:

参数 注释说明 备注
group.id 消费组id
zookeeper.connect zk链接地址 zk连接id

(2)kafka09/kafka010/kafka011必选配置:

参数 注释说明 备注
group.id 消费组id
bootstrap.servers kafka集群地址 kafka集群地址

Kafka集群地址:

如果您的kafka是阿里云商业版,请参考kafka商业版准备配置文档。

如果您的kafka是阿里云公测版,请参考kafka公测版准备配置文档。

可选配置

 
   
  1. "consumer.id","socket.timeout.ms","fetch.message.max.bytes","num.consumer.fetchers","auto.commit.enable","auto.commit.interval.ms","queued.max.message.chunks", "rebalance.max.retries","fetch.min.bytes","fetch.wait.max.ms","rebalance.backoff.ms","refresh.leader.backoff.ms","auto.offset.reset","consumer.timeout.ms","exclude.internal.topics","partition.assignment.strategy","client.id","zookeeper.session.timeout.ms","zookeeper.connection.timeout.ms","zookeeper.sync.time.ms","offsets.storage","offsets.channel.backoff.ms","offsets.channel.socket.timeout.ms","offsets.commit.max.retries","dual.commit.enabled","partition.assignment.strategy","socket.receive.buffer.bytes","fetch.min.bytes"

注意:其它可选配置项参考kafka官方文档:
Kafka09
https://kafka.apache.org/0110/documentation.html#consumerconfigs
Kafka010
https://kafka.apache.org/090/documentation.html#newconsumerconfigs
Kafka011
https://kafka.apache.org/0102/documentation.html#newconsumerconfigs

kafka版本对应关系

Type Kafka 版本
Kafka08 0.8.22
Kafka09 0.9.0.1
Kafka010 0.10.2.1
Kafka011 0.11.0.2

Kafka消息解析

默认Kafka读到的消息:

 
   
  1. messageKey varbianry,
  2. message varbianry,
  3. topic varchar,
  4. partition int,
  5. offset bigint

这样一个五元组,如果您希望在source阶段把数据parser成特定的其它格式,可以按照下面实践进行。

参数 注释说明 备注
parserUdtf 自定义解析函数 用于解析从kafka读到的消息映射到ddl具体对应的类型

如何写一个parserUdtf参见自定义表值函数(UDTF)

自建kafka

与阿里云Kafka消息队列一样,DDL定义相同。

示例:

 
   
  1. create table kafka_stream(
  2. messageKey VARBINARY,
  3. `message` VARBINARY,
  4. topic varchar,
  5. `partition` int,
  6. `offset` bigint
  7. ) with (
  8. type ='kafka011',
  9. topic = 'kafka_01',
  10. `group.id` = 'CID_blink',
  11. bootstrap.servers = '192.168.0.251:9092'
  12. );

WITH参数

关于自建Kafka的with参数,请参考本文档Kafka创建时DDL的with参数说明。需要注意的是 bootstrap.servers参数需要填写自建的地址和端口号。

注意:无论是阿里云Kafka还是自建Kafka,目前实时计算均无Tps、Rps等指标信息。在作业上线之后,运维界面暂时不支持显示指标信息。

本文转自实时计算——创建消息队列(Kafka)源表

目录
打赏
0
0
0
0
5351
分享
相关文章
2024消息队列“四大天王”:Rabbit、Rocket、Kafka、Pulsar巅峰对决
本文对比了 RabbitMQ、RocketMQ、Kafka 和 Pulsar 四种消息队列系统,涵盖架构、性能、可用性和适用场景。RabbitMQ 以灵活路由和可靠性著称;RocketMQ 支持高可用和顺序消息;Kafka 专为高吞吐量和低延迟设计;Pulsar 提供多租户支持和高可扩展性。性能方面,吞吐量从高到低依次为
446 1
初识Apache Kafka:搭建你的第一个消息队列系统
【10月更文挑战第24天】在数字化转型的浪潮中,数据成为了企业决策的关键因素之一。而高效的数据处理能力,则成为了企业在竞争中脱颖而出的重要武器。在这个背景下,消息队列作为连接不同系统和服务的桥梁,其重要性日益凸显。Apache Kafka 是一款开源的消息队列系统,以其高吞吐量、可扩展性和持久性等特点受到了广泛欢迎。作为一名技术爱好者,我对 Apache Kafka 产生了浓厚的兴趣,并决定亲手搭建一套属于自己的消息队列系统。
162 2
初识Apache Kafka:搭建你的第一个消息队列系统
解锁Kafka等消息队列中间件的测试之道
在这个数字化时代,分布式系统和消息队列中间件(如Kafka、RabbitMQ)已成为日常工作的核心组件。本次公开课由前字节跳动资深专家KK老师主讲,深入解析消息队列的基本原理、架构及测试要点,涵盖功能、性能、可靠性、安全性和兼容性测试,并探讨其主要应用场景,如应用解耦、异步处理和限流削峰。课程最后设有互动答疑环节,助你全面掌握消息队列的测试方法。
从零起步,到亲手实现:一步步教你用Unity引擎搭建出令人惊叹的3D游戏世界,绝不错过的初学者友好型超详细指南 ——兼探索游戏设计奥秘与实践编程技巧的完美结合之旅
【8月更文挑战第31天】本文介绍如何使用Unity引擎从零开始创建简单的3D游戏世界,涵盖游戏对象创建、物理模拟、用户输入处理及动画效果。Unity是一款强大的跨平台游戏开发工具,支持多种编程语言,具有直观编辑器和丰富文档。文章指导读者创建新项目、添加立方体对象、编写移动脚本,并引入基础动画,帮助初学者快速掌握Unity开发核心概念,迈出游戏制作的第一步。
449 1
为什么Kafka能秒杀众多消息队列?揭秘它背后的五大性能神器,让你秒懂Kafka的极速之道!
【8月更文挑战第24天】Apache Kafka作为分布式流处理平台的领先者,凭借其出色的性能和扩展能力广受好评。本文通过案例分析,深入探讨Kafka实现高性能的关键因素:分区与并行处理显著提升吞吐量;批量发送结合压缩算法减少网络I/O次数及数据量;顺序写盘与页缓存机制提高写入效率;Zero-Copy技术降低CPU消耗;集群扩展与负载均衡确保系统稳定性和可靠性。这些机制共同作用,使Kafka能够在处理大规模数据流时表现出色。
95 3
ZooKeeper助力Kafka:掌握这四大作用,让你的消息队列系统稳如老狗!
【8月更文挑战第24天】Kafka是一款高性能的分布式消息队列系统,其稳定运行很大程度上依赖于ZooKeeper提供的分布式协调服务。ZooKeeper在Kafka中承担了四大关键职责:集群管理(Broker的注册与选举)、主题与分区管理、领导者选举机制以及消费者组管理。通过具体的代码示例展示了这些功能的具体实现方式。
230 2
现代消息队列与云存储问题之Kafka在海量队列场景下存在性能的问题如何解决
现代消息队列与云存储问题之Kafka在海量队列场景下存在性能的问题如何解决
消息队列 MQ操作报错合集之出现"Connection reset by peer"的错误,该如何处理
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
消息队列 MQ使用问题之在使用C++客户端和GBase的ESQL进行编译时出现core dump,该怎么办
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
MQ 消息队列核心原理,12 条最全面总结!
本文总结了消息队列的12个核心原理,涵盖消息顺序性、ACK机制、持久化及高可用性等内容。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。

相关产品

  • 云消息队列 Kafka 版