PHP SDK连接阿里云消息队列Kafka

环境配置

1、按需安装如下依赖,其中cyrus-sasl-devel.x86_64 openssl-devel为sasl使用

yum install epel-release.noarch
yum install php-devel cyrus-sasl-devel.x86_64 openssl-devel make gcc-c++.x86_64 gcc gcc-c++ autoconf automake git

PHP SDK连接阿里云消息队列Kafka

2、编译安装librdkafka

git clone https://github.com/edenhill/librdkafka.git
cd librdkafka/
./configure

PHP SDK连接阿里云消息队列Kafka

3、如果包含公网开放,要特殊注意ssl及sasl相关依赖

make
make install
PHP SDK连接阿里云消息队列Kafka

4、编译安装php-rdkafka

git clone https://github.com/arnaud-lb/php-rdkafka.git
cd php-rdkafka
phpize
./configure
make all -j 5
sudo make install

echo 'extension=rdkafka.so' >> /etc/php.ini

PHP SDK连接阿里云消息队列Kafka

消息收发

1、Kafka控制台创建:公网/VPC实例 类型实例。
PHP SDK连接阿里云消息队列Kafka

2、setting.php


<?php
return [
    'sasl_plain_username' => 'alikafka_post-******',
    'sasl_plain_password' => '******',
    'bootstrap_servers' => "139.196.***.***:9093,139.196.***.***:9093,139.196.***.***:9093",
    'topic_name' => 'phptopic',
    'consumer_id' => 'phpgroup'
];
?>

3、发送端代码

<?php

$setting = require __DIR__ . '/setting.php';

$conf = new RdKafka\Conf();
$conf->set('sasl.mechanisms', 'PLAIN');
$conf->set('api.version.request', 'true');
$conf->set('sasl.username', $setting['sasl_plain_username']);
$conf->set('sasl.password', $setting['sasl_plain_password']);
$conf->set('security.protocol', 'SASL_SSL');
$conf->set('enable.ssl.certificate.verification','false');
$conf->set('ssl.ca.location', __DIR__ . '/ca-cert.pem');
$conf->set('message.send.max.retries', 5);
$rk = new RdKafka\Producer($conf);
# if want to debug, set log level to LOG_DEBUG
$rk->setLogLevel(LOG_INFO);
$rk->addBrokers($setting['bootstrap_servers']);
$topic = $rk->newTopic($setting['topic_name']);
$a = $topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message hello kafka");
$rk->poll(0);
while ($rk->getOutQLen() > 0) {
    $rk->poll(50);
}
echo "send succ" . PHP_EOL;

4、消费端代码

<?php
$setting = require __DIR__ . '/setting.php';
$conf = new RdKafka\Conf();
$conf->set('sasl.mechanisms', 'PLAIN');
$conf->set('api.version.request', 'true');
$conf->set('sasl.username', $setting['sasl_plain_username']);
$conf->set('sasl.password', $setting['sasl_plain_password']);
$conf->set('enable.ssl.certificate.verification','false');
$conf->set('security.protocol', 'SASL_SSL');
$conf->set('ssl.ca.location', __DIR__ . '/ca-cert.pem');

$conf->set('group.id', $setting['consumer_id']);

$conf->set('metadata.broker.list', $setting['bootstrap_servers']);

$topicConf = new RdKafka\TopicConf();

$conf->setDefaultTopicConf($topicConf);

$consumer = new RdKafka\KafkaConsumer($conf);

$consumer->subscribe([$setting['topic_name']]);

echo "Waiting for partition assignment... (make take some time when\n";
echo "quickly re-joining the group after leaving it.)\n";

while (true) {
    $message = $consumer->consume(30 * 1000);
    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            var_dump($message);
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            echo "No more messages; will wait for more\n";
            break;
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            echo "Timed out\n";
            break;
        default:
            throw new \Exception($message->errstr(), $message->err);
            break;
    }
}

?>
ca-cert.pem

5、收发测试

PHP SDK连接阿里云消息队列Kafka

PHP SDK连接阿里云消息队列Kafka

更多参考

PHP SDK概述
vpc-ssl

上一篇:如何知道Hibernate已经成功运用了二级缓存EhCache


下一篇:安装Oracle JDK 7.0与8.0 for Mac OS X后Eclipse启动报错的解决之道