Linux下kafka之C/C++客户端库librdkafka的编译,安装以及函数介绍

简介: Linux下kafka之C/C++客户端库librdkafka的编译,安装以及函数介绍

分配分区


在分配分区的时候,要注意。对于一个已经创建了分区的主题且已经指定了分区,那么之后的producer代码如果是直接修改partitioner部分的代码,直接引入key值进行分区的重新分配的话,是不行的,会继续按照之前的分区进行添加(之前的分区是分区0,只有一个)。此时如果在程序中查看partition_cnt我们是可以看到,该值并没有因为config/server.properties的修改而变化,这是因为此时的partition_cnt是针对该已经创建的主题topic的。

而如果尚自单纯修改代码中的partition_cnt在用于计算分区值时候:djb_hash(key->c_str(), key->size()) % 5 是会得到如下结果的:提示分区不存在。

image.png

我们可以通过rdkafka_example来查看某个topic下对应的partition数量的:

./rdkafka_example -L -t helloworld_kugou -b localhost:9092


image.png


从中我们可以看到helloworld_kugou主题只有一个partition,而helloworld_kugou1主题是有5个partition的,这个和我们预期的相符合。

我们可以对已经创建的主题修改其分区:

./bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --alter --partition 5 --topic helloworld_kugou

修改完之后,我们可以看出,helloworld_kugou已经变为5个分区了。


image.png

image.png


具体示例:


创建topic为helloworld_kugou_test,5个partition。我们可以看到,在producer端进行输入之前,在预先设置好的log目录下是已经有5个partition:

image.png


producer端代码:

class ExampleDeliveryReportCb : public RdKafka::DeliveryReportCb 
{
 public:
  void dr_cb (RdKafka::Message &message) {
    std::cout << "Message delivery for (" << message.len() << " bytes): " <<
        message.errstr() << std::endl;
    if (message.key())
      std::cout << "Key: " << *(message.key()) << ";" << std::endl;
  }
};
class ExampleEventCb : public RdKafka::EventCb {
 public:
  void event_cb (RdKafka::Event &event) {
    switch (event.type())
    {
      case RdKafka::Event::EVENT_ERROR:
        std::cerr << "ERROR (" << RdKafka::err2str(event.err()) << "): " <<
            event.str() << std::endl;
        if (event.err() == RdKafka::ERR__ALL_BROKERS_DOWN)
          run = false;
        break;
      case RdKafka::Event::EVENT_STATS:
        std::cerr << "\"STATS\": " << event.str() << std::endl;
        break;
      case RdKafka::Event::EVENT_LOG:
        fprintf(stderr, "LOG-%i-%s: %s\n",
                event.severity(), event.fac().c_str(), event.str().c_str());
        break;
      default:
        std::cerr << "EVENT " << event.type() <<
            " (" << RdKafka::err2str(event.err()) << "): " <<
            event.str() << std::endl;
        break;
    }
  }
};
/* Use of this partitioner is pretty pointless since no key is provided
 * in the produce() call.so when you need input your key */
class MyHashPartitionerCb : public RdKafka::PartitionerCb {
    public:
        int32_t partitioner_cb (const RdKafka::Topic *topic, const std::string *key,int32_t partition_cnt, void *msg_opaque) 
        {
            std::cout<<"partition_cnt="<<partition_cnt<<std::endl;
            return djb_hash(key->c_str(), key->size()) % partition_cnt;
        }
    private:
        static inline unsigned int djb_hash (const char *str, size_t len) 
        {
        unsigned int hash = 5381;
        for (size_t i = 0 ; i < len ; i++)
            hash = ((hash << 5) + hash) + str[i];
        std::cout<<"hash1="<<hash<<std::endl;
        return hash;
        }
};
void TestProducer()
{
    std::string brokers = "localhost";
    std::string errstr;
    std::string topic_str="helloworld_kugou_test";//自行制定主题topic
    MyHashPartitionerCb hash_partitioner;
    int32_t partition = RdKafka::Topic::PARTITION_UA;
    int64_t start_offset = RdKafka::Topic::OFFSET_BEGINNING;
    bool do_conf_dump = false;
    int opt;
    int use_ccb = 0;
    //Create configuration objects
    RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
    if (tconf->set("partitioner_cb", &hash_partitioner, errstr) != RdKafka::Conf::CONF_OK) 
     {
          std::cerr << errstr << std::endl;
          exit(1);
     }
    /*
    * Set configuration properties
    */
    conf->set("metadata.broker.list", brokers, errstr);
    ExampleEventCb ex_event_cb;
    conf->set("event_cb", &ex_event_cb, errstr);
    ExampleDeliveryReportCb ex_dr_cb;
    /* Set delivery report callback */
    conf->set("dr_cb", &ex_dr_cb, errstr);
    /*
     * Create producer using accumulated global configuration.
     */
    RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);
    if (!producer) 
    {
        std::cerr << "Failed to create producer: " << errstr << std::endl;
        exit(1);
    }
    std::cout << "% Created producer " << producer->name() << std::endl;
    /*
     * Create topic handle.
     */
    RdKafka::Topic *topic = RdKafka::Topic::create(producer, topic_str, tconf, errstr);
    if (!topic) {
      std::cerr << "Failed to create topic: " << errstr << std::endl;
      exit(1);
    }
    /*
     * Read messages from stdin and produce to broker.
     */
    for (std::string line; run && std::getline(std::cin, line);)
    {
        if (line.empty())
        {
            producer->poll(0);
            continue;
        }
      /*
       * Produce message
        // 1. topic
        // 2. partition
        // 3. flags
        // 4. payload
        // 5. payload len
        // 6. std::string key
        // 7. msg_opaque? NULL
       */
      std::string key=line.substr(0,5);//根据line前5个字符串作为key值
      // int a = MyHashPartitionerCb::djb_hash(key.c_str(),key.size());
      // std::cout<<"hash="<<a<<std::endl;
      RdKafka::ErrorCode resp = producer->produce(topic, partition,
          RdKafka::Producer::RK_MSG_COPY /* Copy payload */,
          const_cast<char *>(line.c_str()), line.size(),
          key.c_str(), key.size(), NULL);//这里可以设计key值,因为会根据key值放在对应的partition
        if (resp != RdKafka::ERR_NO_ERROR)
            std::cerr << "% Produce failed: " <<RdKafka::err2str(resp) << std::endl;
        else
            std::cerr << "% Produced message (" << line.size() << " bytes)" <<std::endl;
        producer->poll(0);//对于socket进行读写操作。poll方法才是做实际的IO操作的。return the number of events served
    }
    //
    run = true;
    while (run && producer->outq_len() > 0) {
      std::cerr << "Waiting for " << producer->outq_len() << std::endl;
      producer->poll(1000);
    }
    delete topic;
    delete producer;
}

Consumer端代码:

void msg_consume(RdKafka::Message* message, void* opaque)
{
    switch (message->err())
    {
        case RdKafka::ERR__TIMED_OUT:
            break;
        case RdKafka::ERR_NO_ERROR:
          /* Real message */
            std::cout << "Read msg at offset " << message->offset() << std::endl;
            if (message->key())
            {
                std::cout << "Key: " << *message->key() << std::endl;
            }
            printf("%.*s\n", static_cast<int>(message->len()),static_cast<const char *>(message->payload()));
            break;
        case RdKafka::ERR__PARTITION_EOF:
              /* Last message */
              if (exit_eof)
              {
                  run = false;
              }
              break;
        case RdKafka::ERR__UNKNOWN_TOPIC:
        case RdKafka::ERR__UNKNOWN_PARTITION:
            std::cerr << "Consume failed: " << message->errstr() << std::endl;
            run = false;
            break;
    default:
        /* Errors */
        std::cerr << "Consume failed: " << message->errstr() << std::endl;
        run = false;
    }
}
class ExampleConsumeCb : public RdKafka::ConsumeCb {
    public:
        void consume_cb (RdKafka::Message &msg, void *opaque)
        {
            msg_consume(&msg, opaque);
        }
};
void TestConsumer()
{
    std::string brokers = "localhost";
    std::string errstr;
    std::string topic_str="helloworld_kugou_test";//helloworld_kugou
    MyHashPartitionerCb hash_partitioner;
    int32_t partition = RdKafka::Topic::PARTITION_UA;//为何不能用??在Consumer这里只能写0???无法自动吗???
    partition = 3;
    int64_t start_offset = RdKafka::Topic::OFFSET_BEGINNING;
    bool do_conf_dump = false;
    int opt;
    int use_ccb = 0;
    //Create configuration objects
    RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
    if (tconf->set("partitioner_cb", &hash_partitioner, errstr) != RdKafka::Conf::CONF_OK) 
    {
        std::cerr << errstr << std::endl;
        exit(1);
    }
    /*
    * Set configuration properties
    */
    conf->set("metadata.broker.list", brokers, errstr);
    ExampleEventCb ex_event_cb;
    conf->set("event_cb", &ex_event_cb, errstr);
    ExampleDeliveryReportCb ex_dr_cb;
    /* Set delivery report callback */
    conf->set("dr_cb", &ex_dr_cb, errstr);
    /*
     * Create consumer using accumulated global configuration.
     */
    RdKafka::Consumer *consumer = RdKafka::Consumer::create(conf, errstr);
    if (!consumer) 
    {
      std::cerr << "Failed to create consumer: " << errstr << std::endl;
      exit(1);
    }
    std::cout << "% Created consumer " << consumer->name() << std::endl;
    /*
     * Create topic handle.
     */
    RdKafka::Topic *topic = RdKafka::Topic::create(consumer, topic_str, tconf, errstr);
    if (!topic)
    {
      std::cerr << "Failed to create topic: " << errstr << std::endl;
      exit(1);
    }
    /*
     * Start consumer for topic+partition at start offset
     */
    RdKafka::ErrorCode resp = consumer->start(topic, partition, start_offset);
    if (resp != RdKafka::ERR_NO_ERROR) {
      std::cerr << "Failed to start consumer: " << RdKafka::err2str(resp) << std::endl;
      exit(1);
    }
    ExampleConsumeCb ex_consume_cb;
    /*
     * Consume messages
     */
    while (run)
    {
        if (use_ccb)
        {
            consumer->consume_callback(topic, partition, 1000, &ex_consume_cb, &use_ccb);
      }
      else 
      {
          RdKafka::Message *msg = consumer->consume(topic, partition, 1000);
          msg_consume(msg, NULL);
          delete msg;
      }
      consumer->poll(0);
    }
    /*
     * Stop consumer
     */
    consumer->stop(topic, partition);
    consumer->poll(1000);
    delete topic;
    delete consumer;
}


那么在producer端怎么根据key值获取具体是进入哪个partition的呢?是否有接口可以查看呢?这个有待补充。

相关文章
|
5月前
|
Linux 编译器 开发工具
【Linux快速入门(三)】Linux与ROS学习之编译基础(Cmake编译)
【Linux快速入门(三)】Linux与ROS学习之编译基础(Cmake编译)
347 2
|
3月前
|
自然语言处理 编译器 C语言
为什么C/C++编译腰要先完成汇编
C/C++ 编译过程中先生成汇编语言是历史、技术和实践的共同选择。历史上,汇编语言作为成熟的中间表示方式,简化了工具链;技术上,分阶段编译更高效,汇编便于调试和移植;实践中,保留汇编阶段降低了复杂度,增强了可移植性和优化能力。即使在现代编译器中,汇编仍作为重要桥梁,帮助开发者更好地理解和优化代码。
73 25
为什么C/C++编译腰要先完成汇编
|
4月前
|
Ubuntu Linux Go
golang编译成Linux可运行文件
本文介绍了如何在 Linux 上编译和运行 Golang 程序,涵盖了本地编译和交叉编译的步骤。通过这些步骤,您可以轻松地将 Golang 程序编译成适合 Linux 平台的可执行文件,并在目标服务器上运行。掌握这些技巧,可以提高开发和部署 Golang 应用的效率。
606 14
|
5月前
|
自然语言处理 编译器 Linux
告别头文件,编译效率提升 42%!C++ Modules 实战解析 | 干货推荐
本文中,阿里云智能集团开发工程师李泽政以 Alinux 为操作环境,讲解模块相比传统头文件有哪些优势,并通过若干个例子,学习如何组织一个 C++ 模块工程并使用模块封装第三方库或是改造现有的项目。
557 56
|
5月前
|
自然语言处理 编译器 Linux
|
6月前
|
Linux API 开发工具
FFmpeg开发笔记(五十九)Linux编译ijkplayer的Android平台so库
ijkplayer是由B站研发的移动端播放器,基于FFmpeg 3.4,支持Android和iOS。其源码托管于GitHub,截至2024年9月15日,获得了3.24万星标和0.81万分支,尽管已停止更新6年。本文档介绍了如何在Linux环境下编译ijkplayer的so库,以便在较新的开发环境中使用。首先需安装编译工具并调整/tmp分区大小,接着下载并安装Android SDK和NDK,最后下载ijkplayer源码并编译。详细步骤包括环境准备、工具安装及库编译等。更多FFmpeg开发知识可参考相关书籍。
220 0
FFmpeg开发笔记(五十九)Linux编译ijkplayer的Android平台so库
|
5月前
|
Linux
Linux - 如何编译源码安装软件
源码编译安装通常包括三个步骤:1) `./configure` 检测平台特征和依赖项,生成 Makefile;2) `make` 编译源码,生成可执行文件;3) `make install` 将可执行文件安装到指定目录并配置环境变量。
172 0
|
1月前
|
Linux
linux命令详细说明以及案例
本文介绍了常用的 Linux 命令及其详细说明和示例,包括:`ls`(列出目录内容)、`cd`(更改目录)、`rm` 和 `mv`(删除与移动文件)、`grep`(搜索文本)、`cat`(显示文件内容)以及 `chmod`(更改文件权限)。每个命令均配有功能描述、选项说明及实际案例,帮助用户更好地掌握 Linux 命令的使用方法。
158 56
|
17天前
|
Linux 定位技术
Linux系统中的cd命令:目录切换技巧
踏过千山,越过万水,人生就是一场不断前行的旅程,总充满了未知与挑战。然而,“cd”命令如同你的旅伴,会带你穿梭在如棋盘一般的文件系统中,探索每一处未知。希望你能从“cd”命令中找到乐趣,像是掌控了一种络新妙的魔法,去向未知进发,开始你的探索之旅。
95 24
|
9天前
|
Linux
Linux命令的基本格式解析
总的来说,Linux命令的基本格式就像一个食谱,它可以指导你如何使用你的计算机。通过学习和实践,你可以成为一个真正的“计算机厨师”,创造出各种“美味”的命令。
52 15

热门文章

最新文章

下一篇
oss创建bucket