Linux qtcreator下kafka之librdkafka库的C++语言封装,实现生产和消费(1)(★firecat推荐★)

简介: Linux qtcreator下kafka之librdkafka库的C++语言封装,实现生产和消费(★firecat推荐★)

完整源码下载地址:http://download.csdn.net/download/libaineu2004/10237535

配置文件参考来源

Global configuration properties: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md

源码参考https://github.com/edenhill/librdkafka/tree/master/examples/rdkafka_example.cpp


1、调用librdkafka库(https://github.com/edenhill/librdkafka)自主编程

编译用户自己的应用程序,编译选项要加上-lrdkafka -lz -lpthread -lrt这些选项。

例如,我使用QtCreator之cmake模式,CMakeLists.txt如下:

cmake_minimum_required(VERSION 2.8)

project(KafkaProducerClient)

#查找当前目录下的所有源文件,并将名称保存到DIR_SRCS变量

aux_source_directory(. DIR_SRCS)

#指定编译选项,方法1

#ADD_DEFINITIONS(-lz -lpthread -lrt)

#指定编译选项,方法2

#set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -lz -lpthread -lrt")

#指定生成目标

add_executable(${PROJECT_NAME} ${DIR_SRCS})

#指定在链接目标文件的时候需要链接的外部库,其效果类似gcc的编译参数“-l”,可以解决外部库的依赖问题

TARGET_LINK_LIBRARIES(${PROJECT_NAME} rdkafka)

TARGET_LINK_LIBRARIES(${PROJECT_NAME} rdkafka++)


编译通过,但是运行时会报错:error while loading shared libraries: librdkafka.so.1: cannot open shared object file: No such file or directory

此时需要在/etc/ld.so.conf中加入librdkafka.so所在的目录:/usr/local/lib/

然后在终端执行命令,使之生效:

[root@localhost etc]# ldconfig


注意,/usr/local/lib/每次有库文件更新,都需要终端重新运行一次ldconfig这条命令。



2、生产者源码

(1)main.cpp

#include <iostream>
#include "kafkaproducerclient.h"
using namespace std;
int main()
{
    //KafkaProducerClient* KafkaprClient_ = new KafkaProducerClient("localhost:9092", "test", 0);
    KafkaProducerClient* KafkaprClient_ = new KafkaProducerClient("172.16.6.161:9092", "test", 0);
    KafkaprClient_->Init();
    KafkaprClient_->Send("hello world!");
    char str_msg[] = "Hello Kafka!";
    while (fgets(str_msg, sizeof(str_msg), stdin))
    {
        size_t len = strlen(str_msg);
        if (str_msg[len - 1] == '\n')
        {
            str_msg[--len] = '\0';
        }
        if (strcmp(str_msg, "end") == 0)
        {
            break;
        }
        KafkaprClient_->Send(str_msg);
    }
    return 0;
}


(2)kafkaproducerclient.h

#ifndef KAFKAPRODUCERCLIENT_H
#define KAFKAPRODUCERCLIENT_H
#include <iostream>
#include <string>
#include <cstdlib>
#include <cstdio>
#include <csignal>
#include <cstring>
#include <list>
#include <librdkafka/rdkafkacpp.h>
#include <vector>
#include <fstream>
using std::string;
using std::list;
using std::cout;
using std::endl;
using std::vector;
using std::fstream;
class KafkaProducerDeliveryReportCallBack : public RdKafka::DeliveryReportCb {
public:
    void dr_cb(RdKafka::Message &message) {
        std::cout << "Message delivery for (" << message.len() << " bytes): " <<
            message.errstr() << std::endl;
        if (message.key())
            std::cout << "Key: " << *(message.key()) << ";" << std::endl;
    }
};
class KafkaProducerEventCallBack : public RdKafka::EventCb {
public:
    void event_cb(RdKafka::Event &event) {
        switch (event.type())
        {
        case RdKafka::Event::EVENT_ERROR:
            std::cerr << "ERROR (" << RdKafka::err2str(event.err()) << "): " <<
                event.str() << std::endl;
            if (event.err() == RdKafka::ERR__ALL_BROKERS_DOWN)
            break;
        case RdKafka::Event::EVENT_STATS:
            std::cerr << "\"STATS\": " << event.str() << std::endl;
            break;
        case RdKafka::Event::EVENT_LOG:
            fprintf(stderr, "LOG-%i-%s: %s\n",
                event.severity(), event.fac().c_str(), event.str().c_str());
            break;
        default:
            std::cerr << "EVENT " << event.type() <<
                " (" << RdKafka::err2str(event.err()) << "): " <<
                event.str() << std::endl;
            break;
        }
    }
};
class KafkaProducerClient
{
public:
    KafkaProducerClient(const string &brokers, const string &topics, int nPpartition = 0);
    virtual ~KafkaProducerClient();
    bool Init();
    void Send(const string &msg);
    void Stop();
private:
    RdKafka::Producer *m_pProducer = NULL;
    RdKafka::Topic *m_pTopic = NULL;
    KafkaProducerDeliveryReportCallBack m_producerDeliveryReportCallBack;
    KafkaProducerEventCallBack m_producerEventCallBack;
    std::string m_strTopics;
    std::string m_strBroker;
    bool m_bRun = false;
    int m_nPpartition = 0;
};
#endif // KAFKAPRODUCERCLIENT_H

(3)kafkaproducerclient.cpp

#include "kafkaproducerclient.h"
KafkaProducerClient::KafkaProducerClient(const string &brokers, const string &topics, int nPpartition /*= 1*/)
    : m_bRun(true), m_strTopics(topics), m_strBroker(brokers), m_nPpartition(nPpartition)
{
}
KafkaProducerClient::~KafkaProducerClient()
{
    Stop();
}
bool KafkaProducerClient::Init()
{
    string errstr = "";
    /*
     * Create configuration objects
     */
    RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
    /*Set configuration properties,设置broker list*/
    if (conf->set("metadata.broker.list", m_strBroker, errstr) != RdKafka::Conf::CONF_OK){
        std::cerr << "RdKafka conf set brokerlist failed :" << errstr.c_str() << endl;
    }
    /* Set delivery report callback */
    conf->set("dr_cb", &m_producerDeliveryReportCallBack, errstr);
    conf->set("event_cb", &m_producerEventCallBack, errstr);
    /*
     * Create producer using accumulated global configuration.
    */
    m_pProducer = RdKafka::Producer::create(conf, errstr);
    if (!m_pProducer) {
        std::cerr << "Failed to create producer: " << errstr << std::endl;
        return false;
    }
    std::cout << "% Created producer " << m_pProducer->name() << std::endl;
    /*
     * Create topic handle.
    */
    m_pTopic = RdKafka::Topic::create(m_pProducer, m_strTopics,
                                      tconf, errstr);
    if (!m_pTopic) {
        std::cerr << "Failed to create topic: " << errstr << std::endl;
        return false;
    }
    return true;
}
void KafkaProducerClient::Send(const string &msg)
{
    if (!m_bRun)
        return;
    /*
     * Produce message
    */
    RdKafka::ErrorCode resp = m_pProducer->produce(m_pTopic, m_nPpartition,
                                                   RdKafka::Producer::RK_MSG_COPY /* Copy payload */,
                                                   const_cast<char *>(msg.c_str()), msg.size(),
                                                   NULL, NULL);
    if (resp != RdKafka::ERR_NO_ERROR)
        std::cerr << "Produce failed: " << RdKafka::err2str(resp) << std::endl;
    else
        std::cerr << "Produced message (" << msg.size() << " bytes)" << std::endl;
    m_pProducer->poll(0);
    /* Wait for messages to be delivered */  //firecat add
    while (m_bRun && m_pProducer->outq_len() > 0) {
        std::cerr << "Waiting for " << m_pProducer->outq_len() << std::endl;
        m_pProducer->poll(100);
    }
}
void KafkaProducerClient::Stop()
{
    delete m_pTopic;
    delete m_pProducer;
}
相关文章
|
23天前
|
存储 C++ 容器
C++STL(标准模板库)处理学习应用案例
【4月更文挑战第8天】使用C++ STL,通过`std:vector`存储整数数组 `{5, 3, 1, 4, 2}`,然后利用`std::sort`进行排序,输出排序后序列:`std:vector<int> numbers; numbers = {5, 3, 1, 4, 2}; std:sort(numbers.begin(), numbers.end()); for (int number : numbers) { std::cout << number << " "; }`
21 2
|
28天前
|
JSON 机器人 Linux
推荐一款嵌入式Linux开源框架与封装-cpp-tbox
推荐一款嵌入式Linux开源框架与封装-cpp-tbox
54 3
|
2月前
|
存储 算法 安全
深入理解C++中的std::chrono库:持续时间的比较与应用
深入理解C++中的std::chrono库:持续时间的比较与应用
48 1
|
2月前
|
算法 数据处理 C++
【C++ 20 新特性 算法和迭代器库的扩展和泛化 Ranges】深入浅出C++ Ranges库 (Exploring the C++ Ranges Library)
【C++ 20 新特性 算法和迭代器库的扩展和泛化 Ranges】深入浅出C++ Ranges库 (Exploring the C++ Ranges Library)
105 1
|
1天前
|
存储 算法 C++
详解C++中的STL(标准模板库)容器
【4月更文挑战第30天】C++ STL容器包括序列容器(如`vector`、`list`、`deque`、`forward_list`、`array`和`string`)、关联容器(如`set`、`multiset`、`map`和`multimap`)和容器适配器(如`stack`、`queue`和`priority_queue`)。它们为动态数组、链表、栈、队列、集合和映射等数据结构提供了高效实现。选择合适的容器类型可优化性能,满足不同编程需求。
|
7天前
|
存储 算法 程序员
C++从入门到精通:2.2.1标准库与STL容器算法深度解析
C++从入门到精通:2.2.1标准库与STL容器算法深度解析
|
15天前
|
机器学习/深度学习 定位技术 C++
c++中常用库函数
c++中常用库函数
38 0
|
21天前
|
C++
glog --- C++日志库
glog --- C++日志库
|
29天前
|
XML JSON JavaScript
推荐一个比较好用的c++版本http协议库-cpp-httplib
推荐一个比较好用的c++版本http协议库-cpp-httplib
44 1
|
2月前
|
C++ 容器
【C++】标准库类型string
【C++】标准库类型string
164 3

热门文章

最新文章