RabbitMQ 新 API 的发布/订阅(fanout 交换机)实现

获取 MQ 连接的工具类

/**
 * Helper that opens connections to the message broker used by the examples.
 */
public class MessageUtils {

    /**
     * Configures a {@code ConnectionFactory} with the fixed broker settings
     * below and opens a new connection from it.
     *
     * @return the connection returned by {@code ConnectionFactory.newConnection()}
     * @throws IOException      propagated from {@code newConnection()}
     * @throws TimeoutException propagated from {@code newConnection()}
     */
    public static Connection getConnection() throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();

        // Credentials used to log in to the broker.
        connectionFactory.setUsername("user1");
        connectionFactory.setPassword("user1");

        // Broker endpoint: virtual host, port and host address.
        connectionFactory.setVirtualHost("/user_num1");
        connectionFactory.setPort(5672);
        connectionFactory.setHost("127.0.0.1");

        Connection connection = connectionFactory.newConnection();
        return connection;
    }
}

定义消息生产者

/**
 * Example publisher: declares a fanout exchange and publishes a single
 * message to it, then releases the channel and connection.
 */
public class MessageProduction {

    // Name of the fanout exchange the message is published to. Despite the
    // "queue" in its value, it is passed to exchangeDeclare/basicPublish as
    // the exchange name.
    private static final String EXCHANGE_NAME = "test_simple_queue";

    /**
     * Publishes one message to the fanout exchange.
     *
     * @param args unused
     * @throws IOException      propagated from the connection/channel operations
     * @throws TimeoutException propagated from opening or closing the connection/channel
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = MessageUtils.getConnection();
        try {
            Channel channel = connection.createChannel();
            try {
                // "fanout": the exchange type requested for this exchange.
                channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

                String msg = "liujiang ";
                // Encode explicitly as UTF-8 so the bytes do not depend on the
                // platform default charset.
                channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes("UTF-8"));

                System.out.println("send" + msg);
            } finally {
                // Close the channel even when declaring or publishing fails.
                channel.close();
            }
        } finally {
            // Always release the connection, including on failure paths.
            connection.close();
        }
    }
}

定义消息消费者

/**
 * Example subscriber: declares a queue, binds it to the fanout exchange and
 * consumes messages one at a time with manual acknowledgement.
 */
public class MessageConsumer {

    // Queue this consumer reads from.
    private static final String QUEUE_NAME = "test_simple_queue2";
    // Exchange the queue is bound to.
    private static final String EXCHANGE_NAME = "test_simple_queue";

    /**
     * Sets up the queue binding and registers the consumer. The connection and
     * channel are intentionally left open so deliveries keep arriving.
     *
     * @param args unused
     * @throws IOException          propagated from the connection/channel operations
     * @throws TimeoutException     propagated from opening the connection
     * @throws InterruptedException kept in the signature for compatibility
     */
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Connection connection = MessageUtils.getConnection();
        final Channel channel = connection.createChannel();

        // Declare the exchange before binding so the bind does not fail when
        // this consumer is started before anything else has declared it.
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // Bind the queue to the exchange with an empty routing key.
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
        // Prefetch count of 1: at most one unacknowledged delivery at a time.
        channel.basicQos(1);

        DefaultConsumer consumer = new DefaultConsumer(channel) {
            /**
             * Prints the message body decoded as UTF-8, waits one second, then
             * acknowledges the delivery.
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                    AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                // Print the decoded text itself, not the byte array's identity.
                System.out.println("消费者[1] msg:" + message);

                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so the owning thread can see it.
                    Thread.currentThread().interrupt();
                } finally {
                    System.out.println("[1] done");
                    // Acknowledge only this delivery (multiple = false).
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };

        // Manual acknowledgement: deliveries are acked in handleDelivery above.
        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    }

}

 

 

mq新api订阅实现

上一篇:有关oracle中scott用户下表的基本查询(二)


下一篇:windowsTerminal配置bash