正文
一、什么是kafka
Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。对于像Hadoop一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。——来自百度百科
二、卡夫卡安装
传统方式
安装环境:
1、Java8+ ,参考Linux系统下安装jdk17&jdk8安装
2、安装ZK,参考搭建ZooKeeper3.7.0集群(传统方式&Docker方式)
3、解压文件
[root@localhost ~]# tar -zxvf kafka_2.13-3.0.0.tgz
4、移动到/usr/local/kafka
[root@localhost ~]# mv kafka_2.13-3.0.0 /usr/local/kafka
5、修改kafka配置文件
[root@localhost config]# vim server.properties
broker1
broker.id=0 #监听 listeners=PLAINTEXT://192.168.139.155:9092 #zk地址 zookeeper.connect=192.168.139.155:2181, 192.168.139.155:2182, 192.168.139.155:2183
broker2
broker.id=1 #监听 listeners=PLAINTEXT://192.168.139.156:9092 #zk地址 zookeeper.connect=192.168.139.155:2181, 192.168.139.155:2182, 192.168.139.155:2183
broker3
broker.id=2 #监听 listeners=PLAINTEXT://192.168.139.157:9094 #zk地址 zookeeper.connect=192.168.139.155:2181, 192.168.139.155:2182, 192.168.139.155:2183
6、分别启动kafka
[root@localhost kafka]# ./bin/kafka-server-start.sh -daemon config/server.properties
7、在其中一台创建topic
[root@localhost kafka]# ./bin/kafka-topics.sh --bootstrap-server 192.168.139.155:9092 --create --topic test-topic --partitions 3 --replication-factor 3
通过zk的可视化工具可知,分区已经创建完成。
8、测试
发送消息
[root@localhost kafka]# ./bin/kafka-console-producer.sh --topic test-topic --bootstrap-server 192.168.139.155:9092
消费消息 在另一台broker上接收消息
[root@localhost kafka]# ./bin/kafka-console-consumer.sh --topic test-topic --from-beginning --bootstrap-server 192.168.139.156:9092
docker方式安装
1、拉取镜像
[root@localhost ~]# docker pull wurstmeister/kafka
2、安装
Broker1
docker run -d --name kafka1 \ -p 9092:9092 \ -e KAFKA_BROKER_ID=1 \ -e KAFKA_ZOOKEEPER_CONNECT=192.168.139.155:2181,192.168.139.155:2182,192.168.139.155:2183 \ -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.139.155:9092 \ -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -t wurstmeister/kafka
Broker2
docker run -d --name kafka2 \ -p 9093:9093 \ -e KAFKA_BROKER_ID=2 \ -e KAFKA_ZOOKEEPER_CONNECT=192.168.139.155:2181,192.168.139.155:2182,192.168.139.155:2183 \ -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.139.155:9093 \ -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9093 -t wurstmeister/kafka
Broker3
docker run -d --name kafka3 \ -p 9094:9094 \ -e KAFKA_BROKER_ID=3 \ -e KAFKA_ZOOKEEPER_CONNECT=192.168.139.155:2181,192.168.139.155:2182,192.168.139.155:2183 \ -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.139.155:9094 \ -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9094 -t wurstmeister/kafka
3、测试
进入容器
[root@bogon ~]# docker container exec -it 2d4be3823f16 /bin/bash 进入/opt/kafka_2.13-2.7.1/bin目录
创建topic
bash-5.1# ./kafka-topics.sh --bootstrap-server 192.168.139.155:9092 --create --topic my-topic --partitions 3 --replication-factor 3
创建消息
bash-5.1# ./kafka-console-producer.sh --topic my-topic --bootstrap-server 192.168.139.155:9092
消费者 ,进入另一个容器进行消费
bash-5.1# ./kafka-console-consumer.sh --topic my-topic --from-beginning --bootstrap-server 192.168.139.155:9093
三、整合SpringBoot
一般模式消费
默认情况下是自动提交offset值。可通过consumer下的属性配置
enable-auto-commit: false
生产者
public void sendMSg(){ System.out.println(">>>>>>>>>>>>>>>>>"); for (int i=0;i<5;i++){ kafkaTemplate.send("xiaojie-topic","test message>>>>>>>>>>>>>>>>>>>>>>"+i); } }
消费者
@KafkaListener(groupId = "xiaojie_group",topics = {"xiaojie-topic"}) public void onMessage(ConsumerRecord<?, ?> record) { log.info("消费主题>>>>>>{},消费分区>>>>>>>>{},消费偏移量>>>>>{},消息内容>>>>>{}", record.topic(), record.partition(), record.offset(), record.value()); }
生产者回调模式
生产者回调函数,可以确认消息是否成功发送到broker,发送失败,进行重试或者人工补偿措施,确保消息投递到broker。有以下两种方式
方式1:
public void sendMsgCallback(String callbackMessage){ kafkaTemplate.send("callback-topic","xiaojie_key",callbackMessage).addCallback(success -> { //当消息发送成功的回调函数 // 消息发送到的topic String topic = success.getRecordMetadata().topic(); // 消息发送到的分区 int partition = success.getRecordMetadata().partition(); // 消息在分区内的offset long offset = success.getRecordMetadata().offset(); System.out.println("发送消息成功>>>>>>>>>>>>>>>>>>>>>>>>>" + topic + "-" + partition + "-" + offset); }, failure -> { //消息发送失败的回调函数 System.out.println("消息发送失败,可以进行人工补偿"); }); }
方式2
public void sendMsgCallback1(String callbackMessage){ kafkaTemplate.send("callback-topic","xiaojie_key",callbackMessage).addCallback(new ListenableFutureCallback<SendResult<String, String>>() { @Override public void onFailure(Throwable ex) { //发送失败 System.out.println("发送失败。。。。。。。。。。。"); } @Override public void onSuccess(SendResult<String, String> result) { //分区信息 Integer partition = result.getRecordMetadata().partition(); //主题 String topic=result.getProducerRecord().topic(); String key=result.getProducerRecord().key(); //发送成功 System.out.println("发送成功。。。。。。。。。。。分区为:"+partition+",主题topic:"+topic+",key:"+key); } }); }
Kafka事务
应用场景
最简单的需求是producer发的多条消息组成一个事务这些消息需要对consumer同时可见或者同时不可见 。
producer可能会给多个topic,多个partition发消息,这些消息也需要能放在一个事务里面,这就形成了一个典型的分布式事务。
kafka的应用场景经常是应用先消费一个topic,然后做处理再发到另一个topic,这个consume-transform-produce过程需要放到一个事务里面,比如在消息处理或者发送的过程中如果失败了,消费位点也不能提交。
producer或者producer所在的应用可能会挂掉,新的producer启动以后需要知道怎么处理之前未完成的事务 。
流式处理的拓扑可能会比较深,如果下游只有等上游消息事务提交以后才能读到,可能会导致rt非常长吞吐量也随之下降很多,所以需要实现read committed和read uncommitted两种事务隔离级别
在spring.kafka.producer.transaction-id-prefix: tx #开启事务管理
注意:此时重试retries不能为0,acks=-1或者all
/** * @description: kafak事务提交 本地事务不需要事务管理器 * @param: * @return: void * @author xiaojie * @date: 2021/10/14 21:35 */ public void sendTx(){ kafkaTemplate.executeInTransaction(kafkaOperations -> { String msg="这是一条测试事务的数据......"; kafkaOperations.send("tx-topic",msg); int i=1/0; //报错之后,由于事务存在,消息并不会发送到broker return null; }); }
消费者批量消费消息
消费者批量消费消息,如果此时开启批量消费模式,那么同样的topic,消费者将会进行批量消费,不再进行逐条消费。
消费者手动确认
Kafak并不会像rabbitmq那样,消息消费之后,会将消息从队列中删除,Kafka通常根据时间决定数据可以保留多久。默认使用log.retention.hours参数配置时间,默认值是168小时,也就是一周。除此之外,还有其他两个参数,log.retention.minutes和log.retention.ms,这三个参数作用是一样的,都是决定消息多久以会被删除,不过还是推荐使用log.retention.ms,如果指定了不止一个参数,Kafka会优先使用最小值的那个参数。卡夫卡是以offset的位置进行消费,如果不进行,确认那么消费者下次消费的时候,还会从上次消费的位置进行消费。
修改消费者自动提交为false:enable-auto-commit: false
配置工厂类
/* RECORD,当每一条记录被消费者监听器(ListenerConsumer)处理之后提交 BATCH,当每一批记录被消费者监听器(ListenerConsumer)处理之后提交 TIME, 每隔多长时间提交,超过该时间会自动提交 COUNT, 每次提交的数量,超过该数量自动提交 COUNT_TIME, 满足时间和数量的任何一个条件提交 MANUAL_IMMEDIATE MANUAL */ @Bean("manualListenerContainerFactory") public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> manualListenerContainerFactory( ConsumerFactory<String, String> consumerFactory) { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory); factory.getContainerProperties().setPollTimeout(1500); factory.setBatchListener(true); //设置批量为true,那么消费端就要一批量的形式接收信息 //配置手动提交offset factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL); return factory; }
消费者
/* * * @param message * @param ack * @手动提交ack * containerFactory 手动提交消息ack * errorHandler 消费端异常处理器 * @author xiaojie * @date 2021/10/14 * @return void */ @KafkaListener(containerFactory = "manualListenerContainerFactory", topics = "xiaojie-topic", errorHandler = "consumerAwareListenerErrorHandler" ) public void onMessageManual(List<ConsumerRecord<?, ?>> record, Acknowledgment ack) { for (int i=0;i<record.size();i++){ System.out.println(record.get(i).value()); } ack.acknowledge();//直接提交offset }
指定消费
/** * @description: id:消费者ID; * groupId:消费组ID; * topics:监听的topic,可监听多个; topics不能和topicPartitions同时使用 * topicPartitions:可配置更加详细的监听信息,可指定topic、parition、offset监听。 * @param: * @param: record * @return: void * @author xiaojie * @date: 2021/10/14 21:50 */ @KafkaListener(groupId = "xiaojie_group",topicPartitions = { @TopicPartition(topic = "test-topic", partitions = {"1"}), @TopicPartition(topic = "xiaojie-test-topic", partitions = {"1"}, partitionOffsets = @PartitionOffset(partition = "2", initialOffset = "15")) }) public void onMessage1(ConsumerRecord<?, ?> record) { //指定消费某个topic,的某个分区,指定消费位置 //执行消费xiaojie-test-topic的1号分区,和xiaojie-test-topic的1和2号分区,并且2号分区从15开始消费 log.info("消费主题>>>>>>:{},消费分区>>>>>>>>:{},消费偏移量>>>>>:{},消息内容>>>>>:{}", record.topic(), record.partition(), record.offset(), record.value()); }
指定自定义分区器
我们知道,kafka中每个topic被划分为多个分区,那么生产者将消息发送到topic时,具体追加到哪个分区呢?这就是所谓的分区策略,Kafka 为我们提供了默认的分区策略,同时它也支持自定义分区策略。其路由机制为:
1、若发送消息时指定了分区(即自定义分区策略),则直接将消息append到指定分区;
2、若发送消息时未指定 patition,但指定了 key(kafka允许为每条消息设置一个key),则对key值进行hash计算,根据计算结果路由到指定分区,这种情况下可以保证同一个 Key 的所有消息都进入到相同的分区;这种方式可以解决消息顺序消费
3、patition 和 key 都未指定,则使用kafka默认的分区策略,轮询选出一个 patition;
package com.xiaojie.config; import org.apache.kafka.clients.producer.Partitioner; import org.apache.kafka.common.Cluster; import org.springframework.stereotype.Component; import java.util.Map; /** * @Description:自定义分区器 我们知道,kafka中每个topic被划分为多个分区,那么生产者将消息发送到topic时,具体追加到哪个分区呢?这就是所谓的分区策略,Kafka 为我们提供了默认的分区策略,同时它也支持自定义分区策略。其路由机制为: * 若发送消息时指定了分区(即自定义分区策略),则直接将消息append到指定分区; * 若发送消息时未指定 patition,但指定了 key(kafka允许为每条消息设置一个key),则对key值进行hash计算,根据计算结果路由到指定分区, * 这种情况下可以保证同一个 Key 的所有消息都进入到相同的分区;这种方式可以解决消息顺序消费 * patition 和 key 都未指定,则使用kafka默认的分区策略,轮询选出一个 patition; * @author: xiaojie * @date: 2021.10.14 */ @Component public class CustomizePartitioner implements Partitioner { @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { //计算分区器 System.out.println("key>>>>>>>>>>>>>"+key); if ("weixin".equals(key)&&"test-topic".equals(topic)){ return 1; } return 0; } @Override public void close() { } @Override public void configure(Map<String, ?> configs) { } }
消费端异常处理
package com.xiaojie.config; import org.springframework.context.annotation.Bean; import org.springframework.kafka.listener.ConsumerAwareListenerErrorHandler; import org.springframework.stereotype.Component; /** * @author xiaojie * @version 1.0 * @description:通过异常处理器,我们可以处理consumer在消费时发生的异常。 * 将这个异常处理器的BeanName放到@KafkaListener注解的errorHandler属性里面 * @date 2021/10/14 21:56 */ @Component public class MyErrorHandler { @Bean ConsumerAwareListenerErrorHandler consumerAwareListenerErrorHandler(){ return (message, e, consumer) -> { System.out.println("消息消费异常"+message.getPayload()); System.out.println("异常信息>>>>>>>>>>>>>>>>>"+e); return null; }; } }
使用方法
/* * * @param message * @param ack * @手动提交ack * containerFactory 手动提交消息ack * errorHandler 消费端异常处理器 * @author xiaojie * @date 2021/10/14 * @return void */ @KafkaListener(containerFactory = "manualListenerContainerFactory", topics = "xiaojie-topic", errorHandler = "consumerAwareListenerErrorHandler" ) public void onMessageManual(List<ConsumerRecord<?, ?>> record, Acknowledgment ack) { for (int i=0;i<record.size();i++){ System.out.println(record.get(i).value()); } ack.acknowledge();//直接提交offset }
消息过滤器
@Bean("filterFactory") public ConcurrentKafkaListenerContainerFactory filterFactory(ConsumerFactory<String, String> consumerFactory) { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory(); factory.setConsumerFactory(consumerFactory); factory.setAckDiscarded(true); factory.setRecordFilterStrategy(consumerRecord -> { String value = (String) consumerRecord.value(); if (value.contains("hello")) { //返回false消息没有被过滤继续消费 return false; } System.out.println("...................."); //返回true 消息被过滤掉了 return true; }); return factory; }
使用方法
/** * @description: 消费者过滤器 * @param: * @param: record * @return: void * @author xiaojie * @date: 2021/10/16 1:04 */ @KafkaListener(topics = "filter-topic",containerFactory = "filterFactory") public void filterOnmessage(ConsumerRecord<?,?> record){ log.info("消费到的消息是:》》》》》》》》》》》{}",record.value()); }
完整代码请参考,kafka部分:spring-boot: Springboot整合redis、消息中间件等相关代码