创建消息队列(Kafka)源表

简介:

Kafka源表的实现来源于自社区的kafka版本实现。

注意:本文档只适合独享模式下使用。

Kafka需要定义的DDL如下。

 
  
  1. create table kafka_stream(
  2. messageKey VARBINARY,
  3. `message` VARBINARY,
  4. topic varchar,
  5. `partition` int,
  6. `offset` bigint
  7. ) with (
  8. type ='kafka010',
  9. topic = 'xxx',
  10. `group.id` = 'xxx',
  11. bootstrap.servers = 'ip:端口,ip:端口,ip:端口'
  12. );

注意:以上表中的五个字段顺序务必保持一致。

WITH参数

通用配置

参数 注释说明 备注
type Kafka对应版本 推荐使用KAFKA010
topic 读取的单个topic topic名称

必选配置

(1)kafka08必选配置:

参数 注释说明 备注
group.id 消费组id
zookeeper.connect zk链接地址 zk连接id

(2)kafka09/kafka010/kafka011必选配置:

参数 注释说明 备注
group.id 消费组id
bootstrap.servers kafka集群地址 kafka集群地址

Kafka集群地址:

如果您的kafka是阿里云商业版,请参考kafka商业版准备配置文档。

如果您的kafka是阿里云公测版,请参考kafka公测版准备配置文档。

可选配置

 
   
  1. "consumer.id","socket.timeout.ms","fetch.message.max.bytes","num.consumer.fetchers","auto.commit.enable","auto.commit.interval.ms","queued.max.message.chunks", "rebalance.max.retries","fetch.min.bytes","fetch.wait.max.ms","rebalance.backoff.ms","refresh.leader.backoff.ms","auto.offset.reset","consumer.timeout.ms","exclude.internal.topics","partition.assignment.strategy","client.id","zookeeper.session.timeout.ms","zookeeper.connection.timeout.ms","zookeeper.sync.time.ms","offsets.storage","offsets.channel.backoff.ms","offsets.channel.socket.timeout.ms","offsets.commit.max.retries","dual.commit.enabled","partition.assignment.strategy","socket.receive.buffer.bytes","fetch.min.bytes"

注意:其它可选配置项参考kafka官方文档:
Kafka09
https://kafka.apache.org/0110/documentation.html#consumerconfigs
Kafka010
https://kafka.apache.org/090/documentation.html#newconsumerconfigs
Kafka011
https://kafka.apache.org/0102/documentation.html#newconsumerconfigs

kafka版本对应关系

Type Kafka 版本
Kafka08 0.8.22
Kafka09 0.9.0.1
Kafka010 0.10.2.1
Kafka011 0.11.0.2

Kafka消息解析

默认Kafka读到的消息:

 
   
  1. messageKey varbianry,
  2. message varbianry,
  3. topic varchar,
  4. partition int,
  5. offset bigint

这样一个五元组,如果您希望在source阶段把数据parser成特定的其它格式,可以按照下面实践进行。

参数 注释说明 备注
parserUdtf 自定义解析函数 用于解析从kafka读到的消息映射到ddl具体对应的类型

如何写一个parserUdtf参见自定义表值函数(UDTF)

自建kafka

与阿里云Kafka消息队列一样,DDL定义相同。

示例:

 
   
  1. create table kafka_stream(
  2. messageKey VARBINARY,
  3. `message` VARBINARY,
  4. topic varchar,
  5. `partition` int,
  6. `offset` bigint
  7. ) with (
  8. type ='kafka011',
  9. topic = 'kafka_01',
  10. `group.id` = 'CID_blink',
  11. bootstrap.servers = '192.168.0.251:9092'
  12. );

WITH参数

关于自建Kafka的with参数,请参考本文档Kafka创建时DDL的with参数说明。需要注意的是 bootstrap.servers参数需要填写自建的地址和端口号。

注意:无论是阿里云Kafka还是自建Kafka,目前实时计算均无Tps、Rps等指标信息。在作业上线之后,运维界面暂时不支持显示指标信息。

本文转自实时计算——创建消息队列(Kafka)源表

相关文章
|
16天前
|
消息中间件 Java Kafka
初识Apache Kafka:搭建你的第一个消息队列系统
【10月更文挑战第24天】在数字化转型的浪潮中,数据成为了企业决策的关键因素之一。而高效的数据处理能力,则成为了企业在竞争中脱颖而出的重要武器。在这个背景下,消息队列作为连接不同系统和服务的桥梁,其重要性日益凸显。Apache Kafka 是一款开源的消息队列系统,以其高吞吐量、可扩展性和持久性等特点受到了广泛欢迎。作为一名技术爱好者,我对 Apache Kafka 产生了浓厚的兴趣,并决定亲手搭建一套属于自己的消息队列系统。
36 2
初识Apache Kafka:搭建你的第一个消息队列系统
|
20天前
|
消息中间件 中间件 Kafka
解锁Kafka等消息队列中间件的测试之道
在这个数字化时代,分布式系统和消息队列中间件(如Kafka、RabbitMQ)已成为日常工作的核心组件。本次公开课由前字节跳动资深专家KK老师主讲,深入解析消息队列的基本原理、架构及测试要点,涵盖功能、性能、可靠性、安全性和兼容性测试,并探讨其主要应用场景,如应用解耦、异步处理和限流削峰。课程最后设有互动答疑环节,助你全面掌握消息队列的测试方法。
|
3月前
|
图形学 人工智能 C#
从零起步,到亲手实现:一步步教你用Unity引擎搭建出令人惊叹的3D游戏世界,绝不错过的初学者友好型超详细指南 ——兼探索游戏设计奥秘与实践编程技巧的完美结合之旅
【8月更文挑战第31天】本文介绍如何使用Unity引擎从零开始创建简单的3D游戏世界,涵盖游戏对象创建、物理模拟、用户输入处理及动画效果。Unity是一款强大的跨平台游戏开发工具,支持多种编程语言,具有直观编辑器和丰富文档。文章指导读者创建新项目、添加立方体对象、编写移动脚本,并引入基础动画,帮助初学者快速掌握Unity开发核心概念,迈出游戏制作的第一步。
162 1
|
3月前
|
消息中间件 传感器 缓存
为什么Kafka能秒杀众多消息队列?揭秘它背后的五大性能神器,让你秒懂Kafka的极速之道!
【8月更文挑战第24天】Apache Kafka作为分布式流处理平台的领先者,凭借其出色的性能和扩展能力广受好评。本文通过案例分析,深入探讨Kafka实现高性能的关键因素:分区与并行处理显著提升吞吐量;批量发送结合压缩算法减少网络I/O次数及数据量;顺序写盘与页缓存机制提高写入效率;Zero-Copy技术降低CPU消耗;集群扩展与负载均衡确保系统稳定性和可靠性。这些机制共同作用,使Kafka能够在处理大规模数据流时表现出色。
61 3
|
3月前
|
消息中间件 存储 Kafka
ZooKeeper助力Kafka:掌握这四大作用,让你的消息队列系统稳如老狗!
【8月更文挑战第24天】Kafka是一款高性能的分布式消息队列系统,其稳定运行很大程度上依赖于ZooKeeper提供的分布式协调服务。ZooKeeper在Kafka中承担了四大关键职责:集群管理(Broker的注册与选举)、主题与分区管理、领导者选举机制以及消费者组管理。通过具体的代码示例展示了这些功能的具体实现方式。
93 2
|
3月前
|
消息中间件 存储 Kafka
现代消息队列与云存储问题之Kafka在海量队列场景下存在性能的问题如何解决
现代消息队列与云存储问题之Kafka在海量队列场景下存在性能的问题如何解决
|
5月前
|
消息中间件 存储 Java
深度探索:使用Apache Kafka构建高效Java消息队列处理系统
【6月更文挑战第30天】Apache Kafka是分布式消息系统,用于高吞吐量的发布订阅。在Java中,开发者使用Kafka的客户端库创建生产者和消费者。生产者发送序列化消息到主题,消费者通过订阅和跟踪偏移量消费消息。Kafka以持久化、容灾和顺序写入优化I/O。Java示例代码展示了如何创建并发送/接收消息。通过分区、消费者组和压缩等策略,Kafka在高并发场景下可被优化。
116 1
|
29天前
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
|
1月前
|
消息中间件 Java Kafka
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
44 1
|
3月前
|
消息中间件 Java Kafka
Kafka不重复消费的终极秘籍!解锁幂等性、偏移量、去重神器,让你的数据流稳如老狗,告别数据混乱时代!
【8月更文挑战第24天】Apache Kafka作为一款领先的分布式流处理平台,凭借其卓越的高吞吐量与低延迟特性,在大数据处理领域中占据重要地位。然而,在利用Kafka进行数据处理时,如何有效避免重复消费成为众多开发者关注的焦点。本文深入探讨了Kafka中可能出现重复消费的原因,并提出了四种实用的解决方案:利用消息偏移量手动控制消费进度;启用幂等性生产者确保消息不被重复发送;在消费者端实施去重机制;以及借助Kafka的事务支持实现精确的一次性处理。通过这些方法,开发者可根据不同的应用场景灵活选择最适合的策略,从而保障数据处理的准确性和一致性。
258 9

相关产品

  • 云消息队列 Kafka 版