开发者社区 问答 正文

[@小川游鱼][¥20]kafka的消费者stream iterator block,读不出消息?

rt,能够向kafka produce数据,kafka里也看得到,但是consumer却读不出数据,offset是0,程序在stream的iterator被阻塞了。demo代码如下:

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;

/**

  • Created by Administrator on 2015/10/10.
    */

public class KafkaConsumer {

private final ConsumerConnector consumer;

private KafkaConsumer() {
    Properties props = new Properties();
    props.put("auto.offset.reset", "smallest");
    //zookeeper 配置
    props.put("zookeeper.connect", "Master:2181");

    //group 代表一个消费组
    props.put("group.id", "consumer3");

    // 连接zk的session超时时间
    props.put("zookeeper.session.timeout.ms", "4000");
    props.put("zookeeper.sync.time.ms", "200");//zk follower落后于zk leader的最长时间        props.put("auto.commit.interval.ms", "1000");//往zookeeper上写offset的频率        props.put("auto.offset.reset", "smallest");//如果offset出了返回,则 smallest: 自动设置reset到最小的offset. largest : 自动设置offset到最大的offset. 其它值不允许,会抛出异常        
    //序列化类 
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    props.put("auto.commit.interval.ms", "1000");
    ConsumerConfig config = new ConsumerConfig(props);

    consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);
}

void consume() {
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put("consumertest", new Integer(1));

    StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
    StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());

    Map<String, List<KafkaStream<String, String>>> consumerMap =
            consumer.createMessageStreams(topicCountMap,keyDecoder,valueDecoder);
    KafkaStream<String, String> stream = consumerMap.get("consumertest").get(0);
    ConsumerIterator<String, String> it = stream.iterator();
    System.out.println("consumer starting...");
    while (it.hasNext())
        System.out.println(it.next().message());
    System.out.println("consumer over");
}

public static void main(String[] args) {
    new KafkaConsumer().consume();
}

}

谢谢~

展开
收起
晓生寒 2018-12-14 16:23:17 3424 分享 版权
阿里云 AI 助理回答

根据您提供的问题和代码示例,看起来您的Kafka消费者程序在尝试读取消息时遇到了阻塞,且消息偏移量(offset)一直为0。这可能是由几个原因导致的:

  1. 消费者组配置:您设置了group.idconsumer3,确保这个消费组没有其他正在运行的消费者实例也在消费同个Topic,因为Kafka会确保一个分区的消息在同一时间只被消费组内的一个消费者消费。

  2. 自动偏移重置策略:您已设置auto.offset.resetsmallest,这意味着如果消费者找不到其位点或者位点不存在(例如新消费组第一次启动),它会从Topic的起始处开始消费。如果Topic内实际上没有消息或之前的消息已被完全消费过,那么消费者将等待新消息到达,这可能导致看上去像是阻塞。

  3. Zookeeper连接与配置:请确认zookeeper.connect指向的地址Master:2181是正确的,并且Zookeeper服务是正常运行的。错误的Zookeeper配置或连接问题会导致消费者无法正确初始化或获取到偏移量信息。

  4. Topic消息生产与消费速度:检查是否在消费者启动后有新的消息产生到consumertest这个Topic中。如果生产者发送速率远低于消费者的消费能力,且之前无积压消息,消费者看起来就会“阻塞”在迭代器的hasNext()调用上。

  5. 版本兼容性与API变更:注意您使用的消费者API可能相对老旧,现代Kafka客户端推荐使用org.apache.kafka.clients.consumer.KafkaConsumer而不是kafka.consumer.Consumer。旧API可能存在已知问题或不再维护,建议升级到新API并调整相应配置。

解决步骤

  • 验证Topic活跃度:首先通过Kafka控制台或其他监控工具确认consumertestTopic是否有新消息产生。

  • 检查消费者日志:查看消费者端的日志输出,寻找任何异常或警告信息,特别是关于连接、认证或配置方面的错误。

  • 更新消费者API:考虑将消费者代码迁移到新版本的Kafka Consumer API,这通常能提供更好的性能和稳定性。以下是一个基于新API的简单示例:

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    
    public class NewKafkaConsumer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "your.bootstrap.servers");
            props.put("group.id", "consumer3");
            props.put("enable.auto.commit", "false");
            props.put("auto.offset.reset", "earliest");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("consumertest"));
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                    for (ConsumerRecord<String, String> record : records)
                        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                }
            }
        }
    }
    
  • 监控与调试:利用Kafka自带的监控工具或如Grafana仪表盘来观察Topic的生产与消费速率,以及消费者的状态,以帮助定位问题所在。

如果以上步骤均未解决问题,请进一步检查网络状况、防火墙设置或Kafka集群本身的健康状态。

有帮助
无帮助
AI 助理回答生成答案可能存在不准确,仅供参考
0 条回答
写回答
取消 提交回答