Kafka 客户端实现逻辑分析

简介:

这里主要分析kafka 客户端实现 (代码分析以perl kafka实现为准)

kafka客户端分为生产者和消费者,生产者发送消息,消费者获取消息.

在kafka协议里客户端通信中用到的最多的四个协议命令是fetch,fetchoffset,send,metadata.这四个分别是获取消息,获取offset,发送消息,获取metadata.剩下的其他协议命令大多都是kafka server内部通信用到的.offsetcommit这个命令在有些语言的client api的实现里给出了接口可以自己提交offset.但是在perl的实现里并没有.

先看看直接producer和consumer的代码

复制代码
复制代码
my $request = {
        ApiKey                              => $APIKEY_PRODUCE,
        CorrelationId                       => $self->{CorrelationId},
        ClientId                            => $self->{ClientId},
        RequiredAcks                        => $self->{RequiredAcks},
        Timeout                             => $self->{Timeout} * 1000,
        topics                              => [
            {
                TopicName                   => $topic,
                partitions                  => [
                    {
                        Partition           => $partition,
                        MessageSet          => $MessageSet,
                    },
                ],
            },
        ],
    };
 
    foreach my $message ( @$messages ) {
        push @$MessageSet, {
            Offset  => $PRODUCER_ANY_OFFSET,
            Key     => $key,
            Value   => $message,
        };
    }
 
    return $self->{Connection}->receive_response_to_request( $request, $compression_codec );
复制代码
复制代码

 

代码并未完全贴上.核心代码就这一部分.最后一行代码可以看见最终调用connection::receive_response_to_request函数.再上面的部分是设置消息格式.和消息内容的数据结构.

复制代码
复制代码
my $request = {
        ApiKey                              => $APIKEY_FETCH,
        CorrelationId                       => $self->{CorrelationId},
        ClientId                            => $self->{ClientId},
        MaxWaitTime                         => $self->{MaxWaitTime},
        MinBytes                            => $self->{MinBytes},
        topics                              => [
            {
                TopicName                   => $topic,
                partitions                  => [
                    {
                        Partition           => $partition,
                        FetchOffset         => $start_offset,
                        MaxBytes            => $max_size // $self->{MaxBytes},
                    },
                ],
            },
        ],
    };
 
    my $response = $self->{Connection}->receive_response_to_request( $request );
复制代码
复制代码

这是consumer的获取消息的核心部分代码.最后同producer一样.代码结构也相似.同样是设置消息数据结构然后发送.只是最后多了代码返回处理的部分.消息返回处理的部分就不再贴上详细说明了.有兴趣自行去cpan上看源代码.

下面看看最核心的函数代码.

复制代码
复制代码
sub receive_response_to_request {
    my ( $self, $request, $compression_codec ) = @_;
 
    local $Data::Dumper::Sortkeys = 1 if $self->debug_level;
 
    my $api_key = $request->{ApiKey};  //这里获取请求类型,是发送消息,还是获取消息和offset的.
 
# WARNING: The current version of the module limited to the following:
# supports queries with only one combination of topic + partition (first and only).
 
    my $topic_data  = $request->{topics}->[0];  //这些消息具体处理就略过不提了.
    my $topic_name  = $topic_data->{TopicName};
    my $partition   = $topic_data->{partitions}->[0]->{Partition};
 
    if (  //这里是比较关键的.判断是否有完整的metadata信息.没有metadata信息就通过fetch meta命令获取.
           !%{ $self->{_metadata} }         # the first request
        || ( !$self->{AutoCreateTopicsEnable} && defined( $topic_name ) && !exists( $self->{_metadata}->{ $topic_name } ) )
    ) {
//updata_metadata函数就是封装了fetch metadata请求命令发送给kafka 来获取metadata信息.在这个地方处理不同语言里处理逻辑多少有些差别.php-kafka中有两种方式,一种通过这里的这个方法.另一种是通过zookeeper获取meta信息.在使用的时候需要指定zookeeper地址. $self->_update_metadata( $topic_name ) # hash metadata could be updated # FATAL error or $self->_error( $ERROR_CANNOT_GET_METADATA, format_message( "topic = '%s'", $topic_name ) ); } my $encoded_request = $protocol{ $api_key }->{encode}->( $request, $compression_codec ); //这里将消息格式化成网络字节序. my $CorrelationId = $request->{CorrelationId} // _get_CorrelationId; say STDERR sprintf( '[%s] compression_codec = %d, request: %s', scalar( localtime ), $compression_codec // '<undef>', Data::Dumper->Dump( [ $request ], [ 'request' ] ) ) if $self->debug_level; my $attempts = $self->{SEND_MAX_ATTEMPTS}; my ( $ErrorCode, $partition_data, $server ); ATTEMPTS: while ( $attempts-- ) { //在while里进行发送尝试.java版客户端的三次尝试即是这里同样的逻辑 REQUEST: { $ErrorCode = $ERROR_NO_ERROR;
//这里差早topic分区对应的leader,成功则进行leader连接发送请求 if ( defined( my $leader = $self->{_metadata}->{ $topic_name }->{ $partition }->{Leader} ) ) { # hash metadata could be updated unless ( $server = $self->{_leaders}->{ $leader } ) { //没有找到对应leader的server就跳过此次请求尝试,更新metadata并进行下一次尝试 $ErrorCode = $ERROR_LEADER_NOT_FOUND; $self->_remember_nonfatal_error( $ErrorCode, $ERROR{ $ErrorCode }, $server, $topic_name, $partition ); last REQUEST; # go to the next attempt //在这里跳出主逻辑块.进行块之后的动作. } # Send a request to the leader if ( !$self->_connectIO( $server ) ) { //这里连接此topic分区的leader $ErrorCode = $ERROR_CANNOT_BIND; } elsif ( !$self->_sendIO( $server, $encoded_request ) ) { //这里向这个leader发送请求 $ErrorCode = $ERROR_CANNOT_SEND; } if ( $ErrorCode != $ERROR_NO_ERROR ) { //判断动作是否成功 $self->_remember_nonfatal_error( $ErrorCode, $self->{_IO_cache}->{ $server }->{error}, $server, $topic_name, $partition ); last REQUEST; # go to the next attempt } my $response; //这里处理返回情况.如果发送的produce请求并且没有任何response返回.则构建一个空的response返回. if ( $api_key == $APIKEY_PRODUCE && $request->{RequiredAcks} == $NOT_SEND_ANY_RESPONSE ) { # Do not receive a response, self-forming own response $response = { CorrelationId => $CorrelationId, topics => [ { TopicName => $topic_name, partitions => [ { Partition => $partition, ErrorCode => 0, Offset => $BAD_OFFSET, }, ], }, ], }; } else { //这里获取response.并从网络字节序转换成字符格式. my $encoded_response_ref; unless ( $encoded_response_ref = $self->_receiveIO( $server ) ) { if ( $api_key == $APIKEY_PRODUCE ) { # WARNING: Unfortunately, the sent package (one or more messages) does not have a unique identifier # and there is no way to verify the delivery of data $ErrorCode = $ERROR_SEND_NO_ACK; # Should not be allowed to re-send data on the next attempt # FATAL error $self->_error( $ErrorCode, $self->{_IO_cache}->{ $server }->{error} ); } else { $ErrorCode = $ERROR_CANNOT_RECV; $self->_remember_nonfatal_error( $ErrorCode, $self->{_IO_cache}->{ $server }->{error}, $server, $topic_name, $partition ); last REQUEST; # go to the next attempt } } if ( length( $$encoded_response_ref ) > 4 ) { # MessageSize => int32 $response = $protocol{ $api_key }->{decode}->( $encoded_response_ref ); say STDERR sprintf( '[%s] response: %s', scalar( localtime ), Data::Dumper->Dump( [ $response ], [ 'response' ] ) ) if $self->debug_level; } else { $self->_error( $ERROR_RESPONSEMESSAGE_NOT_RECEIVED ); } } $response->{CorrelationId} == $CorrelationId # FATAL error or $self->_error( $ERROR_MISMATCH_CORRELATIONID ); $topic_data = $response->{topics}->[0]; $partition_data = $topic_data->{ $api_key == $APIKEY_OFFSET ? 'PartitionOffsets' : 'partitions' }->[0]; if ( ( $ErrorCode = $partition_data->{ErrorCode} ) == $ERROR_NO_ERROR ) { return $response; } elsif ( exists $RETRY_ON_ERRORS{ $ErrorCode } ) { $self->_remember_nonfatal_error( $ErrorCode, $ERROR{ $ErrorCode }, $server, $topic_name, $partition ); last REQUEST; # go to the next attempt } else { # FATAL error $self->_error( $ErrorCode, format_message( "topic = '%s', partition = %s", $topic_name, $partition ) ); } } } # Expect to possible changes in the situation, such as restoration of connection say STDERR sprintf( '[%s] sleeping for %d ms before making request attempt #%d (%s)', scalar( localtime ), $self->{RETRY_BACKOFF}, $self->{SEND_MAX_ATTEMPTS} - $attempts + 1, $ErrorCode == $ERROR_NO_ERROR ? 'refreshing metadata' : "ErrorCode ${ErrorCode}", ) if $self->debug_level; sleep $self->{RETRY_BACKOFF} / 1000; $self->_update_metadata( $topic_name ) //最重要的逻辑在这里.可以看见上面失败则跳出REQUEST块,直接到这里执行更新动作.更新完之后再进行下一次尝试.这个逻辑应对着topic 分区的leader动态切换的.现有leader死了,切换到其他的leader上来.客户端能对此作出应对. # FATAL error or $self->_error( $ErrorCode || $ERROR_CANNOT_GET_METADATA, format_message( "topic = '%s', partition = %s", $topic_name, $partition ) ); } # FATAL error if ( $ErrorCode ) { $self->_error( $ErrorCode, format_message( "topic = '%s'%s", $topic_data->{TopicName}, $partition_data ? ", partition = ".$partition_data->{Partition} : q{} ) ); } else { $self->_error( $ERROR_UNKNOWN_TOPIC_OR_PARTITION, format_message( "topic = '%s', partition = %s", $topic_name, $partition ) ); } return; }
复制代码
复制代码

上面主要分析核心逻辑实现.可以发现:

  consumer在消费的时候并没有手动提交过offset.也未设置groupId相关的配置,所以在消费的时候server其实并不是强制按group消费的,也不自动记录对应offset.只是按提交的offset返回对应的消息和下一个offset值而已.所以在kafka按组消费的功能其实是有各个客户端api实现的.在新版java的api中可以看见有autoCommitOffset的方法.在老版java api实现里也有autocommit的线程在替用户提交groupId与offset的记录.

producer和consumer的request里均需要指定topic分区.所以实际上在真正的api底层是没有对topic分区做负载的.一些具有负载功能的其他语言的api均由客户端内部实现.并非kafka server提供的功能.

分类: 消息中间件
 
 
本文转自左正博客园博客,原文链接: http://www.cnblogs.com/soundcode/p/7200495.html,如需转载请自行联系原作者
 
相关文章
|
8月前
|
消息中间件 运维 Kafka
直播预告|Kafka+Flink双引擎实战:手把手带你搭建分布式实时分析平台!
在数字化转型中,企业亟需从海量数据中快速提取价值并转化为业务增长动力。5月15日19:00-21:00,阿里云三位技术专家将讲解Kafka与Flink的强强联合方案,帮助企业零门槛构建分布式实时分析平台。此组合广泛应用于实时风控、用户行为追踪等场景,具备高吞吐、弹性扩缩容及亚秒级响应优势。直播适合初学者、开发者和数据工程师,参与还有机会领取定制好礼!扫描海报二维码或点击链接预约直播:[https://developer.aliyun.com/live/255088](https://developer.aliyun.com/live/255088)
604 35
直播预告|Kafka+Flink双引擎实战:手把手带你搭建分布式实时分析平台!
|
8月前
|
消息中间件 运维 Kafka
直播预告|Kafka+Flink 双引擎实战:手把手带你搭建分布式实时分析平台!
直播预告|Kafka+Flink 双引擎实战:手把手带你搭建分布式实时分析平台!
259 12
|
8月前
|
消息中间件 架构师 Java
美团面试:对比分析 RocketMQ、Kafka、RabbitMQ 三大MQ常见问题?
美团面试:对比分析 RocketMQ、Kafka、RabbitMQ 三大MQ常见问题?
美团面试:对比分析 RocketMQ、Kafka、RabbitMQ 三大MQ常见问题?
|
消息中间件 数据挖掘 Kafka
Apache Kafka流处理实战:构建实时数据分析应用
【10月更文挑战第24天】在当今这个数据爆炸的时代,能够快速准确地处理实时数据变得尤为重要。无论是金融交易监控、网络行为分析还是物联网设备的数据收集,实时数据处理技术都是不可或缺的一部分。Apache Kafka作为一款高性能的消息队列系统,不仅支持传统的消息传递模式,还提供了强大的流处理能力,能够帮助开发者构建高效、可扩展的实时数据分析应用。
848 5
|
存储 消息中间件 大数据
大数据-69 Kafka 高级特性 物理存储 实机查看分析 日志存储一篇详解
大数据-69 Kafka 高级特性 物理存储 实机查看分析 日志存储一篇详解
313 4
|
消息中间件 druid 大数据
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(二)
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(二)
197 2
|
消息中间件 分布式计算 druid
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(一)
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(一)
243 1
|
消息中间件 druid Kafka
从Apache Flink到Kafka再到Druid的实时数据传输,用于分析/决策
从Apache Flink到Kafka再到Druid的实时数据传输,用于分析/决策
368 1
|
数据采集 消息中间件 存储
实时数据处理的终极武器:Databricks与Confluent联手打造数据采集与分析的全新篇章!
【9月更文挑战第3天】本文介绍如何结合Databricks与Confluent实现高效实时数据处理。Databricks基于Apache Spark提供简便的大数据处理方式,Confluent则以Kafka为核心,助力实时数据传输。文章详细阐述了利用Kafka进行数据采集,通过Delta Lake存储并导入数据,最终在Databricks上完成数据分析的全流程,展示了一套完整的实时数据处理方案。
220 3
|
消息中间件 负载均衡 Kafka
Kafka 实现负载均衡与故障转移:深入分析 Kafka 的架构特点与实践
【8月更文挑战第24天】Apache Kafka是一款专为实时数据处理和流传输设计的高性能消息系统。其核心设计注重高吞吐量、低延迟与可扩展性,并具备出色的容错能力。Kafka采用分布式日志概念,通过数据分区及副本机制确保数据可靠性和持久性。系统包含Producer(消息生产者)、Consumer(消息消费者)和Broker(消息服务器)三大组件。Kafka利用独特的分区机制实现负载均衡,每个Topic可以被划分为多个分区,每个分区可以被复制到多个Broker上,确保数据的高可用性和可靠性。
623 2

热门文章

最新文章