Apache Kafka-事务消息的支持与实现(本地事务)


Overview


Kafka's transactions are different from RocketMQ's. RocketMQ guarantees consistency between a local transaction (for example, a database update) and the sending of an MQ message; Kafka's transactions mainly guarantee atomicity across several messages sent in one batch (they all succeed or all fail together).


They are used mostly in Kafka stream-processing scenarios. For example, Kafka may apply several different stream computations to the messages of one topic and write each result to a different topic, and those topics are then consumed by different downstream systems (HBase, Redis, Elasticsearch, and so on). In such a pipeline we naturally want the writes to the multiple output topics to be transactionally consistent.
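As a sketch of that streaming scenario with the raw client (the topic names `source-topic`, `sink-topic-a`, `sink-topic-b` and the group id `demo-group` are placeholders of mine, not from the original project; a running broker is required), the consume-transform-produce loop ties the consumed offsets and the produced messages into one transaction via `sendOffsetsToTransaction`:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class ConsumeTransformProduce {
    public static void main(String[] args) {
        Properties cp = new Properties();
        cp.put("bootstrap.servers", "192.168.126.140:9092");
        cp.put("group.id", "demo-group");
        cp.put("enable.auto.commit", "false");   // offsets are committed inside the transaction instead
        cp.put("isolation.level", "read_committed");
        Consumer<String, String> consumer =
                new KafkaConsumer<>(cp, new StringDeserializer(), new StringDeserializer());
        consumer.subscribe(Collections.singletonList("source-topic"));

        Properties pp = new Properties();
        pp.put("bootstrap.servers", "192.168.126.140:9092");
        pp.put("transactional.id", "stream-tx-1");
        Producer<String, String> producer =
                new KafkaProducer<>(pp, new StringSerializer(), new StringSerializer());
        producer.initTransactions();

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            if (records.isEmpty()) continue;
            producer.beginTransaction();
            try {
                Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                for (ConsumerRecord<String, String> r : records) {
                    // fan the transformed results out to two downstream topics
                    producer.send(new ProducerRecord<>("sink-topic-a", r.key(), r.value().toUpperCase()));
                    producer.send(new ProducerRecord<>("sink-topic-b", r.key(), r.value().toLowerCase()));
                    offsets.put(new TopicPartition(r.topic(), r.partition()),
                                new OffsetAndMetadata(r.offset() + 1));
                }
                // commit the consumed offsets and both sink writes atomically
                producer.sendOffsetsToTransaction(offsets, "demo-group");
                producer.commitTransaction();
            } catch (Exception e) {
                producer.abortTransaction();     // all sink writes are discarded together
                // a real implementation would also rewind the consumer before retrying
            }
        }
    }
}
```

This way a crash between the two sends cannot leave `sink-topic-a` updated but `sink-topic-b` not: a `read_committed` consumer of either topic sees the batch only after the commit.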


To get RocketMQ-style distributed transactions on Kafka, you would have to build that functionality yourself.


Official documentation: http://kafka.apache.org/24/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html


This feature is of limited practical use, so apply it with care: it cannot guarantee data consistency across different storage systems.


Official Example


From Kafka 0.11, the KafkaProducer supports two additional modes: the idempotent producer and the transactional producer.
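As a minimal sketch of the difference between the two modes (the broker address is the one used throughout this article; the keys are the standard producer config names), the idempotent producer only needs `enable.idempotence`, while the transactional producer is selected by setting `transactional.id`, which implies idempotence as well:

```java
import java.util.Properties;

public class ProducerModes {

    // Idempotent producer: retries cannot create duplicates within one session.
    static Properties idempotentProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.126.140:9092");
        props.put("enable.idempotence", "true");
        props.put("acks", "all"); // idempotence requires acks=all
        return props;
    }

    // Transactional producer: setting transactional.id also enables idempotence.
    static Properties transactionalProps() {
        Properties props = idempotentProps();
        props.put("transactional.id", "my-transactional-id");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(transactionalProps().getProperty("transactional.id"));
    }
}
```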




For the raw API, see the documentation. Below we first try the native API, then look at how to implement transactional messages with Spring Kafka.


Code (Native API)

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringSerializer;

    @Test
    public void testT() { // happy path
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.126.140:9092");
        props.put("transactional.id", "my-transactional-id");
        Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
        producer.initTransactions();
        try {
            producer.beginTransaction();
            for (int i = 0; i < 100; i++) {
                producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
            }
            producer.commitTransaction();
        } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
            // We can't recover from these exceptions, so our only option is to close the producer and exit.
            producer.close();
        } catch (KafkaException e) {
            // For all other exceptions, just abort the transaction and try again.
            producer.abortTransaction();
        }
        producer.close();
    }




    @Test
    public void testT2() { // failure path: throw an exception mid-batch
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.126.140:9092");
        props.put("transactional.id", "my-transactional-id");
        Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
        producer.initTransactions();
        try {
            producer.beginTransaction();
            for (int i = 0; i < 100; i++){
                producer.send(new ProducerRecord<>("my-topic2", Integer.toString(i), Integer.toString(i)));
                if (i == 50) {
                    throw new RuntimeException("MOCK Exception");
                }
            }
            producer.commitTransaction();
        } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
            // We can't recover from these exceptions, so our only option is to close the producer and exit.
            producer.close();
        } catch (KafkaException e) {
            // For all other exceptions, just abort the transaction and try again.
            producer.abortTransaction();
        }
        producer.close();
    }


Let's look at the data:



You can see that some of the records did make it into the log, but they cannot be consumed: the transaction was aborted, so a read_committed consumer skips them.


Code (Spring Kafka)




POM Dependencies

  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- Spring Kafka dependency -->
    <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <scope>test</scope>
    </dependency>
  </dependencies>

Configuration

spring:
  # Kafka settings, mapped to the KafkaProperties class
  kafka:
    bootstrap-servers: 192.168.126.140:9092 # Kafka broker addresses; separate multiple entries with commas
    # Kafka producer settings
    producer:
      acks: all # 0 = no acknowledgement; 1 = leader only; all = every in-sync replica must acknowledge
      retries: 3 # number of retries when a send fails
      key-serializer: org.apache.kafka.common.serialization.StringSerializer # serializer for message keys
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer # serializer for message values
      transaction-id-prefix: artisan. # transactional.id prefix; setting it enables transactions
    # Kafka consumer settings
    consumer:
      auto-offset-reset: earliest # start a new consumer group from the earliest offset
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring:
          json:
            trusted:
              packages: com.artisan.springkafka.domain
      isolation-level: read_committed # only read committed records
    # Kafka listener settings
    listener:
      missing-topics-fatal: false # by default startup fails when a listened topic is missing; disable that check
logging:
  level:
    org:
      springframework:
        kafka: ERROR # spring-kafka
      apache:
        kafka: ERROR # kafka




spring.kafka.producer.acks must be set to all. Kafka's transactional messages are built on the idempotent producer, which requires every in-sync replica to acknowledge the write; otherwise startup fails with: Must set acks to all in order to use the idempotent producer. Otherwise we cannot guarantee idempotence



The transaction id prefix is set via transaction-id-prefix. Instances of the same application must use the same prefix, while different applications must use different ones. See: How to choose Kafka transaction id for several applications, hosted in Kubernetes?


spring.kafka.consumer.isolation-level must be set to read_committed so that the consumer only reads committed records; otherwise the transactional guarantee is not visible on the consuming side.




Producer

package com.artisan.springkafka.producer;

import com.artisan.springkafka.constants.TOPIC;
import com.artisan.springkafka.domain.MessageMock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;

import java.util.Random;

/**
 * @author 小工匠
 * @version 1.0
 * @date 2021/2/17 22:25
 * @mark: show me the code , change the world
 */
@Component
public class ArtisanProducerMock {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    private KafkaTemplate<Object, Object> kafkaTemplate;

    public String testTransaction(Runnable runnable) {
        return kafkaTemplate.executeInTransaction(new KafkaOperations.OperationsCallback<Object, Object, String>() {
            @Override
            public String doInOperations(KafkaOperations<Object, Object> operations) {
                for (int i = 1; i <= 10; i++) {
                    // throw on the 7th message to test whether all sends share one transaction
                    if (i == 7) {
                        throw new RuntimeException("MOCK ERROR , TEST Tranasction");
                    }
                    // mock message payload
                    Integer id = new Random().nextInt(100);
                    MessageMock messageMock = new MessageMock(id, "messageSendByAsync-" + id);
                    SendResult<Object, Object> sendResult = null;
                    try {
                        sendResult = operations.send(TOPIC.TOPIC, messageMock).get();
                    } catch (Exception e) {
                        logger.error("Error sending message", e);
                    }
                    logger.info(i + "-[doInOperations][发送数据:[{}] 发送结果:[{}]]", messageMock, sendResult);
                    // local business logic ...
                    runnable.run();
                }
                // return value of the callback
                return "OJ8K";
            }
        });
    }
}



We send 10 messages and throw an exception on the 7th, then check whether the consumer can consume the 6 messages already sent. If it can, that would violate our expectation, because Kafka's transaction guarantees that the messages of one batch either all commit or all fail together.


We call the kafkaTemplate#executeInTransaction(OperationsCallback callback) template method to run a custom KafkaOperations.OperationsCallback inside a Kafka transaction.


Inside executeInTransaction(...), the KafkaOperations argument performs the Kafka operations such as sending messages; you can also run your own business logic there, which is what the runnable parameter represents here.


At the start of executeInTransaction(...), a Kafka transaction is created automatically and the KafkaOperations logic runs inside it. On success, the Kafka transaction is committed; on failure, it is rolled back.


Notes


Once transactions are enabled on a Kafka producer, every message it sends must be inside a Kafka transaction; otherwise it throws: No transaction is in process; possible solutions: run the template operation within the scope of a template.executeInTransaction() operation, start a transaction with @Transactional before invoking the template method, run in a transaction started by a listener container when consuming a record
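Besides executeInTransaction, the error message above lists a second option: annotate the sending method with @Transactional so Spring opens the Kafka transaction around it. A minimal sketch (the class name and `demo-topic` are made up for illustration; it assumes Spring Boot has auto-configured a KafkaTransactionManager, which it does when transaction-id-prefix is set):

```java
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class AnnotatedTxProducer {

    private final KafkaTemplate<Object, Object> kafkaTemplate;

    public AnnotatedTxProducer(KafkaTemplate<Object, Object> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // Spring starts a Kafka transaction before this method runs and commits it
    // on normal return; any runtime exception rolls back all three sends.
    @Transactional
    public void sendBatch(Object msg) {
        kafkaTemplate.send("demo-topic", msg);
        kafkaTemplate.send("demo-topic", msg);
        kafkaTemplate.send("demo-topic", msg);
    }
}
```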




If your business needs both transactional and non-transactional sends, define two separate KafkaTemplate beans (two Kafka producers).
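A sketch of that split (bean names and the factory wiring are my own; the transactional factory mirrors the article's configuration): one producer factory with a transaction id prefix for transactional sends, and a second one without it for plain sends.

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;

@Configuration
public class DualTemplateConfig {

    private Map<String, Object> baseProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.126.140:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    // Transactional template: every send must run inside a Kafka transaction.
    @Bean
    public KafkaTemplate<String, String> txKafkaTemplate() {
        DefaultKafkaProducerFactory<String, String> pf =
                new DefaultKafkaProducerFactory<>(baseProps());
        pf.setTransactionIdPrefix("artisan.");
        return new KafkaTemplate<>(pf);
    }

    // Plain template: no transaction id, so ordinary sends keep working.
    @Bean
    public KafkaTemplate<String, String> plainKafkaTemplate() {
        return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(baseProps()));
    }
}
```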


Consumer

package com.artisan.springkafka.consumer;

import com.artisan.springkafka.constants.TOPIC;
import com.artisan.springkafka.domain.MessageMock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * @author 小工匠
 * @version 1.0
 * @date 2021/2/17 22:33
 * @mark: show me the code , change the world
 */
@Component
public class ArtisanCosumerMock {

    private Logger logger = LoggerFactory.getLogger(getClass());

    private static final String CONSUMER_GROUP_PREFIX = "MANUAL_ACK_";

    @KafkaListener(topics = TOPIC.TOPIC, groupId = CONSUMER_GROUP_PREFIX + TOPIC.TOPIC)
    public void onMessage(MessageMock messageMock) {
        logger.info("【接受到消息][线程:{} 消息内容:{}]", Thread.currentThread().getName(), messageMock);
    }
}


Unit Test

package com.artisan.springkafka.produceTest;

import com.artisan.springkafka.SpringkafkaApplication;
import com.artisan.springkafka.producer.ArtisanProducerMock;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * @author 小工匠
 * @version 1.0
 * @date 2021/2/17 22:40
 * @mark: show me the code , change the world
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = SpringkafkaApplication.class)
public class ProduceMockTest {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    private ArtisanProducerMock producerMock;

    @Test
    public void testAsynSend() throws InterruptedException {
        logger.info("开始发送");
        producerMock.testTransaction(() -> {
            logger.info(" mock doing bussiness ");
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            logger.info("bussiness over  ");
        });
        // block so the listener has time to consume the messages
        new CountDownLatch(1).await();
    }
}


Test Results

....
....
....
  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v2.4.1)
2021-02-20 01:35:44.452  INFO 12108 --- [           main] c.a.s.produceTest.ProduceMockTest        : Starting ProduceMockTest using Java 1.8.0_261 on LAPTOP-JF3RBRRJ with PID 12108 (started by artisan in D:\IdeaProjects\boot2\springkafkaTransaction)
2021-02-20 01:35:44.456  INFO 12108 --- [           main] c.a.s.produceTest.ProduceMockTest        : No active profile set, falling back to default profiles: default
2021-02-20 01:35:45.832  INFO 12108 --- [           main] o.s.s.concurrent.ThreadPoolTaskExecutor  : Initializing ExecutorService 'applicationTaskExecutor'
2021-02-20 01:35:46.811  INFO 12108 --- [           main] o.s.s.c.ThreadPoolTaskScheduler          : Initializing ExecutorService
2021-02-20 01:35:46.827  INFO 12108 --- [           main] c.a.s.produceTest.ProduceMockTest        : Started ProduceMockTest in 2.77 seconds (JVM running for 3.55)
2021-02-20 01:35:47.021  INFO 12108 --- [           main] c.a.s.produceTest.ProduceMockTest        : 开始发送
2021-02-20 01:35:47.298  INFO 12108 --- [           main] c.a.s.producer.ArtisanProducerMock       : 1-[doInOperations][发送数据:[MessageMock{id=2, name='messageSendByAsync-2'}] 发送结果:[SendResult [producerRecord=ProducerRecord(topic=AC, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 97, 114, 116, 105, 115, 97, 110, 46, 115, 112, 114, 105, 110, 103, 107, 97, 102, 107, 97, 46, 100, 111, 109, 97, 105, 110, 46, 77, 101, 115, 115, 97, 103, 101, 77, 111, 99, 107])], isReadOnly = true), key=null, value=MessageMock{id=2, name='messageSendByAsync-2'}, timestamp=null), recordMetadata=AC-0@16]]]
2021-02-20 01:35:47.298  INFO 12108 --- [           main] c.a.s.produceTest.ProduceMockTest        :  mock doing bussiness 
2021-02-20 01:35:48.302  INFO 12108 --- [           main] c.a.s.produceTest.ProduceMockTest        : bussiness over  
2021-02-20 01:35:48.305  INFO 12108 --- [           main] c.a.s.producer.ArtisanProducerMock       : 2-[doInOperations][发送数据:[MessageMock{id=2, name='messageSendByAsync-2'}] 发送结果:[SendResult [producerRecord=ProducerRecord(topic=AC, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 97, 114, 116, 105, 115, 97, 110, 46, 115, 112, 114, 105, 110, 103, 107, 97, 102, 107, 97, 46, 100, 111, 109, 97, 105, 110, 46, 77, 101, 115, 115, 97, 103, 101, 77, 111, 99, 107])], isReadOnly = true), key=null, value=MessageMock{id=2, name='messageSendByAsync-2'}, timestamp=null), recordMetadata=AC-0@17]]]
2021-02-20 01:35:48.305  INFO 12108 --- [           main] c.a.s.produceTest.ProduceMockTest        :  mock doing bussiness 
2021-02-20 01:35:49.308  INFO 12108 --- [           main] c.a.s.produceTest.ProduceMockTest        : bussiness over  
2021-02-20 01:35:49.308  INFO 12108 --- [           main] c.a.s.producer.ArtisanProducerMock       : 3-[doInOperations][发送数据:[MessageMock{id=36, name='messageSendByAsync-36'}] 发送结果:[SendResult [producerRecord=ProducerRecord(topic=AC, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 97, 114, 116, 105, 115, 97, 110, 46, 115, 112, 114, 105, 110, 103, 107, 97, 102, 107, 97, 46, 100, 111, 109, 97, 105, 110, 46, 77, 101, 115, 115, 97, 103, 101, 77, 111, 99, 107])], isReadOnly = true), key=null, value=MessageMock{id=36, name='messageSendByAsync-36'}, timestamp=null), recordMetadata=AC-0@18]]]
2021-02-20 01:35:49.308  INFO 12108 --- [           main] c.a.s.produceTest.ProduceMockTest        :  mock doing bussiness 
2021-02-20 01:35:50.314  INFO 12108 --- [           main] c.a.s.produceTest.ProduceMockTest        : bussiness over  
2021-02-20 01:35:50.314  INFO 12108 --- [           main] c.a.s.producer.ArtisanProducerMock       : 4-[doInOperations][发送数据:[MessageMock{id=19, name='messageSendByAsync-19'}] 发送结果:[SendResult [producerRecord=ProducerRecord(topic=AC, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 97, 114, 116, 105, 115, 97, 110, 46, 115, 112, 114, 105, 110, 103, 107, 97, 102, 107, 97, 46, 100, 111, 109, 97, 105, 110, 46, 77, 101, 115, 115, 97, 103, 101, 77, 111, 99, 107])], isReadOnly = true), key=null, value=MessageMock{id=19, name='messageSendByAsync-19'}, timestamp=null), recordMetadata=AC-0@19]]]
2021-02-20 01:35:50.318  INFO 12108 --- [           main] c.a.s.produceTest.ProduceMockTest        :  mock doing bussiness 
2021-02-20 01:35:51.321  INFO 12108 --- [           main] c.a.s.produceTest.ProduceMockTest        : bussiness over  
2021-02-20 01:35:51.321  INFO 12108 --- [           main] c.a.s.producer.ArtisanProducerMock       : 5-[doInOperations][发送数据:[MessageMock{id=29, name='messageSendByAsync-29'}] 发送结果:[SendResult [producerRecord=ProducerRecord(topic=AC, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 97, 114, 116, 105, 115, 97, 110, 46, 115, 112, 114, 105, 110, 103, 107, 97, 102, 107, 97, 46, 100, 111, 109, 97, 105, 110, 46, 77, 101, 115, 115, 97, 103, 101, 77, 111, 99, 107])], isReadOnly = true), key=null, value=MessageMock{id=29, name='messageSendByAsync-29'}, timestamp=null), recordMetadata=AC-0@20]]]
2021-02-20 01:35:51.325  INFO 12108 --- [           main] c.a.s.produceTest.ProduceMockTest        :  mock doing bussiness 
2021-02-20 01:35:52.326  INFO 12108 --- [           main] c.a.s.produceTest.ProduceMockTest        : bussiness over  
2021-02-20 01:35:52.326  INFO 12108 --- [           main] c.a.s.producer.ArtisanProducerMock       : 6-[doInOperations][发送数据:[MessageMock{id=45, name='messageSendByAsync-45'}] 发送结果:[SendResult [producerRecord=ProducerRecord(topic=AC, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 97, 114, 116, 105, 115, 97, 110, 46, 115, 112, 114, 105, 110, 103, 107, 97, 102, 107, 97, 46, 100, 111, 109, 97, 105, 110, 46, 77, 101, 115, 115, 97, 103, 101, 77, 111, 99, 107])], isReadOnly = true), key=null, value=MessageMock{id=45, name='messageSendByAsync-45'}, timestamp=null), recordMetadata=AC-0@21]]]
2021-02-20 01:35:52.326  INFO 12108 --- [           main] c.a.s.produceTest.ProduceMockTest        :  mock doing bussiness 
2021-02-20 01:35:53.326  INFO 12108 --- [           main] c.a.s.produceTest.ProduceMockTest        : bussiness over  
java.lang.RuntimeException: MOCK ERROR , TEST Tranasction
  at com.artisan.springkafka.producer.ArtisanProducerMock$1.doInOperations(ArtisanProducerMock.java:42)
  at com.artisan.springkafka.producer.ArtisanProducerMock$1.doInOperations(ArtisanProducerMock.java:34)
  at org.springframework.kafka.core.KafkaTemplate.executeInTransaction(KafkaTemplate.java:467)
  at com.artisan.springkafka.producer.ArtisanProducerMock.testTransaction(ArtisanProducerMock.java:34)
  at com.artisan.springkafka.produceTest.ProduceMockTest.testAsynSend(ProduceMockTest.java:45)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
  at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
  at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
  at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
  at org.springframework.test.context.junit4.statements.RunBeforeTestExecutionCallbacks.evaluate(RunBeforeTestExecutionCallbacks.java:74)
  at org.springframework.test.context.junit4.statements.RunAfterTestExecutionCallbacks.evaluate(RunAfterTestExecutionCallbacks.java:84)
  at org.springframework.test.context.junit4.statements.RunBeforeTestMethodCallbacks.evaluate(RunBeforeTestMethodCallbacks.java:75)
  at org.springframework.test.context.junit4.statements.RunAfterTestMethodCallbacks.evaluate(RunAfterTestMethodCallbacks.java:86)
  at org.springframework.test.context.junit4.statements.SpringRepeat.evaluate(SpringRepeat.java:84)
  at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
  at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:251)
  at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:97)
  at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
  at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
  at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
  at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
  at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
  at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61)
  at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:70)
  at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
  at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
  at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:190)
  at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
  at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
  at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
  at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:220)
  at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:53)
2021-02-20 01:35:53.346  INFO 12108 --- [ntainer#0-0-C-1] o.s.s.c.ThreadPoolTaskScheduler          : Shutting down ExecutorService
2021-02-20 01:35:53.357  INFO 12108 --- [extShutdownHook] o.s.s.concurrent.ThreadPoolTaskExecutor  : Shutting down ExecutorService 'applicationTaskExecutor'
Process finished with exit code -1


As you can see, once the exception was thrown the consumer did not consume any of the messages.

Now let's look at a successful run.




2021-02-20 10:10:46.103  INFO 25272 --- [           main] c.a.s.produceTest.ProduceMockTest        : 开始发送
2021-02-20 10:10:46.429  INFO 25272 --- [           main] c.a.s.producer.ArtisanProducerMock       : 1-[doInOperations][发送数据:[MessageMock{id=44, name='messageSendByAsync-44'}] 发送结果:[SendResult [producerRecord=ProducerRecord(topic=OOO_TOIPC, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 97, 114, 116, 105, 115, 97, 110, 46, 115, 112, 114, 105, 110, 103, 107, 97, 102, 107, 97, 46, 100, 111, 109, 97, 105, 110, 46, 77, 101, 115, 115, 97, 103, 101, 77, 111, 99, 107])], isReadOnly = true), key=null, value=MessageMock{id=44, name='messageSendByAsync-44'}, timestamp=null), recordMetadata=OOO_TOIPC-0@7]]]
2021-02-20 10:10:46.429  INFO 25272 --- [           main] c.a.s.produceTest.ProduceMockTest        :  mock doing bussiness 
2021-02-20 10:10:47.430  INFO 25272 --- [           main] c.a.s.produceTest.ProduceMockTest        : bussiness over  
2021-02-20 10:10:47.430  INFO 25272 --- [           main] c.a.s.producer.ArtisanProducerMock       : 2-[doInOperations][发送数据:[MessageMock{id=76, name='messageSendByAsync-76'}] 发送结果:[SendResult [producerRecord=ProducerRecord(topic=OOO_TOIPC, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 97, 114, 116, 105, 115, 97, 110, 46, 115, 112, 114, 105, 110, 103, 107, 97, 102, 107, 97, 46, 100, 111, 109, 97, 105, 110, 46, 77, 101, 115, 115, 97, 103, 101, 77, 111, 99, 107])], isReadOnly = true), key=null, value=MessageMock{id=76, name='messageSendByAsync-76'}, timestamp=null), recordMetadata=OOO_TOIPC-0@8]]]
2021-02-20 10:10:47.430  INFO 25272 --- [           main] c.a.s.produceTest.ProduceMockTest        :  mock doing bussiness 
2021-02-20 10:10:48.430  INFO 25272 --- [           main] c.a.s.produceTest.ProduceMockTest        : bussiness over  
2021-02-20 10:10:48.431  INFO 25272 --- [           main] c.a.s.producer.ArtisanProducerMock       : 3-[doInOperations][发送数据:[MessageMock{id=2, name='messageSendByAsync-2'}] 发送结果:[SendResult [producerRecord=ProducerRecord(topic=OOO_TOIPC, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 97, 114, 116, 105, 115, 97, 110, 46, 115, 112, 114, 105, 110, 103, 107, 97, 102, 107, 97, 46, 100, 111, 109, 97, 105, 110, 46, 77, 101, 115, 115, 97, 103, 101, 77, 111, 99, 107])], isReadOnly = true), key=null, value=MessageMock{id=2, name='messageSendByAsync-2'}, timestamp=null), recordMetadata=OOO_TOIPC-0@9]]]
2021-02-20 10:10:48.431  INFO 25272 --- [           main] c.a.s.produceTest.ProduceMockTest        :  mock doing bussiness 
2021-02-20 10:10:49.434  INFO 25272 --- [           main] c.a.s.produceTest.ProduceMockTest        : bussiness over  
2021-02-20 10:10:49.438  INFO 25272 --- [           main] c.a.s.producer.ArtisanProducerMock       : 4-[doInOperations][发送数据:[MessageMock{id=34, name='messageSendByAsync-34'}] 发送结果:[SendResult [producerRecord=ProducerRecord(topic=OOO_TOIPC, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 97, 114, 116, 105, 115, 97, 110, 46, 115, 112, 114, 105, 110, 103, 107, 97, 102, 107, 97, 46, 100, 111, 109, 97, 105, 110, 46, 77, 101, 115, 115, 97, 103, 101, 77, 111, 99, 107])], isReadOnly = true), key=null, value=MessageMock{id=34, name='messageSendByAsync-34'}, timestamp=null), recordMetadata=OOO_TOIPC-0@10]]]
2021-02-20 10:10:49.438  INFO 25272 --- [           main] c.a.s.produceTest.ProduceMockTest        :  mock doing bussiness 
2021-02-20 10:10:50.440  INFO 25272 --- [           main] c.a.s.produceTest.ProduceMockTest        : bussiness over  
2021-02-20 10:10:50.440  INFO 25272 --- [           main] c.a.s.producer.ArtisanProducerMock       : 5-[doInOperations][发送数据:[MessageMock{id=41, name='messageSendByAsync-41'}] 发送结果:[SendResult [producerRecord=ProducerRecord(topic=OOO_TOIPC, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 97, 114, 116, 105, 115, 97, 110, 46, 115, 112, 114, 105, 110, 103, 107, 97, 102, 107, 97, 46, 100, 111, 109, 97, 105, 110, 46, 77, 101, 115, 115, 97, 103, 101, 77, 111, 99, 107])], isReadOnly = true), key=null, value=MessageMock{id=41, name='messageSendByAsync-41'}, timestamp=null), recordMetadata=OOO_TOIPC-0@11]]]
2021-02-20 10:10:50.444  INFO 25272 --- [           main] c.a.s.produceTest.ProduceMockTest        :  mock doing bussiness 
2021-02-20 10:10:51.446  INFO 25272 --- [           main] c.a.s.produceTest.ProduceMockTest        : bussiness over  
2021-02-20 10:10:51.446  INFO 25272 --- [           main] c.a.s.producer.ArtisanProducerMock       : 6-[doInOperations][发送数据:[MessageMock{id=29, name='messageSendByAsync-29'}] 发送结果:[SendResult [producerRecord=ProducerRecord(topic=OOO_TOIPC, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 97, 114, 116, 105, 115, 97, 110, 46, 115, 112, 114, 105, 110, 103, 107, 97, 102, 107, 97, 46, 100, 111, 109, 97, 105, 110, 46, 77, 101, 115, 115, 97, 103, 101, 77, 111, 99, 107])], isReadOnly = true), key=null, value=MessageMock{id=29, name='messageSendByAsync-29'}, timestamp=null), recordMetadata=OOO_TOIPC-0@12]]]
2021-02-20 10:10:51.446  INFO 25272 --- [           main] c.a.s.produceTest.ProduceMockTest        :  mock doing bussiness 
2021-02-20 10:10:52.447  INFO 25272 --- [           main] c.a.s.produceTest.ProduceMockTest        : bussiness over  
2021-02-20 10:10:52.447  INFO 25272 --- [           main] c.a.s.producer.ArtisanProducerMock       : 7-[doInOperations][发送数据:[MessageMock{id=49, name='messageSendByAsync-49'}] 发送结果:[SendResult [producerRecord=ProducerRecord(topic=OOO_TOIPC, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 97, 114, 116, 105, 115, 97, 110, 46, 115, 112, 114, 105, 110, 103, 107, 97, 102, 107, 97, 46, 100, 111, 109, 97, 105, 110, 46, 77, 101, 115, 115, 97, 103, 101, 77, 111, 99, 107])], isReadOnly = true), key=null, value=MessageMock{id=49, name='messageSendByAsync-49'}, timestamp=null), recordMetadata=OOO_TOIPC-0@13]]]
2021-02-20 10:10:52.447  INFO 25272 --- [           main] c.a.s.produceTest.ProduceMockTest        :  mock doing bussiness 
2021-02-20 10:10:53.450  INFO 25272 --- [           main] c.a.s.produceTest.ProduceMockTest        : bussiness over  
2021-02-20 10:10:53.450  INFO 25272 --- [           main] c.a.s.producer.ArtisanProducerMock       : 8-[doInOperations][发送数据:[MessageMock{id=12, name='messageSendByAsync-12'}] 发送结果:[SendResult [producerRecord=ProducerRecord(topic=OOO_TOIPC, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 97, 114, 116, 105, 115, 97, 110, 46, 115, 112, 114, 105, 110, 103, 107, 97, 102, 107, 97, 46, 100, 111, 109, 97, 105, 110, 46, 77, 101, 115, 115, 97, 103, 101, 77, 111, 99, 107])], isReadOnly = true), key=null, value=MessageMock{id=12, name='messageSendByAsync-12'}, timestamp=null), recordMetadata=OOO_TOIPC-0@14]]]
2021-02-20 10:10:53.450  INFO 25272 --- [           main] c.a.s.produceTest.ProduceMockTest        :  mock doing bussiness 
2021-02-20 10:10:54.450  INFO 25272 --- [           main] c.a.s.produceTest.ProduceMockTest        : bussiness over  
2021-02-20 10:10:54.450  INFO 25272 --- [           main] c.a.s.producer.ArtisanProducerMock       : 9-[doInOperations][发送数据:[MessageMock{id=15, name='messageSendByAsync-15'}] 发送结果:[SendResult [producerRecord=ProducerRecord(topic=OOO_TOIPC, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 97, 114, 116, 105, 115, 97, 110, 46, 115, 112, 114, 105, 110, 103, 107, 97, 102, 107, 97, 46, 100, 111, 109, 97, 105, 110, 46, 77, 101, 115, 115, 97, 103, 101, 77, 111, 99, 107])], isReadOnly = true), key=null, value=MessageMock{id=15, name='messageSendByAsync-15'}, timestamp=null), recordMetadata=OOO_TOIPC-0@15]]]
2021-02-20 10:10:54.450  INFO 25272 --- [           main] c.a.s.produceTest.ProduceMockTest        :  mock doing bussiness 
2021-02-20 10:10:55.454  INFO 25272 --- [           main] c.a.s.produceTest.ProduceMockTest        : bussiness over  
2021-02-20 10:10:55.454  INFO 25272 --- [           main] c.a.s.producer.ArtisanProducerMock       : 10-[doInOperations][发送数据:[MessageMock{id=25, name='messageSendByAsync-25'}] 发送结果:[SendResult [producerRecord=ProducerRecord(topic=OOO_TOIPC, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 97, 114, 116, 105, 115, 97, 110, 46, 115, 112, 114, 105, 110, 103, 107, 97, 102, 107, 97, 46, 100, 111, 109, 97, 105, 110, 46, 77, 101, 115, 115, 97, 103, 101, 77, 111, 99, 107])], isReadOnly = true), key=null, value=MessageMock{id=25, name='messageSendByAsync-25'}, timestamp=null), recordMetadata=OOO_TOIPC-0@16]]]
2021-02-20 10:10:55.458  INFO 25272 --- [           main] c.a.s.produceTest.ProduceMockTest        :  mock doing bussiness 
2021-02-20 10:10:56.460  INFO 25272 --- [           main] c.a.s.produceTest.ProduceMockTest        : bussiness over  
2021-02-20 10:10:56.625  INFO 25272 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=44, name='messageSendByAsync-44'}]
2021-02-20 10:10:56.737  INFO 25272 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=76, name='messageSendByAsync-76'}]
2021-02-20 10:10:56.846  INFO 25272 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=2, name='messageSendByAsync-2'}]
2021-02-20 10:10:56.962  INFO 25272 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=34, name='messageSendByAsync-34'}]
2021-02-20 10:10:56.970  INFO 25272 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=41, name='messageSendByAsync-41'}]
2021-02-20 10:10:57.074  INFO 25272 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=29, name='messageSendByAsync-29'}]
2021-02-20 10:10:57.082  INFO 25272 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=49, name='messageSendByAsync-49'}]
2021-02-20 10:10:57.090  INFO 25272 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=12, name='messageSendByAsync-12'}]
2021-02-20 10:10:57.094  INFO 25272 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=15, name='messageSendByAsync-15'}]
2021-02-20 10:10:57.101  INFO 25272 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock        : 【接受到消息][线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息内容:MessageMock{id=25, name='messageSendByAsync-25'}]


Make sense, my friend? ~

Let's look at the data again (this is a fresh run, so it differs from the logs above).




Source Code


https://github.com/yangshangwei/boot2/tree/master/springkafkaTransaction


