1、首先安装kafka扩展
#安装librdkafka: 版本: https://github.com/edenhill/librdkafka/releases/tag/v0.9.2 $ git clone https://github.com/edenhill/librdkafka.git $ ./configure $ make $ sudo make install #安装 rdkafka.so 版本:https://github.com/arnaud-lb/php-rdkafka/releases/tag/3.0.1 $ git clone https://github.com/arnaud-lb/php-rdkafka.git $ cd php-rdkafka $ phpize $ ./configure $ make all -j 5 $ sudo make install
2、生产者代码示例
rcf=newRdKafka\Conf();rcf->set('group.id', 'test'); //topicname
cf=newRdKafka\TopicConf();cf->set('offset.store.method', 'broker');
cf−>set(′auto.offset.reset′,′smallest′);rk = new RdKafka\Producer(rcf);rk->setLogLevel(LOG_DEBUG);
rk−>addBrokers("127.0.0.1");//brokeraddrtopic = rk−>newTopic("test",cf); //topicname
for(i=0;i < 10; i++) {topic->produce(0,0,'test' . $i);
}
3、消费者代码示例
rcf=newRdKafka\Conf();rcf->set('group.id', 'test');
rcf−>set(′broker.version.fallback′,′0.8.2′);//brokername,kafkaversioncf = new RdKafka\TopicConf();
cf−>set(′auto.offset.reset′,′smallest′);cf->set('auto.commit.enable', true);
rk=newRdKafka\Consumer(rcf);
rk−>setLogLevel(LOGDEBUG);rk->addBrokers("127.0.0.1"); //brokeraddr
topic=rk->newTopic("test", cf);//topicname,topicobjecttopic->consumeStart(0,10); //partition,offset
msg=topic->consume(0, 1000); //partition,timeout
var_dump($msg);