kafka 使用及学习过程中的爬坑记录

简介: kafka 使用及学习过程中的爬坑记

文章目录

Producer factory does not support transactions

java.lang.IllegalStateException: Producer factory does not support transactions
  at org.springframework.util.Assert.state(Assert.java:76) ~[spring-core-5.3.15.jar:5.3.15]
  at org.springframework.kafka.core.KafkaTemplate.executeInTransaction(KafkaTemplate.java:483) ~[spring-kafka-2.8.2.jar:2.8.2]
  at site.sunlong.kafkapractice.controller.KafkaController.sendTransactionMsg(KafkaController.java:53) ~[classes/:na]
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_152]
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_152]
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_152]
  at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_152]
  at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:205) ~[spring-web-5.3.15.jar:5.3.15]
  at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:150) ~[spring-web-5.3.15.jar:5.3.15]
  at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:117) ~[spring-webmvc-5.3.15.jar:5.3.15]
  at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:895) ~[spring-webmvc-5.3.15.jar:5.3.15]
  at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:808) ~[spring-webmvc-5.3.15.jar:5.3.15]
  at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87) ~[spring-webmvc-5.3.15.jar:5.3.15]
  at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1067) ~[spring-webmvc-5.3.15.jar:5.3.15]
  at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:963) ~[spring-webmvc-5.3.15.jar:5.3.15]
  at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006) ~[spring-webmvc-5.3.15.jar:5.3.15]
  at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:898) ~[spring-webmvc-5.3.15.jar:5.3.15]
  at javax.servlet.http.HttpServlet.service(HttpServlet.java:655) ~[tomcat-embed-core-9.0.56.jar:4.0.FR]
  at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883) ~[spring-webmvc-5.3.15.jar:5.3.15]
  at javax.servlet.http.HttpServlet.service(HttpServlet.java:764) ~[tomcat-embed-core-9.0.56.jar:4.0.FR]
  at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:227) ~[tomcat-embed-core-9.0.56.jar:9.0.56]
  at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.56.jar:9.0.56]
  at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53) ~[tomcat-embed-websocket-9.0.56.jar:9.0.56]
  at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.56.jar:9.0.56]
  at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.56.jar:9.0.56]
  at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100) ~[spring-web-5.3.15.jar:5.3.15]
  at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:117) ~[spring-web-5.3.15.jar:5.3.15]
  at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.56.jar:9.0.56]
  at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.56.jar:9.0.56]
  at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93) ~[spring-web-5.3.15.jar:5.3.15]
  at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:117) ~[spring-web-5.3.15.jar:5.3.15]
  at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.56.jar:9.0.56]
  at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.56.jar:9.0.56]
  at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201) ~[spring-web-5.3.15.jar:5.3.15]
  at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:117) ~[spring-web-5.3.15.jar:5.3.15]
  at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.56.jar:9.0.56]
  at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.56.jar:9.0.56]
  at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:197) ~[tomcat-embed-core-9.0.56.jar:9.0.56]
  at org.apache.catalina.core.StandardContextValve.__invoke(StandardContextValve.java:97) [tomcat-embed-core-9.0.56.jar:9.0.56]
  at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:41002) [tomcat-embed-core-9.0.56.jar:9.0.56]
  at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:540) [tomcat-embed-core-9.0.56.jar:9.0.56]
  at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:135) [tomcat-embed-core-9.0.56.jar:9.0.56]
  at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92) [tomcat-embed-core-9.0.56.jar:9.0.56]
  at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:78) [tomcat-embed-core-9.0.56.jar:9.0.56]
  at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:357) [tomcat-embed-core-9.0.56.jar:9.0.56]
  at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:382) [tomcat-embed-core-9.0.56.jar:9.0.56]
  at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:65) [tomcat-embed-core-9.0.56.jar:9.0.56]
  at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:895) [tomcat-embed-core-9.0.56.jar:9.0.56]
  at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1732) [tomcat-embed-core-9.0.56.jar:9.0.56]
  at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) [tomcat-embed-core-9.0.56.jar:9.0.56]
  at org.apache.tomcat.util.threads.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1191) [tomcat-embed-core-9.0.56.jar:9.0.56]
  at org.apache.tomcat.util.threads.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:659) [tomcat-embed-core-9.0.56.jar:9.0.56]
  at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) [tomcat-embed-core-9.0.56.jar:9.0.56]
  at java.lang.Thread.run(Thread.java:748) [na:1.8.0_152]

异常原因

使用kafka事务发送消息的时候没有开启事务

解决方案

加事务前缀,自动给producer开启事务,所有加

# 事务前缀
spring.kafka.producer.transaction-id-prefix=tx_

Must set retries to non-zero when using the idempotent producer.

org.apache.kafka.common.config.ConfigException: Must set retries to non-zero when using the idempotent producer.

解决方案

# 重试次数
spring.kafka.producer.retries=3

Must set acks to all in order to use the idempotent producer. Otherwise we cannot guarantee idempotence.

org.apache.kafka.common.config.ConfigException: Must set acks to all in order to use the idempotent producer. Otherwise we cannot guarantee idempotence.
  at org.apache.kafka.clients.producer.ProducerConfig.maybeOverrideAcksAndRetries(ProducerConfig.java:459) ~[kafka-clients-3.0.0.jar:na]
  at org.apache.kafka.clients.producer.ProducerConfig.postProcessParsedConfig(ProducerConfig.java:420) ~[kafka-clients-3.0.0.jar:na]
  at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:114) ~[kafka-clients-3.0.0.jar:na]
  at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:133) ~[kafka-clients-3.0.0.jar:na]
  at org.apache.kafka.clients.producer.ProducerConfig.<init>(ProducerConfig.java:490) ~[kafka-clients-3.0.0.jar:na]
  at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:290) ~[kafka-clients-3.0.0.jar:na]
  at org.springframework.kafka.core.DefaultKafkaProducerFactory.createRawProducer(DefaultKafkaProducerFactory.java:863) ~[spring-kafka-2.8.2.jar:2.8.2]

解决方案

配置文件

# 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
spring.kafka.producer.acks=-1

No transaction is in process; possible solutions: run the template operation within the scope of a template.executeInTransaction() operation, start a transaction with @Transactional before invoking the template method, run in a transaction started by a listener container when consuming a record

java.lang.IllegalStateException: No transaction is in process; possible solutions: run the template operation within the scope of a template.executeInTransaction() operation, start a transaction with @Transactional before invoking the template method, run in a transaction started by a listener container when consuming a record
  at org.springframework.util.Assert.state(Assert.java:76) ~[spring-core-5.3.15.jar:5.3.15]
  at org.springframework.kafka.core.KafkaTemplate.getTheProducer(KafkaTemplate.java:726) ~[spring-kafka-2.8.2.jar:2.8.2]
  at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:638) ~[spring-kafka-2.8.2.jar:2.8.2]
  at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:403) ~[spring-kafka-2.8.2.jar:2.8.2]

解决方案

在方法上加@Transactional注解

  @Transactional
    @GetMapping("/transaction")
    public void sendTransactionMsg(@RequestParam String userId){
        String msg = " transaction "+userId;
        kafkaTemplate.executeInTransaction(new KafkaOperations.OperationsCallback<String, Object, Object>() {
            @Override
            public Object doInOperations(KafkaOperations<String, Object> kafkaOperations) {
                kafkaOperations.send(KafkaTopicConstants.TEST_TRANSACTION , msg);
//                throw new RuntimeException("test transaction");
                return "";
            }
        });
    }

a KafkaTemplate is required to support replies

启动报错

java.lang.IllegalStateException: a KafkaTemplate is required to support replies
  at org.springframework.util.Assert.state(Assert.java:76) ~[spring-core-5.3.15.jar:5.3.15]
  at org.springframework.kafka.config.MethodKafkaListenerEndpoint.lambda$createMessageListener$1(MethodKafkaListenerEndpoint.java:180) ~[spring-kafka-2.8.2.jar:2.8.2]
  at org.springframework.kafka.support.JavaUtils.acceptIfNotNull(JavaUtils.java:71) ~[spring-kafka-2.8.2.jar:2.8.2]
  at org.springframework.kafka.config.MethodKafkaListenerEndpoint.createMessageListener(MethodKafkaListenerEndpoint.java:179) ~[spring-kafka-2.8.2.jar:2.8.2]
  at org.springframework.kafka.config.AbstractKafkaListenerEndpoint.setupMessageListener(AbstractKafkaListenerEndpoint.java:517) ~[spring-kafka-2.8.2.jar:2.8.2]
  at org.springframework.kafka.config.AbstractKafkaListenerEndpoint.setupListenerContainer(AbstractKafkaListenerEndpoint.java:500) ~[spring-kafka-2.8.2.jar:2.8.2]
  at org.springframework.kafka.config.AbstractKafkaListenerContainerFactory.createListenerContainer(AbstractKafkaListenerContainerFactory.java:384) ~[spring-kafka-2.8.2.jar:2.8.2]
  at org.springframework.kafka.config.AbstractKafkaListenerContainerFactory.createListenerContainer(AbstractKafkaListenerContainerFactory.java:69) ~[spring-kafka-2.8.2.jar:2.8.2]
  at org.springframework.kafka.config.KafkaListenerEndpointRegistry.createListenerContainer(KafkaListenerEndpointRegistry.java:228) ~[spring-kafka-2.8.2.jar:2.8

解决方案

ConcurrentKafkaListenerContainerFactory 没有设置replyTemplate

ConcurrentKafkaListenerContainerFactory containerFactory = new ConcurrentKafkaListenerContainerFactory();
containerFactory.setReplyTemplate(kafkaTemplate);
目录
相关文章
|
1月前
|
消息中间件 大数据 Kafka
大数据-77 Kafka 高级特性-稳定性-延时队列、重试队列 概念学习 JavaAPI实现(二)
大数据-77 Kafka 高级特性-稳定性-延时队列、重试队列 概念学习 JavaAPI实现(二)
28 2
|
1月前
|
消息中间件 NoSQL 大数据
大数据-77 Kafka 高级特性-稳定性-延时队列、重试队列 概念学习 JavaAPI实现(一)
大数据-77 Kafka 高级特性-稳定性-延时队列、重试队列 概念学习 JavaAPI实现(一)
33 1
|
6月前
|
消息中间件 存储 负载均衡
消息队列学习之kafka
【4月更文挑战第2天】消息队列学习之kafka,一个分布式的,支持多分区、多副本,基于 Zookeeper 的分布式消息流平台。
62 2
|
6月前
|
消息中间件 存储 缓存
闭关学习一周kafka,原来他这么快是有原因的!
无论 kafka 作为 MQ 也好,作为存储层也罢,无非就是两个功能(好简单的样子),一是 Producer 生产的数据存到 broker,二是 Consumer 从 broker 读取数据。那 Kafka 的快也就体现在读写两个方面了,下面我们就聊聊 Kafka 快的原因。
62 1
|
消息中间件 负载均衡 Kafka
Kafka学习---2、kafka生产者、异步和同步发送API、分区、生产经验(一)
Kafka学习---2、kafka生产者、异步和同步发送API、分区、生产经验(一)
|
6月前
|
消息中间件 负载均衡 Kafka
Kafka学习---消费者(分区消费、分区平衡策略、offset、漏消费和重复消费)
Kafka学习---消费者(分区消费、分区平衡策略、offset、漏消费和重复消费)
696 2
|
消息中间件 存储 分布式计算
Spark学习---6、SparkStreaming(SparkStreaming概述、入门、Kafka数据源、DStream转换、输出、关闭)(二)
Spark学习---6、SparkStreaming(SparkStreaming概述、入门、Kafka数据源、DStream转换、输出、关闭)(二)
|
消息中间件 缓存 大数据
Kafka学习---1、Kafka 概述、Kafka快速入门
Kafka学习---1、Kafka 概述、Kafka快速入门
Kafka学习---1、Kafka 概述、Kafka快速入门
|
消息中间件 分布式计算 Kafka
Spark学习---6、SparkStreaming(SparkStreaming概述、入门、Kafka数据源、DStream转换、输出、关闭)(一)
Spark学习---6、SparkStreaming(SparkStreaming概述、入门、Kafka数据源、DStream转换、输出、关闭)(一)
|
消息中间件 算法 关系型数据库
Kafka学习---4、消费者(分区消费、分区平衡策略、offset、漏消费和重复消费)(二)
Kafka学习---4、消费者(分区消费、分区平衡策略、offset、漏消费和重复消费)(二)