rt,能够向kafka produce数据,kafka里也看得到,但是consumer却读不出数据,offset是0,程序在stream的iterator被阻塞了。demo代码如下:
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;
/**
public class KafkaConsumer {
private final ConsumerConnector consumer;
private KafkaConsumer() {
Properties props = new Properties();
props.put("auto.offset.reset", "smallest");
//zookeeper 配置
props.put("zookeeper.connect", "Master:2181");
//group 代表一个消费组
props.put("group.id", "consumer3");
// 连接zk的session超时时间
props.put("zookeeper.session.timeout.ms", "4000");
props.put("zookeeper.sync.time.ms", "200");//zk follower落后于zk leader的最长时间 props.put("auto.commit.interval.ms", "1000");//往zookeeper上写offset的频率 props.put("auto.offset.reset", "smallest");//如果offset出了返回,则 smallest: 自动设置reset到最小的offset. largest : 自动设置offset到最大的offset. 其它值不允许,会抛出异常
//序列化类
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("auto.commit.interval.ms", "1000");
ConsumerConfig config = new ConsumerConfig(props);
consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);
}
void consume() {
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put("consumertest", new Integer(1));
StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());
Map<String, List<KafkaStream<String, String>>> consumerMap =
consumer.createMessageStreams(topicCountMap,keyDecoder,valueDecoder);
KafkaStream<String, String> stream = consumerMap.get("consumertest").get(0);
ConsumerIterator<String, String> it = stream.iterator();
System.out.println("consumer starting...");
while (it.hasNext())
System.out.println(it.next().message());
System.out.println("consumer over");
}
public static void main(String[] args) {
new KafkaConsumer().consume();
}
}
谢谢~
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
根据您提供的问题和代码示例,看起来您的Kafka消费者程序在尝试读取消息时遇到了阻塞,且消息偏移量(offset)一直为0。这可能是由几个原因导致的:
消费者组配置:您设置了group.id
为consumer3
,确保这个消费组没有其他正在运行的消费者实例也在消费同个Topic,因为Kafka会确保一个分区的消息在同一时间只被消费组内的一个消费者消费。
自动偏移重置策略:您已设置auto.offset.reset
为smallest
,这意味着如果消费者找不到其位点或者位点不存在(例如新消费组第一次启动),它会从Topic的起始处开始消费。如果Topic内实际上没有消息或之前的消息已被完全消费过,那么消费者将等待新消息到达,这可能导致看上去像是阻塞。
Zookeeper连接与配置:请确认zookeeper.connect
指向的地址Master:2181
是正确的,并且Zookeeper服务是正常运行的。错误的Zookeeper配置或连接问题会导致消费者无法正确初始化或获取到偏移量信息。
Topic消息生产与消费速度:检查是否在消费者启动后有新的消息产生到consumertest
这个Topic中。如果生产者发送速率远低于消费者的消费能力,且之前无积压消息,消费者看起来就会“阻塞”在迭代器的hasNext()
调用上。
版本兼容性与API变更:注意您使用的消费者API可能相对老旧,现代Kafka客户端推荐使用org.apache.kafka.clients.consumer.KafkaConsumer
而不是kafka.consumer.Consumer
。旧API可能存在已知问题或不再维护,建议升级到新API并调整相应配置。
验证Topic活跃度:首先通过Kafka控制台或其他监控工具确认consumertest
Topic是否有新消息产生。
检查消费者日志:查看消费者端的日志输出,寻找任何异常或警告信息,特别是关于连接、认证或配置方面的错误。
更新消费者API:考虑将消费者代码迁移到新版本的Kafka Consumer API,这通常能提供更好的性能和稳定性。以下是一个基于新API的简单示例:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
public class NewKafkaConsumer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "your.bootstrap.servers");
props.put("group.id", "consumer3");
props.put("enable.auto.commit", "false");
props.put("auto.offset.reset", "earliest");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
consumer.subscribe(Collections.singletonList("consumertest"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
}
}
如果以上步骤均未解决问题,请进一步检查网络状况、防火墙设置或Kafka集群本身的健康状态。