/// <summary>
/// Static helpers for RabbitMQ: creating connections and channels, declaring exchanges and
/// queues, and publishing messages.
/// </summary>
/// <remarks>
/// The type initializer reads the <c>MqConnection</c> app setting and eagerly opens the shared
/// default connection (automatic recovery enabled, 10 second recovery interval). A missing or
/// invalid setting, or an unreachable broker, therefore fails when this type is first used.
/// </remarks>
public abstract class MqHelper
{
    protected static IConnection defaultConnection = null;

    // Factory behind the shared default connection; configured once in the type initializer.
    private static ConnectionFactory SharedFactory;

    /// <summary>AMQP URI read from the <c>MqConnection</c> app setting.</summary>
    public static string MqConnection = ConfigurationManager.AppSettings["MqConnection"];

    static MqHelper()
    {
        SharedFactory = new ConnectionFactory
        {
            Uri = new Uri(MqConnection),
            AutomaticRecoveryEnabled = true,
            NetworkRecoveryInterval = TimeSpan.FromSeconds(10)
        };
        defaultConnection = SharedFactory.CreateConnection();
    }

    /// <summary>
    /// Gets the default message-queue connection.
    /// Note from the reference material: channel instances must not be shared between threads.
    /// </summary>
    /// <remarks>
    /// The connection is normally created by the type initializer. If the backing field has been
    /// reset to <c>null</c>, a new connection is created from the <c>MqConnection</c> app setting
    /// via <see cref="CreateConnectionByUri"/> (that fallback does not set the recovery options).
    /// </remarks>
    public static IConnection DefaultConnection
    {
        get
        {
            if (defaultConnection == null)
            {
                // Read the connection URI from the configuration file.
                string uri = ConfigurationManager.AppSettings["MqConnection"];
                defaultConnection = CreateConnectionByUri(uri);

                // Example of configuring a factory by hand instead of by URI:
                //   factory.Uri = "amqp://guest:guest@localhost:5672/%2f"
                //   factory.HostName = "localhost";   // host address
                //   factory.VirtualHost = "/";        // virtual host (multi-tenant setups)
                //   factory.Port = 5672;              // service port
                //   factory.UserName = "guest";       // user name
                //   factory.Password = "guest";       // password
                //   factory.CreateConnection();
            }

            return defaultConnection;
        }
    }

    /// <summary>
    /// Creates a message-queue connection from an AMQP URI.
    /// </summary>
    /// <param name="uri">For example <c>amqp://guest:guest@localhost:5672/vhost</c>.</param>
    /// <returns>A newly opened connection; the caller owns it and must dispose it.</returns>
    public static IConnection CreateConnectionByUri(string uri)
    {
        var factory = new ConnectionFactory
        {
            Uri = new Uri(uri)
        };
        return factory.CreateConnection();
    }

    /// <summary>
    /// Creates a connection from individual settings.
    /// </summary>
    /// <param name="hostName">Message-queue server address. When empty, the factory's own default host is kept.</param>
    /// <param name="virtualHost">Virtual host (used for multi-tenant setups).</param>
    /// <param name="port">Port number.</param>
    /// <param name="userName">User name; <c>guest</c> is used when null or empty.</param>
    /// <param name="password">Password; <c>guest</c> is used when null or empty.</param>
    /// <returns>A newly opened connection; the caller owns it and must dispose it.</returns>
    /// <example>
    /// using (var connection = MqHelper.CreateConnection())
    /// using (var channel = connection.CreateModel())
    /// {
    ///     // TODO: process
    /// }
    /// </example>
    public static IConnection CreateConnection(string hostName = "", string virtualHost = "/", int port = 5672, string userName = "", string password = "")
    {
        var factory = new ConnectionFactory
        {
            VirtualHost = virtualHost,
            Port = port,
            UserName = string.IsNullOrEmpty(userName) ? "guest" : userName,
            Password = string.IsNullOrEmpty(password) ? "guest" : password
        };

        // Only override the factory's default host when the caller actually supplied one,
        // so the parameterless call shown in the example does not overwrite it with "".
        if (!string.IsNullOrEmpty(hostName))
        {
            factory.HostName = hostName;
        }

        return factory.CreateConnection();
    }

    /// <summary>
    /// Creates a channel. (The method name is a historical misspelling of "CreateChannel",
    /// kept so existing callers continue to compile.)
    /// </summary>
    /// <param name="conn">Connection to create the channel on; <see cref="DefaultConnection"/> when null.</param>
    /// <returns>A new channel; the caller owns it and must close/dispose it.</returns>
    public static IModel CetateChannel(IConnection conn = null)
    {
        return (conn ?? DefaultConnection).CreateModel();
    }

    /// <summary>
    /// Declares an exchange.
    /// </summary>
    /// <param name="exchangeName">Exchange name.</param>
    /// <param name="type">Exchange type.</param>
    /// <param name="chanel">Channel to use; when null a temporary channel is created and released afterwards.</param>
    /// <param name="durable">Whether the exchange is durable. Default: true.</param>
    /// <param name="autoDelete">Auto-delete flag. Default: false.</param>
    /// <param name="arguments">Additional declaration arguments. Default: null.</param>
    /// <remarks>
    /// <paramref name="durable"/>, <paramref name="autoDelete"/> and <paramref name="arguments"/>
    /// are forwarded to the broker. Earlier versions ignored them and always declared a durable,
    /// non-auto-delete exchange without arguments; the defaults reproduce that behaviour.
    /// </remarks>
    public static void ExchangeDeclare(string exchangeName, string type, IModel chanel = null, bool durable = true, bool autoDelete = false, IDictionary<string, object> arguments = null)
    {
        IModel ch = chanel ?? CetateChannel();
        try
        {
            ch.ExchangeDeclare(exchange: exchangeName,
                               type: type,
                               durable: durable,
                               autoDelete: autoDelete,
                               arguments: arguments);
        }
        finally
        {
            // Release the channel only if it was created here, even when the declare fails.
            if (chanel == null)
            {
                CloseAndDispose(ch);
            }
        }
    }

    /// <summary>
    /// Declares a queue, or queries an existing queue's information.
    /// Declaring the same queue repeatedly does not create duplicates, but re-declaring it with
    /// settings that differ from the existing queue raises an OperationInterruptedException.
    /// </summary>
    /// <param name="queueName">Queue name.</param>
    /// <param name="channel">Channel connected to the message queue. Required.</param>
    /// <param name="durable">Whether the queue is durable. Default: true.</param>
    /// <param name="exclusive">Exclusive flag. Default: false.</param>
    /// <param name="autoDelete">Auto-delete flag. Default: false.</param>
    /// <param name="arguments">Additional declaration arguments (e.g. priority settings). Default: null.</param>
    /// <returns>The broker's declare-ok result.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="channel"/> is null.</exception>
    /// <remarks>
    /// The option parameters are forwarded to the broker. Earlier versions ignored them and always
    /// used durable=true, exclusive=false, autoDelete=false, arguments=null; the defaults
    /// reproduce that behaviour.
    /// </remarks>
    public static QueueDeclareOk QueueDeclare(string queueName, IModel channel, bool durable = true, bool exclusive = false, bool autoDelete = false, IDictionary<string, object> arguments = null)
    {
        if (channel == null)
        {
            throw new ArgumentNullException(nameof(channel));
        }

        return channel.QueueDeclare(queue: queueName,
                                    durable: durable,
                                    exclusive: exclusive,
                                    autoDelete: autoDelete,
                                    arguments: arguments);
    }

    /// <summary>
    /// Publishes a message object, serialized with <c>MessageUtility.GetBytes</c>.
    /// </summary>
    /// <param name="exchange">Exchange.</param>
    /// <param name="routingKey">Routing key.</param>
    /// <param name="message">Message object to serialize and send.</param>
    /// <param name="chanel">Channel to use; when null a temporary channel is created and released afterwards.</param>
    /// <param name="mandatory">Passed through to <c>BasicPublish</c>.</param>
    /// <param name="basicProperties">Passed through to <c>BasicPublish</c>.</param>
    public static void Publish(string exchange, string routingKey, dynamic message, IModel chanel = null, bool mandatory = false, IBasicProperties basicProperties = null)
    {
        // Serialize before any channel is created so a serialization failure cannot leak one.
        // The cast binds the call statically; the byte[]-typed local then selects the byte[] overload.
        byte[] body = MessageUtility.GetBytes((object)message);
        Publish(exchange, routingKey, body, chanel, mandatory, basicProperties);
    }

    /// <summary>
    /// Publishes a pre-encoded message body.
    /// </summary>
    /// <param name="exchange">Exchange.</param>
    /// <param name="routingKey">Routing key.</param>
    /// <param name="body">Message content.</param>
    /// <param name="chanel">Channel to use; when null a temporary channel is created and released afterwards.</param>
    /// <param name="mandatory">Passed through to <c>BasicPublish</c>.</param>
    /// <param name="basicProperties">Passed through to <c>BasicPublish</c>.</param>
    public static void Publish(string exchange, string routingKey, byte[] body, IModel chanel = null, bool mandatory = false, IBasicProperties basicProperties = null)
    {
        IModel ch = chanel ?? CetateChannel();
        try
        {
            ch.BasicPublish(exchange: exchange,
                            routingKey: routingKey,
                            mandatory: mandatory,
                            basicProperties: basicProperties,
                            body: body);
        }
        finally
        {
            // Release the channel only if it was created here, even when the publish fails.
            if (chanel == null)
            {
                CloseAndDispose(ch);
            }
        }
    }

    // Closes a channel created by this helper and always disposes it, even if Close throws.
    private static void CloseAndDispose(IModel channel)
    {
        try
        {
            channel.Close();
        }
        finally
        {
            channel.Dispose();
        }
    }
}
/// <summary>
/// Converts message objects to and from the UTF-8 encoded JSON bytes carried in message bodies.
/// </summary>
public static class MessageUtility
{
    /// <summary>
    /// Serialization: converts an object to a byte array (JSON text encoded as UTF-8).
    /// </summary>
    /// <param name="obj">Object to serialize.</param>
    /// <returns>The encoded bytes, or <c>null</c> when <paramref name="obj"/> is null.</returns>
    public static byte[] GetBytes(dynamic obj)
    {
        // Bind statically to object: avoids runtime-binder dispatch for the null check and the
        // serializer call, and keeps a user-defined == operator from affecting the null test.
        object value = obj;
        if (value == null)
        {
            return null;
        }

        string json = JsonConvert.SerializeObject(value);
        return Encoding.UTF8.GetBytes(json);
    }

    /// <summary>
    /// Deserialization: converts a byte array back into an instance of <typeparamref name="T"/>.
    /// </summary>
    /// <typeparam name="T">Target type. For <c>string</c> the decoded UTF-8 text is returned as-is, without JSON parsing.</typeparam>
    /// <param name="messageBody">UTF-8 encoded message body.</param>
    /// <returns>
    /// <c>default(T)</c> when <paramref name="messageBody"/> is null or empty; otherwise the decoded value.
    /// </returns>
    /// <remarks>
    /// NOTE(review): a string passed to <see cref="GetBytes"/> is JSON-serialized, while
    /// <c>ReadBytes&lt;string&gt;</c> skips JSON parsing, so the two are presumably not symmetric
    /// for plain strings - confirm this is intended.
    /// </remarks>
    public static T ReadBytes<T>(byte[] messageBody)
    {
        if (messageBody == null || messageBody.Length < 1)
        {
            return default(T);
        }

        string json = Encoding.UTF8.GetString(messageBody);

        // Callers asking for a string get the raw text.
        if (typeof(T) == typeof(string))
        {
            return (T)(object)json;
        }

        return JsonConvert.DeserializeObject<T>(json);
    }
}
/// <summary>
/// Keeps one consuming channel (and its consumer tag) per key and offers helpers to register a
/// consumer, acknowledge deliveries and stop consuming.
/// </summary>
public static class MqService
{
    /// <summary>Consuming channels, keyed by the value passed to <see cref="RegisterConsume"/>.</summary>
    public static Dictionary<string, IModel> Channels = new Dictionary<string, IModel>();

    // Consumer tags returned by BasicConsume, keyed like Channels.
    static Dictionary<string, string> Tags = new Dictionary<string, string>();

    // Guards Channels and Tags: Ack is called from message handlers while registration and
    // un-registration may run on other threads, and Dictionary is not safe for that.
    private static readonly object SyncRoot = new object();

    /// <summary>
    /// Registers a message-queue consumer.
    /// </summary>
    /// <param name="routKey">Key of the registration; it is also passed to <c>BasicConsume</c> as its first argument.</param>
    /// <param name="OnReceived">Handler invoked for each received message.</param>
    /// <param name="autoAck">Passed through to <c>BasicConsume</c>. Default: true.</param>
    /// <returns>
    /// The channel consuming for <paramref name="routKey"/>. If an open channel is already
    /// registered for the key it is returned unchanged and <paramref name="OnReceived"/> is not attached.
    /// </returns>
    public static IModel RegisterConsume(string routKey, EventHandler<BasicDeliverEventArgs> OnReceived, bool autoAck = true)
    {
        lock (SyncRoot)
        {
            IModel existing;
            if (Channels.TryGetValue(routKey, out existing) && existing != null && !existing.IsClosed)
            {
                return existing;
            }

            IModel channel = MqHelper.CetateChannel();
            try
            {
                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += OnReceived;
                string tag = channel.BasicConsume(routKey, autoAck, consumer);

                // The indexer adds or replaces, covering both first registration and
                // replacement of a closed channel.
                Channels[routKey] = channel;
                Tags[routKey] = tag;
            }
            catch
            {
                // Do not leak the freshly created channel when consuming could not be started.
                channel.Dispose();
                throw;
            }

            return channel;
        }
    }

    /// <summary>
    /// Acknowledges a received message on the channel registered for <paramref name="routKey"/>.
    /// Does nothing when no channel is registered for the key.
    /// </summary>
    /// <param name="e">The delivery to acknowledge.</param>
    /// <param name="routKey">Key used when the consumer was registered.</param>
    public static void Ack(this BasicDeliverEventArgs e, string routKey)
    {
        IModel channel;
        lock (SyncRoot)
        {
            if (!Channels.TryGetValue(routKey, out channel) || channel == null)
            {
                return;
            }
        }

        channel.BasicAck(e.DeliveryTag, false);
    }

    //// Example of an event handler:
    //private static void SystemLog_OnReceived(object sender, BasicDeliverEventArgs e)
    //{
    //    try
    //    {
    //        IModel sysChannel = Channels[routKey];
    //        channel.basicAck(e.DeliveryTag, false);
    //        SystemLog log = MessageUtility.ReadBytes<SystemLog>(e.Body);
    //        int val = dataService.AddLog(log);
    //        if (val < 1)
    //        {
    //            LogManager.GetLogger(Assembly.GetExecutingAssembly().GetName().Name).Info("Ignored system log: " + JsonConvert.SerializeObject(log));
    //        }
    //    }
    //    catch (Exception ex)
    //    {
    //        LogManager.GetLogger(Assembly.GetExecutingAssembly().GetName().Name).Error(ex);
    //    }
    //}

    /// <summary>
    /// Stops listening: cancels the consumer registered for <paramref name="routKey"/> and
    /// closes its channel. Does nothing when the key is not registered.
    /// </summary>
    /// <param name="routKey">Key used when the consumer was registered.</param>
    public static void UnRegister(string routKey)
    {
        IModel channel;
        string tag;

        lock (SyncRoot)
        {
            bool hasChannel = Channels.TryGetValue(routKey, out channel);
            Tags.TryGetValue(routKey, out tag);

            // Drop the registration first so a failing cancel cannot leave stale entries behind.
            Channels.Remove(routKey);
            Tags.Remove(routKey);

            if (!hasChannel || channel == null)
            {
                return;
            }
        }

        try
        {
            if (tag != null)
            {
                channel.BasicCancel(tag);
            }
        }
        finally
        {
            // Always release the channel, even when the cancel or the close fails.
            try
            {
                channel.Close();
            }
            finally
            {
                channel.Dispose();
            }
        }
    }
}
// Usage example:
// Read the consumer key from the configuration file (app setting "MQ_Info").
public string RoutKey = ConfigurationManager.AppSettings["MQ_Info"];
// Start receiving MQ messages. autoAck is passed as false; the handler calls MqService.Ack itself.
MqService.RegisterConsume(RoutKey, OnReceived, false);
/// <summary>
/// Handles a message delivered to the consumer registered under <c>RoutKey</c>:
/// decodes the body and then acknowledges the delivery.
/// </summary>
/// <param name="sender">Event source (unused).</param>
/// <param name="e">Delivery data, including the message body and delivery tag.</param>
private void OnReceived(object sender, BasicDeliverEventArgs e)
{
    try
    {
        // Convert the message body into the model type.
        var model = MessageUtility.ReadBytes<Model>(e.Body);

        // Acknowledge only after the body was decoded successfully.
        MqService.Ack(e, RoutKey);
    }
    catch (Exception ex)
    {
        // Report the failure instead of swallowing it silently. The delivery is left
        // unacknowledged here, exactly as before; only the diagnostics are new.
        System.Diagnostics.Trace.TraceError("Failed to handle MQ message (delivery tag {0}): {1}", e.DeliveryTag, ex);
    }
}