Linux qtcreator下kafka之librdkafka库的C++语言封装,实现生产和消费(2)(★firecat推荐★)

简介: Linux qtcreator下kafka之librdkafka库的C++语言封装,实现生产和消费(1)(★firecat推荐★)

3、消费者源码


(1)main.cpp


#include <iostream>

#include "kafkaconsumerclient.h"

using namespace std;

int main()

{

   KafkaConsumerClient *KafkaConsumerClient_ = new KafkaConsumerClient("localhost:9092", "test", "1", 0, RdKafka::Topic::OFFSET_BEGINNING);//OFFSET_BEGINNING,OFFSET_END

   if (!KafkaConsumerClient_->Init())

   {

       fprintf(stderr, "kafka server initialize error\n");

       return -1;

   }

   KafkaConsumerClient_->Start(1000);

   return 0;

}


(2)kafkaconsumerclient.h

#ifndef KAFKACONSUMERCLIENT_H
#define KAFKACONSUMERCLIENT_H
#include <iostream>
#include <string>
#include <cstdlib>
#include <cstdio>
#include <csignal>
#include <cstring>
#include <list>
#include <librdkafka/rdkafkacpp.h>
#include <vector>
#include <fstream>
using std::string;
using std::list;
using std::cout;
using std::endl;
using std::vector;
using std::fstream;
// Simple (legacy) librdkafka consumer wrapper: one topic, one fixed partition.
class KafkaConsumerClient {
public:
    /**
     * @param brokers    broker list, e.g. "localhost:9092"
     * @param topics     name of the single topic to consume
     * @param groupid    consumer group id (set as "group.id")
     * @param nPartition partition to consume from (default 0)
     * @param offset     start offset, e.g. RdKafka::Topic::OFFSET_BEGINNING
     */
    KafkaConsumerClient(const std::string& brokers, const std::string& topics, std::string groupid, int32_t nPartition = 0, int64_t offset = 0);
    virtual ~KafkaConsumerClient();
    // Create consumer/topic handles and start consuming the partition.
    // Returns false on failure.
    bool Init();
    // Consume messages in a loop; timeout_ms is the per-call consume timeout.
    // Blocks until Stop() is called, then releases the librdkafka handles.
    void Start(int timeout_ms);
    // Ask the Start() loop to exit after the current iteration.
    void Stop();
private:
    // Dispatch one consumed message (or error event) from Start().
    void Msg_consume(RdKafka::Message* message, void* opaque);
private:
    std::string m_strBrokers;
    std::string m_strTopics;
    std::string m_strGroupid;
    int64_t m_nLastOffset = 0;    // offset of the last successfully read message
    RdKafka::Consumer *m_pKafkaConsumer = nullptr;
    RdKafka::Topic    *m_pTopic         = nullptr;
    int64_t           m_nCurrentOffset  = RdKafka::Topic::OFFSET_BEGINNING;
    int32_t           m_nPartition      = 0;
    bool m_bRun = false;          // loop flag toggled by Start()/Stop()
};
#endif // KAFKACONSUMERCLIENT_H

(3)kafkaconsumerclient.cpp

#include "kafkaconsumerclient.h"
KafkaConsumerClient::KafkaConsumerClient(const std::string& brokers, const std::string& topics, std::string groupid, int32_t nPartition /*= 0*/, int64_t offset /*= 0*/)
    :m_strBrokers(brokers),
      m_strTopics(topics),
      m_strGroupid(groupid),
      m_nPartition(nPartition),
      m_nCurrentOffset(offset)
{
}
// Stop the consume loop and release any librdkafka handles still owned.
// BUG FIX: the original destructor only set the run flag, so a client that was
// Init()-ed but never ran Start() to completion leaked its consumer and topic
// handles. Start() nulls the pointers after freeing them, so the checks below
// make this safe whether or not Start()'s own cleanup already ran.
KafkaConsumerClient::~KafkaConsumerClient()
{
    Stop();
    if (m_pKafkaConsumer && m_pTopic) {
        // Consumption was started in Init(); stop it before destroying handles.
        m_pKafkaConsumer->stop(m_pTopic, m_nPartition);
    }
    if (m_pTopic) {
        delete m_pTopic;
        m_pTopic = nullptr;
    }
    if (m_pKafkaConsumer) {
        delete m_pKafkaConsumer;
        m_pKafkaConsumer = nullptr;
    }
}
// Build global + topic configuration, create the consumer and topic handles,
// and start consuming m_nPartition at m_nCurrentOffset.
// Returns false on any fatal failure (the original fell through on several of
// them, dereferencing a null consumer when Consumer::create failed).
bool KafkaConsumerClient::Init() {
    std::string errstr;

    /* Global configuration: brokers, consumer group, fetch limits. */
    RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    if (!conf) {
        std::cerr << "RdKafka create global conf failed" << endl;
        return false;
    }
    /* Broker list. */
    if (conf->set("metadata.broker.list", m_strBrokers, errstr) != RdKafka::Conf::CONF_OK) {
        std::cerr << "RdKafka conf set brokerlist failed ::" << errstr.c_str() << endl;
    }
    /* Consumer group. */
    if (conf->set("group.id", m_strGroupid, errstr) != RdKafka::Conf::CONF_OK) {
        std::cerr << "RdKafka conf set group.id failed :" << errstr.c_str() << endl;
    }
    /* Maximum bytes fetched from a single partition per request. */
    std::string strfetch_num = "10240000";
    if (conf->set("max.partition.fetch.bytes", strfetch_num, errstr) != RdKafka::Conf::CONF_OK){
        std::cerr << "RdKafka conf set max.partition failed :" << errstr.c_str() << endl;
    }

    /* Create consumer using accumulated global configuration. */
    m_pKafkaConsumer = RdKafka::Consumer::create(conf, errstr);
    delete conf;
    if (!m_pKafkaConsumer) {
        // BUG FIX: the original kept going and called name() on a null pointer.
        std::cerr << "failed to create consumer :" << errstr.c_str() << endl;
        return false;
    }
    std::cout << "% Created consumer " << m_pKafkaConsumer->name() << std::endl;

    /* Topic configuration. */
    RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
    if (!tconf) {
        std::cerr << "RdKafka create topic conf failed" << endl;
        return false;
    }
    /* Where to start when no committed offset exists. */
    if (tconf->set("auto.offset.reset", "smallest", errstr) != RdKafka::Conf::CONF_OK) {
        std::cerr << "RdKafka conf set auto.offset.reset failed:" << errstr.c_str() << endl;
    }

    /* Create topic handle. */
    m_pTopic = RdKafka::Topic::create(m_pKafkaConsumer, m_strTopics, tconf, errstr);
    delete tconf;
    if (!m_pTopic) {
        // BUG FIX: was a fall-through; report and fail instead.
        std::cerr << "RdKafka create topic failed :" << errstr.c_str() << endl;
        return false;
    }

    /* Start consumer for topic+partition at start offset. */
    RdKafka::ErrorCode resp = m_pKafkaConsumer->start(m_pTopic, m_nPartition, m_nCurrentOffset);
    if (resp != RdKafka::ERR_NO_ERROR) {
        // BUG FIX: print the actual error (errstr was stale here) and fail.
        std::cerr << "failed to start consumer : " << RdKafka::err2str(resp) << endl;
        return false;
    }
    return true;
}
// Handle one message (or error event) returned by consume().
// Guard-clause form of the original switch; output is identical per error code.
void KafkaConsumerClient::Msg_consume(RdKafka::Message* message, void* opaque) {
  const RdKafka::ErrorCode err = message->err();

  if (err == RdKafka::ERR__TIMED_OUT) {
    // Nothing arrived within the timeout; silently return to the poll loop.
    return;
  }

  if (err == RdKafka::ERR_NO_ERROR) {
    /* Real message */
    std::cout << "Read msg at offset " << message->offset() << std::endl;
    if (message->key()) {
      std::cout << "Key: " << *message->key() << std::endl;
    }
    printf("%.*s\n",
      static_cast<int>(message->len()),
      static_cast<const char *>(message->payload()));
    // Remember the offset so the EOF report below can show how far we got.
    m_nLastOffset = message->offset();
    return;
  }

  if (err == RdKafka::ERR__PARTITION_EOF) {
    /* Last message */
    cout << "Reached the end of the queue, offset: " << m_nLastOffset << endl;
    //Stop();
    return;
  }

  // Unknown topic/partition and any other error: report and stop the loop.
  std::cerr << "Consume failed: " << message->errstr() << std::endl;
  Stop();
}
void KafkaConsumerClient::Start(int timeout_ms){
    RdKafka::Message *msg = NULL;
    m_bRun = true;
    while (m_bRun) {
        msg = m_pKafkaConsumer->consume(m_pTopic, m_nPartition, timeout_ms);
        Msg_consume(msg, NULL);
        delete msg;
        m_pKafkaConsumer->poll(0);
    }
    m_pKafkaConsumer->stop(m_pTopic, m_nPartition);
    m_pKafkaConsumer->poll(1000);
    if (m_pTopic) {
        delete m_pTopic;
        m_pTopic = NULL;
    }
    if (m_pKafkaConsumer) {
        delete m_pKafkaConsumer;
        m_pKafkaConsumer = NULL;
    }
    /*销毁kafka实例*/ //Wait for RdKafka to decommission.
    RdKafka::wait_destroyed(5000);
}
// Request the Start() loop to exit; the current consume() call finishes first.
// Safe to call from Msg_consume() (same thread as the loop).
void KafkaConsumerClient::Stop()
{
    m_bRun = false;
}

4.注意事项


(1)生产者


建议分区使用int partition = RD_KAFKA_PARTITION_UA;即根据key自动计算分区号


/* Use builtin partitioner to select partition*/

RD_KAFKA_PARTITION_UA,


* \p partition is the target partition, either:

*   - RdKafka::Topic::PARTITION_UA (unassigned) for

*     automatic partitioning using the topic's partitioner function, or


*   - a fixed partition (0..N)


(2)消费者


msg = m_pKafkaConsumer->consume(m_pTopic, m_nPartition, timeout_ms);

virtual Message *consume (Topic *topic, int32_t partition,

                           int timeout_ms) = 0;


看来消费者函数必须指定分区号,那么建议采用多进程的方式,每个进程订阅一个分区。




---


参考文章:


使用librdkafka 封装的C++类


相关文章
|
6月前
|
API C++ Windows
Visual C++运行库、.NET Framework和DirectX运行库的作用及常见问题解决方案,涵盖MSVCP140.dll丢失、0xc000007b错误等典型故障的修复方法
本文介绍Visual C++运行库、.NET Framework和DirectX运行库的作用及常见问题解决方案,涵盖MSVCP140.dll丢失、0xc000007b错误等典型故障的修复方法,提供官方下载链接与系统修复工具使用指南。
1562 2
|
6月前
|
缓存 算法 程序员
C++STL底层原理:探秘标准模板库的内部机制
🌟蒋星熠Jaxonic带你深入STL底层:从容器内存管理到红黑树、哈希表,剖析迭代器、算法与分配器核心机制,揭秘C++标准库的高效设计哲学与性能优化实践。
C++STL底层原理:探秘标准模板库的内部机制
|
6月前
|
Ubuntu API C++
C++标准库、Windows API及Ubuntu API的综合应用
总之,C++标准库、Windows API和Ubuntu API的综合应用是一项挑战性较大的任务,需要开发者具备跨平台编程的深入知识和丰富经验。通过合理的架构设计和有效的工具选择,可以在不同的操作系统平台上高效地开发和部署应用程序。
272 11
|
6月前
|
IDE 编译器 开发工具
msvcp100.dll,msvcp120.dll,msvcp140.dll,Microsoft Visual C++ 2015 Redistributable,Visual C++ 运行库安装
MSVC是Windows下C/C++开发核心工具,集成编译器、链接器与调试器,配合Visual Studio使用。其运行时库(如msvcp140.dll)为程序提供基础函数支持,常因缺失导致软件无法运行。通过安装对应版本的Microsoft Visual C++ Redistributable可解决此类问题,广泛应用于桌面软件、游戏及系统级开发。
845 2
|
7月前
|
并行计算 C++ Windows
|
6月前
|
消息中间件 Kafka Linux
Linux下安装Kafka 3.9.1
本文介绍Kafka 3.9.1版本的安装与配置,包括通过ZooKeeper或KRaft模式启动Kafka。涵盖环境变量设置、日志路径修改、集群UUID生成、存储格式化及服务启停操作,适用于Linux环境下的部署实践。
905 0
|
11月前
|
消息中间件 Linux Kafka
linux命令使用消费kafka的生产者、消费者
linux命令使用消费kafka的生产者、消费者
416 16
|
10月前
|
Linux 编译器 vr&ar
Linux的动态库与静态库
静态库在编译时直接嵌入到最终的可执行文件中。
220 0
|
存储 网络协议 Linux
【Linux】进程IO|系统调用|open|write|文件描述符fd|封装|理解一切皆文件
本文详细介绍了Linux中的进程IO与系统调用,包括 `open`、`write`、`read`和 `close`函数及其用法,解释了文件描述符(fd)的概念,并深入探讨了Linux中的“一切皆文件”思想。这种设计极大地简化了系统编程,使得处理不同类型的IO设备变得更加一致和简单。通过本文的学习,您应该能够更好地理解和应用Linux中的进程IO操作,提高系统编程的效率和能力。
646 34