深入浅出分析kafka客户端程序设计 ----- 生产者篇----万字总结(上)

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: 深入浅出分析kafka客户端程序设计 ----- 生产者篇----万字总结(上)

前面在深入理解kafka中提到的只是理论上的设计原理,


本篇讲得是基于c语言的kafka库的程序编写!!!!!


首先要编写生产者的代码,得先知道生产者的逻辑在代码上是怎么体现的


1.kafka生产者的逻辑

4.png

怎么理解呢?


我们在实例化生产者对象之前的话,肯定是要对一些参数进行配置,


比如下面介绍的conf这些


那么 配置完参数之后,就是创建生产者实例,那么实例化生产者之后,就是准备生产者


生产消息,那么我们在生产者生产消息的时候,肯定要初始化和构建消息对象发过去


因为用对象的方式去管理消息更容易拓展和后期进行维护和管理以及消费者读取消息也


不容易出错,那么构建完消息对象之后,那么就需要将消息对象交给生产者,让生产者


生产到指定的kafka的topic中的消息队列(也就是topic中的partition分区中,因为每个


分区都是独立的队列),生产到消息队列就是发送消息,到了消息队列就等消费者进行消费了,


如果不需要生产者了,那么就可以关闭该生产者了


配置参数:


在实例化生产者对象之前,你需要配置生产者的参数。这一般通过创建一个 RdKafka::Conf 对象,并使用 set 方法为其设置各种配置选项。这些配置选项可以包括 Kafka 服务器的地址、消息传递语义(例如,至少一次交付、精确一次交付等)、序列化器、分区器等。这个 Conf 对象可以在实例化生产者时传递给构造函数。


示例:

RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
conf->set("bootstrap.servers", "your_kafka_broker");
// 设置其他配置项...
RdKafka::Producer::create(conf);

实例化生产者:


  • 利用配置好的参数,创建 Kafka 生产者实例。这是通过调用 RdKafka::Producer::create() 函数实现的。传递配置对象作为参数,确保生产者在创建时拥有所需的配置。


构建消息对象:


  • 在生产者准备好之后,你可以构建消息对象。这通常包括指定消息的主题、键、内容等信息。RdKafka::Message类提供了相应的接口来设置这些消息属性。
RdKafka::Producer *producer = RdKafka::Producer::create(conf);
RdKafka::Message *msg = RdKafka::Message::create();
msg->set_payload("Your message payload");
msg->set_topic("your_topic");
// 设置其他消息属性...
// 生产者会在生产消息时拥有这个消息对象
producer->produce(msg);

生产消息:


  • 调用生产者的produce 方法发送消息到 Kafka 集群。在这一步,消息将被放入生产者内部的缓冲区,然后异步发送到 Kafka 服务器。produce 方法会返回一个错误码,你可以通过检查这个错误码来了解消息发送的状态。


轮询:


  • 为了确保消息的投递报告RdKafka::DeliveryReportCb回调被调用,你需要定期调用 RdKafka::poll()。这个操作通常在一个独立的线程中完成,以确保消息报告的及时处理。
producer->poll(0);  // 参数表示非阻塞 poll

投递报告函数(RdKafka::DeliveryReportCb)在 Kafka 生产者发送消息后,用于接收有关消息传递结果的回调通知。它的主要作用是确保消息是否成功投递到 Kafka 服务器以及最终的处理结果。


关闭生产者:


当生产者不再需要时,通过调用 delete 释放资源。在释放资源之前,你可能需要调用 flush 确保所有挂起的消息都已经被发送。

producer->flush(10000);  // 等待最多 10 秒钟
delete producer;

2.kafka的C++API


2.1 RdKafka::Conf   可以理解为上诉逻辑中的配置客户端参数

enum ConfType{ 
  CONF_GLOBAL,  // 全局配置 
  CONF_TOPIC    // Topic配置 
};
enum ConfResult{ 
  CONF_UNKNOWN = -2, 
  CONF_INVALID = -1, 
  CONF_OK = 0 
};
CONF_UNKNOWN: 表示配置未知,可能是因为没有进行相关的验证或检查。
CONF_INVALID: 表示配置无效,可能是由于配置值不符合期望的范围或格式。
CONF_OK: 表示配置有效,通过了验证。

这些接口不用全记住,收藏并关注就行,忘了的就来回忆一下!!!记住主要的就行

static Conf * create(ConfType type);
//创建配置对象。
Conf::ConfResult set(const std::string &name, const std::string &value, std::string &errstr);
//设置配置对象的属性值,成功返回CONF_OK,错误时错误信息输出到errstr。
Conf::ConfResult set(const std::string &name, DeliveryReportCb *dr_cb, std::string &errstr);
//设置dr_cb属性值。
Conf::ConfResult set(const std::string &name, EventCb *event_cb, std::string &errstr);
//设置event_cb属性值。
Conf::ConfResult set(const std::string &name, const Conf *topic_conf, std::string &errstr);
//设置用于自动订阅Topic的默认Topic配置。
Conf::ConfResult set(const std::string &name, PartitionerCb *partitioner_cb, std::string &errstr);
//设置partitioner_cb属性值,配置对象必须是CONF_TOPIC类型。
Conf::ConfResult set(const std::string &name, PartitionerKeyPointerCb *partitioner_kp_cb,std::string &errstr);
//设置partitioner_key_pointer_cb属性值。
Conf::ConfResult set(const std::string &name, SocketCb *socket_cb, std::string &errstr);
//设置socket_cb属性值。
Conf::ConfResult set(const std::string &name, OpenCb *open_cb, std::string &errstr);
//设置open_cb属性值。
Conf::ConfResult set(const std::string &name, RebalanceCb *rebalance_cb, std::string &errstr);
//设置rebalance_cb属性值。
Conf::ConfResult set(const std::string &name, OffsetCommitCb *offset_commit_cb, std::string &errstr);
//设置offset_commit_cb属性值。
Conf::ConfResult get(const std::string &name, std::string &value) const;
//查询单条属性配置值。

2.2 RdKafka::Message


Message表示一条消费或生产的消息,或是事件。 这个可以理解为生产逻辑中的构建消息对象


下面是基于Message对象的接口(有些内容都封装在message里):

std::string errstr() const;
//如果消息是一条错误事件,返回错误字符串,否则返回空字符串。
ErrorCode err() const;
//如果消息是一条错误事件,返回错误代码,否则返回0。
Topic * topic() const;
//返回消息的Topic对象。如果消息的Topic对象没有显示使用RdKafka::Topic::create()创建,需要使用topic_name函数。
std::string topic_name() const;
//返回消息的Topic名称。
int32_t partition() const;
//如果分区可用,返回分区号。
void * payload() const;
//返回消息数据。
size_t len() const;
//返回消息数据的长度。
const std::string * key() const;
//返回字符串类型的消息key。
const void * key_pointer() const;
//返回void类型的消息key。
size_t key_len() const;
//返回消息key的二进制长度。
int64_t offset () const;
//返回消息或错误的位移。
void * msg_opaque() const;
//返回RdKafka::Producer::produce()提供的msg_opaque。
virtual MessageTimestamp timestamp() const = 0;
//返回消息时间戳。
virtual int64_t latency() const = 0;
//返回produce函数内生产消息的微秒级时间延迟,如果延迟不可用,返回-1。
virtual struct rd_kafka_message_s *c_ptr () = 0;
//返回底层数据结构的C rd_kafka_message_t句柄。
virtual Status status () const = 0;
//返回消息在Topic Log的持久化状态。
virtual RdKafka::Headers *headers () = 0;
//返回消息头。
virtual RdKafka::Headers *headers (RdKafka::ErrorCode *err) = 0;
//返回消息头,错误信息会输出到err。

主要:


在 RdKafka::Message 中,最主要和常用的成员函数和属性包括:


1.err(): 通过这个函数可以获取消息的错误码,用于检查消息在生产或消费过程中是否发生了错误。


2.len(): 返回消息的长度,表示消息体的字节数。


3.payload(): 提供对消息实际内容(有效负载)的访问。


4.topic_name(): 返回消息所属的主题名称。


5.partition(): 返回消息所在的分区编号。


6.offset(): 返回消息的偏移量,表示消息在分区中的位置。


这些成员函数和属性通常是处理 Kafka 消息时最重要的信息。通过这些信息,你可以检查消息的状态,了解消息的来源和内容,以及在消费者端追踪消息的位置。其他的一些属性,比如 key(),用于获取消息的键,是可选的,取决于消息是否包含键。


2.3 RdKafka::DeliveryReportCb


每收到一条RdKafka::Producer::produce()函数生产的消息,调用一次投递报告回调函数,RdKafka::Message::err()将会标识Produce请求的结果。


为了使用队列化的投递报告回调函数,必须调用RdKafka::poll()函数。


投递报告函数(RdKafka::DeliveryReportCb)在 Kafka 生产者发送消息后,用于接收有关消息传递结果的回调通知。它的主要作用是确保消息是否成功投递到 Kafka 服务器以及最终的处理结果。


投递报告函数起着以下作用:


  1. 确认消息是否成功发送: 一旦消息被生产者发送到 Kafka 服务器,投递报告函数被调用。这允许你知道消息是否已经成功到达服务器。


  1. 追踪消息传递状态: 投递报告函数提供了有关消息传递状态的信息。通过检查消息的错误码(通过 RdKafka::Message::err() 获取),你可以了解消息是否成功投递到分区,以及可能的错误原因,比如消息发送超时、分区不存在等等。


  1. 确保消息处理: 这个回调函数可以帮助你确保消息得到了处理,无论是成功发送还是出现了一些错误。通过错误码,你可以适当地处理消息发送过程中的问题,例如重试、记录错误日志或者执行其他补救措施。


在整个流程中,投递报告函数是为了提供消息传递的状态和结果。它允许你追踪消息的处理情况,确保消息被成功地发送到了 Kafka 服务器,并且在出现问题时能够及时地得到通知和处理。因此,在实际的生产环境中,及时处理这个回调函数非常重要,以保证消息的可靠传递

virtual void dr_cb(Message &message)=0;

纯虚函数,需要继承来重写


当一条消息成功生产或是rdkafka遇到永久失败或是重试次数耗尽,投递报告回调函数会被调用。


C++封装示例:

class ProducerDeliveryReportCb : public RdKafka::DeliveryReportCb
{
public:
  void dr_cb(RdKafka::Message &message)
  {
    if(message.err())
      std::cerr << "Message delivery failed: " << message.errstr() << std::endl;
    else
    {
      // Message delivered to topic test [0] at offset 135000
      std::cerr << "Message delivered to topic " << message.topic_name()
        << " [" << message.partition() << "] at offset "
        << message.offset() << std::endl;
    }
  }
};

2.4 RdKafka::Event


事件对象


是一个用于表示Kafka事件的类,它封装了与事件相关的信息。在你的描述中,列举了事件的不同类型,如EVENT_ERROREVENT_STATSEVENT_LOGEVENT_THROTTLE。每个事件都有相应的属性和方法来获取事件的类型、错误代码、日志信息等。

enum Type{ 
  EVENT_ERROR, //错误条件事件 
  EVENT_STATS, // Json文档统计事件 
  EVENT_LOG, // Log消息事件 
  EVENT_THROTTLE // 来自Broker的throttle级信号事件 
};
virtual Type type() const =0;
//返回事件类型。
virtual ErrorCode err() const =0;
//返回事件错误代码。
virtual Severity severity() const =0;
//返回log严重级别。
virtual std::string fac() const =0;
//返回log基础字符串。
virtual std::string str () const =0;
//返回Log消息字符串。
virtual int throttle_time() const =0;
//返回throttle时间。
virtual std::string broker_name() const =0;
//返回Broker名称。
virtual int broker_id() const =0;
//返回Broker ID。

1.type(): 返回事件的类型。类型包括错误条件事件(EVENT_ERROR)、JSON文档统计事件(EVENT_STATS)、日志消息事件(EVENT_LOG)以及来自Broker的throttle级信号事件(EVENT_THROTTLE)。


2.err(): 返回事件的错误代码,如果事件类型是错误条件事件。


3.severity(): 返回日志消息的严重级别。


4.fac(): 返回日志消息的基础字符串。


5.str(): 返回日志消息的字符串。


6.throttle_time(): 如果事件类型是throttle级信号事件,返回throttle的时间。


7.broker_name(): 返回与事件相关联的Broker的名称。


8.broker_id(): 返回与事件相关联的Broker的ID。


2.5 RdKafka::EventCb


事件回调


一个抽象基类,它定义了一个事件回调函数,用于处理RdKafka::Event


事件是从RdKafka传递错误、统计信息、日志等消息到应用程序的通用接口。

virtual void event_cb(Event &event)=0; //  事件回调函数

纯虚函数,需要继承来重写


C++封装示例:

class ProducerEventCb : public RdKafka::EventCb
{
public:
    void event_cb(RdKafka::Event &event)
    {
        switch(event.type())
        {
        case RdKafka::Event::EVENT_ERROR:
            std::cout << "RdKafka::Event::EVENT_ERROR: " << RdKafka::err2str(event.err()) << std::endl;
            break;
        case RdKafka::Event::EVENT_STATS:
            std::cout << "RdKafka::Event::EVENT_STATS: " << event.str() << std::endl;
            break;
        case RdKafka::Event::EVENT_LOG:
            std::cout << "RdKafka::Event::EVENT_LOG " << event.fac() << std::endl;
            break;
        case RdKafka::Event::EVENT_THROTTLE:
            std::cout << "RdKafka::Event::EVENT_THROTTLE " << event.broker_name() << std::endl;
            break;
        }
    }
};

这个回调函数的作用是在发生Kafka事件时被调用,将相应的RdKafka::Event对象传递给应用程序。应用程序可以实现自己的RdKafka::EventCb子类,然后在这个子类中实现event_cb方法,以处理具体的事件。这样,当有错误、统计信息、日志或来自Broker的throttle级信号事件发生时,那么逻辑就变成如下


1.配置参数: 你首先配置好生产者的参数,这包括Kafka集群的地址、topic的配置等。


2.创建生产者实例: 使用配置好的参数创建一个生产者实例。


3.准备生产者: 在生产消息之前,你可能需要进行一些准备工作,比如初始化和构建消息对象。


4.生产消息: 将构建好的消息对象交给生产者,让生产者将消息发送到指定的Kafka topic中。


5.处理事件: 这就是上述RdKafka::Event和RdKafka::EventCb的作用了。在生产者的生命周期中,可能会发生一些异步事件,如错误、日志信息等。通过设置RdKafka::EventCb,你可以在相应的事件发生时得到通知,从而执行你自己的处理逻辑。


6.关闭生产者: 如果不再需要生产者,记得关闭它以释放资源。


下面是示例:

class MyEventCallback : public RdKafka::EventCb {
public:
    void event_cb(RdKafka::Event &event) override {
        // 处理事件的逻辑
        switch (event.type()) {
            case RdKafka::Event::EVENT_ERROR:
                // 处理错误事件
                break;
            case RdKafka::Event::EVENT_STATS:
                // 处理统计信息事件
                break;
            // 可以处理其他类型的事件
            default:
                break;
        }
    }
};
int main() {
    // 配置生产者参数
    RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    // 设置事件回调
    MyEventCallback eventCallback;
    conf->set("event_cb", &eventCallback, errstr);
    // 创建生产者实例
    RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);
    if (!producer) {
        // 处理生产者创建失败的情况
        return 1;
    }
    // 准备生产者...
    // 生产消息...
    // 处理事件...
    // 关闭生产者
    delete producer;
    delete conf;
    return 0;
}


深入浅出分析kafka客户端程序设计 ----- 生产者篇----万字总结(下):

https://developer.aliyun.com/article/1393801?spm=a2c6h.24874632.expert-profile.10.6af22f31AcelSW


相关文章
|
1月前
|
消息中间件 大数据 Kafka
【Kafka】Kafka 中生产者运行流程
【4月更文挑战第10天】【Kafka】Kafka 中生产者运行流程
|
1月前
|
消息中间件 存储 负载均衡
kafka底层原理分析
kafka底层原理分析
45 2
|
16天前
|
消息中间件 Kafka
Kafka生产者和消费者相关命令行操作
Kafka生产者和消费者相关命令行操作
18 1
|
22天前
|
消息中间件 存储 Kafka
Kafka(二)【文件存储机制 & 生产者】(1)
Kafka(二)【文件存储机制 & 生产者】
|
22天前
|
消息中间件 存储 Kafka
Kafka(二)【文件存储机制 & 生产者】(2)
Kafka(二)【文件存储机制 & 生产者】
|
1天前
|
消息中间件 监控 安全
Kafka客户端工具:Offset Explorer 使用指南
Kafka客户端工具:Offset Explorer 使用指南
4 0
|
4天前
|
消息中间件 存储 大数据
深度分析:Apache Kafka及其在大数据处理中的应用
Apache Kafka是高吞吐、低延迟的分布式流处理平台,常用于实时数据流、日志收集和事件驱动架构。与RabbitMQ(吞吐量有限)、Pulsar(多租户支持但生态系统小)和Amazon Kinesis(托管服务,成本高)对比,Kafka在高吞吐和持久化上有优势。适用场景包括实时处理、数据集成、日志收集和消息传递。选型需考虑吞吐延迟、持久化、协议支持等因素,使用时注意资源配置、数据管理、监控及安全性。
|
1月前
|
消息中间件 存储 Kafka
【Kafka】Replica、Leader 和 Follower 三者的概念分析
【4月更文挑战第11天】【Kafka】Replica、Leader 和 Follower 三者的概念分析
|
1月前
|
消息中间件 缓存 安全
Kafka 的生产者优秀架构设计
Kafka 的生产者优秀架构设计
38 0
|
13天前
|
消息中间件 存储 Kafka
实时计算 Flink版产品使用问题之通过flink同步kafka数据进到doris,decimal数值类型的在kafka是正常显示数值,但是同步到doris表之后数据就变成了整数,该如何处理
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。

热门文章

最新文章