kafka Consumer high-level api 之白名单

简介: kafka Consumer high-level api 之白名单

Kafka提供了两套API给Consumer

  1. The high-level Consumer API
  2. The SimpleConsumer API    

第一种高度抽象的Consumer API,它使用起来简单、方便,但是对于某些特殊的需求我们可能要用到第二种更底层的API,那么下面来介绍下第一种API:

使用白名单可以适配多个topic的情况。

示例代码:

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.consumer.Whitelist;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.mysite.constant.Constants;
import com.mysite.util.PropertiesUtil;
import com.mysite.util.Utils;
public class KafkaConsumer {
  private static Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);
  private ConsumerIterator<byte[], byte[]> iterator = null;
  private static ConsumerConfig consumerConfig;
  private ConsumerConnector connector = null;
  private List<KafkaStream<byte[], byte[]>> partitions = null;
  private Whitelist whitelist = null;
  private int threads = 0;
  private String[] topics;
  private String type;
  
  private String topic = null;
  private String message = null;
  private MessageAndMetadata<byte[], byte[]> next = null;
  public KafkaConsumer(Properties props) {
    String topicStr = props.getProperty("topics");
    if(topicStr==null||topicStr.trim().length()<=0){
      throw new NullPointerException("请正确填写TOPIC.");
    }   
    threads = Integer.parseInt(props.getProperty("threads", "1").trim());
    consumerConfig = createConsumerConfig(props);
    // topic的过滤器
    whitelist = new Whitelist("(" + topicStr + ")");
    init();
  }
  /**
   * 初始化参数
   * 
   * @param props
   * @return
   */
  private static ConsumerConfig createConsumerConfig(Properties props) {
    logger.info("---init kafka config...");
    props.put("zookeeper.session.timeout.ms", "30000");
    props.put("zookeeper.sync.time.ms", "6000");
    props.put("auto.commit.enable", "true");
    props.put("auto.commit.interval.ms", "1000");
    props.put("auto.offset.reset", "largest");
    
    return new ConsumerConfig(props);
  }
  private void init() {
    connector = Consumer.createJavaConsumerConnector(consumerConfig);
    partitions = connector.createMessageStreamsByFilter(whitelist,threads);
    if (CollectionUtils.isEmpty(partitions)) {
      logger.info("empty!");
      try {
        TimeUnit.SECONDS.sleep(1);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
    logger.info("---connect kafka success!");
    try{
      for (KafkaStream<byte[], byte[]> partition : partitions) {
        iterator = partition.iterator();
        while (iterator.hasNext()) {
          next = iterator.next();
          try {
            message = new String(next.message(), Constants.UTF8);
          } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
          }
          logger.info(Thread.currentThread()+",partition:"+partition+",offset:" + next.offset() + ",message:" + message);
          
        }
      }
    }catch (Exception e) {
      logger.error("run time error:{}",e);
      close();
      try {
        TimeUnit.SECONDS.sleep(5);
      } catch (InterruptedException e1) {
        e1.printStackTrace();
      }
      init();
    }
  }
  /**
   * 销毁资源 未使用
   * 
   */
  private void close() {
    logger.info("close resource...");
    if (partitions != null)
      partitions.clear();
    partitions = null;
    if (iterator != null)
      iterator.clearCurrentChunk();
    iterator = null;
    if (connector != null)
      connector.shutdown();
    connector = null;
  }
  /**
   * 主方法入口
   * 
   * @param args
   */
  public static void main(String[] args) {
    FileInputStream fis = null;
    Properties props = new Properties();
    Properties kafkaProps = null;
    Properties syslogProps = null;
    try {
      String encode = System.getProperty(Constants.ENCODE, Constants.UTF8).trim();
      logger.info("encode:{}", encode);
      String path = System.getProperty(Constants.CONFIG_PATH);
      logger.info("path:{}", path);
      if(path==null||path.trim().length()<=0){
        throw new NullPointerException("请正确填写配置文件路径.");
      }
      fis = new FileInputStream(path);
      props.load(new InputStreamReader(fis, encode));
      kafkaProps = PropertiesUtil.getProperties(Constants.KAFKA_PREFIX, props);
      logger.info("kafkaProps:{}", kafkaProps);
      new KafkaConsumer(kafkaProps);
    } catch (Exception e) {
      logger.error("----Runtime error:", e);
    } finally {
      if (fis != null) {
        try {
          fis.close();
        } catch (IOException e) {
          e.printStackTrace();
        }
      }
      if (props != null)
        props.clear();
      if (kafkaProps != null)
        kafkaProps.clear();
    }
  }
}



使用到的配置:

zookeeper.connect=192.168.0.25:2181,192.168.0.26:2181
group.id=groupId1
topics=topic1,topic2
threads=2



相关文章
|
29天前
|
消息中间件 Kafka
使用kafka consumer加载数据加载异常并且报source table and destination table are not same错误解决办法
使用kafka consumer加载数据加载异常并且报source table and destination table are not same错误解决办法
|
4月前
|
消息中间件 Kafka API
【Kafka消费新风潮】告别复杂,迎接简洁之美——深度解析Kafka新旧消费者API大比拼!
【8月更文挑战第24天】Apache Kafka作为一个领先的分布式流处理平台,广泛用于实时数据管道和流式应用的构建。随着其发展,消费者API经历了重大更新。旧消费者API(包括“低级”和“高级”API)虽提供灵活性但在消息顺序处理上存在挑战。2017年引入的新消费者API简化了接口,自动管理偏移量,支持更强大的消费组功能,显著降低了开发复杂度。通过对比新旧消费者API的代码示例可以看出,新API极大提高了开发效率和系统可维护性。
140 58
|
2月前
|
消息中间件 NoSQL Kafka
大数据-52 Kafka 基础概念和基本架构 核心API介绍 应用场景等
大数据-52 Kafka 基础概念和基本架构 核心API介绍 应用场景等
78 5
|
2月前
|
消息中间件 存储 分布式计算
大数据-53 Kafka 基本架构核心概念 Producer Consumer Broker Topic Partition Offset 基础概念了解
大数据-53 Kafka 基本架构核心概念 Producer Consumer Broker Topic Partition Offset 基础概念了解
86 4
|
3月前
|
消息中间件 安全 大数据
Kafka多线程Consumer是实现高并发数据处理的有效手段之一
【9月更文挑战第2天】Kafka多线程Consumer是实现高并发数据处理的有效手段之一
349 4
|
4月前
|
消息中间件 大数据 Kafka
Kafka消息封装揭秘:从Producer到Consumer,一文掌握高效传输的秘诀!
【8月更文挑战第24天】在分布式消息队列领域,Apache Kafka因其实现的高吞吐量、良好的可扩展性和数据持久性备受开发者青睐。Kafka中的消息以Record形式存在,包括固定的头部与可变长度的消息体。生产者(Producer)将消息封装为`ProducerRecord`对象后发送;消费者(Consumer)则从Broker拉取并解析为`ConsumerRecord`。消息格式简化示意如下:消息头 + 键长度 + 键 + 值长度 + 值。键和值均为字节数组,需使用特定的序列化/反序列化器。理解Kafka的消息封装机制对于实现高效、可靠的数据传输至关重要。
124 4
|
4月前
|
消息中间件 监控 Java
【Kafka节点存活大揭秘】如何让Kafka集群时刻保持“心跳”?探索Broker、Producer和Consumer的生死关头!
【8月更文挑战第24天】在分布式系统如Apache Kafka中,确保节点的健康运行至关重要。Kafka通过Broker、Producer及Consumer间的交互实现这一目标。文章介绍Kafka如何监测节点活性,包括心跳机制、会话超时与故障转移策略。示例Java代码展示了Producer如何通过定期发送心跳维持与Broker的连接。合理配置这些机制能有效保障Kafka集群的稳定与高效运行。
103 2
|
4月前
|
消息中间件 Java Kafka
"Kafka快速上手:从环境搭建到Java Producer与Consumer实战,轻松掌握分布式流处理平台"
【8月更文挑战第10天】Apache Kafka作为分布式流处理平台的领头羊,凭借其高吞吐量、可扩展性和容错性,在大数据处理、实时日志收集及消息队列领域表现卓越。初学者需掌握Kafka基本概念与操作。Kafka的核心组件包括Producer(生产者)、Broker(服务器)和Consumer(消费者)。Producer发送消息到Topic,Broker负责存储与转发,Consumer则读取这些消息。首先确保已安装Java和Kafka,并启动服务。接着可通过命令行创建Topic,并使用提供的Java API实现Producer发送消息和Consumer读取消息的功能。
90 8
|
4月前
|
消息中间件 Java 大数据
"深入理解Kafka单线程Consumer:核心参数配置、Java实现与实战指南"
【8月更文挑战第10天】在大数据领域,Apache Kafka以高吞吐和可扩展性成为主流数据流处理平台。Kafka的单线程Consumer因其实现简单且易于管理而在多种场景中受到欢迎。本文解析单线程Consumer的工作机制,强调其在错误处理和状态管理方面的优势,并通过详细参数说明及示例代码展示如何有效地使用KafkaConsumer类。了解这些内容将帮助开发者优化实时数据处理系统的性能与可靠性。
111 7