Apache ActiveMQ 集群配置方法

构建高可用的AMQ系统在生产环境中是非常重要的,对于这个apache的消息中间件实现高可用非常简单,只要在Apache ActiveMQ单点基本配置基础上做一次配置变更(如果在一台设备上部署多个AMQ,需要修改对应端口号),即可实现。


AMQ实现高可用部署有三种方案:

1、Master-Slave

2、SharedFile System Master Slave

3、JDBCMaster Slave


第一种方案由于只可以由两个AMQ实例组件,实际应用场景并不广泛;

而第三种方案支持N个AMQ实例组网,但他的性能会受限于数据库;

而第二种方案同样支持N个AMQ实例组网,但由于他是基于kahadb存储策略,亦可以部署在分布式文件系统上,应用灵活、高效且安全。


集群配置方法参考:

http://activemq.apache.org/masterslave.html


“Apache ActiveMQ单点基本配置” 原配置内容:

<persistenceAdapter>  
            <kahaDB directory="${activemq.data}/kahadb"  
                    enableIndexWriteAsync="true"  
                    enableJournalDiskSyncs="false"/>  
            <!--<amqPersistenceAdapter directory="${activemq.data}/activemq-data" /> -->  
        </persistenceAdapter> 

修改为:

<persistenceAdapter>
             <kahaDB directory="X:\\shareBrokerData"
						enableIndexWriteAsync="true"
						enableJournalDiskSyncs="false"/>
        </persistenceAdapter>

注意:

1、前面提到部署一台设备上的AMQ系统,需要修改对应的端口号,如AMQ对外的监听端口61616和jetty的监听端口8161等。

2、如果多套AMQ部署在不同的设备上,这里的directory应该指向一个远程的系统目录(分布式文件系统)


Producer for java:

ActiveMQConnectionFactory connectionFactory =   
        new ActiveMQConnectionFactory("failover:(
tcp://192.168.0.87:61616?wireFormat.maxInactivityDuration=0,
tcp://192.168.0.87:61617?wireFormat.maxInactivityDuration=0)");  
    Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);  
      
    Queue queue = session.createQueue(qName);  
    ActiveMQMessageProducer producer = (ActiveMQMessageProducer) session.createProducer(queue); // Default DeliveryMode.PERSISTENT  
    //producer.setPriority(3); // Default priority 4.  
    //producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);  
      
    session.createObjectMessage();  
    jmsMessage.setObject(message);  
    while(true){        

         producer.send(jmsMessage);  
         TimeUnit.MILLISECONDS.sleep(10);

 }

Consumer for java:

final String qName = "Test.foo?consumer.prefetchSize=100";
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("
failover:(
tcp://192.168.0.87:61616,
tcp://192.168.0.87:61617
)"); 
Connection connection = connectionFactory.createConnection();
connection.start();

final Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); 
Destination destination = session.createQueue(qName);
ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) session.createConsumer(destination);
System.out.println(consumer.getPrefetchNumber());

//listener 方式 
consumer.setMessageListener(new MessageListener() {
	 public void onMessage(Message msg) {
	 //TODO something.... 
		try {
			System.out.println("收到消息:"+counter.incrementAndGet()+","+msg);
		} catch (JMSException e1) {
		// TODO Auto-generated
		 catch blocke1.printStackTrace();
		}
	 }
 });

通过failover方式进行连接,多个AMQ实例地址使用英文逗号隔开,当某个实例断开时会自动重连,但如果所有实例都失效,failover默认情况下会无限期的等待下去,不会有任何提示。

failover参数配置参考:

http://activemq.apache.org/failover-transport-reference.html





Apache ActiveMQ 集群配置方法

上一篇:使用Eval()绑定数据时使用三元运算符


下一篇:LeetCode之Climbing Stairs