开发者社区> 问答> 正文

[@小川游鱼][¥20]kafka的消费者stream iterator block,读不出消息?

rt,能够向kafka produce数据,kafka里也看得到,但是consumer却读不出数据,offset是0,程序在stream的iterator被阻塞了。demo代码如下:

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;

/**

  • Created by Administrator on 2015/10/10.
    */

public class KafkaConsumer {

private final ConsumerConnector consumer;

private KafkaConsumer() {
    Properties props = new Properties();
    props.put("auto.offset.reset", "smallest");
    //zookeeper 配置
    props.put("zookeeper.connect", "Master:2181");

    //group 代表一个消费组
    props.put("group.id", "consumer3");

    // 连接zk的session超时时间
    props.put("zookeeper.session.timeout.ms", "4000");
    props.put("zookeeper.sync.time.ms", "200");//zk follower落后于zk leader的最长时间        props.put("auto.commit.interval.ms", "1000");//往zookeeper上写offset的频率        props.put("auto.offset.reset", "smallest");//如果offset出了返回,则 smallest: 自动设置reset到最小的offset. largest : 自动设置offset到最大的offset. 其它值不允许,会抛出异常        
    //序列化类 
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    props.put("auto.commit.interval.ms", "1000");
    ConsumerConfig config = new ConsumerConfig(props);

    consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);
}

void consume() {
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put("consumertest", new Integer(1));

    StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
    StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());

    Map<String, List<KafkaStream<String, String>>> consumerMap =
            consumer.createMessageStreams(topicCountMap,keyDecoder,valueDecoder);
    KafkaStream<String, String> stream = consumerMap.get("consumertest").get(0);
    ConsumerIterator<String, String> it = stream.iterator();
    System.out.println("consumer starting...");
    while (it.hasNext())
        System.out.println(it.next().message());
    System.out.println("consumer over");
}

public static void main(String[] args) {
    new KafkaConsumer().consume();
}

}

谢谢~

展开
收起
晓生寒 2018-12-14 16:23:17 3397 0
0 条回答
写回答
取消 提交回答
问答排行榜
最热
最新

相关电子书

更多
Java Spring Boot开发实战系列课程【第16讲】:Spring Boot 2.0 实战Apache Kafka百万级高并发消息中间件与原理解析 立即下载
MaxCompute技术公开课第四季 之 如何将Kafka数据同步至MaxCompute 立即下载
消息队列kafka介绍 立即下载