去队列里面一直获取消息,一开始想到了两种解决方案:
第一:订阅一次获取一次消息,正常的话每次都能获取到,但是要及时去清理订阅并且时间粒度不好控制
第二:订阅一次,再获取消息这里加死循环,超时MQ已经做了,所以可以不用控制线程等待,获取到消息了以后,直接通过自定义事件的机制去及时处理消息
从最终实验结果来看,第二种是最优的做法,可以做到随时获取到消息,又不占用资源。接下来我把最终的实现代码分享处理,希望对大家有所帮助,有不对的地方,请及时联系我。
订阅主题:
/// <summary> /// 订阅主题。订阅一次并尝试一直获取消息 /// </summary> /// <param name="business"></param> public void SubTopic1(string business) { IBMWMQEventSource msgHandler = new IBMWMQEventSource(); IBMWMQMsgEventListener msgEventListener = new IBMWMQMsgEventListener(); MQSubscription subs = null; try { using (var mqmgr = MQQueueManager.Connect(IBMWMQConfig.QUEUE_MGR_NAME, MQC.MQCO_NONE, IBMWMQConfig.CHANNEL, IBMWMQConfig.CONNECTION_INFO)) { subs = new MQSubscription(mqmgr); if (mqmgr.IsConnected) { //订阅事件 msgEventListener.Subscribe(msgHandler); int option = MQC.MQSO_CREATE + MQC.MQSO_RESUME + MQC.MQSO_NON_DURABLE + MQC.MQSO_MANAGED + MQC.MQSO_FAIL_IF_QUIESCING; string subName = string.Format(IBMWMQConfig.SUBSCRIPTION_TEMPLATE, business); string topicName = string.Format(IBMWMQConfig.TOPIC_TEMPLATE, business); try { subs.Subscribe(subName, option, topicName); } catch (MQException ex) { string code = ex.Reason.ToString(); //引发事件 msgHandler.RaiseErroeMsgEvent(business, code); } while (true) { msgHandler.RaiseErroeMsgEvent(business, string.Format("开始尝试获取 {0} 消息...", business)); try { MQMessage incoming = new MQMessage(); MQGetMessageOptions gmo = new MQGetMessageOptions(); gmo.WaitInterval = 10 * 1000; //MQC.MQWI_UNLIMITED; gmo.Options |= MQC.MQGMO_WAIT; gmo.Options |= MQC.MQGMO_SYNCPOINT; subs.Get(incoming, gmo); string message = incoming.ReadAll(); if (!string.IsNullOrEmpty(message)) { //引发事件 msgHandler.RaiseNewMsgEvent(business, message); } } catch (MQException ex) { string code = ex.Reason.ToString(); //引发事件 msgHandler.RaiseErroeMsgEvent(business, code); } } } } } catch (MQException e) { string code = e.Reason.ToString(); //引发事件 msgHandler.RaiseErroeMsgEvent(business, code); } finally { //if (subs != null) //{ // subs.Close(MQC.MQCO_REMOVE_SUB, closeSubQueue: true, closeSubQueueOptions: MQC.MQCO_NONE); //} } }
接下来开始自定义事件:
定义事件中心:
public class IBMWMQEventSource { /// <summary> /// 新消息处理委托 /// </summary> /// <param name="business"></param> /// <param name="msg"></param> public delegate void NewMsgHandler(string business, string msg); /// <summary> /// 新消息处理事件 /// </summary> public event NewMsgHandler NewMsgEventHandler; /// <summary> /// 错误消息处理委托 /// </summary> /// <param name="errorCode"></param> public delegate void ErrorMsgHandler(string business, string errorCode); /// <summary> /// 错误消息处理事件 /// </summary> public event ErrorMsgHandler ErrorMsgEventHandler; /// <summary> /// 引发新消息处理事件的方法 /// </summary> /// <param name="business"></param> /// <param name="msg"></param> public void RaiseNewMsgEvent(string business, string msg) { if (NewMsgEventHandler != null) { NewMsgEventHandler.Invoke(business, msg); } } /// <summary> /// 引发错误消息处理事件的方法 /// </summary> /// <param name="business"></param> /// <param name="errorCode"></param> public void RaiseErroeMsgEvent(string business, string errorCode) { if (ErrorMsgEventHandler != null) { ErrorMsgEventHandler.Invoke(business, errorCode); } } }
定义事件监听器:
public class IBMWMQMsgEventListener : MsgHandler { /// <summary> /// 订阅事件 /// </summary> public void Subscribe(IBMWMQEventSource eventSource) { eventSource.NewMsgEventHandler += this.OnNewMsgHandler; eventSource.ErrorMsgEventHandler += this.OnErrorMsgHandler; } /// <summary> /// 取消订阅事件 /// </summary> public void UnSubscribe(IBMWMQEventSource eventSource) { eventSource.NewMsgEventHandler -= this.OnNewMsgHandler; eventSource.ErrorMsgEventHandler -= this.OnErrorMsgHandler; } }
定义消息处理方法:
public class MsgHandler : BaseJobManager { /// <summary> /// 消息自定义业务处理 /// </summary> private static CustomBusinessHandler customBusinessHandler = new CustomBusinessHandler(); /// <summary> /// 处理新消息 /// </summary> /// <param name="business"></param> /// <param name="msg"></param> protected virtual void OnNewMsgHandler(string business, string msg) { //获取配置文件 List<JobConfigEntity> configs = InitJobConfig(); string businessName = configs.First(c => c.JobName.Replace("_", "").Contains(business)).Name; switch (business) { case IBMWMQConfig.BUSINESS_NAME_ZDFZ09: msg = customBusinessHandler.RemoveMsgHeader(msg, IBMWMQConfig.BUSINESS_NAME_ZDF_Z09); customBusinessHandler.DO_ZDF_Z09(business, msg); break; case IBMWMQConfig.BUSINESS_NAME_ZAPZ10: msg = customBusinessHandler.RemoveMsgHeader(msg, IBMWMQConfig.BUSINESS_NAME_ZAP_Z10); customBusinessHandler.DO_ZAP_Z10(business, msg); break; case IBMWMQConfig.BUSINESS_NAME_OULR24: msg = customBusinessHandler.RemoveMsgHeader(msg, IBMWMQConfig.BUSINESS_NAME_OUL_R24); customBusinessHandler.DO_OUL_R24(business, msg); break; } this.WriteInfo(this.GetType(), string.Format("收到来自{0}的消息,数据包:{1}", businessName, msg)); } /// <summary> /// 处理错误消息 /// </summary> /// <param name="business"></param> /// <param name="errorCode"></param> protected virtual void one rrorMsgHandler(string business, string errorCode) { if (!string.IsNullOrEmpty(errorCode)) { //获取配置文件 List<JobConfigEntity> configs = InitJobConfig(); string businessName = configs.First(c => c.JobName.Replace("_", "").Contains(business)).Name; //TODO 其他消息内容校验 if (errorCode.Equals("2033")) { this.WriteInfo(this.GetType(), string.Format("{0} 无消息({1})。", businessName, errorCode)); } else if (errorCode.Equals("2085")) { this.WriteInfo(this.GetType(), string.Format("{0} 主题不存在({1})。", businessName, errorCode)); } else if (errorCode.Equals("2429")) { this.WriteInfo(this.GetType(), string.Format("{0} 主题已被订阅({1})。", businessName, errorCode)); } else if (errorCode.Equals("2538")) { this.WriteInfo(this.GetType(), string.Format("无法连接消息队列主机({0})。", errorCode)); } else { if (errorCode.Length == 4) { this.WriteInfo(this.GetType(), string.Format("未知错误消息,错误原因编码:{0}。", errorCode)); } else { this.WriteInfo(this.GetType(), string.Format("{0} {1}", businessName, errorCode)); } } } } }
去掉消息头:
/// <summary> /// 去掉消息头 /// </summary> /// <param name="msg"></param> /// <param name="name"></param> /// <returns></returns> public string RemoveMsgHeader(string msg, string name) { msg = msg.Trim(); //去掉消息头 int index = msg.IndexOf("<" + name, StringComparison.Ordinal); if (index > 0) { string temp = msg.Substring(0, index); msg = msg.Substring(index, msg.Length - temp.Length); msg = msg.Trim(); } return msg; }
配置信息:
public class IBMWMQConfig { /// <summary> /// MQ主机地址 /// </summary> private static readonly string CONNECTION_HOST = ConfigHelper.GetValue("IBM_WMQ_CONNECTION_HOST"); /// <summary> /// 通讯端口 /// </summary> private const int CONNECTION_PORT = 4421; /// <summary> /// CLIENT_ID /// </summary> private const string CLIENT_ID = ""; /// <summary> /// 通道名称 /// </summary> public const string CHANNEL = ""; /// <summary> /// 队列管理器名称 /// </summary> public const string QUEUE_MGR_NAME = ""; /// <summary> /// 订阅主题持久化标识,{0}标识具体业务 /// </summary> public static readonly string SUBSCRIPTION_TEMPLATE = "JMS:" + QUEUE_MGR_NAME + ":" + CLIENT_ID + "_{0}.REQ:{0}.REQ"; /// <summary> /// 主题名称模板,{0}标识具体业务 /// </summary> public static readonly string TOPIC_TEMPLATE = "{0}.REQ"; /// <summary> /// IBM.WMQ连接字符串 /// </summary> public static readonly string CONNECTION_INFO = string.Format("{0}({1})", CONNECTION_HOST, CONNECTION_PORT); }
调用:
CustomMsgHandler helper = new CustomMsgHandler(); helper.SubTopic1(IBMWMQConfig.BUSINESS_NAME_ZAPZ10);