深入浅出分析kafka客户端程序设计 ----- 生产者篇----万字总结(上)

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: 深入浅出分析kafka客户端程序设计 ----- 生产者篇----万字总结(上)

前面在深入理解kafka中提到的只是理论上的设计原理,


本篇讲得是基于c语言的kafka库的程序编写!!!!!


首先要编写生产者的代码,得先知道生产者的逻辑在代码上是怎么体现的


1.kafka生产者的逻辑

4.png

怎么理解呢?


我们在实例化生产者对象之前的话,肯定是要对一些参数进行配置,


比如下面介绍的conf这些


那么 配置完参数之后,就是创建生产者实例,那么实例化生产者之后,就是准备生产者


生产消息,那么我们在生产者生产消息的时候,肯定要初始化和构建消息对象发过去


因为用对象的方式去管理消息更容易拓展和后期进行维护和管理以及消费者读取消息也


不容易出错,那么构建完消息对象之后,那么就需要将消息对象交给生产者,让生产者


生产到指定的kafka的topic中的消息队列(也就是topic中的partition分区中,因为每个


分区都是独立的队列),生产到消息队列就是发送消息,到了消息队列就等消费者进行消费了,


如果不需要生产者了,那么就可以关闭该生产者了


配置参数:


在实例化生产者对象之前,你需要配置生产者的参数。这一般通过创建一个 RdKafka::Conf 对象,并使用 set 方法为其设置各种配置选项。这些配置选项可以包括 Kafka 服务器的地址、消息传递语义(例如,至少一次交付、精确一次交付等)、序列化器、分区器等。这个 Conf 对象可以在实例化生产者时传递给构造函数。


示例:

RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
conf->set("bootstrap.servers", "your_kafka_broker");
// 设置其他配置项...
RdKafka::Producer::create(conf);

实例化生产者:


  • 利用配置好的参数,创建 Kafka 生产者实例。这是通过调用 RdKafka::Producer::create() 函数实现的。传递配置对象作为参数,确保生产者在创建时拥有所需的配置。


构建消息对象:


  • 在生产者准备好之后,你可以构建消息对象。这通常包括指定消息的主题、键、内容等信息。RdKafka::Message类提供了相应的接口来设置这些消息属性。
RdKafka::Producer *producer = RdKafka::Producer::create(conf);
RdKafka::Message *msg = RdKafka::Message::create();
msg->set_payload("Your message payload");
msg->set_topic("your_topic");
// 设置其他消息属性...
// 生产者会在生产消息时拥有这个消息对象
producer->produce(msg);

生产消息:


  • 调用生产者的produce 方法发送消息到 Kafka 集群。在这一步,消息将被放入生产者内部的缓冲区,然后异步发送到 Kafka 服务器。produce 方法会返回一个错误码,你可以通过检查这个错误码来了解消息发送的状态。


轮询:


  • 为了确保消息的投递报告RdKafka::DeliveryReportCb回调被调用,你需要定期调用 RdKafka::poll()。这个操作通常在一个独立的线程中完成,以确保消息报告的及时处理。
producer->poll(0);  // 参数表示非阻塞 poll

投递报告函数(RdKafka::DeliveryReportCb)在 Kafka 生产者发送消息后,用于接收有关消息传递结果的回调通知。它的主要作用是确保消息是否成功投递到 Kafka 服务器以及最终的处理结果。


关闭生产者:


当生产者不再需要时,通过调用 delete 释放资源。在释放资源之前,你可能需要调用 flush 确保所有挂起的消息都已经被发送。

producer->flush(10000);  // 等待最多 10 秒钟
delete producer;

2.kafka的C++API


2.1 RdKafka::Conf   可以理解为上诉逻辑中的配置客户端参数

enum ConfType{ 
  CONF_GLOBAL,  // 全局配置 
  CONF_TOPIC    // Topic配置 
};
enum ConfResult{ 
  CONF_UNKNOWN = -2, 
  CONF_INVALID = -1, 
  CONF_OK = 0 
};
CONF_UNKNOWN: 表示配置未知,可能是因为没有进行相关的验证或检查。
CONF_INVALID: 表示配置无效,可能是由于配置值不符合期望的范围或格式。
CONF_OK: 表示配置有效,通过了验证。

这些接口不用全记住,收藏并关注就行,忘了的就来回忆一下!!!记住主要的就行

static Conf * create(ConfType type);
//创建配置对象。
Conf::ConfResult set(const std::string &name, const std::string &value, std::string &errstr);
//设置配置对象的属性值,成功返回CONF_OK,错误时错误信息输出到errstr。
Conf::ConfResult set(const std::string &name, DeliveryReportCb *dr_cb, std::string &errstr);
//设置dr_cb属性值。
Conf::ConfResult set(const std::string &name, EventCb *event_cb, std::string &errstr);
//设置event_cb属性值。
Conf::ConfResult set(const std::string &name, const Conf *topic_conf, std::string &errstr);
//设置用于自动订阅Topic的默认Topic配置。
Conf::ConfResult set(const std::string &name, PartitionerCb *partitioner_cb, std::string &errstr);
//设置partitioner_cb属性值,配置对象必须是CONF_TOPIC类型。
Conf::ConfResult set(const std::string &name, PartitionerKeyPointerCb *partitioner_kp_cb,std::string &errstr);
//设置partitioner_key_pointer_cb属性值。
Conf::ConfResult set(const std::string &name, SocketCb *socket_cb, std::string &errstr);
//设置socket_cb属性值。
Conf::ConfResult set(const std::string &name, OpenCb *open_cb, std::string &errstr);
//设置open_cb属性值。
Conf::ConfResult set(const std::string &name, RebalanceCb *rebalance_cb, std::string &errstr);
//设置rebalance_cb属性值。
Conf::ConfResult set(const std::string &name, OffsetCommitCb *offset_commit_cb, std::string &errstr);
//设置offset_commit_cb属性值。
Conf::ConfResult get(const std::string &name, std::string &value) const;
//查询单条属性配置值。

2.2 RdKafka::Message


Message表示一条消费或生产的消息,或是事件。 这个可以理解为生产逻辑中的构建消息对象


下面是基于Message对象的接口(有些内容都封装在message里):

std::string errstr() const;
//如果消息是一条错误事件,返回错误字符串,否则返回空字符串。
ErrorCode err() const;
//如果消息是一条错误事件,返回错误代码,否则返回0。
Topic * topic() const;
//返回消息的Topic对象。如果消息的Topic对象没有显示使用RdKafka::Topic::create()创建,需要使用topic_name函数。
std::string topic_name() const;
//返回消息的Topic名称。
int32_t partition() const;
//如果分区可用,返回分区号。
void * payload() const;
//返回消息数据。
size_t len() const;
//返回消息数据的长度。
const std::string * key() const;
//返回字符串类型的消息key。
const void * key_pointer() const;
//返回void类型的消息key。
size_t key_len() const;
//返回消息key的二进制长度。
int64_t offset () const;
//返回消息或错误的位移。
void * msg_opaque() const;
//返回RdKafka::Producer::produce()提供的msg_opaque。
virtual MessageTimestamp timestamp() const = 0;
//返回消息时间戳。
virtual int64_t latency() const = 0;
//返回produce函数内生产消息的微秒级时间延迟,如果延迟不可用,返回-1。
virtual struct rd_kafka_message_s *c_ptr () = 0;
//返回底层数据结构的C rd_kafka_message_t句柄。
virtual Status status () const = 0;
//返回消息在Topic Log的持久化状态。
virtual RdKafka::Headers *headers () = 0;
//返回消息头。
virtual RdKafka::Headers *headers (RdKafka::ErrorCode *err) = 0;
//返回消息头,错误信息会输出到err。

主要:


在 RdKafka::Message 中,最主要和常用的成员函数和属性包括:


1.err(): 通过这个函数可以获取消息的错误码,用于检查消息在生产或消费过程中是否发生了错误。


2.len(): 返回消息的长度,表示消息体的字节数。


3.payload(): 提供对消息实际内容(有效负载)的访问。


4.topic_name(): 返回消息所属的主题名称。


5.partition(): 返回消息所在的分区编号。


6.offset(): 返回消息的偏移量,表示消息在分区中的位置。


这些成员函数和属性通常是处理 Kafka 消息时最重要的信息。通过这些信息,你可以检查消息的状态,了解消息的来源和内容,以及在消费者端追踪消息的位置。其他的一些属性,比如 key(),用于获取消息的键,是可选的,取决于消息是否包含键。


2.3 RdKafka::DeliveryReportCb


每收到一条RdKafka::Producer::produce()函数生产的消息,调用一次投递报告回调函数,RdKafka::Message::err()将会标识Produce请求的结果。


为了使用队列化的投递报告回调函数,必须调用RdKafka::poll()函数。


投递报告函数(RdKafka::DeliveryReportCb)在 Kafka 生产者发送消息后,用于接收有关消息传递结果的回调通知。它的主要作用是确保消息是否成功投递到 Kafka 服务器以及最终的处理结果。


投递报告函数起着以下作用:


  1. 确认消息是否成功发送: 一旦消息被生产者发送到 Kafka 服务器,投递报告函数被调用。这允许你知道消息是否已经成功到达服务器。


  1. 追踪消息传递状态: 投递报告函数提供了有关消息传递状态的信息。通过检查消息的错误码(通过 RdKafka::Message::err() 获取),你可以了解消息是否成功投递到分区,以及可能的错误原因,比如消息发送超时、分区不存在等等。


  1. 确保消息处理: 这个回调函数可以帮助你确保消息得到了处理,无论是成功发送还是出现了一些错误。通过错误码,你可以适当地处理消息发送过程中的问题,例如重试、记录错误日志或者执行其他补救措施。


在整个流程中,投递报告函数是为了提供消息传递的状态和结果。它允许你追踪消息的处理情况,确保消息被成功地发送到了 Kafka 服务器,并且在出现问题时能够及时地得到通知和处理。因此,在实际的生产环境中,及时处理这个回调函数非常重要,以保证消息的可靠传递

virtual void dr_cb(Message &message)=0;

纯虚函数,需要继承来重写


当一条消息成功生产或是rdkafka遇到永久失败或是重试次数耗尽,投递报告回调函数会被调用。


C++封装示例:

class ProducerDeliveryReportCb : public RdKafka::DeliveryReportCb
{
public:
  void dr_cb(RdKafka::Message &message)
  {
    if(message.err())
      std::cerr << "Message delivery failed: " << message.errstr() << std::endl;
    else
    {
      // Message delivered to topic test [0] at offset 135000
      std::cerr << "Message delivered to topic " << message.topic_name()
        << " [" << message.partition() << "] at offset "
        << message.offset() << std::endl;
    }
  }
};

2.4 RdKafka::Event


事件对象


是一个用于表示Kafka事件的类,它封装了与事件相关的信息。在你的描述中,列举了事件的不同类型,如EVENT_ERROREVENT_STATSEVENT_LOGEVENT_THROTTLE。每个事件都有相应的属性和方法来获取事件的类型、错误代码、日志信息等。

enum Type{ 
  EVENT_ERROR, //错误条件事件 
  EVENT_STATS, // Json文档统计事件 
  EVENT_LOG, // Log消息事件 
  EVENT_THROTTLE // 来自Broker的throttle级信号事件 
};
virtual Type type() const =0;
//返回事件类型。
virtual ErrorCode err() const =0;
//返回事件错误代码。
virtual Severity severity() const =0;
//返回log严重级别。
virtual std::string fac() const =0;
//返回log基础字符串。
virtual std::string str () const =0;
//返回Log消息字符串。
virtual int throttle_time() const =0;
//返回throttle时间。
virtual std::string broker_name() const =0;
//返回Broker名称。
virtual int broker_id() const =0;
//返回Broker ID。

1.type(): 返回事件的类型。类型包括错误条件事件(EVENT_ERROR)、JSON文档统计事件(EVENT_STATS)、日志消息事件(EVENT_LOG)以及来自Broker的throttle级信号事件(EVENT_THROTTLE)。


2.err(): 返回事件的错误代码,如果事件类型是错误条件事件。


3.severity(): 返回日志消息的严重级别。


4.fac(): 返回日志消息的基础字符串。


5.str(): 返回日志消息的字符串。


6.throttle_time(): 如果事件类型是throttle级信号事件,返回throttle的时间。


7.broker_name(): 返回与事件相关联的Broker的名称。


8.broker_id(): 返回与事件相关联的Broker的ID。


2.5 RdKafka::EventCb


事件回调


一个抽象基类,它定义了一个事件回调函数,用于处理RdKafka::Event


事件是从RdKafka传递错误、统计信息、日志等消息到应用程序的通用接口。

virtual void event_cb(Event &event)=0; //  事件回调函数

纯虚函数,需要继承来重写


C++封装示例:

class ProducerEventCb : public RdKafka::EventCb
{
public:
    void event_cb(RdKafka::Event &event)
    {
        switch(event.type())
        {
        case RdKafka::Event::EVENT_ERROR:
            std::cout << "RdKafka::Event::EVENT_ERROR: " << RdKafka::err2str(event.err()) << std::endl;
            break;
        case RdKafka::Event::EVENT_STATS:
            std::cout << "RdKafka::Event::EVENT_STATS: " << event.str() << std::endl;
            break;
        case RdKafka::Event::EVENT_LOG:
            std::cout << "RdKafka::Event::EVENT_LOG " << event.fac() << std::endl;
            break;
        case RdKafka::Event::EVENT_THROTTLE:
            std::cout << "RdKafka::Event::EVENT_THROTTLE " << event.broker_name() << std::endl;
            break;
        }
    }
};

这个回调函数的作用是在发生Kafka事件时被调用,将相应的RdKafka::Event对象传递给应用程序。应用程序可以实现自己的RdKafka::EventCb子类,然后在这个子类中实现event_cb方法,以处理具体的事件。这样,当有错误、统计信息、日志或来自Broker的throttle级信号事件发生时,那么逻辑就变成如下


1.配置参数: 你首先配置好生产者的参数,这包括Kafka集群的地址、topic的配置等。


2.创建生产者实例: 使用配置好的参数创建一个生产者实例。


3.准备生产者: 在生产消息之前,你可能需要进行一些准备工作,比如初始化和构建消息对象。


4.生产消息: 将构建好的消息对象交给生产者,让生产者将消息发送到指定的Kafka topic中。


5.处理事件: 这就是上述RdKafka::Event和RdKafka::EventCb的作用了。在生产者的生命周期中,可能会发生一些异步事件,如错误、日志信息等。通过设置RdKafka::EventCb,你可以在相应的事件发生时得到通知,从而执行你自己的处理逻辑。


6.关闭生产者: 如果不再需要生产者,记得关闭它以释放资源。


下面是示例:

class MyEventCallback : public RdKafka::EventCb {
public:
    void event_cb(RdKafka::Event &event) override {
        // 处理事件的逻辑
        switch (event.type()) {
            case RdKafka::Event::EVENT_ERROR:
                // 处理错误事件
                break;
            case RdKafka::Event::EVENT_STATS:
                // 处理统计信息事件
                break;
            // 可以处理其他类型的事件
            default:
                break;
        }
    }
};
int main() {
    // 配置生产者参数
    RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    // 设置事件回调
    MyEventCallback eventCallback;
    conf->set("event_cb", &eventCallback, errstr);
    // 创建生产者实例
    RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);
    if (!producer) {
        // 处理生产者创建失败的情况
        return 1;
    }
    // 准备生产者...
    // 生产消息...
    // 处理事件...
    // 关闭生产者
    delete producer;
    delete conf;
    return 0;
}


深入浅出分析kafka客户端程序设计 ----- 生产者篇----万字总结(下):

https://developer.aliyun.com/article/1393801?spm=a2c6h.24874632.expert-profile.10.6af22f31AcelSW


相关文章
|
23天前
|
消息中间件 存储 负载均衡
Apache Kafka核心概念解析:生产者、消费者与Broker
【10月更文挑战第24天】在数字化转型的大潮中,数据的实时处理能力成为了企业竞争力的重要组成部分。Apache Kafka 作为一款高性能的消息队列系统,在这一领域占据了重要地位。通过使用 Kafka,企业可以构建出高效的数据管道,实现数据的快速传输和处理。今天,我将从个人的角度出发,深入解析 Kafka 的三大核心组件——生产者、消费者与 Broker,希望能够帮助大家建立起对 Kafka 内部机制的基本理解。
53 2
|
1月前
|
存储 消息中间件 大数据
大数据-69 Kafka 高级特性 物理存储 实机查看分析 日志存储一篇详解
大数据-69 Kafka 高级特性 物理存储 实机查看分析 日志存储一篇详解
36 4
|
1月前
|
消息中间件 druid 大数据
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(二)
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(二)
33 2
|
1月前
|
消息中间件 分布式计算 druid
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(一)
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(一)
54 1
|
1月前
|
消息中间件 SQL 分布式计算
大数据-76 Kafka 高级特性 稳定性-消费重复 生产者、Broker、消费者 导致的重复消费问题
大数据-76 Kafka 高级特性 稳定性-消费重复 生产者、Broker、消费者 导致的重复消费问题
35 1
|
2月前
|
消息中间件 Kafka
消费kafka不需要设置 压缩协议吗 假如生产者压缩协议是lz4
消费kafka不需要设置 压缩协议吗 假如生产者压缩协议是lz4
|
1月前
|
消息中间件 druid Kafka
从Apache Flink到Kafka再到Druid的实时数据传输,用于分析/决策
从Apache Flink到Kafka再到Druid的实时数据传输,用于分析/决策
74 0
|
2月前
|
数据采集 消息中间件 存储
实时数据处理的终极武器:Databricks与Confluent联手打造数据采集与分析的全新篇章!
【9月更文挑战第3天】本文介绍如何结合Databricks与Confluent实现高效实时数据处理。Databricks基于Apache Spark提供简便的大数据处理方式,Confluent则以Kafka为核心,助力实时数据传输。文章详细阐述了利用Kafka进行数据采集,通过Delta Lake存储并导入数据,最终在Databricks上完成数据分析的全流程,展示了一套完整的实时数据处理方案。
70 3
|
1月前
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
|
1月前
|
消息中间件 Java Kafka
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
47 1
下一篇
无影云桌面