Linux qtcreator下kafka之librdkafka库的C++语言封装,实现生产和消费(1)(★firecat推荐★)

简介: Linux qtcreator下kafka之librdkafka库的C++语言封装,实现生产和消费(★firecat推荐★)

完整源码下载地址:http://download.csdn.net/download/libaineu2004/10237535

配置文件参考来源

Global configuration properties: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md

源码参考https://github.com/edenhill/librdkafka/tree/master/examples/rdkafka_example.cpp


1、调用librdkafka库(https://github.com/edenhill/librdkafka)自主编程

编译用户自己的应用程序,编译选项要加上-lrdkafka -lz -lpthread -lrt这些选项。

例如,我使用QtCreator之cmake模式,CMakeLists.txt如下:

cmake_minimum_required(VERSION 2.8)

project(KafkaProducerClient)

#查找当前目录下的所有源文件,并将名称保存到DIR_SRCS变量

aux_source_directory(. DIR_SRCS)

#指定编译选项,方法1

#ADD_DEFINITIONS(-lz -lpthread -lrt)

#指定编译选项,方法2

#set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -lz -lpthread -lrt")

#指定生成目标

add_executable(${PROJECT_NAME} ${DIR_SRCS})

#指定在链接目标文件的时候需要链接的外部库,其效果类似gcc的编译参数“-l”,可以解决外部库的依赖问题

TARGET_LINK_LIBRARIES(${PROJECT_NAME} rdkafka)

TARGET_LINK_LIBRARIES(${PROJECT_NAME} rdkafka++)


编译通过,但是运行时会报错:error while loading shared libraries: librdkafka.so.1: cannot open shared object file: No such file or directory

此时需要在/etc/ld.so.conf中加入librdkafka.so所在的目录:/usr/local/lib/

然后在终端执行命令,使之生效:

[root@localhost etc]# ldconfig


注意,/usr/local/lib/每次有库文件更新,都需要终端重新运行一次ldconfig这条命令。



2、生产者源码

(1)main.cpp

#include <iostream>
#include "kafkaproducerclient.h"
using namespace std;
int main()
{
    //KafkaProducerClient* KafkaprClient_ = new KafkaProducerClient("localhost:9092", "test", 0);
    KafkaProducerClient* KafkaprClient_ = new KafkaProducerClient("172.16.6.161:9092", "test", 0);
    KafkaprClient_->Init();
    KafkaprClient_->Send("hello world!");
    char str_msg[] = "Hello Kafka!";
    while (fgets(str_msg, sizeof(str_msg), stdin))
    {
        size_t len = strlen(str_msg);
        if (str_msg[len - 1] == '\n')
        {
            str_msg[--len] = '\0';
        }
        if (strcmp(str_msg, "end") == 0)
        {
            break;
        }
        KafkaprClient_->Send(str_msg);
    }
    return 0;
}


(2)kafkaproducerclient.h

#ifndef KAFKAPRODUCERCLIENT_H
#define KAFKAPRODUCERCLIENT_H
#include <iostream>
#include <string>
#include <cstdlib>
#include <cstdio>
#include <csignal>
#include <cstring>
#include <list>
#include <librdkafka/rdkafkacpp.h>
#include <vector>
#include <fstream>
using std::string;
using std::list;
using std::cout;
using std::endl;
using std::vector;
using std::fstream;
class KafkaProducerDeliveryReportCallBack : public RdKafka::DeliveryReportCb {
public:
    void dr_cb(RdKafka::Message &message) {
        std::cout << "Message delivery for (" << message.len() << " bytes): " <<
            message.errstr() << std::endl;
        if (message.key())
            std::cout << "Key: " << *(message.key()) << ";" << std::endl;
    }
};
class KafkaProducerEventCallBack : public RdKafka::EventCb {
public:
    void event_cb(RdKafka::Event &event) {
        switch (event.type())
        {
        case RdKafka::Event::EVENT_ERROR:
            std::cerr << "ERROR (" << RdKafka::err2str(event.err()) << "): " <<
                event.str() << std::endl;
            if (event.err() == RdKafka::ERR__ALL_BROKERS_DOWN)
            break;
        case RdKafka::Event::EVENT_STATS:
            std::cerr << "\"STATS\": " << event.str() << std::endl;
            break;
        case RdKafka::Event::EVENT_LOG:
            fprintf(stderr, "LOG-%i-%s: %s\n",
                event.severity(), event.fac().c_str(), event.str().c_str());
            break;
        default:
            std::cerr << "EVENT " << event.type() <<
                " (" << RdKafka::err2str(event.err()) << "): " <<
                event.str() << std::endl;
            break;
        }
    }
};
class KafkaProducerClient
{
public:
    KafkaProducerClient(const string &brokers, const string &topics, int nPpartition = 0);
    virtual ~KafkaProducerClient();
    bool Init();
    void Send(const string &msg);
    void Stop();
private:
    RdKafka::Producer *m_pProducer = NULL;
    RdKafka::Topic *m_pTopic = NULL;
    KafkaProducerDeliveryReportCallBack m_producerDeliveryReportCallBack;
    KafkaProducerEventCallBack m_producerEventCallBack;
    std::string m_strTopics;
    std::string m_strBroker;
    bool m_bRun = false;
    int m_nPpartition = 0;
};
#endif // KAFKAPRODUCERCLIENT_H

(3)kafkaproducerclient.cpp

#include "kafkaproducerclient.h"
KafkaProducerClient::KafkaProducerClient(const string &brokers, const string &topics, int nPpartition /*= 1*/)
    : m_bRun(true), m_strTopics(topics), m_strBroker(brokers), m_nPpartition(nPpartition)
{
}
KafkaProducerClient::~KafkaProducerClient()
{
    Stop();
}
bool KafkaProducerClient::Init()
{
    string errstr = "";
    /*
     * Create configuration objects
     */
    RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
    /*Set configuration properties,设置broker list*/
    if (conf->set("metadata.broker.list", m_strBroker, errstr) != RdKafka::Conf::CONF_OK){
        std::cerr << "RdKafka conf set brokerlist failed :" << errstr.c_str() << endl;
    }
    /* Set delivery report callback */
    conf->set("dr_cb", &m_producerDeliveryReportCallBack, errstr);
    conf->set("event_cb", &m_producerEventCallBack, errstr);
    /*
     * Create producer using accumulated global configuration.
    */
    m_pProducer = RdKafka::Producer::create(conf, errstr);
    if (!m_pProducer) {
        std::cerr << "Failed to create producer: " << errstr << std::endl;
        return false;
    }
    std::cout << "% Created producer " << m_pProducer->name() << std::endl;
    /*
     * Create topic handle.
    */
    m_pTopic = RdKafka::Topic::create(m_pProducer, m_strTopics,
                                      tconf, errstr);
    if (!m_pTopic) {
        std::cerr << "Failed to create topic: " << errstr << std::endl;
        return false;
    }
    return true;
}
void KafkaProducerClient::Send(const string &msg)
{
    if (!m_bRun)
        return;
    /*
     * Produce message
    */
    RdKafka::ErrorCode resp = m_pProducer->produce(m_pTopic, m_nPpartition,
                                                   RdKafka::Producer::RK_MSG_COPY /* Copy payload */,
                                                   const_cast<char *>(msg.c_str()), msg.size(),
                                                   NULL, NULL);
    if (resp != RdKafka::ERR_NO_ERROR)
        std::cerr << "Produce failed: " << RdKafka::err2str(resp) << std::endl;
    else
        std::cerr << "Produced message (" << msg.size() << " bytes)" << std::endl;
    m_pProducer->poll(0);
    /* Wait for messages to be delivered */  //firecat add
    while (m_bRun && m_pProducer->outq_len() > 0) {
        std::cerr << "Waiting for " << m_pProducer->outq_len() << std::endl;
        m_pProducer->poll(100);
    }
}
void KafkaProducerClient::Stop()
{
    delete m_pTopic;
    delete m_pProducer;
}
相关文章
|
1月前
|
算法 C++ 容器
C++标准库(速查)总结
C++标准库(速查)总结
65 6
|
1月前
|
存储 算法 C++
C++ STL 初探:打开标准模板库的大门
C++ STL 初探:打开标准模板库的大门
96 10
|
1月前
|
存储 程序员 C++
C++常用基础知识—STL库(2)
C++常用基础知识—STL库(2)
70 5
|
1月前
|
存储 自然语言处理 程序员
C++常用基础知识—STL库(1)
C++常用基础知识—STL库(1)
53 1
|
2月前
|
编译器 API C语言
超级好用的C++实用库之跨平台实用方法
超级好用的C++实用库之跨平台实用方法
40 6
|
2月前
|
安全 C++
超级好用的C++实用库之环形内存池
超级好用的C++实用库之环形内存池
48 5
|
2月前
|
缓存 网络协议 Linux
超级好用的C++实用库之套接字
超级好用的C++实用库之套接字
34 1
|
2月前
|
存储 算法 安全
超级好用的C++实用库之sha256算法
超级好用的C++实用库之sha256算法
101 1
|
2月前
|
存储 算法 安全
超级好用的C++实用库之国密sm4算法
超级好用的C++实用库之国密sm4算法
55 0
|
2月前
|
网络协议 Linux C++
超级好用的C++实用库之网络
超级好用的C++实用库之网络
49 0
下一篇
无影云桌面