MessageQueue的使用方法(一)

 //check is existed and create
if (MessageQueue.Exists(@".\Private$\MSMQtest") == false)
{
    MessageQueue.Create(@".\Private$\MSMQtest", true);
}
---------------------------------------------------------------------------
 //根据机器名字取得所有的队列数组
MessageQueue[] qs = MessageQueue.GetPrivateQueuesByMachine("127.0.0.1");

---------------------------------------------------------------------------
//获取MSMQ所在的机器GUID号 
Guid g= MessageQueue.GetMachineId("127.0.0.1");//工作组安装计算机不支持该操作
---------------------------------------------------------------------------
// Receive message, 同步的Receive方法阻塞当前执行线程,直到一个message可以得到    
System.Messaging.Message message = queue.Receive();

---------------------------------------------------------------------------            
q =MessageQueue.GetPublicQueues();//工作组安装计算机不支持该操作
---------------------------------------------------------------------------
MessageQueue queue = new MessageQueue(@".\Private$\MSMQtest");
//queue.AccessMode默认值=QueueAccessMode.SendAndReceive
//Peek     队列仅可以查看消息。 
//PeekAndAdmin     队列可以查看消息。消息也可以被清除。 
//Receive     队列可以查看或接收消息。 
//ReceiveAndAdmin     队列可以接收消息。消息也可以被清除。 
//Send     队列仅可以发送消息。 
//SendAndReceive     队列可以查看、接收或发送消息。 
---------------------------------------------------------------------------
Guid g = queue.Category;
Console.WriteLine(g.ToString());
---------------------------------------------------------------------------
//DenySharedReceive 指定此 MessageQueue 引用的队列的共享模式。
//将 DenySharedReceive 设置为 true,以指示只有此 MessageQueue 应具有通过指定的 Path 查看或接收来自队列的消息的访问权。
//如果另一个 MessageQueue 或另一个应用程序与同一队列资源关联,则该实例或应用程序将无法查看或接收消息,但它仍可发送消息。
//如果 DenySharedReceive 为 false,则队列可供多个应用程序用来发送、查看或接收消息。
bool b = queue.DenySharedReceive;

---------------------------------------------------------------------------
Message myMessage1 = new Message("test", new BinaryMessageFormatter());
Message myMessage2 = new Message("test", new XmlMessageFormatter());
Message myMessage3 = new Message(new people() { age = 100, name = "cdd" }, new BinaryMessageFormatter());

queue.Send(myMessage1);
queue.Send(myMessage2);
queue.Send(myMessage3);

发送message的时候,指定的Formatter,在receive的时候也需要设置queue.Formatter,比如queue.Formatter = new BinaryMessageFormatter();
---------------------------------------------------------------------------
 queue.Receive()会删除当前消息。
 queue.Peek()不会删除当前消息。
---------------------------------------------------------------------------
 ///判断queue是不是空了。
static public bool IsQueueEmpty(MessageQueue myQueue)
        {
            bool isQueueEmpty = false;

            // Connect to a queue.
            //MessageQueue myQueue = new MessageQueue(".\\myQueue");

            try
            {
                // Set Peek to return immediately.
                myQueue.Peek(new TimeSpan(0));

                // If an IOTimeout was not thrown, there is a message 
                // in the queue.
                isQueueEmpty = false;
            }

            catch (MessageQueueException e)
            {
                if (e.MessageQueueErrorCode ==
                    MessageQueueErrorCode.IOTimeout)
                {
                    // No message was in the queue.
                    isQueueEmpty = true;
                }

                // Handle other sources of MessageQueueException.
            }

            // Handle other exceptions as necessary.

            // Return true if there are no messages in the queue.
            return isQueueEmpty;

        }

---------------------------------------------------------------------------
queue.GetMessageEnumerator2()方法,为队列中的所有消息创建枚举数对象。可以使用MoveNext方法。
GetMessageEnumerator2 创建队列中所有消息的动态列表。可通过对 GetMessageEnumerator2 返回的 MessageEnumerator 调用 RemoveCurrent,从队列中移除位于枚举数的当前位置的消息。 
如果需要队列中消息的静态快照而不是与它们的动态连接,请调用 GetAllMessages。此方法返回 Message 对象的数组,这些对象表示调用该方法时的消息。 

GetMessageEnumerator方法已经已过时。GetMessageEnumerator已被否决。应改用 GetMessageEnumerator2。 
---------------------------------------------------------------------------
异步读取queue和同步读取queue。

while (true)
{
    if (!IsQueueEmpty(queue))
    {
        IAsyncResult iresult = queue.BeginReceive(new TimeSpan(0), null, (ar) =>
        {
            lock (_lock)
            {
                SaveFile(@"c:\pp.txt", ar.IsCompleted.ToString());
            }
        });
        //do something
        Message mg = queue.EndReceive(iresult);
        SaveFile(@"c:\vv.txt", (string)mg.Body);
    }
    else
    {
        Console.WriteLine("No MSG,Waiting 2s");
        Thread.Sleep(2000);
        Console.WriteLine("2s passed");

    }
}


while (true)
{
    if (!IsQueueEmpty(queue))
    {
        Message mg = queue.Receive();
    }
    else
    {
        Console.WriteLine("No MSG,Waiting 2s");
        Thread.Sleep(2000);
        Console.WriteLine("2s passed");

    }
}

---------------------------------------------------------------------------
在MSMQ支持事务的情况下,可以使用MessageQueueTransaction
代码如下:
for (int i = 0; i < 2; i++)
{
Message myMessage1 = new Message(DateTime.Now.ToString("yyyy:MM:dd hh:mm:ss fff"), new BinaryMessageFormatter());
queue.Send(myMessage1,MessageQueueTransactionType.Single);
}

MessageQueueTransaction t = new MessageQueueTransaction();
try
{
t.Begin();
queue.Receive(t);
throw null;
t.Commit();
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
t.Abort();
}

---------------------------------------------------------------------------
游标的使用
queue.CreateCursor();可以创建一个游标,然后在peek的时候,可以将这个游标作为参数传入。
要注意,第一次使用的时候,不能直接指定PeekAction.Next,比如m = queue.Peek(new TimeSpan(1000), c, PeekAction.Next);会出错。首次使用只能m = queue.Peek(new TimeSpan(1000), c, PeekAction.Current);之后游标自动指向下一个,接下来才能使用queue.Peek(new TimeSpan(1000), c, PeekAction.Next);

此处使用游标来计算消息队列里的消息个数
---------------------------------------------------------------------------
Cursor c = queue.CreateCursor();
            try
            {
                Message m = queue.Peek(new TimeSpan(1), c, PeekAction.Current);
                if (m != null)
                {
                    count = 1;
                    Message m1 = new Message();
                    while (m1 != null)
                    {
                        try
                        {
                            m1 = queue.Peek(new TimeSpan(1), c, PeekAction.Next);
                            count++;
                        }
                        catch (MessageQueueException ex)
                        {
                        //http://msdn.microsoft.com/zh-cn/library/system.messaging.messagequeueerrorcode%28v=VS.80%29.aspx
                            if (ex.MessageQueueErrorCode!=MessageQueueErrorCode.IOTimeout)
                            {
                                throw;
                            }

                            m1 = null;
                        }
                    }
                }
                Console.WriteLine(m.Id);

            }
            catch (MessageQueueException ex1)
            {
                if (ex1.MessageQueueErrorCode != MessageQueueErrorCode.IOTimeout)
                {
                    throw;
                }
            }

 

---------------------------------------------------------------------------
        //**************************************************
        // Determines whether a queue is empty. The Peek()
        // method throws an exception if there is no message
        // in the queue. This method handles that exception 
        // by returning true to the calling method.
        //**************************************************

        static public bool IsQueueEmpty(MessageQueue myQueue)
        {
            bool isQueueEmpty = false;

            // Connect to a queue.
            //MessageQueue myQueue = new MessageQueue(".\\myQueue");

            try
            {
                // Set Peek to return immediately.
                myQueue.Peek(new TimeSpan(0));

                // If an IOTimeout was not thrown, there is a message 
                // in the queue.
                isQueueEmpty = false;
            }

            catch (MessageQueueException e)
            {
                if (e.MessageQueueErrorCode ==
                    MessageQueueErrorCode.IOTimeout)
                {
                    // No message was in the queue.
                    isQueueEmpty = true;
                }

                // Handle other sources of MessageQueueException.
            }

            // Handle other exceptions as necessary.

            // Return true if there are no messages in the queue.
            return isQueueEmpty;

        }

 






















本文转自cnn23711151CTO博客,原文链接:http://blog.51cto.com/cnn237111/617450 ,如需转载请自行联系原作者

上一篇:Flink Forward Asia 2021 正式启动!议题火热征集中!


下一篇:数仓实时化改造:Hudi on Flink 在顺丰的实践应用