Kafka Producer接口

简介:

参考,

https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example

http://kafka.apache.org/08/configuration.html , 0.8版本,关于producer,consumer,broker所有的配置

 

因为Producer相对于consumer比较简单,直接看代码,需要注意的点

1. 配置参数,详细参考上面链接 
    1.1 metadata.broker.list, 不同于0.7,不需要给出zk的地址,而是给出一些broker地址,不用全部,这里建议给两个防止一个不可用 
          Kafka会自己找到相应topic,partition的leader broker 
    1.2 serializer.class,需要给出message的序列化的encoder,这里使用的是简单的StringEncoder 
          并且对于key还可以单独的设定,"key.serializer.class"  
          注意,除非明确知道message编码,否则不要直接使用StringEncoder, 
          因为源码中的逻辑是如果没有在初始化时指定编码会默认按UTF8转码,会导致乱码 
          所以不明确的时候,不要指定serializer.class,默认的encoder逻辑是直接将byte[]放入broker,不会做转码 
    1.3 partitioner.class,可以不设置,默认就是random partition,当然这里可以自定义,如何根据key来选择partition 
    1.4 request.required.acks, 是否要求broker给出ack,如果不设置默认是'fire and forget', 会丢数据 
          默认为0,即和0.7一样,发完不会管是否成功,lowest latency but the weakest durability 
          1, 等待leader replica的ack,否则重发,折中的方案,当leader在同步数据前dead,会丢数据 
          -1,等待all in-sync replicas的ack,只要有一个replica活着,就不会丢数据 
    1.5 producer.type,  
         sync,单条发送 
         async,buffer一堆请求后,再一起发送 
         如果不是对丢数据非常敏感,请设为async,因为对throughput帮助很大,但是当client crash时,会丢数据 
    1.6 compression.codec 
         支持"none", "gzip" and "snappy" 
         可以通过,compressed.topics,来指定压缩的topic

    当producer.type选择async的时候,需要关注如下配置 
    queue.buffering.max.ms (5000), 最大buffer数据的时间,默认是5秒 
    batch.num.messages (200), batch发送的数目,默认是200,producer会等待buffer的messages数目达到200或时间超过5秒,才发送数据 
    queue.buffering.max.messages (10000), 最多可以buffer的message数目,超过要么producer block或把数据丢掉 
    queue.enqueue.timeout.ms (-1), 默认是-1,即达到buffer最大meessage数目时,producer会block 
                                                       设为0,达到buffer最大meessage数目时会丢掉数据

 

2. Producer发送的是kv数据 
无论Producer或KeyedMessage都是<String, String>的泛型,这里是指key和value的类型

复制代码
import java.util.*;
 
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
 
public class TestProducer {
    public static void main(String[] args) {
        long events = Long.parseLong(args[0]);
        Random rnd = new Random();
 
        Properties props = new Properties();
        props.put("metadata.broker.list", "host1:9092, host2:9092 "); //
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("partitioner.class", "example.producer.SimplePartitioner"); //可以不设置
        props.put("request.required.acks", "1");
 
        ProducerConfig config = new ProducerConfig(props);
 
        Producer<String, String> producer = new Producer<String, String>(config);
 
        for (long nEvents = 0; nEvents < events; nEvents++) { 
               long runtime = new Date().getTime();  
               String ip = “192.168.2.” + rnd.nextInt(255); 
               String msg = runtime + “,www.example.com,” + ip; 
               KeyedMessage<String, String> data = new KeyedMessage<String, String>("page_visits", ip, msg); //指定topic,key,value
               producer.send(data);
        }
        producer.close();
    }
}
复制代码

 

对于自定义partitioner也很简单,

对于partition,两个参数,key和partitions的数目 
所要完成的逻辑就是,如果根据key在partitions中挑选一个合适的partition

复制代码
import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;
 
public class SimplePartitioner implements Partitioner {
    public SimplePartitioner (VerifiableProperties props) {
 
    }
 
    public int partition(String key, int a_numPartitions) {
        int partition = 0;
        int offset = key.lastIndexOf('.');
        if (offset > 0) {
           partition = Integer.parseInt( key.substring(offset+1)) % a_numPartitions;
        }
       return partition;
  }
 
}

本文章摘自博客园,原文发布日期:2014-06-23
目录
相关文章
|
消息中间件 监控 Java
Kafka Producer异步发送消息技巧大揭秘
Kafka Producer异步发送消息技巧大揭秘
1435 0
|
消息中间件 存储 分布式计算
大数据-53 Kafka 基本架构核心概念 Producer Consumer Broker Topic Partition Offset 基础概念了解
大数据-53 Kafka 基本架构核心概念 Producer Consumer Broker Topic Partition Offset 基础概念了解
401 4
|
消息中间件 大数据 Kafka
Kafka消息封装揭秘:从Producer到Consumer,一文掌握高效传输的秘诀!
【8月更文挑战第24天】在分布式消息队列领域,Apache Kafka因其实现的高吞吐量、良好的可扩展性和数据持久性备受开发者青睐。Kafka中的消息以Record形式存在,包括固定的头部与可变长度的消息体。生产者(Producer)将消息封装为`ProducerRecord`对象后发送;消费者(Consumer)则从Broker拉取并解析为`ConsumerRecord`。消息格式简化示意如下:消息头 + 键长度 + 键 + 值长度 + 值。键和值均为字节数组,需使用特定的序列化/反序列化器。理解Kafka的消息封装机制对于实现高效、可靠的数据传输至关重要。
562 4
|
消息中间件 监控 算法
Kafka Producer 的性能优化技巧
【8月更文第29天】Apache Kafka 是一个分布式流处理平台,它以其高吞吐量、低延迟和可扩展性而闻名。对于 Kafka Producer 来说,正确的配置和编程实践可以显著提高其性能。本文将探讨一些关键的优化策略,并提供相应的代码示例。
788 2
|
消息中间件 Java Kafka
"Kafka快速上手:从环境搭建到Java Producer与Consumer实战,轻松掌握分布式流处理平台"
【8月更文挑战第10天】Apache Kafka作为分布式流处理平台的领头羊,凭借其高吞吐量、可扩展性和容错性,在大数据处理、实时日志收集及消息队列领域表现卓越。初学者需掌握Kafka基本概念与操作。Kafka的核心组件包括Producer(生产者)、Broker(服务器)和Consumer(消费者)。Producer发送消息到Topic,Broker负责存储与转发,Consumer则读取这些消息。首先确保已安装Java和Kafka,并启动服务。接着可通过命令行创建Topic,并使用提供的Java API实现Producer发送消息和Consumer读取消息的功能。
315 8
|
消息中间件 监控 Java
【Kafka节点存活大揭秘】如何让Kafka集群时刻保持“心跳”?探索Broker、Producer和Consumer的生死关头!
【8月更文挑战第24天】在分布式系统如Apache Kafka中,确保节点的健康运行至关重要。Kafka通过Broker、Producer及Consumer间的交互实现这一目标。文章介绍Kafka如何监测节点活性,包括心跳机制、会话超时与故障转移策略。示例Java代码展示了Producer如何通过定期发送心跳维持与Broker的连接。合理配置这些机制能有效保障Kafka集群的稳定与高效运行。
657 2
|
消息中间件 负载均衡 Java
"Kafka核心机制揭秘:深入探索Producer的高效数据发布策略与Java实战应用"
【8月更文挑战第10天】Apache Kafka作为顶级分布式流处理平台,其Producer组件是数据高效发布的引擎。Producer遵循高吞吐、低延迟等设计原则,采用分批发送、异步处理及数据压缩等技术提升性能。它支持按消息键值分区,确保数据有序并实现负载均衡;提供多种确认机制保证可靠性;具备失败重试功能确保消息最终送达。Java示例展示了基本配置与消息发送流程,体现了Producer的强大与灵活性。
327 3
|
消息中间件 分布式计算 Kafka
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
237 0
|
消息中间件 存储 Kafka
微服务分布问题之Kafka分区的副本和分布如何解决
微服务分布问题之Kafka分区的副本和分布如何解决
218 5