启动tomcat,访问kafka-Consumer端代码(注意是从tomcat启动访问startJob()方法)
public void startJob() throws Exception{
Properties props1 = new Properties();
props1.put("zookeeper.connect", "10.0.11.43:2181/kafka");
props1.put("group.id", "solr-consumertest4");
props1.put("rebalance.max.retries", "5");
props1.put("rebalance.backoff.ms", "2000");
props1.put("zookeeper.session.timeout.ms", "5000");
props1.put("auto.offset.reset", "smallest");
props1.put("zookeeper.connectiontimeout.ms", "100000");
props1.put("zookeeper.session.timeout.ms", "40000");
props1.put("zookeeper.sync.time.ms", "200");
props1.put("auto.commit.interval.ms", "100");
ConsumerConfig consumerConfig = new ConsumerConfig(props1);
ConsumerConnector javaConsumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
Whitelist whitelist = new Whitelist(topic);
List<KafkaStream<byte[], byte[]>> partitions = javaConsumerConnector.createMessageStreamsByFilter(whitelist);
if (CollectionUtils.isEmpty(partitions)) {
System.out.println("empty!");
TimeUnit.SECONDS.sleep(1);
}
//消费消息
for (KafkaStream<byte[], byte[]> partition : partitions) {
ConsumerIterator<byte[], byte[]> iterator = partition.iterator();
while (iterator.hasNext()) {
MessageAndMetadata<byte[], byte[]> next = iterator.next();
System.out.println("partiton:" + next.partition());
System.out.println("offset:" + next.offset());
System.out.println("message:" + new String(next.message(), "utf-8"));
}
}
}
这行报错:List<KafkaStream<byte[], byte[]>> partitions = javaConsumerConnector.createMessageStreamsByFilter(whitelist);
Caused by: kafka.common.ConsumerRebalanceFailedException: solr-consumertest3_kongdeyu-1472724776895-ef4780df can't rebalance after 5 retries
网上查都是说有两种情况
1同一个消费者组(consumer group)有多个consumer先后启动,就是一个消费者组内有多个consumer同时负载消费多个partition数据.
2是将consumer端配置改为rebalance.max.retries * rebalance.backoff.ms > zookeeper.session.timeout.ms
可我仔细检查代码,确认就只有启动了一个消费者来取数据,consumer端的配置也改成对应的关系了,可还是报错
死活就是报错
后来我改成从main函数进入startJob()方法
居然就可以取到kafka上的消息了,没有任何错误
真是奇怪不明白为什么,难道kafka不能再tomcat 中运行吗?
求大神解惑
小弟,不胜感激
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
最后找到问题了是jar包的原因