//check is existed and create
if (MessageQueue.Exists(@".\Private$\MSMQtest") == false)
{
MessageQueue.Create(@".\Private$\MSMQtest", true);
}
---------------------------------------------------------------------------
//根据机器名字取得所有的队列数组
MessageQueue[] qs = MessageQueue.GetPrivateQueuesByMachine("127.0.0.1");
---------------------------------------------------------------------------
//获取MSMQ所在的机器GUID号
Guid g= MessageQueue.GetMachineId("127.0.0.1");//工作组安装计算机不支持该操作
---------------------------------------------------------------------------
// Receive message, 同步的Receive方法阻塞当前执行线程,直到一个message可以得到
System.Messaging.Message message = queue.Receive();
---------------------------------------------------------------------------
q =MessageQueue.GetPublicQueues();//工作组安装计算机不支持该操作
---------------------------------------------------------------------------
MessageQueue queue = new MessageQueue(@".\Private$\MSMQtest");
//queue.AccessMode默认值=QueueAccessMode.SendAndReceive
//Peek 队列仅可以查看消息。
//PeekAndAdmin 队列可以查看消息。消息也可以被清除。
//Receive 队列可以查看或接收消息。
//ReceiveAndAdmin 队列可以接收消息。消息也可以被清除。
//Send 队列仅可以发送消息。
//SendAndReceive 队列可以查看、接收或发送消息。
---------------------------------------------------------------------------
Guid g = queue.Category;
Console.WriteLine(g.ToString());
---------------------------------------------------------------------------
//DenySharedReceive 指定此 MessageQueue 引用的队列的共享模式。
//将 DenySharedReceive 设置为 true,以指示只有此 MessageQueue 应具有通过指定的 Path 查看或接收来自队列的消息的访问权。
//如果另一个 MessageQueue 或另一个应用程序与同一队列资源关联,则该实例或应用程序将无法查看或接收消息,但它仍可发送消息。
//如果 DenySharedReceive 为 false,则队列可供多个应用程序用来发送、查看或接收消息。
bool b = queue.DenySharedReceive;
---------------------------------------------------------------------------
Message myMessage1 = new Message("test", new BinaryMessageFormatter());
Message myMessage2 = new Message("test", new XmlMessageFormatter());
Message myMessage3 = new Message(new people() { age = 100, name = "cdd" }, new BinaryMessageFormatter());
queue.Send(myMessage1);
queue.Send(myMessage2);
queue.Send(myMessage3);
发送message的时候,指定的Formatter,在receive的时候也需要设置queue.Formatter,比如queue.Formatter = new BinaryMessageFormatter();
---------------------------------------------------------------------------
queue.Receive()会删除当前消息。
queue.Peek()不会删除当前消息。
---------------------------------------------------------------------------
///判断queue是不是空了。
static public bool IsQueueEmpty(MessageQueue myQueue)
{
bool isQueueEmpty = false;
// Connect to a queue.
//MessageQueue myQueue = new MessageQueue(".\\myQueue");
try
{
// Set Peek to return immediately.
myQueue.Peek(new TimeSpan(0));
// If an IOTimeout was not thrown, there is a message
// in the queue.
isQueueEmpty = false;
}
catch (MessageQueueException e)
{
if (e.MessageQueueErrorCode ==
MessageQueueErrorCode.IOTimeout)
{
// No message was in the queue.
isQueueEmpty = true;
}
// Handle other sources of MessageQueueException.
}
// Handle other exceptions as necessary.
// Return true if there are no messages in the queue.
return isQueueEmpty;
}
---------------------------------------------------------------------------
queue.GetMessageEnumerator2()方法,为队列中的所有消息创建枚举数对象。可以使用MoveNext方法。
GetMessageEnumerator2 创建队列中所有消息的动态列表。可通过对 GetMessageEnumerator2 返回的 MessageEnumerator 调用 RemoveCurrent,从队列中移除位于枚举数的当前位置的消息。
如果需要队列中消息的静态快照而不是与它们的动态连接,请调用 GetAllMessages。此方法返回 Message 对象的数组,这些对象表示调用该方法时的消息。
GetMessageEnumerator方法已经已过时。GetMessageEnumerator已被否决。应改用 GetMessageEnumerator2。
---------------------------------------------------------------------------
异步读取queue和同步读取queue。
while (true)
{
if (!IsQueueEmpty(queue))
{
IAsyncResult iresult = queue.BeginReceive(new TimeSpan(0), null, (ar) =>
{
lock (_lock)
{
SaveFile(@"c:\pp.txt", ar.IsCompleted.ToString());
}
});
//do something
Message mg = queue.EndReceive(iresult);
SaveFile(@"c:\vv.txt", (string)mg.Body);
}
else
{
Console.WriteLine("No MSG,Waiting 2s");
Thread.Sleep(2000);
Console.WriteLine("2s passed");
}
}
while (true)
{
if (!IsQueueEmpty(queue))
{
Message mg = queue.Receive();
}
else
{
Console.WriteLine("No MSG,Waiting 2s");
Thread.Sleep(2000);
Console.WriteLine("2s passed");
}
}
---------------------------------------------------------------------------
在MSMQ支持事务的情况下,可以使用MessageQueueTransaction
代码如下:
for (int i = 0; i < 2; i++)
{
Message myMessage1 = new Message(DateTime.Now.ToString("yyyy:MM:dd hh:mm:ss fff"), new BinaryMessageFormatter());
queue.Send(myMessage1,MessageQueueTransactionType.Single);
}
MessageQueueTransaction t = new MessageQueueTransaction();
try
{
t.Begin();
queue.Receive(t);
throw null;
t.Commit();
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
t.Abort();
}
---------------------------------------------------------------------------
游标的使用
queue.CreateCursor();可以创建一个游标,然后在peek的时候,可以将这个游标作为参数传入。
要注意,第一次使用的时候,不能直接指定PeekAction.Next,比如m = queue.Peek(new TimeSpan(1000), c, PeekAction.Next);会出错。首次使用只能m = queue.Peek(new TimeSpan(1000), c, PeekAction.Current);之后游标自动指向下一个,接下来才能使用queue.Peek(new TimeSpan(1000), c, PeekAction.Next);
此处使用游标来计算消息队列里的消息个数
---------------------------------------------------------------------------
Cursor c = queue.CreateCursor();
try
{
Message m = queue.Peek(new TimeSpan(1), c, PeekAction.Current);
if (m != null)
{
count = 1;
Message m1 = new Message();
while (m1 != null)
{
try
{
m1 = queue.Peek(new TimeSpan(1), c, PeekAction.Next);
count++;
}
catch (MessageQueueException ex)
{
//http://msdn.microsoft.com/zh-cn/library/system.messaging.messagequeueerrorcode%28v=VS.80%29.aspx
if (ex.MessageQueueErrorCode!=MessageQueueErrorCode.IOTimeout)
{
throw;
}
m1 = null;
}
}
}
Console.WriteLine(m.Id);
}
catch (MessageQueueException ex1)
{
if (ex1.MessageQueueErrorCode != MessageQueueErrorCode.IOTimeout)
{
throw;
}
}
---------------------------------------------------------------------------
//**************************************************
// Determines whether a queue is empty. The Peek()
// method throws an exception if there is no message
// in the queue. This method handles that exception
// by returning true to the calling method.
//**************************************************
static public bool IsQueueEmpty(MessageQueue myQueue)
{
bool isQueueEmpty = false;
// Connect to a queue.
//MessageQueue myQueue = new MessageQueue(".\\myQueue");
try
{
// Set Peek to return immediately.
myQueue.Peek(new TimeSpan(0));
// If an IOTimeout was not thrown, there is a message
// in the queue.
isQueueEmpty = false;
}
catch (MessageQueueException e)
{
if (e.MessageQueueErrorCode ==
MessageQueueErrorCode.IOTimeout)
{
// No message was in the queue.
isQueueEmpty = true;
}
// Handle other sources of MessageQueueException.
}
// Handle other exceptions as necessary.
// Return true if there are no messages in the queue.
return isQueueEmpty;
}
本文转自cnn23711151CTO博客,原文链接:http://blog.51cto.com/cnn237111/617450 ,如需转载请自行联系原作者