kafka权威指南 第三章第3节 创建Producer

简介:

问题导读:
1 如何创建Kafka的生产者?
2 发送数据都有哪几种方式?




向Kafka中写入数据的第一步就是创建生产者对象,创建生产者对象需要三个主要的配置:

bootstrap.servers
是Kafka的Broker的连接地址,格式为 host:port。这里不要填写所有的broker配置,生产者会查询这些broker的信息,然后添加额外的broker。但是推荐至少填写两个,以免有一个broker离线后,无法连接到集群。
key.serializer
Kafka broker 接收的是消息的键值对的字节数组,但是生产者的接口也支持参数,来发送java对象。这样代码的可读性更高,但是Java对象也需要序列化成字节数组才行。key.serializer需要设置成类的名字,并且这个类需要实现org.apache.kafka.common.serialization.Serializer接口,生产者将会使用这个类对对象的key进行序列化。Kafka的客户端提供了ByteArraySerializer,StringSerializer以及IntegerSerializer,你可以直接使用这些通用的类型,如果有需要的话,也可以自定义。注意,key.serializer只有在你发送一个值的时候有用。
value.serializer
与key.serializer类似,也是配置一个类的名字,然后将会把消息的value部分进行序列化。你的key和value可以设置成不同的序列化格式,如整型的key和字符串的value。

下面是创建Producer客户端的例子,一般情况下你都可以使用这些默认的配置:
private Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", "broker1:9092,broker2:9092");
kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producer = new KafkaProducer<String, String>(kafkaProps);

1 第一行,创建属性配置对象;
2 然后为key和value设置它的序列化格式
3 最后根据配置创建Producer

有了这样一个简单的接口,producer的配置就算完成了。官网文档里面介绍了更多配置的参数,我们将在后面进行更详细的介绍。

一旦启动了这个producer,它就会开始发送消息。有下面三种机制:
Fire and forget 
这种模式会发送给broker消息,但是不关心消息是否正常到达。大多数情况下,它会正常到达,有时候也会在失败的时候尝试多发送几次。但是需要注意的是,这种情况下消息会有丢失
同步发送
这种发送的机制是使用send()方法,它会返回一个对象,通过get()方法可以同步等待,来判断是成功还是失败
异步发送
同样也是调用send,但是会在Kafka broker接收到消息的时候返回。

在上面所有的情况下,都需要考虑发送数据失败的情况以及如何处理它,并且注意单独的producer对象可以使用多线程来发送数据或者也可以使用多个producer。因此,如果你需要更高的吞吐量,你可以在相同的prodcuer上创建更多的线程,也可以开启更多的producer。

在后续的例子中,将会使用我们提到的方法来创建producer,并且处理不同情况可能发生的错误。

来自: http://www.aboutyun.com/forum.php?mod=viewthread&tid=22001 

本文转自   ChinaUnicom110   51CTO博客,原文链接:http://blog.51cto.com/xingyue2011/1939981

相关文章
|
消息中间件 监控 Java
Kafka Producer异步发送消息技巧大揭秘
Kafka Producer异步发送消息技巧大揭秘
1137 0
|
消息中间件 存储 分布式计算
大数据-53 Kafka 基本架构核心概念 Producer Consumer Broker Topic Partition Offset 基础概念了解
大数据-53 Kafka 基本架构核心概念 Producer Consumer Broker Topic Partition Offset 基础概念了解
298 4
|
消息中间件 监控 算法
Kafka Producer 的性能优化技巧
【8月更文第29天】Apache Kafka 是一个分布式流处理平台,它以其高吞吐量、低延迟和可扩展性而闻名。对于 Kafka Producer 来说,正确的配置和编程实践可以显著提高其性能。本文将探讨一些关键的优化策略,并提供相应的代码示例。
649 2
|
消息中间件 大数据 Kafka
Kafka消息封装揭秘:从Producer到Consumer,一文掌握高效传输的秘诀!
【8月更文挑战第24天】在分布式消息队列领域,Apache Kafka因其实现的高吞吐量、良好的可扩展性和数据持久性备受开发者青睐。Kafka中的消息以Record形式存在,包括固定的头部与可变长度的消息体。生产者(Producer)将消息封装为`ProducerRecord`对象后发送;消费者(Consumer)则从Broker拉取并解析为`ConsumerRecord`。消息格式简化示意如下:消息头 + 键长度 + 键 + 值长度 + 值。键和值均为字节数组,需使用特定的序列化/反序列化器。理解Kafka的消息封装机制对于实现高效、可靠的数据传输至关重要。
443 4
|
消息中间件 监控 Java
【Kafka节点存活大揭秘】如何让Kafka集群时刻保持“心跳”?探索Broker、Producer和Consumer的生死关头!
【8月更文挑战第24天】在分布式系统如Apache Kafka中,确保节点的健康运行至关重要。Kafka通过Broker、Producer及Consumer间的交互实现这一目标。文章介绍Kafka如何监测节点活性,包括心跳机制、会话超时与故障转移策略。示例Java代码展示了Producer如何通过定期发送心跳维持与Broker的连接。合理配置这些机制能有效保障Kafka集群的稳定与高效运行。
480 2
|
消息中间件 分布式计算 Kafka
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
184 0
|
消息中间件 Java Kafka
"Kafka快速上手:从环境搭建到Java Producer与Consumer实战,轻松掌握分布式流处理平台"
【8月更文挑战第10天】Apache Kafka作为分布式流处理平台的领头羊,凭借其高吞吐量、可扩展性和容错性,在大数据处理、实时日志收集及消息队列领域表现卓越。初学者需掌握Kafka基本概念与操作。Kafka的核心组件包括Producer(生产者)、Broker(服务器)和Consumer(消费者)。Producer发送消息到Topic,Broker负责存储与转发,Consumer则读取这些消息。首先确保已安装Java和Kafka,并启动服务。接着可通过命令行创建Topic,并使用提供的Java API实现Producer发送消息和Consumer读取消息的功能。
244 8
|
消息中间件 负载均衡 Java
"Kafka核心机制揭秘:深入探索Producer的高效数据发布策略与Java实战应用"
【8月更文挑战第10天】Apache Kafka作为顶级分布式流处理平台,其Producer组件是数据高效发布的引擎。Producer遵循高吞吐、低延迟等设计原则,采用分批发送、异步处理及数据压缩等技术提升性能。它支持按消息键值分区,确保数据有序并实现负载均衡;提供多种确认机制保证可靠性;具备失败重试功能确保消息最终送达。Java示例展示了基本配置与消息发送流程,体现了Producer的强大与灵活性。
252 3
|
消息中间件 存储 Kafka
微服务分布问题之Kafka分区的副本和分布如何解决
微服务分布问题之Kafka分区的副本和分布如何解决
152 5
下一篇
oss云网关配置