2.RABBITMQ 入门 - WINDOWS - 生产和消费消息 一个完整案例

关于安装和配置,见上一篇 1.RABBITMQ 入门 - WINDOWS - 获取,安装,配置

公司有需求,要求使用winform开发这个东西(消息中间件),另外还要求开发一个日志中间件,但是也是要求做成win form的,这明显不合理,因为之前,服务器上我已经放置了一个  短信的winform的服务。那么到后期的话,登录服务器之后,全是

一个个的窗体挂在那儿,这明显合不合常理,但是领导要求这么玩,也没办法, 因为卧虎要负责的是消费 消息,所以重点说明 消费端

该案例的接收端,源自网上的代码片段 片内容,做了部分修改之后使用

2.RABBITMQ 入门 - WINDOWS - 生产和消费消息 一个完整案例

日志中心的 功能要求使用注入解耦,所以,这里我也解耦了,如果日至那边使用的是 autofac,我只里使用的MEF实现注入 所以定义了相关的接口对象

IMQContextFactory:

using Ecostar.MQLogger.Core.Infrastructure;
using System; namespace Ecostar.MQConsumer.Core.Infrastructure
{
/// <summary>
/// 仅仅只有sender使用到
/// </summary>
public interface IMQContextFactory
{
MQContext CreateContext(string mqUri, Action<string, LogLevel> toLog);
}
}

对应的实现类:MQContextFactory

using Ecostar.MQLogger.Core.Infrastructure;
using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.ComponentModel.Composition;
using System.Security.Cryptography; namespace Ecostar.MQConsumer.Core.Infrastructure
{
/// <summary>
/// 仅仅只有sender使用到
/// </summary>
[Export(typeof(IMQContextFactory))]
public class MQContextFactory : IMQContextFactory
{
/// <summary>
/// 上下文字典
/// </summary>
private static readonly Dictionary<string, MQContext> Contexts = new Dictionary<string, MQContext>(); /// <summary>
/// 上下文操作锁字典,只创建一次
/// </summary>
public static readonly Dictionary<string, object> contextLockers = new Dictionary<string, object>(); /// <summary>
/// 更新上下文操作锁字典时的锁,只创建一次
/// </summary>
private static readonly object contextLockersLocker = new object(); /// <summary>
/// 获取指定的上下文
/// </summary>
/// <param name="mqUri">mq地址</param>
/// <param name="toLog">日志记录</param>
/// <returns>上下文对象</returns>
public MQContext CreateContext(string mqUri, Action<string, LogLevel> toLog)
{
var key = MD5Encrypt(mqUri);
var locker = GetFactoryLocker(key); lock (locker)
{
MQContext context;
if (!Contexts.TryGetValue(key, out context))
{
Guid contextId = Guid.NewGuid();
string logHeader = string.Format("[{0}]", contextId.ToString()); context = new MQContext()
{
ReceiveQueueName = "Logs",
Id = contextId
};
Console.WriteLine(logHeader + " 初始化发送上下文完毕"); // 获取连接
context.SendConnection = CreateConnection(mqUri);
context.SendConnection.AutoClose = false;
context.SendConnection.ConnectionShutdown += (o, e) => Console.WriteLine(" RabbitMQ错误,连接被关闭了:" + e.ReplyText);
Console.WriteLine(logHeader + " 创建连接完毕", LogLevel.Trace); // 获取通道
context.SendChannel = CreateChannel(context.SendConnection);
Console.WriteLine(logHeader + " 创建通道完毕", LogLevel.Trace); Contexts.Add(key, context); } return context;
}
} #region 私有方法
/// 创建连接
/// </summary>
/// <param name="mqUrl"></param>
/// <returns></returns>
private static IConnection CreateConnection(string mqUrl)
{
const ushort heartbeta = ; var factory = new ConnectionFactory()
{
Uri = mqUrl,
RequestedHeartbeat = heartbeta,
AutomaticRecoveryEnabled = true
}; return factory.CreateConnection();
} /// <summary>
/// 创建通道
/// </summary>
/// <param name="connection"></param>
/// <returns></returns>
private static IModel CreateChannel(IConnection connection)
{
if (connection != null)
return connection.CreateModel();
return null;
} /// <summary>
/// 获取上下文操作锁
/// </summary>
/// <param name="contextKey">上下文工厂key</param>
/// <returns></returns>
private static object GetFactoryLocker(string contextKey)
{
lock (contextLockersLocker)
{
object locker;
if (!contextLockers.TryGetValue(contextKey, out locker))
{
locker = new object();
contextLockers.Add(contextKey, locker);
} return locker;
}
} /// <summary>
/// 获取字符的MD5值
/// </summary>
/// <param name="str"></param>
/// <returns></returns>
private static string MD5Encrypt(string str)
{
MD5 md5 = new MD5CryptoServiceProvider();
byte[] result = md5.ComputeHash(System.Text.Encoding.Default.GetBytes(str));
return System.Text.Encoding.Default.GetString(result);
}
#endregion }
}

注视我写的很明白,这部分的使用 是  生产者使用的类,也就是  发送消息


下面是消费者:

IReceiver.cs:

namespace Ecostar.MQConsumer.Core
{
public interface IReceiver
{
/// <summary>
/// 初始化接收程序
/// </summary>
/// <param name="mqUrls"></param>
void InitialReceive(MQReceiverParam receiverParams); }
}

对应的实现类:

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.ComponentModel.Composition;
using System.Threading; namespace Ecostar.MQConsumer.Core
{
[Export(typeof(IReceiver))]
public class Receiver : IReceiver
{
private MQContext _context;
private const ushort Heartbeta = ;
private string _queueName;
private bool _isAutoAck;
private List<string> _mqUrls;
private Func<byte[], bool> _processFunction;
private Action<string> _mqActionLogFunc;
private MQConnectionFactory _ConnectionFactoryParams; public void InitialReceive(MQReceiverParam receiverParams)
{
_queueName = receiverParams._queueName;
_isAutoAck = receiverParams._isAutoAck;
_mqUrls = receiverParams._mqUrls;
_processFunction = receiverParams._processFunction;
_mqActionLogFunc = receiverParams._mqActionLogFunc;
_ConnectionFactoryParams = receiverParams.ConnectionFactoryParam;
receiverParams._mqUrls.ForEach(url => InitReceive(_queueName, _isAutoAck, url)); } /// <summary>
/// 初始化某个节点的接收
/// </summary>
private void InitReceive(string queueName, bool isAutoAck, string mqUrl)
{
Guid contextId = Guid.NewGuid();
string logHeader = string.Format("[{0}, {1}]", queueName, contextId.ToString());
try
{
_context = new MQContext()
{
Id = contextId,
ReceiveQueueName = queueName,
IsAutoAck = isAutoAck,
ReceiveConnection = new ConnectionFactory()
{
HostName = _ConnectionFactoryParams.HostName,
UserName = _ConnectionFactoryParams.UserName,
Password = _ConnectionFactoryParams.Password,
VirtualHost = _ConnectionFactoryParams.VirtualHost
}.CreateConnection()
}; // 监听Shutdown事件,记录下LOG便于排查和监管服务的稳定性
_context.ReceiveConnection.ConnectionShutdown += (o, e) =>
{
_mqActionLogFunc(" RabbitMQ错误,连接被关闭了:" + e.ReplyText);
};
// 获取通道
_context.ReceiveChannel = _context.ReceiveConnection?.CreateModel(); // 创建事件驱动的消费者
var consumer = new EventingBasicConsumer(_context.ReceiveChannel);
consumer.Received += (o, e) =>
{
try
{
// 接受数据处理逻辑
// e.Body
var result = _processFunction(e.Body); if (!isAutoAck)
{
if (!result)
{
Thread.Sleep(); // 未能处理完成的话,将消息重新放入队列头
_context.ReceiveChannel.BasicReject(e.DeliveryTag, true);
_mqActionLogFunc(" 消息未处理成功,将消息重新放入队列头");
}
else if (!_context.ReceiveChannel.IsClosed)
{
// 处理成功并且通道未关闭时ack回去,删除队列中的消息
_context.ReceiveChannel.BasicAck(e.DeliveryTag, false);
_mqActionLogFunc(" 消息处理成功,发送Ack完毕");
}
}
}
catch (Exception ex)
{
Thread.Sleep();
if (!isAutoAck)
{
// 将消息重新放入队列头
_context.ReceiveChannel.BasicReject(e.DeliveryTag, true);
}
_mqActionLogFunc(" 处理数据发生异常:" + ex.Message + ex.StackTrace);
}
}; // 一次只获取一条消息
_context.ReceiveChannel.BasicQos(, , false);
_context.ReceiveChannel.BasicConsume(_context.ReceiveQueueName, _context.IsAutoAck, consumer); _mqActionLogFunc(" 初始化队列完毕");
}
catch (Exception ex)
{
_mqActionLogFunc(" 初始化RabbitMQ出错:" + ex.Message + ex.StackTrace);
}
} }
}

使用到的参数 MQReceiverParam:

using System;
using System.Collections.Generic; namespace Ecostar.MQConsumer.Core
{
/// <summary>
/// 消费者入参
/// </summary>
public class MQReceiverParam
{
public string _queueName { get; set; }
public bool _isAutoAck { get; set; }
public List<string> _mqUrls { get; set; }
public Func<byte[], bool> _processFunction { get; set; }
public Action<string> _mqActionLogFunc { get; set; }
public MQConnectionFactory ConnectionFactoryParam { get; set; } } /// <summary>
/// 服务配置
/// </summary>
public class MQConnectionFactory
{
public string HostName {get;set;}
public string UserName {get;set;}
public string Password {get;set;}
public string VirtualHost {get;set;}
} }

重力要说明一下:

Func<byte[], bool> _processFunction { get; set; }
Action<string> _mqActionLogFunc { get; set; }

参数的对象中,有这么两个委托,原因是,如果你在学习 rabbitmq得这块内容的时候,你会发现,网上很多案例,以及官方提供的案例,写法都比较简单,而且,都是讲业务逻辑和   rabbitmq的消费的这跨功能 耦合到了一起

如果其他地方使用的时候,还是重复,创建  connection   创建queue,绑定,,,,,等相关动作,代码不仅不美观,而且显得繁琐,啰嗦,所以,这两个委托类型的参数,起到了接偶的作用,似的 具体的业务逻辑和 rabbitmq的消费逻辑 分离

使用如下:

(我是在窗体上直接放置了一个  richTextBox的控件,讲接收的信息打印出来,)

using Ecostar.MQConsumer.Core;
using System;
using System.Collections.Generic;
using System.ComponentModel.Composition;
using System.ComponentModel.Composition.Hosting;
using System.IO;
using System.Reflection;
using System.Text;
using System.Windows.Forms; namespace Ecostar.MQConsumer.UI
{
[Export]
public partial class MQMainForm : Form
{ #region Fields
private static CompositionContainer _container;//MEF 部件组合 管理
[Import]
public IReceiver Receiver { get; set; } #endregion public MQMainForm()
{
InitializeComponent();
} private void MQMainForm_Load(object sender, EventArgs e)
{
InitForm();
InitialListener();
} public void InitForm()
{
AggregateCatalog catalog = new AggregateCatalog();
catalog.Catalogs.Add(new DirectoryCatalog(Directory.GetCurrentDirectory()));
catalog.Catalogs.Add(new AssemblyCatalog(Assembly.GetExecutingAssembly()));
_container = new CompositionContainer(catalog);
} /// <summary>
/// 初始化监听程序
/// </summary>
void InitialListener()
{
MQMainForm form;
try
{
form = _container.GetExportedValue<MQMainForm>();
}
catch (Exception ex)
{ throw;
}
form.Receiver.InitialReceive(new MQReceiverParam()
{
_queueName = "testQueueName",
_isAutoAck = false,
_mqUrls = new List<string>() { "amqp://127.0.0.1:5672/" },
_processFunction = (buffer) =>
{
string receiveMsg = Encoding.UTF8.GetString(buffer);
this.rtb_receive.Invoke(new Action(() => { { this.rtb_receive.Text += receiveMsg + "\r\n"; } }));
return true; },
_mqActionLogFunc = (msg) =>
{
this.rtb_receive.Invoke(new Action(() =>
{
this.rtb_receive.Text += "====MQ Action====" + msg + "\r\n";
}));
},
ConnectionFactoryParam = new MQConnectionFactory()
{
HostName = "127.0.0.1",
UserName = "CC",
Password = "123qwe",
VirtualHost = "/"
}
});
} }
}

其中的 testQueueName,是客户端发送的 消息列队名称,也就是queue的名称,你也可以(如果是测试),在mq服务器上 人为的添加这个queue名称之后再测试。

这样一来,_processFunction 这个用于消费的方法,可以,写任意的处理方式,比如打印到控制台,输出到床体 控件显示,写入到日志,写入到数据库等等。

而且中的 _mqActionLogFunc,适用于记录mq的消费过程的日志,比如 mq消费操作执行过程中发生异常 ,那么直接找mq的问题即可。


截图中还一个:MQContext类,这是一个部分类,为了方便区分,我把消费者,生产者  公共部分分别放置到了三个部分类中:

MQContext.Consumer.cs

using RabbitMQ.Client;

namespace Ecostar.MQConsumer.Core
{
/// <summary>
/// MQ 消费者
/// </summary>
public partial class MQContext
{
// <summary>
/// 用户监听的Connection
/// </summary>
public IConnection ReceiveConnection { get; set; } /// <summary>
/// 用于监听的Channel
/// </summary>
public IModel ReceiveChannel { get; set; } /// <summary>
/// 监听队列名
/// </summary>
public string ReceiveQueueName { get; set; }
}
}

MQContext.cs

namespace Ecostar.MQConsumer.Core
{
/// <summary>
/// MQ 生产者消费者公共部分
/// </summary>
public partial class MQContext
{
/// <summary>
/// mq地址
/// </summary>
public string MQUrl { get; set; } }
}

MQContext.Producer.cs

using RabbitMQ.Client;
using System; namespace Ecostar.MQConsumer.Core
{
/// <summary>
/// MQ 生产者
/// </summary>
public partial class MQContext
{
/// <summary>
/// 用于发送消息的Connection
/// </summary>
public IConnection SendConnection { get; set; } /// <summary>
/// 用于发送消息到Channel
/// </summary>
public IModel SendChannel { get; set; } /// <summary>
/// 发送的Exchange
/// </summary>
public string Exchange { get; set; } /// <summary>
/// 是否启用自动删除
/// </summary>
public bool IsAutoAck { get; set; }
/// <summary>
/// 上下文ID
/// </summary>
public Guid Id { get; set; } /// <summary>
/// 路由
/// </summary>
public string RouteKey { get; set; }
/// <summary>
/// 是否正在运行,默认false
/// </summary>
public bool IsRunning { get; set; } /// <summary>
/// 回收此上下文
/// </summary>
public void Recovery()
{
IsRunning = false;
}
}
}

到此,这个简单的消费案例就完成了。


下面的是 生产者,(发送消息的案例),为了造数据,所以写的随意些:(一个控制台程序),nuget引入  rabbitmq.client ,指令: install-package rabbitmq.Client

using RabbitMQ.Client;
using System;
using System.Text; namespace RubbitMQClient
{
/// <summary>
/// 1.Routing (按路线发送接收)
/// </summary>
public class RoutingType
{
public static void RoutingProducer(string[] arguments)
{
arguments = new string[] { "","" }; string serverAddress = "127.0.0.1";
string account = "CC";
string password = "123qwe"; ConnectionFactory factory = new ConnectionFactory()
{
HostName = serverAddress,
UserName = account,
Password = password,
VirtualHost = "/"
};
IConnection conn = factory.CreateConnection();
for (int i = ; i < ; i++)
{
arguments[] = i.ToString();
string queueName = "testQueueName"; using (var channel = conn.CreateModel())
{
//---1.声明durable Exchange 和 Queue--------------------------------------------------------------------------------------------------------------
channel.ExchangeDeclare(Consts.EXCHANGE_NAME_DIRECT, "direct", durable: true, autoDelete: false, arguments: null);
//arguments = new[] { "12321", "32432" };
channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
channel.QueueBind(queueName, Consts.EXCHANGE_NAME_DIRECT, routingKey: "");//queueName //---------------------------------------------------------------------------------------------------------------------- //---2.发布持久化消息到队列 ---------------------------------------------------------------------------------------------------
var props = channel.CreateBasicProperties();
//props.Priority = 3;//控制优先级
props.DeliveryMode = ;//将信息也持久化
props.Persistent = true;///SetPersistent方式提示已经过时,建议使用当前方式
string severity = getSeverity(arguments);
string message = getMessage(arguments);
byte[] buffer = Encoding.UTF8.GetBytes(message); channel.BasicPublish(Consts.EXCHANGE_NAME_DIRECT, routingKey: "", basicProperties: props, body: buffer); ////---消费消息
//BasicGetResult msgResponse = channel.BasicGet(queueName, noAck: true); //var msgBody = Encoding.UTF8.GetString(msgResponse.Body);
//Console.WriteLine(msgBody); //3.1(发布方式还有一种 基于推送的事件订阅 )第二种方式(使用内置的 QueueingBasicConsumer 提供简化的编程模型,通过允许您在共享队列上阻塞,直到收到一条消息)
//var consumer = new QueueingBasicConsumer(channel);
//channel.BasicConsume(queueName, noAck: true, consumer: consumer);
//var msgResponse = consumer.Queue.Dequeue(); //blocking
//var msgBody = Encoding.UTF8.GetString(msgResponse.Body); } }
conn.Close();
Console.ReadKey();
} private static String getSeverity(String[] strings)
{
if (strings.Length < )
return "routing(direct) type info";
return strings[];
} private static String getMessage(String[] strings)
{
if (strings.Length < )
return "routing(direct) --> Hello World!";
return joinStrings(strings, " ", );
} private static String joinStrings(String[] strings, String delimiter, int startIndex)
{
return strings[].ToString();
} }
}

抽时间将上面涉及到的   mq一些相关  属性(常用的API的),在总结下,主要是零散,其实东西很简单,如何更好的,更灵活的组合到一起,是这个插件使用的 最主要一点。

上一篇:charles系列破解激活办法(最高charles4.2都可以激活)


下一篇:IDDD 实现领域驱动设计-理解领域和子域