helloworld项目是直连模型,(1对1,1个生产者和1个消费者)
durable参数说明:
是否持久化队列,重启rabbitmq后,durable=false会丢失队列;druable=true,重启rabbitmq,消息会丢失
消费者的绑定参数与生产者要一致。都为druable=true,autoDelete=false,exclusive=false,arguments=null,queue。
workqueue模型,(1对多):
RabbitMQUtils.java
package com.gavin.utils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class RabbitMQUtils { private static ConnectionFactory connectionFactory; static{ connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); } public static Connection getConnection() throws IOException, TimeoutException { return connectionFactory.newConnection(); } public static void close(Channel channel,Connection connection) throws IOException, TimeoutException { if (channel != null) channel.close(); if(connection != null) connection.close(); } }
生产者:
package com.gavin.mq.workqueue; import com.gavin.utils.RabbitMQUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.MessageProperties; import org.junit.Test; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 工作队列模型:让多个消费绑定到一个队列,共同消费队列中的消息 * 多个消费者平均分配 */ public class WorkProvider { @Test public void testSendMessage() throws IOException, TimeoutException { Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("work",true,false,false,null); for(int i = 0;i<10;i++){ channel.basicPublish("","work", MessageProperties.PERSISTENT_BASIC,(i+"hello work queue").getBytes()); } RabbitMQUtils.close(channel,connection); } }
消费者:
package com.gavin.mq.workqueue; import com.gavin.utils.RabbitMQUtils; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class WorkConsumer { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("work",true,false,false,null); channel.basicConsume("work",true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(new String(body)); } }); } }
消费者2:
package com.gavin.mq.workqueue; import com.gavin.utils.RabbitMQUtils; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class WorkConsumer2 { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("work",true,false,false,null); channel.basicConsume("work",true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("consumer2:"+new String(body)); } }); } }