参考:
https://blog.csdn.net/woshixiazaizhe/article/details/80610432
1、下载并安装Win32OpenSSL-1_0_2s
2、从github上下载代码:
https://github.com/edenhill/librdkafka/tree/v0.11.6-RC4,解压并用vs2017打开win32/librdkafka.vcxproj工程文件
3、选中librdkafka工程,右键属性-Platform Toolset,改为Visual Stdio 2015(v140)
4、找到regexp.c文件,在开始部分添加代码:
#if _MSC_VER>=1900 #include "stdio.h" _ACRTIMP_ALT FILE* __cdecl __acrt_iob_func(unsigned); #ifdef __cplusplus extern "C" #endif FILE* __cdecl __iob_func(unsigned i) { return __acrt_iob_func(i); } #endif /* _MSC_VER>=1900 */
增加legacy_stdio_definitions.lib
5、编译代码
6、选中librdkafkacpp工程后,照步骤4修改平台工具集为Visual Stdio 2015(v140)
7、编译librdkafkacpp代码,此时rdfkafka已经算编译成功了。
8、将rdkafka.h和rdkafkacpp.h移动到新建的inc目录,将librdkafka.lib,librdkafkacpp.lib移动到lib文件夹下,为下面的c++ kafka应用作准备。
9、新建rdkafka的producer工程,写producer相关代码:
#include <iostream> #include <string> #include <cstdlib> #include <cstdio> #include <csignal> #include <cstring> #include <getopt.h> #include "rdkafkacpp.h" static bool run = true; static void sigterm(int sig) { run = false; } class ExampleDeliveryReportCb : public RdKafka::DeliveryReportCb { public: void dr_cb(RdKafka::Message &message) { std::cout << "Message delivery for (" << message.len() << " bytes): " << message.errstr() << std::endl; if (message.key()) std::cout << "Key: " << *(message.key()) << ";" << std::endl; } }; class ExampleEventCb : public RdKafka::EventCb { public: void event_cb(RdKafka::Event &event) { switch (event.type()) { case RdKafka::Event::EVENT_ERROR: std::cerr << "ERROR (" << RdKafka::err2str(event.err()) << "): " << event.str() << std::endl; if (event.err() == RdKafka::ERR__ALL_BROKERS_DOWN) run = false; break; case RdKafka::Event::EVENT_STATS: std::cerr << "\"STATS\": " << event.str() << std::endl; break; case RdKafka::Event::EVENT_LOG: fprintf(stderr, "LOG-%i-%s: %s\n", event.severity(), event.fac().c_str(), event.str().c_str()); break; default: std::cerr << "EVENT " << event.type() << " (" << RdKafka::err2str(event.err()) << "): " << event.str() << std::endl; break; } } }; int main() { std::string brokers = "localhost"; std::string errstr; std::string topic_str = "test"; int32_t partition = RdKafka::Topic::PARTITION_UA; RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC); conf->set("bootstrap.servers", brokers, errstr); ExampleEventCb ex_event_cb; conf->set("event_cb", &ex_event_cb, errstr); signal(SIGINT, sigterm); signal(SIGTERM, sigterm); ExampleDeliveryReportCb ex_dr_cb; conf->set("dr_cb", &ex_dr_cb, errstr); RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr); if (!producer) { std::cerr << "Failed to create producer: " << errstr << std::endl; exit(1); } std::cout << "% Created producer " << producer->name() << std::endl; RdKafka::Topic *topic = RdKafka::Topic::create(producer, topic_str, tconf, errstr); if (!topic) { std::cerr << "Failed to create topic: " << errstr << std::endl; exit(1); } for (std::string line; run && std::getline(std::cin, line);) { if (line.empty()) { producer->poll(0); continue; } RdKafka::ErrorCode resp = producer->produce(topic, partition, RdKafka::Producer::RK_MSG_COPY /* Copy payload */, const_cast<char *>(line.c_str()), line.size(), NULL, NULL); if (resp != RdKafka::ERR_NO_ERROR) std::cerr << "% Produce failed: " << RdKafka::err2str(resp) << std::endl; else std::cerr << "% Produced message (" << line.size() << " bytes)" << std::endl; producer->poll(0); } run = true; // 退出前处理完输出队列中的消息 while (run && producer->outq_len() > 0) { std::cerr << "Waiting for " << producer->outq_len() << std::endl; producer->poll(1000); } delete conf; delete tconf; delete topic; delete producer; RdKafka::wait_destroyed(5000); return 0; }
完成后将步骤6的inc和lib目录添加进工程配置里面,然后编译代码。(如果报找不到librdkafka.dll或者librdkafkacpp.dll,就将对应的dll拷贝到Debug\Release的生成文件目录中。)
8、下载jdk,配置java环境变量JAVA_HOME及PATH
9、下载zookeeper-3.4.14.tar,解压后复制zoo_sample.cfg重命名成zoo.cfg,修改编辑zoo.cfg,修改dataDir为【dataDir=/data】
10、添加环境变量
ZOOKEEPER_HOME D:\Program Files\zookeeper-3.5.2-alpha Path 在现有的值后面添加 ;%ZOOKEEPER_HOME%\bin;
11、运行zookeeper:zkserver
12、下载kafka_2.12-2.3.0,解压后打开目录D:\kafka_2.12-0.11.0.0\config下server.properties文件,把log.dirs修改为【log.dirs=D:\kafka-logs】
12、运行kafka集群环境:.\bin\windows\kafka-server-start.bat .\config\server.properties
13、运行kafka的producer:.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test
14、运行kafka的customer:.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
15、在producer的cmd里面写消息内容,可以看到customer的cmd接收到了消息。
16、启动步骤9的rdkafka的producer程序(程序中端口设置为9092,ip设置为licalhost,topic设置为test),在启动窗口发送消息,可以看到customer的cmd里面接收到了消息。
windows下kafka安装及c++实现kafka的源码地址: