Kafka简介及使用PHP处理Kafka消息

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
云原生网关 MSE Higress,422元/月
注册配置 MSE Nacos/ZooKeeper,118元/月
简介: Kafka简介及使用PHP处理Kafka消息

Kafka简介及使用PHP处理Kafka消息



Kafka 是一种高吞吐的分布式消息系统,能够替代传统的消息队列用于解耦合数据处理,缓存未处理消息等,同时具有更高的吞吐率,支持分区、多副本、冗余,因此被广泛用于大规模消息数据处理应用。


Kafka的特点:



以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间复杂度的访问性能。


高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒100K条以上消息的传输。【据了解,Kafka每秒可以生产约25万消息(50 MB),每秒处理55万消息(110 MB)】


支持Kafka Server间的消息分区,同时保证每个Partition内的消息顺序传输。


分布式系统,易于向外扩展。所有的producer、broker和consumer都会有多个,均为分布式的。无需停机即可扩展机器。


消息被处理的状态是在consumer端维护,而不是由server端维护。当失败时能自动平衡。


同时支持离线数据处理和实时数据处理。

Kafka的架构:


kafka架构图


201812241824542 (1).png


Kafka的整体架构非常简单,producer、broker(kafka)和consumer都可以有多个。Producer,consumer实现Kafka注册的接口,数据从producer发送到broker,broker承担一个中间缓存和分发的作用。broker分发注册到系统中的consumer。broker的作用类似于缓存,即活跃的数据和离线处理系统之间的缓存。客户端和服务器端的通信,是基于简单,高性能,且与编程语言无关的TCP协议。



Kafka基本概念:



Topic:特指Kafka处理的消息源(feeds of messages)的不同分类。


Partition:Topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。


Message:消息,是通信的基本单位,每个producer可以向一个topic(主题)发布一些消息。


Producers:消息和数据生产者,向Kafka的一个topic发布消息的过程叫做producers。

Consumers:消息和数据消费者,订阅topics并处理其发布的消息的过程叫做consumers。


Broker:缓存代理,Kafa集群中的一台或多台服务器统称为broker。


Kafka消息发送的流程:


2018122418245425 (1).png


Kafka消息发送



下面是PHP生产、消费Kafka消息的例子(假设已经配置好Kafka):


1.从zookeeper源码src/c/src安装zookeeper c client


cd zookeeper-3.4.8/src/c
./configure
make && make install


2.编译php libzookper扩展



git clone https://github.com/Timandes/libzookeeper.git
cd libzookeeper
phpize
./configure--with-libzookeeper=/usr/local/bin/cli_mt
make && makeinstall


3.编译php zookeeper扩展


git clone https://github.com/andreiz/php-zookeeper.git
cd php-zookeeper
phpize
./configure
make && make install


4.修改php.ini配置,添加libzookeeper和php-zookeeper扩展


extension=libzookeeper.so
extension=zookeeper.so


PHP处理Kafka消息:



1.启动zookeeper和kafka


./bin/zookeeper-server-start.sh config/zookeeper.properties
./bin/kafka-server-start.sh config/server.properties


2.创建由2个partition组成的、名为testtopic的topic


kafka_2.11-0.10.0.0/bin/kafka-topics.sh --create--zookeeper localhost:2181 --replication-factor --partitions --topic testtopic


3.composer安装nmred/kafka-php


composer require "nmred/kafka-php"


4.producer.php代码


<php 
require_once('./vendor/autoload.php'); 
$produce=/Kafka/Produce::getInstance('localhost:2181',3000); 
$produce->setRequireAck(-1); $topicName='testtopic';
//获取到topic下可用的partitions
$partitions=$produce->getAvailablePartitions($topicName);
$partitionCount=count($partitions); 
$count=1;//可以处理的消费者数量(可以理解为server数量)
while(true){    $message=json_encode(array('uid'=>$count,'age'=>$count%100,'datetime'=>date('Y-m-d H:i:s')));     
//发送消息到不同的partition   
 $partitionId=$count%$partitionCount;    
$produce->setMessages('testtopic',$partitionId,array($message));   
 $result=$produce->send();    
var_dump($result);     
$count++;   
 echo"producer sleeping/n";   
 sleep(1);
}



5、consumer.php代码


<?php 
require_once('./vendor/autoload.php'); 
//获取需要处理的partitionId
$partitionId = isset($argv[1]) ? intval($argv[1]) :0; 
$consumer =/Kafka/Consumer::getInstance('localhost:2181'); 
$consumer->setGroup('test-consumer-group');
$consumer->setPartition('testtopic', $partitionId);
$consumer->setFromOffset(true);
$consumer->setMaxBytes(102400); 
while(true){    
$topic = $consumer->fetch();     
foreach ($topic as $topicName => $partition{        
foreach ($partition as $partId => $messageSet{            
foreach ($messageSet as $message){                
var_dump($message);           
}        
}    
}    
echo"consumer sleeping/n";   
sleep(1);
}


6、在3个终端界面分别运行


7、两个consumer脚本依次收到producer发送的消息


20181224183829554.png


 


目录
相关文章
|
6月前
|
消息中间件 分布式计算 大数据
【大数据技术Hadoop+Spark】Flume、Kafka的简介及安装(图文解释 超详细)
【大数据技术Hadoop+Spark】Flume、Kafka的简介及安装(图文解释 超详细)
262 0
|
5月前
|
消息中间件 存储 Kafka
go语言并发实战——日志收集系统(二) Kafka简介
go语言并发实战——日志收集系统(二) Kafka简介
105 1
|
消息中间件 存储 分布式计算
消息中间件系列教程(19) -Kafka-简介
消息中间件系列教程(19) -Kafka-简介
113 0
|
消息中间件 NoSQL 关系型数据库
Linux安装 OpenResty、Nginx、PHP、Mysql、Redis、Lua、Node、Golang、MongoDB、Kafka等
Linux安装 OpenResty、Nginx、PHP、Mysql、Redis、Lua、Node、Golang、MongoDB、Kafka等
173 0
|
6月前
|
NoSQL 关系型数据库 应用服务中间件
Linux安装 OpenResty、Nginx、PHP、Mysql、Redis、Lua、Node、Golang、MongoDB、Kafka等
Linux安装 OpenResty、Nginx、PHP、Mysql、Redis、Lua、Node、Golang、MongoDB、Kafka等
170 0
|
Java 数据安全/隐私保护
Kafka-Eagle安装及使用简介
Kafka-Eagle安装及使用简介
861 0
|
消息中间件 设计模式 Kubernetes
【微服务安全】使用 Spring Boot、Kafka、Vault 和 Kubernetes 保护微服务间通信——第 1 部分:简介和架构
【微服务安全】使用 Spring Boot、Kafka、Vault 和 Kubernetes 保护微服务间通信——第 1 部分:简介和架构
|
消息中间件 存储 缓存
Kafka快速入门(生产者)同步异步发送、分区、消息精确一次发送、幂等性、事务
Kafka快速入门(生产者)同步异步发送、分区、消息精确一次发送、幂等性、事务
Kafka快速入门(生产者)同步异步发送、分区、消息精确一次发送、幂等性、事务
|
消息中间件 监控 Kafka
发送kafka消息的shell脚本
开发和学习时需要造一些kafka消息,于是写了段脚本实现,在这里记录备忘,后面会常用到
394 0
发送kafka消息的shell脚本
|
消息中间件 存储 负载均衡
kafka学习 之 简介
kafka学习 之 简介
198 0
kafka学习 之 简介