上篇文章说了,kafka新版旧版的区别,producer全部异步发消息,并且提供回调机制callback,判断是否成功,通过分批次发送batching保证吞吐量,分区策略更加合理,旧版本默认是在一段时间内把消息发到固定区域,新版本采用轮询,消息更加均匀。Consumer新版为单线程执行,单个consumer线程管理多个socker,在10版本后,加入了心跳线程,这最多也就算了是双线程。偏移量 在新版本交给kafka处理,舍弃了zookeeper,这样可以依赖kafka备份机制天然实现高可用原理。
一、构造producer
构造一个producer大致需要实现五个步骤:
1)构造一个properties,然后指定bootstrap.server,key.serializer和value.serializer这三个属性。这三个属性是必须的,service代表,localhost:9092,Key.serializer是
org.apache.kafka.common.serialization.stringSerializer。Value.serializer和key的 序列化一致。
2)使用propertites 构造kafkaProducer对象。
3)构造待发送对象,producerRecord,指定把消息发送到topic,partition,以及对应的key和value,其中partition和key不用指定。
4)调用kafkaProducer的send发送消息。
5)关闭kafkaProducer。
1、properties对象的构造
下面详细展开每一步要做的事,首先构造properties这里有三个参数是必须要指定的,他们分别如下:
1、bootstrap.servers:该参数指定host:port,用于kafka的broker服务器连接,producer使用时候会替换成实际的broker列表,如果kafka集群数量很多,那么只需要指定部分broker即可,不需要列出所有机器,因为不管指定几台broker,producer都会通过该参数发现集群中所有broker,该参数指定多台机器只为故障转移,这样即使一台broker挂了,producer重启后依然可以指定其他broker连接kafka集群。
2、Key.serializer:被发送到broker任何格式都必须是字节数组,因此消息的各个组件组件必须首先做序列化,然后才能发送到broker。该参数就是为消息key做序列化用的。这个参数指定是实现了serializer接口类全限定名称,kafka的大部分默认初始类型是primitive type,提供了现成的序列化器。StringSerializer,该类会将一个字符串转成字节数组,这个参数也揭示一个事实,这个用户可以自定义序列化器,只要实现serializer接口就可以。需要注意的时候,producer发送消息不指定key,也是需要配置这个参数的。
3、Value.serializer:org apache.kafka.common.serialization. Serializer,需要注意的是,这个必须写全称,和key.serializer类似,将消息的value部分转成字节数组。
2、KafkaProducer的构造
这时候就开始构造发消息的主入口,所有的功能都由kafkaProducer提供,只需要命令:
Producer<String,String> producer = new KafkaProducer<>(props);
创建producer的时候也可以把key和value序列化,如果序列化了,就不需要在properties的时候序列化。
3、producerRecord对象构造
下一步构造消息实例,java版本的producer使用producerRecord类来代表每条消息,创建producerRecord也很简单,指定topic和value,当然这里还可以指定pratition和key,值得注意的是,时间戳一定要谨慎使用,时间戳索引文件中索引项都是严格按照时间戳顺序,会导致该消息时间序列混乱,因此让kafka自行定义时间戳比较稳妥。
4、发送消息
Kafka producer发消息主要用send方法,虽然send只是两个简单方法签名,但是producer在底层完全实现了异步发送,并且使用java提供的future同时实现了同步发送 和 异步发送 +回调(callback)两种方式。网上教程大部分是那种send,就不管,专有名称叫fire and forget,发送就忘记,这种在实际场景中不被推荐使用,因为对于发送结果producer完全不知道,所以真实的使用场景中,同步和异步发送还是最常见的方法。
异步发送
实际上所有写入操作都是默认异步,java版本的producer和send方法会返回一个java 的future对象供用户稍后获取发送结果,这就是所谓回调机制。
ProducerRecord producerRecord = new ProducerRecord("kafka-boot", "foo" + i); ProducerFactory factory = kafkaTemplate.getProducerFactory(); Producer producer = factory.createProducer(); producer.send(producerRecord, new Callback() { @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { if (e == null) { }else if(e instanceof RetriableException){ }else{ } } });
上面方法就是回调,实现了onCompletion方法,该方法两个参数metadata和exception,可以用if判断下,他们不会同时为null,也就是说至少有一个为null,当消息发送成功时候,exception为null,当消息发送失败的时候,metadata为null。
另外,上面的callback实际是java的接口,用户可以自定义callback实现类来处理消息发送后的逻辑,只需要实现org.apache.kafka.clients.producer.Callback接口即可。
同步发送
同步发送 和异步发送是通过future来区分的,调用future.get()无线等待结果返回,即实现同步发送效果。
使用future.get()会一直等待下去,直到kafka broker将返回结果给producer,当结果从broker处返回时get方法要么返回结果,要么抛出异常,由producer自行处理。如果没有错误,get将返回对应的recordMetada实例(包含已发送消息的所有元素),包括消息发送的topic,分区以及消息对应分区的位移信息。
不管同步发送还是异步发送都会发送失败的可能,导致返回异常错误,当前kafka的错误类型包含两类:可重试异常 和 不可重试异常。
常见可重试异常如下:
LeaderNotAvailableException:分区的leader副本不可用,通常出现在leader换届选举期间,通常是瞬时的异常,重试之后可以自行恢复。
NotControllerException:controller当前不可用,(后面会重点讲解controller),通常表面controller在选举,也可以重试恢复。
NetworkException:网络瞬时故障导致的异常。
对于这种可重试的异常,如果在 producer 程序中配置了重试次数,那么只要在规定的重试次数内自行恢复了,便不会出现在 onCompletion exception 中。不过若超过了重试次数仍没有成功,则仍然会被封装进 exception 中。此时就需要 producer 程序自行处理这种异常。
那么不可重试异常哪些呢:
RecordTooLargeException :发送的消息尺寸过大,超过了规定的大小上限 显然这种异常无论如何重试都是无法成功的。
SerializationException :序列化失败异常,这也是无法恢复的
KafkaException :其他类型的异常
所有这些不可重试异常 旦被捕获都会被封装进 Future 的计算结果井返回给 producer 程序,用户需要自行处理这些异常。由于不可重试异常和可重试异常在 producer 程序端可能有不同的处理逻辑,所以需要不同的区分。
5、关闭producer
程序结束一定要close,毕竟producer是占用系统资源的(比如创建了额外线程,申请了很多内存以及创建了socket连接等),因此必须要显式的调用kafkaProducer.close方法关闭producer。
如果只是普通的无参数调用close,则会等producer 会被允许先处理完之前的发送请求后再关闭,即所谓的“优雅”关闭退出( graceful shutdown) ;同时, KafkaProducer 还提供个带超时参数的 close 方法 close(timeout 如果调用此方法, producer 会等待 timeout 时间来完成所有处理中的请求,然后强行退出。这就是说,若 timeout 超时,则 producer 会强制结束,并立即丢弃所有未发送以及未应答的发送请求,在某种程度上,仿佛 producer端的程序丢失了要发送的消息。因此在实际场景中一定要谨慎使用带超时的 close 方法。