Kafka C++客户端库librdkafka笔记

简介: 目录 目录 1 1. 前言 2 2. 缩略语 2 3. 配置和主题 3 3.1. 配置和主题结构 3 3.1.1. Conf 3 3.1.2. ConfImpl 3 3.1.3. Topic 3 3.

目录

目录 1

1. 前言 2

2. 缩略语 2

3. 配置和主题 3

3.1. 配置和主题结构 3

3.1.1. Conf 3

3.1.2. ConfImpl 3

3.1.3. Topic 3

3.1.4. TopicImpl 3

4. 线程 4

5. 消费者 5

5.1. 消费者结构 5

5.1.1. Handle 5

5.1.2. HandleImpl 5

5.1.3. ConsumeCb 6

5.1.4. EventCb 6

5.1.5. Consumer 7

5.1.6. KafkaConsumer 7

5.1.7. KafkaConsumerImpl 7

5.1.8. rd_kafka_message_t 7

5.1.9. rd_kafka_msg_s 7

5.1.10. rd_kafka_msgq_t 8

5.1.11. rd_kafka_toppar_t 8

6. 生产者 10

6.1. 生产者结构 10

6.1.1. DeliveryReportCb 11

6.1.2. PartitionerCb 11

6.1.3. Producer 11

6.1.4. ProduceImpl 11

6.2. 生产者启动过程1 11

6.3. 生产者启动过程2 12

6.4. 生产者生产过程 14

7. poll过程 15

1. 前言

librdkafka提供的异步的生产接口,异步的消费接口和同步的消息接口,没有同步的生产接口。

2. 缩略语

缩略语

缩略语全称

示例或说明

rd

Rapid Development

rd.h

rk

RdKafka

 

toppar

Topic Partition

struct rd_kafka_toppar_t

{

};

rep

Reply,

struct rd_kafka_t {

  rd_kafka_q_t *rk_rep

};

msgq

Message Queue

struct rd_kafka_msgq_t {

};

rkb

RdKafka Broker

Kafka代理

rko

RdKafka Operation

Kafka操作

rkm

RdKafka Message

Kafka消息

payload

 

存在Kafka上的消息(或叫Log)

3. 配置和主题

3.1. 配置和主题结构

abab2dd2789ee543b6c0a740673d0244b335e13a 

3.1.1. Conf

配置接口,配置分两种:全局的和主题的。

3.1.2. ConfImpl

配置的实现。

3.1.3. Topic

主题接口。

3.1.4. TopicImpl

主题的实现。

4. 线程

RdKafka编程涉及到三类线程:

1) 应用线程,业务代码的实现

2) Kafka Broker线程rd_kafka_broker_thread_main,负责与Broker通讯,多个

3) Kafka Handler线程rd_kafka_thread_main,每创建一个consumerproducer即会创建一个Handler线程。

2891d432b66dfb7c620f22fc02be9f492b91239d 

5. 消费者

5.1. 消费者结构

42a12ec4eeb9ce9ea7258be248e6c28de1b86c82 

5.1.1. Handle

定义了poll等接口,它的实现者为HandleImpl

5.1.2. HandleImpl

实现了消费者和生产者均使用的poll等,其中poll的作用为:

1) 为生产者回调消息发送结果;

2) 为生产者和消费者回调事件。

class Handle {

  /**

   * @brief Polls the provided kafka handle for events.

   *

   * Events will trigger application provided callbacks to be called.

   *

   * The \p timeout_ms argument specifies the maximum amount of time

   * (in milliseconds) that the call will block waiting for events.

   * For non-blocking calls, provide 0 as \p timeout_ms.

   * To wait indefinately for events, provide -1.

   *

   * Events:

   *   - delivery report callbacks (if an RdKafka::DeliveryCb is configured) [producer]

   *   - event callbacks (if an RdKafka::EventCb is configured) [producer consumer]

   *

   * @remark  An application should make sure to call poll() at regular

   *          intervals to serve any queued callbacks waiting to be called.

   *

   * @warning This method MUST NOT be used with the RdKafka::KafkaConsumer,

   *          use its RdKafka::KafkaConsumer::consume() instead.

   *

   * @returns the number of events served.

   */

  virtual int poll(int timeout_ms) = 0;

};

5.1.3. ConsumeCb

只针对消费者的Callback

5.1.4. RebalanceCb

只针对消费者的Callback

5.1.5. EventCb

消费者和生产者均可设置EventCb,如:_global_conf->set("event_cb", &_event_cb, errmsg);

/**

 * @brief Event callback class

 *

 * Events are a generic interface for propagating errorsstatisticslogs, etc

 * from librdkafka to the application.

 *

 * @sa RdKafka::Event

 */

class RD_EXPORT EventCb {

 public:

  /**

   * @brief Event callback

   *

   * @sa RdKafka::Event

   */

  virtual void event_cb (Event &event) = 0;

 

  virtual ~EventCb() { }

};

 

/**

 * @brief Event object class as passed to the EventCb callback.

 */

class RD_EXPORT Event {

 public:

  /** @brief Event type */

  enum Type {

    EVENT_ERROR,     /**< Event is an error condition */

    EVENT_STATS,     /**< Event is a statistics JSON document */

    EVENT_LOG,       /**< Event is a log message */

    EVENT_THROTTLE   /**< Event is a throttle level signaling from the broker */

  };

};

5.1.6. Consumer

简单消息者,一般不使用,而是使用KafkaConsumer

5.1.7. KafkaConsumer

消费者和生产者均采用多重继承方式,其中KafkaConsumer为消费者接口,KafkaConsumerImpl为消费者实现。

5.1.8. KafkaConsumerImpl

KafkaConsumerImpl为消费者实现。

5.1.9. rd_kafka_message_t

消息结构。

5.1.10. rd_kafka_msg_s

消息结构,但消息数据实际存储在rd_kafka_message_t,结构大致如下:

struct rd_kafka_msg_s

{

  rd_kafka_message_t rkm_rkmessage;

  struct

  {

    rd_kafka_msg_s* tqe_next;

    rd_kafka_msg_s** tqe_prev;

    int64_t rkm_timestamp;

    rd_kafka_timestamp_type_t rkm_tstype;

  }rkm_link;

};

5.1.11. rd_kafka_msgq_t

存储消息的消息队列,生产者生产的消息并不直接socket发送到brokers,而是放入了这个队列,结构大致如下:

struct rd_kafka_msgq_t

{

  struct

  {

    rd_kafka_msg_s* tqh_first; // 队首

    rd_kafka_msg_s* tqh_last;  // 队尾

  };

  

  // 消息个数

  rd_atomic32_t rkmq_msg_cnt;

  // 所有消息加起来的字节数

  rd_atomic64_t rkmq_msg_bytes;

};

5.1.12. rd_kafka_toppar_t

Topic-Partition队列,很复杂的一个结构,部分内容如下:

// Topic + Partition combination

typedef struct rd_kafka_toppar_s

{

  struct

  {

    rd_kafka_toppar_s* tqe_next;

    rd_kafka_toppar_s** tqe_prev;

  }rktp_rklink;

 

  struct

  {

    rd_kafka_toppar_s* tqe_next;

    rd_kafka_toppar_s** tqe_prev;

  }rktp_rkblink;

  

  struct

  {

    rd_kafka_toppar_s* cqe_next;

    rd_kafka_toppar_s* cqe_prev;

  }rktp_fetchlink;

  

  struct

  {

    rd_kafka_toppar_s* tqe_next;

    rd_kafka_toppar_s** tqe_prev;

  }rktp_rktlink;

  

  struct

  {

    rd_kafka_toppar_s* tqe_next;

    rd_kafka_toppar_s** tqe_prev;

  }rktp_cgrplink;

  

  rd_kafka_itopic_t* rktp_rkt;

  int32_t rktp_partition;

  int32_t rktp_leader_id;

  rd_kafka_broker_t* rktp_leader;

  rd_kafka_broker_t* rktp_next_leader;

  rd_refcnt_t rktp_refcnt;

  rd_kafka_msgq_t rktp_msgq; // application->rdkafka queue

}rd_kafka_toppar_t;

6. 生产者

6.1. 生产者结构

d3495ea58884d20f17d0ea44ea2bcf9df51faac6 

6.1.1. DeliveryReportCb

消息已经成功递送到Broker时回调,只针对生产者有效。

6.1.2. PartitionerCb

计算分区号回调函数,只针对生产者有效。

6.1.3. Producer

Producer为生产者接口,它的实现者为ProducerImpl

6.1.4. ProduceImpl

ProducerImpl为生产者的实现。

6.2. 生产者启动过程1

启动时会创建两组线程:一组Broker线程(rd_kafka_broker_thread_main,多个),实为与Broker间的网络IO线程;一组Handler线程(rd_kafka_thread_main,单个),每调用一次RdKafka::Producer::createrd_kafka_new即创建一Handler线程

f04ae942c2b84f064c4a25a8227dd02bb0f0d024
Handler线程调用栈:

(gdb) t 17

[Switching to thread 17 (Thread 0x7ff7059d3700 (LWP 16765))]

#0  0x00007ff7091e6cf2 in pthread_cond_timedwait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0

(gdb) bt

#0  0x00007ff7091e6cf2 in pthread_cond_timedwait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0

#1  0x00000000005b4d2f in cnd_timedwait_ms (cnd=0x1517748, mtx=0x1517720, timeout_ms=898) at tinycthread.c:501

#2  0x0000000000580e16 in rd_kafka_q_serve (rkq=0x1517720, timeout_ms=898, max_cnt=0, cb_type=RD_KAFKA_Q_CB_CALLBACK, callback=0x0, opaque=0x0) at rdkafka_queue.c:440

#3  0x000000000054ee9b in rd_kafka_thread_main (arg=0x1516df0) at rdkafka.c:1227

#4  0x00000000005b4e0f in _thrd_wrapper_function (aArg=0x15179d0) at tinycthread.c:624

#5  0x00007ff7091e2e25 in start_thread () from /lib64/libpthread.so.0

#6  0x00007ff7082d135d in clone () from /lib64/libc.so.6

6.3. 生产者启动过程2

创建网络IO线程,消费者启动过程类似,只是一个调用rd_kafka_broker_producer_serve(rkb),另一个调用rd_kafka_broker_consumer_serve(rkb)

IO线程负责消息的收和发,发送底层调用的是sendmsg,收调用的是recvmsg(但MSVC平台调用sendrecv)。

6d238f61818e6212d2b91e653ac9ed1d32b93282 

6.4. 生产者生产过程

 40d740dfc54adbdb1745c8d26e624d8b91c969f8

生产者生产的消息并不直接socket发送到brokers,而是放入队列rd_kafka_msgq_t中。Broker线程(rd_kafka_broker_thread_main)消费这个队列。

Broker线程同时监控与Broker间的网络连接,又要监控队列中是否有数据,如何实现的?这个队列和管道绑定在一起的,绑定的是管道写端(rktp->rktp_msgq_wakeup_fd = rkb->rkb_toppar_wakeup_fd; rkb->rkb_toppar_wakeup_fd=rkb->rkb_wakeup_fd[1])。

这样Broker线程即可同时监听网络数据和管道数据。

// int rd_kafka_msg_partitioner(rd_kafka_itopic_t *rkt, rd_kafka_msg_t *rkm,int do_lock)

(gdb) p *rkm

$7 = {rkm_rkmessage = {err = RD_KAFKA_RESP_ERR_NO_ERROR, rkt = 0x1590c10, partition = 1, payload = 0x7f48c4001260, len = 203, key = 0x7f48c400132b, key_len = 14, offset = 0, 

    _private = 0x0}, rkm_link = {tqe_next = 0x5b5d47554245445b, tqe_prev = 0x6361667265746e69}, rkm_flags = 196610, rkm_timestamp = 1524829399009, 

  rkm_tstype = RD_KAFKA_TIMESTAMP_CREATE_TIME, rkm_u = {producer = {ts_timeout = 16074575505526, ts_enq = 16074275505526}}}

(gdb) p rkm->rkm_rkmessage

$8 = {err = RD_KAFKA_RESP_ERR_NO_ERROR, rkt = 0x1590c10, partition = 1, payload = 0x7f48c4001260, len = 203, key = 0x7f48c400132b, key_len = 14, offset = 0, _private = 0x0}

(gdb) p rkm->rkm_rkmessage->payload

$9 = (void *) 0x7f48c4001260

(gdb) p (char*)rkm->rkm_rkmessage->payload

$10 = 0x7f48c4001260 "{\"p\":\"f\",\"o\":1,\"d\":\"m\",\"d\":\"m\",\"i\":\"f2\",\"ip\":\"127.0.0.1\",\"pt\":2018,\"sc\":0,\"fc\":1,\"tc\":0,\"acc\":395,\"mcc\":395,\"cd\":\"test\",\"cmd\":\"tester\",\"cf\":\"main\",\"cp\":\"1.49.16.9"...

7. poll过程

poll的作用是触发回调,生产者即使不调用poll,消息也会发送出去,但是如果不通过poll触发回调,则不能确定消息发送状态(成功或失败等)。

消费队列rd_kafka_t->rk_reprk_rep为响应队列,类型为rd_kafka_q_trd_kafka_q_s

 cd52a58f5b8d1d02c9ce2e53926d7e06c675e917

 

 

相关文章
|
4天前
|
消息中间件 存储 开发工具
消息队列 MQ产品使用合集之C++如何使用Paho MQTT库进行连接、发布和订阅消息
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
3天前
|
存储 安全 Linux
网络请求的高效处理:C++ libmicrohttpd库详解
网络请求的高效处理:C++ libmicrohttpd库详解
|
1天前
|
存储 算法 程序员
C++基础知识(八:STL标准库(Vectors和list))
C++ STL (Standard Template Library标准模板库) 是通用类模板和算法的集合,它提供给程序员一些标准的数据结构的实现如 queues(队列), lists(链表), 和 stacks(栈)等. STL容器的提供是为了让开发者可以更高效率的去开发,同时我们应该也需要知道他们的底层实现,这样在出现错误的时候我们才知道一些原因,才可以更好的去解决问题。
|
1天前
|
算法 前端开发 C++
C++基础知识(八:STL标准库 deque )
deque在C++的STL(Standard Template Library)中是一个非常强大的容器,它的全称是“Double-Ended Queue”,即双端队列。deque结合了数组和链表的优点,提供了在两端进行高效插入和删除操作的能力,同时保持了随机访问的特性。
|
1天前
|
存储 C++ 索引
C++基础知识(八:STL标准库 Map和multimap )
C++ 标准模板库(STL)中的 map 容器是一种非常有用的关联容器,用于存储键值对(key-value pairs)。在 map 中,每个元素都由一个键和一个值组成,其中键是唯一的,而值则可以重复。
|
6天前
|
消息中间件 监控 安全
Kafka客户端工具:Offset Explorer 使用指南
Kafka客户端工具:Offset Explorer 使用指南
15 0
|
8天前
|
域名解析 网络协议 程序员
程序员必知:【转】adns解析库——域名解析实例(C++、linux)
程序员必知:【转】adns解析库——域名解析实例(C++、linux)
14 0
|
18天前
|
消息中间件 存储 Kafka
实时计算 Flink版产品使用问题之通过flink同步kafka数据进到doris,decimal数值类型的在kafka是正常显示数值,但是同步到doris表之后数据就变成了整数,该如何处理
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
18天前
|
消息中间件 存储 Kafka
实时计算 Flink版产品使用问题之 从Kafka读取数据,并与两个仅在任务启动时读取一次的维度表进行内连接(inner join)时,如果没有匹配到的数据会被直接丢弃还是会被存储在内存中
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
17天前
|
消息中间件 Java 关系型数据库
实时计算 Flink版操作报错合集之从 PostgreSQL 读取数据并写入 Kafka 时,遇到 "initial slot snapshot too large" 的错误,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
775 0