PHP SDK连接阿里云消息队列Kafka

简介: 本文在centos8环境下,基于开源客户端php-rdkafka连接阿里云消息队列Kafka。

环境配置

1、按需安装如下依赖,其中cyrus-sasl-devel.x86_64 openssl-devel为sasl使用

yum install epel-release.noarch
yum install php-devel cyrus-sasl-devel.x86_64 openssl-devel make gcc-c++.x86_64 gcc gcc-c++ autoconf automake git

图片.png

2、编译安装librdkafka

git clone https://github.com/edenhill/librdkafka.git
cd librdkafka/
./configure

图片.png

3、如果包含公网开放,要特殊注意ssl及sasl相关依赖

make
make install
图片.png

4、编译安装php-rdkafka

git clone https://github.com/arnaud-lb/php-rdkafka.git
cd php-rdkafka
phpize
./configure
make all -j 5
sudo make install

echo 'extension=rdkafka.so' >> /etc/php.ini

图片.png

消息收发

1、Kafka控制台创建:公网/VPC实例 类型实例。
图片.png

2、setting.php


<?php
return [
    'sasl_plain_username' => 'alikafka_post-******',
    'sasl_plain_password' => '******',
    'bootstrap_servers' => "139.196.***.***:9093,139.196.***.***:9093,139.196.***.***:9093",
    'topic_name' => 'phptopic',
    'consumer_id' => 'phpgroup'
];
?>

3、发送端代码

<?php

$setting = require __DIR__ . '/setting.php';

$conf = new RdKafka\Conf();
$conf->set('sasl.mechanisms', 'PLAIN');
$conf->set('api.version.request', 'true');
$conf->set('sasl.username', $setting['sasl_plain_username']);
$conf->set('sasl.password', $setting['sasl_plain_password']);
$conf->set('security.protocol', 'SASL_SSL');
$conf->set('enable.ssl.certificate.verification','false');
$conf->set('ssl.ca.location', __DIR__ . '/ca-cert.pem');
$conf->set('message.send.max.retries', 5);
$rk = new RdKafka\Producer($conf);
# if want to debug, set log level to LOG_DEBUG
$rk->setLogLevel(LOG_INFO);
$rk->addBrokers($setting['bootstrap_servers']);
$topic = $rk->newTopic($setting['topic_name']);
$a = $topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message hello kafka");
$rk->poll(0);
while ($rk->getOutQLen() > 0) {
    $rk->poll(50);
}
echo "send succ" . PHP_EOL;

4、消费端代码

<?php
$setting = require __DIR__ . '/setting.php';
$conf = new RdKafka\Conf();
$conf->set('sasl.mechanisms', 'PLAIN');
$conf->set('api.version.request', 'true');
$conf->set('sasl.username', $setting['sasl_plain_username']);
$conf->set('sasl.password', $setting['sasl_plain_password']);
$conf->set('enable.ssl.certificate.verification','false');
$conf->set('security.protocol', 'SASL_SSL');
$conf->set('ssl.ca.location', __DIR__ . '/ca-cert.pem');

$conf->set('group.id', $setting['consumer_id']);

$conf->set('metadata.broker.list', $setting['bootstrap_servers']);

$topicConf = new RdKafka\TopicConf();

$conf->setDefaultTopicConf($topicConf);

$consumer = new RdKafka\KafkaConsumer($conf);

$consumer->subscribe([$setting['topic_name']]);

echo "Waiting for partition assignment... (make take some time when\n";
echo "quickly re-joining the group after leaving it.)\n";

while (true) {
    $message = $consumer->consume(30 * 1000);
    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            var_dump($message);
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            echo "No more messages; will wait for more\n";
            break;
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            echo "Timed out\n";
            break;
        default:
            throw new \Exception($message->errstr(), $message->err);
            break;
    }
}

?>
ca-cert.pem

5、收发测试

图片.png

图片.png

更多参考

PHP SDK概述
vpc-ssl

相关文章
|
2月前
|
消息中间件 存储 Serverless
【实践】快速学会使用阿里云消息队列RabbitMQ版
云消息队列 RabbitMQ 版是一款基于高可用分布式存储架构实现的 AMQP 0-9-1协议的消息产品。云消息队列 RabbitMQ 版兼容开源 RabbitMQ 客户端,解决开源各种稳定性痛点(例如消息堆积、脑裂等问题),同时具备高并发、分布式、灵活扩缩容等云消息服务优势。
106 2
|
3月前
|
消息中间件 弹性计算 运维
阿里云云消息队列RabbitMQ实践解决方案评测报告
阿里云云消息队列RabbitMQ实践解决方案评测报告
77 9
|
4月前
|
开发工具 iOS开发 容器
【Azure Blob】关闭Blob 匿名访问,iOS Objective-C SDK连接Storage Account报错
【Azure Blob】关闭Blob 匿名访问,iOS Objective-C SDK连接Storage Account报错
|
4月前
|
存储 Java 开发工具
【Azure 存储服务】Java Azure Storage SDK V12使用Endpoint连接Blob Service遇见 The Azure Storage endpoint url is malformed
【Azure 存储服务】Java Azure Storage SDK V12使用Endpoint连接Blob Service遇见 The Azure Storage endpoint url is malformed
|
4月前
|
存储 API 开发工具
【Azure Developer】使用 Python SDK连接Azure Storage Account, 计算Blob大小代码示例
【Azure Developer】使用 Python SDK连接Azure Storage Account, 计算Blob大小代码示例
|
5月前
|
消息中间件 SQL Kafka
实时计算 Flink版产品使用问题之如何实现两个阿里云账号下的Kafka进行数据的互相传输
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
开发工具 iOS开发 容器
【Azure Blob】关闭Blob 匿名访问,iOS Objective-C SDK连接Storage Account报错
iOS Objective-C 应用连接Azure Storage时,若不关闭账号的匿名访问,程序能正常运行。但关闭匿名访问后,上传到容器时会出现错误:“Public access is not permitted”。解决方法是将创建容器时的公共访问类型从`AZSContainerPublicAccessTypeContainer`改为`AZSContainerPublicAccessTypeOff`,以确保通过授权请求访问。
【Azure Blob】关闭Blob 匿名访问,iOS Objective-C SDK连接Storage Account报错
|
Web App开发 JavaScript PHP
微信JS SDK PHP Demo
原文:微信JS SDK PHP Demo 微信JS-SDK PHP Demo JS接口安全域名 自定义分享接口 jsapi_ticket 分享到朋友圈 分享给朋友 分享到QQ 原文:http://www.cnblogs.com/txw1958/p/weixin-js-sdk-php-demo.html   一、JSSDK类定义 // 注意:所有的JS接口只能在公众号绑定的域名下调用,公众号开发者需要先登录微信公众平台进入“公众号设置”的“功能设置”里填写“JS接口安全域名”。
1269 0
|
Web App开发 JavaScript PHP
微信JS SDK PHP Demo
微信JS-SDK PHP Demo JS接口安全域名 自定义分享接口 jsapi_ticket 分享到朋友圈 分享给朋友 分享到QQ 原文:http://www.cnblogs.com/txw1958/p/weixin-js-sdk-php-demo.html   一、JSSDK类定义 // 注意:所有的JS接口只能在公众号绑定的域名下调用,公众号开发者需要先登录微信公众平台进入“公众号设置”的“功能设置”里填写“JS接口安全域名”。
1386 0
|
3月前
|
安全 关系型数据库 MySQL
PHP与MySQL交互:从入门到实践
【9月更文挑战第20天】在数字时代的浪潮中,掌握PHP与MySQL的互动成为了开发动态网站和应用程序的关键。本文将通过简明的语言和实例,引导你理解PHP如何与MySQL数据库进行对话,开启你的编程之旅。我们将从连接数据库开始,逐步深入到执行查询、处理结果,以及应对常见的挑战。无论你是初学者还是希望提升技能的开发者,这篇文章都将为你提供实用的知识和技巧。让我们一起探索PHP与MySQL交互的世界,解锁数据的力量!

相关产品

  • 云消息队列 Kafka 版