出处:https://www.cnblogs.com/hanfan/p/9842301.html
网上很多人已经总结的很好了,比如今天看到的这个。https://www.cnblogs.com/LipeiNet/p/9877189.html
我就不总结了,贴点代码。
RabbitMQConnect.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
|
using System;
using System.IO;
using System.Net.Sockets;
using Polly;
using Polly.Retry;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;
namespace Common.Tool.RabbitMQ
{ public class RabbitMQConnect
{
static string host = "127.0.0.1" ;
static string UserName = "H" ;
static string password = "H" ;
public readonly static IConnectionFactory _connectionFactory;
IConnection _connection;
object sync_root = new object ();
bool _disposed;
static RabbitMQConnect()
{
//if (host == "localhost")
//{
// _connectionFactory = new ConnectionFactory() { HostName = host };
//}
//else
{
_connectionFactory = new ConnectionFactory() { HostName = host, UserName = UserName, Password = password };
}
}
public bool IsConnected => this ._connection != null && this ._connection.IsOpen && this ._disposed;
public IModel CreateModel()
{
if (! this .IsConnected)
{
this .TryConnect();
}
return this ._connection.CreateModel();
}
public bool TryConnect()
{
lock ( this .sync_root)
{
RetryPolicy policy = RetryPolicy.Handle<SocketException>() //如果我们想指定处理多个异常类型通过OR即可
.Or<BrokerUnreachableException>() //ConnectionFactory.CreateConnection期间无法打开连接时抛出异常
.WaitAndRetry(5, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) =>
{
}); // 重试次数,提供等待特定重试尝试的持续时间的函数,每次重试时调用的操作。
policy.Execute(() =>
{
this ._connection = _connectionFactory.CreateConnection();
});
if ( this .IsConnected)
{
//当连接被破坏时引发。如果在添加事件处理程序时连接已经被销毁对于此事件,事件处理程序将立即被触发。
this ._connection.ConnectionShutdown += this .OnConnectionShutdown;
//在连接调用的回调中发生异常时发出信号。当ConnectionShutdown处理程序抛出异常时,此事件将发出信号。如果将来有更多的事件出现在RabbitMQ.Client.IConnection上,那么这个事件当这些事件处理程序中的一个抛出异常时,它们将被标记。
this ._connection.CallbackException += this .OnCallbackException;
this ._connection.ConnectionBlocked += this .OnConnectionBlocked;
//LogHelperNLog.Info($"RabbitMQ persistent connection acquired a connection {_connection.Endpoint.HostName} and is subscribed to failure events");
return true ;
}
else
{
// LogHelperNLog.Info("FATAL ERROR: RabbitMQ connections could not be created and opened");
return false ;
}
}
}
void OnConnectionShutdown( object sender, ShutdownEventArgs reason)
{
if ( this ._disposed) return ;
//RabbitMQ连接正在关闭。 尝试重新连接...
//LogHelperNLog.Info("A RabbitMQ connection is on shutdown. Trying to re-connect...");
this .TryConnect();
}
/// <summary>
///
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
void OnCallbackException( object sender, CallbackExceptionEventArgs e)
{
if ( this ._disposed) return ;
// LogHelperNLog.Info("A RabbitMQ connection throw exception. Trying to re-connect...");
this .TryConnect();
}
private void OnConnectionBlocked( object sender, ConnectionBlockedEventArgs e)
{
if ( this ._disposed) return ;
// LogHelperNLog.Info("A RabbitMQ connection is shutdown. Trying to re-connect...");
this .TryConnect();
}
public void Dispose()
{
if ( this ._disposed) return ;
this ._disposed = true ;
try
{
this ._connection.Dispose();
}
catch (IOException ex)
{
//_logger.LogCritical(ex.ToString());
// LogHelperNLog.Error(ex);
}
}
}
} |
RabbitMQSend.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
|
using Newtonsoft.Json;
using Newtonsoft.Json.Converters;
using System.Text;
namespace Common.Tool.RabbitMQ
{ public class RabbitMQSend
{
/// <summary>
/// Newtonsoft.Json利用IsoDateTimeConverter处理日期类型
/// </summary>
static IsoDateTimeConverter dtConverter = new IsoDateTimeConverter { DateTimeFormat = "yyyy-MM-dd HH:mm:ss" };
static RabbitMQConnect connection= null ;
static RabbitMQSend()
{
connection = new RabbitMQConnect();
}
/// <summary>
/// 添加信息到队列
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="item">信息</param>
/// <param name="queueName">队列名</param>
public static void PushMsgToMq<T>(T item, string queueName)
{
string msg = JsonConvert.SerializeObject(item, dtConverter);
using (global::RabbitMQ.Client.IModel channel = connection.CreateModel())
{
channel.QueueDeclare(queue: queueName,
durable: true ,
exclusive: false ,
autoDelete: false ,
arguments: null );
//Construct a completely empty content header for use with the Basic content class.
//构造一个完全空的内容标头,以便与Basic内容类一起使用。
global::RabbitMQ.Client.IBasicProperties properties = channel.CreateBasicProperties();
properties.Persistent = true ;
byte [] body = Encoding.UTF8.GetBytes(msg);
channel.BasicPublish(exchange: "" ,
routingKey: queueName,
basicProperties: properties,
body: body);
}
}
}
} |
RabbitMQReceive.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
|
using Newtonsoft.Json;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
namespace Common.Tool.RabbitMQ
{ public class RabbitMQReceive : IDisposable
{
IConnection connection = null ;
IModel channel = null ;
public void BindReceiveMqMsg<T>(Func<T, bool > func, Action< string > log, string queueName)
{
this .connection = RabbitMQConnect._connectionFactory.CreateConnection(); //创建与指定端点的连接。
this .channel = this .connection.CreateModel(); //创建并返回新的频道,会话和模型。
this .channel.QueueDeclare(queue: queueName, //队列名称
durable: true , //是否持久化, 队列的声明默认是存放到内存中的,如果rabbitmq重启会丢失,如果想重启之后还存在就要使队列持久化,保存到Erlang自带的Mnesia数据库中,当rabbitmq重启之后会读取该数据库
exclusive: false , //是否排外的,有两个作用,一:当连接关闭时connection.close()该队列是否会自动删除;二:该队列是否是私有的private,如果不是排外的,可以使用两个消费者都访问同一个队列,没有任何问题,如果是排外的,会对当前队列加锁,其他通道channel是不能访问的,如果强制访问会报异常:com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=405, reply-text=RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'queue_name' in vhost '/', class-id=50, method-id=20)一般等于true的话用于一个队列只能有一个消费者来消费的场景
autoDelete: false , //是否自动删除,当最后一个消费者断开连接之后队列是否自动被删除,可以通过RabbitMQ Management,查看某个队列的消费者数量,当consumers = 0时队列就会自动删除
arguments: null ); //队列中的消息什么时候会自动被删除?
this .channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false ); //(Spec方法)配置Basic内容类的QoS参数。
//第一个参数是可接收消息的大小的 0不受限制
//第二个参数是处理消息最大的数量 1 那如果接收一个消息,但是没有应答,则客户端不会收到下一个消息,消息只会在队列中阻塞
//第三个参数则设置了是不是针对整个Connection的,因为一个Connection可以有多个Channel,如果是false则说明只是针对于这个Channel的。
EventingBasicConsumer consumer = new EventingBasicConsumer( this .channel); //构造函数,它将Model属性设置为给定值。
consumer.Received += (model, bdea) =>
{
byte [] body = bdea.Body;
string message = Encoding.UTF8.GetString(body);
log?.Invoke(message);
T item = JsonConvert.DeserializeObject<T>(message);
bool result = func(item);
if (result)
{
//(Spec方法)确认一个或多个已传送的消息。
this .channel.BasicAck(deliveryTag: bdea.DeliveryTag, multiple: false );
}
};
this .channel.BasicConsume(queue: queueName, noAck: false , consumer: consumer); //The consumer is started with noAck = false(i.e.BasicAck is required), an empty consumer tag (i.e. the server creates and returns a fresh consumer tag), noLocal=false and exclusive=false.
}
public void Dispose()
{
if ( this .channel != null )
{
this .channel.Close();
}
if ( this .connection != null )
{
this .connection.Close();
}
}
}
} |