API 开发:producer 生产者
生产者 api 示例
一个正常的生产逻辑需要具备以下几个步骤
(1)配置生产者参数及创建相应的生产者实例
(2)构建待发送的消息
(3)发送消息
(4)关闭生产者实例
首先,引入 maven 依赖
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.3.1</version> </dependency>
采用默认分区方式将消息散列的发送到各个分区当中
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; /* kafka生产者api代码示例 */ public class MyProducer { public static void main(String[] args) throws InterruptedException { Properties props = new Properties(); //设置 kafka 集群的地址 必选 props.put("bootstrap.servers", "doitedu01:9092,doitedu02:9092,doitedu03:9092"); //ack 模式,取值有 0,1,-1(all) , all 是最慢但最安全的 消息发送,应答级别 props.put("acks", "all"); //序列化器 因为业务数据有各种类型的,但是kafka底层存储里面不可能有各种类型的,只能是序列化的字节,所以不管你要发什么东西给它,都要提供一个序列化器,帮你能够把key value序列化成二进制的字节 // 因为kafka底层的存储是没有类型维护机制的,用户所发的所有数据类型,都必须 序列化成byte[],所以kafka的producer需要一个针对用户所发送的数据类型的序列化工具类,且这个序列化工具类,需要实现kafka所提供的序列工具接口。 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); /* 需要额外的指定泛型,key value */ Producer<String, String> producer = new KafkaProducer<>(props); for (int i = 0; i < 100; i++) // 其调用是异步的,数据的发送动作在producer的底层是异步线程的 producer.send(new ProducerRecord<String, String>("test", Integer.toString(i), "dd:"+i)); // 在这里面可以通过逻辑判断去指定发送到那个topic中 //Thread.sleep(100); producer.close(); } }
消息对象 ProducerRecord,除了包含业务数据外,还包含了多个属性:
public class ProducerRecord<K, V> { private final String topic; private final Integer partition; private final Headers headers; private final K key; private final V value; private final Long timestamp;
其发送方法中,根据参数的不同,有不同的构造方法
其实这样也就意味着我们可以把消息发送到不同的topic。
必要的参数配置
Kafka 生产者客户端 KakaProducer 中有 3 个参数是必填的。
bootstrap.servers / key.serializer / value.serializer
为了防止参数名字符串书写错误,可以使用如下方式进行设置:
pro.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); pro.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
发送消息
创建生产者实例和构建消息之后 就可以开始发送消息了。发送消息主要有 3 种模式:
发后即忘( fire-and-forget)
发后即忘,它只管往 Kafka 发送,并不关心消息是否正确到达。
在大多数情况下,这种发送方式没有问题;
不过在某些时候(比如发生不可重试异常时)会造成消息的丢失。
这种发送方式的性能最高,可靠性最差。
Future<RecordMetadata> send = producer.send(rcd);
同步发送(sync )
try { producer.send(rcd).get(); } catch (Exception e) { e.printStackTrace(); }
因为Future的get方法是同步阻塞的。
异步发送(async )
回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是 RecordMetadata 和Exception,如果 Exception 为 null,说明消息发送成功,如果 Exception 不为 null,说明消息发送失败。
注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。
import org.apache.kafka.clients.producer.*; import java.util.Properties; public class MyProducer { public static void main(String[] args) throws InterruptedException { Properties props = new Properties(); // Kafka 服务端的主机名和端口号 props.put("bootstrap.servers", "doitedu01:9092,doitedu02:9092,doitedu03:9092"); // 等待所有副本节点的应答 props.put("acks", "all"); // 消息发送最大尝试次数 props.put("retries", 0); // 一批消息处理大小 props.put("batch.size", 16384); // 增加服务端请求延时 props.put("linger.ms", 1); // 发送缓存区内存大小 props.put("buffer.memory", 33554432); // key 序列化 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // value 序列化 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props); for (int i = 0; i < 50; i++) { kafkaProducer.send(new ProducerRecord<String, String>("test", "hello" + i), new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { if (metadata != null) { System.out.println(metadata.partition()+ "-"+ metadata.offset()); } } }); } kafkaProducer.close(); } }
API 开发:consumer 消费
import org.apache.kafka.clients.consumer.*; import java.util.Arrays; import java.util.Properties; public class MyConsumer { public static void main(String[] args) { Properties props = new Properties(); // 定义 kakfa 服务的地址,不需要将所有 broker 指定上 // 客户端只要知道一台服务器,就能通过这一台服务器来获知整个集群的信息(所有的服务器、主机名等) // 如果你只填写一台,万一,你得客户端启动的时候,宕机了不在线,那就无法连接到集群了 // 如果你填写了堕胎,有一个好处就是,万一连不上其中一个,可以去连接其它的 props.put("bootstrap.servers", "doitedu01:9092"); // 制定 consumer group props.put("group.id", "g1"); // 按照一个时间间隔自动去提交偏移量 // 是否自动提交 offset props.put("enable.auto.commit", "true"); // 自动提交 offset 的时间间隔 props.put("auto.commit.interval.ms", "1000"); // key 的反序列化类 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // value 的反序列化类 props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // kafka的消费者,默认是从属组之前所记录的偏移量开始消费,如果找不到之前记录的偏移量,则从如下参数配置的策略确定消费起始偏移量 // 如果没有消费偏移量记录,则自动重设为起始 offset:latest, earliest, none /* earliest 自动重置到每个分区的最前一条消息 latest 自动重置到每个分区的最新一条消息 none 没有重置策略 */ props.put("auto.offset.reset","earliest"); // 定义 consumer KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); // 消费者订阅的 topic, 可同时订阅多个 // subscribe订阅,是需要参与消费者组的再均衡机制才能真正获得自己要消费的topic及其分区 // 只要消费者组里的消费者 变化了 就要发生再均衡 consumer.subscribe(Arrays.asList("first", "test","test1")); // 显式指定消费起始偏移量(如果同时设置了消费者 偏移策略的话,以手动指定的为准) // 在设置消费分区起始偏移量这里,存在一个点,如果此时到这里了然后消费者组再均衡机制还没有做完,那么就会报错,因为可能这个消费者还没有被分配到这个分区 针对这个问题,其实动态再分配是有一个过程 和 时间的,谁也不知道要等多久,所以最好想的sleep就不容易实现了。 想要解决这个问题有两种办法 1.在这个过程中 拉一次数据,能拉到就代表再均衡机制完成了 consumer.poll(Long.MAX_VALVE);这里是无意义的拉一次数据,主要是为了确保分区分配已完成,然后就能够去定位偏移量了。但是这种方式不符合最初的设计初衷,如果是使用subscribe来订阅主题,那就意味着是应该参与这个组的均衡的,参与了,那就不要去指定组的偏移量了,应该听从组的分配。 2.既然要自己指定一个确定的起始消费位置,那通常隐含之意就是不需要去参与消费者组的自动再均衡机制那么就不要使用subscribe来订阅主题 consumer.assign(Arrays.asList(new TopicPartition("ddd",0))) 使用这个是不参与消费者的自动再均衡的。 //TopicPartition first0 = new TopicPartition("first",0); //TopicPartition first1 = new TopicPartition("first",1); //consumer.seek(first0,10); //consumer.seek(first1,15); /* kafka消费者的起始消费位置有两种决定机制 1.手动指定了起始位置,它肯定从你指定的位置开始 2.如果没有手动指定位置,它会在找消费组之前所记录的偏移量开始 3.如果之前的位置也获取不到,就看参数 : auto.offset.reset 所指定的重置策略 */ while (true) { // 读取数据,读取超时时间为 100ms ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) // ConsumerRecord中,不光有用户的业务数据,还有kafka塞入的元数据 String key = record.key(); String value = record.value(); // 本条数据所属的topic String topic = record.topic(); // 本条数据所属的分区 int partition = record.partition // 本条数据的offset long offset = record.offset(); // 当前这条数据所在分区的leader的朝代纪年 Optional<Integer> leaderEpoch = record.leaderEpoch(); // 在kafka的数据底层存储中,不光有用户的业务数据,还有大量元数据,timestamp就是其中之一:记录本条数据的时间戳,但是时间戳有两种类型,本条数据的创建时间(生产者)、本条数据的追加时间(broker写入log文件的时间) TimestampType timestampType = record.timestampType(); long timestamp = record.timestamp(); // 数据头,是生产者在写入数据时附加进去的(相当于用户自己的元数据) // 在生产者发送数据的时候,有一个构造方法可以允许你自己携带自己的 headers Headers headers = record.headers(); System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } } }
如果消息还没生产到指定的位置呢?这是一个很有趣的问题,到底是等,还是报错
kafka-console-consumer.sh --bootstrap-server doit01:9092 --topic test --offset 100000 --partition 0
假设分区0 中并没有offset >= 100000 的消息,执行之后,并不会报错,但是如果超标了,就会自动重置到最新的(lastest)。
如果如果指定的offset大于最大可用的offset,那么就会定义到最后一条消息。
subscribe 订阅主题
subscribe 有如下重载方法:
public void subscribe(Collection<String> topics,ConsumerRebalanceListener listener) public void subscribe(Collection<String> topics) public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) public void subscribe(Pattern pattern)
通过这几个构造函数来看,其中有ConsumerRebalanceListener listener 其实就是 再均衡 的监听器,再均衡的过程中,会调用这个方法。
Properties props = new Properties(); // 从配置文件中加载写好的参数 props.load(Consumer.class.getClassLoader.getResourceAsStream("consumer.properties")); // 手动set一些参数进去 props.setProperty(); ...... KafkaConsumer<String,String> consumer = new KafkaConsumer<>(props); // reb-1 主题 3个分区 // reb-2 主题 2个分区 consumer.subscribe(Arrays.asList("reb-1","reb-2"),new ConsumerRebalanceListener(){ // 再均衡分配过程中,消费者会取消先前所分配的主题、分区 // 取消了之后,consumer会调用下面的方法 public void onPartitionsRevoked(Collection<TopicPartition> partitions){ } // 再均衡过程中,消费者会重新分配到新的主题、分区 // 分配了新的主题 和 分区之后,consumer底层会调用下面的方法 public void onPartitionAssigned(Collection<TopicPartition> partitions){ } }); 但是以上的过程 懒加载,只有消费者真正 开始 poll的时候,才会实现再均衡分配的过程。
现有的再均衡原则就是每次有消费者增减 都会重新分配,其实就是先全部取消,然后又重新分配了呢,这过程中肯定存在消耗,得先把工作暂停,把偏移量记好,另外一个人接手的时候,还需要另外去读偏移量,重新从对应的位置开始。
而在kafka2.4.1中解决了这个重分配的问题。但是大多数使用的框架没有到这个版本,或者所使用的如spark flink等底层所依赖的kafka没有2.4.1这个版本。
消费者组再均衡分区分配策略
消费者组的意义何在?为了提高数据处理的并行度!
会触发 rebalance 的事件可能是如下任意一种:
- 有新的消费者加入消费组。
- 有消费者宕机下线,消费者并不一定需要真正下线,例如遇到长时间的 GC 、网络延迟导致消费者长时间未向 GroupCoordinator 发送心跳等情况时,GroupCoordinator 会认为消费者己下线。
- 有消费者主动退出消费组(发送 LeaveGroupRequest 请求):比如客户端调用了 unsubscrible()方法取消对某些主题的订阅。
- 消费组所对应的 GroupCoorinator 节点发生了变更。
- 消费组内所订阅的任一主题或者主题的分区数量发生变化。
将分区的消费权从一个消费者移到另一个消费者称为再均衡(rebalance),如何 rebalance 也涉及到分区分配策略。
kafka 有两种的分区分配策略:range(默认) 和 round robin(新版本中又新增了另外 2 种)
我们可以通过 partition.assignment.strategy 参数选择 range 或 roundrobin。
partition.assignment.strategy 参数默认的值是 range。
partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
partition.assignment.strategy=org.apache.kafka.clients.consumer.RangeAssignor
这个参数属于“消费者”参数!
Range Strategy
- 先将消费者按照 client.id 字典排序,然后按 topic 逐个处理;
- 针对一个 topic,将其 partition 总数/消费者数得到 商 n 和 余数 m,则每个 consumer 至少分到 n个分区,且前 m 个 consumer 每人多分一个分区;
举例说明 2:假设有 TOPIC_A 有 5 个分区,由 3 个 consumer(C1,C2,C3)来消费;
5/3 得到商 1,余 2,则每个消费者至少分 1 个分区,前两个消费者各多 1 个分区 C1: 2 个分区,C2:2 个分区, C3:1 个分区
接下来,就按照“区间”进行分配:
C1: TOPIC_A-0 TOPIC_A-1 C2: TOPIC_A-2 TOPIC_A_3 C3: TOPIC_A-4
举例说明 2:假设 TOPIC_A 有 5 个分区,TOPIC_B 有 3 个分区,由 2 个 consumer(C1,C2)来消费
先分配 TOPIC_A:
5/2 得到商 2,余 1,则 C1 有 3 个分区,C2 有 2 个分区,得到结果
C1: TOPIC_A-0 TOPIC_A-1 TOPIC_A-2 C2: TOPIC_A-3 TOPIC_A-4
再分配 TOPIC_B:
3/2 得到商 1,余 1,则 C1 有 2 个分区,C2 有 1 个分区,得到结果
C1: TOPIC_B-0 TOPIC_B-1 C2: TOPIC_B-2
最终分配结果:
C1: TOPIC_A-0 TOPIC_A-1 TOPIC_A-2 TOPIC_B-0 TOPIC_B-1 C2: TOPIC_A-3 TOPIC_A-4 TOPIC_B-2
如果共同订阅的主题很多,那也就意味着,排在前面的消费者拿到的分区会明显多余排在后面的。
而消费者本身有一个id,是根据id号去排序
以上就是该种模式的弊端,其实就是一个topic一个topic去分的。这个问题尤其是在订阅多个topic的时候最明显,分配单个topic的情况,也就多一个分区。
Round-Robin Strateg
将所有主题分区组成 TopicAndPartition 列表,并对 TopicAndPartition 列表按照其 hashCode 排序,然后,以轮询的方式分配给各消费者。
以上述问题来举例:
先对 TopicPartition 的 hashCode 排序,假如排序结果如下:
TOPIC_A-0 TOPIC_B-0 TOPIC_A-1 TOPIC_A-2 TOPIC_B-1 TOPIC_A-3 TOPIC_A-4 TOPIC_B-
然后按轮询方式分配
C1: TOPIC_A-0 TOPIC_A-1 TOPIC_B-1 TOPIC_A-4
C2: TOPIC_B-0 TOPIC_A-2 TOPIC_A-3 TOPIC_B-2
Sticky Strategy
对应的类叫做: org.apache.kafka.clients.consumer.StickyAssignor
sticky 策略的特点:
- 要去打成最大化的均衡
- 尽可能保留各消费者原来分配的分区
再均衡的过程中,还是会让各消费者先取消自身的分区,然后再重新分配(只不过是分配过程中会尽量让原来属于谁的分区依然分配给谁)
以一个例子来看
---开始 C1:A-P0 B-P1 B-P2 C2:B-P0 A-P1 ---加入C3后再分配 Range Strategy C1:A-P0 A-P1 C2:B-P0 B-P2 C3:B-P1 Sticky Strategy C1:A-P0 B-P1 C2:B-P0 A-P1 C3:B-P2 --
探究Kafka原理-3.生产者消费者API原理解析(下):https://developer.aliyun.com/article/1413719