概述
Kafka的事务不同于Rocketmq,Rocketmq是保障本地事务(比如数据库)与mq消息发送的事务一致性,Kafka的事务主要是保障一次发送多条消息的事务一致性(要么同时成功要么同时失败)。
一般在kafka的流式计算场景用得多一点,比如,kafka需要对一个topic里的消息做不同的流式计算处理,处理完分别发到不同的topic里,这些topic分别被不同的下游系统消费(比如hbase,redis,es等),这种我们肯定希望系统发送到多个topic的数据保持事务一致性。
Kafka要实现类似Rocketmq的分布式事务需要额外开发功能。
官方文档: http://kafka.apache.org/24/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html
这个功能比较鸡肋,大家看着用哈 ,它保证不了不同介质的数据一致性。
官方示例
From Kafka 0.11, the KafkaProducer supports two additional modes: the idempotent producer and the transactional producer.
原生的API操作,请查看文档,这里我们来看下使用Spring kafka如何实现事务消息。
Code (原生API)
@Test public void testT(){ // 正常的 Properties props = new Properties(); props.put("bootstrap.servers", "192.168.126.140:9092"); props.put("transactional.id", "my-transactional-id"); Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer()); producer.initTransactions(); try { producer.beginTransaction(); for (int i = 0; i < 100; i++) producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i))); producer.commitTransaction(); } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) { // We can't recover from these exceptions, so our only option is to close the producer and exit. producer.close(); } catch (KafkaException e) { // For all other exceptions, just abort the transaction and try again. producer.abortTransaction(); } producer.close(); }
@Test public void testT2(){ // 测试异常情况 Properties props = new Properties(); props.put("bootstrap.servers", "192.168.126.140:9092"); props.put("transactional.id", "my-transactional-id"); Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer()); producer.initTransactions(); try { producer.beginTransaction(); for (int i = 0; i < 100; i++){ producer.send(new ProducerRecord<>("my-topic2", Integer.toString(i), Integer.toString(i))); if (i == 50) { throw new RuntimeException("MOCK Exception"); } } producer.commitTransaction(); } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) { // We can't recover from these exceptions, so our only option is to close the producer and exit. producer.close(); } catch (KafkaException e) { // For all other exceptions, just abort the transaction and try again. producer.abortTransaction(); } producer.close(); }
看看数据
可以看到 入了一部分,只是这里进入的数据也无法被消费。
Code (Spring Kafka)
POM依赖
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- 引入 Spring-Kafka 依赖 --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <scope>test</scope> </dependency> </dependencies>
配置文件
spring: # Kafka 配置项,对应 KafkaProperties 配置类 kafka: bootstrap-servers: 192.168.126.140:9092 # 指定 Kafka Broker 地址,可以设置多个,以逗号分隔 # Kafka Producer 配置项 producer: acks: all # 0-不应答。1-leader 应答。all-所有 leader 和 follower 应答。 retries: 3 # 发送失败时,重试发送的次数 key-serializer: org.apache.kafka.common.serialization.StringSerializer # 消息的 key 的序列化 value-serializer: org.springframework.kafka.support.serializer.JsonSerializer # 消息的 value 的序列化 transaction-id-prefix: artisan. # 事务编号前缀 # Kafka Consumer 配置项 consumer: auto-offset-reset: earliest # 设置消费者分组最初的消费进度为 earliest key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer properties: spring: json: trusted: packages: com.artisan.springkafka.domain isolation-level: read_committed # 读取已提交的消息 # Kafka Consumer Listener 监听器配置 listener: missing-topics-fatal: false # 消费监听接口监听的主题不存在时,默认会报错。所以通过设置为 false ,解决报错 logging: level: org: springframework: kafka: ERROR # spring-kafka apache: kafka: ERROR # kafka
spring.kafka.producer.acks 配置为all,Kafka 的事务消息需要基于幂等性来实现,必须保证所有节点都写入成功,否则的话启动时会抛出Must set acks to all in order to use the idempotent producer. Otherwise we cannot guarantee idempotence
事务编号前缀属性设置 transaction-id-prefix, 需要保证相同应用配置相同,不同应用配置不同。 How to choose Kafka transaction id for several applications, hosted in Kubernetes?
spring.kafka.consumer.properties.isolation.level 设置为 read_committed ,Consumer 仅读取已提交的消息, 否则不生效
生产者
package com.artisan.springkafka.producer; import com.artisan.springkafka.constants.TOPIC; import com.artisan.springkafka.domain.MessageMock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaOperations; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.stereotype.Component; import java.util.Random; import java.util.concurrent.ExecutionException; /** * @author 小工匠 * @version 1.0 * @description: TODO * @date 2021/2/17 22:25 * @mark: show me the code , change the world */ @Component public class ArtisanProducerMock { private Logger logger = LoggerFactory.getLogger(getClass()); @Autowired private KafkaTemplate<Object,Object> kafkaTemplate ; public String testTransaction(Runnable runnable){ return kafkaTemplate.executeInTransaction(new KafkaOperations.OperationsCallback<Object, Object, String>() { @Override public String doInOperations(KafkaOperations<Object, Object> operations) throws RuntimeException { for (int i = 1; i <= 10; i++) { // 用于测试 消息是否在同一个事务中 if (i == 7 ) { throw new RuntimeException("MOCK ERROR , TEST Tranasction"); } // 模拟发送的消息 Integer id = new Random().nextInt(100); MessageMock messageMock = new MessageMock(id,"messageSendByAsync-" + id); SendResult<Object, Object> sendResult = null; try { sendResult = operations.send(TOPIC.TOPIC, messageMock).get(); } catch ( Exception e) { logger.error("Error {}", e); } logger.info( i+ "-[doInOperations][发送数据:[{}] 发送结果:[{}]]", messageMock, sendResult); // 本地业务逻辑... runnable.run(); } // 返回结果 return "OJ8K"; } }); } }
我们模拟发送10条消息,第7条的时候抛出异常,观察消费者是否能消费前面已经发送的6条 ,如果能消费,那肯定不符合和预期。 因为Kafka的事务主要是保障一次发送多条消息的事务一致性(要么同时成功要么同时失败)。
调用 kafkaTemplate#executeInTransaction(OperationsCallback callback) 模板方法,实现在 Kafka 事务中,执行自定义 KafkaOperations.OperationsCallback 操作。
executeInTransaction(...) 方法中,可以通过 KafkaOperations 来执行发送消息等 Kafka 相关的操作,当然了也可以执行自己的业务逻辑,比如 runnable参数,用于表示本地业务逻辑
executeInTransaction(...) 方法的开始,会自动动创建 Kafka 的事务,然后执行KafkaOperations 的逻辑。成功,则提交 Kafka 事务;失败,则回滚 Kafka 事务。
注意事项
如果 Kafka Producer 开启了事务的功能,则所有发送的消息,都必须处于 Kafka 事务之中,否则会抛出 No transaction is in process; possible solutions: run the template operation within the scope of a template.executeInTransaction() operation, start a transaction with @Transactional before invoking the template method, run in a transaction started by a listener container when consuming a record
假设业务中,即存在需要事务的情况,也存在不需要事务的情况,那么则需要分别定义两个 KafkaTemplate(Kafka Producer)
消费者
package com.artisan.springkafka.consumer; import com.artisan.springkafka.constants.TOPIC; import com.artisan.springkafka.domain.MessageMock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.Acknowledgment; import org.springframework.stereotype.Component; import java.util.concurrent.TimeUnit; /** * @author 小工匠 * @version 1.0 * @description: TODO * @date 2021/2/17 22:33 * @mark: show me the code , change the world */ @Component public class ArtisanCosumerMock { private Logger logger = LoggerFactory.getLogger(getClass()); private static final String CONSUMER_GROUP_PREFIX = "MANUAL_ACK_" ; @KafkaListener(topics = TOPIC.TOPIC ,groupId = CONSUMER_GROUP_PREFIX + TOPIC.TOPIC) public void onMessage(MessageMock messageMock) { logger.info("【接受到消息][线程:{} 消息内容:{}]", Thread.currentThread().getName(), messageMock); } }
单元测试
package com.artisan.springkafka.produceTest; import com.artisan.springkafka.SpringkafkaApplication; import com.artisan.springkafka.producer.ArtisanProducerMock; import org.junit.Test; import org.junit.runner.RunWith; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.kafka.support.SendResult; import org.springframework.test.context.junit4.SpringRunner; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFutureCallback; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; /** * @author 小工匠 * * @version 1.0 * @description: TODO * @date 2021/2/17 22:40 * @mark: show me the code , change the world */ @RunWith(SpringRunner.class) @SpringBootTest(classes = SpringkafkaApplication.class) public class ProduceMockTest { private Logger logger = LoggerFactory.getLogger(getClass()); @Autowired private ArtisanProducerMock artisanProducerMock; @Autowired ArtisanProducerMock producerMock; @Test public void testAsynSend() throws ExecutionException, InterruptedException { logger.info("开始发送"); producerMock.testTransaction(() -> { logger.info(" mock doing bussiness "); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } logger.info("bussiness over "); }); // 阻塞等待,保证消费 new CountDownLatch(1).await(); } }
测试结果
.... .... .... . ____ _ __ _ _ /\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \ ( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \ \\/ ___)| |_)| | | | | || (_| | ) ) ) ) ' |____| .__|_| |_|_| |_\__, | / / / / =========|_|==============|___/=/_/_/_/ :: Spring Boot :: (v2.4.1) 2021-02-20 01:35:44.452 INFO 12108 --- [ main] c.a.s.produceTest.ProduceMockTest : Starting ProduceMockTest using Java 1.8.0_261 on LAPTOP-JF3RBRRJ with PID 12108 (started by artisan in D:\IdeaProjects\boot2\springkafkaTransaction) 2021-02-20 01:35:44.456 INFO 12108 --- [ main] c.a.s.produceTest.ProduceMockTest : No active profile set, falling back to default profiles: default 2021-02-20 01:35:45.832 INFO 12108 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'applicationTaskExecutor' 2021-02-20 01:35:46.811 INFO 12108 --- [ main] o.s.s.c.ThreadPoolTaskScheduler : Initializing ExecutorService 2021-02-20 01:35:46.827 INFO 12108 --- [ main] c.a.s.produceTest.ProduceMockTest : Started ProduceMockTest in 2.77 seconds (JVM running for 3.55) 2021-02-20 01:35:47.021 INFO 12108 --- [ main] c.a.s.produceTest.ProduceMockTest : 开始发送 2021-02-20 01:35:47.298 INFO 12108 --- [ main] c.a.s.producer.ArtisanProducerMock : 1-[doInOperations][发送数据:[MessageMock{id=2, name='messageSendByAsync-2'}] 发送结果:[SendResult [producerRecord=ProducerRecord(topic=AC, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 97, 114, 116, 105, 115, 97, 110, 46, 115, 112, 114, 105, 110, 103, 107, 97, 102, 107, 97, 46, 100, 111, 109, 97, 105, 110, 46, 77, 101, 115, 115, 97, 103, 101, 77, 111, 99, 107])], isReadOnly = true), key=null, value=MessageMock{id=2, name='messageSendByAsync-2'}, timestamp=null), recordMetadata=AC-0@16]]] 2021-02-20 01:35:47.298 INFO 12108 --- [ main] c.a.s.produceTest.ProduceMockTest : mock doing bussiness 2021-02-20 01:35:48.302 INFO 12108 --- [ main] c.a.s.produceTest.ProduceMockTest : bussiness over 2021-02-20 01:35:48.305 INFO 12108 --- [ main] c.a.s.producer.ArtisanProducerMock : 2-[doInOperations][发送数据:[MessageMock{id=2, name='messageSendByAsync-2'}] 发送结果:[SendResult [producerRecord=ProducerRecord(topic=AC, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 97, 114, 116, 105, 115, 97, 110, 46, 115, 112, 114, 105, 110, 103, 107, 97, 102, 107, 97, 46, 100, 111, 109, 97, 105, 110, 46, 77, 101, 115, 115, 97, 103, 101, 77, 111, 99, 107])], isReadOnly = true), key=null, value=MessageMock{id=2, name='messageSendByAsync-2'}, timestamp=null), recordMetadata=AC-0@17]]] 2021-02-20 01:35:48.305 INFO 12108 --- [ main] c.a.s.produceTest.ProduceMockTest : mock doing bussiness 2021-02-20 01:35:49.308 INFO 12108 --- [ main] c.a.s.produceTest.ProduceMockTest : bussiness over 2021-02-20 01:35:49.308 INFO 12108 --- [ main] c.a.s.producer.ArtisanProducerMock : 3-[doInOperations][发送数据:[MessageMock{id=36, name='messageSendByAsync-36'}] 发送结果:[SendResult [producerRecord=ProducerRecord(topic=AC, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 97, 114, 116, 105, 115, 97, 110, 46, 115, 112, 114, 105, 110, 103, 107, 97, 102, 107, 97, 46, 100, 111, 109, 97, 105, 110, 46, 77, 101, 115, 115, 97, 103, 101, 77, 111, 99, 107])], isReadOnly = true), key=null, value=MessageMock{id=36, name='messageSendByAsync-36'}, timestamp=null), recordMetadata=AC-0@18]]] 2021-02-20 01:35:49.308 INFO 12108 --- [ main] c.a.s.produceTest.ProduceMockTest : mock doing bussiness 2021-02-20 01:35:50.314 INFO 12108 --- [ main] c.a.s.produceTest.ProduceMockTest : bussiness over 2021-02-20 01:35:50.314 INFO 12108 --- [ main] c.a.s.producer.ArtisanProducerMock : 4-[doInOperations][发送数据:[MessageMock{id=19, name='messageSendByAsync-19'}] 发送结果:[SendResult [producerRecord=ProducerRecord(topic=AC, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 97, 114, 116, 105, 115, 97, 110, 46, 115, 112, 114, 105, 110, 103, 107, 97, 102, 107, 97, 46, 100, 111, 109, 97, 105, 110, 46, 77, 101, 115, 115, 97, 103, 101, 77, 111, 99, 107])], isReadOnly = true), key=null, value=MessageMock{id=19, name='messageSendByAsync-19'}, timestamp=null), recordMetadata=AC-0@19]]] 2021-02-20 01:35:50.318 INFO 12108 --- [ main] c.a.s.produceTest.ProduceMockTest : mock doing bussiness 2021-02-20 01:35:51.321 INFO 12108 --- [ main] c.a.s.produceTest.ProduceMockTest : bussiness over 2021-02-20 01:35:51.321 INFO 12108 --- [ main] c.a.s.producer.ArtisanProducerMock : 5-[doInOperations][发送数据:[MessageMock{id=29, name='messageSendByAsync-29'}] 发送结果:[SendResult [producerRecord=ProducerRecord(topic=AC, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 97, 114, 116, 105, 115, 97, 110, 46, 115, 112, 114, 105, 110, 103, 107, 97, 102, 107, 97, 46, 100, 111, 109, 97, 105, 110, 46, 77, 101, 115, 115, 97, 103, 101, 77, 111, 99, 107])], isReadOnly = true), key=null, value=MessageMock{id=29, name='messageSendByAsync-29'}, timestamp=null), recordMetadata=AC-0@20]]] 2021-02-20 01:35:51.325 INFO 12108 --- [ main] c.a.s.produceTest.ProduceMockTest : mock doing bussiness 2021-02-20 01:35:52.326 INFO 12108 --- [ main] c.a.s.produceTest.ProduceMockTest : bussiness over 2021-02-20 01:35:52.326 INFO 12108 --- [ main] c.a.s.producer.ArtisanProducerMock : 6-[doInOperations][发送数据:[MessageMock{id=45, name='messageSendByAsync-45'}] 发送结果:[SendResult [producerRecord=ProducerRecord(topic=AC, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 97, 114, 116, 105, 115, 97, 110, 46, 115, 112, 114, 105, 110, 103, 107, 97, 102, 107, 97, 46, 100, 111, 109, 97, 105, 110, 46, 77, 101, 115, 115, 97, 103, 101, 77, 111, 99, 107])], isReadOnly = true), key=null, value=MessageMock{id=45, name='messageSendByAsync-45'}, timestamp=null), recordMetadata=AC-0@21]]] 2021-02-20 01:35:52.326 INFO 12108 --- [ main] c.a.s.produceTest.ProduceMockTest : mock doing bussiness 2021-02-20 01:35:53.326 INFO 12108 --- [ main] c.a.s.produceTest.ProduceMockTest : bussiness over java.lang.RuntimeException: MOCK ERROR , TEST Tranasction at com.artisan.springkafka.producer.ArtisanProducerMock$1.doInOperations(ArtisanProducerMock.java:42) at com.artisan.springkafka.producer.ArtisanProducerMock$1.doInOperations(ArtisanProducerMock.java:34) at org.springframework.kafka.core.KafkaTemplate.executeInTransaction(KafkaTemplate.java:467) at com.artisan.springkafka.producer.ArtisanProducerMock.testTransaction(ArtisanProducerMock.java:34) at com.artisan.springkafka.produceTest.ProduceMockTest.testAsynSend(ProduceMockTest.java:45) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.springframework.test.context.junit4.statements.RunBeforeTestExecutionCallbacks.evaluate(RunBeforeTestExecutionCallbacks.java:74) at org.springframework.test.context.junit4.statements.RunAfterTestExecutionCallbacks.evaluate(RunAfterTestExecutionCallbacks.java:84) at org.springframework.test.context.junit4.statements.RunBeforeTestMethodCallbacks.evaluate(RunBeforeTestMethodCallbacks.java:75) at org.springframework.test.context.junit4.statements.RunAfterTestMethodCallbacks.evaluate(RunAfterTestMethodCallbacks.java:86) at org.springframework.test.context.junit4.statements.SpringRepeat.evaluate(SpringRepeat.java:84) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:251) at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:97) at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61) at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:70) at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at org.junit.runners.ParentRunner.run(ParentRunner.java:413) at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:190) at org.junit.runner.JUnitCore.run(JUnitCore.java:137) at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69) at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33) at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:220) at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:53) 2021-02-20 01:35:53.346 INFO 12108 --- [ntainer#0-0-C-1] o.s.s.c.ThreadPoolTaskScheduler : Shutting down ExecutorService 2021-02-20 01:35:53.357 INFO 12108 --- [extShutdownHook] o.s.s.concurrent.ThreadPoolTaskExecutor : Shutting down ExecutorService 'applicationTaskExecutor' Process finished with exit code -1
可以看到,有异常了,消费者未的消费到消息。
那我们来个成功的看看嘛
2021-02-20 10:10:46.103 INFO 25272 --- [ main] c.a.s.produceTest.ProduceMockTest : 开始发送 2021-02-20 10:10:46.429 INFO 25272 --- [ main] c.a.s.producer.ArtisanProducerMock : 1-[doInOperations][发送数据:[MessageMock{id=44, name='messageSendByAsync-44'}] 发送结果:[SendResult [producerRecord=ProducerRecord(topic=OOO_TOIPC, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 97, 114, 116, 105, 115, 97, 110, 46, 115, 112, 114, 105, 110, 103, 107, 97, 102, 107, 97, 46, 100, 111, 109, 97, 105, 110, 46, 77, 101, 115, 115, 97, 103, 101, 77, 111, 99, 107])], isReadOnly = true), key=null, value=MessageMock{id=44, name='messageSendByAsync-44'}, timestamp=null), recordMetadata=OOO_TOIPC-0@7]]] 2021-02-20 10:10:46.429 INFO 25272 --- [ main] c.a.s.produceTest.ProduceMockTest : mock doing bussiness 2021-02-20 10:10:47.430 INFO 25272 --- [ main] c.a.s.produceTest.ProduceMockTest : bussiness over 2021-02-20 10:10:47.430 INFO 25272 --- [ main] c.a.s.producer.ArtisanProducerMock : 2-[doInOperations][发送数据:[MessageMock{id=76, name='messageSendByAsync-76'}] 发送结果:[SendResult [producerRecord=ProducerRecord(topic=OOO_TOIPC, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 97, 114, 116, 105, 115, 97, 110, 46, 115, 112, 114, 105, 110, 103, 107, 97, 102, 107, 97, 46, 100, 111, 109, 97, 105, 110, 46, 77, 101, 115, 115, 97, 103, 101, 77, 111, 99, 107])], isReadOnly = true), key=null, value=MessageMock{id=76, name='messageSendByAsync-76'}, timestamp=null), recordMetadata=OOO_TOIPC-0@8]]] 2021-02-20 10:10:47.430 INFO 25272 --- [ main] c.a.s.produceTest.ProduceMockTest : mock doing bussiness 2021-02-20 10:10:48.430 INFO 25272 --- [ main] c.a.s.produceTest.ProduceMockTest : bussiness over 2021-02-20 10:10:48.431 INFO 25272 --- [ main] c.a.s.producer.ArtisanProducerMock : 3-[doInOperations][发送数据:[MessageMock{id=2, name='messageSendByAsync-2'}] 发送结果:[SendResult [producerRecord=ProducerRecord(topic=OOO_TOIPC, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 97, 114, 116, 105, 115, 97, 110, 46, 115, 112, 114, 105, 110, 103, 107, 97, 102, 107, 97, 46, 100, 111, 109, 97, 105, 110, 46, 77, 101, 115, 115, 97, 103, 101, 77, 111, 99, 107])], isReadOnly = true), key=null, value=MessageMock{id=2, name='messageSendByAsync-2'}, timestamp=null), recordMetadata=OOO_TOIPC-0@9]]] 2021-02-20 10:10:48.431 INFO 25272 --- [ main] c.a.s.produceTest.ProduceMockTest : mock doing bussiness 2021-02-20 10:10:49.434 INFO 25272 --- [ main] c.a.s.produceTest.ProduceMockTest : bussiness over 2021-02-20 10:10:49.438 INFO 25272 --- [ main] c.a.s.producer.ArtisanProducerMock : 4-[doInOperations][发送数据:[MessageMock{id=34, name='messageSendByAsync-34'}] 发送结果:[SendResult [producerRecord=ProducerRecord(topic=OOO_TOIPC, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 97, 114, 116, 105, 115, 97, 110, 46, 115, 112, 114, 105, 110, 103, 107, 97, 102, 107, 97, 46, 100, 111, 109, 97, 105, 110, 46, 77, 101, 115, 115, 97, 103, 101, 77, 111, 99, 107])], isReadOnly = true), key=null, value=MessageMock{id=34, name='messageSendByAsync-34'}, timestamp=null), recordMetadata=OOO_TOIPC-0@10]]] 2021-02-20 10:10:49.438 INFO 25272 --- [ main] c.a.s.produceTest.ProduceMockTest : mock doing bussiness 2021-02-20 10:10:50.440 INFO 25272 --- [ main] c.a.s.produceTest.ProduceMockTest : bussiness over 2021-02-20 10:10:50.440 INFO 25272 --- [ main] c.a.s.producer.ArtisanProducerMock : 5-[doInOperations][发送数据:[MessageMock{id=41, name='messageSendByAsync-41'}] 发送结果:[SendResult [producerRecord=ProducerRecord(topic=OOO_TOIPC, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 97, 114, 116, 105, 115, 97, 110, 46, 115, 112, 114, 105, 110, 103, 107, 97, 102, 107, 97, 46, 100, 111, 109, 97, 105, 110, 46, 77, 101, 115, 115, 97, 103, 101, 77, 111, 99, 107])], isReadOnly = true), key=null, value=MessageMock{id=41, name='messageSendByAsync-41'}, timestamp=null), recordMetadata=OOO_TOIPC-0@11]]] 2021-02-20 10:10:50.444 INFO 25272 --- [ main] c.a.s.produceTest.ProduceMockTest : mock doing bussiness 2021-02-20 10:10:51.446 INFO 25272 --- [ main] c.a.s.produceTest.ProduceMockTest : bussiness over 2021-02-20 10:10:51.446 INFO 25272 --- [ main] c.a.s.producer.ArtisanProducerMock : 6-[doInOperations][发送数据:[MessageMock{id=29, name='messageSendByAsync-29'}] 发送结果:[SendResult [producerRecord=ProducerRecord(topic=OOO_TOIPC, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 97, 114, 116, 105, 115, 97, 110, 46, 115, 112, 114, 105, 110, 103, 107, 97, 102, 107, 97, 46, 100, 111, 109, 97, 105, 110, 46, 77, 101, 115, 115, 97, 103, 101, 77, 111, 99, 107])], isReadOnly = true), key=null, value=MessageMock{id=29, name='messageSendByAsync-29'}, timestamp=null), recordMetadata=OOO_TOIPC-0@12]]] 2021-02-20 10:10:51.446 INFO 25272 --- [ main] c.a.s.produceTest.ProduceMockTest : mock doing bussiness 2021-02-20 10:10:52.447 INFO 25272 --- [ main] c.a.s.produceTest.ProduceMockTest : bussiness over 2021-02-20 10:10:52.447 INFO 25272 --- [ main] c.a.s.producer.ArtisanProducerMock : 7-[doInOperations][发送数据:[MessageMock{id=49, name='messageSendByAsync-49'}] 发送结果:[SendResult [producerRecord=ProducerRecord(topic=OOO_TOIPC, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 97, 114, 116, 105, 115, 97, 110, 46, 115, 112, 114, 105, 110, 103, 107, 97, 102, 107, 97, 46, 100, 111, 109, 97, 105, 110, 46, 77, 101, 115, 115, 97, 103, 101, 77, 111, 99, 107])], isReadOnly = true), key=null, value=MessageMock{id=49, name='messageSendByAsync-49'}, timestamp=null), recordMetadata=OOO_TOIPC-0@13]]] 2021-02-20 10:10:52.447 INFO 25272 --- [ main] c.a.s.produceTest.ProduceMockTest : mock doing bussiness 2021-02-20 10:10:53.450 INFO 25272 --- [ main] c.a.s.produceTest.ProduceMockTest : bussiness over 2021-02-20 10:10:53.450 INFO 25272 --- [ main] c.a.s.producer.ArtisanProducerMock : 8-[doInOperations][发送数据:[MessageMock{id=12, name='messageSendByAsync-12'}] 发送结果:[SendResult [producerRecord=ProducerRecord(topic=OOO_TOIPC, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 97, 114, 116, 105, 115, 97, 110, 46, 115, 112, 114, 105, 110, 103, 107, 97, 102, 107, 97, 46, 100, 111, 109, 97, 105, 110, 46, 77, 101, 115, 115, 97, 103, 101, 77, 111, 99, 107])], isReadOnly = true), key=null, value=MessageMock{id=12, name='messageSendByAsync-12'}, timestamp=null), recordMetadata=OOO_TOIPC-0@14]]] 2021-02-20 10:10:53.450 INFO 25272 --- [ main] c.a.s.produceTest.ProduceMockTest : mock doing bussiness 2021-02-20 10:10:54.450 INFO 25272 --- [ main] c.a.s.produceTest.ProduceMockTest : bussiness over 2021-02-20 10:10:54.450 INFO 25272 --- [ main] c.a.s.producer.ArtisanProducerMock : 9-[doInOperations][发送数据:[MessageMock{id=15, name='messageSendByAsync-15'}] 发送结果:[SendResult [producerRecord=ProducerRecord(topic=OOO_TOIPC, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 97, 114, 116, 105, 115, 97, 110, 46, 115, 112, 114, 105, 110, 103, 107, 97, 102, 107, 97, 46, 100, 111, 109, 97, 105, 110, 46, 77, 101, 115, 115, 97, 103, 101, 77, 111, 99, 107])], isReadOnly = true), key=null, value=MessageMock{id=15, name='messageSendByAsync-15'}, timestamp=null), recordMetadata=OOO_TOIPC-0@15]]] 2021-02-20 10:10:54.450 INFO 25272 --- [ main] c.a.s.produceTest.ProduceMockTest : mock doing bussiness 2021-02-20 10:10:55.454 INFO 25272 --- [ main] c.a.s.produceTest.ProduceMockTest : bussiness over 2021-02-20 10:10:55.454 INFO 25272 --- [ main] c.a.s.producer.ArtisanProducerMock : 10-[doInOperations][发送数据:[MessageMock{id=25, name='messageSendByAsync-25'}] 发送结果:[SendResult [producerRecord=ProducerRecord(topic=OOO_TOIPC, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 97, 114, 116, 105, 115, 97, 110, 46, 115, 112, 114, 105, 110, 103, 107, 97, 102, 107, 97, 46, 100, 111, 109, 97, 105, 110, 46, 77, 101, 115, 115, 97, 103, 101, 77, 111, 99, 107])], isReadOnly = true), key=null, value=MessageMock{id=25, name='messageSendByAsync-25'}, timestamp=null), recordMetadata=OOO_TOIPC-0@16]]] 2021-02-20 10:10:55.458 INFO 25272 --- [ main] c.a.s.produceTest.ProduceMockTest : mock doing bussiness 2021-02-20 10:10:56.460 INFO 25272 --- [ main] c.a.s.produceTest.ProduceMockTest : bussiness over 2021-02-20 10:10:56.625 INFO 25272 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=44, name='messageSendByAsync-44'}] 2021-02-20 10:10:56.737 INFO 25272 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=76, name='messageSendByAsync-76'}] 2021-02-20 10:10:56.846 INFO 25272 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=2, name='messageSendByAsync-2'}] 2021-02-20 10:10:56.962 INFO 25272 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=34, name='messageSendByAsync-34'}] 2021-02-20 10:10:56.970 INFO 25272 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=41, name='messageSendByAsync-41'}] 2021-02-20 10:10:57.074 INFO 25272 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=29, name='messageSendByAsync-29'}] 2021-02-20 10:10:57.082 INFO 25272 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=49, name='messageSendByAsync-49'}] 2021-02-20 10:10:57.090 INFO 25272 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=12, name='messageSendByAsync-12'}] 2021-02-20 10:10:57.094 INFO 25272 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=15, name='messageSendByAsync-15'}] 2021-02-20 10:10:57.101 INFO 25272 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=25, name='messageSendByAsync-25'}]
懂了么,老兄 ~
我们继续看下数据 (新跑的数据,和日志有出入)
源码地址
https://github.com/yangshangwei/boot2/tree/master/springkafkaTransaction