[ActiveMQ]发送消息与接收消息测试Demo

ubuntu下:
安装并启动ActiveMQ
[ActiveMQ]发送消息与接收消息测试Demo

1.官网http://activemq.apache.org/下载最新版本的ActiveMQ,并解压;

2.进入对应目录,sudo ./activemq start

3,启动成功后,登录http://localhose:8161/admin/,登陆账号和密码都为admin,创建一个queue名为jyQueue;
[ActiveMQ]发送消息与接收消息测试Demo

启动成功;

发送端:

package MQjar.main;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.util.BooleanEditor;

import javax.jms.*;

/**
 * Created by IT-2 on 2017/3/6.
 */
public class Sender {
    private static final int SEND_NUMBER = 5;

    public static void main(String[] args){
        //ConnectionFactory:连接工厂,JMS用它创建连接
        ConnectionFactory connectionFactory;

        //Connection:JMS客户端到JMS Provider的连接
        Connection connection = null;

        //Session:一个发送或接收消息的线程
        Session session;

        //Destination:消息的目的地;消息的接收者
        Destination destination;

        //MessageProducer:消息发送者
        MessageProducer producer;

        //TextMessage message;
        //构造ConnectionFactory实例对象,此处采用ActiveMQ的实现jar
        connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD,
                "tcp://192.168.233.135:61616"
        );

        try{
            //构造从工厂得到连接对象
            connection = connectionFactory.createConnection();

            //启动
            connection.start();

            //获取操作连接
            session = connection.createSession(Boolean.TRUE,
                    Session.AUTO_ACKNOWLEDGE);

            //获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在ActiveMQ的console配置
            destination = session.createQueue("jyQueue");

            //得到消息生成者(也就是发送者)
            producer = session.createProducer(destination);

            //设置不持久化,此处学习所用,实际情况请根据项目决定
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

            //构造消息,这里写死了,项目就是参数,或者方法获取
            sendMessage(session,producer);
            session.commit();
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            try {
                if(null != connection)
                    connection.close();
            }catch (Throwable ignore){}
        }
    }

public static void sendMessage(Session session, MessageProducer producer) throws Exception{
    for (int i = 1; i <= SEND_NUMBER; i++){
        TextMessage message = session.createTextMessage("ActiveMQ...发送消息..." + i);

        //发送消息到目的地
        System.out.println("发送消息:" + "ActiveMQ...发送的消息..." + i);
        producer.send(message);
    }
}
}

接收端:

package MQjar.main;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.SessionId;
import org.apache.activemq.util.BooleanEditor;

import javax.jms.*;


/**
 * Created by IT-2 on 2017/3/6.
 */
public class Receiver {
    public static void main(String[] args) {
        //ConnectionFactory : 连接工厂,JMS用它来创建连接
        ConnectionFactory connectionFactory;

        //Connection:JMS客户端到JMS Provider的连接
        Connection connection = null;

        //Session:一个发送或接收消息的线程
        Session session;

        //Destination:消息的目的地;消息的接收者
        Destination destination;

        //消费者,消息接收者
        MessageConsumer consumer;

        connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnectionFactory.DEFAULT_USER,
                ActiveMQConnectionFactory.DEFAULT_PASSWORD,
                "tcp://192.168.233.135:61616"
        );
        try {
            //构造从工厂得到连接对象
            connection = connectionFactory.createConnection();
            //启动
            connection.start();
            //获取操作连接
            session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);

            //获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在ActiveMQ的console配置
            destination = session.createQueue("jyQueue");
            consumer = session.createConsumer(destination);
            while (true) {
                //设置接收者接收消息的时间,为了便于测试,这里定位100s
                TextMessage message = (TextMessage) consumer.receive(100000);

                if (null != message) {
                    System.out.println("收到消息..." + message.getText());
                } else {
                    break;
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (null != connection) connection.close();
            } catch (Throwable ignore) {
            }
        }
    }
}

运行发送端:

[ActiveMQ]发送消息与接收消息测试Demo

再运行接收端:

[ActiveMQ]发送消息与接收消息测试Demo

参考资料:
ActiveMQ实例1–简单的发送和接收消息

ActiveMQ使用教程

上一篇:ML之Xgboost:利用Xgboost模型(7f-CrVa+网格搜索调参)对数据集(比马印第安人糖尿病)进行二分类预测


下一篇:MaxCompute在高德大数据上的应用