activemq api的封装

今天闲来无事写了段代码,顺便学习一下 ActiveMQ:简单封装了 ActiveMQ 的 Topic API,用法和 JDBC 很类似。

主要代码:

import java.io.Serializable;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
/* This utility wraps the ActiveMQ topic (publish/subscribe) API. */


import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Thin wrapper around the JMS topic (publish/subscribe) API backed by ActiveMQ.
 *
 * <p>An instance owns one {@link TopicConnection}. Messages are published with
 * {@link #writeMessage(String, String)} / {@link #writeMessage(String, Object)}
 * and received by registering a {@link MessageListener} through
 * {@link #receiveMsg(String, MessageListener)}. Call {@link #close()} when the
 * instance is no longer needed.
 */
public class JMSTopic {
    /** Broker address used by the default constructor. */
    private static final String DEFAULT_BROKER_URL = "tcp://localhost:61616";

    TopicConnectionFactory connectionFactory;
    // Connection from this JMS client to the JMS provider.
    TopicConnection connection = null;

    // Publishing session. Kept for source compatibility only: publishing now
    // uses a short-lived session per call, so this field is no longer assigned.
    TopicSession proSession = null;
    // Session of the most recently registered subscription.
    TopicSession conSession = null;

    // Topic publisher. Kept for source compatibility only (see proSession).
    MessageProducer producer = null;
    // Consumer of the most recently registered subscription.
    MessageConsumer consumer = null;

    // Destination of messages; not assigned anywhere in this class.
    Destination destination;

    /**
     * Connects to the ActiveMQ broker at {@code tcp://localhost:61616} with the
     * default ActiveMQ credentials and starts the connection.
     *
     * @throws IllegalStateException if the connection cannot be created or
     *         started; the original {@link JMSException} is the cause
     */
    public JMSTopic() {
        connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD,
                DEFAULT_BROKER_URL);
        try {
            connection = connectionFactory.createTopicConnection();
            connection.start();
        } catch (JMSException e) {
            // Fail fast: continuing here would only surface later as a
            // NullPointerException on the first use of the connection.
            closeConnection();
            throw new IllegalStateException(
                    "Unable to connect to ActiveMQ broker at " + DEFAULT_BROKER_URL, e);
        }
    }

    /**
     * Publishes a text message to a topic.
     *
     * <p>A {@link JMSException} raised while publishing is printed to standard
     * error and not propagated.
     *
     * @param t       name of the topic to publish to
     * @param message text payload
     */
    public void writeMessage(String t, String message) {
        TopicSession session = null;
        try {
            session = openSession();
            TextMessage text = session.createTextMessage();
            text.setText(message);
            send(session, t, text);
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            // Release the per-call session so repeated publishing does not leak.
            closeQuietly(session);
        }
    }

    /**
     * Publishes an object message to a topic.
     *
     * <p>A {@link JMSException} raised while publishing is printed to standard
     * error and not propagated.
     *
     * @param t name of the topic to publish to
     * @param o payload; must implement {@link Serializable} (or be {@code null})
     * @throws ClassCastException if {@code o} is not {@link Serializable}
     */
    public void writeMessage(String t, Object o) {
        // Cast before any JMS resource is opened so a bad payload costs nothing.
        Serializable payload = (Serializable) o;
        TopicSession session = null;
        try {
            session = openSession();
            ObjectMessage body = session.createObjectMessage();
            body.setObject(payload);
            send(session, t, body);
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            closeQuietly(session);
        }
    }

    /**
     * Subscribes a listener to a topic.
     *
     * <p>The session and consumer created for the subscription are stored in
     * {@code conSession} and {@code consumer}; they stay open until
     * {@link #close()} is called. A {@link JMSException} raised while
     * subscribing is printed to standard error and not propagated.
     *
     * @param c  name of the topic to listen on
     * @param ml listener that receives the messages
     */
    public void receiveMsg(String c, MessageListener ml) {
        TopicSession session = null;
        try {
            session = openSession();
            Topic topic = session.createTopic(c);
            MessageConsumer subscriber = session.createConsumer(topic);
            subscriber.setMessageListener(ml);

            conSession = session;
            consumer = subscriber;
            // Subscription is live; the finally block must not close it.
            session = null;
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            // Only non-null when setup failed part-way through.
            closeQuietly(session);
        }
    }

    /**
     * Closes the underlying connection. A {@link JMSException} raised while
     * closing is printed to standard error and not propagated.
     */
    public void close() {
        closeConnection();
    }

    /** Opens a non-transacted, auto-acknowledging session on the connection. */
    private TopicSession openSession() throws JMSException {
        return connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
    }

    /** Sends {@code message} to the named topic through {@code session}. */
    private static void send(TopicSession session, String topicName, Message message)
            throws JMSException {
        MessageProducer publisher = session.createProducer(session.createTopic(topicName));
        publisher.send(message);
    }

    /** Closes {@code session} if it is non-null, reporting any failure to standard error. */
    private static void closeQuietly(Session session) {
        if (session == null) {
            return;
        }
        try {
            session.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    /** Closes the connection if one was created, reporting any failure to standard error. */
    private void closeConnection() {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

2.测试,发送的消息是对象

a.一个可序列化的 Student 对象

activemq api的封装
 1 package ch02.chat;
 2 
 3 import java.io.Serializable;
 4 
 5 public class Student implements Serializable {
 6     private int age;
 7     private String name;
 8     public Student(int age,String name)
 9     {
10         this.age=age;
11         this.name=name;
12         
13         
14     }
15     public String toString()
16     {
17         return "age ="+age+"  name "+ "name";
18     }
19 
20 }
View Code

b.客户端发送

activemq api的封装
 1 package ch02.chat;
 2 
 3 public class ClientTest {
 4     public static void main(String args[])
 5     {
 6         JMSTopic jt=new JMSTopic();
 7         jt.writeMessage( "topic1",new Student(12,"han"));
 8         
 9         
10         
11     }
12 
13 }
View Code

c.客户端接收信息

activemq api的封装
 1 package ch02.chat;
 2 
 3 import javax.jms.JMSException;
 4 import javax.jms.Message;
 5 import javax.jms.MessageListener;
 6 import javax.jms.ObjectMessage;
 7 import javax.jms.TextMessage;
 8 
 9 
10 public class ClientTest2 {
11     
12     public static void main(String args[])
13     {
14         JMSTopic jt=new JMSTopic();
15         
16     jt.receiveMsg("topic1",new MessageListener()
17     {
18 
19         @Override
20         public void onMessage(Message message) {
21             // TODO Auto-generated method stub
22             ObjectMessage tm = (ObjectMessage) message;  
23             try {  
24                 System.out.println("Received message: " +tm.getObject());  
25             } catch (JMSException e) {  
26                 e.printStackTrace();  
27             }  
28             
29             
30         }
31         
32         
33     }
34             
35             
36             );
37         
38         
39         
40     }
41 
42 }
View Code

 

运行喽

 

activemq api的封装

上一篇:windows堆栈溢出利用的七种方式


下一篇:HapiJS开发手册