在某些业务场景下,可能会用到按需分发消息。
对于AMQ他内置了很多的分发策略可供我们选择(DispatchPolicy的实现类),如:PriorityDispatchPolicy, PriorityNetworkDispatchPolicy, RoundRobinDispatchPolicy, SimpleDispatchPolicy, StrictOrderDispatchPolicy。
那我们也可以自己去实现DispatchPolicy接口,做一个适合特定业务场景的分发策略。
首先我们要去http://svn.apache.org/repos/asf/activemq这里下载对应版本的源码;
本例中为方便起见,我们拷贝了一段SimpleDispatchPolicy类的代码(路径:org.apache.activemq.broker.region.policy),当做我们的自定义类的内容,如:
public class TestDispatchPolicy implements DispatchPolicy { public boolean dispatch(MessageReference node, MessageEvaluationContext msgContext, List consumers) throws Exception { ActiveMQDestination _destination = node.getMessage().getDestination(); // 取得Topic name 和前缀,如:topic://Topic.foo System.out.println("-------->_destination.getQualifiedName:"+ _destination.getQualifiedName()); // 取得Topic name,如:Topic.foo System.out.println("-------->_destination.getPhysicalName:"+ _destination.getPhysicalName()); synchronized (consumers) { int count = 0; for (Iterator iter = consumers.iterator(); iter.hasNext();) { Subscription sub = (Subscription)iter.next(); // 取得消费者的clientId,如:connection.setClientID("YourClientID"); System.out.println("-------->sub.getContext().getClientId:"+ sub.getContext().getClientId()); // Only dispatch to interested subscriptions if (!sub.matches(node, msgContext)) { sub.unmatched(node); continue; } sub.add(node); count++; } return count > 0; } } }
ok,接着我们做如下几个步骤:
1、将这个类放到activemq project/activemq-broker的相应路径下,在activemq-project/目录下执行mvn package;
2、待maven执行完成后,将activemq-broker-version.jar和activemq-spring-version.jar放入到apache-activemq-version-bin/lib/目录下,替换对应的文件;
3、修改activemq.xml,加入元素dispatchPolicy,完成。
以Apache ActiveMQ单点基本配置 activemq.xml为基础,修改的内容如下:
<destinationPolicy> <policyMap> <policyEntries> <policyEntry topic=">" producerFlowControl="false" memoryLimit="10mb"> <dispatchPolicy> <! -- 新增的分发策略 --> <testDispatchPolicy/> </dispatchPolicy> <pendingMessageLimitStrategy> <constantPendingMessageLimitStrategy limit="1000"/> </pendingMessageLimitStrategy> </policyEntry> </policyEntries> </policyMap> </destinationPolicy>
至此我们就实现了Activemq的自定义分发策略功能,启动activemq,查看一下控制台。
参考资源:
http://activemq.apache.org/dispatch-policies.html
http://activemq.apache.org/maven/5.9.0/apidocs/index.html?deprecated-list.html