3. Consumer source code
(1)main.cpp
#include <iostream>
#include "kafkaconsumerclient.h"
using namespace std;
int main()
{
    // OFFSET_BEGINNING: consume from the start of the partition; OFFSET_END: only new messages
    KafkaConsumerClient *KafkaConsumerClient_ =
        new KafkaConsumerClient("localhost:9092", "test", "1", 0,
                                RdKafka::Topic::OFFSET_BEGINNING);

    if (!KafkaConsumerClient_->Init()) {
        fprintf(stderr, "kafka server initialize error\n");
        delete KafkaConsumerClient_;
        return -1;
    }

    KafkaConsumerClient_->Start(1000);

    delete KafkaConsumerClient_;
    return 0;
}
(2)kafkaconsumerclient.h
#ifndef KAFKACONSUMERCLIENT_H
#define KAFKACONSUMERCLIENT_H

#include <iostream>
#include <string>
#include <cstdlib>
#include <cstdio>
#include <csignal>
#include <cstring>
#include <list>
#include <librdkafka/rdkafkacpp.h>
#include <vector>
#include <fstream>

using std::string;
using std::list;
using std::cout;
using std::endl;
using std::vector;
using std::fstream;

class KafkaConsumerClient
{
public:
    KafkaConsumerClient(const std::string& brokers, const std::string& topics,
                        std::string groupid, int32_t nPartition = 0,
                        int64_t offset = 0);
    virtual ~KafkaConsumerClient();

    // Initialize the consumer
    bool Init();
    // Start fetching messages
    void Start(int timeout_ms);
    // Stop consuming
    void Stop();

private:
    void Msg_consume(RdKafka::Message* message, void* opaque);

private:
    std::string m_strBrokers;
    std::string m_strTopics;
    std::string m_strGroupid;
    int64_t m_nLastOffset = 0;
    RdKafka::Consumer *m_pKafkaConsumer = NULL;
    RdKafka::Topic *m_pTopic = NULL;
    int64_t m_nCurrentOffset = RdKafka::Topic::OFFSET_BEGINNING;
    int32_t m_nPartition = 0;
    bool m_bRun = false;
};

#endif // KAFKACONSUMERCLIENT_H
(3)kafkaconsumerclient.cpp
#include "kafkaconsumerclient.h" KafkaConsumerClient::KafkaConsumerClient(const std::string& brokers, const std::string& topics, std::string groupid, int32_t nPartition /*= 0*/, int64_t offset /*= 0*/) :m_strBrokers(brokers), m_strTopics(topics), m_strGroupid(groupid), m_nPartition(nPartition), m_nCurrentOffset(offset) { } KafkaConsumerClient::~KafkaConsumerClient() { Stop(); } bool KafkaConsumerClient::Init() { std::string errstr; RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); if (!conf) { std::cerr << "RdKafka create global conf failed" << endl; return false; } /*设置broker list*/ if (conf->set("metadata.broker.list", m_strBrokers, errstr) != RdKafka::Conf::CONF_OK) { std::cerr << "RdKafka conf set brokerlist failed ::" << errstr.c_str() << endl; } /*设置consumer group*/ if (conf->set("group.id", m_strGroupid, errstr) != RdKafka::Conf::CONF_OK) { std::cerr << "RdKafka conf set group.id failed :" << errstr.c_str() << endl; } std::string strfetch_num = "10240000"; /*每次从单个分区中拉取消息的最大尺寸*/ if (conf->set("max.partition.fetch.bytes", strfetch_num, errstr) != RdKafka::Conf::CONF_OK){ std::cerr << "RdKafka conf set max.partition failed :" << errstr.c_str() << endl; } /*创建kafka consumer实例*/ //Create consumer using accumulated global configuration. m_pKafkaConsumer = RdKafka::Consumer::create(conf, errstr); if (!m_pKafkaConsumer) { std::cerr << "failed to ceate consumer" << endl; } std::cout << "% Created consumer " << m_pKafkaConsumer->name() << std::endl; delete conf; /*创建kafka topic的配置*/ RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC); if (!tconf) { std::cerr << "RdKafka create topic conf failed" << endl; return false; } if (tconf->set("auto.offset.reset", "smallest", errstr) != RdKafka::Conf::CONF_OK) { std::cerr << "RdKafka conf set auto.offset.reset failed:" << errstr.c_str() << endl; } /* * Create topic handle. 
*/ m_pTopic = RdKafka::Topic::create(m_pKafkaConsumer, m_strTopics, tconf, errstr); if (!m_pTopic) { std::cerr << "RdKafka create topic failed :" << errstr.c_str() << endl; } delete tconf; /* * Start consumer for topic+partition at start offset */ RdKafka::ErrorCode resp = m_pKafkaConsumer->start(m_pTopic, m_nPartition, m_nCurrentOffset); if (resp != RdKafka::ERR_NO_ERROR) { std::cerr << "failed to start consumer : " << errstr.c_str() << endl; } return true; } void KafkaConsumerClient::Msg_consume(RdKafka::Message* message, void* opaque) { switch (message->err()) { case RdKafka::ERR__TIMED_OUT: break; case RdKafka::ERR_NO_ERROR: /* Real message */ std::cout << "Read msg at offset " << message->offset() << std::endl; if (message->key()) { std::cout << "Key: " << *message->key() << std::endl; } printf("%.*s\n", static_cast<int>(message->len()), static_cast<const char *>(message->payload())); m_nLastOffset = message->offset(); break; case RdKafka::ERR__PARTITION_EOF: /* Last message */ cout << "Reached the end of the queue, offset: " << m_nLastOffset << endl; //Stop(); break; case RdKafka::ERR__UNKNOWN_TOPIC: case RdKafka::ERR__UNKNOWN_PARTITION: std::cerr << "Consume failed: " << message->errstr() << std::endl; Stop(); break; default: /* Errors */ std::cerr << "Consume failed: " << message->errstr() << std::endl; Stop(); break; } } void KafkaConsumerClient::Start(int timeout_ms){ RdKafka::Message *msg = NULL; m_bRun = true; while (m_bRun) { msg = m_pKafkaConsumer->consume(m_pTopic, m_nPartition, timeout_ms); Msg_consume(msg, NULL); delete msg; m_pKafkaConsumer->poll(0); } m_pKafkaConsumer->stop(m_pTopic, m_nPartition); m_pKafkaConsumer->poll(1000); if (m_pTopic) { delete m_pTopic; m_pTopic = NULL; } if (m_pKafkaConsumer) { delete m_pKafkaConsumer; m_pKafkaConsumer = NULL; } /*销毁kafka实例*/ //Wait for RdKafka to decommission. RdKafka::wait_destroyed(5000); } void KafkaConsumerClient::Stop() { m_bRun = false; }
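Note that Stop() only clears m_bRun; the actual teardown runs at the end of Start(). A minimal sketch of wiring Stop() to Ctrl+C so the consume loop exits cleanly is shown below; the global pointer g_client and the SIGINT handler are assumptions added for illustration, not part of the code above.

#include <csignal>
#include "kafkaconsumerclient.h"

// Assumption for illustration: one global client so the signal handler can reach it.
static KafkaConsumerClient *g_client = NULL;

static void OnSignal(int /*sig*/)
{
    // Stop() only sets a bool flag; strictly speaking, a volatile sig_atomic_t
    // flag checked in the loop would be more robust inside a signal handler.
    if (g_client)
        g_client->Stop();
}

int main()
{
    KafkaConsumerClient client("localhost:9092", "test", "1", 0,
                               RdKafka::Topic::OFFSET_BEGINNING);
    g_client = &client;
    signal(SIGINT, OnSignal);

    if (!client.Init())
        return -1;

    client.Start(1000);   // returns after SIGINT triggers Stop()
    return 0;
}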
4. Notes
(1) Producer
For the producer, it is recommended to use int partition = RD_KAFKA_PARTITION_UA; (the C++ equivalent is RdKafka::Topic::PARTITION_UA), so that the partition number is computed automatically from the message key by the built-in partitioner. The librdkafka headers describe it as follows:

/* Use builtin partitioner to select partition */
RD_KAFKA_PARTITION_UA,

 * \p partition is the target partition, either:
 *   - RdKafka::Topic::PARTITION_UA (unassigned) for
 *     automatic partitioning using the topic's partitioner function, or
 *   - a fixed partition (0..N)
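A minimal sketch of such a keyed produce call is shown below; the helper name ProduceWithAutoPartition and the already-created producer and topic handles are illustrative assumptions, not part of the producer code in this article.

#include <string>
#include <librdkafka/rdkafkacpp.h>

// Sketch: produce one keyed message and let librdkafka's built-in partitioner
// pick the partition (PARTITION_UA = "unassigned").
bool ProduceWithAutoPartition(RdKafka::Producer *producer,
                              RdKafka::Topic *topic,
                              const std::string &key,
                              const std::string &value)
{
    RdKafka::ErrorCode resp = producer->produce(
        topic,
        RdKafka::Topic::PARTITION_UA,       // partition derived from the key
        RdKafka::Producer::RK_MSG_COPY,     // librdkafka copies the payload
        const_cast<char *>(value.c_str()), value.size(),
        &key,                               // key fed to the partitioner
        NULL);                              // per-message opaque
    producer->poll(0);                      // serve delivery report callbacks
    return resp == RdKafka::ERR_NO_ERROR;
}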
(2) Consumer
msg = m_pKafkaConsumer->consume(m_pTopic, m_nPartition, timeout_ms);
virtual Message *consume (Topic *topic, int32_t partition,
                          int timeout_ms) = 0;
Since the consume() call requires an explicit partition number, a multi-process layout is recommended, with each process subscribing to one partition; see the sketch below.
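A minimal sketch of that layout, assuming a POSIX system and a topic with three partitions (the partition count and the process-per-partition loop are illustrative assumptions):

#include <sys/wait.h>
#include <unistd.h>
#include "kafkaconsumerclient.h"

int main()
{
    const int32_t kPartitions = 3;            // assumed partition count of the topic
    for (int32_t p = 0; p < kPartitions; ++p) {
        pid_t pid = fork();
        if (pid == 0) {
            // Child process: consume exactly one partition with its own client.
            KafkaConsumerClient client("localhost:9092", "test", "1",
                                       p, RdKafka::Topic::OFFSET_BEGINNING);
            if (!client.Init())
                return -1;
            client.Start(1000);               // blocks until Stop() is called
            return 0;
        }
    }
    while (wait(NULL) > 0) {}                 // parent: wait for all children
    return 0;
}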
---
References: