Linux下kafka之C/C++客户端库librdkafka的编译,安装以及函数介绍(1)

简介: Linux下kafka之C/C++客户端库librdkafka的编译,安装以及函数介绍

https://github.com/edenhill/librdkafka

librdkafka是一个开源的Kafka客户端C/C++实现,提供了Kafka生产者、消费者接口。


一、安装librdkafka

首先在github上下载librdkafka源码,解压后进行编译;

cd librdkafka-master

chmod 777 configure lds-gen.py

./configure

make

make install

在make的时候,如果是64位Linux会报下面这个异常

/bin/ld:librdkafka.lds:1: syntax error in VERSION script

只要Makefile.config第46行里面的WITH_LDS=y这一行注释掉就不会报错了。


注释掉:#WITH_LDS=y,然后再make


最终头文件和库文件会分别安装在


/usr/local/include/librdkafka

/usr/local/lib




二、调用librdkafka库自主编程


编译用户自己的应用程序,编译选项要加上-lrdkafka -lz -lpthread -lrt这些选项。


例如,我使用QtCreator之qmake模式,.pro文件如下:


QMAKE_LFLAGS += -lrdkafka -lrdkafka++ -lz -lpthread -lrt

#-lrdkafka等价于 LIBS += /usr/local/lib/librdkafka.so

编译通过,但是运行时会报错:error while loading shared libraries: librdkafka.so.1: cannot open shared object file: No such file or directory

此时需要在/etc/ld.so.conf中加入librdkafka.so所在的目录:/usr/local/lib/

然后在终端执行命令,使之生效:

[root@localhost etc]# ldconfig


注意,/usr/local/lib/每次有库文件更新,都需要终端重新运行一次ldconfig这条命令。



三、启动kafka


详情参考我的文章:我个人的kafka_2.12-1.0.0实践:安装与测试(★firecat推荐★)




四、用法介绍

源文件来自/librdkafka-master/examples/rdkafka_example.cpp和rdkafka_consumer_example.cpp


Producer的使用方法:


创建kafka客户端配置占位符:

conf = rd_kafka_conf_new();即创建一个配置对象(rd_kafka_conf_t)。并通过rd_kafka_conf_set进行brokers的配置。


设置信息的回调:

用以反馈信息发送的成败。通过rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);实现。


创建producer实例:

1)初始化:

应用程序需要初始化一个顶层对象(rd_kafka_t)的基础容器,用于全局配置和共享状态。

通过调用rd_kafka_new()创建。创建之后,该实例就占有了conf对象,所以conf对象们在rd_kafka_new()调用之后是不能被再次使用的,而且在rd_kafka_new()调用之后也不需要释放配置资源的。

2)创建topic:

创建的topi对象是可以复用的(producer的实例化对象(rd_kafka_t)也是允许复用的,所以这两者就没有必要频繁创建)

实例化一个或多个 topic(rd_kafka_topic_t)用于生产或消费。

topic 对象保存 topic 级别的属性,并且维护一个映射,

该映射保存所有可用 partition 和他们的领导 broker 。

通过调用rd_kafka_topic_new()创建(rd_kafka_topic_new(rk, topic, NULL);)。

注:rd_kafka_t 和 rd_kafka_topic_t都源于可选的配置 API。

不使用该 API 将导致 librdkafka 使用列在文档CONFIGURATION.md中的默认配置。


3)Producer API:

通过调用RD_KAFKA_PRODUCER设置一个或多个rd_kafka_topic_t对象,就可以准备好接收消息,并组装和发送到 broker。

rd_kafka_produce()函数接受如下参数:

rkt : 待生产的topic,之前通过rd_kafka_topic_new()生成

partition : 生产的 partition。如果设置为RD_KAFKA_PARTITION_UA(未赋值的),则会根据builtin partitioner去选择一个确定 partition。kafka会回调partitioner进行均衡选取,partitioner方法需要自己实现。可以轮询或者传入key进行hash。未实现则采用默认的随机方法rd_kafka_msg_partitioner_random随机选择。

可以尝试通过partitioner来设计partition的取值。

msgflags : 0 或下面的值:

RD_KAFKA_MSG_F_COPY 表示librdkafka 在信息发送前立即从 payload 做一份拷贝。如果 payload 是不稳定存储,如栈,需要使用这个参数。这是以防消息主体所在的缓存不是长久使用的,才预先将信息进行拷贝。

RD_KAFKA_MSG_F_FREE 表示当 payload 使用完后,让 librdkafka 使用free(3)释放。 就是在使用完消息后,将释放消息缓存。

这两个标志互斥,如果都不设置,payload 既不会被拷贝也不会被 librdkafka 释放。

如果RD_KAFKA_MSG_F_COPY标志不设置,就不会有数据拷贝,librdkafka 将占用 payload 指针(消息主体)直到消息被发送或失败。librdkafka 处理完消息后,会调用发送报告回调函数,让应用程序重新获取 payload 的所有权。

如果设置了RD_KAFKA_MSG_F_FREE,应用程序就不要在发送报告回调函数中释放 payload。

payload,len : 消息 payload(message payload,即值),消息长度

key,keylen : 可选的消息键及其长度,用于分区。将会用于 topic 分区回调函数,如果有,会附加到消息中发送给 broker。

msg_opaque : 可选的,应用程序为每个消息提供的无类型指针,提供给消息发送回调函数,用于应用程序引用。


rd_kafka_produce() 是一个非阻塞 API,该函数会将消息塞入一个内部队列并立即返回。

如果队列中的消息数超过queue.buffering.max.messages属性配置的值,rd_kafka_produce()通过返回 -1,并将errno设置为ENOBUFS这样的错误码来反馈错误。

提示: 见 examples/rdkafka_performance.c 获取生产者的使用。


Consumer的使用方法:


consumer API要比producer API多一些状态。 在使用RD_KAFKA_CONSUMER类型(调用rd_kafka_new时设置的函数参数)创建rd_kafka_t 对象,再通过调用rd_kafka_brokers_add对上述new出来的Kafka handle(rk)进行broker的添加(rd_kafka_brokers_add(rk, brokers)),

然后创建rd_kakfa_topic_t对象之后,


rd_kafka_query_watermark_offsets


创建topic:

rtk = rd_kafka_topic_new(rk, topic, topic_conf)


开始消费:

调用rd_kafka_consumer_start()函数(rd_kafka_consume_start(rkt, partition, start_offset))启动对给定partition的consumer。

调用rd_kafka_consumer_start需要的参数如下:

rkt : 要消费的 topic ,之前通过rd_kafka_topic_new()创建。

partition : 要消费的 partition。

offset : 消费开始的消息偏移量。可以是绝对的值或两种特殊的偏移量:

RD_KAFKA_OFFSET_BEGINNING 从该 partition 的队列的最开始消费(最早的消息)。

RD_KAFKA_OFFSET_END 从该 partition 产生的下一个消息开始消费。

RD_KAFKA_OFFSET_STORED 使用偏移量存储。


当一个 topic+partition 消费者被启动,librdkafka 不断尝试从 broker 批量获取消息来保持本地队列有queued.min.messages数量的消息。

本地消息队列通过 3 个不同的消费 API 向应用程序提供服务:


   rd_kafka_consume() - 消费一个消息

   rd_kafka_consume_batch() - 消费一个或多个消息

   rd_kafka_consume_callback() - 消费本地队列中的所有消息,且每一个都调用回调函数

这三个 API 的性能按照升序排列,rd_kafka_consume()最慢,rd_kafka_consume_callback()最快。不同的类型满足不同的应用需要。

被上述函数消费的消息返回rd_kafka_message_t类型。

rd_kafka_message_t的成员:


* err - 返回给应用程序的错误信号。如果该值不是零,payload字段应该是一个错误的消息,err是一个错误码(rd_kafka_resp_err_t)。

* rkt,partition - 消息的 topic 和 partition 或错误。

* payload,len - 消息的数据或错误的消息 (err!=0)。

* key,key_len - 可选的消息键,生产者指定。

* offset - 消息偏移量。

不管是payload和key的内存,还是整个消息,都由 librdkafka 所拥有,且在rd_kafka_message_destroy()被调用后不要使用。

librdkafka 为了避免消息集的多余拷贝,会为所有从内存缓存中接收的消息共享同一个消息集,这意味着如果应用程序保留单个rd_kafka_message_t,将会阻止内存释放并用于同一个消息集的其他消息。

当应用程序从一个 topic+partition中消费完消息,应该调用rd_kafka_consume_stop()来结束消费。该函数同时会清空当前本地队列中的所有消息。

提示: 见 examples/rdkafka_performance.c 获取消费者的使用。


在Kafka broker中server.properties文件配置(参数log.dirs=/data2/logs/kafka/)使得写入到消息队列中的topic在该目录下对分区的形式进行存储。每个分区partition下是由segment file组成,而segment file包括2大部分:分别为index file和data file,此2个文件一一对应,成对出现,后缀”.index”和“.log”分别表示为segment索引文件、数据文件。

segment文件命名规则:partion全局的第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset值。数值最大为64位long大小,19位数字字符长度,没有数字用0填充。


具体示例:


本文所采用的是cpp方式,和上述介绍的只是函数使用上的不同,业务逻辑是一样的。

在producer过程中直接是使用PARTITION_UA 但是在消费的时候,不能够指定partition值为PARTITION_UA因为该值其实是-1,对于Consumer端来说,是无意义的。根据源码可以知道当不指定partitioner的时候,其实是有一个默认的partitioner,就是Consistent-Random partitioner所谓的一致性随机partitioner。一致性hash对关键字进行map映射之后到一个特定的partition。

函数原型:


rd_kafka_msg_partitioner_consistent_random (

          const rd_kafka_topic_t *rkt,

          const void *key, size_t keylen,

          int32_t partition_cnt,

          void *opaque, void *msg_opaque);

PARTITION_UA其实是Unassigned partition的意思,即是未赋值的分区。RD_KAFKA_PARTITION_UA (unassigned)其实是自动采用topic下的partitioner函数,当然也可以直接采用固定的值。

在配置文件config/server.properties中是可以设置partition的数量num.partitions。


相关文章
|
8月前
|
消息中间件 Kafka Linux
Linux下安装Kafka 3.9.1
本文介绍Kafka 3.9.1版本的安装与配置,包括通过ZooKeeper或KRaft模式启动Kafka。涵盖环境变量设置、日志路径修改、集群UUID生成、存储格式化及服务启停操作,适用于Linux环境下的部署实践。
1124 0
|
存储 Linux C语言
Linux C/C++之IO多路复用(aio)
这篇文章介绍了Linux中IO多路复用技术epoll和异步IO技术aio的区别、执行过程、编程模型以及具体的编程实现方式。
882 1
Linux C/C++之IO多路复用(aio)
|
消息中间件 Linux Kafka
linux命令使用消费kafka的生产者、消费者
linux命令使用消费kafka的生产者、消费者
456 16
|
消息中间件 Java Kafka
【手把手教你Linux环境下快速搭建Kafka集群】内含脚本分发教程,实现一键部署多个Kafka节点
本文介绍了Kafka集群的搭建过程,涵盖从虚拟机安装到集群测试的详细步骤。首先规划了集群架构,包括三台Kafka Broker节点,并说明了分布式环境下的服务进程配置。接着,通过VMware导入模板机并克隆出三台虚拟机(kafka-broker1、kafka-broker2、kafka-broker3),分别设置IP地址和主机名。随后,依次安装JDK、ZooKeeper和Kafka,并配置相应的环境变量与启动脚本,确保各组件能正常运行。最后,通过编写启停脚本简化集群的操作流程,并对集群进行测试,验证其功能完整性。整个过程强调了自动化脚本的应用,提高了部署效率。
3576 1
【手把手教你Linux环境下快速搭建Kafka集群】内含脚本分发教程,实现一键部署多个Kafka节点
|
消息中间件 Kafka Docker
docker compose 安装 kafka
通过本文的步骤,您可以快速在本地使用 Docker Compose 安装并配置 Kafka 和 Zookeeper。Docker Compose 简化了多容器应用的管理,方便快速搭建和测试分布式系统。
2202 2
|
Ubuntu Linux 编译器
Linux/Ubuntu下使用VS Code配置C/C++项目环境调用OpenCV
通过以上步骤,您已经成功在Ubuntu系统下的VS Code中配置了C/C++项目环境,并能够调用OpenCV库进行开发。请确保每一步都按照您的系统实际情况进行适当调整。
3205 3
|
资源调度 Linux 调度
Linux C/C++之线程基础
这篇文章详细介绍了Linux下C/C++线程的基本概念、创建和管理线程的方法,以及线程同步的各种机制,并通过实例代码展示了线程同步技术的应用。
388 0
Linux C/C++之线程基础
|
消息中间件 Ubuntu Java
Ubuntu系统上安装Apache Kafka
Ubuntu系统上安装Apache Kafka
|
编译器 C++ 开发者
【C++篇】深度解析类与对象(下)
在上一篇博客中,我们学习了C++的基础类与对象概念,包括类的定义、对象的使用和构造函数的作用。在这一篇,我们将深入探讨C++类的一些重要特性,如构造函数的高级用法、类型转换、static成员、友元、内部类、匿名对象,以及对象拷贝优化等。这些内容可以帮助你更好地理解和应用面向对象编程的核心理念,提升代码的健壮性、灵活性和可维护性。
|
编译器 C++ 容器
【c++11】c++11新特性(上)(列表初始化、右值引用和移动语义、类的新默认成员函数、lambda表达式)
C++11为C++带来了革命性变化,引入了列表初始化、右值引用、移动语义、类的新默认成员函数和lambda表达式等特性。列表初始化统一了对象初始化方式,initializer_list简化了容器多元素初始化;右值引用和移动语义优化了资源管理,减少拷贝开销;类新增移动构造和移动赋值函数提升性能;lambda表达式提供匿名函数对象,增强代码简洁性和灵活性。这些特性共同推动了现代C++编程的发展,提升了开发效率与程序性能。
538 12