完整源码下载地址:http://download.csdn.net/download/libaineu2004/10237535
配置文件参考来源
Global configuration properties: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
源码参考https://github.com/edenhill/librdkafka/tree/master/examples/rdkafka_example.cpp
1、调用librdkafka库(https://github.com/edenhill/librdkafka)自主编程
编译用户自己的应用程序时,需要加上链接选项 -lrdkafka -lz -lpthread -lrt(注意:这些是传给链接器的 -l 库选项,不是编译选项)。
例如,我使用QtCreator之cmake模式,CMakeLists.txt如下:
cmake_minimum_required(VERSION 2.8)
project(KafkaProducerClient)
# Collect all source files in the current directory into DIR_SRCS.
aux_source_directory(. DIR_SRCS)
# Option 1 (commented out). NOTE(review): ADD_DEFINITIONS passes COMPILE
# definitions; "-lz -lpthread -lrt" are LINKER flags and would have no effect here.
#ADD_DEFINITIONS(-lz -lpthread -lrt)
# Option 2 (commented out). NOTE(review): this also mixes linker flags into the
# compile flags; prefer TARGET_LINK_LIBRARIES (used below) for library deps.
#set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -lz -lpthread -lrt")
# Build the executable from the collected sources.
add_executable(${PROJECT_NAME} ${DIR_SRCS})
# Link the external libraries — equivalent to gcc's "-lrdkafka -lrdkafka++";
# this resolves the external-library dependency at link time.
TARGET_LINK_LIBRARIES(${PROJECT_NAME} rdkafka)
TARGET_LINK_LIBRARIES(${PROJECT_NAME} rdkafka++)
编译通过,但是运行时会报错:error while loading shared libraries: librdkafka.so.1: cannot open shared object file: No such file or directory
此时需要在/etc/ld.so.conf中加入librdkafka.so所在的目录:/usr/local/lib/
然后在终端执行命令,使之生效:
[root@localhost etc]# ldconfig
注意,/usr/local/lib/每次有库文件更新,都需要终端重新运行一次ldconfig这条命令。
2、生产者源码
(1)main.cpp
#include <iostream> #include "kafkaproducerclient.h" using namespace std; int main() { //KafkaProducerClient* KafkaprClient_ = new KafkaProducerClient("localhost:9092", "test", 0); KafkaProducerClient* KafkaprClient_ = new KafkaProducerClient("172.16.6.161:9092", "test", 0); KafkaprClient_->Init(); KafkaprClient_->Send("hello world!"); char str_msg[] = "Hello Kafka!"; while (fgets(str_msg, sizeof(str_msg), stdin)) { size_t len = strlen(str_msg); if (str_msg[len - 1] == '\n') { str_msg[--len] = '\0'; } if (strcmp(str_msg, "end") == 0) { break; } KafkaprClient_->Send(str_msg); } return 0; }
(2)kafkaproducerclient.h
#ifndef KAFKAPRODUCERCLIENT_H #define KAFKAPRODUCERCLIENT_H #include <iostream> #include <string> #include <cstdlib> #include <cstdio> #include <csignal> #include <cstring> #include <list> #include <librdkafka/rdkafkacpp.h> #include <vector> #include <fstream> using std::string; using std::list; using std::cout; using std::endl; using std::vector; using std::fstream; class KafkaProducerDeliveryReportCallBack : public RdKafka::DeliveryReportCb { public: void dr_cb(RdKafka::Message &message) { std::cout << "Message delivery for (" << message.len() << " bytes): " << message.errstr() << std::endl; if (message.key()) std::cout << "Key: " << *(message.key()) << ";" << std::endl; } }; class KafkaProducerEventCallBack : public RdKafka::EventCb { public: void event_cb(RdKafka::Event &event) { switch (event.type()) { case RdKafka::Event::EVENT_ERROR: std::cerr << "ERROR (" << RdKafka::err2str(event.err()) << "): " << event.str() << std::endl; if (event.err() == RdKafka::ERR__ALL_BROKERS_DOWN) break; case RdKafka::Event::EVENT_STATS: std::cerr << "\"STATS\": " << event.str() << std::endl; break; case RdKafka::Event::EVENT_LOG: fprintf(stderr, "LOG-%i-%s: %s\n", event.severity(), event.fac().c_str(), event.str().c_str()); break; default: std::cerr << "EVENT " << event.type() << " (" << RdKafka::err2str(event.err()) << "): " << event.str() << std::endl; break; } } }; class KafkaProducerClient { public: KafkaProducerClient(const string &brokers, const string &topics, int nPpartition = 0); virtual ~KafkaProducerClient(); bool Init(); void Send(const string &msg); void Stop(); private: RdKafka::Producer *m_pProducer = NULL; RdKafka::Topic *m_pTopic = NULL; KafkaProducerDeliveryReportCallBack m_producerDeliveryReportCallBack; KafkaProducerEventCallBack m_producerEventCallBack; std::string m_strTopics; std::string m_strBroker; bool m_bRun = false; int m_nPpartition = 0; }; #endif // KAFKAPRODUCERCLIENT_H
(3)kafkaproducerclient.cpp
#include "kafkaproducerclient.h"

// Constructor: only stores the connection parameters; the librdkafka handles
// are created later in Init().
// FIX: default-value comment corrected to /*= 0*/ (the header declares 0, the
// original comment said 1), and the init list reordered to match member
// declaration order (members initialize in declaration order regardless).
KafkaProducerClient::KafkaProducerClient(const string &brokers, const string &topics,
                                         int nPpartition /*= 0*/)
    : m_strTopics(topics),
      m_strBroker(brokers),
      m_bRun(true),
      m_nPpartition(nPpartition)
{
}

KafkaProducerClient::~KafkaProducerClient()
{
    Stop();
}

// Create the producer and topic handles from the stored configuration.
// Returns false if either handle could not be created.
bool KafkaProducerClient::Init()
{
    string errstr = "";

    /* Create global and topic configuration objects. */
    RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);

    /* Set the broker list; a failure here is logged but not fatal. */
    if (conf->set("metadata.broker.list", m_strBroker, errstr) != RdKafka::Conf::CONF_OK)
    {
        std::cerr << "RdKafka conf set brokerlist failed :" << errstr.c_str() << endl;
    }

    /* Set delivery report callback */
    conf->set("dr_cb", &m_producerDeliveryReportCallBack, errstr);
    conf->set("event_cb", &m_producerEventCallBack, errstr);

    /*
     * Create producer using accumulated global configuration.
     */
    m_pProducer = RdKafka::Producer::create(conf, errstr);
    // BUG FIX: create() copies the configuration, so the caller owns the Conf
    // objects; the original leaked both on every Init() call.
    delete conf;
    if (!m_pProducer)
    {
        std::cerr << "Failed to create producer: " << errstr << std::endl;
        delete tconf;
        return false;
    }
    std::cout << "% Created producer " << m_pProducer->name() << std::endl;

    /*
     * Create topic handle.
     */
    m_pTopic = RdKafka::Topic::create(m_pProducer, m_strTopics, tconf, errstr);
    delete tconf;  // same ownership rule as the global Conf above
    if (!m_pTopic)
    {
        std::cerr << "Failed to create topic: " << errstr << std::endl;
        return false;
    }
    return true;
}

// Produce one message to the configured topic/partition, then block until the
// producer's outbound queue drains (making Send effectively synchronous).
void KafkaProducerClient::Send(const string &msg)
{
    // ROBUSTNESS FIX: also bail out if Init() failed or was never called —
    // the original dereferenced m_pProducer/m_pTopic unconditionally.
    if (!m_bRun || m_pProducer == NULL || m_pTopic == NULL)
        return;

    /*
     * Produce message. RK_MSG_COPY: librdkafka keeps its own copy of the
     * payload, so passing the string's internal buffer is safe.
     */
    RdKafka::ErrorCode resp = m_pProducer->produce(m_pTopic, m_nPpartition,
                                                   RdKafka::Producer::RK_MSG_COPY,
                                                   const_cast<char *>(msg.c_str()),
                                                   msg.size(), NULL, NULL);
    if (resp != RdKafka::ERR_NO_ERROR)
        std::cerr << "Produce failed: " << RdKafka::err2str(resp) << std::endl;
    else
        std::cerr << "Produced message (" << msg.size() << " bytes)" << std::endl;

    // Serve the delivery-report / event callback queue.
    m_pProducer->poll(0);

    /* Wait for messages to be delivered */ //firecat add
    while (m_bRun && m_pProducer->outq_len() > 0)
    {
        std::cerr << "Waiting for " << m_pProducer->outq_len() << std::endl;
        m_pProducer->poll(100);
    }
}

// Release the librdkafka handles.
// BUG FIX: pointers are reset to NULL so that an explicit Stop() followed by
// the destructor (which calls Stop() again) does not double-delete; m_bRun is
// cleared so a concurrent/later Send() becomes a no-op.
void KafkaProducerClient::Stop()
{
    m_bRun = false;
    delete m_pTopic;
    m_pTopic = NULL;
    delete m_pProducer;
    m_pProducer = NULL;
}