消息生产者:
import org.apache.qpid.amqp_1_0.jms.impl.*; import javax.jms.*; /** * 生产者<br> * */ class Publisher { public static void main(String[] args) throws Exception { String user = "admin"; String password = "admin"; String host = "localhost"; int port = 5672; ConnectionFactoryImpl factory = new ConnectionFactoryImpl(host, port, user, password); Connection connection = factory.createConnection(user, password); connection.start(); Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); Destination dest = session.createQueue("vinces"); MessageProducer producer = session.createProducer(dest); producer.setDeliveryMode(DeliveryMode.PERSISTENT); TextMessage msg = session.createTextMessage("测试的ActiveMQ消息 "); // 生产一个消息 producer.send(msg); // 提交 session.commit(); System.out.println("发送成功!"); // 关闭连接 connection.close(); System.exit(0); } }
消息消费者:
import org.apache.qpid.amqp_1_0.jms.impl.*;
import javax.jms.*;

/**
 * Consumer: receives messages from the queue "vinces" inside a transacted
 * JMS session and prints the body of every text message until a message
 * whose text is "exit" arrives.
 *
 * <p>The broker address (localhost:5672) and the credentials (admin/admin)
 * are hard-coded in {@link #main(String[])}.
 */
class Listener {

    /**
     * Connects to the broker and polls the queue with a one-second receive
     * timeout. Each received message is committed so that it is not
     * delivered again. On the "exit" message the connection is closed and
     * the JVM terminates.
     *
     * @param args command-line arguments (unused)
     * @throws JMSException if connecting, receiving or committing fails; the
     *                      connection is closed before the exception propagates
     */
    public static void main(String[] args) throws JMSException {
        String user = "admin";
        String password = "admin";
        String host = "localhost";
        int port = 5672;

        ConnectionFactoryImpl factory = new ConnectionFactoryImpl(host, port, user, password);
        Connection connection = factory.createConnection(user, password);
        try {
            connection.start();

            // The acknowledge mode is ignored when the session is transacted;
            // SESSION_TRANSACTED states that intent explicitly.
            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
            Destination dest = session.createQueue("vinces");
            MessageConsumer consumer = session.createConsumer(dest);
            System.out.println("Waiting for messages...");

            boolean running = true;
            while (running) {
                // receive() returns null when nothing arrives within the timeout.
                Message message = consumer.receive(1000);
                if (message == null) {
                    continue;
                }

                // Only text messages are printed; other message types are
                // consumed silently instead of failing on a blind cast.
                if (message instanceof TextMessage) {
                    String text = ((TextMessage) message).getText();
                    System.out.println(text);
                    if ("exit".equals(text)) {
                        System.out.println("exit");
                        running = false;
                    }
                }

                // A transacted session settles consumed messages only on
                // commit; without it they are redelivered after the
                // connection closes (including the "exit" message itself).
                session.commit();
            }
        } finally {
            // Close the connection on normal shutdown and on failure alike.
            connection.close();
        }

        // NOTE(review): status 1 is kept from the original code; 0 would be
        // the conventional status for a requested shutdown - confirm that
        // nothing depends on it before changing.
        System.exit(1);
    }
}