Kafka解惑之Old Producer(1)—— Beginning-阿里云开发者社区

开发者社区> 朱小厮> 正文

Kafka解惑之Old Producer(1)—— Beginning

简介: 欢迎支持《RabbitMQ实战指南》以及关注微信公众号:朱小厮的博客。 众所周知,目前Kafka的最新版本已经到达1.0.0,很多公司运行的kafka也大多升级到了0.10.
+关注继续查看

欢迎支持《RabbitMQ实战指南》以及关注微信公众号:朱小厮的博客。


众所周知,目前Kafka的最新版本已经到达1.0.0,很多公司运行的kafka也大多升级到了0.10.x版本,Kafka的Producer客户端早已不再使用0.8.2.x就已基本停止维护的Scala版本的Producer了,那么我们还有必要了解它么?当然很有必要,通过Kafka Old Producer我们可以了解Kafka变迁升级的历史:旧版的Old Producer模型相对简单利于初始了解,通过对Old Producer的了解也可以慢慢的发现隐患的问题,这样进一步可以研究探讨解决方法,最后再通过对新版Producer的学习来提升对Kafka的认知,与此同时也可以让读者在遇到相似问题的时候可以借鉴Kafka的优化过来来优化自己的应用。以铜为鉴,可以正衣冠。

在使用Scala版本的Kafka生产者客户端kafka.javaapi.producer.Producer时,实际上调用的是kafka.producer.Producer类。

package kafka.javaapi.producer
class Producer[K, V](private val underlying : kafka.producer.Producer[K, V]) extends scala.AnyRef {
  def this(config : kafka.producer.ProducerConfig) = { /* compiled code */ }
  def send(message : kafka.producer.KeyedMessage[K, V]) : scala.Unit = { /* compiled code */ }
  def send(messages : java.util.List[kafka.producer.KeyedMessage[K, V]]) : scala.Unit = { /* compiled code */ }
  def close : scala.Unit = { /* compiled code */ }
}

包括kafka-console-producer.sh的脚本(常用来测试发送消息之用)中,对于0.8.2.x版本如果不指定“– new-producer”参数;或者对于.0.0版本如果指定“– old-producer”参数的话,实际上内部调用的都是kafka.producer.Producer这个类。

对于kafka-console-producer.sh脚本的内容如下:

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx512M"
fi
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer "$@"

我们看到实际上kafka-console-producer.sh的内容就是运行kafka.tools.ConsoleProducer而已,可以看到main函数代码块中的config.useOldProducer,这个笔者看的是1.0.0版本的代码,而0.8.2.2版本中的ConsoleProducer对应的是config.useNewProducer,稍有不同而已,不过如果都指定使用旧版的Scala的Producer,那么都是指kafka.producer.OldProducer。

object ConsoleProducer {
  def main(args: Array[String]) {
    try {
        val config = new ProducerConfig(args)
        val reader = Class.forName(config.readerClass).newInstance().asInstanceOf[MessageReader]
        reader.init(System.in, getReaderProps(config))

        val producer =
          if(config.useOldProducer) {
            new OldProducer(getOldProducerProps(config))
          } else {
            new NewShinyProducer(getNewProducerProps(config))
          }

进一步剖析,kafka.producer.OldProducer的内部构造很简单,关键代码如下:

class OldProducer(producerProps: Properties) extends BaseProducer {
  // default to byte array partitioner
  if (producerProps.getProperty("partitioner.class") == null)
    producerProps.setProperty("partitioner.class", classOf[kafka.producer.ByteArrayPartitioner].getName)
  val producer = new kafka.producer.Producer[Array[Byte], Array[Byte]](new ProducerConfig(producerProps))

可以看到内部的producer最终还是实例化的kafka.producer.Producer。最终验证了开篇所述的旧版的Kafka生产者客户端即为Kafka.producer.Producer。

新版的Java版的Kafka客户端是:org.apache.kafka.clients.producer.KafkaProducer,读者请注意区分。对于新版的KafkaProducer在以后的文章中会有详细介绍。

下面就来深入了解下Kafka.producer.Producer(下面如无特殊说明都将Kafka.producer.Producer此简称为Producer)了。当实例化Producer的时候,首先要读取、解析以及校验配置信息的合法性,根据配置信息来实例化Producer。Producer的配置项有18个,比如设置分区器、消息压缩方式等,这些都比较好理解,而最主要的配置就是request.required.acks和producer.type这两个配置。

request.required.acks是用来配置生产端消息确认的方式,在0.8.x这个系列的版本之中,可以配置为0,1,-1的值,也可以配置为其他的整数值,用来控制一条消息经由多少个ISR中的副本所在的Broker确认之后才向客户端发送确认信息,这个参数在之后的版本,比如1.0.0版本中就只能设置0,1,-1(all)这3(4)种取值,分别表示:
1. 当request.required.acks=0时,这意味着producer无需等待来自broker的确认而继续发送下一批消息。这种情况下数据传输效率最高,但是数据可靠性确是最低的。
2. 当request.required.acks=1(默认)时,这意味着producer在ISR中的leader已成功收到数据并得到确认。如果leader宕机了,则会丢失数据。
3. 当request.required.acks=-1时,producer需要等待ISR中的所有follower都确认接收到数据后才算一次发送完成,可靠性最高。但是这样也不能保证数据不丢失,比如当ISR中只有leader时,这样就变成了acks=1的情况。为了提高数据的可靠性,可以通过min.insync.replicas参数来辅助作用,当同步副本数不足时,生产者会跑出异常。

有关kafka的消息可靠性的更深层次的讲解可以参考我2017年初的一篇博客:kafka数据可靠性深度解读,这篇博客主要是针对0.8.2.x版本的kafka做深层次的探讨,后续会对1.0.0版本做进一步的说明。

Producer的发送模式分为同步(sync)和异步(async)两种情况,这一点可以通过参数producer.type来配置。同步模式会将消息直接发往broker中,而异步模式则会将消息存入LinkedBlockingQueue中,然后通过一个ProducerSendThread来专门发送消息。为了便于说明,笔者这里先对同步模式的情况来做说明,而异步模式只是在同步模式的基础上做了一些封装而已。

class Producer[K,V](val config: ProducerConfig,
                    private val eventHandler: EventHandler[K,V])  // only for unit testing
  extends Logging {

  private val hasShutdown = new AtomicBoolean(false)
  private val queue = new LinkedBlockingQueue[KeyedMessage[K,V]](config.queueBufferingMaxMessages)

  private var sync: Boolean = true
  private var producerSendThread: ProducerSendThread[K,V] = null
  private val lock = new Object()

  config.producerType match {
    case "sync" =>
    case "async" =>
      sync = false
      producerSendThread = new ProducerSendThread[K,V]("ProducerSendThread-" + config.clientId,
                                                       queue,
                                                       eventHandler,
                                                       config.queueBufferingMaxMs,
                                                       config.batchNumMessages,
                                                       config.clientId)
      producerSendThread.start()
  }

在讲述Producer的具体行为之前先来看一个发送方的Demo:

public class ProducerScalaDemo {
    public static final String brokerList = "xxx.xxx.xxx.xxx:9092";
    public static final String topic = "topic-zzh";

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("serializer.class", "kafka.serializer.StringEncoder");
        properties.put("metadata.broker.list", brokerList);
        properties.put("producer.type", "sync");
        properties.put("request.required.acks", "1");

        Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(properties));

        String message = "kafka_message-" + new Date().getTime() + " edited by hidden.zhu";
        KeyedMessage<String, String> keyedMessage = new KeyedMessage<String, String>(topic,null, message);
        producer.send(keyedMessage);
    }
}

我们可以看到再初始化Producer的时候之用了ProducerConfig这一个类型的参数,而在Producer的类定义中还用到了EventHandler这个类型的参数。在Scala语言中只有一个主构造函数,这个主构造函数的参数列表就是跟在类名后面括号中的各个的参数,如果要重载的话就需要自定义辅助构造函数,辅助构造函数必须调用主构造函数(this方法)。如此上面这个Demo中很显然的就调用了辅助构造函数来进行实例化,那么我们再来看下其对应的辅助构造函数:

def this(config: ProducerConfig) =
  this(config,
       new DefaultEventHandler[K,V](config,
                                    CoreUtils.createObject[Partitioner](config.partitionerClass, config.props),
                                    CoreUtils.createObject[Encoder[V]](config.serializerClass, config.props),
                                    CoreUtils.createObject[Encoder[K]](config.keySerializerClass, config.props),
                                    new ProducerPool(config)))

这里又引入了两个新的东西:DefaultEventHandler和ProducerPool,这个DefaultEventHandler继承了EventHandler这个类,这个是消息发送的关键。而ProducerPool内部是一个HashMap,其中的key是broker的id,而value就是每个broker对应的SyncProducer,这个SyncProducer就是真正的消息发送者。


欢迎支持《RabbitMQ实战指南》以及关注微信公众号:朱小厮的博客。

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
使用NAT网关轻松为单台云服务器设置多个公网IP
在应用中,有时会遇到用户询问如何使单台云服务器具备多个公网IP的问题。 具体如何操作呢,有了NAT网关这个也不是难题。
26770 0
阿里云服务器怎么设置密码?怎么停机?怎么重启服务器?
如果在创建实例时没有设置密码,或者密码丢失,您可以在控制台上重新设置实例的登录密码。本文仅描述如何在 ECS 管理控制台上修改实例登录密码。
10004 0
阿里云服务器如何登录?阿里云服务器的三种登录方法
购买阿里云ECS云服务器后如何登录?场景不同,阿里云优惠总结大概有三种登录方式: 登录到ECS云服务器控制台 在ECS云服务器控制台用户可以更改密码、更换系.
13807 0
windows server 2008阿里云ECS服务器安全设置
最近我们Sinesafe安全公司在为客户使用阿里云ecs服务器做安全的过程中,发现服务器基础安全性都没有做。为了为站长们提供更加有效的安全基础解决方案,我们Sinesafe将对阿里云服务器win2008 系统进行基础安全部署实战过程! 比较重要的几部分 1.
9147 0
腾讯云服务器 设置ngxin + fastdfs +tomcat 开机自启动
在tomcat中新建一个可以启动的 .sh 脚本文件 /usr/local/tomcat7/bin/ export JAVA_HOME=/usr/local/java/jdk7 export PATH=$JAVA_HOME/bin/:$PATH export CLASSPATH=.
4650 0
阿里云ECS云服务器初始化设置教程方法
阿里云ECS云服务器初始化是指将云服务器系统恢复到最初状态的过程,阿里云的服务器初始化是通过更换系统盘来实现的,是免费的,阿里云百科网分享服务器初始化教程: 服务器初始化教程方法 本文的服务器初始化是指将ECS云服务器系统恢复到最初状态,服务器中的数据也会被清空,所以初始化之前一定要先备份好。
7328 0
阿里云服务器ECS登录用户名是什么?系统不同默认账号也不同
阿里云服务器Windows系统默认用户名administrator,Linux镜像服务器用户名root
4476 0
+关注
朱小厮
主要从事消息中间件的相关研发工作,著有《RabbitMQ实战指南》。
210
文章
0
问答
文章排行榜
最热
最新
相关电子书
更多
《2021云上架构与运维峰会演讲合集》
立即下载
《零基础CSS入门教程》
立即下载
《零基础HTML入门教程》
立即下载