windows下kafka的环境配置及rdkafka库的应用

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
云原生网关 MSE Higress,422元/月
注册配置 MSE Nacos/ZooKeeper,118元/月
简介: windows下kafka的环境配置及rdkafka库的应用

参考:

https://blog.csdn.net/woshixiazaizhe/article/details/80610432

1、下载并安装Win32OpenSSL-1_0_2s

2、从github上下载代码:

https://github.com/edenhill/librdkafka/tree/v0.11.6-RC4,解压并用vs2017打开win32/librdkafka.vcxproj工程文件

3、选中librdkafka工程,右键属性-Platform Toolset,改为Visual Stdio 2015(v140)

4、找到regexp.c文件,在开始部分添加代码:

#if _MSC_VER>=1900
#include "stdio.h" 
_ACRTIMP_ALT FILE* __cdecl __acrt_iob_func(unsigned);
#ifdef __cplusplus 
extern "C"
#endif 
FILE* __cdecl __iob_func(unsigned i) {
  return __acrt_iob_func(i);
}
#endif /* _MSC_VER>=1900 */

增加legacy_stdio_definitions.lib

5、编译代码

6、选中librdkafkacpp工程后,照步骤4修改平台工具集为Visual Stdio 2015(v140)

7、编译librdkafkacpp代码,此时rdfkafka已经算编译成功了。

8、将rdkafka.h和rdkafkacpp.h移动到新建的inc目录,将librdkafka.lib,librdkafkacpp.lib移动到lib文件夹下,为下面的c++ kafka应用作准备。

9、新建rdkafka的producer工程,写producer相关代码:

#include <iostream>
#include <string>
#include <cstdlib>
#include <cstdio>
#include <csignal>
#include <cstring>
#include <getopt.h>
#include "rdkafkacpp.h"
static bool run = true;
static void sigterm(int sig) {
  run = false;
}
class ExampleDeliveryReportCb : public RdKafka::DeliveryReportCb {
public:
  void dr_cb(RdKafka::Message &message) {
    std::cout << "Message delivery for (" << message.len() << " bytes): " <<
      message.errstr() << std::endl;
    if (message.key())
      std::cout << "Key: " << *(message.key()) << ";" << std::endl;
  }
};
class ExampleEventCb : public RdKafka::EventCb {
public:
  void event_cb(RdKafka::Event &event) {
    switch (event.type())
    {
    case RdKafka::Event::EVENT_ERROR:
      std::cerr << "ERROR (" << RdKafka::err2str(event.err()) << "): " <<
        event.str() << std::endl;
      if (event.err() == RdKafka::ERR__ALL_BROKERS_DOWN)
        run = false;
      break;
    case RdKafka::Event::EVENT_STATS:
      std::cerr << "\"STATS\": " << event.str() << std::endl;
      break;
    case RdKafka::Event::EVENT_LOG:
      fprintf(stderr, "LOG-%i-%s: %s\n",
        event.severity(), event.fac().c_str(), event.str().c_str());
      break;
    default:
      std::cerr << "EVENT " << event.type() <<
        " (" << RdKafka::err2str(event.err()) << "): " <<
        event.str() << std::endl;
      break;
    }
  }
};
int main()
{
  std::string brokers = "localhost";
  std::string errstr;
  std::string topic_str = "test";
  int32_t partition = RdKafka::Topic::PARTITION_UA;
  RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
  RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
  conf->set("bootstrap.servers", brokers, errstr);
  ExampleEventCb ex_event_cb;
  conf->set("event_cb", &ex_event_cb, errstr);
  signal(SIGINT, sigterm);
  signal(SIGTERM, sigterm);
  ExampleDeliveryReportCb ex_dr_cb;
  conf->set("dr_cb", &ex_dr_cb, errstr);
  RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);
  if (!producer) {
    std::cerr << "Failed to create producer: " << errstr << std::endl;
    exit(1);
  }
  std::cout << "% Created producer " << producer->name() << std::endl;
  RdKafka::Topic *topic = RdKafka::Topic::create(producer, topic_str,
    tconf, errstr);
  if (!topic) {
    std::cerr << "Failed to create topic: " << errstr << std::endl;
    exit(1);
  }
  for (std::string line; run && std::getline(std::cin, line);) {
    if (line.empty()) {
      producer->poll(0);
      continue;
    }
    RdKafka::ErrorCode resp =
      producer->produce(topic, partition,
        RdKafka::Producer::RK_MSG_COPY /* Copy payload */,
        const_cast<char *>(line.c_str()), line.size(),
        NULL, NULL);
    if (resp != RdKafka::ERR_NO_ERROR)
      std::cerr << "% Produce failed: " <<
      RdKafka::err2str(resp) << std::endl;
    else
      std::cerr << "% Produced message (" << line.size() << " bytes)" <<
      std::endl;
    producer->poll(0);
  }
  run = true;
  // 退出前处理完输出队列中的消息
  while (run && producer->outq_len() > 0) {
    std::cerr << "Waiting for " << producer->outq_len() << std::endl;
    producer->poll(1000);
  }
  delete conf;
  delete tconf;
  delete topic;
  delete producer;
  RdKafka::wait_destroyed(5000);
  return 0;
}

完成后将步骤6的inc和lib目录添加进工程配置里面,然后编译代码。(如果报找不到librdkafka.dll或者librdkafkacpp.dll,就将对应的dll拷贝到Debug\Release的生成文件目录中。)

8、下载jdk,配置java环境变量JAVA_HOME及PATH

9、下载zookeeper-3.4.14.tar,解压后复制zoo_sample.cfg重命名成zoo.cfg,修改编辑zoo.cfg,修改dataDir为【dataDir=/data】

10、添加环境变量

ZOOKEEPER_HOME          D:\Program Files\zookeeper-3.5.2-alpha
   Path 在现有的值后面添加     ;%ZOOKEEPER_HOME%\bin;

11、运行zookeeper:zkserver

12、下载kafka_2.12-2.3.0,解压后打开目录D:\kafka_2.12-0.11.0.0\config下server.properties文件,把log.dirs修改为【log.dirs=D:\kafka-logs】

12、运行kafka集群环境:.\bin\windows\kafka-server-start.bat .\config\server.properties

13、运行kafka的producer:.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test

14、运行kafka的customer:.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning

15、在producer的cmd里面写消息内容,可以看到customer的cmd接收到了消息。

16、启动步骤9的rdkafka的producer程序(程序中端口设置为9092,ip设置为licalhost,topic设置为test),在启动窗口发送消息,可以看到customer的cmd里面接收到了消息。

windows下kafka安装及c++实现kafka的源码地址:

https://download.csdn.net/download/qq_23350817/11715765

相关文章
|
2天前
|
消息中间件 数据挖掘 Kafka
Apache Kafka流处理实战:构建实时数据分析应用
【10月更文挑战第24天】在当今这个数据爆炸的时代,能够快速准确地处理实时数据变得尤为重要。无论是金融交易监控、网络行为分析还是物联网设备的数据收集,实时数据处理技术都是不可或缺的一部分。Apache Kafka作为一款高性能的消息队列系统,不仅支持传统的消息传递模式,还提供了强大的流处理能力,能够帮助开发者构建高效、可扩展的实时数据分析应用。
17 5
|
13天前
|
XML 缓存 前端开发
Electron-builder 是如何打包 Windows 应用的?
本文首发于微信公众号“前端徐徐”,作者徐徐深入解析了 electron-builder 在 Windows 平台上的打包流程。文章详细介绍了 `winPackager.ts`、`AppxTarget.ts`、`MsiTarget.ts` 和 `NsisTarget.ts` 等核心文件,涵盖了目标创建、图标处理、代码签名、资源编辑、应用签名、性能优化等内容,并分别讲解了 AppX/MSIX、MSI 和 NSIS 安装程序的生成过程。通过这些内容,读者可以更好地理解和使用 electron-builder 进行 Windows 应用的打包和发布。
56 0
|
2月前
|
消息中间件 Java Kafka
windows服务器重装系统之后,Kafka服务如何恢复?
windows服务器重装系统之后,Kafka服务如何恢复?
26 8
|
24天前
|
数据可视化 程序员 C#
C#中windows应用窗体程序的输入输出方法实例
C#中windows应用窗体程序的输入输出方法实例
23 0
|
3月前
|
Unix Linux Ruby
在windows和linux上高效快捷地发布Dash应用
在windows和linux上高效快捷地发布Dash应用
|
3月前
|
Linux iOS开发 开发者
跨平台开发不再难:.NET Core如何让你的应用在Windows、Linux、macOS上自如游走?
【8月更文挑战第28天】本文提供了一份详尽的.NET跨平台开发指南,涵盖.NET Core简介、环境配置、项目结构、代码编写、依赖管理、构建与测试、部署及容器化等多个方面,帮助开发者掌握关键技术与最佳实践,充分利用.NET Core实现高效、便捷的跨平台应用开发与部署。
119 3
|
3月前
|
vr&ar C# 图形学
WPF与AR/VR的激情碰撞:解锁Windows Presentation Foundation应用新维度,探索增强现实与虚拟现实技术在现代UI设计中的无限可能与实战应用详解
【8月更文挑战第31天】增强现实(AR)与虚拟现实(VR)技术正迅速改变生活和工作方式,在游戏、教育及工业等领域展现出广泛应用前景。本文探讨如何在Windows Presentation Foundation(WPF)环境中实现AR/VR功能,通过具体示例代码展示整合过程。尽管WPF本身不直接支持AR/VR,但借助第三方库如Unity、Vuforia或OpenVR,可实现沉浸式体验。例如,通过Unity和Vuforia在WPF中创建AR应用,或利用OpenVR在WPF中集成VR功能,从而提升用户体验并拓展应用功能边界。
60 0
|
3月前
|
存储 开发者 C#
WPF与邮件发送:教你如何在Windows Presentation Foundation应用中无缝集成电子邮件功能——从界面设计到代码实现,全面解析邮件发送的每一个细节密武器!
【8月更文挑战第31天】本文探讨了如何在Windows Presentation Foundation(WPF)应用中集成电子邮件发送功能,详细介绍了从创建WPF项目到设计用户界面的全过程,并通过具体示例代码展示了如何使用`System.Net.Mail`命名空间中的`SmtpClient`和`MailMessage`类来实现邮件发送逻辑。文章还强调了安全性和错误处理的重要性,提供了实用的异常捕获代码片段,旨在帮助WPF开发者更好地掌握邮件发送技术,提升应用程序的功能性与用户体验。
52 0
|
3月前
|
C# Windows 监控
WPF应用跨界成长秘籍:深度揭秘如何与Windows服务完美交互,扩展功能无界限!
【8月更文挑战第31天】WPF(Windows Presentation Foundation)是 .NET 框架下的图形界面技术,具有丰富的界面设计和灵活的客户端功能。在某些场景下,WPF 应用需与 Windows 服务交互以实现后台任务处理、系统监控等功能。本文探讨了两者交互的方法,并通过示例代码展示了如何扩展 WPF 应用的功能。首先介绍了 Windows 服务的基础知识,然后阐述了创建 Windows 服务、设计通信接口及 WPF 客户端调用服务的具体步骤。通过合理的交互设计,WPF 应用可获得更强的后台处理能力和系统级操作权限,提升应用的整体性能。
96 0
|
3月前
|
网络安全 API 数据安全/隐私保护
【Azure App Service】.NET代码实验App Service应用中获取TLS/SSL 证书 (App Service Windows)
【Azure App Service】.NET代码实验App Service应用中获取TLS/SSL 证书 (App Service Windows)