Linux qtcreator下kafka之librdkafka库的C++语言封装,实现生产和消费(1)(★firecat推荐★)

简介: Linux qtcreator下kafka之librdkafka库的C++语言封装,实现生产和消费(★firecat推荐★)

完整源码下载地址:http://download.csdn.net/download/libaineu2004/10237535

配置文件参考来源

Global configuration properties: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md

源码参考https://github.com/edenhill/librdkafka/tree/master/examples/rdkafka_example.cpp


1、调用librdkafka库(https://github.com/edenhill/librdkafka)自主编程

编译用户自己的应用程序,编译选项要加上-lrdkafka -lz -lpthread -lrt这些选项。

例如,我使用QtCreator之cmake模式,CMakeLists.txt如下:

cmake_minimum_required(VERSION 2.8)

project(KafkaProducerClient)

#查找当前目录下的所有源文件,并将名称保存到DIR_SRCS变量

aux_source_directory(. DIR_SRCS)

#指定编译选项,方法1

#ADD_DEFINITIONS(-lz -lpthread -lrt)

#指定编译选项,方法2

#set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -lz -lpthread -lrt")

#指定生成目标

add_executable(${PROJECT_NAME} ${DIR_SRCS})

#指定在链接目标文件的时候需要链接的外部库,其效果类似gcc的编译参数“-l”,可以解决外部库的依赖问题

TARGET_LINK_LIBRARIES(${PROJECT_NAME} rdkafka)

TARGET_LINK_LIBRARIES(${PROJECT_NAME} rdkafka++)


编译通过,但是运行时会报错:error while loading shared libraries: librdkafka.so.1: cannot open shared object file: No such file or directory

此时需要在/etc/ld.so.conf中加入librdkafka.so所在的目录:/usr/local/lib/

然后在终端执行命令,使之生效:

[root@localhost etc]# ldconfig


注意,/usr/local/lib/每次有库文件更新,都需要终端重新运行一次ldconfig这条命令。



2、生产者源码

(1)main.cpp

#include <iostream>
#include "kafkaproducerclient.h"
using namespace std;
int main()
{
    //KafkaProducerClient* KafkaprClient_ = new KafkaProducerClient("localhost:9092", "test", 0);
    KafkaProducerClient* KafkaprClient_ = new KafkaProducerClient("172.16.6.161:9092", "test", 0);
    KafkaprClient_->Init();
    KafkaprClient_->Send("hello world!");
    char str_msg[] = "Hello Kafka!";
    while (fgets(str_msg, sizeof(str_msg), stdin))
    {
        size_t len = strlen(str_msg);
        if (str_msg[len - 1] == '\n')
        {
            str_msg[--len] = '\0';
        }
        if (strcmp(str_msg, "end") == 0)
        {
            break;
        }
        KafkaprClient_->Send(str_msg);
    }
    return 0;
}


(2)kafkaproducerclient.h

#ifndef KAFKAPRODUCERCLIENT_H
#define KAFKAPRODUCERCLIENT_H
#include <iostream>
#include <string>
#include <cstdlib>
#include <cstdio>
#include <csignal>
#include <cstring>
#include <list>
#include <librdkafka/rdkafkacpp.h>
#include <vector>
#include <fstream>
using std::string;
using std::list;
using std::cout;
using std::endl;
using std::vector;
using std::fstream;
class KafkaProducerDeliveryReportCallBack : public RdKafka::DeliveryReportCb {
public:
    void dr_cb(RdKafka::Message &message) {
        std::cout << "Message delivery for (" << message.len() << " bytes): " <<
            message.errstr() << std::endl;
        if (message.key())
            std::cout << "Key: " << *(message.key()) << ";" << std::endl;
    }
};
class KafkaProducerEventCallBack : public RdKafka::EventCb {
public:
    void event_cb(RdKafka::Event &event) {
        switch (event.type())
        {
        case RdKafka::Event::EVENT_ERROR:
            std::cerr << "ERROR (" << RdKafka::err2str(event.err()) << "): " <<
                event.str() << std::endl;
            if (event.err() == RdKafka::ERR__ALL_BROKERS_DOWN)
            break;
        case RdKafka::Event::EVENT_STATS:
            std::cerr << "\"STATS\": " << event.str() << std::endl;
            break;
        case RdKafka::Event::EVENT_LOG:
            fprintf(stderr, "LOG-%i-%s: %s\n",
                event.severity(), event.fac().c_str(), event.str().c_str());
            break;
        default:
            std::cerr << "EVENT " << event.type() <<
                " (" << RdKafka::err2str(event.err()) << "): " <<
                event.str() << std::endl;
            break;
        }
    }
};
class KafkaProducerClient
{
public:
    KafkaProducerClient(const string &brokers, const string &topics, int nPpartition = 0);
    virtual ~KafkaProducerClient();
    bool Init();
    void Send(const string &msg);
    void Stop();
private:
    RdKafka::Producer *m_pProducer = NULL;
    RdKafka::Topic *m_pTopic = NULL;
    KafkaProducerDeliveryReportCallBack m_producerDeliveryReportCallBack;
    KafkaProducerEventCallBack m_producerEventCallBack;
    std::string m_strTopics;
    std::string m_strBroker;
    bool m_bRun = false;
    int m_nPpartition = 0;
};
#endif // KAFKAPRODUCERCLIENT_H

(3)kafkaproducerclient.cpp

#include "kafkaproducerclient.h"
KafkaProducerClient::KafkaProducerClient(const string &brokers, const string &topics, int nPpartition /*= 1*/)
    : m_bRun(true), m_strTopics(topics), m_strBroker(brokers), m_nPpartition(nPpartition)
{
}
KafkaProducerClient::~KafkaProducerClient()
{
    Stop();
}
bool KafkaProducerClient::Init()
{
    string errstr = "";
    /*
     * Create configuration objects
     */
    RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
    /*Set configuration properties,设置broker list*/
    if (conf->set("metadata.broker.list", m_strBroker, errstr) != RdKafka::Conf::CONF_OK){
        std::cerr << "RdKafka conf set brokerlist failed :" << errstr.c_str() << endl;
    }
    /* Set delivery report callback */
    conf->set("dr_cb", &m_producerDeliveryReportCallBack, errstr);
    conf->set("event_cb", &m_producerEventCallBack, errstr);
    /*
     * Create producer using accumulated global configuration.
    */
    m_pProducer = RdKafka::Producer::create(conf, errstr);
    if (!m_pProducer) {
        std::cerr << "Failed to create producer: " << errstr << std::endl;
        return false;
    }
    std::cout << "% Created producer " << m_pProducer->name() << std::endl;
    /*
     * Create topic handle.
    */
    m_pTopic = RdKafka::Topic::create(m_pProducer, m_strTopics,
                                      tconf, errstr);
    if (!m_pTopic) {
        std::cerr << "Failed to create topic: " << errstr << std::endl;
        return false;
    }
    return true;
}
void KafkaProducerClient::Send(const string &msg)
{
    if (!m_bRun)
        return;
    /*
     * Produce message
    */
    RdKafka::ErrorCode resp = m_pProducer->produce(m_pTopic, m_nPpartition,
                                                   RdKafka::Producer::RK_MSG_COPY /* Copy payload */,
                                                   const_cast<char *>(msg.c_str()), msg.size(),
                                                   NULL, NULL);
    if (resp != RdKafka::ERR_NO_ERROR)
        std::cerr << "Produce failed: " << RdKafka::err2str(resp) << std::endl;
    else
        std::cerr << "Produced message (" << msg.size() << " bytes)" << std::endl;
    m_pProducer->poll(0);
    /* Wait for messages to be delivered */  //firecat add
    while (m_bRun && m_pProducer->outq_len() > 0) {
        std::cerr << "Waiting for " << m_pProducer->outq_len() << std::endl;
        m_pProducer->poll(100);
    }
}
void KafkaProducerClient::Stop()
{
    delete m_pTopic;
    delete m_pProducer;
}
相关文章
|
3月前
|
安全 Linux vr&ar
Linux的动态库和静态库
Linux的动态库和静态库
|
13天前
|
Linux API 开发工具
FFmpeg开发笔记(五十九)Linux编译ijkplayer的Android平台so库
ijkplayer是由B站研发的移动端播放器,基于FFmpeg 3.4,支持Android和iOS。其源码托管于GitHub,截至2024年9月15日,获得了3.24万星标和0.81万分支,尽管已停止更新6年。本文档介绍了如何在Linux环境下编译ijkplayer的so库,以便在较新的开发环境中使用。首先需安装编译工具并调整/tmp分区大小,接着下载并安装Android SDK和NDK,最后下载ijkplayer源码并编译。详细步骤包括环境准备、工具安装及库编译等。更多FFmpeg开发知识可参考相关书籍。
54 0
FFmpeg开发笔记(五十九)Linux编译ijkplayer的Android平台so库
|
2月前
|
数据安全/隐私保护 C语言 C++
C++(七)封装
本文档详细介绍了C++封装的概念及其应用。封装通过权限控制对外提供接口并隐藏内部数据,增强代码的安全性和可维护性。文档首先解释了`class`中的权限修饰符(`public`、`private`、`protected`)的作用,并通过示例展示了如何使用封装实现栈结构。接着介绍了构造器和析构器的使用方法,包括初始化列表的引入以及它们在内存管理和对象生命周期中的重要性。最后,通过分文件编程的方式展示了如何将类定义和实现分离,提高代码的模块化和复用性。
|
3月前
|
Linux API
在Linux中,程序产生了库日志虽然删除了,但磁盘空间未更新是什么原因?
在Linux中,程序产生了库日志虽然删除了,但磁盘空间未更新是什么原因?
|
3月前
|
Linux 网络安全 API
【Azure 应用服务】App Service For Linux 环境中,如何从App Service中获取GitHub私有库(Private Repos)的Deploy Key(RSA key)呢?
【Azure 应用服务】App Service For Linux 环境中,如何从App Service中获取GitHub私有库(Private Repos)的Deploy Key(RSA key)呢?
|
3月前
|
小程序 Linux 开发者
Linux之缓冲区与C库IO函数简单模拟
通过上述编程实例,可以对Linux系统中缓冲区和C库IO函数如何提高文件读写效率有了一个基本的了解。开发者需要根据应用程序的具体需求来选择合适的IO策略。
28 0
|
14天前
|
存储 编译器 对象存储
【C++打怪之路Lv5】-- 类和对象(下)
【C++打怪之路Lv5】-- 类和对象(下)
19 4
|
14天前
|
编译器 C语言 C++
【C++打怪之路Lv4】-- 类和对象(中)
【C++打怪之路Lv4】-- 类和对象(中)
16 4
|
13天前
|
存储 安全 C++
【C++打怪之路Lv8】-- string类
【C++打怪之路Lv8】-- string类
14 1
|
24天前
|
存储 编译器 C++
【C++类和对象(下)】——我与C++的不解之缘(五)
【C++类和对象(下)】——我与C++的不解之缘(五)