PHP SDK连接阿里云消息队列Kafka

简介: 本文在centos8环境下,基于开源客户端php-rdkafka连接阿里云消息队列Kafka。

环境配置

1、按需安装如下依赖,其中cyrus-sasl-devel.x86_64 openssl-devel为sasl使用

yum install epel-release.noarch
yum install php-devel cyrus-sasl-devel.x86_64 openssl-devel make gcc-c++.x86_64 gcc gcc-c++ autoconf automake git

图片.png

2、编译安装librdkafka

git clone https://github.com/edenhill/librdkafka.git
cd librdkafka/
./configure

图片.png

3、如果包含公网开放,要特殊注意ssl及sasl相关依赖

make
make install
图片.png

4、编译安装php-rdkafka

git clone https://github.com/arnaud-lb/php-rdkafka.git
cd php-rdkafka
phpize
./configure
make all -j 5
sudo make install

echo 'extension=rdkafka.so' >> /etc/php.ini

图片.png

消息收发

1、Kafka控制台创建:公网/VPC实例 类型实例。
图片.png

2、setting.php


<?php
return [
    'sasl_plain_username' => 'alikafka_post-******',
    'sasl_plain_password' => '******',
    'bootstrap_servers' => "139.196.***.***:9093,139.196.***.***:9093,139.196.***.***:9093",
    'topic_name' => 'phptopic',
    'consumer_id' => 'phpgroup'
];
?>

3、发送端代码

<?php

$setting = require __DIR__ . '/setting.php';

$conf = new RdKafka\Conf();
$conf->set('sasl.mechanisms', 'PLAIN');
$conf->set('api.version.request', 'true');
$conf->set('sasl.username', $setting['sasl_plain_username']);
$conf->set('sasl.password', $setting['sasl_plain_password']);
$conf->set('security.protocol', 'SASL_SSL');
$conf->set('enable.ssl.certificate.verification','false');
$conf->set('ssl.ca.location', __DIR__ . '/ca-cert.pem');
$conf->set('message.send.max.retries', 5);
$rk = new RdKafka\Producer($conf);
# if want to debug, set log level to LOG_DEBUG
$rk->setLogLevel(LOG_INFO);
$rk->addBrokers($setting['bootstrap_servers']);
$topic = $rk->newTopic($setting['topic_name']);
$a = $topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message hello kafka");
$rk->poll(0);
while ($rk->getOutQLen() > 0) {
    $rk->poll(50);
}
echo "send succ" . PHP_EOL;

4、消费端代码

<?php
$setting = require __DIR__ . '/setting.php';
$conf = new RdKafka\Conf();
$conf->set('sasl.mechanisms', 'PLAIN');
$conf->set('api.version.request', 'true');
$conf->set('sasl.username', $setting['sasl_plain_username']);
$conf->set('sasl.password', $setting['sasl_plain_password']);
$conf->set('enable.ssl.certificate.verification','false');
$conf->set('security.protocol', 'SASL_SSL');
$conf->set('ssl.ca.location', __DIR__ . '/ca-cert.pem');

$conf->set('group.id', $setting['consumer_id']);

$conf->set('metadata.broker.list', $setting['bootstrap_servers']);

$topicConf = new RdKafka\TopicConf();

$conf->setDefaultTopicConf($topicConf);

$consumer = new RdKafka\KafkaConsumer($conf);

$consumer->subscribe([$setting['topic_name']]);

echo "Waiting for partition assignment... (make take some time when\n";
echo "quickly re-joining the group after leaving it.)\n";

while (true) {
    $message = $consumer->consume(30 * 1000);
    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            var_dump($message);
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            echo "No more messages; will wait for more\n";
            break;
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            echo "Timed out\n";
            break;
        default:
            throw new \Exception($message->errstr(), $message->err);
            break;
    }
}

?>
ca-cert.pem

5、收发测试

图片.png

图片.png

更多参考

PHP SDK概述
vpc-ssl

相关文章
|
21天前
|
消息中间件 分布式计算 监控
Python面试:消息队列(RabbitMQ、Kafka)基础知识与应用
【4月更文挑战第18天】本文探讨了Python面试中RabbitMQ与Kafka的常见问题和易错点,包括两者的基础概念、特性对比、Python客户端使用、消息队列应用场景及消息可靠性保证。重点讲解了消息丢失与重复的避免策略,并提供了实战代码示例,帮助读者提升在分布式系统中使用消息队列的能力。
48 2
|
21天前
|
消息中间件 监控 大数据
Kafka消息队列架构与应用场景探讨:面试经验与必备知识点解析
【4月更文挑战第9天】本文详尽探讨了Kafka的消息队列架构,包括Broker、Producer、Consumer、Topic和Partition等核心概念,以及消息生产和消费流程。此外,还介绍了Kafka在微服务、实时数据处理、数据管道和数据仓库等场景的应用。针对面试,文章解析了Kafka与传统消息队列的区别、实际项目挑战及解决方案,并展望了Kafka的未来发展趋势。附带Java Producer和Consumer的代码示例,帮助读者巩固技术理解,为面试做好准备。
39 0
|
2天前
|
消息中间件 存储 缓存
消息队列之 MetaQ 和 Kafka 哪个更香!
本篇文章首先介绍了MetaQ消息队列,然后介绍了作者对MetaQ和Kafka这两个消息队列的理解。
|
5天前
|
存储 运维 5G
基于阿里云数据库 SelectDB 内核 Apache Doris 的实时/离线一体化架构,赋能中国联通 5G 全连接工厂解决方案
数据是 5G 全连接工厂的核心要素,为支持全方位的数据收集、存储、分析等工作的高效进行,联通 5G 全连接工厂从典型的 Lambda 架构演进为 All in [Apache Doris](https://c.d4t.cn/vwDf8R) 的实时/离线一体化架构,并凭借 Doris 联邦查询能力打造统一查询网关,数据处理及查询链路大幅简化,为联通 5G 全连接工厂带来数据时效性、查询响应、存储成本、开发效率全方位的提升。
基于阿里云数据库 SelectDB 内核 Apache Doris 的实时/离线一体化架构,赋能中国联通 5G 全连接工厂解决方案
|
10天前
|
消息中间件 弹性计算 物联网
【阿里云弹性计算】阿里云ECS在IoT领域的应用:支撑大规模设备连接与数据处理
【5月更文挑战第26天】阿里云ECS是弹性计算服务,支持IoT设备的连接与数据处理。通过MQTT协议实现设备快速接入,配合消息队列处理异构实时数据。ECS可用于部署数据处理工具、应用服务,如智能家居控制系统,通过弹性伸缩适应负载变化。结合阿里云其他服务,ECS为IoT提供完整解决方案,助力企业数字化转型。
20 0
|
21天前
|
消息中间件 存储 传感器
Kafka消息队列原理及应用详解
【5月更文挑战第6天】Apache Kafka是高性能的分布式消息队列,常用于实时数据管道和流应用。它提供高性能、持久化、分布式和可伸缩的消息处理,支持解耦、异步通信和流量控制。Kafka的核心概念包括Broker、Topic、Partition、Producer、Consumer和Consumer Group。其特点是高吞吐、低延迟、数据持久化、分布式架构和容错性。常见应用包括实时数据流处理、日志收集、消息传递和系统间数据交换。
|
21天前
|
弹性计算 运维 Serverless
Serverless 应用引擎产品使用之在阿里函数计算中,使用阿里云API或SDK从函数计算调用ECS实例的服务如何解决
阿里云Serverless 应用引擎(SAE)提供了完整的微服务应用生命周期管理能力,包括应用部署、服务治理、开发运维、资源管理等功能,并通过扩展功能支持多环境管理、API Gateway、事件驱动等高级应用场景,帮助企业快速构建、部署、运维和扩展微服务架构,实现Serverless化的应用部署与运维模式。以下是对SAE产品使用合集的概述,包括应用管理、服务治理、开发运维、资源管理等方面。
|
21天前
|
消息中间件 存储 Java
深度探索:使用Apache Kafka构建高效Java消息队列处理系统
【4月更文挑战第17天】本文介绍了在Java环境下使用Apache Kafka进行消息队列处理的方法。Kafka是一个分布式流处理平台,采用发布/订阅模型,支持高效的消息生产和消费。文章详细讲解了Kafka的核心概念,包括主题、生产者和消费者,以及消息的存储和消费流程。此外,还展示了Java代码示例,说明如何创建生产者和消费者。最后,讨论了在高并发场景下的优化策略,如分区、消息压缩和批处理。通过理解和应用这些策略,可以构建高性能的消息系统。
|
21天前
|
消息中间件 大数据 Kafka
Kafka与大数据:消息队列在大数据架构中的关键角色
【4月更文挑战第7天】Apache Kafka是高性能的分布式消息队列,常用于大数据架构,作为实时数据管道汇聚各类数据,并确保数据有序传递。它同时也是数据分发枢纽,支持多消费者订阅,简化系统集成。Kafka作为流处理平台的一部分,允许实时数据处理,满足实时业务需求。在数据湖建设中,它是数据入湖的关键,负责数据汇集与整理。此外,Kafka提供弹性伸缩和容错保障,适用于微服务间的通信,并在数据治理与审计中发挥作用。总之,Kafka是现代大数据体系中的重要基础设施,助力企业高效利用数据。
70 1
|
21天前
|
消息中间件 存储 负载均衡
消息队列学习之kafka
【4月更文挑战第2天】消息队列学习之kafka,一个分布式的,支持多分区、多副本,基于 Zookeeper 的分布式消息流平台。
25 2

热门文章

最新文章

相关产品

  • 云消息队列 Kafka 版