php操作rabbitmq

<?php

header('Content-Type:text/html;charset=utf8;');
$time = 10;

$params = array(
    'exchangeName' => 'test_cache_exchange'."_".$time,
    'queueName' => 'test_cache_queue'."_".$time,
    'routeKey' => 'test_cache_route'."_".$time,
);

$connectConfig = array(
    'host' => '192.168.33.60',
    'port' => 5672,
    'login' => 'admin',
    'password' => 'admin',
    'vhost' => '/'
);

//var_dump(extension_loaded('amqp'));
//
//exit();
try {
    $conn = new AMQPConnection($connectConfig);
    $conn->connect();
    if (!$conn->isConnected()) {
        //die('Conexiune esuata');
        //TODO 记录日志
        echo 'rabbit-mq 连接错误:', json_encode($connectConfig);
        exit();
    }
    $channel = new AMQPChannel($conn);
    if (!$channel->isConnected()) {
        // die('Connection through channel failed');
        //TODO 记录日志
        echo 'rabbit-mq Connection through channel failed:', json_encode($connectConfig);
        exit();
    }
    $exchange = new AMQPExchange($channel);
    $exchange->setFlags(AMQP_DURABLE);//持久化
    $exchange->setName($params['exchangeName']?:'');
    $exchange->setType(AMQP_EX_TYPE_DIRECT); //direct类型
    $exchange->declareExchange();

    //$channel->startTransaction();

    $queue = new AMQPQueue($channel);
    $queue->setName($params['queueName']?:'');
    $queue->setFlags(AMQP_DURABLE);

    // 和普通生产者区别 在这 下面是过期时间和转发到的路由
    $queue->setArguments(array(
        'x-dead-letter-exchange' => 'delay_exchange',
        'x-dead-letter-routing-key' => 'delay_route',
        'x-message-ttl' => 20000,
    ));
    $queue->declareQueue();

    //绑定
    $queue->bind($params['exchangeName'], $params['routeKey']);
} catch(Exception $e) {

}


//$num = mt_rand(100, 500);
$num = 1;

//生成消息
$exchange->publish(date("Y-m-d H:i:s"), $params['routeKey'], AMQP_MANDATORY, array('delivery_mode'=>2));

 

上一篇:org.springframework.amqp.AmqpIOException: java.io.IOException


下一篇:java-AMQP Spring Integration错误处理