Linux qtcreator下kafka之librdkafka库的C++语言封装,实现生产和消费(2)(★firecat推荐★)

简介: Linux qtcreator下kafka之librdkafka库的C++语言封装,实现生产和消费(1)(★firecat推荐★)

3、消费者源码


(1)main.cpp


#include <iostream>

#include "kafkaconsumerclient.h"

using namespace std;

int main()

{

   KafkaConsumerClient *KafkaConsumerClient_ = new KafkaConsumerClient("localhost:9092", "test", "1", 0, RdKafka::Topic::OFFSET_BEGINNING);//OFFSET_BEGINNING,OFFSET_END

   if (!KafkaConsumerClient_->Init())

   {

       fprintf(stderr, "kafka server initialize error\n");

       return -1;

   }

   KafkaConsumerClient_->Start(1000);

   return 0;

}


(2)kafkaconsumerclient.h

#ifndef KAFKACONSUMERCLIENT_H
#define KAFKACONSUMERCLIENT_H
#include <iostream>
#include <string>
#include <cstdlib>
#include <cstdio>
#include <csignal>
#include <cstring>
#include <list>
#include <librdkafka/rdkafkacpp.h>
#include <vector>
#include <fstream>
// NOTE(review): using-declarations at namespace scope in a header leak into
// every includer; kept as-is because kafkaconsumerclient.cpp relies on the
// unqualified `cout`/`endl` they provide.
using std::string;
using std::list;
using std::cout;
using std::endl;
using std::vector;
using std::fstream;
// Thin wrapper around the librdkafka legacy ("simple") consumer API:
// consumes a single topic/partition starting at a caller-chosen offset.
// Not thread-safe; intended to be driven from one thread.
class KafkaConsumerClient {
public:
    // brokers    - bootstrap broker list, "host:port[,host:port...]"
    // topics     - single topic name to consume (despite the plural name)
    // groupid    - consumer group id (mapped to the "group.id" property)
    // nPartition - partition to read; the simple consumer needs an explicit one
    // offset     - start offset, e.g. RdKafka::Topic::OFFSET_BEGINNING/OFFSET_END
    KafkaConsumerClient(const std::string& brokers, const std::string& topics, std::string groupid, int32_t nPartition = 0, int64_t offset = 0);
    virtual ~KafkaConsumerClient();
    // Create configuration, consumer and topic handles and start consuming.
    // Returns false on failure.
    bool Init();
    // Blocking consume loop; each iteration waits up to timeout_ms for a
    // message. Runs until Stop() is called.
    void Start(int timeout_ms);
    // Ask the Start() loop to exit (clears the run flag).
    void Stop();
private:
    // Dispatch on message->err(): print payload, report EOF, or stop on error.
    void Msg_consume(RdKafka::Message* message, void* opaque);
private:
    std::string m_strBrokers;
    std::string m_strTopics;
    std::string m_strGroupid;
    int64_t m_nLastOffset = 0;                  // offset of the last successfully read message
    RdKafka::Consumer *m_pKafkaConsumer = NULL; // owned; freed in Start() teardown
    RdKafka::Topic    *m_pTopic         = NULL; // owned; freed in Start() teardown
    int64_t           m_nCurrentOffset  = RdKafka::Topic::OFFSET_BEGINNING; // start offset for start()
    int32_t           m_nPartition      = 0;
    bool m_bRun = false;                        // loop flag toggled by Stop()
};
#endif // KAFKACONSUMERCLIENT_H

(3)kafkaconsumerclient.cpp

#include "kafkaconsumerclient.h"
// Store connection parameters; no librdkafka objects are created until Init().
KafkaConsumerClient::KafkaConsumerClient(const std::string& brokers, const std::string& topics, std::string groupid, int32_t nPartition /*= 0*/, int64_t offset /*= 0*/)
    :m_strBrokers(brokers),
      m_strTopics(topics),
      m_strGroupid(groupid),
      // FIX: initializer order now matches the declaration order in the
      // header (m_nCurrentOffset is declared before m_nPartition); the
      // original listed them swapped, which triggers -Wreorder and
      // misrepresents the actual initialization order.
      m_nCurrentOffset(offset),
      m_nPartition(nPartition)
{
}
KafkaConsumerClient::~KafkaConsumerClient()
{
    Stop();
    // FIX: the original released the topic/consumer handles only inside
    // Start(); if Init() succeeded but Start() was never called, both
    // leaked. Start() resets the pointers to NULL after deleting them,
    // so these guarded deletes cannot double-free.
    if (m_pTopic) {
        delete m_pTopic;
        m_pTopic = NULL;
    }
    if (m_pKafkaConsumer) {
        delete m_pKafkaConsumer;
        m_pKafkaConsumer = NULL;
    }
}
// Build the global and topic configurations, create the consumer and topic
// handles, and start consuming m_strTopics/m_nPartition at m_nCurrentOffset.
// Returns false on any fatal failure (errors are logged to stderr).
bool KafkaConsumerClient::Init() {
    std::string errstr;
    RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    if (!conf) {
        std::cerr << "RdKafka create global conf failed" << endl;
        return false;
    }
    /* Broker bootstrap list. */
    if (conf->set("metadata.broker.list", m_strBrokers, errstr) != RdKafka::Conf::CONF_OK) {
        std::cerr << "RdKafka conf set brokerlist failed ::" << errstr.c_str() << endl;
    }
    /* Consumer group id. */
    if (conf->set("group.id", m_strGroupid, errstr) != RdKafka::Conf::CONF_OK) {
        std::cerr << "RdKafka conf set group.id failed :" << errstr.c_str() << endl;
    }
    std::string strfetch_num = "10240000";
    /* Maximum amount of data fetched from a single partition per request. */
    if (conf->set("max.partition.fetch.bytes", strfetch_num, errstr) != RdKafka::Conf::CONF_OK){
        std::cerr << "RdKafka conf set max.partition failed :" << errstr.c_str() << endl;
    }
    /* Create consumer using the accumulated global configuration. */
    m_pKafkaConsumer = RdKafka::Consumer::create(conf, errstr);
    delete conf; // create() copies the configuration; safe to free now
    if (!m_pKafkaConsumer) {
        /* FIX: the original did not return here and dereferenced the NULL
         * consumer on the very next line. */
        std::cerr << "failed to create consumer: " << errstr.c_str() << endl;
        return false;
    }
    std::cout << "% Created consumer " << m_pKafkaConsumer->name() << std::endl;
    /* Topic-level configuration. */
    RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
    if (!tconf) {
        std::cerr << "RdKafka create topic conf failed" << endl;
        return false;
    }
    /* Where to start when there is no committed offset for the group. */
    if (tconf->set("auto.offset.reset", "smallest", errstr) != RdKafka::Conf::CONF_OK) {
        std::cerr << "RdKafka conf set auto.offset.reset failed:" << errstr.c_str() << endl;
    }
    /*
     * Create topic handle.
     */
    m_pTopic = RdKafka::Topic::create(m_pKafkaConsumer, m_strTopics, tconf, errstr);
    delete tconf; // Topic::create() also copies its configuration
    if (!m_pTopic) {
        /* FIX: the original fell through and passed a NULL topic to start(). */
        std::cerr << "RdKafka create topic failed :" << errstr.c_str() << endl;
        return false;
    }
    /*
     * Start consumer for topic+partition at start offset
     */
    RdKafka::ErrorCode resp = m_pKafkaConsumer->start(m_pTopic, m_nPartition, m_nCurrentOffset);
    if (resp != RdKafka::ERR_NO_ERROR) {
        /* FIX: the original printed the unrelated `errstr` (start() does not
         * touch it) and still returned true; report the real error and fail. */
        std::cerr << "failed to start consumer : " << RdKafka::err2str(resp) << endl;
        return false;
    }
    return true;
}
// Handle one delivery from consume(): print real messages, note partition
// EOF, ignore timeouts, and stop the loop on any other error.
void KafkaConsumerClient::Msg_consume(RdKafka::Message* message, void* opaque) {
  const RdKafka::ErrorCode ec = message->err();

  if (ec == RdKafka::ERR__TIMED_OUT) {
    // Nothing arrived within the consume timeout; simply try again.
    return;
  }

  if (ec == RdKafka::ERR_NO_ERROR) {
    // A real message: dump offset, optional key and raw payload.
    std::cout << "Read msg at offset " << message->offset() << std::endl;
    if (message->key()) {
      std::cout << "Key: " << *message->key() << std::endl;
    }
    printf("%.*s\n",
      static_cast<int>(message->len()),
      static_cast<const char *>(message->payload()));
    m_nLastOffset = message->offset();
    return;
  }

  if (ec == RdKafka::ERR__PARTITION_EOF) {
    // Reached the current end of the partition; keep polling for more.
    cout << "Reached the end of the queue, offset: " << m_nLastOffset << endl;
    //Stop();
    return;
  }

  // ERR__UNKNOWN_TOPIC, ERR__UNKNOWN_PARTITION and every other error:
  // report it and ask the consume loop to shut down.
  std::cerr << "Consume failed: " << message->errstr() << std::endl;
  Stop();
}
void KafkaConsumerClient::Start(int timeout_ms){
    RdKafka::Message *msg = NULL;
    m_bRun = true;
    while (m_bRun) {
        msg = m_pKafkaConsumer->consume(m_pTopic, m_nPartition, timeout_ms);
        Msg_consume(msg, NULL);
        delete msg;
        m_pKafkaConsumer->poll(0);
    }
    m_pKafkaConsumer->stop(m_pTopic, m_nPartition);
    m_pKafkaConsumer->poll(1000);
    if (m_pTopic) {
        delete m_pTopic;
        m_pTopic = NULL;
    }
    if (m_pKafkaConsumer) {
        delete m_pKafkaConsumer;
        m_pKafkaConsumer = NULL;
    }
    /*销毁kafka实例*/ //Wait for RdKafka to decommission.
    RdKafka::wait_destroyed(5000);
}
// Request the Start() loop to exit after its current consume() call returns.
// NOTE(review): m_bRun is a plain bool; this is fine for the single-threaded
// use shown in main(), but if Stop() were ever called from another thread
// (e.g. a signal handler thread) the flag should be std::atomic<bool> —
// confirm intended usage.
void KafkaConsumerClient::Stop()
{
    m_bRun = false;
}

4.注意事项


(1)生产者


建议分区使用int partition = RD_KAFKA_PARTITION_UA;即根据key自动计算分区号


/* Use builtin partitioner to select partition*/

RD_KAFKA_PARTITION_UA,


* \p partition is the target partition, either:

*   - RdKafka::Topic::PARTITION_UA (unassigned) for

*     automatic partitioning using the topic's partitioner function, or


*   - a fixed partition (0..N)


(2)消费者


msg = m_pKafkaConsumer->consume(m_pTopic, m_nPartition, timeout_ms);

virtual Message *consume (Topic *topic, int32_t partition,

                           int timeout_ms) = 0;


看来消费者函数必须指定分区号,那么建议采用多进程的方式,每个进程订阅一个分区。




---


参考文章:


使用librdkafka 封装的C++类


相关文章
|
10天前
|
Linux 编译器 Android开发
FFmpeg开发笔记(九)Linux交叉编译Android的x265库
在Linux环境下,本文指导如何交叉编译x265的so库以适应Android。首先,需安装cmake和下载android-ndk-r21e。接着,下载x265源码,修改crosscompile.cmake的编译器设置。配置x265源码,使用指定的NDK路径,并在配置界面修改相关选项。随后,修改编译规则,编译并安装x265,调整pc描述文件并更新PKG_CONFIG_PATH。最后,修改FFmpeg配置脚本启用x265支持,编译安装FFmpeg,将生成的so文件导入Android工程,调整gradle配置以确保顺利运行。
32 1
FFmpeg开发笔记(九)Linux交叉编译Android的x265库
|
28天前
|
JSON 机器人 Linux
推荐一款嵌入式Linux开源框架与封装-cpp-tbox
推荐一款嵌入式Linux开源框架与封装-cpp-tbox
54 3
|
2月前
|
算法 Linux 测试技术
Linux C++开发中的代码优化之道:把握时机与策略
Linux C++开发中的代码优化之道:把握时机与策略
49 0
|
20天前
|
存储 算法 Linux
【实战项目】网络编程:在Linux环境下基于opencv和socket的人脸识别系统--C++实现
【实战项目】网络编程:在Linux环境下基于opencv和socket的人脸识别系统--C++实现
43 6
|
3天前
|
存储 Linux C++
【进厂修炼 - First week】Linux & C++
【进厂修炼 - First week】Linux & C++
|
8天前
|
Linux C++
【代码片段】Linux C++打印当前函数调用堆栈
【代码片段】Linux C++打印当前函数调用堆栈
11 0
|
12天前
|
Linux 网络安全 开发工具
【GitLab私有仓库】在Linux上用Gitlab搭建自己的私有库并配置cpolar内网穿透
【GitLab私有仓库】在Linux上用Gitlab搭建自己的私有库并配置cpolar内网穿透
|
2月前
|
存储 安全 Ubuntu
【Linux 应用开发 】Linux环境下动态链接库路径(RPATH)的调整策略
【Linux 应用开发 】Linux环境下动态链接库路径(RPATH)的调整策略
72 1
|
2月前
|
监控 Linux 编译器
Linux C++ 定时器任务接口深度解析: 从理论到实践
Linux C++ 定时器任务接口深度解析: 从理论到实践
80 2
|
2月前
|
算法 NoSQL Linux
Linux C++环境下避免死锁的全面策略解析与实践指南
Linux C++环境下避免死锁的全面策略解析与实践指南
64 0

热门文章

最新文章