Linux qtcreator下kafka之librdkafka库的C++语言封装,实现生产和消费(1)(★firecat推荐★)

简介: Linux qtcreator下kafka之librdkafka库的C++语言封装,实现生产和消费(★firecat推荐★)

完整源码下载地址:http://download.csdn.net/download/libaineu2004/10237535

配置文件参考来源

Global configuration properties: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md

源码参考https://github.com/edenhill/librdkafka/tree/master/examples/rdkafka_example.cpp


1、调用librdkafka库(https://github.com/edenhill/librdkafka)自主编程

编译用户自己的应用程序,编译选项要加上-lrdkafka -lz -lpthread -lrt这些选项。

例如,我使用QtCreator之cmake模式,CMakeLists.txt如下:

cmake_minimum_required(VERSION 2.8)

project(KafkaProducerClient)

#查找当前目录下的所有源文件,并将名称保存到DIR_SRCS变量

aux_source_directory(. DIR_SRCS)

#指定编译选项,方法1

#ADD_DEFINITIONS(-lz -lpthread -lrt)

#指定编译选项,方法2

#set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -lz -lpthread -lrt")

#指定生成目标

add_executable(${PROJECT_NAME} ${DIR_SRCS})

#指定在链接目标文件的时候需要链接的外部库,其效果类似gcc的编译参数“-l”,可以解决外部库的依赖问题

TARGET_LINK_LIBRARIES(${PROJECT_NAME} rdkafka)

TARGET_LINK_LIBRARIES(${PROJECT_NAME} rdkafka++)


编译通过,但是运行时会报错:error while loading shared libraries: librdkafka.so.1: cannot open shared object file: No such file or directory

此时需要在/etc/ld.so.conf中加入librdkafka.so所在的目录:/usr/local/lib/

然后在终端执行命令,使之生效:

[root@localhost etc]# ldconfig


注意,/usr/local/lib/每次有库文件更新,都需要终端重新运行一次ldconfig这条命令。



2、生产者源码

(1)main.cpp

#include <iostream>
#include "kafkaproducerclient.h"
using namespace std;
int main()
{
    //KafkaProducerClient* KafkaprClient_ = new KafkaProducerClient("localhost:9092", "test", 0);
    KafkaProducerClient* KafkaprClient_ = new KafkaProducerClient("172.16.6.161:9092", "test", 0);
    KafkaprClient_->Init();
    KafkaprClient_->Send("hello world!");
    char str_msg[] = "Hello Kafka!";
    while (fgets(str_msg, sizeof(str_msg), stdin))
    {
        size_t len = strlen(str_msg);
        if (str_msg[len - 1] == '\n')
        {
            str_msg[--len] = '\0';
        }
        if (strcmp(str_msg, "end") == 0)
        {
            break;
        }
        KafkaprClient_->Send(str_msg);
    }
    return 0;
}


(2)kafkaproducerclient.h

#ifndef KAFKAPRODUCERCLIENT_H
#define KAFKAPRODUCERCLIENT_H
#include <iostream>
#include <string>
#include <cstdlib>
#include <cstdio>
#include <csignal>
#include <cstring>
#include <list>
#include <librdkafka/rdkafkacpp.h>
#include <vector>
#include <fstream>
using std::string;
using std::list;
using std::cout;
using std::endl;
using std::vector;
using std::fstream;
class KafkaProducerDeliveryReportCallBack : public RdKafka::DeliveryReportCb {
public:
    void dr_cb(RdKafka::Message &message) {
        std::cout << "Message delivery for (" << message.len() << " bytes): " <<
            message.errstr() << std::endl;
        if (message.key())
            std::cout << "Key: " << *(message.key()) << ";" << std::endl;
    }
};
class KafkaProducerEventCallBack : public RdKafka::EventCb {
public:
    void event_cb(RdKafka::Event &event) {
        switch (event.type())
        {
        case RdKafka::Event::EVENT_ERROR:
            std::cerr << "ERROR (" << RdKafka::err2str(event.err()) << "): " <<
                event.str() << std::endl;
            if (event.err() == RdKafka::ERR__ALL_BROKERS_DOWN)
            break;
        case RdKafka::Event::EVENT_STATS:
            std::cerr << "\"STATS\": " << event.str() << std::endl;
            break;
        case RdKafka::Event::EVENT_LOG:
            fprintf(stderr, "LOG-%i-%s: %s\n",
                event.severity(), event.fac().c_str(), event.str().c_str());
            break;
        default:
            std::cerr << "EVENT " << event.type() <<
                " (" << RdKafka::err2str(event.err()) << "): " <<
                event.str() << std::endl;
            break;
        }
    }
};
class KafkaProducerClient
{
public:
    KafkaProducerClient(const string &brokers, const string &topics, int nPpartition = 0);
    virtual ~KafkaProducerClient();
    bool Init();
    void Send(const string &msg);
    void Stop();
private:
    RdKafka::Producer *m_pProducer = NULL;
    RdKafka::Topic *m_pTopic = NULL;
    KafkaProducerDeliveryReportCallBack m_producerDeliveryReportCallBack;
    KafkaProducerEventCallBack m_producerEventCallBack;
    std::string m_strTopics;
    std::string m_strBroker;
    bool m_bRun = false;
    int m_nPpartition = 0;
};
#endif // KAFKAPRODUCERCLIENT_H

(3)kafkaproducerclient.cpp

#include "kafkaproducerclient.h"
KafkaProducerClient::KafkaProducerClient(const string &brokers, const string &topics, int nPpartition /*= 1*/)
    : m_bRun(true), m_strTopics(topics), m_strBroker(brokers), m_nPpartition(nPpartition)
{
}
KafkaProducerClient::~KafkaProducerClient()
{
    Stop();
}
bool KafkaProducerClient::Init()
{
    string errstr = "";
    /*
     * Create configuration objects
     */
    RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
    /*Set configuration properties,设置broker list*/
    if (conf->set("metadata.broker.list", m_strBroker, errstr) != RdKafka::Conf::CONF_OK){
        std::cerr << "RdKafka conf set brokerlist failed :" << errstr.c_str() << endl;
    }
    /* Set delivery report callback */
    conf->set("dr_cb", &m_producerDeliveryReportCallBack, errstr);
    conf->set("event_cb", &m_producerEventCallBack, errstr);
    /*
     * Create producer using accumulated global configuration.
    */
    m_pProducer = RdKafka::Producer::create(conf, errstr);
    if (!m_pProducer) {
        std::cerr << "Failed to create producer: " << errstr << std::endl;
        return false;
    }
    std::cout << "% Created producer " << m_pProducer->name() << std::endl;
    /*
     * Create topic handle.
    */
    m_pTopic = RdKafka::Topic::create(m_pProducer, m_strTopics,
                                      tconf, errstr);
    if (!m_pTopic) {
        std::cerr << "Failed to create topic: " << errstr << std::endl;
        return false;
    }
    return true;
}
void KafkaProducerClient::Send(const string &msg)
{
    if (!m_bRun)
        return;
    /*
     * Produce message
    */
    RdKafka::ErrorCode resp = m_pProducer->produce(m_pTopic, m_nPpartition,
                                                   RdKafka::Producer::RK_MSG_COPY /* Copy payload */,
                                                   const_cast<char *>(msg.c_str()), msg.size(),
                                                   NULL, NULL);
    if (resp != RdKafka::ERR_NO_ERROR)
        std::cerr << "Produce failed: " << RdKafka::err2str(resp) << std::endl;
    else
        std::cerr << "Produced message (" << msg.size() << " bytes)" << std::endl;
    m_pProducer->poll(0);
    /* Wait for messages to be delivered */  //firecat add
    while (m_bRun && m_pProducer->outq_len() > 0) {
        std::cerr << "Waiting for " << m_pProducer->outq_len() << std::endl;
        m_pProducer->poll(100);
    }
}
void KafkaProducerClient::Stop()
{
    delete m_pTopic;
    delete m_pProducer;
}
相关文章
|
26天前
|
存储 编译器 Linux
动态链接的魔法:Linux下动态链接库机制探讨
本文将深入探讨Linux系统中的动态链接库机制,这其中包括但不限于全局符号介入、延迟绑定以及地址无关代码等内容。
345 22
|
5月前
|
算法 C语言 C++
C++语言学习指南:从新手到高手,一文带你领略系统编程的巅峰技艺!
【8月更文挑战第22天】C++由Bjarne Stroustrup于1985年创立,凭借卓越性能与灵活性,在系统编程、游戏开发等领域占据重要地位。它继承了C语言的高效性,并引入面向对象编程,使代码更模块化易管理。C++支持基本语法如变量声明与控制结构;通过`iostream`库实现输入输出;利用类与对象实现面向对象编程;提供模板增强代码复用性;具备异常处理机制确保程序健壮性;C++11引入现代化特性简化编程;标准模板库(STL)支持高效编程;多线程支持利用多核优势。虽然学习曲线陡峭,但掌握后可开启高性能编程大门。随着新标准如C++20的发展,C++持续演进,提供更多开发可能性。
94 0
|
3月前
|
算法 C++
2022年第十三届蓝桥杯大赛C/C++语言B组省赛题解
2022年第十三届蓝桥杯大赛C/C++语言B组省赛题解
63 5
|
3月前
|
Linux API 开发工具
FFmpeg开发笔记(五十九)Linux编译ijkplayer的Android平台so库
ijkplayer是由B站研发的移动端播放器,基于FFmpeg 3.4,支持Android和iOS。其源码托管于GitHub,截至2024年9月15日,获得了3.24万星标和0.81万分支,尽管已停止更新6年。本文档介绍了如何在Linux环境下编译ijkplayer的so库,以便在较新的开发环境中使用。首先需安装编译工具并调整/tmp分区大小,接着下载并安装Android SDK和NDK,最后下载ijkplayer源码并编译。详细步骤包括环境准备、工具安装及库编译等。更多FFmpeg开发知识可参考相关书籍。
120 0
FFmpeg开发笔记(五十九)Linux编译ijkplayer的Android平台so库
|
3月前
|
Ubuntu Linux 编译器
Linux/Ubuntu下使用VS Code配置C/C++项目环境调用OpenCV
通过以上步骤,您已经成功在Ubuntu系统下的VS Code中配置了C/C++项目环境,并能够调用OpenCV库进行开发。请确保每一步都按照您的系统实际情况进行适当调整。
737 3
|
3月前
|
Linux C语言 C++
vsCode远程执行c和c++代码并操控linux服务器完整教程
这篇文章提供了一个完整的教程,介绍如何在Visual Studio Code中配置和使用插件来远程执行C和C++代码,并操控Linux服务器,包括安装VSCode、安装插件、配置插件、配置编译工具、升级glibc和编写代码进行调试的步骤。
455 0
vsCode远程执行c和c++代码并操控linux服务器完整教程
|
3月前
|
存储 编译器 C语言
深入计算机语言之C++:类与对象(上)
深入计算机语言之C++:类与对象(上)
|
3月前
|
存储 分布式计算 编译器
深入计算机语言之C++:C到C++的过度-2
深入计算机语言之C++:C到C++的过度-2
|
3月前
|
编译器 Linux C语言
深入计算机语言之C++:C到C++的过度-1
深入计算机语言之C++:C到C++的过度-1
|
4月前
|
JavaScript 前端开发 测试技术
一个google Test文件C++语言案例
这篇文章我们来介绍一下真正的C++语言如何用GTest来实现单元测试。
30 0