今天无聊写段代码。。学习一下activemq,简单封装了一下activemq 的topic api。跟jdbc很类似
主要代码:
import java.io.Serializable; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.ObjectMessage; import javax.jms.Session; /*本工具封装了*/ import javax.jms.TextMessage; import javax.jms.Topic; import javax.jms.TopicConnection; import javax.jms.TopicConnectionFactory; import javax.jms.TopicPublisher; import javax.jms.TopicSession; import javax.jms.TopicSubscriber; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class JMSTopic { TopicConnectionFactory connectionFactory; // Connection :JMS 客户端到JMS Provider 的连接 TopicConnection connection = null; //用来发布的会话 TopicSession proSession = null; //2一个订阅会话 TopicSession conSession = null; //主题发布者 MessageProducer producer=null; //主题 MessageConsumer consumer=null; // Destination :消息的目的地;消息发送给谁. Destination destination; // MessageProducer:消息发送者 //默认构造函数,默认的连接activemq,可以写多个构造函数 public JMSTopic() { connectionFactory = new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616"); try { connection= connectionFactory.createTopicConnection(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } try { connection.start(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } //此处先固定消息为String类型 public void writeMessage(String t,String message ) { try { proSession=connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); producer=proSession.createProducer(proSession.createTopic(t)); //使用message构造TextMessage TextMessage text=proSession.createTextMessage(); text.setText(message); producer.send(text); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } //创建发布会话应该是可以配置的,此处先固定 } public void writeMessage(String t,Object o ) { try { proSession=connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); producer=proSession.createProducer(proSession.createTopic(t)); //使用message构造TextMessage ObjectMessage text=proSession.createObjectMessage(); text.setObject((Serializable) o); producer.send(text); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } //创建发布会话应该是可以配置的,此处先固定 } //使用某个Message监听器来监听某个Topic public void receiveMsg(String c,MessageListener ml) { try { conSession=connection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE); Topic t=conSession.createTopic(c); consumer=conSession.createConsumer(t); //设置过来的监视器 consumer.setMessageListener(ml); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
2.测试,发送的消息是对象
a.一个序列化的Stduent 对象
1 package ch02.chat; 2 3 import java.io.Serializable; 4 5 public class Student implements Serializable { 6 private int age; 7 private String name; 8 public Student(int age,String name) 9 { 10 this.age=age; 11 this.name=name; 12 13 14 } 15 public String toString() 16 { 17 return "age ="+age+" name "+ "name"; 18 } 19 20 }
b.客户端发送
1 package ch02.chat; 2 3 public class ClientTest { 4 public static void main(String args[]) 5 { 6 JMSTopic jt=new JMSTopic(); 7 jt.writeMessage( "topic1",new Student(12,"han")); 8 9 10 11 } 12 13 }
c.客户端接受信息
1 package ch02.chat; 2 3 import javax.jms.JMSException; 4 import javax.jms.Message; 5 import javax.jms.MessageListener; 6 import javax.jms.ObjectMessage; 7 import javax.jms.TextMessage; 8 9 10 public class ClientTest2 { 11 12 public static void main(String args[]) 13 { 14 JMSTopic jt=new JMSTopic(); 15 16 jt.receiveMsg("topic1",new MessageListener() 17 { 18 19 @Override 20 public void onMessage(Message message) { 21 // TODO Auto-generated method stub 22 ObjectMessage tm = (ObjectMessage) message; 23 try { 24 System.out.println("Received message: " +tm.getObject()); 25 } catch (JMSException e) { 26 e.printStackTrace(); 27 } 28 29 30 } 31 32 33 } 34 35 36 ); 37 38 39 40 } 41 42 }
运行喽