public class MessageConsumer {
private static final String QUER_NAME1 = "test_simple_queue2"; private static final String QUER_NAME = "test_simple_queue";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { //获取连接 Connection connection = MessageUtils.getConnection(); //创建频道 final Channel channel = connection.createChannel(); //队列声明 channel.queueDeclare(QUER_NAME1,false,false,false,null); //绑定队列到交换机转发器 channel.queueBind(QUER_NAME1,QUER_NAME,""); //保证每次之分发一个 channel.basicQos(1); //定义消费者 DefaultConsumer consumer = new DefaultConsumer(channel){ //获取消息 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { super.handleDelivery(consumerTag, envelope, properties, body); String message = new String(body, "UTF-8"); System.out.println("消费者[1] msg:"+message.getBytes());
try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }finally { System.out.println("[1] done"); channel.basicAck(envelope.getDeliveryTag(),false); } } }; //自动应答 boolean autoAck = false; //监听队列 channel.basicConsume(QUER_NAME1, autoAck, consumer); }
}
|