C++ Wrapper for the librdkafka Kafka Library under Linux Qt Creator: Producer and Consumer (2) (★recommended by firecat★)

Overview: sequel to "Linux qtcreator下kafka之librdkafka库的C++语言封装,实现生产和消费(1)(★firecat推荐★)", part 1 of this series.

3. Consumer Source Code


(1) main.cpp


#include <cstdio>
#include <iostream>

#include "kafkaconsumerclient.h"

using namespace std;

int main()
{
    //OFFSET_BEGINNING: consume from the start of the partition; OFFSET_END: only new messages
    KafkaConsumerClient *KafkaConsumerClient_ = new KafkaConsumerClient("localhost:9092", "test", "1", 0, RdKafka::Topic::OFFSET_BEGINNING);

    if (!KafkaConsumerClient_->Init())
    {
        fprintf(stderr, "kafka server initialize error\n");
        delete KafkaConsumerClient_;
        return -1;
    }

    KafkaConsumerClient_->Start(1000);

    delete KafkaConsumerClient_; //release the client once Start() returns
    return 0;
}


(2) kafkaconsumerclient.h

#ifndef KAFKACONSUMERCLIENT_H
#define KAFKACONSUMERCLIENT_H
#include <iostream>
#include <string>
#include <cstdlib>
#include <cstdio>
#include <csignal>
#include <cstring>
#include <list>
#include <librdkafka/rdkafkacpp.h>
#include <vector>
#include <fstream>
using std::string;
using std::list;
using std::cout;
using std::endl;
using std::vector;
using std::fstream;
class KafkaConsumerClient {
public:
    KafkaConsumerClient(const std::string& brokers, const std::string& topics, std::string groupid, int32_t nPartition = 0, int64_t offset = 0);
    virtual ~KafkaConsumerClient();
    // Initialize the consumer, topic and their configurations
    bool Init();
    // Start fetching messages (blocks until Stop() is called)
    void Start(int timeout_ms);
    // Stop the consume loop
    void Stop();
private:
    void Msg_consume(RdKafka::Message* message, void* opaque);
private:
    std::string m_strBrokers;
    std::string m_strTopics;
    std::string m_strGroupid;
    int64_t m_nLastOffset = 0;
    RdKafka::Consumer *m_pKafkaConsumer = NULL;
    RdKafka::Topic    *m_pTopic         = NULL;
    int64_t           m_nCurrentOffset  = RdKafka::Topic::OFFSET_BEGINNING;
    int32_t           m_nPartition      = 0;
    bool m_bRun = false;
};
#endif // KAFKACONSUMERCLIENT_H

(3) kafkaconsumerclient.cpp

#include "kafkaconsumerclient.h"
KafkaConsumerClient::KafkaConsumerClient(const std::string& brokers, const std::string& topics, std::string groupid, int32_t nPartition /*= 0*/, int64_t offset /*= 0*/)
    :m_strBrokers(brokers),
      m_strTopics(topics),
      m_strGroupid(groupid),
      m_nPartition(nPartition),
      m_nCurrentOffset(offset)
{
}
KafkaConsumerClient::~KafkaConsumerClient()
{
    Stop();
}
bool KafkaConsumerClient::Init() {
    std::string errstr;
    RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    if (!conf) {
        std::cerr << "RdKafka create global conf failed" << endl;
        return false;
    }
    /* Set the broker list */
    if (conf->set("metadata.broker.list", m_strBrokers, errstr) != RdKafka::Conf::CONF_OK) {
        std::cerr << "RdKafka conf set metadata.broker.list failed: " << errstr << endl;
    }
    /* Set the consumer group */
    if (conf->set("group.id", m_strGroupid, errstr) != RdKafka::Conf::CONF_OK) {
        std::cerr << "RdKafka conf set group.id failed: " << errstr << endl;
    }
    std::string strfetch_num = "10240000";
    /* Maximum amount of data fetched from a single partition per request */
    if (conf->set("max.partition.fetch.bytes", strfetch_num, errstr) != RdKafka::Conf::CONF_OK) {
        std::cerr << "RdKafka conf set max.partition.fetch.bytes failed: " << errstr << endl;
    }
    /* Create the Kafka consumer instance using the accumulated global configuration */
    m_pKafkaConsumer = RdKafka::Consumer::create(conf, errstr);
    if (!m_pKafkaConsumer) {
        std::cerr << "failed to create consumer: " << errstr << endl;
        delete conf;
        return false;
    }
    std::cout << "% Created consumer " << m_pKafkaConsumer->name() << std::endl;
    delete conf;
    /* Create the Kafka topic configuration */
    RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
    if (!tconf) {
        std::cerr << "RdKafka create topic conf failed" << endl;
        return false;
    }
    /* "smallest" starts from the earliest available offset when there is no stored offset */
    if (tconf->set("auto.offset.reset", "smallest", errstr) != RdKafka::Conf::CONF_OK) {
        std::cerr << "RdKafka conf set auto.offset.reset failed: " << errstr << endl;
    }
    /*
     * Create topic handle.
     */
    m_pTopic = RdKafka::Topic::create(m_pKafkaConsumer, m_strTopics, tconf, errstr);
    if (!m_pTopic) {
        std::cerr << "RdKafka create topic failed: " << errstr << endl;
        delete tconf;
        return false;
    }
    delete tconf;
    /*
     * Start consumer for topic+partition at start offset
     */
    RdKafka::ErrorCode resp = m_pKafkaConsumer->start(m_pTopic, m_nPartition, m_nCurrentOffset);
    if (resp != RdKafka::ERR_NO_ERROR) {
        /* start() reports its error via the return code, not errstr */
        std::cerr << "failed to start consumer: " << RdKafka::err2str(resp) << endl;
        return false;
    }
    return true;
}
void KafkaConsumerClient::Msg_consume(RdKafka::Message* message, void* opaque) {
  switch (message->err()) {
    case RdKafka::ERR__TIMED_OUT:
      break;
    case RdKafka::ERR_NO_ERROR:
      /* Real message */
      std::cout << "Read msg at offset " << message->offset() << std::endl;
      if (message->key()) {
        std::cout << "Key: " << *message->key() << std::endl;
      }
      printf("%.*s\n",
        static_cast<int>(message->len()),
        static_cast<const char *>(message->payload()));
      m_nLastOffset = message->offset();
      break;
    case RdKafka::ERR__PARTITION_EOF:
      /* Last message */
      cout << "Reached the end of the queue, offset: " << m_nLastOffset << endl;
      //Stop();
      break;
    case RdKafka::ERR__UNKNOWN_TOPIC:
    case RdKafka::ERR__UNKNOWN_PARTITION:
      std::cerr << "Consume failed: " << message->errstr() << std::endl;
      Stop();
      break;
    default:
      /* Errors */
      std::cerr << "Consume failed: " << message->errstr() << std::endl;
      Stop();
      break;
  }
}
void KafkaConsumerClient::Start(int timeout_ms){
    RdKafka::Message *msg = NULL;
    m_bRun = true;
    while (m_bRun) {
        msg = m_pKafkaConsumer->consume(m_pTopic, m_nPartition, timeout_ms);
        Msg_consume(msg, NULL);
        delete msg;
        m_pKafkaConsumer->poll(0);
    }
    m_pKafkaConsumer->stop(m_pTopic, m_nPartition);
    m_pKafkaConsumer->poll(1000);
    if (m_pTopic) {
        delete m_pTopic;
        m_pTopic = NULL;
    }
    if (m_pKafkaConsumer) {
        delete m_pKafkaConsumer;
        m_pKafkaConsumer = NULL;
    }
    /* Destroy the Kafka instance */ //Wait for RdKafka to decommission.
    RdKafka::wait_destroyed(5000);
}
void KafkaConsumerClient::Stop()
{
    m_bRun = false;
}
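
Note that Start() blocks in its consume loop, so Stop() has to be triggered from outside, for example from a signal handler. A minimal sketch of one possible wiring follows (the global g_client and HandleSigint are hypothetical names, not part of the class above; since m_bRun is a plain bool, a production version should use volatile sig_atomic_t or std::atomic<bool>):

#include <csignal>
#include "kafkaconsumerclient.h"

static KafkaConsumerClient *g_client = NULL; // hypothetical global for the handler

static void HandleSigint(int)
{
    if (g_client)
        g_client->Stop(); // clears m_bRun; Start() exits after the current consume()
}

int main()
{
    KafkaConsumerClient client("localhost:9092", "test", "1", 0,
                               RdKafka::Topic::OFFSET_BEGINNING);
    if (!client.Init())
        return -1;
    g_client = &client;
    signal(SIGINT, HandleSigint);
    client.Start(1000); // returns once Ctrl+C triggers HandleSigint
    return 0;
}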

4. Notes


(1) Producer


For the partition argument, it is recommended to use int partition = RD_KAFKA_PARTITION_UA;, which lets librdkafka compute the partition number automatically from the message key. The relevant excerpts from the librdkafka headers, followed by a short sketch:


/* Use builtin partitioner to select partition */
RD_KAFKA_PARTITION_UA,

 * \p partition is the target partition, either:
 *   - RdKafka::Topic::PARTITION_UA (unassigned) for
 *     automatic partitioning using the topic's partitioner function, or
 *   - a fixed partition (0..N)
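
A minimal sketch of a keyed produce call using this legacy C++ API (the producer and topic handles are assumed to be created as in part (1) of this series; ProduceKeyed is a hypothetical helper name):

#include <iostream>
#include <string>
#include <librdkafka/rdkafkacpp.h>

// Hypothetical helper: produce one keyed message and let the topic's
// builtin partitioner derive the partition from the key.
bool ProduceKeyed(RdKafka::Producer *producer, RdKafka::Topic *topic,
                  const std::string &key, const std::string &payload)
{
    RdKafka::ErrorCode resp = producer->produce(
        topic,
        RdKafka::Topic::PARTITION_UA,   // unassigned: partitioner picks by key
        RdKafka::Producer::RK_MSG_COPY, // librdkafka keeps its own copy of the payload
        const_cast<char *>(payload.c_str()), payload.size(),
        &key,                           // key fed to the partitioner
        NULL);
    if (resp != RdKafka::ERR_NO_ERROR) {
        std::cerr << "produce failed: " << RdKafka::err2str(resp) << std::endl;
        return false;
    }
    producer->poll(0); // serve delivery report callbacks
    return true;
}

Messages that share a key are routed to the same partition, which preserves per-key ordering.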


(2) Consumer


msg = m_pKafkaConsumer->consume(m_pTopic, m_nPartition, timeout_ms);

virtual Message *consume(Topic *topic, int32_t partition,
                         int timeout_ms) = 0;


Evidently this consume() call requires an explicit partition number, so the recommended approach is to run multiple processes, each subscribing to a single partition; the same layout can also be sketched with one thread per partition, as shown below.
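
A minimal sketch of that layout (assumptions: the topic "test" has kPartitionCount partitions, and KafkaConsumerClient is the wrapper above; the article suggests one process per partition, and threads are used here only to keep the example self-contained):

#include <thread>
#include <vector>
#include "kafkaconsumerclient.h"

int main()
{
    const int32_t kPartitionCount = 3; // assumption: topic "test" has 3 partitions
    std::vector<KafkaConsumerClient *> clients;
    std::vector<std::thread> workers;
    for (int32_t p = 0; p < kPartitionCount; ++p) {
        KafkaConsumerClient *client = new KafkaConsumerClient(
            "localhost:9092", "test", "1", p, RdKafka::Topic::OFFSET_BEGINNING);
        if (!client->Init()) {
            fprintf(stderr, "init failed for partition %d\n", p);
            delete client;
            continue;
        }
        clients.push_back(client);
        // One blocking consume loop per partition
        workers.emplace_back([client] { client->Start(1000); });
    }
    for (std::thread &t : workers)
        t.join(); // returns only after Stop() is called on each client
    for (KafkaConsumerClient *c : clients)
        delete c;
    return 0;
}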




---


Reference: 使用librdkafka 封装的C++类

