开发者社区 问答 正文

Kafka 消息接入是如何做到的?



接入说明


  1. 使用 Kafka 消息服务之前,先在 MQ 控制台上申请资源,否则将无法通过鉴权认证,以及相关的运维功能。具体步骤如下:申请 Topic 资源,消息类型选择 Kafka 消息;
  2. 申请 Consumer ID。

Kafka服务端版本是0.10.1,Client版本建议0.10及以上。
建议查看各个客户端的 开源 Demo 访问地址,使用之前请仔细阅读 README.md。
目前客户端 Demo 包括 Java, Python, Go, PHP, NodeJs, C++, Logstash, SpringCloud,其它客户端暂时请参照已有 Demo 自行调试,后续会陆续完善。
欢迎加钉钉群咨询,用钉钉扫描群二维码。

访问域名

Region域名
公网kafka-ons-internet.aliyun.com:8080
华东1kafka-cn-hangzhou.aliyun.com:8080
华北2kafka-cn-beijing.aliyun.com:8080
华东2kafka-cn-shanghai.aliyun.com:8080
华南1kafka-cn-shenzhen.aliyun.com:8080

下面以 Java 语言为例,详细说明接入过程。

MQ-Kafka Java Demo


下载连接
1、Maven 依赖配置
  1. <dependency>
  2.        <groupId>org.apache.kafka</groupId>
  3.        <artifactId>kafka-clients</artifactId>
  4.        <version>0.10.1.0</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>com.aliyun.openservices</groupId>
  8.       <artifactId>ons-sasl-client</artifactId>
  9.       <version>0.1</version>
  10. </dependency>

2、配置 AccessKey,SecretKey。
MQ-Kafka 利用服务账号 AccessKey,SecretKey 对消息通道进行鉴权。创建文本文件 kafka_client_jaas.conf,内容如下:
  1. KafkaClient {
  2.    com.aliyun.openservices.ons.sasl.client.OnsLoginModule required
  3.    AccessKey="***"
  4.    SecretKey="***";
  5. };

注意:把***替换为服务账号的 AccessKey,SecretKey。
3、设置 kafka_client_jaas.conf 的路径:
有两种办法进行设置(这里假设kafka_client_jaas.conf放在/home/admin下面,实际部署时请注意修改为自己的路径):
程序启动时,启动 JVM 参数:
  1. -Djava.security.auth.login.config=/home/admin/kafka_client_jaas.conf

或者在代码中设置参数(需要保证在 Kafka Producer 和 Consumer 启动之前):
  1. System.setProperty("java.security.auth.login.config", “/home/admin/kafka_client_jaas.conf");

4、下载根证书
下载地址
下载后放入某个目录下,这里假设为”/home/admin”
5、主要参数配置
参数
bootstrap.servers请根据 region 进行选择
ssl.truststore.location/home/admin/kafka.client.truststore.jks(请注意修改为自己的路径)
ssl.truststore.passwordKafkaOnsClient
security.protocolSASL_SSL
sasl.mechanismONS

Kafka 其它参数遵照 Kafka 官网说明。
6、 执行运行语句:
sh run_demo.sh -Dkafka.ons.TOPIC=kafka-test
注意:请将 Topic 更改为申请的 Topic。
7、查看执行结果:
produce ok:kafka-test-*
看到类似以上结果表示 Demo 运行成功。

展开
收起
猫饭先生 2017-10-27 10:28:28 3644 分享 版权
阿里云 AI 助理回答

您已经详细描述了如何使用Java接入阿里云的Kafka服务(MQ-Kafka),这里我再简要概括一下关键步骤,确保您能顺利实施:

  1. 申请资源

    • 在阿里云MQ控制台上申请Topic资源,选择Kafka消息类型。
    • 申请Consumer ID。
  2. 确认版本兼容性

    • 确保您的Kafka服务端版本与客户端版本兼容。服务端为0.10.1时,推荐客户端使用0.10及以上版本。
  3. 查看并参考Demo

    • 阿里云提供了多种语言的开源Demo,包括Java、Python等。对于Java示例,请直接下载或参照其GitHub仓库获取代码。
  4. Maven依赖配置

    • 添加Apache Kafka客户端和阿里云ONS SASL客户端到项目的Maven依赖中。
  5. 鉴权配置

    • 创建kafka_client_jaas.conf文件,填入AccessKey和SecretKey进行鉴权。
    • 设置JVM参数或在代码中指定此文件路径,以应用鉴权配置。
  6. 根证书下载与配置

    • 下载阿里云提供的根证书,并通过SSL信任存储配置指向该证书文件。
  7. 主要参数配置

    • 在Kafka生产者或消费者的配置中设置正确的Bootstrap服务器地址、信任存储位置、密码、安全协议及SASL机制等。
  8. 运行Demo

    • 使用脚本run_demo.sh启动Demo,记得替换脚本中的Topic名称为您实际申请的Topic。
  9. 检查结果

    • 观察执行输出,寻找类似“produce ok:your-topic-name-*”的消息,表明消息已成功发送至Kafka Topic。

如果在接入过程中遇到任何问题,可以考虑加入阿里云提供的钉钉群咨询,获取更直接的技术支持。希望这些步骤能帮助您顺利完成阿里云Kafka服务的接入工作。

有帮助
无帮助
AI 助理回答生成答案可能存在不准确,仅供参考
0 条回答
写回答
取消 提交回答