假设rabbitmq配置了集群,且客户端连接rabbitmq-server通过lvs实现HA但一般情况下不建议做LB。在分布式系统的环境下,由于节点的非预知性,使用spring amqp模板进行配置不足以灵活到满足弹性扩展的需求,因此,更加方便的方式是通过rabbitmq原生的java client进行订阅和发布。在我们的场景中,某些节点需要同时是发布端和订阅端以便做到弹性扩展,无需额外的配置。以fanout类型为例,如下所示:
发布端:
/**
* @Title: Send.java
* @Package com.cyl.rabbitmq
* @Description: TODO(用一句话描述该文件做什么)
* @author zjhua@hundsun.com
* @date 2016年4月25日 下午12:52:59
* @version V1.0
*/
package com.cyl.rabbitmq;
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @author zjhua
*
*/
public class Send {
public static void main(String[] args) throws IOException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection;
connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("fanout_random", "fanout");
String message = "Hello World ";
for(int i=0;i<10000;i++) {
channel.basicPublish("fanout_random", "", null, (message + i).getBytes());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
System.out.println(" [x] Sent '" + message + "'");
channel.close();
connection.close();
}
}
订阅端:
/**
* @Title: Reqv.java
* @Package com.cyl.rabbitmq
* @Description: TODO(用一句话描述该文件做什么)
* @author zjhua@hundsun.com
* @date 2016年4月25日 下午12:56:33
* @version V1.0
*/
package com.cyl.rabbitmq;
import java.io.IOException;
import com.rabbitmq.client.*;
/**
* @author zjhua
*
*/
public class Reqv {
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("fanout_random", "fanout");
String queueName = channel.queueDeclare().getQueue(); --对于某些场景,比如缓存同步,使用exclusive/auto-delete的queue会比较合适
channel.queueBind(queueName, "fanout_random", "");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
channel.basicConsume(queueName, true, consumer);
}
}
如果要同时作为订阅端、发布端,只要在容器启动时配置监听事件,其中包含订阅端逻辑即可。发布端作为基础服务供业务子系统使用。