Send类
package topics; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import utils.ConnectionUtil; public class Send { private static String EXCHANGE_NAME = "test_exchange_topic"; public static void main(String[] args) throws Exception{ Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME,"topic");//类型主题 String msg = "hellow direct"; channel.basicPublish(EXCHANGE_NAME, "goods.delete", null, msg.getBytes()); System.out.println("send=====>" + msg); channel.close(); connection.close(); } }
Recv1类
package topics; import com.rabbitmq.client.*; import utils.ConnectionUtil; import java.io.IOException; public class Recv1 { private static String EXCHANGE_NAME = "test_exchange_topic"; private static String QUEUE_NAME = "test_queue_topic_a"; public static void main(String[] args) throws Exception { //获取连接 Connection connection = ConnectionUtil.getConnection(); //从连接中获取一个通道 final Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,false,false,false,null); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME,"goods.add");//订阅goods.add channel.queueBind(QUEUE_NAME, EXCHANGE_NAME,"goods.update");//订阅goods.update,其它类型订阅不到 channel.basicQos(1); Consumer consumer = new DefaultConsumer(channel) { //当消息到达时执行回调方法 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "utf-8"); System.out.println("[Receive1]:" + message); try { Thread.sleep(2000); } catch (Exception e) { e.printStackTrace(); } finally { System.out.println("[1] done"); channel.basicAck(envelope.getDeliveryTag(),false); } } }; boolean autoAck = false; channel.basicConsume(QUEUE_NAME,autoAck,consumer); } }
Recv2类
package topics; import com.rabbitmq.client.*; import utils.ConnectionUtil; import java.io.IOException; public class Recv2 { private static String EXCHANGE_NAME = "test_exchange_topic"; private static String QUEUE_NAME = "test_queue_topic_b"; public static void main(String[] args) throws Exception { //获取连接 Connection connection = ConnectionUtil.getConnection(); //从连接中获取一个通道 final Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,false,false,false,null); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME,"goods.#");//通配订阅goods.xx下所有的消息 channel.basicQos(1); Consumer consumer = new DefaultConsumer(channel) { //当消息到达时执行回调方法 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "utf-8"); System.out.println("[Receive2]:" + message); try { Thread.sleep(2000); } catch (Exception e) { e.printStackTrace(); } finally { System.out.println("[2] done"); channel.basicAck(envelope.getDeliveryTag(),false); } } }; boolean autoAck = false; channel.basicConsume(QUEUE_NAME,autoAck,consumer); } }