开发者社区> 问答> 正文

tomcat启动,运行kafka-consumer端代码报错?报错

启动tomcat,访问kafka-Consumer端代码(注意是从tomcat启动访问startJob()方法

public void startJob() throws Exception{
        Properties props1 = new Properties();
        props1.put("zookeeper.connect", "10.0.11.43:2181/kafka");
        props1.put("group.id", "solr-consumertest4");
        props1.put("rebalance.max.retries", "5");
        props1.put("rebalance.backoff.ms", "2000");
        props1.put("zookeeper.session.timeout.ms", "5000");
        props1.put("auto.offset.reset", "smallest");
        props1.put("zookeeper.connectiontimeout.ms", "100000");
        props1.put("zookeeper.session.timeout.ms", "40000");
        props1.put("zookeeper.sync.time.ms", "200");
        props1.put("auto.commit.interval.ms", "100");
        ConsumerConfig consumerConfig = new ConsumerConfig(props1);
        
        ConsumerConnector javaConsumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
        
        Whitelist whitelist = new Whitelist(topic);
        
        List<KafkaStream<byte[], byte[]>> partitions = javaConsumerConnector.createMessageStreamsByFilter(whitelist);
        
        if (CollectionUtils.isEmpty(partitions)) {
            System.out.println("empty!");
            TimeUnit.SECONDS.sleep(1);
        }
        
        //消费消息
         for (KafkaStream<byte[], byte[]> partition : partitions) {
             ConsumerIterator<byte[], byte[]> iterator = partition.iterator();
             while (iterator.hasNext()) {
                 MessageAndMetadata<byte[], byte[]> next = iterator.next();
                 System.out.println("partiton:" + next.partition());
                 System.out.println("offset:" + next.offset());
                 System.out.println("message:" + new String(next.message(), "utf-8"));
             }
         }
    }

这行报错:List<KafkaStream<byte[], byte[]>> partitions = javaConsumerConnector.createMessageStreamsByFilter(whitelist);

Caused by: kafka.common.ConsumerRebalanceFailedException: solr-consumertest3_kongdeyu-1472724776895-ef4780df can't rebalance after 5 retries

网上查都是说有两种情况

1同一个消费者组(consumer group)有多个consumer先后启动,就是一个消费者组内有多个consumer同时负载消费多个partition数据.

2是将consumer端配置改为rebalance.max.retries * rebalance.backoff.ms > zookeeper.session.timeout.ms

可我仔细检查代码,确认就只有启动了一个消费者来取数据,consumer端的配置也改成对应的关系了,可还是报错

死活就是报错

后来我改成从main函数进入startJob()方法

居然就可以取到kafka上的消息了,没有任何错误

真是奇怪不明白为什么,难道kafka不能再tomcat 中运行吗?

求大神解惑

小弟,不胜感激



展开
收起
爱吃鱼的程序员 2020-06-09 10:50:12 890 0
1 条回答
写回答
取消 提交回答
  • https://developer.aliyun.com/profile/5yerqm5bn5yqg?spm=a2c6h.12873639.0.0.6eae304abcjaIB

    最后找到问题了是jar包的原因

    2020-06-09 10:50:29
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Java Spring Boot开发实战系列课程【第16讲】:Spring Boot 2.0 实战Apache Kafka百万级高并发消息中间件与原理解析 立即下载
MaxCompute技术公开课第四季 之 如何将Kafka数据同步至MaxCompute 立即下载
消息队列kafka介绍 立即下载