本文为博主原创,未经允许不得转载:
目录:
1. 自定义生产消息 kafkaTemplate 实例
2. 封装 kafka 发送消息的service 方法
3. 测试 kafka 发送消息service 的方法
4. 自定义 kafka 消费消息的工厂 bean
5. kafka 监听消费消息
1. 自定义 kafkaTemplate 实例
a : 使用 @ConditionalOnProperty 注解属性控制是否加载 kafka 相关初始化配置,因为在项目开发过程中,如kafka 或redis 等工具容易封装为
工具类,被各微服务引用并进行加载。使用 @ConditionalOnProperty 注解的 havingValue 属性可以控制服务中是否进行加载对应的配置。
该属性的值,可在 yaml 配置文件中指定: kafka.used = true 。如果为true 则加载,false则不加载
b. 使用工厂实例生成指定的 kafkaTemplate 实例
package com.example.demo.config; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; import java.util.HashMap; import java.util.Map; @Configuration @ConditionalOnProperty(prefix="kafka",name = "used",havingValue = "true") public class KafkaTemplateConfig { /** * Producer Template 配置 */ @Bean(name="kafkaTemplate") public KafkaTemplate<String, String> kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); } /** * Producer 工厂配置 */ @Bean public ProducerFactory<String, String> producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs()); } /** * Producer 参数配置 */ @Bean public Map<String, Object> producerConfigs() { Map<String, Object> props = new HashMap<>(); // 指定多个kafka集群多个地址 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "112.125.26.68:9092"); // 重试次数,0为不启用重试机制 props.put(ProducerConfig.RETRIES_CONFIG, 0); //同步到副本, 默认为1 // acks=0 把消息发送到kafka就认为发送成功 // acks=1 把消息发送到kafka leader分区,并且写入磁盘就认为发送成功 // acks=all 把消息发送到kafka leader分区,并且leader分区的副本follower对消息进行了同步就任务发送成功 props.put(ProducerConfig.ACKS_CONFIG, 1); // 生产者空间不足时,send()被阻塞的时间,默认60s props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 6000); // 控制批处理大小,单位为字节 props.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096); // 批量发送,延迟为1毫秒,启用该功能能有效减少生产者发送消息次数,从而提高并发量 props.put(ProducerConfig.LINGER_MS_CONFIG, 1); // 生产者可以使用的总内存字节来缓冲等待发送到服务器的记录 props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 40960); // 消息的最大大小限制,也就是说send的消息大小不能超过这个限制, 默认1048576(1MB) props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG,1048576); // 键的序列化方式 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); // 值的序列化方式 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); // 压缩消息,支持四种类型,分别为:none、lz4、gzip、snappy,默认为none。 // 消费者默认支持解压,所以压缩设置在生产者,消费者无需设置。 props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"none"); return props; } }
2. 封装 kafka 发送消息的service 方法:
package com.example.demo.service; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Service; import org.springframework.util.concurrent.ListenableFutureCallback; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @Service public class KafkaProduceService { @Autowired private KafkaTemplate kafkaTemplate; /** * producer 同步方式发送数据 * * @param topic topic名称 * @param message producer发送的数据 */ public void sendMessageSync(String topic, String message) throws InterruptedException, ExecutionException, TimeoutException { kafkaTemplate.send(topic, message).get(10, TimeUnit.SECONDS); } /** * producer 异步方式发送数据 * * @param topic topic名称 * @param message producer发送的数据 */ public void sendMessageAsync(String topic, String message) { kafkaTemplate.send(topic, message).addCallback(new ListenableFutureCallback() { @Override public void onFailure(Throwable throwable) { System.out.println("success"); } @Override public void onSuccess(Object o) { System.out.println("failure"); } }); } }
3. 测试 kafka 发送消息service 的方法:
package com.example.demo; import com.example.demo.service.KafkaProduceService; import org.junit.jupiter.api.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; @RunWith(SpringJUnit4ClassRunner.class) @SpringBootTest public class ProduceServiceTest { @Autowired private KafkaProduceService producerService; @Test public void sendMessageSync() throws InterruptedException, ExecutionException, TimeoutException { producerService.sendMessageSync("test","同步发送消息测试"); } @Test public void sendMessageAsync() { producerService.sendMessageAsync("test","异步发送消息测试"); } }
4. 自定义 kafka 消费消息的工厂 bean :
package com.example.demo.config; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.context.annotation.Bean; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import java.util.HashMap; import java.util.Map; public class KafkaConsumerConfig { @Bean KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); // 设置消费者工厂 factory.setConsumerFactory(consumerFactory()); // 消费者组中线程数量 factory.setConcurrency(3); // 拉取超时时间 factory.getContainerProperties().setPollTimeout(3000); // 当使用批量监听器时需要设置为true factory.setBatchListener(true); return factory; } // @Bean public ConsumerFactory<String, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } // @Bean public Map<String, Object> consumerConfigs() { Map<String, Object> propsMap = new HashMap<>(); // Kafka地址 propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "112.125.26.68:9092"); //配置默认分组,这里没有配置+在监听的地方没有设置groupId,多个服务会出现收到相同消息情况 propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "defaultGroup"); // 是否自动提交offset偏移量(默认true) propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); // 自动提交的频率(ms) propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100"); // Session超时设置 propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000"); // 键的反序列化方式 propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); // 值的反序列化方式 propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); // offset偏移量规则设置: // (1)、earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费 // (2)、latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据 // (3)、none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常 propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); return propsMap; } }
5. kafka 监听消费消息:
package com.example.demo.service; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; @Component public class KafkaConsumerListener { @KafkaListener(topics = {"test"},groupId = "group1", containerFactory="kafkaListenerContainerFactory") public void kafkaListener(String message){ System.out.println(message); } }
标签: kafka