前面在深入理解kafka中提到的只是理论上的设计原理,
本篇讲得是基于c语言的kafka库的程序编写!!!!!
首先要编写生产者的代码,得先知道生产者的逻辑在代码上是怎么体现的
1.kafka生产者的逻辑
怎么理解呢?
我们在实例化生产者对象之前的话,肯定是要对一些参数进行配置,
比如下面介绍的conf这些
那么 配置完参数之后,就是创建生产者实例,那么实例化生产者之后,就是准备生产者
生产消息,那么我们在生产者生产消息的时候,肯定要初始化和构建消息对象发过去
因为用对象的方式去管理消息更容易拓展和后期进行维护和管理以及消费者读取消息也
不容易出错,那么构建完消息对象之后,那么就需要将消息对象交给生产者,让生产者
生产到指定的kafka的topic中的消息队列(也就是topic中的partition分区中,因为每个
分区都是独立的队列),生产到消息队列就是发送消息,到了消息队列就等消费者进行消费了,
如果不需要生产者了,那么就可以关闭该生产者了
配置参数:
在实例化生产者对象之前,你需要配置生产者的参数。这一般通过创建一个 RdKafka::Conf 对象,并使用 set 方法为其设置各种配置选项。这些配置选项可以包括 Kafka 服务器的地址、消息传递语义(例如,至少一次交付、精确一次交付等)、序列化器、分区器等。这个 Conf 对象可以在实例化生产者时传递给构造函数。
示例:
RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); conf->set("bootstrap.servers", "your_kafka_broker"); // 设置其他配置项... RdKafka::Producer::create(conf);
实例化生产者:
- 利用配置好的参数,创建 Kafka 生产者实例。这是通过调用
RdKafka::Producer::create()
函数实现的。传递配置对象作为参数,确保生产者在创建时拥有所需的配置。
构建消息对象:
- 在生产者准备好之后,你可以构建消息对象。这通常包括指定消息的主题、键、内容等信息。
RdKafka::Message
类提供了相应的接口来设置这些消息属性。
RdKafka::Producer *producer = RdKafka::Producer::create(conf); RdKafka::Message *msg = RdKafka::Message::create(); msg->set_payload("Your message payload"); msg->set_topic("your_topic"); // 设置其他消息属性... // 生产者会在生产消息时拥有这个消息对象 producer->produce(msg);
生产消息:
- 调用生产者的
produce
方法发送消息到 Kafka 集群。在这一步,消息将被放入生产者内部的缓冲区,然后异步发送到 Kafka 服务器。produce
方法会返回一个错误码,你可以通过检查这个错误码来了解消息发送的状态。
轮询:
- 为了确保消息的投递报告(
RdKafka::DeliveryReportCb
)回调被调用,你需要定期调用RdKafka::poll()
。这个操作通常在一个独立的线程中完成,以确保消息报告的及时处理。
producer->poll(0); // 参数表示非阻塞 poll
投递报告函数(RdKafka::DeliveryReportCb)在 Kafka 生产者发送消息后,用于接收有关消息传递结果的回调通知。它的主要作用是确保消息是否成功投递到 Kafka 服务器以及最终的处理结果。
关闭生产者:
当生产者不再需要时,通过调用 delete 释放资源。在释放资源之前,你可能需要调用 flush 确保所有挂起的消息都已经被发送。
producer->flush(10000); // 等待最多 10 秒钟 delete producer;
2.kafka的C++API
2.1 RdKafka::Conf 可以理解为上诉逻辑中的配置客户端参数
enum ConfType{ CONF_GLOBAL, // 全局配置 CONF_TOPIC // Topic配置 }; enum ConfResult{ CONF_UNKNOWN = -2, CONF_INVALID = -1, CONF_OK = 0 }; CONF_UNKNOWN: 表示配置未知,可能是因为没有进行相关的验证或检查。 CONF_INVALID: 表示配置无效,可能是由于配置值不符合期望的范围或格式。 CONF_OK: 表示配置有效,通过了验证。
这些接口不用全记住,收藏并关注就行,忘了的就来回忆一下!!!记住主要的就行
static Conf * create(ConfType type); //创建配置对象。 Conf::ConfResult set(const std::string &name, const std::string &value, std::string &errstr); //设置配置对象的属性值,成功返回CONF_OK,错误时错误信息输出到errstr。 Conf::ConfResult set(const std::string &name, DeliveryReportCb *dr_cb, std::string &errstr); //设置dr_cb属性值。 Conf::ConfResult set(const std::string &name, EventCb *event_cb, std::string &errstr); //设置event_cb属性值。 Conf::ConfResult set(const std::string &name, const Conf *topic_conf, std::string &errstr); //设置用于自动订阅Topic的默认Topic配置。 Conf::ConfResult set(const std::string &name, PartitionerCb *partitioner_cb, std::string &errstr); //设置partitioner_cb属性值,配置对象必须是CONF_TOPIC类型。 Conf::ConfResult set(const std::string &name, PartitionerKeyPointerCb *partitioner_kp_cb,std::string &errstr); //设置partitioner_key_pointer_cb属性值。 Conf::ConfResult set(const std::string &name, SocketCb *socket_cb, std::string &errstr); //设置socket_cb属性值。 Conf::ConfResult set(const std::string &name, OpenCb *open_cb, std::string &errstr); //设置open_cb属性值。 Conf::ConfResult set(const std::string &name, RebalanceCb *rebalance_cb, std::string &errstr); //设置rebalance_cb属性值。 Conf::ConfResult set(const std::string &name, OffsetCommitCb *offset_commit_cb, std::string &errstr); //设置offset_commit_cb属性值。 Conf::ConfResult get(const std::string &name, std::string &value) const; //查询单条属性配置值。
2.2 RdKafka::Message
Message表示一条消费或生产的消息,或是事件。 这个可以理解为生产逻辑中的构建消息对象
下面是基于Message对象的接口(有些内容都封装在message里):
std::string errstr() const; //如果消息是一条错误事件,返回错误字符串,否则返回空字符串。 ErrorCode err() const; //如果消息是一条错误事件,返回错误代码,否则返回0。 Topic * topic() const; //返回消息的Topic对象。如果消息的Topic对象没有显示使用RdKafka::Topic::create()创建,需要使用topic_name函数。 std::string topic_name() const; //返回消息的Topic名称。 int32_t partition() const; //如果分区可用,返回分区号。 void * payload() const; //返回消息数据。 size_t len() const; //返回消息数据的长度。 const std::string * key() const; //返回字符串类型的消息key。 const void * key_pointer() const; //返回void类型的消息key。 size_t key_len() const; //返回消息key的二进制长度。 int64_t offset () const; //返回消息或错误的位移。 void * msg_opaque() const; //返回RdKafka::Producer::produce()提供的msg_opaque。 virtual MessageTimestamp timestamp() const = 0; //返回消息时间戳。 virtual int64_t latency() const = 0; //返回produce函数内生产消息的微秒级时间延迟,如果延迟不可用,返回-1。 virtual struct rd_kafka_message_s *c_ptr () = 0; //返回底层数据结构的C rd_kafka_message_t句柄。 virtual Status status () const = 0; //返回消息在Topic Log的持久化状态。 virtual RdKafka::Headers *headers () = 0; //返回消息头。 virtual RdKafka::Headers *headers (RdKafka::ErrorCode *err) = 0; //返回消息头,错误信息会输出到err。
主要:
在 RdKafka::Message 中,最主要和常用的成员函数和属性包括:
1.err(): 通过这个函数可以获取消息的错误码,用于检查消息在生产或消费过程中是否发生了错误。
2.len(): 返回消息的长度,表示消息体的字节数。
3.payload(): 提供对消息实际内容(有效负载)的访问。
4.topic_name(): 返回消息所属的主题名称。
5.partition(): 返回消息所在的分区编号。
6.offset(): 返回消息的偏移量,表示消息在分区中的位置。
这些成员函数和属性通常是处理 Kafka 消息时最重要的信息。通过这些信息,你可以检查消息的状态,了解消息的来源和内容,以及在消费者端追踪消息的位置。其他的一些属性,比如 key(),用于获取消息的键,是可选的,取决于消息是否包含键。
2.3 RdKafka::DeliveryReportCb
每收到一条RdKafka::Producer::produce()函数生产的消息,调用一次投递报告回调函数,RdKafka::Message::err()将会标识Produce请求的结果。
为了使用队列化的投递报告回调函数,必须调用RdKafka::poll()函数。
投递报告函数(RdKafka::DeliveryReportCb)在 Kafka 生产者发送消息后,用于接收有关消息传递结果的回调通知。它的主要作用是确保消息是否成功投递到 Kafka 服务器以及最终的处理结果。
投递报告函数起着以下作用:
- 确认消息是否成功发送: 一旦消息被生产者发送到 Kafka 服务器,投递报告函数被调用。这允许你知道消息是否已经成功到达服务器。
- 追踪消息传递状态: 投递报告函数提供了有关消息传递状态的信息。通过检查消息的错误码(通过 RdKafka::Message::err() 获取),你可以了解消息是否成功投递到分区,以及可能的错误原因,比如消息发送超时、分区不存在等等。
- 确保消息处理: 这个回调函数可以帮助你确保消息得到了处理,无论是成功发送还是出现了一些错误。通过错误码,你可以适当地处理消息发送过程中的问题,例如重试、记录错误日志或者执行其他补救措施。
在整个流程中,投递报告函数是为了提供消息传递的状态和结果。它允许你追踪消息的处理情况,确保消息被成功地发送到了 Kafka 服务器,并且在出现问题时能够及时地得到通知和处理。因此,在实际的生产环境中,及时处理这个回调函数非常重要,以保证消息的可靠传递
virtual void dr_cb(Message &message)=0;
纯虚函数,需要继承来重写
当一条消息成功生产或是rdkafka遇到永久失败或是重试次数耗尽,投递报告回调函数会被调用。
C++封装示例:
class ProducerDeliveryReportCb : public RdKafka::DeliveryReportCb { public: void dr_cb(RdKafka::Message &message) { if(message.err()) std::cerr << "Message delivery failed: " << message.errstr() << std::endl; else { // Message delivered to topic test [0] at offset 135000 std::cerr << "Message delivered to topic " << message.topic_name() << " [" << message.partition() << "] at offset " << message.offset() << std::endl; } } };
2.4 RdKafka::Event
事件对象
是一个用于表示Kafka事件的类,它封装了与事件相关的信息。在你的描述中,列举了事件的不同类型,如EVENT_ERROR
、EVENT_STATS
、EVENT_LOG
和EVENT_THROTTLE
。每个事件都有相应的属性和方法来获取事件的类型、错误代码、日志信息等。
enum Type{ EVENT_ERROR, //错误条件事件 EVENT_STATS, // Json文档统计事件 EVENT_LOG, // Log消息事件 EVENT_THROTTLE // 来自Broker的throttle级信号事件 };
virtual Type type() const =0; //返回事件类型。 virtual ErrorCode err() const =0; //返回事件错误代码。 virtual Severity severity() const =0; //返回log严重级别。 virtual std::string fac() const =0; //返回log基础字符串。 virtual std::string str () const =0; //返回Log消息字符串。 virtual int throttle_time() const =0; //返回throttle时间。 virtual std::string broker_name() const =0; //返回Broker名称。 virtual int broker_id() const =0; //返回Broker ID。
1.type(): 返回事件的类型。类型包括错误条件事件(EVENT_ERROR)、JSON文档统计事件(EVENT_STATS)、日志消息事件(EVENT_LOG)以及来自Broker的throttle级信号事件(EVENT_THROTTLE)。
2.err(): 返回事件的错误代码,如果事件类型是错误条件事件。
3.severity(): 返回日志消息的严重级别。
4.fac(): 返回日志消息的基础字符串。
5.str(): 返回日志消息的字符串。
6.throttle_time(): 如果事件类型是throttle级信号事件,返回throttle的时间。
7.broker_name(): 返回与事件相关联的Broker的名称。
8.broker_id(): 返回与事件相关联的Broker的ID。
2.5 RdKafka::EventCb
事件回调
一个抽象基类,它定义了一个事件回调函数,用于处理RdKafka::Event
。
事件是从RdKafka传递错误、统计信息、日志等消息到应用程序的通用接口。
virtual void event_cb(Event &event)=0; // 事件回调函数
纯虚函数,需要继承来重写
C++封装示例:
class ProducerEventCb : public RdKafka::EventCb { public: void event_cb(RdKafka::Event &event) { switch(event.type()) { case RdKafka::Event::EVENT_ERROR: std::cout << "RdKafka::Event::EVENT_ERROR: " << RdKafka::err2str(event.err()) << std::endl; break; case RdKafka::Event::EVENT_STATS: std::cout << "RdKafka::Event::EVENT_STATS: " << event.str() << std::endl; break; case RdKafka::Event::EVENT_LOG: std::cout << "RdKafka::Event::EVENT_LOG " << event.fac() << std::endl; break; case RdKafka::Event::EVENT_THROTTLE: std::cout << "RdKafka::Event::EVENT_THROTTLE " << event.broker_name() << std::endl; break; } } };
这个回调函数的作用是在发生Kafka事件时被调用,将相应的RdKafka::Event对象传递给应用程序。应用程序可以实现自己的RdKafka::EventCb子类,然后在这个子类中实现event_cb方法,以处理具体的事件。这样,当有错误、统计信息、日志或来自Broker的throttle级信号事件发生时,那么逻辑就变成如下
1.配置参数: 你首先配置好生产者的参数,这包括Kafka集群的地址、topic的配置等。
2.创建生产者实例: 使用配置好的参数创建一个生产者实例。
3.准备生产者: 在生产消息之前,你可能需要进行一些准备工作,比如初始化和构建消息对象。
4.生产消息: 将构建好的消息对象交给生产者,让生产者将消息发送到指定的Kafka topic中。
5.处理事件: 这就是上述RdKafka::Event和RdKafka::EventCb的作用了。在生产者的生命周期中,可能会发生一些异步事件,如错误、日志信息等。通过设置RdKafka::EventCb,你可以在相应的事件发生时得到通知,从而执行你自己的处理逻辑。
6.关闭生产者: 如果不再需要生产者,记得关闭它以释放资源。
下面是示例:
class MyEventCallback : public RdKafka::EventCb { public: void event_cb(RdKafka::Event &event) override { // 处理事件的逻辑 switch (event.type()) { case RdKafka::Event::EVENT_ERROR: // 处理错误事件 break; case RdKafka::Event::EVENT_STATS: // 处理统计信息事件 break; // 可以处理其他类型的事件 default: break; } } }; int main() { // 配置生产者参数 RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); // 设置事件回调 MyEventCallback eventCallback; conf->set("event_cb", &eventCallback, errstr); // 创建生产者实例 RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr); if (!producer) { // 处理生产者创建失败的情况 return 1; } // 准备生产者... // 生产消息... // 处理事件... // 关闭生产者 delete producer; delete conf; return 0; }
深入浅出分析kafka客户端程序设计 ----- 生产者篇----万字总结(下):
https://developer.aliyun.com/article/1393801?spm=a2c6h.24874632.expert-profile.10.6af22f31AcelSW