深入浅出分析kafka客户端程序设计 ----- 生产者篇----万字总结(上)

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: 深入浅出分析kafka客户端程序设计 ----- 生产者篇----万字总结(上)

前面在深入理解kafka中提到的只是理论上的设计原理,


本篇讲得是基于c语言的kafka库的程序编写!!!!!


首先要编写生产者的代码,得先知道生产者的逻辑在代码上是怎么体现的


1.kafka生产者的逻辑

4.png

怎么理解呢?


我们在实例化生产者对象之前的话,肯定是要对一些参数进行配置,


比如下面介绍的conf这些


那么 配置完参数之后,就是创建生产者实例,那么实例化生产者之后,就是准备生产者


生产消息,那么我们在生产者生产消息的时候,肯定要初始化和构建消息对象发过去


因为用对象的方式去管理消息更容易拓展和后期进行维护和管理以及消费者读取消息也


不容易出错,那么构建完消息对象之后,那么就需要将消息对象交给生产者,让生产者


生产到指定的kafka的topic中的消息队列(也就是topic中的partition分区中,因为每个


分区都是独立的队列),生产到消息队列就是发送消息,到了消息队列就等消费者进行消费了,


如果不需要生产者了,那么就可以关闭该生产者了


配置参数:


在实例化生产者对象之前,你需要配置生产者的参数。这一般通过创建一个 RdKafka::Conf 对象,并使用 set 方法为其设置各种配置选项。这些配置选项可以包括 Kafka 服务器的地址、消息传递语义(例如,至少一次交付、精确一次交付等)、序列化器、分区器等。这个 Conf 对象可以在实例化生产者时传递给构造函数。


示例:

RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
conf->set("bootstrap.servers", "your_kafka_broker");
// 设置其他配置项...
RdKafka::Producer::create(conf);

实例化生产者:


  • 利用配置好的参数,创建 Kafka 生产者实例。这是通过调用 RdKafka::Producer::create() 函数实现的。传递配置对象作为参数,确保生产者在创建时拥有所需的配置。


构建消息对象:


  • 在生产者准备好之后,你可以构建消息对象。这通常包括指定消息的主题、键、内容等信息。RdKafka::Message类提供了相应的接口来设置这些消息属性。
RdKafka::Producer *producer = RdKafka::Producer::create(conf);
RdKafka::Message *msg = RdKafka::Message::create();
msg->set_payload("Your message payload");
msg->set_topic("your_topic");
// 设置其他消息属性...
// 生产者会在生产消息时拥有这个消息对象
producer->produce(msg);

生产消息:


  • 调用生产者的produce 方法发送消息到 Kafka 集群。在这一步,消息将被放入生产者内部的缓冲区,然后异步发送到 Kafka 服务器。produce 方法会返回一个错误码,你可以通过检查这个错误码来了解消息发送的状态。


轮询:


  • 为了确保消息的投递报告RdKafka::DeliveryReportCb回调被调用,你需要定期调用 RdKafka::poll()。这个操作通常在一个独立的线程中完成,以确保消息报告的及时处理。
producer->poll(0);  // 参数表示非阻塞 poll

投递报告函数(RdKafka::DeliveryReportCb)在 Kafka 生产者发送消息后,用于接收有关消息传递结果的回调通知。它的主要作用是确保消息是否成功投递到 Kafka 服务器以及最终的处理结果。


关闭生产者:


当生产者不再需要时,通过调用 delete 释放资源。在释放资源之前,你可能需要调用 flush 确保所有挂起的消息都已经被发送。

producer->flush(10000);  // 等待最多 10 秒钟
delete producer;

2.kafka的C++API


2.1 RdKafka::Conf   可以理解为上诉逻辑中的配置客户端参数

enum ConfType{ 
  CONF_GLOBAL,  // 全局配置 
  CONF_TOPIC    // Topic配置 
};
enum ConfResult{ 
  CONF_UNKNOWN = -2, 
  CONF_INVALID = -1, 
  CONF_OK = 0 
};
CONF_UNKNOWN: 表示配置未知,可能是因为没有进行相关的验证或检查。
CONF_INVALID: 表示配置无效,可能是由于配置值不符合期望的范围或格式。
CONF_OK: 表示配置有效,通过了验证。

这些接口不用全记住,收藏并关注就行,忘了的就来回忆一下!!!记住主要的就行

static Conf * create(ConfType type);
//创建配置对象。
Conf::ConfResult set(const std::string &name, const std::string &value, std::string &errstr);
//设置配置对象的属性值,成功返回CONF_OK,错误时错误信息输出到errstr。
Conf::ConfResult set(const std::string &name, DeliveryReportCb *dr_cb, std::string &errstr);
//设置dr_cb属性值。
Conf::ConfResult set(const std::string &name, EventCb *event_cb, std::string &errstr);
//设置event_cb属性值。
Conf::ConfResult set(const std::string &name, const Conf *topic_conf, std::string &errstr);
//设置用于自动订阅Topic的默认Topic配置。
Conf::ConfResult set(const std::string &name, PartitionerCb *partitioner_cb, std::string &errstr);
//设置partitioner_cb属性值,配置对象必须是CONF_TOPIC类型。
Conf::ConfResult set(const std::string &name, PartitionerKeyPointerCb *partitioner_kp_cb,std::string &errstr);
//设置partitioner_key_pointer_cb属性值。
Conf::ConfResult set(const std::string &name, SocketCb *socket_cb, std::string &errstr);
//设置socket_cb属性值。
Conf::ConfResult set(const std::string &name, OpenCb *open_cb, std::string &errstr);
//设置open_cb属性值。
Conf::ConfResult set(const std::string &name, RebalanceCb *rebalance_cb, std::string &errstr);
//设置rebalance_cb属性值。
Conf::ConfResult set(const std::string &name, OffsetCommitCb *offset_commit_cb, std::string &errstr);
//设置offset_commit_cb属性值。
Conf::ConfResult get(const std::string &name, std::string &value) const;
//查询单条属性配置值。

2.2 RdKafka::Message


Message表示一条消费或生产的消息,或是事件。 这个可以理解为生产逻辑中的构建消息对象


下面是基于Message对象的接口(有些内容都封装在message里):

std::string errstr() const;
//如果消息是一条错误事件,返回错误字符串,否则返回空字符串。
ErrorCode err() const;
//如果消息是一条错误事件,返回错误代码,否则返回0。
Topic * topic() const;
//返回消息的Topic对象。如果消息的Topic对象没有显示使用RdKafka::Topic::create()创建,需要使用topic_name函数。
std::string topic_name() const;
//返回消息的Topic名称。
int32_t partition() const;
//如果分区可用,返回分区号。
void * payload() const;
//返回消息数据。
size_t len() const;
//返回消息数据的长度。
const std::string * key() const;
//返回字符串类型的消息key。
const void * key_pointer() const;
//返回void类型的消息key。
size_t key_len() const;
//返回消息key的二进制长度。
int64_t offset () const;
//返回消息或错误的位移。
void * msg_opaque() const;
//返回RdKafka::Producer::produce()提供的msg_opaque。
virtual MessageTimestamp timestamp() const = 0;
//返回消息时间戳。
virtual int64_t latency() const = 0;
//返回produce函数内生产消息的微秒级时间延迟,如果延迟不可用,返回-1。
virtual struct rd_kafka_message_s *c_ptr () = 0;
//返回底层数据结构的C rd_kafka_message_t句柄。
virtual Status status () const = 0;
//返回消息在Topic Log的持久化状态。
virtual RdKafka::Headers *headers () = 0;
//返回消息头。
virtual RdKafka::Headers *headers (RdKafka::ErrorCode *err) = 0;
//返回消息头,错误信息会输出到err。

主要:


在 RdKafka::Message 中,最主要和常用的成员函数和属性包括:


1.err(): 通过这个函数可以获取消息的错误码,用于检查消息在生产或消费过程中是否发生了错误。


2.len(): 返回消息的长度,表示消息体的字节数。


3.payload(): 提供对消息实际内容(有效负载)的访问。


4.topic_name(): 返回消息所属的主题名称。


5.partition(): 返回消息所在的分区编号。


6.offset(): 返回消息的偏移量,表示消息在分区中的位置。


这些成员函数和属性通常是处理 Kafka 消息时最重要的信息。通过这些信息,你可以检查消息的状态,了解消息的来源和内容,以及在消费者端追踪消息的位置。其他的一些属性,比如 key(),用于获取消息的键,是可选的,取决于消息是否包含键。


2.3 RdKafka::DeliveryReportCb


每收到一条RdKafka::Producer::produce()函数生产的消息,调用一次投递报告回调函数,RdKafka::Message::err()将会标识Produce请求的结果。


为了使用队列化的投递报告回调函数,必须调用RdKafka::poll()函数。


投递报告函数(RdKafka::DeliveryReportCb)在 Kafka 生产者发送消息后,用于接收有关消息传递结果的回调通知。它的主要作用是确保消息是否成功投递到 Kafka 服务器以及最终的处理结果。


投递报告函数起着以下作用:


  1. 确认消息是否成功发送: 一旦消息被生产者发送到 Kafka 服务器,投递报告函数被调用。这允许你知道消息是否已经成功到达服务器。


  1. 追踪消息传递状态: 投递报告函数提供了有关消息传递状态的信息。通过检查消息的错误码(通过 RdKafka::Message::err() 获取),你可以了解消息是否成功投递到分区,以及可能的错误原因,比如消息发送超时、分区不存在等等。


  1. 确保消息处理: 这个回调函数可以帮助你确保消息得到了处理,无论是成功发送还是出现了一些错误。通过错误码,你可以适当地处理消息发送过程中的问题,例如重试、记录错误日志或者执行其他补救措施。


在整个流程中,投递报告函数是为了提供消息传递的状态和结果。它允许你追踪消息的处理情况,确保消息被成功地发送到了 Kafka 服务器,并且在出现问题时能够及时地得到通知和处理。因此,在实际的生产环境中,及时处理这个回调函数非常重要,以保证消息的可靠传递

virtual void dr_cb(Message &message)=0;

纯虚函数,需要继承来重写


当一条消息成功生产或是rdkafka遇到永久失败或是重试次数耗尽,投递报告回调函数会被调用。


C++封装示例:

class ProducerDeliveryReportCb : public RdKafka::DeliveryReportCb
{
public:
  void dr_cb(RdKafka::Message &message)
  {
    if(message.err())
      std::cerr << "Message delivery failed: " << message.errstr() << std::endl;
    else
    {
      // Message delivered to topic test [0] at offset 135000
      std::cerr << "Message delivered to topic " << message.topic_name()
        << " [" << message.partition() << "] at offset "
        << message.offset() << std::endl;
    }
  }
};

2.4 RdKafka::Event


事件对象


是一个用于表示Kafka事件的类,它封装了与事件相关的信息。在你的描述中,列举了事件的不同类型,如EVENT_ERROREVENT_STATSEVENT_LOGEVENT_THROTTLE。每个事件都有相应的属性和方法来获取事件的类型、错误代码、日志信息等。

enum Type{ 
  EVENT_ERROR, //错误条件事件 
  EVENT_STATS, // Json文档统计事件 
  EVENT_LOG, // Log消息事件 
  EVENT_THROTTLE // 来自Broker的throttle级信号事件 
};
virtual Type type() const =0;
//返回事件类型。
virtual ErrorCode err() const =0;
//返回事件错误代码。
virtual Severity severity() const =0;
//返回log严重级别。
virtual std::string fac() const =0;
//返回log基础字符串。
virtual std::string str () const =0;
//返回Log消息字符串。
virtual int throttle_time() const =0;
//返回throttle时间。
virtual std::string broker_name() const =0;
//返回Broker名称。
virtual int broker_id() const =0;
//返回Broker ID。

1.type(): 返回事件的类型。类型包括错误条件事件(EVENT_ERROR)、JSON文档统计事件(EVENT_STATS)、日志消息事件(EVENT_LOG)以及来自Broker的throttle级信号事件(EVENT_THROTTLE)。


2.err(): 返回事件的错误代码,如果事件类型是错误条件事件。


3.severity(): 返回日志消息的严重级别。


4.fac(): 返回日志消息的基础字符串。


5.str(): 返回日志消息的字符串。


6.throttle_time(): 如果事件类型是throttle级信号事件,返回throttle的时间。


7.broker_name(): 返回与事件相关联的Broker的名称。


8.broker_id(): 返回与事件相关联的Broker的ID。


2.5 RdKafka::EventCb


事件回调


一个抽象基类,它定义了一个事件回调函数,用于处理RdKafka::Event


事件是从RdKafka传递错误、统计信息、日志等消息到应用程序的通用接口。

virtual void event_cb(Event &event)=0; //  事件回调函数

纯虚函数,需要继承来重写


C++封装示例:

class ProducerEventCb : public RdKafka::EventCb
{
public:
    void event_cb(RdKafka::Event &event)
    {
        switch(event.type())
        {
        case RdKafka::Event::EVENT_ERROR:
            std::cout << "RdKafka::Event::EVENT_ERROR: " << RdKafka::err2str(event.err()) << std::endl;
            break;
        case RdKafka::Event::EVENT_STATS:
            std::cout << "RdKafka::Event::EVENT_STATS: " << event.str() << std::endl;
            break;
        case RdKafka::Event::EVENT_LOG:
            std::cout << "RdKafka::Event::EVENT_LOG " << event.fac() << std::endl;
            break;
        case RdKafka::Event::EVENT_THROTTLE:
            std::cout << "RdKafka::Event::EVENT_THROTTLE " << event.broker_name() << std::endl;
            break;
        }
    }
};

这个回调函数的作用是在发生Kafka事件时被调用,将相应的RdKafka::Event对象传递给应用程序。应用程序可以实现自己的RdKafka::EventCb子类,然后在这个子类中实现event_cb方法,以处理具体的事件。这样,当有错误、统计信息、日志或来自Broker的throttle级信号事件发生时,那么逻辑就变成如下


1.配置参数: 你首先配置好生产者的参数,这包括Kafka集群的地址、topic的配置等。


2.创建生产者实例: 使用配置好的参数创建一个生产者实例。


3.准备生产者: 在生产消息之前,你可能需要进行一些准备工作,比如初始化和构建消息对象。


4.生产消息: 将构建好的消息对象交给生产者,让生产者将消息发送到指定的Kafka topic中。


5.处理事件: 这就是上述RdKafka::Event和RdKafka::EventCb的作用了。在生产者的生命周期中,可能会发生一些异步事件,如错误、日志信息等。通过设置RdKafka::EventCb,你可以在相应的事件发生时得到通知,从而执行你自己的处理逻辑。


6.关闭生产者: 如果不再需要生产者,记得关闭它以释放资源。


下面是示例:

class MyEventCallback : public RdKafka::EventCb {
public:
    void event_cb(RdKafka::Event &event) override {
        // 处理事件的逻辑
        switch (event.type()) {
            case RdKafka::Event::EVENT_ERROR:
                // 处理错误事件
                break;
            case RdKafka::Event::EVENT_STATS:
                // 处理统计信息事件
                break;
            // 可以处理其他类型的事件
            default:
                break;
        }
    }
};
int main() {
    // 配置生产者参数
    RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    // 设置事件回调
    MyEventCallback eventCallback;
    conf->set("event_cb", &eventCallback, errstr);
    // 创建生产者实例
    RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);
    if (!producer) {
        // 处理生产者创建失败的情况
        return 1;
    }
    // 准备生产者...
    // 生产消息...
    // 处理事件...
    // 关闭生产者
    delete producer;
    delete conf;
    return 0;
}


深入浅出分析kafka客户端程序设计 ----- 生产者篇----万字总结(下):

https://developer.aliyun.com/article/1393801?spm=a2c6h.24874632.expert-profile.10.6af22f31AcelSW


相关文章
|
6月前
|
消息中间件 运维 Kafka
直播预告|Kafka+Flink双引擎实战:手把手带你搭建分布式实时分析平台!
在数字化转型中,企业亟需从海量数据中快速提取价值并转化为业务增长动力。5月15日19:00-21:00,阿里云三位技术专家将讲解Kafka与Flink的强强联合方案,帮助企业零门槛构建分布式实时分析平台。此组合广泛应用于实时风控、用户行为追踪等场景,具备高吞吐、弹性扩缩容及亚秒级响应优势。直播适合初学者、开发者和数据工程师,参与还有机会领取定制好礼!扫描海报二维码或点击链接预约直播:[https://developer.aliyun.com/live/255088](https://developer.aliyun.com/live/255088)
507 35
直播预告|Kafka+Flink双引擎实战:手把手带你搭建分布式实时分析平台!
|
6月前
|
消息中间件 运维 Kafka
直播预告|Kafka+Flink 双引擎实战:手把手带你搭建分布式实时分析平台!
直播预告|Kafka+Flink 双引擎实战:手把手带你搭建分布式实时分析平台!
224 12
|
6月前
|
消息中间件 架构师 Java
美团面试:对比分析 RocketMQ、Kafka、RabbitMQ 三大MQ常见问题?
美团面试:对比分析 RocketMQ、Kafka、RabbitMQ 三大MQ常见问题?
美团面试:对比分析 RocketMQ、Kafka、RabbitMQ 三大MQ常见问题?
|
6月前
|
消息中间件 Linux Kafka
linux命令使用消费kafka的生产者、消费者
linux命令使用消费kafka的生产者、消费者
313 16
|
消息中间件 数据挖掘 Kafka
Apache Kafka流处理实战:构建实时数据分析应用
【10月更文挑战第24天】在当今这个数据爆炸的时代,能够快速准确地处理实时数据变得尤为重要。无论是金融交易监控、网络行为分析还是物联网设备的数据收集,实时数据处理技术都是不可或缺的一部分。Apache Kafka作为一款高性能的消息队列系统,不仅支持传统的消息传递模式,还提供了强大的流处理能力,能够帮助开发者构建高效、可扩展的实时数据分析应用。
682 5
|
9月前
|
消息中间件 Kafka
【赵渝强老师】Kafka生产者的执行过程
Kafka生产者(Producer)将消息序列化后发送到指定主题的分区。整个过程由主线程和Sender线程协调完成。主线程创建KafkaProducer对象及ProducerRecord,经过拦截器、序列化器和分区器处理后,消息进入累加器。Sender线程负责从累加器获取消息并发送至KafkaBroker,Broker返回响应或错误信息,生产者根据反馈决定是否重发。视频和图片详细展示了这一流程。
232 61
|
8月前
|
消息中间件 Java Kafka
SpringBoot使用Kafka生产者、消费者
SpringBoot使用Kafka生产者、消费者
407 10
|
9月前
|
消息中间件 Kafka
【赵渝强老师】Kafka生产者的消息发送方式
Kafka生产者支持三种消息发送方式:1. **fire-and-forget**:发送后不关心结果,适用于允许消息丢失的场景;2. **同步发送**:通过Future对象确保消息成功送达,适用于高可靠性需求场景;3. **异步发送**:使用回调函数处理结果,吞吐量较高但牺牲部分可靠性。视频和代码示例详细讲解了这三种方式的具体实现。
331 5
|
10月前
|
消息中间件 存储 缓存
kafka 的数据是放在磁盘上还是内存上,为什么速度会快?
Kafka的数据存储机制通过将数据同时写入磁盘和内存,确保高吞吐量与持久性。其日志文件按主题和分区组织,使用预写日志(WAL)保证数据持久性,并借助操作系统的页缓存加速读取。Kafka采用顺序I/O、零拷贝技术和批量处理优化性能,支持分区分段以实现并行处理。示例代码展示了如何使用KafkaProducer发送消息。
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
468 1
下一篇
oss云网关配置