Linux qtcreator下kafka之librdkafka库的C++语言封装,实现生产和消费(2)(★firecat推荐★)

简介: Linux qtcreator下kafka之librdkafka库的C++语言封装,实现生产和消费(1)(★firecat推荐★)

3、消费者源码


(1)main.cpp


#include <iostream>

#include "kafkaconsumerclient.h"

using namespace std;

int main()

{

   KafkaConsumerClient *KafkaConsumerClient_ = new KafkaConsumerClient("localhost:9092", "test", "1", 0, RdKafka::Topic::OFFSET_BEGINNING);//OFFSET_BEGINNING,OFFSET_END

   if (!KafkaConsumerClient_->Init())

   {

       fprintf(stderr, "kafka server initialize error\n");

       return -1;

   }

   KafkaConsumerClient_->Start(1000);

   return 0;

}


(2)kafkaconsumerclient.h

#ifndef KAFKACONSUMERCLIENT_H
#define KAFKACONSUMERCLIENT_H
#include <csignal>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <fstream>
#include <iostream>
#include <list>
#include <string>
#include <utility>
#include <vector>
#include <librdkafka/rdkafkacpp.h>
using std::string;
using std::list;
using std::cout;
using std::endl;
using std::vector;
using std::fstream;
// Simple consumer built on the legacy (pre-KafkaConsumer) RdKafka::Consumer
// API: reads exactly one topic+partition.
// NOTE(review): lifetime is unusual — Start() both runs the poll loop and
// destroys the consumer/topic handles when the loop exits.
class KafkaConsumerClient {
public:
    // brokers    broker list, e.g. "localhost:9092"
    // topics     single topic name to consume
    // groupid    consumer group id
    // nPartition partition to read (this legacy API is per-partition)
    // offset     start offset: RdKafka::Topic::OFFSET_BEGINNING /
    //            OFFSET_END, or an absolute offset
    KafkaConsumerClient(const std::string& brokers, const std::string& topics, std::string groupid, int32_t nPartition = 0, int64_t offset = 0);
    virtual ~KafkaConsumerClient();
    // Create configuration, consumer and topic handles, and start the
    // partition at the configured offset. Returns false on failure.
    bool Init();
    // Blocking poll loop; timeout_ms is the per-consume() timeout.
    // Returns only after Stop() has been called.
    void Start(int timeout_ms);
    // Signal the Start() loop to exit; actual teardown happens in Start().
    void Stop();
private:
    // Dispatch one consumed message/event according to its error code.
    void Msg_consume(RdKafka::Message* message, void* opaque);
private:
    std::string m_strBrokers;            // broker list
    std::string m_strTopics;             // topic name
    std::string m_strGroupid;            // consumer group id
    int64_t m_nLastOffset = 0;           // offset of last successfully read message
    RdKafka::Consumer *m_pKafkaConsumer = NULL;  // owned; freed in Start()
    RdKafka::Topic    *m_pTopic         = NULL;  // owned; freed in Start()
    int64_t           m_nCurrentOffset  = RdKafka::Topic::OFFSET_BEGINNING; // start offset
    int32_t           m_nPartition      = 0;     // partition to consume
    bool m_bRun = false;                 // loop flag, cleared by Stop()
};
#endif // KAFKACONSUMERCLIENT_H

(3)kafkaconsumerclient.cpp

#include "kafkaconsumerclient.h"
KafkaConsumerClient::KafkaConsumerClient(const std::string& brokers, const std::string& topics, std::string groupid, int32_t nPartition /*= 0*/, int64_t offset /*= 0*/)
    :m_strBrokers(brokers),
      m_strTopics(topics),
      m_strGroupid(groupid),
      m_nPartition(nPartition),
      m_nCurrentOffset(offset)
{
}
/**
 * Signal the consume loop to exit and release any RdKafka handles that are
 * still owned.
 *
 * Fix: Stop() only clears the run flag — if Init() succeeded but Start()
 * was never entered (or exited before teardown), m_pTopic and
 * m_pKafkaConsumer leaked. Start() resets both pointers to NULL after
 * deleting them, so freeing here can never double-delete.
 */
KafkaConsumerClient::~KafkaConsumerClient()
{
    Stop();
    if (m_pTopic) {
        delete m_pTopic;
        m_pTopic = NULL;
    }
    if (m_pKafkaConsumer) {
        delete m_pKafkaConsumer;
        m_pKafkaConsumer = NULL;
    }
}
/**
 * Create and configure the RdKafka consumer and topic handles, then start
 * consuming partition m_nPartition at offset m_nCurrentOffset.
 *
 * @return true on success; false if any handle could not be created or the
 *         consumer failed to start. No conf objects are leaked on failure.
 */
bool KafkaConsumerClient::Init() {
    std::string errstr;

    /* Global configuration: brokers, consumer group, fetch limits. */
    RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    if (!conf) {
        std::cerr << "RdKafka create global conf failed" << endl;
        return false;
    }
    /* Broker list */
    if (conf->set("metadata.broker.list", m_strBrokers, errstr) != RdKafka::Conf::CONF_OK) {
        std::cerr << "RdKafka conf set brokerlist failed ::" << errstr.c_str() << endl;
    }
    /* Consumer group */
    if (conf->set("group.id", m_strGroupid, errstr) != RdKafka::Conf::CONF_OK) {
        std::cerr << "RdKafka conf set group.id failed :" << errstr.c_str() << endl;
    }
    /* Maximum bytes fetched from a single partition per request. */
    std::string strfetch_num = "10240000";
    if (conf->set("max.partition.fetch.bytes", strfetch_num, errstr) != RdKafka::Conf::CONF_OK){
        std::cerr << "RdKafka conf set max.partition failed :" << errstr.c_str() << endl;
    }

    /* Create consumer using the accumulated global configuration. */
    m_pKafkaConsumer = RdKafka::Consumer::create(conf, errstr);
    delete conf;  // create() copies the conf; safe to free on every path
    if (!m_pKafkaConsumer) {
        // BUG FIX: the original fell through and dereferenced the NULL
        // consumer below; it also leaked conf on this path.
        std::cerr << "failed to create consumer :" << errstr.c_str() << endl;
        return false;
    }
    std::cout << "% Created consumer " << m_pKafkaConsumer->name() << std::endl;

    /* Topic configuration. */
    RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
    if (!tconf) {
        std::cerr << "RdKafka create topic conf failed" << endl;
        return false;
    }
    /* Where to start when no stored offset exists: oldest message. */
    if (tconf->set("auto.offset.reset", "smallest", errstr) != RdKafka::Conf::CONF_OK) {
        std::cerr << "RdKafka conf set auto.offset.reset failed:" << errstr.c_str() << endl;
    }
    /*
     * Create topic handle.
     */
    m_pTopic = RdKafka::Topic::create(m_pKafkaConsumer, m_strTopics, tconf, errstr);
    delete tconf;  // copied by create()
    if (!m_pTopic) {
        // BUG FIX: the original continued and passed a NULL topic to start().
        std::cerr << "RdKafka create topic failed :" << errstr.c_str() << endl;
        return false;
    }

    /*
     * Start consumer for topic+partition at start offset.
     */
    RdKafka::ErrorCode resp = m_pKafkaConsumer->start(m_pTopic, m_nPartition, m_nCurrentOffset);
    if (resp != RdKafka::ERR_NO_ERROR) {
        // BUG FIX: start() does not fill errstr — report the actual error
        // code, and propagate the failure instead of returning true.
        std::cerr << "failed to start consumer : " << RdKafka::err2str(resp).c_str() << endl;
        return false;
    }
    return true;
}
/**
 * Handle one message/event returned by consume(), dispatching on its error
 * code. Successful messages are printed and their offset remembered; fatal
 * errors request loop shutdown via Stop().
 */
void KafkaConsumerClient::Msg_consume(RdKafka::Message* message, void* opaque) {
    const RdKafka::ErrorCode code = message->err();

    if (code == RdKafka::ERR__TIMED_OUT) {
        // No message arrived within the consume() timeout; nothing to do.
        return;
    }

    if (code == RdKafka::ERR_NO_ERROR) {
        // Real message: print offset, optional key and payload, and record
        // the offset of the last message seen.
        std::cout << "Read msg at offset " << message->offset() << std::endl;
        if (message->key()) {
            std::cout << "Key: " << *message->key() << std::endl;
        }
        printf("%.*s\n",
            static_cast<int>(message->len()),
            static_cast<const char *>(message->payload()));
        m_nLastOffset = message->offset();
        return;
    }

    if (code == RdKafka::ERR__PARTITION_EOF) {
        // Reached the current end of the partition; keep polling for more.
        cout << "Reached the end of the queue, offset: " << m_nLastOffset << endl;
        return;
    }

    // Unknown topic/partition and every other error: report and stop.
    std::cerr << "Consume failed: " << message->errstr() << std::endl;
    Stop();
}
void KafkaConsumerClient::Start(int timeout_ms){
    RdKafka::Message *msg = NULL;
    m_bRun = true;
    while (m_bRun) {
        msg = m_pKafkaConsumer->consume(m_pTopic, m_nPartition, timeout_ms);
        Msg_consume(msg, NULL);
        delete msg;
        m_pKafkaConsumer->poll(0);
    }
    m_pKafkaConsumer->stop(m_pTopic, m_nPartition);
    m_pKafkaConsumer->poll(1000);
    if (m_pTopic) {
        delete m_pTopic;
        m_pTopic = NULL;
    }
    if (m_pKafkaConsumer) {
        delete m_pKafkaConsumer;
        m_pKafkaConsumer = NULL;
    }
    /*销毁kafka实例*/ //Wait for RdKafka to decommission.
    RdKafka::wait_destroyed(5000);
}
// Request the consume loop in Start() to exit after its current iteration.
// This only clears the run flag; handle teardown happens at the end of
// Start(). Also called from the destructor and on fatal consume errors.
void KafkaConsumerClient::Stop()
{
    m_bRun = false;
}

4.注意事项


(1)生产者


建议分区使用int partition = RD_KAFKA_PARTITION_UA;即根据key自动计算分区号


/* Use builtin partitioner to select partition*/

RD_KAFKA_PARTITION_UA,


* \p partition is the target partition, either:

*   - RdKafka::Topic::PARTITION_UA (unassigned) for

*     automatic partitioning using the topic's partitioner function, or


*   - a fixed partition (0..N)


(2)消费者


msg = m_pKafkaConsumer->consume(m_pTopic, m_nPartition, timeout_ms);

virtual Message *consume (Topic *topic, int32_t partition,

                           int timeout_ms) = 0;


看来消费者函数必须指定分区号,那么建议采用多进程的方式,每个进程订阅一个分区。




---


参考文章:


使用librdkafka 封装的C++类


相关文章
|
1月前
|
算法 C++ 容器
C++标准库(速查)总结
C++标准库(速查)总结
67 6
|
1月前
|
存储 算法 C++
C++ STL 初探:打开标准模板库的大门
C++ STL 初探:打开标准模板库的大门
101 10
|
1月前
|
存储 程序员 C++
C++常用基础知识—STL库(2)
C++常用基础知识—STL库(2)
71 5
|
1月前
|
Linux API 开发工具
FFmpeg开发笔记(五十九)Linux编译ijkplayer的Android平台so库
ijkplayer是由B站研发的移动端播放器,基于FFmpeg 3.4,支持Android和iOS。其源码托管于GitHub,截至2024年9月15日,获得了3.24万星标和0.81万分支,尽管已停止更新6年。本文档介绍了如何在Linux环境下编译ijkplayer的so库,以便在较新的开发环境中使用。首先需安装编译工具并调整/tmp分区大小,接着下载并安装Android SDK和NDK,最后下载ijkplayer源码并编译。详细步骤包括环境准备、工具安装及库编译等。更多FFmpeg开发知识可参考相关书籍。
92 0
FFmpeg开发笔记(五十九)Linux编译ijkplayer的Android平台so库
|
1月前
|
存储 自然语言处理 程序员
C++常用基础知识—STL库(1)
C++常用基础知识—STL库(1)
58 1
|
2月前
|
编译器 API C语言
超级好用的C++实用库之跨平台实用方法
超级好用的C++实用库之跨平台实用方法
40 6
|
2月前
|
安全 C++
超级好用的C++实用库之环形内存池
超级好用的C++实用库之环形内存池
48 5
|
2月前
|
缓存 网络协议 Linux
超级好用的C++实用库之套接字
超级好用的C++实用库之套接字
34 1
|
2月前
|
存储 算法 安全
超级好用的C++实用库之sha256算法
超级好用的C++实用库之sha256算法
105 1
|
2月前
|
存储 算法 安全
超级好用的C++实用库之国密sm4算法
超级好用的C++实用库之国密sm4算法
61 0
下一篇
无影云桌面