【linux-centos】php-rdkafka扩展安装

安装php-rdkafka 扩展
***先安装librdkafka
>>>源码方式安装
  git clone https://github.com/edenhill/librdkafka
  librdkafka-master.zip
  unzip librdkafka-master.zip
  cd librdkafka-master
  ./configure make make install

>>>yum方式安装
  yum install librdkafka-devel

  yum install libtool
***下载扩展包:

  git clone https://github.com/arnaud-lb/php-rdkafka.git
  php-rdkafka-5.x.zip
  unzip php-rdkafka-5.x.zip
  cd php-rdkafka-5.x
  phpize
  ./configure
  make && make install

可能会报错,configure: error: Please reinstall the rdkafka distribution
说明没安装librdkafka

vim /etc/php.ini
extension=rdkafka.so

php -m | grep rdkafka 查看是否安装成功

***使用kafka扩展

生产者:
$objRdKafka = new RdKafka\Producer();
$objRdKafka->setLogLevel(LOG_DEBUG);
$objRdKafka->addBrokers("localhost:9092");
$oObjTopic = $objRdKafka->newTopic("test");
// 从终端接收输入
$oInputHandler = fopen(‘php://stdin‘, ‘r‘);
while (true) {
echo "\nEnter messages:\n";
$sMsg = trim(fgets($oInputHandler));
// 空消息意味着退出
if (empty($sMsg)) {
break;
}
// 发送消息
$oObjTopic->produce(RD_KAFKA_PARTITION_UA, 0, $sMsg);
}

echo "done\n";

消费者:
$objRdKafka = new RdKafka\Consumer();
$objRdKafka->setLogLevel(LOG_DEBUG);
$objRdKafka->addBrokers("localhost:9092");
$oObjTopic = $objRdKafka->newTopic("test");
/**
* consumeStart
* 第一个参数标识分区,生产者是往分区0发送的消息,这里也从分区0拉取消息
* 第二个参数标识从什么位置开始拉取消息,可选值为
* RD_KAFKA_OFFSET_BEGINNING : 从开始拉取消息
* RD_KAFKA_OFFSET_END : 从当前位置开始拉取消息
* RD_KAFKA_OFFSET_STORED : 猜测跟RD_KAFKA_OFFSET_END一样
*/
$oObjTopic->consumeStart(0, RD_KAFKA_OFFSET_END);

while (true) {
// 第一个参数是分区,第二个参数是超时时间
$oMsg = $oObjTopic->consume(0, 1000);

// 没拉取到消息时,返回NULL
if (!$oMsg) {
usleep(10000);
continue;
}

if ($oMsg->err) {
echo $msg->errstr(), "\n";
break;
} else {
echo $oMsg->payload, "\n";
}
}

先启动生产,再启动消费会造成数据丢失,不在再队列中存着


【linux-centos】php-rdkafka扩展安装

上一篇:javaSE 基础笔记之常见类的使用


下一篇:.NET架构小技巧(2)——访问修饰符正确姿势