我有一个项目,搭的是springmvc,然后使用了zookeeper和dubbo做的分布式,我在里面添加了kafka消息队列,单个主题跑,没有问题,现在我想配置多个主题,请问该如何配置?(网上好几种方法我都有试,但是有报错,也不知该如何处理,网上没有搜到,请求大佬们的援助)
一下是kafka相关的jar包:
<!-- kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.10.2.1</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>1.0.4.RELEASE</version>
</dependency>
然后我多个主题配置如下:
<!-- 消费者容器配置信息 -->
<bean id="containerProperties"
class="org.springframework.kafka.listener.config.ContainerProperties">
<constructor-arg >
<list>
<value>${topic}</value>
<value>${homework-topic}</value>
</list>
</constructor-arg>
<property name="messageListener" ref="messageListernerConsumerService" />
<property name="ackMode" value="${ackMode}" />
</bean>
这样配置报错:
org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'messageListenerContainer' defined in class path resource [spring-kafka-consumer.xml]: Invocation of init method failed; nested exception is java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.Consumer.subscribe(Ljava/util/List;Lorg/apache/kafka/clients/consumer/ConsumerRebalanceListener;)V
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1578)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:545)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:482)
at org.springframework.beans.factory.support.AbstractBeanFactory$1.getObject(AbstractBeanFactory.java:306)
at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:230)
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:302)
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:197)
at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:775)
at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:861)
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:541)
at org.springframework.web.context.ContextLoader.configureAndRefreshWebApplicationContext(ContextLoader.java:444)
at org.springframework.web.context.ContextLoader.initWebApplicationContext(ContextLoader.java:326)
at org.springframework.web.context.ContextLoaderListener.contextInitialized(ContextLoaderListener.java:107)
at org.apache.catalina.core.StandardContext.listenerStart(StandardContext.java:4853)
at org.apache.catalina.core.StandardContext.startInternal(StandardContext.java:5314)
at org.apache.catalina.util.LifecycleBase.start(LifecycleBase.java:145)
at org.apache.catalina.core.ContainerBase$StartChild.call(ContainerBase.java:1408)
at org.apache.catalina.core.ContainerBase$StartChild.call(ContainerBase.java:1398)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.Consumer.subscribe(Ljava/util/List;Lorg/apache/kafka/clients/consumer/ConsumerRebalanceListener;)V
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.<init>(KafkaMessageListenerContainer.java:361)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.<init>(KafkaMessageListenerContainer.java:222)
at org.springframework.kafka.listener.KafkaMessageListenerContainer.doStart(KafkaMessageListenerContainer.java:179)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeCustomInitMethod(AbstractAutowireCapableBeanFactory.java:1706)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1645)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1574)
... 21 more
求助攻,求助攻,求助攻!!!
问题已解决,是jar包的问题,低版本存在多个主题list转换成set,转成了null的bug,使用最新的版本,没有出现任何问题了
######回复 @岁月无声11 : 你看看的你jar包版本, 可能是jar包的原因.######您好,我最近也遇到和您相似的问题了,xml文件中配置了主题list,在消费监听的时候 @KafkaListener这个注解怎么配置呢?######不好意思,第一次提问题,没有用到代码块,导致看起来有点累,不好意思哈
######这不是你xml配置错了吗?kafka你创建多主题了吗?如果你kafka创建多主题了,不同的方法调用不同的主题的生产者和消费者跟单个主题是一样的啊!######你单个的怎么配置的,你就再加一个!######那么,请问该如何配置呢?######使用KFK可能大部分情况下,不会使用多个topic,因为像RabbitMQ那种Channel,KFK用分区进去替代了,使用起来就像是一个Channel。其实这种思想也是对的,一段业务代码就应该有明显标记来标记它到底是干嘛的,而不是他又多种情况。
######一般不用多个topic吗?假如说,我现在是多重情况呢,一种是消息推送类的,一种是数据处理类的,这样也使用同一个topic吗?######使用KFK可能大部分情况下,不会使用多个topic,因为像RabbitMQ那种Channel,KFK用分区进去替代了,使用起来就像是一个Channel。其实这种思想也是对的,一段业务代码就应该有明显标记来标记它到底是干嘛的,而不是他又多种情况。
Topic是标记消费者的,生产者是使用Topic,并且它是群发模式大部分。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。