1.kafka消费者的逻辑
- 配置消费者客户端参数。
- 创建相应的消费者实例。
- 订阅主题。
- 拉取消息并消费;
- 提交消息位移;
- 关闭消费者实例;
2 Kafka 的C++ API
2.1 RdKafka::Conf
见生成者实现文章。
2.2 RdKafka::Event
见生成者实现文章。
2.3 RdKafka::EventCb
见生成者实现文章。
2.4 RdKafka::TopicPartition
static TopicPartition * create(const std::string &topic, int partition); //创建一个TopicPartition对象。 static TopicPartition *create (const std::string &topic, int partition,int64_t offset); //创建TopicPartition对象。 static void destroy (std::vector<TopicPartition*> &partitions); //销毁所有TopicPartition对象。 const std::string & topic () const; //返回Topic名称。 int partition (); //返回分区号。 int64_t offset(); //返回位移。 void set_offset(int64_t offset); //设置位移。 ErrorCode err(); //返回错误码。
2.5 RdKafka::RebalanceCb
virtual void rebalance_cb(RdKafka::KafkaConsumer *consumer, RdKafka::ErrorCode err, std::vector< TopicPartition * > &partitions)=0;
用于RdKafka::KafkaConsunmer的组再平衡回调函数;注册rebalance_cb回调函数会关闭rdkafka的自动分区赋值和再分配并替换应用程序的rebalance_cb回调函数
再平衡回调函数负责对基于RdKafka::ERR_ASSIGN_PARTITIONS和RdKafka::ERR_REVOKE_PARTITIONS事件更新rdkafka的分区分配,也能处理任意前两者错误除外其它再平衡失败错误。对于RdKafka::ERR_ASSIGN_PARTITIONS和RdKafka::ERR_REVOKE_PARTITIONS事件之外的其它再平衡失败错误,必须调用unassign()同步状态。
没有再平衡回调函数,rdkafka也能自动完成再平衡过程,但注册一个再平衡回调函数可以使应用程序在执行其它操作时拥有更大的灵活性,例如从指定位置获取位移或手动提交位移。
C++封装API:
// 定义了一个名为ConsumerRebalanceCb的类,继承自RdKafka::RebalanceCb class ConsumerRebalanceCb : public RdKafka::RebalanceCb { private: // 定义了一个静态方法printTopicPartition,用于打印当前获取的分区 static void printTopicPartition(const std::vector<RdKafka::TopicPartition*>& partitions) { // 循环遍历传入的分区集合,打印每个分区的主题和分区号 for (unsigned int i = 0; i < partitions.size(); i++) std::cerr << partitions[i]->topic() << "[" << partitions[i]->partition() << "], "; std::cerr << "\n"; } public: // 定义了一个再平衡回调函数rebalance_cb,处理消费者组再平衡时的事件 void rebalance_cb(RdKafka::KafkaConsumer* consumer, RdKafka::ErrorCode err, std::vector<RdKafka::TopicPartition*>& partitions) { // 打印再平衡回调事件的错误码和涉及的分区信息 std::cerr << "RebalanceCb: " << RdKafka::err2str(err) << ": "; printTopicPartition(partitions); // 根据再平衡的错误码进行处理 if (err == RdKafka::ERR__ASSIGN_PARTITIONS) { // 如果是分配分区的事件,将分区分配给消费者,并记录分区数量 consumer->assign(partitions); partition_count = (int)partitions.size(); } else { // 如果是分区撤销的事件,取消消费者已分配的所有分区,并将分区数量置为0 consumer->unassign(); partition_count = 0; } } private: int partition_count; // 记录分区数量的私有成员变量 };
2.6 RdKafka::Message
见生成者实现文章。
2.7 RdKafka::KafkaConsumer(核心)
KafkaConsumer是高级API,要求Kafka 0.9.0以上版本,当前支持range和roundrobin分区分配策略。
static KafkaConsumer * create(Conf *conf, std::string &errstr); 创建KafkaConsumer对象,conf对象必须配置Consumer要加入的消费者组。使用KafkaConsumer::close()进行关闭。 ErrorCode assignment(std::vector< RdKafka::TopicPartition * > &partitions); 返回由RdKafka::KafkaConsumer::assign() 设置的当前分区。 ErrorCode subscription(std::vector< std::string > &topics); 返回由RdKafka::KafkaConsumer::subscribe() 设置的当前订阅Topic。 ErrorCode subscribe(const std::vector< std::string > &topics); 更新订阅Topic分区。 ErrorCode unsubscribe(); 将当前订阅Topic取消订阅分区。 ErrorCode assign(const std::vector< TopicPartition * > &partitions); 将分配分区更新为partitions。 ErrorCode unassign(); 停止消费并删除当前分配的分区。 Message * consume(int timeout_ms); 消费消息或获取错误事件,触发回调函数,会自动调用注册的回调函数,包括RebalanceCb、EventCb、OffsetCommitCb等。需要使用delete释放消息。应用程序必须确保consume在指定时间间隔内调用,为了执行等待调用的回调函数,即使没有消息。当RebalanceCb被注册时,在需要调用和适当处理内部Consumer同步状态时,确保consume在指定时间间隔内调用极为重要。应用程序必须禁止对KafkaConsumer对象调用poll函数。 如果RdKafka::Message::err()是ERR_NO_ERROR,则返回正常的消息;如果RdKafka::Message::err()是ERR_NO_ERRO,返回错误事件;如果RdKafka::Message::err()是ERR_TIMED_OUT,则超时。 ErrorCode commitSync(); 提交当前分配分区的位移,同步操作,会阻塞直到位移被提交或提交失败。如果注册了RdKafka::OffsetCommitCb回调函数,其会在KafkaConsumer::consume()函数内调用并提交位移。 ErrorCode commitAsync(); 异步提交位移。 ErrorCode commitSync(Message *message); 基于消息对单个topic+partition对象同步提交位移。 virtual ErrorCode commitSync (std::vector<TopicPartition*> &offsets) = 0; 对指定多个TopicPartition同步提交位移。 ErrorCode commitAsync(Message *message); 基于消息对单个TopicPartition异步提交位移。 virtual ErrorCode commitAsync (const std::vector<TopicPartition*> &offsets) = 0; 对多个TopicPartition异步提交位移。 ErrorCode close(); 正常关闭,会阻塞直到四个操作完成(触发避免当前分区分配的局部再平衡,停止当前赋值消费,提交位移,离开分组) virtual ConsumerGroupMetadata *groupMetadata () = 0; 返回本Consumer实例的Consumer Group的元数据。 ErrorCode position (std::vector<TopicPartition*> &partitions) 获取TopicPartition对象中当前位移,会别填充TopicPartition对象的offset字段。 ErrorCode seek (const TopicPartition &partition, int timeout_ms) 定位TopicPartition的Consumer到位移。timeout_ms为0,会开始Seek并立即返回;timeout_ms非0,Seek会等待timeout_ms时间。 ErrorCode offsets_store (std::vector<TopicPartition*> &offsets) 为TopicPartition存储位移,位移会在auto.commit.interval.ms时提交或是被手动提交。enable.auto.offset.store属性必须设置为fasle。
3 Kafka 消费者客户端开发
3.1 必要的参数配置(bootstrap.servers)
在创建消费者的时候以下以下三个选项是必选的:
bootstrap.servers:指定 broker (kafka服务器)的地址清单,清单里不需要包含所有的 broker(kafka) 地址,生产者会从给定的 broker 里查找 broker 的信息。不过建议至少要提供两个 broker 的信息作为容错。
group.id:consumer group 是 kafka 提供的可扩展且具有容错性的消费者机制。既然是一个组,那么组内必然可以有多个消费者或消费者实例(consumer instance),它们共享一个公共的 ID,即group ID。组内的所有消费者协调在一起来消费订阅主题(subscribed topics)的所有分区(partition)。
auto.offset.reset:这个参数是针对新的 groupid 中的消费者而言的,当有新 groupid 的消费者来
这三个参数在创建Kafka消费者时是必选的,它们分别为:
1.bootstrap.servers:
1.用途:指定 Kafka 集群中的 broker 地址清单。
2.作用:消费者在启动时需要知道至少一个 broker 的地址,以便获取集群中的元数据信息,如主题分区的分布情况、leader 信息等。消费者将会从提供的 broker 中获取这些信息,然后根据负载均衡策略决定从哪个分区拉取消息。
3.注意:建议至少提供两个 broker 的地址,以增加容错性,防止某个 broker 不可用时消费者无法正常工作。
2.group.id:
1.用途:指定消费者所属的消费者组的唯一标识。
2.作用:Kafka 提供了分组机制,即将消费者组织成逻辑上的组,组内的每个消费者协调消费不同的分区,以实现水平扩展和容错性。所有消费者实例在同一组内必须共享相同的 group ID,这样它们就可以协同工作,确保每个分区只有一个消费者消费。
3.注意:组内的消费者将协同处理订阅主题的所有分区。
3.auto.offset.reset:
1.用途:指定消费者在发现没有存储偏移量或偏移量无效的情况下该如何处理。
2.作用:对于一个新的消费者组(group.id是新的)来说,如果没有存储的偏移量信息,或者存储的偏移量无效,该参数就决定了消费者从哪里开始消费消息。可能的取值包括:
1.earliest:从最早的偏移量开始消费。
2.latest:从最新的偏移量开始消费。
3.none:如果没有发现消费者组的偏移量,就抛出异常。
3.注意:这个参数主要用于处理新的消费者组,已有偏移量的消费者组不受此参数影响。
std::string errorStr; RdKafka::Conf::ConfResult errorCode; m_config = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); m_event_cb = new ConsumerEventCb; errorCode = m_config->set("event_cb", m_event_cb, errorStr); if(errorCode != RdKafka::Conf::CONF_OK) { std::cout << "Conf set failed: " << errorStr << std::endl; } m_rebalance_cb = new ConsumerRebalanceCb; errorCode = m_config->set("rebalance_cb", m_rebalance_cb, errorStr); if(errorCode != RdKafka::Conf::CONF_OK) { std::cout << "Conf set failed: " << errorStr << std::endl; } errorCode = m_config->set("enable.partition.eof", "false", errorStr); if(errorCode != RdKafka::Conf::CONF_OK) { std::cout << "Conf set failed: " << errorStr << std::endl; } errorCode = m_config->set("group.id", m_groupID, errorStr); if(errorCode != RdKafka::Conf::CONF_OK) { std::cout << "Conf set failed: " << errorStr << std::endl; } errorCode = m_config->set("bootstrap.servers", m_brokers, errorStr); if(errorCode != RdKafka::Conf::CONF_OK) { std::cout << "Conf set failed: " << errorStr << std::endl; } errorCode = m_config->set("max.partition.fetch.bytes", "1024000", errorStr); if(errorCode != RdKafka::Conf::CONF_OK) { std::cout << "Conf set failed: " << errorStr << std::endl; }
3.2 订阅主题和分区
订阅主题,可以订阅多个。
也可以通过正则表达式方式一次订阅多个主题,比如 “topic-.*”, 则前缀为“topic-.”的主题都被订阅。
m_topicConfig = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC); // 获取最新的消息数据 errorCode = m_topicConfig->set("auto.offset.reset", "latest", errorStr); if (errorCode != RdKafka::Conf::CONF_OK) { std::cout << "Topic Conf set failed: " << errorStr << std::endl; } errorCode = m_config->set("default_topic_conf", m_topicConfig, errorStr); if (errorCode != RdKafka::Conf::CONF_OK) { std::cout << "Conf set failed: " << errorStr << std::endl; } m_consumer = RdKafka::KafkaConsumer::create(m_config, errorStr); if (m_consumer == NULL) { std::cout << "Create KafkaConsumer failed: " << errorStr << std::endl; } std::cout << "Created consumer " << m_consumer->name() << std::endl; // 订阅主题, 可以订阅多个主题 RdKafka::ErrorCode errorCode = m_consumer->subscribe(m_topicVector);
3.3 消息消费
void msg_consume(RdKafka::Message* msg, void* opaque) { switch (msg->err()) { case RdKafka::ERR__TIMED_OUT: std::cerr << "Consumer error: " << msg->errstr() << std::endl; // 超时 break; case RdKafka::ERR_NO_ERROR: // 有消息进来 std::cout << " Message in-> topic:" << msg->topic_name() << "partition:[" << msg->partition() << "] at offset " << msg->offset() << " key: " << msg->key() << " payload: " << (char*)msg->payload() << std::endl; break; default: std::cerr << "Consumer error: " << msg->errstr() << std::endl; break; } } void KafkaConsumer::pullMessage() { // 订阅Topic RdKafka::ErrorCode errorCode = m_consumer->subscribe(m_topicVector); if (errorCode != RdKafka::ERR_NO_ERROR) { std::cout << "subscribe failed: " << RdKafka::err2str(errorCode) << std::endl; } // 消费消息 while(true) { RdKafka::Message *msg = m_consumer->consume(1000); msg_consume(msg, NULL); delete msg; } }
3.4 完整示例代码
KafkaConsumer.h
#ifndef KAFKACONSUMER_H #define KAFKACONSUMER_H // 防止头文件多次包含的保护机制 #pragma once // 另一种头文件包含保护的方式 #include <string> #include <iostream> #include <vector> #include <stdio.h> #include "librdkafka/rdkafkacpp.h" // 包含必要的C++标准库和Rdkafka C++库的头文件 class KafkaConsumer { public: // 构造函数 explicit KafkaConsumer(const std::string& brokers, const std::string& groupID, const std::vector<std::string>& topics, int partition); // 拉取消息的函数声明 void pullMessage(); // 析构函数声明 ~KafkaConsumer(); protected: // 成员变量 std::string m_brokers; // Kafka集群的broker地址 std::string m_groupID; // 消费者组的唯一标识符 std::vector<std::string> m_topicVector; // 要消费的主题列表 int m_partition; // 消息分区 RdKafka::Conf* m_config; // Kafka配置 RdKafka::Conf* m_topicConfig; // 主题配置 RdKafka::KafkaConsumer* m_consumer; // Kafka消费者 RdKafka::EventCb* m_event_cb; // 事件回调 RdKafka::RebalanceCb* m_rebalance_cb; // 重新平衡回调 }; #endif // KAFKACONSUMER_H // 结束头文件包含保护
KafkaConsumer.cpp
#include "KafkaConsumer.h" class ConsumerEventCb : public RdKafka::EventCb { public: void event_cb (RdKafka::Event &event) { switch (event.type()) { case RdKafka::Event::EVENT_ERROR: if (event.fatal()) { std::cerr << "FATAL "; } std::cerr << "ERROR (" << RdKafka::err2str(event.err()) << "): " << event.str() << std::endl; break; case RdKafka::Event::EVENT_STATS: std::cerr << "\"STATS\": " << event.str() << std::endl; break; case RdKafka::Event::EVENT_LOG: fprintf(stderr, "LOG-%i-%s: %s\n", event.severity(), event.fac().c_str(), event.str().c_str()); break; case RdKafka::Event::EVENT_THROTTLE: std::cerr << "THROTTLED: " << event.throttle_time() << "ms by " << event.broker_name() << " id " << (int)event.broker_id() << std::endl; break; default: std::cerr << "EVENT " << event.type() << " (" << RdKafka::err2str(event.err()) << "): " << event.str() << std::endl; break; } } }; class ConsumerRebalanceCb : public RdKafka::RebalanceCb { private: static void printTopicPartition (const std::vector<RdKafka::TopicPartition*>&partitions) // 打印当前获取的分区 { for (unsigned int i = 0 ; i < partitions.size() ; i++) std::cerr << partitions[i]->topic() << "[" << partitions[i]->partition() << "], "; std::cerr << "\n"; } public: void rebalance_cb (RdKafka::KafkaConsumer *consumer, RdKafka::ErrorCode err, std::vector<RdKafka::TopicPartition*> &partitions) { std::cerr << "RebalanceCb: " << RdKafka::err2str(err) << ": "; printTopicPartition(partitions); if (err == RdKafka::ERR__ASSIGN_PARTITIONS) { consumer->assign(partitions); partition_count = (int)partitions.size(); } else { consumer->unassign(); partition_count = 0; } } private: int partition_count; }; KafkaConsumer::KafkaConsumer(const std::string& brokers, const std::string& groupID, const std::vector<std::string>& topics, int partition) { m_brokers = brokers; m_groupID = groupID; m_topicVector = topics; m_partition = partition; std::string errorStr; RdKafka::Conf::ConfResult errorCode; m_config = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); m_event_cb = new ConsumerEventCb; errorCode = m_config->set("event_cb", m_event_cb, errorStr); if(errorCode != RdKafka::Conf::CONF_OK) { std::cout << "Conf set failed: " << errorStr << std::endl; } m_rebalance_cb = new ConsumerRebalanceCb; errorCode = m_config->set("rebalance_cb", m_rebalance_cb, errorStr); if(errorCode != RdKafka::Conf::CONF_OK) { std::cout << "Conf set failed: " << errorStr << std::endl; } errorCode = m_config->set("enable.partition.eof", "false", errorStr); if(errorCode != RdKafka::Conf::CONF_OK) { std::cout << "Conf set failed: " << errorStr << std::endl; } errorCode = m_config->set("group.id", m_groupID, errorStr); if(errorCode != RdKafka::Conf::CONF_OK) { std::cout << "Conf set failed: " << errorStr << std::endl; } errorCode = m_config->set("bootstrap.servers", m_brokers, errorStr); if(errorCode != RdKafka::Conf::CONF_OK) { std::cout << "Conf set failed: " << errorStr << std::endl; } errorCode = m_config->set("max.partition.fetch.bytes", "1024000", errorStr); if(errorCode != RdKafka::Conf::CONF_OK) { std::cout << "Conf set failed: " << errorStr << std::endl; } // partition.assignment.strategy range,roundrobin m_topicConfig = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC); // 获取最新的消息数据 errorCode = m_topicConfig->set("auto.offset.reset", "latest", errorStr); if(errorCode != RdKafka::Conf::CONF_OK) { std::cout << "Topic Conf set failed: " << errorStr << std::endl; } errorCode = m_config->set("default_topic_conf", m_topicConfig, errorStr); if(errorCode != RdKafka::Conf::CONF_OK) { std::cout << "Conf set failed: " << errorStr << std::endl; } m_consumer = RdKafka::KafkaConsumer::create(m_config, errorStr); if(m_consumer == NULL) { std::cout << "Create KafkaConsumer failed: " << errorStr << std::endl; } std::cout << "Created consumer " << m_consumer->name() << std::endl; } void msg_consume(RdKafka::Message* msg, void* opaque) { switch (msg->err()) { case RdKafka::ERR__TIMED_OUT: std::cerr << "Consumer error: " << msg->errstr() << std::endl; // 超时 break; case RdKafka::ERR_NO_ERROR: // 有消息进来 std::cout << " Message in-> topic:" << msg->topic_name() << "partition:[" << msg->partition() << "] at offset " << msg->offset() << " key: " << msg->key() << " payload: " << (char*)msg->payload() << std::endl; break; default: std::cerr << "Consumer error: " << msg->errstr() << std::endl; break; } } void KafkaConsumer::pullMessage() { // 订阅Topic RdKafka::ErrorCode errorCode = m_consumer->subscribe(m_topicVector); if (errorCode != RdKafka::ERR_NO_ERROR) { std::cout << "subscribe failed: " << RdKafka::err2str(errorCode) << std::endl; } // 消费消息 while(true) { RdKafka::Message *msg = m_consumer->consume(1000); msg_consume(msg, NULL); delete msg; } } KafkaConsumer::~KafkaConsumer() { m_consumer->close(); delete m_config; delete m_topicConfig; delete m_consumer; delete m_event_cb; delete m_rebalance_cb; }
class ConsumerEventCb : public RdKafka::EventCb
{
public:
void event_cb (RdKafka::Event &event)
{
switch (event.type())
{
case RdKafka::Event::EVENT_ERROR:
if (event.fatal())
{
std::cerr << "FATAL ";
}
std::cerr << "ERROR (" << RdKafka::err2str(event.err()) << "): " <<
event.str() << std::endl;
break;
case RdKafka::Event::EVENT_STATS:
std::cerr << "\"STATS\": " << event.str() << std::endl;
break;
case RdKafka::Event::EVENT_LOG:
fprintf(stderr, "LOG-%i-%s: %s\n",
event.severity(), event.fac().c_str(), event.str().c_str());
break;
case RdKafka::Event::EVENT_THROTTLE:
std::cerr << "THROTTLED: " << event.throttle_time() << "ms by " <<
event.broker_name() << " id " << (int)event.broker_id() << std::endl;
break;
default:
std::cerr << "EVENT " << event.type() <<
" (" << RdKafka::err2str(event.err()) << "): " <<
event.str() << std::endl;
break;
}
}
};
这段代码定义了一个名为 ConsumerEventCb 的类,其主要作用是定义了一个回调函数 event_cb,用于处理不同类型的 Kafka 事件。在这个类中,通过继承 RdKafka::EventCb,实现了 Kafka 事件的回调处理。
当 Kafka 消费者使用这个事件回调时,会在特定情况下触发这些事件。事件的触发是由 Kafka broker 和 Kafka consumer 之间的交互而引起的。
当 Kafka broker 发送事件通知时(例如错误、统计、日志或节流等),Kafka consumer 将触发适当类型的事件。这时会调用 event_cb 函数,根据事件的类型执行相应的处理逻辑。
流程大致如下:
1.Kafka Consumer 设置事件回调函数: 在使用 Kafka Consumer 时,可以将 ConsumerEventCb 的实例作为事件回调函数传递给 Kafka Consumer。
2.Kafka Consumer 消费消息: 在 Kafka Consumer 消费消息的过程中,如果发生了特定类型的事件(例如错误、统计、日志或节流等),Kafka Consumer 会根据事件的性质调用相应的 event_cb 函数。
3.根据事件类型执行相应的处理逻辑: 在 event_cb 函数中,通过 switch 语句根据不同类型的事件执行相应的逻辑处理。对于每种事件类型,都有不同的处理方式,例如输出错误信息、统计信息、日志信息或节流信息等。
4.处理完毕后继续消费: 处理完特定类型的事件后,Kafka Consumer 可能会继续消费消息或者执行其他操作,这取决于应用程序的逻辑。
class ConsumerRebalanceCb : public RdKafka::RebalanceCb
{
private:
static void printTopicPartition (const std::vector&partitions) // 打印当前获取的分区
{
for (unsigned int i = 0 ; i < partitions.size() ; i++)
std::cerr << partitions[i]->topic() <<
"[" << partitions[i]->partition() << "], ";
std::cerr << "\n";
}
public:
void rebalance_cb (RdKafka::KafkaConsumer *consumer,
RdKafka::ErrorCode err,
std::vector &partitions)
{
std::cerr << "RebalanceCb: " << RdKafka::err2str(err) << ": ";
printTopicPartition(partitions);
if (err == RdKafka::ERR__ASSIGN_PARTITIONS)
{
consumer->assign(partitions);
partition_count = (int)partitions.size();
}
else
{
consumer->unassign();
partition_count = 0;
}
}
private:
int partition_count;
};
1.定义静态成员函数 printTopicPartition:
1.该函数用于打印当前获取的分区信息。
2.遍历传入的 partitions 向量,输出每个分区的主题(topic)和分区号(partition)。
2.声明 ConsumerRebalanceCb 类:
1.该类继承自 RdKafka::RebalanceCb,表明它是一个用于处理重新分配分区的回调的类。
2.包含一个私有成员函数 printTopicPartition 和一个私有整数成员 partition_count。
3.实现 rebalance_cb 函数:
1.rebalance_cb 函数是一个虚函数,用于处理分区重新分配事件。它接收三个参数:
1.consumer:指向 RdKafka::KafkaConsumer 类型对象的指针,表示触发回调的消费者。
2.err:表示重新分配操作的错误码,使用 RdKafka::ErrorCode 类型。
3.partitions:一个指向 RdKafka::TopicPartition 对象的指针的向量,表示重新分配的分区信息。
2.在函数中,首先打印重新分配的动作和错误码。
3.调用前面定义的 printTopicPartition 函数打印重新分配的分区信息。
4.根据错误码的不同,执行不同的逻辑:
1.如果错误码是 ERR__ASSIGN_PARTITIONS,则表示需要分配分区,调用 consumer->assign(partitions) 将分配的分区应用到消费者,并更新 partition_count。
2.否则,调用 consumer->unassign() 取消分配的分区,将 partition_count 设置为 0。
4.私有成员 partition_count:
1.用于追踪当前分配的分区数量。
KafkaConsumer::KafkaConsumer(const std::string& brokers, const std::string& groupID,
const std::vector& topics, int partition)
{
m_brokers = brokers;
m_groupID = groupID;
m_topicVector = topics;
m_partition = partition;//属性赋值给消费者对象成员属性
std::string errorStr;
RdKafka::Conf::ConfResult errorCode;
m_config = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);//创建全局配置对象
m_event_cb = new ConsumerEventCb;
errorCode = m_config->set("event_cb", m_event_cb, errorStr);//将消费者事件回调配置到全局配置对象当中
if(errorCode != RdKafka::Conf::CONF_OK)
{
std::cout << "Conf set failed: " << errorStr << std::endl;
}
m_rebalance_cb = new ConsumerRebalanceCb;//创建消费自平衡回调
errorCode = m_config->set("rebalance_cb", m_rebalance_cb, errorStr);//将消费自平衡回调配置到全局配置对象当中
if(errorCode != RdKafka::Conf::CONF_OK)
{
std::cout << "Conf set failed: " << errorStr << std::endl;
}
errorCode = m_config->set("enable.partition.eof", "false", errorStr);
if(errorCode != RdKafka::Conf::CONF_OK)
{
std::cout << "Conf set failed: " << errorStr << std::endl;
}
errorCode = m_config->set("group.id", m_groupID, errorStr);
if(errorCode != RdKafka::Conf::CONF_OK)
{
std::cout << "Conf set failed: " << errorStr << std::endl;
}
errorCode = m_config->set("bootstrap.servers", m_brokers, errorStr);
if(errorCode != RdKafka::Conf::CONF_OK)
{
std::cout << "Conf set failed: " << errorStr << std::endl;
}
errorCode = m_config->set("max.partition.fetch.bytes", "1024000", errorStr);
if(errorCode != RdKafka::Conf::CONF_OK)
{
std::cout << "Conf set failed: " << errorStr << std::endl;
}
//上述都是设置全局配置
// partition.assignment.strategy range,roundrobin
m_topicConfig = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
// 获取最新的消息数据
errorCode = m_topicConfig->set("auto.offset.reset", "latest", errorStr);
if(errorCode != RdKafka::Conf::CONF_OK)
{
std::cout << "Topic Conf set failed: " << errorStr << std::endl;
}
errorCode = m_config->set("default_topic_conf", m_topicConfig, errorStr);
if(errorCode != RdKafka::Conf::CONF_OK)
{
std::cout << "Conf set failed: " << errorStr << std::endl;
}
m_consumer = RdKafka::KafkaConsumer::create(m_config, errorStr);
if(m_consumer == NULL)
{
std::cout << "Create KafkaConsumer failed: " << errorStr << std::endl;
}
std::cout << "Created consumer " << m_consumer->name() << std::endl;
}
1.成员变量初始化:
1.m_brokers、m_groupID、m_topicVector 和 m_partition 被设置为传入的参数值。
2.errorStr 用于存储错误信息,errorCode 用于存储配置设置的结果。
2.创建全局配置对象 m_config:
1.使用 RdKafka::Conf::create 创建一个全局配置对象,用于配置 Kafka Consumer。
2.创建事件回调对象 m_event_cb 的实例,并将其设置为全局配置的事件回调。
3.设置全局配置参数:
1.使用 set 方法设置全局配置参数,例如设置事件回调、禁用分区 EOF(end-of-file)通知、设置消费者组 ID、设置 bootstrap 服务器地址等。
2.如果设置失败,输出错误信息。
4.创建主题配置对象 m_topicConfig:
1.使用 RdKafka::Conf::create 创建一个主题配置对象,用于配置特定主题的消费者行为。
5.设置主题配置参数:
1.使用 set 方法设置主题配置参数,例如设置自动偏移重置为 "latest",表示消费者将从最新的消息开始消费。
2.如果设置失败,输出错误信息。
1.检查主题配置设置是否成功:
1.使用 errorCode 和 errorStr 来检查之前设置主题配置参数的操作是否成功。如果失败,输出错误信息。
2.设置默认主题配置参数:
1.使用 set 方法将之前创建的主题配置对象 m_topicConfig 设置为默认主题配置参数。这样,消费者将使用这个主题配置对象来消费所有的主题。
2.如果设置失败,输出错误信息。
3.创建 Kafka 消费者对象:
1.使用 RdKafka::KafkaConsumer::create 创建 Kafka 消费者对象,并传入之前配置好的全局配置对象 m_config。
2.如果创建失败,输出错误信息。
4.检查消费者对象创建是否成功:
1.使用条件语句检查消费者对象是否成功创建,如果为 NULL,则表示创建失败,输出错误信息。
5.输出消费者对象名称:
1.如果消费者对象成功创建,使用 m_consumer->name() 获取消费者的名称,并输出到控制台。
void KafkaConsumer::pullMessage()
{
// 订阅Topic
RdKafka::ErrorCode errorCode = m_consumer->subscribe(m_topicVector);
if (errorCode != RdKafka::ERR_NO_ERROR)
{
std::cout << "subscribe failed: " << RdKafka::err2str(errorCode) << std::endl;
}
// 消费消息
while(true)
{
RdKafka::Message *msg = m_consumer->consume(1000);
msg_consume(msg, NULL);
delete msg;
}
}
在函数 KafkaConsumer::pullMessage() 中:
订阅 Topic:
RdKafka::ErrorCode errorCode = m_consumer->subscribe(m_topicVector);
这里使用了 librdkafka 提供的 subscribe 方法。m_consumer 是一个 Kafka 消费者对象,m_topicVector 是一个包含要订阅的 Topic 名称的向量(可能包含一个或多个 Topic)。
subscribe 方法用于订阅指定的 Topic,使消费者开始接收这些 Topic 中的消息。如果订阅成功,errorCode 将为 RdKafka::ERR_NO_ERROR;否则,将根据发生的错误返回相应的错误码。
消费消息:
while(true) { RdKafka::Message *msg = m_consumer->consume(1000); msg_consume(msg, NULL); delete msg; }
- 1.这里使用了一个无限循环来持续消费消息。
2.m_consumer->consume(1000) 表示从 Kafka 中拉取消息,等待最多 1000 毫秒(1秒)来获取消息。这个方法会阻塞至多指定的超时时间等待消息到达。一旦有消息到达或超时,它将返回一个 RdKafka::Message 指针。
3.msg_consume(msg, NULL) 被调用来处理接收到的消息。这个函数处理了从 Kafka 消费者获取的消息对象,根据消息的内容进行相应的操作。
4.delete msg 释放了消费者获取的消息对象的内存,确保不会造成内存泄漏。
因此,整个过程是:首先通过 subscribe 方法订阅了指定的 Topic,然后通过 consume 方法持续拉取消息,并通过 msg_consume 函数处理接收到的消息。
CMakeLists.txt
cmake_minimum_required(VERSION 2.8) project(KafkaConsumer) set(CMAKE_CXX_STANDARD 11) set(CMAKE_CXX_COMPILER "g++") set(CMAKE_CXX_FLAGS "-std=c++11 ${CMAKE_CXX_FLAGS}") set(CMAKE_INCLUDE_CURRENT_DIR ON) # Kafka头文件路径 include_directories(/usr/local/include/librdkafka) # Kafka库路径 link_directories(/usr/lib64) aux_source_directory(. SOURCE) #add_executable(${PROJECT_NAME} ${SOURCE}) #TARGET_LINK_LIBRARIES(${PROJECT_NAME} rdkafka++) ADD_EXECUTABLE(${PROJECT_NAME} main.cpp KafkaConsumer.cpp) TARGET_LINK_LIBRARIES(${PROJECT_NAME} rdkafka++)
编译:
mkdir build cd build cmake .. make
4 位移提交
Consumer 需要向 Kafka 汇报自己的位移数据,这个汇报过程被称为提交位移(Committing Offsets)。因为 Consumer 能够同时消费多个分区的数据,所以位移的提交实际上是在分区粒度上进行的,即 Consumer 需要为分配给它的每个分区提交各自的位移数据。
提交位移主要是为了表征 Consumer 的消费进度,这样当 Consumer 发生故障重启之后,就能够从Kafka 中读取之前提交的位移值,然后从相应的位移处继续消费,从而避免整个消费过程重来一遍。
从用户的角度来说,位移提交分为自动提交和手动提交;从 Consumer 端的角度来说,位移提交分为同步提交和异步提交。
这个还是太官方了,我们就举个例子来介绍一下:
假设有一个图书馆的场景:
1.位移数据就像书籍的页码:
当你在阅读一本厚厚的书时,你会记住自己读到的页码,这样下次再打开书时就能从上次停下的地方继续读。
2.提交位移就像你在书上标记页码:
1.当你确定自己已经读完了一页或者几页时,你可能会在书的页边标记一下,表示你已经阅读到这个位置。
3.多个分区就像多本书:
1.如果你同时在阅读多本书,每本书都有自己的页码,你需要分别标记每本书的页码。
4.Consumer 的消费进度就像你的阅读进度:
1.位移提交就是告诉图书馆,你已经读到了每本书的哪一页,这样即使你离开一段时间,再回来时仍能从之前的地方继续阅读。
5.故障重启就像你离开一段时间再回到图书馆:
1.如果你离开了一段时间,但之前标记的页码仍然保存在书上,你可以根据这些页码继续阅读,而不必从头开始。
6.自动提交和手动提交就像自动书签和手动书签:
1.自动提交就像书上自带的自动书签,每隔一段时间系统会自动帮你标记页码。
2.手动提交就像你主动使用书签标记页码,你可以在任何时候选择提交。
7.同步提交和异步提交就像同步和异步的书签标记过程:
1.同步提交就像你标记完一页后,告诉图书馆:“我标记好了,你可以记录了”。
2.异步提交就像你标记完一页后,不急着告诉图书馆,而是稍后再告诉他们。
在 Kafka 的语境下,Consumer 提交位移就是告诉 Kafka,它已经成功消费了某个分区的消息到哪个位置,以便在之后的消费中从正确的位置继续。自动提交是由 Consumer 在后台自动完成,而手动提交是由开发者在代码中显式调用。同步提交会等待提交的确认,而异步提交则允许 Consumer 在提交时继续处理其他任务,不必等待确认。
4.1 自动提交
自动提交默认全部为同步提交。
自动提交相关参数:
4.2 手动提交
手动提交可以自己选择是同步提交(commitSync)还是异步提交(commitAsync )。commitAsync 不能够替代 commitSync。commitAsync 的问题在于,出现问题时它不会自动重试,因为它是异步操作。
手动提交,需要将 commitSync 和 commitAsync 组合使用才能到达最理想的效果。可以利用 commitSync 的自动重试来规避那些瞬时错误,同时不希望程序总处于阻塞状态,影响 TPS。
同时使用 commitSync() 和 commitAsync():
对于常规性、阶段性的手动提交,我们调用 commitAsync() 避免程序阻塞,而在 Consumer 要关闭前,我们调用 commitSync() 方法执行同步阻塞式的位移提交,以确保Consumer 关闭前能够保存正确的位移数据。
将两者结合后,既实现了异步无阻塞式的位移管理,也确保了 Consumer 位移的正确性.
1.总结:
1.手动提交位移时,可以选择 commitSync(同步提交)或 commitAsync(异步提交)。
2.commitAsync 不能完全替代 commitSync,因为它在出现问题时不会自动重试,而 commitSync 具有自动重试的机制。
3.通过同时使用 commitSync() 和 commitAsync(),可以实现异步无阻塞式的位移管理,同时确保在 Consumer 关闭前保存正确的位移数据。
2.相关函数说明:
1.commitSync():
1.同步提交,会阻塞当前线程,等待提交的确认。
2.具有自动重试机制,可以处理瞬时错误。
3.适合在程序关闭前执行,以确保最终一致性。
2.commitAsync():
异步提交,不会阻塞当前线程,允许程序继续执行。
不具备自动重试机制,需要开发者手动处理提交失败的情况。
适合常规性、阶段性的手动提交,避免阻塞程序,提高吞吐量(TPS)。
3.综合使用方式:
1.在正常的位移管理过程中,使用 commitAsync() 进行异步提交,以提高程序的响应性和吞吐量。
2.在 Consumer 即将关闭前,通过调用 commitSync() 方法执行同步阻塞式的位移提交,以确保 Consumer 关闭前能够保存正确的位移数据,保障最终一致性。
4.3 提交API
ErrorCode commitSync();
提交当前分配分区的位移,同步操作,会阻塞直到位移被提交或提交失败。如果注册了RdKafka::OffsetCommitCb回调函数,其会在KafkaConsumer::consume()函数内调用并提交位移。
ErrorCode commitAsync();
异步提交位移
深入浅出分析kafka客户端程序设计 ----- 消费者篇----万字总结(下)
:https://developer.aliyun.com/article/1393837?spm=a2c6h.24874632.expert-profile.8.6af22f31AcelSW