熟悉activemq的初步试用

 

1.在服务器(阿里云ubuntu16.04)上安装activemq,我是直接下载activemq:

wget http://archive.apache.org/dist/activemq/apache-activemq/5.6.0/apache-activemq-5.6.0-bin.tar.gz

2.解压及安装,可以通过activemq  --help  查看一些命令及参数信息;

3.启动activemq(进入解压后的目录,执行 bin/activemq start);

4.编写简单的java demo:

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*;
import java.util.concurrent.atomic.AtomicInteger; /**
* Created by IntelliJ IDEA.
*
* @author
* Description:
* Date: 2017/11/26
* Time: 12:07
*/
public class Producter {
    /**
     * Placeholder ActiveMQ user name. Currently unused: {@link #init()} connects with
     * {@code ActiveMQConnection.DEFAULT_USER}.
     */
    private static final String USERNAME = "xx";
    /**
     * Placeholder ActiveMQ password. Currently unused: {@link #init()} connects with
     * {@code ActiveMQConnection.DEFAULT_PASSWORD}.
     */
    private static final String PASSWORD = "xx";
    /**
     * Placeholder broker URL. Currently unused: {@link #init()} hard-codes its own URL.
     */
    private static final String BROKEN_URL = "xx";

    /** Sequence number shared by all sending threads; embedded in every message body. */
    AtomicInteger count = new AtomicInteger(0);
    /** Connection factory, created by {@link #init()}. */
    ConnectionFactory connectionFactory;
    /** Broker connection, opened by {@link #init()} and shared by all sending threads. */
    Connection connection;
    /**
     * Transacted session created by {@link #init()}. A non-null value marks a completed
     * {@code init()}; {@link #sendMessage(String)} does not send through it because a JMS
     * session must not be used by several threads at once.
     */
    Session session;
    /** Per-thread producer. It is created without a fixed destination so it can serve any queue. */
    ThreadLocal<MessageProducer> threadLocal = new ThreadLocal<>();
    /** Per-thread transacted session, so one thread's commit never covers another thread's sends. */
    private final ThreadLocal<Session> threadSession = new ThreadLocal<>();

    /**
     * Creates the connection factory, opens and starts the connection, and creates the
     * initial transacted session. A {@link JMSException} is printed, not rethrown; in that
     * case a later {@link #sendMessage(String)} fails with {@link IllegalStateException}.
     */
    public void init() {
        try {
            connectionFactory = new ActiveMQConnectionFactory(
                    ActiveMQConnection.DEFAULT_USER,
                    ActiveMQConnection.DEFAULT_PASSWORD,
                    "tcp://xx");
            connection = connectionFactory.createConnection();
            connection.start();
            // First argument true = transacted session.
            session = connection.createSession(true, Session.SESSION_TRANSACTED);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    /**
     * Sends one text message per second to the given queue, committing after each send.
     * The loop only ends when a JMS error occurs or the calling thread is interrupted.
     *
     * @param disname name of the destination queue
     * @throws IllegalStateException if {@link #init()} has not completed successfully
     */
    public void sendMessage(String disname) {
        if (connection == null || session == null) {
            throw new IllegalStateException("init() must complete successfully before sendMessage()");
        }
        try {
            Session ownSession = threadSession.get();
            if (ownSession == null) {
                ownSession = connection.createSession(true, Session.SESSION_TRANSACTED);
                threadSession.set(ownSession);
            }
            MessageProducer messageProducer = threadLocal.get();
            if (messageProducer == null) {
                // No default destination: the queue is supplied on every send, so the cached
                // producer stays correct when disname differs between calls.
                messageProducer = ownSession.createProducer(null);
                threadLocal.set(messageProducer);
            }
            Queue queue = ownSession.createQueue(disname);
            while (true) {
                Thread.sleep(1000);
                int num = count.getAndIncrement();
                String threadName = Thread.currentThread().getName();
                TextMessage msg = ownSession.createTextMessage(threadName
                        + "productor:生产者!,count:" + num);
                System.out.println(threadName
                        + "productor:生产东西!,count:" + num);
                messageProducer.send(queue, msg);
                // The session is transacted: nothing reaches the broker until commit.
                ownSession.commit();
            }
        } catch (JMSException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            // Restore the flag so the caller can see the interruption and stop.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }
}

  

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*;
import java.util.concurrent.atomic.AtomicInteger; /**
* Created by IntelliJ IDEA.
*
* @author
* Description:
* Date: 2017/11/26
* Time: 12:08
*/
public class Comsumer {
    /**
     * Placeholder ActiveMQ user name. Currently unused: {@link #init()} connects with
     * {@code ActiveMQConnection.DEFAULT_USER}.
     */
    private static final String USERNAME = "xx";
    /**
     * Placeholder ActiveMQ password. Currently unused: {@link #init()} connects with
     * {@code ActiveMQConnection.DEFAULT_PASSWORD}.
     */
    private static final String PASSWORD = "xx";
    /**
     * Placeholder broker URL. Currently unused: {@link #init()} hard-codes its own URL.
     */
    private static final String BROKEN_URL = "xx";

    /** Connection factory, created by {@link #init()}. */
    ConnectionFactory connectionFactory;
    /** Broker connection, opened by {@link #init()} and shared by all consuming threads. */
    Connection connection;
    /**
     * Auto-acknowledge session created by {@link #init()}. A non-null value marks a completed
     * {@code init()}; {@link #getMessage(String)} does not receive through it because a JMS
     * session must not be used by several threads at once.
     */
    Session session;
    /** Per-thread consumer, bound to the queue named in {@link #threadQueueName}. */
    ThreadLocal<MessageConsumer> threadLocal = new ThreadLocal<>();
    /** Number of text messages consumed so far, across all threads. */
    AtomicInteger count = new AtomicInteger();
    /** Per-thread auto-acknowledge session. */
    private final ThreadLocal<Session> threadSession = new ThreadLocal<>();
    /** Name of the queue the current thread's cached consumer listens on. */
    private final ThreadLocal<String> threadQueueName = new ThreadLocal<>();

    /**
     * Creates the connection factory, opens and starts the connection, and creates the
     * initial session. A {@link JMSException} is printed, not rethrown; in that case a
     * later {@link #getMessage(String)} fails with {@link IllegalStateException}.
     */
    public void init() {
        try {
            connectionFactory = new ActiveMQConnectionFactory(
                    ActiveMQConnection.DEFAULT_USER,
                    ActiveMQConnection.DEFAULT_PASSWORD,
                    "tcp://xx");
            connection = connectionFactory.createConnection();
            connection.start();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    /**
     * Receives messages from the given queue, pausing one second before each receive, and
     * prints every text message. Returns when {@code receive()} yields {@code null}, a JMS
     * error occurs, or the calling thread is interrupted.
     *
     * @param disname name of the queue to consume from
     * @throws IllegalStateException if {@link #init()} has not completed successfully
     */
    public void getMessage(String disname) {
        if (connection == null || session == null) {
            throw new IllegalStateException("init() must complete successfully before getMessage()");
        }
        try {
            MessageConsumer consumer = consumerFor(disname);
            while (true) {
                Thread.sleep(1000);
                Message received = consumer.receive();
                if (received == null) {
                    break;
                }
                String threadName = Thread.currentThread().getName();
                if (!(received instanceof TextMessage)) {
                    // A blind cast would throw ClassCastException and kill this thread.
                    System.out.println(threadName + ": Consumer: skipping non-text message "
                            + received.getJMSMessageID());
                    continue;
                }
                TextMessage msg = (TextMessage) received;
                // No explicit acknowledge(): the session uses AUTO_ACKNOWLEDGE.
                System.out.println(threadName + ": Consumer:我是消费者,我正在消费Msg" + msg.getText() + "--->" + count.getAndIncrement());
            }
        } catch (JMSException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            // Restore the flag so the caller can see the interruption and stop.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }

    /** Returns the current thread's consumer for the queue, (re)creating it when needed. */
    private MessageConsumer consumerFor(String disname) throws JMSException {
        Session ownSession = threadSession.get();
        if (ownSession == null) {
            ownSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            threadSession.set(ownSession);
        }
        MessageConsumer cached = threadLocal.get();
        if (cached != null && disname != null && disname.equals(threadQueueName.get())) {
            return cached;
        }
        if (cached != null) {
            // The cached consumer listens on a different queue; release it first.
            cached.close();
        }
        MessageConsumer created = ownSession.createConsumer(ownSession.createQueue(disname));
        threadLocal.set(created);
        threadQueueName.set(disname);
        return created;
    }
}

  

test:

/**
* Created by IntelliJ IDEA.
*
* @author
* Description:
* Date: 2017/11/26
* Time: 12:48
*/
public class TestConsumer {
    /** Number of consumer threads started by {@link #main(String[])}. */
    private static final int CONSUMER_THREADS = 4;

    /**
     * Initialises one shared {@code Comsumer} and starts the consumer threads on it.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Comsumer comsumer = new Comsumer();
        comsumer.init();
        for (int i = 0; i < CONSUMER_THREADS; i++) {
            new Thread(new ConsumerMq(comsumer)).start();
        }
    }

    /**
     * Worker that drains the {@code "zq-MQ"} queue, waits ten seconds, and starts over until
     * its thread is interrupted. Static because it needs no enclosing instance.
     */
    private static class ConsumerMq implements Runnable {
        private final Comsumer comsumer;

        ConsumerMq(Comsumer comsumer) {
            this.comsumer = comsumer;
        }

        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                comsumer.getMessage("zq-MQ");
                try {
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    // Interruption is the stop signal: restore the flag and leave the loop.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }
}

  

/**
* Created by IntelliJ IDEA.
*
* @author
* Description:
* Date: 2017/11/26
* Time: 12:17
*/
public class TestMq {
    /** Number of producer threads started by {@link #main(String[])}. */
    private static final int PRODUCER_THREADS = 5;

    /**
     * Initialises one shared {@code Producter}, waits one second, and starts the producer
     * threads on it.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Producter producter = new Producter();
        producter.init();
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
        for (int i = 0; i < PRODUCER_THREADS; i++) {
            new Thread(new ProductorMq(producter)).start();
        }
    }

    /**
     * Worker that sends to the {@code "zq-MQ"} queue, waits ten seconds whenever sending
     * returns, and starts over until its thread is interrupted. Static because it needs no
     * enclosing instance.
     */
    private static class ProductorMq implements Runnable {
        private final Producter producter;

        ProductorMq(Producter producter) {
            this.producter = producter;
        }

        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                producter.sendMessage("zq-MQ");
                try {
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    // Interruption is the stop signal: restore the flag and leave the loop.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }
}

  

以上仅仅是一个简单的demo,在实际生产环境中使用时,需要考虑具体的使用场景、activemq配置文件的调整,以及与项目的整合问题。

上一篇:关于高性能javascript 笔记


下一篇:ZOJ 3362 Beer Problem(SPFA费用流应用)