【Azure Kafka】使用Spring Cloud Stream Binder Kafka 发送并接收 Event Hub 消息及解决并发报错

本文涉及的产品
容器镜像服务 ACR,镜像仓库100个 不限时长
应用实时监控服务-可观测链路OpenTelemetry版,每月50GB免费额度
MSE Nacos/ZooKeeper 企业版试用,1600元额度,限量50份
简介: reactor.core.publisher.Sinks$EmissionException: Spec. Rule 1.3 - onSubscribe, onNext, onError and onComplete signaled to a Subscriber MUST be signaled serially.

问题描述

根据 Spring Cloud Stream Binder Kafka 发送消息/接收消息的代码示例,只需要配置好 application.yaml 中的 event hub connection string, Event Hub Name就可以正常运行并查看执行结果。

但是,当使用JMeter作并发测试时候,就会遇见异常,  主要的异常信息为:reactor.core.publisher.Sinks$EmissionException: Spec. Rule 1.3 - onSubscribe, onNext, onError and onComplete signaled to a Subscriber MUST be signaled serially. 这是Sinks中发送者发送消息的时候,调用 onSubscribe、onNext、onError 和 onComplete 必须顺序执行,不能并发操作。


完整的异常日志:

2025-02-18T13:08:34.399+08:00 ERROR 35416 --- [io-8080-exec-41] o.a.c.c.C.[.[.[/].[dispatcherServlet]    : 
Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed: reactor.core.publisher.Sinks$EmissionException: 
Spec. Rule 1.3 - onSubscribe, onNext, onError and onComplete signaled to a Subscriber MUST be signaled serially.] with root cause
reactor.core.publisher.Sinks$EmissionException: Spec. Rule 1.3 - onSubscribe, onNext, onError and onComplete signaled to a Subscriber MUST be signaled serially.
 at reactor.core.publisher.InternalManySink.emitNext(InternalManySink.java:56) ~[reactor-core-3.7.2.jar:3.7.2]
 at com.azure.spring.sample.eventhubs.SourceExample.sendMessage(SourceExample.java:22) ~[classes/:na]
 at jdk.internal.reflect.GeneratedMethodAccessor5.invoke(Unknown Source) ~[na:na]
 at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
 at java.base/java.lang.reflect.Method.invoke(Method.java:568) ~[na:na]
 at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:257) ~[spring-web-6.2.2.jar:6.2.2]
 at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:190) ~[spring-web-6.2.2.jar:6.2.2]
 at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:118) ~[spring-webmvc-6.2.2.jar:6.2.2]
 at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:986) ~[spring-webmvc-6.2.2.jar:6.2.2]
 at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:891) ~[spring-webmvc-6.2.2.jar:6.2.2]
 at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87) ~[spring-webmvc-6.2.2.jar:6.2.2]
 at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1088) ~[spring-webmvc-6.2.2.jar:6.2.2]
 at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:978) ~[spring-webmvc-6.2.2.jar:6.2.2]
 at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1014) ~[spring-webmvc-6.2.2.jar:6.2.2]
 at org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:914) ~[spring-webmvc-6.2.2.jar:6.2.2]
 at jakarta.servlet.http.HttpServlet.service(HttpServlet.java:590) ~[tomcat-embed-core-10.1.34.jar:6.0]
 at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:885) ~[spring-webmvc-6.2.2.jar:6.2.2]
 at jakarta.servlet.http.HttpServlet.service(HttpServlet.java:658) ~[tomcat-embed-core-10.1.34.jar:6.0]
 at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:195) ~[tomcat-embed-core-10.1.34.jar:10.1.34]
 at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:140) ~[tomcat-embed-core-10.1.34.jar:10.1.34]
 at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:51) ~[tomcat-embed-websocket-10.1.34.jar:10.1.34]
 at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:164) ~[tomcat-embed-core-10.1.34.jar:10.1.34]
 at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:140) ~[tomcat-embed-core-10.1.34.jar:10.1.34]
 at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100) ~[spring-web-6.2.2.jar:6.2.2]
 at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:116) ~[spring-web-6.2.2.jar:6.2.2]
 at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:164) ~[tomcat-embed-core-10.1.34.jar:10.1.34]
 at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:140) ~[tomcat-embed-core-10.1.34.jar:10.1.34]
 at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93) ~[spring-web-6.2.2.jar:6.2.2]
 at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:116) ~[spring-web-6.2.2.jar:6.2.2]
 at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:164) ~[tomcat-embed-core-10.1.34.jar:10.1.34]
 at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:140) ~[tomcat-embed-core-10.1.34.jar:10.1.34]
 at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201) ~[spring-web-6.2.2.jar:6.2.2]
 at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:116) ~[spring-web-6.2.2.jar:6.2.2]
 at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:164) ~[tomcat-embed-core-10.1.34.jar:10.1.34]
 at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:140) ~[tomcat-embed-core-10.1.34.jar:10.1.34]
 at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:167) ~[tomcat-embed-core-10.1.34.jar:10.1.34]
 at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:90) ~[tomcat-embed-core-10.1.34.jar:10.1.34]
 at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:483) ~[tomcat-embed-core-10.1.34.jar:10.1.34]
 at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:115) ~[tomcat-embed-core-10.1.34.jar:10.1.34]
 at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:93) ~[tomcat-embed-core-10.1.34.jar:10.1.34]
 at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:74) ~[tomcat-embed-core-10.1.34.jar:10.1.34]
 at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:344) ~[tomcat-embed-core-10.1.34.jar:10.1.34]
 at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:397) ~[tomcat-embed-core-10.1.34.jar:10.1.34]
 at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:63) ~[tomcat-embed-core-10.1.34.jar:10.1.34]
 at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:905) ~[tomcat-embed-core-10.1.34.jar:10.1.34]
 at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1741) ~[tomcat-embed-core-10.1.34.jar:10.1.34]
 at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:52) ~[tomcat-embed-core-10.1.34.jar:10.1.34]
 at org.apache.tomcat.util.threads.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1190) ~[tomcat-embed-core-10.1.34.jar:10.1.34]
 at org.apache.tomcat.util.threads.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:659) ~[tomcat-embed-core-10.1.34.jar:10.1.34]
 at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:63) ~[tomcat-embed-core-10.1.34.jar:10.1.34]
 at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]


 

问题解答

错误的原因主要是在发送消息的方法中使用了 Sinks.EmitFailureHandler.FAIL_FAST参数,这个参数表示只要发送消息失败,不进行任何重试,直接返回异常。

在Sinks类中,因无法使用 EmitResult 中的其它参数来代替EmitFailureHandler.FAIL_FAST, 所以只能使用 tryEmitNext 方法来缓解此问题。

 


参考资料


Sending and Receiving Message by Azure Event Hubs and Spring Cloud Stream Binder Kafka in Spring Boot Application : https://github.com/Azure-Samples/azure-spring-boot-samples/tree/main/eventhubs/spring-cloud-azure-starter/spring-cloud-azure-sample-eventhubs-kafka


【Azure 事件中心】Spring Cloud Stream Event Hubs Binder 发送Event Hub消息遇见 Spec. Rule 1.3 - onSubscribe, onNext, onError and onComplete signaled to a Subscriber MUST be signaled serially 异常 :https://www.cnblogs.com/lulight/p/17357267.html

 

相关文章
消息中间件 Java Kafka
188 0
|
2月前
|
消息中间件 Java Kafka
消息队列比较:Spring 微服务中的 Kafka 与 RabbitMQ
本文深入解析了 Kafka 和 RabbitMQ 两大主流消息队列在 Spring 微服务中的应用与对比。内容涵盖消息队列的基本原理、Kafka 与 RabbitMQ 的核心概念、各自优势及典型用例,并结合 Spring 生态的集成方式,帮助开发者根据实际需求选择合适的消息中间件,提升系统解耦、可扩展性与可靠性。
209 1
消息队列比较:Spring 微服务中的 Kafka 与 RabbitMQ
|
2月前
|
消息中间件 存储 Java
RabbitMQ 和 Spring Cloud Stream 实现异步通信
本文介绍了在微服务架构中,如何利用 RabbitMQ 作为消息代理,并结合 Spring Cloud Stream 实现高效的异步通信。内容涵盖异步通信的优势、RabbitMQ 的核心概念与特性、Spring Cloud Stream 的功能及其与 RabbitMQ 的集成方式。通过这种组合,开发者可以构建出具备高可用性、可扩展性和弹性的分布式系统,满足现代应用对快速响应和可靠消息传递的需求。
185 2
RabbitMQ 和 Spring Cloud Stream 实现异步通信
|
4月前
|
前端开发 Java API
Spring Cloud Gateway Server Web MVC报错“Unsupported transfer encoding: chunked”解决
本文解析了Spring Cloud Gateway中出现“Unsupported transfer encoding: chunked”错误的原因,指出该问题源于Feign依赖的HTTP客户端与服务端的`chunked`传输编码不兼容,并提供了具体的解决方案。通过规范Feign客户端接口的返回类型,可有效避免该异常,提升系统兼容性与稳定性。
321 0
|
7月前
|
消息中间件 Java Kafka
Spring Boot整合kafka
本文简要记录了Spring Boot与Kafka的整合过程。首先通过Docker搭建Kafka环境,包括Zookeeper和Kafka服务的配置文件。接着引入Spring Kafka依赖,并在`application.properties`中配置生产者和消费者参数。随后创建Kafka配置类,定义Topic及重试机制。最后实现生产者发送消息和消费者监听消息的功能,支持手动ACK确认。此方案适用于快速构建基于Spring Boot的Kafka消息系统。
1356 7
|
7月前
|
安全 Java API
深入解析 Spring Security 配置中的 CSRF 启用与 requestMatchers 报错问题
本文深入解析了Spring Security配置中CSRF启用与`requestMatchers`报错的常见问题。针对CSRF,指出默认已启用,无需调用`enable()`,只需移除`disable()`即可恢复。对于`requestMatchers`多路径匹配报错,分析了Spring Security 6.x中方法签名的变化,并提供了三种解决方案:分次调用、自定义匹配器及降级使用`antMatchers()`。最后提醒开发者关注版本兼容性,确保升级平稳过渡。
882 2
|
7月前
|
前端开发 IDE Java
Spring MVC 中因导入错误的 Model 类报错问题解析
在 Spring MVC 或 Spring Boot 开发中,若导入错误的 `Model` 类(如 `ch.qos.logback.core.model.Model`),会导致无法解析 `addAttribute` 方法的错误。正确类应为 `org.springframework.ui.Model`。此问题通常因 IDE 自动导入错误类引起。解决方法包括:删除错误导入、添加正确包路径、验证依赖及清理缓存。确保代码中正确使用 Spring 提供的 `Model` 接口以实现前后端数据传递。
245 0
|
1月前
|
Java 测试技术 数据库连接
【SpringBoot(四)】还不懂文件上传?JUnit使用?本文带你了解SpringBoot的文件上传、异常处理、组件注入等知识!并且带你领悟JUnit单元测试的使用!
Spring专栏第四章,本文带你上手 SpringBoot 的文件上传、异常处理、组件注入等功能 并且为你演示Junit5的基础上手体验
769 2
|
1月前
|
JavaScript Java Maven
【SpringBoot(二)】带你认识Yaml配置文件类型、SpringMVC的资源访问路径 和 静态资源配置的原理!
SpringBoot专栏第二章,从本章开始正式进入SpringBoot的WEB阶段开发,本章先带你认识yaml配置文件和资源的路径配置原理,以方便在后面的文章中打下基础
264 3
|
4月前
|
缓存 JSON 前端开发
第07课:Spring Boot集成Thymeleaf模板引擎
第07课:Spring Boot集成Thymeleaf模板引擎
509 0
第07课:Spring Boot集成Thymeleaf模板引擎

相关产品

  • 云消息队列 Kafka 版