IBM.WMQ订阅主题,连续获取消息解决办法

去队列里面一直获取消息,一开始想到了两种解决方案:

第一:订阅一次获取一次消息,正常的话每次都能获取到,但是要及时去清理订阅并且时间粒度不好控制

第二:订阅一次,再获取消息这里加死循环,超时MQ已经做了,所以可以不用控制线程等待,获取到消息了以后,直接通过自定义事件的机制去及时处理消息

从最终实验结果来看,第二种是最优的做法,可以做到随时获取到消息,又不占用资源。接下来我把最终的实现代码分享处理,希望对大家有所帮助,有不对的地方,请及时联系我。

订阅主题:

/// <summary>
        /// 订阅主题。订阅一次并尝试一直获取消息
        /// </summary>
        /// <param name="business"></param>
        public void SubTopic1(string business)
        {
            IBMWMQEventSource msgHandler = new IBMWMQEventSource();
            IBMWMQMsgEventListener msgEventListener = new IBMWMQMsgEventListener();
            MQSubscription subs = null;
            try
            {
                using (var mqmgr = MQQueueManager.Connect(IBMWMQConfig.QUEUE_MGR_NAME, MQC.MQCO_NONE, IBMWMQConfig.CHANNEL, IBMWMQConfig.CONNECTION_INFO))
                {
                    subs = new MQSubscription(mqmgr);
                    if (mqmgr.IsConnected)
                    {
                        //订阅事件
                        msgEventListener.Subscribe(msgHandler);

                        int option = MQC.MQSO_CREATE + MQC.MQSO_RESUME + MQC.MQSO_NON_DURABLE + MQC.MQSO_MANAGED + MQC.MQSO_FAIL_IF_QUIESCING;
                        string subName = string.Format(IBMWMQConfig.SUBSCRIPTION_TEMPLATE, business);
                        string topicName = string.Format(IBMWMQConfig.TOPIC_TEMPLATE, business);

                        try
                        {
                            subs.Subscribe(subName, option, topicName);
                        }
                        catch (MQException ex)
                        {
                            string code = ex.Reason.ToString();
                            //引发事件
                            msgHandler.RaiseErroeMsgEvent(business, code);
                        }

                        while (true)
                        {
                            msgHandler.RaiseErroeMsgEvent(business, string.Format("开始尝试获取 {0} 消息...", business));
                            try
                            {
                                MQMessage incoming = new MQMessage();
                                MQGetMessageOptions gmo = new MQGetMessageOptions();
                                gmo.WaitInterval = 10 * 1000; //MQC.MQWI_UNLIMITED;
                                gmo.Options |= MQC.MQGMO_WAIT;
                                gmo.Options |= MQC.MQGMO_SYNCPOINT;

                                subs.Get(incoming, gmo);
                                string message = incoming.ReadAll();

                                if (!string.IsNullOrEmpty(message))
                                {
                                    //引发事件
                                    msgHandler.RaiseNewMsgEvent(business, message);
                                }
                            }
                            catch (MQException ex)
                            {
                                string code = ex.Reason.ToString();
                                //引发事件
                                msgHandler.RaiseErroeMsgEvent(business, code);
                            }
                        }
                    }
                }
            }
            catch (MQException e)
            {
                string code = e.Reason.ToString();
                //引发事件
                msgHandler.RaiseErroeMsgEvent(business, code);
            }
            finally
            {
                //if (subs != null)
                //{
                //    subs.Close(MQC.MQCO_REMOVE_SUB, closeSubQueue: true, closeSubQueueOptions: MQC.MQCO_NONE);
                //}
            }
        }

接下来开始自定义事件:

定义事件中心:

public class IBMWMQEventSource
    {
        /// <summary>
        /// 新消息处理委托
        /// </summary>
        /// <param name="business"></param>
        /// <param name="msg"></param>
        public delegate void NewMsgHandler(string business, string msg);
        /// <summary>
        /// 新消息处理事件
        /// </summary>
        public event NewMsgHandler NewMsgEventHandler;

        /// <summary>
        /// 错误消息处理委托
        /// </summary>
        /// <param name="errorCode"></param>
        public delegate void ErrorMsgHandler(string business, string errorCode);
        /// <summary>
        /// 错误消息处理事件
        /// </summary>
        public event ErrorMsgHandler ErrorMsgEventHandler;

        /// <summary>
        /// 引发新消息处理事件的方法
        /// </summary>
        /// <param name="business"></param>
        /// <param name="msg"></param>
        public void RaiseNewMsgEvent(string business, string msg)
        {
            if (NewMsgEventHandler != null)
            {
                NewMsgEventHandler.Invoke(business, msg);
            }
        }

        /// <summary>
        /// 引发错误消息处理事件的方法
        /// </summary>
        /// <param name="business"></param>
        /// <param name="errorCode"></param>
        public void RaiseErroeMsgEvent(string business, string errorCode)
        {
            if (ErrorMsgEventHandler != null)
            {
                ErrorMsgEventHandler.Invoke(business, errorCode);
            }
        }
    }

定义事件监听器:

public class IBMWMQMsgEventListener : MsgHandler
    {
        /// <summary>
        /// 订阅事件
        /// </summary>
        public void Subscribe(IBMWMQEventSource eventSource)
        {
            eventSource.NewMsgEventHandler += this.OnNewMsgHandler;
            eventSource.ErrorMsgEventHandler += this.OnErrorMsgHandler;
        }

        /// <summary>
        /// 取消订阅事件
        /// </summary>
        public void UnSubscribe(IBMWMQEventSource eventSource)
        {
            eventSource.NewMsgEventHandler -= this.OnNewMsgHandler;
            eventSource.ErrorMsgEventHandler -= this.OnErrorMsgHandler;
        }
    }

定义消息处理方法:

public class MsgHandler : BaseJobManager
    {
        /// <summary>
        /// 消息自定义业务处理
        /// </summary>
        private static CustomBusinessHandler customBusinessHandler = new CustomBusinessHandler();

        /// <summary>
        /// 处理新消息
        /// </summary>
        /// <param name="business"></param>
        /// <param name="msg"></param>
        protected virtual void OnNewMsgHandler(string business, string msg)
        {
            //获取配置文件
            List<JobConfigEntity> configs = InitJobConfig();
            string businessName = configs.First(c => c.JobName.Replace("_", "").Contains(business)).Name;

            switch (business)
            {
                case IBMWMQConfig.BUSINESS_NAME_ZDFZ09:
                    msg = customBusinessHandler.RemoveMsgHeader(msg, IBMWMQConfig.BUSINESS_NAME_ZDF_Z09);
                    customBusinessHandler.DO_ZDF_Z09(business, msg);
                    break;
                case IBMWMQConfig.BUSINESS_NAME_ZAPZ10:
                    msg = customBusinessHandler.RemoveMsgHeader(msg, IBMWMQConfig.BUSINESS_NAME_ZAP_Z10);
                    customBusinessHandler.DO_ZAP_Z10(business, msg);
                    break;
                case IBMWMQConfig.BUSINESS_NAME_OULR24:
                    msg = customBusinessHandler.RemoveMsgHeader(msg, IBMWMQConfig.BUSINESS_NAME_OUL_R24);
                    customBusinessHandler.DO_OUL_R24(business, msg);
                    break;
            }

            this.WriteInfo(this.GetType(), string.Format("收到来自{0}的消息,数据包:{1}", businessName, msg));
        }

        /// <summary>
        /// 处理错误消息
        /// </summary>
        /// <param name="business"></param>
        /// <param name="errorCode"></param>
        protected virtual void one rrorMsgHandler(string business, string errorCode)
        {
            if (!string.IsNullOrEmpty(errorCode))
            {
                //获取配置文件
                List<JobConfigEntity> configs = InitJobConfig();
                string businessName = configs.First(c => c.JobName.Replace("_", "").Contains(business)).Name;

                //TODO 其他消息内容校验
                if (errorCode.Equals("2033"))
                {
                    this.WriteInfo(this.GetType(), string.Format("{0} 无消息({1})。", businessName, errorCode));
                }
                else if (errorCode.Equals("2085"))
                {
                    this.WriteInfo(this.GetType(), string.Format("{0} 主题不存在({1})。", businessName, errorCode));
                }
                else if (errorCode.Equals("2429"))
                {
                    this.WriteInfo(this.GetType(), string.Format("{0} 主题已被订阅({1})。", businessName, errorCode));
                }
                else if (errorCode.Equals("2538"))
                {
                    this.WriteInfo(this.GetType(), string.Format("无法连接消息队列主机({0})。", errorCode));
                }
                else
                {
                    if (errorCode.Length == 4)
                    {
                        this.WriteInfo(this.GetType(), string.Format("未知错误消息,错误原因编码:{0}。", errorCode));
                    }
                    else
                    {
                        this.WriteInfo(this.GetType(), string.Format("{0} {1}", businessName, errorCode));
                    }
                }
            }
        }
    }

去掉消息头:

/// <summary>
        /// 去掉消息头
        /// </summary>
        /// <param name="msg"></param>
        /// <param name="name"></param>
        /// <returns></returns>
        public string RemoveMsgHeader(string msg, string name)
        {
            msg = msg.Trim();

            //去掉消息头
            int index = msg.IndexOf("<" + name, StringComparison.Ordinal);
            if (index > 0)
            {
                string temp = msg.Substring(0, index);
                msg = msg.Substring(index, msg.Length - temp.Length);
                msg = msg.Trim();
            }

            return msg;
        }

配置信息:

public class IBMWMQConfig
    {
        /// <summary>
        /// MQ主机地址
        /// </summary>
        private static readonly string CONNECTION_HOST = ConfigHelper.GetValue("IBM_WMQ_CONNECTION_HOST");
        /// <summary>
        /// 通讯端口
        /// </summary>
        private const int CONNECTION_PORT = 4421;
        /// <summary>
        /// CLIENT_ID
        /// </summary>
        private const string CLIENT_ID = "";
        /// <summary>
        /// 通道名称
        /// </summary>
        public const string CHANNEL = "";
        /// <summary>
        /// 队列管理器名称
        /// </summary>
        public const string QUEUE_MGR_NAME = "";
        /// <summary>
        /// 订阅主题持久化标识,{0}标识具体业务
        /// </summary>
        public static readonly string SUBSCRIPTION_TEMPLATE = "JMS:" + QUEUE_MGR_NAME + ":" + CLIENT_ID + "_{0}.REQ:{0}.REQ";
        /// <summary>
        /// 主题名称模板,{0}标识具体业务
        /// </summary>
        public static readonly string TOPIC_TEMPLATE = "{0}.REQ";
        /// <summary>
        /// IBM.WMQ连接字符串
        /// </summary>
        public static readonly string CONNECTION_INFO = string.Format("{0}({1})", CONNECTION_HOST, CONNECTION_PORT);
        
    }

调用:

         CustomMsgHandler helper = new CustomMsgHandler();
         helper.SubTopic1(IBMWMQConfig.BUSINESS_NAME_ZAPZ10);

 

上一篇:corefx 源码学习:NetworkStream.ReadAsync 是如何从 Socket 异步读取数据的


下一篇:自定义全局异常处理器