【Azure 事件中心】在微软云中国区 (Mooncake) 上实验以Apache Kafka协议方式发送/接受Event Hubs消息 (Java版)

简介: 【Azure 事件中心】在微软云中国区 (Mooncake) 上实验以Apache Kafka协议方式发送/接受Event Hubs消息 (Java版)

问题描述

事件中心提供 Kafka 终结点,现有的基于 Kafka 的应用程序可将该终结点用作运行你自己的 Kafka 群集的替代方案。 事件中心可与许多现有 Kafka 应用程序配合使用。在Azure官方提供的Demo中,都是针对Global Azure。以下内容通过实验来一步一步调试并在Azure中国区连接Event Hub成功。

 

操作步骤

  • 准备好Event Hub的连接字符串,可以是Namespace级别的SAS connection string,也可以是Event Hub Instance(Topic)级的连接字符串
  • Event Hub Namespace 级别的连接字符串格式为:Endpoint=sb://mynamespace.servicebus.chinacloudapi.cn/;SharedAccessKeyName=XXXXXX;SharedAccessKey=XXXXXX
  • Event Hub Instance 级别的连接字符串格式为:Endpoint=sb://mynamespace.servicebus.chinacloudapi.cn/;SharedAccessKeyName=XXXXXX;SharedAccessKey=XXXXXX;EntityPath=XXXXXX
  • 注:如果您使用的是Global Azure,Event Hub的域名地址为 *.servicebus.windows.net.

git clone https://github.com/Azure/azure-event-hubs-for-kafka.git

cd azure-event-hubs-for-kafka/quickstart/java


在Demo代码中,有两部分代码发送消息的Producer和消费消息的Consumer. 他们的文件结构如下:

  • Producer:使用Demo中的Producer项目代码,发送消息到事件中心,如果需要修改kafka的版本,可以在pom.xml文件中修改kafka依赖的版本信息。
  • Consumer:使用Demo中的Consumer项目代码,从开启Kafka终结点的事件中心接受消息。如果需要修改kafka的版本,可以在pom.xml文件中修改kafka依赖的版本信息。

 

第一步:修改发送端的Kafka连接字符串和TOPIC名称

在producer.config文件中修改bootstrap.servers 和 sasl.jaas.config 的值。使用事件中心的kafka终结点(Event Hubs Kafka endpoint)。

bootstrap.servers=youreventhubnamespacename.servicebus.chinacloudapi.cn:9093

security.protocol=SASL_SSL

sasl.mechanism=PLAIN

sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://youreventhubnamespacename.servicebus.chinacloudapi.cn/;SharedAccessKeyName=manage;SharedAccessKey=xxxxxx;EntityPath=testmessage";

同时也在TestProducer.java文件中修改TOPIC值。如这次测试中使用的是testmessage

 

第二步:修改消费端的Kafka连接字符串和TOPIC名称

在consumer.config文件中修改bootstrap.servers 和 sasl.jaas.config 的值。使用事件中心的kafka终结点(Event Hubs Kafka endpoint)。

bootstrap.servers=youreventhubnamespacename.servicebus.chinacloudapi.cn:9093
group.id=$Default
request.timeout.ms=60000
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://youreventhubnamespacename.servicebus.chinacloudapi.cn/;SharedAccessKeyName=manage;SharedAccessKey=xxxxxx;EntityPath=testmessage";

同时也在TestConsumer.java文件中修改TOPIC值。如这次测试中使用的是testmessage

 

第三步:调试发送端和消费端代码

在VS Code中直接调试代码,点击F5启动或者在Mian方法之上点击run or debug linkbutton。测试效果如:

 

错误一:Invalid SASL mechanism response, server may be expecting a different protocol   /   Error reading field 'sasl_auth_bytes': Bytes size -1 cannot be negative

在开始Debug Producer代码时,出现了无效的SASL响应,Event Hub服务端不支持当前使用的kafka协议错误 (Invalid SASL mechanism response, server may be expecting a different protocol)。原来是由于使用的Event Hub定价层为基本层,而Azure支持Apache Kafka协议是在标准版和专用版。所以回到Azure Event Hub的定价层页面,升级到标准版后就可以成功连接到事件中心(Event Hub)。

This error occurs when publishing to a basic plan Event Hub, as the basic plan does not support interaction via Kafka protocol.An upgrade to a standard plan should resolve this. https://azure.microsoft.com/en-au/pricing/details/event-hubs/

 

参考资料

Send and Receive Messages in Java using Azure Event Hubs for Apache Kafka Ecosystems: https://github.com/Azure/azure-event-hubs-for-kafka/tree/master/quickstart/java

针对 Azure 事件中心的 Apache Kafka 开发人员指南: https://docs.azure.cn/zh-cn/event-hubs/apache-kafka-developer-guide

Not able connect to EventHub via KAFKA api: https://stackoverflow.com/questions/59891094/not-able-connect-to-eventhub-via-kafka-api

相关文章
|
3月前
|
消息中间件 Java Kafka
Java 事件驱动架构设计实战与 Kafka 生态系统组件实操全流程指南
本指南详解Java事件驱动架构与Kafka生态实操,涵盖环境搭建、事件模型定义、生产者与消费者实现、事件测试及高级特性,助你快速构建高可扩展分布式系统。
217 7
|
2月前
|
消息中间件 监控 Java
Apache Kafka 分布式流处理平台技术详解与实践指南
本文档全面介绍 Apache Kafka 分布式流处理平台的核心概念、架构设计和实践应用。作为高吞吐量、低延迟的分布式消息系统,Kafka 已成为现代数据管道和流处理应用的事实标准。本文将深入探讨其生产者-消费者模型、主题分区机制、副本复制、流处理API等核心机制,帮助开发者构建可靠、可扩展的实时数据流处理系统。
301 4
|
4月前
|
消息中间件 存储 监控
Apache Kafka 3.0与KRaft模式的革新解读
在该架构中,Kafka集群依旧包含多个broker节点,但已不再依赖ZooKeeper集群。被选中的Kafka集群Controller将从KRaft Quorum中加载其状态,并在必要时通知其他Broker节点关于元数据的变更。这种设计支持更多分区与快速Controller切换,并有效避免了因数据不一致导致的问题。
|
12月前
|
Dubbo 安全 应用服务中间件
Apache Dubbo 正式发布 HTTP/3 版本 RPC 协议,弱网效率提升 6 倍
在 Apache Dubbo 3.3.0 版本之后,官方推出了全新升级的 Triple X 协议,全面支持 HTTP/1、HTTP/2 和 HTTP/3 协议。本文将围绕 Triple 协议对 HTTP/3 的支持进行详细阐述,包括其设计目标、实际应用案例、性能测试结果以及源码架构分析等内容。
676 114
|
12月前
|
消息中间件 Java Kafka
什么是Apache Kafka?如何将其与Spring Boot集成?
什么是Apache Kafka?如何将其与Spring Boot集成?
604 5
|
12月前
|
消息中间件 Java Kafka
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
415 1
|
消息中间件 缓存 Java
java nio,netty,kafka 中经常提到“零拷贝”到底是什么?
零拷贝技术 Zero-Copy 是指计算机执行操作时,可以直接从源(如文件或网络套接字)将数据传输到目标缓冲区, 而不需要 CPU 先将数据从某处内存复制到另一个特定区域,从而减少上下文切换以及 CPU 的拷贝时间。
java nio,netty,kafka 中经常提到“零拷贝”到底是什么?
|
27天前
|
人工智能 数据处理 API
阿里云、Ververica、Confluent 与 LinkedIn 携手推进流式创新,共筑基于 Apache Flink Agents 的智能体 AI 未来
Apache Flink Agents 是由阿里云、Ververica、Confluent 与 LinkedIn 联合推出的开源子项目,旨在基于 Flink 构建可扩展、事件驱动的生产级 AI 智能体框架,实现数据与智能的实时融合。
242 6
阿里云、Ververica、Confluent 与 LinkedIn 携手推进流式创新,共筑基于 Apache Flink Agents 的智能体 AI 未来
|
存储 Cloud Native 数据处理
从嵌入式状态管理到云原生架构:Apache Flink 的演进与下一代增量计算范式
本文整理自阿里云资深技术专家、Apache Flink PMC 成员梅源在 Flink Forward Asia 新加坡 2025上的分享,深入解析 Flink 状态管理系统的发展历程,从核心设计到 Flink 2.0 存算分离架构,并展望未来基于流批一体的通用增量计算方向。
229 0
从嵌入式状态管理到云原生架构:Apache Flink 的演进与下一代增量计算范式

推荐镜像

更多