获取源码
客户端代码:
namespace RabbitMQDemo
{
    /// <summary>
    /// RPC client form: publishes the text typed by the user to the "rpc_queue" queue
    /// and appends the reply it receives to the client text box.
    /// </summary>
    public partial class RPC : Form
    {
        private readonly static RPC _RPC;

        // NOTE(review): never assigned or invoked in this part of the partial class;
        // kept in case another part of the class references it.
        Action<string, TextBox> SetText;

        static RPC()
        {
            _RPC = new RPC();
        }

        /// <summary>
        /// Singleton instance of the form.
        /// </summary>
        public static RPC SingleForm { get { return _RPC; } }

        private RPC()
        {
            CheckForIllegalCrossThreadCalls = false;
            InitializeComponent();
        }

        /// <summary>
        /// Sends the content of the publisher text box as an RPC request and shows the reply.
        /// </summary>
        /// <param name="sender">Control that raised the click event.</param>
        /// <param name="e">Event data (unused).</param>
        private void btnSendMsg_Click(object sender, EventArgs e)
        {
            string message = txtPublisher.Text;
            if (string.IsNullOrWhiteSpace(message))
            {
                MessageBox.Show("请输入要发送的消息");
                // Stop here: an empty request must not be published.
                return;
            }

            RpcClient client = null;
            try
            {
                client = new RpcClient();
                var response = client.Call(message);
                txtRpcClient.Text += string.Format("{0}\r\n", response);
            }
            catch (Exception ex)
            {
                // Same reporting style as the server form: show the failure instead of crashing the handler.
                MessageBox.Show(ex.ToString());
            }
            finally
            {
                // Always release the connection, even when Call throws.
                if (client != null)
                {
                    client.Close();
                }
            }
        }

        /// <summary>
        /// RPC client: owns one connection, one channel and one server-named reply queue.
        /// </summary>
        private class RpcClient
        {
            #region Fields
            /// <summary>
            /// RabbitMQ connection.
            /// </summary>
            private readonly IConnection connection;
            /// <summary>
            /// Channel opened on <see cref="connection"/>.
            /// </summary>
            private readonly IModel channel;
            /// <summary>
            /// Name of the reply queue declared for this client.
            /// </summary>
            private readonly string replyQueueName;
            /// <summary>
            /// Consumer attached to the reply queue.
            /// </summary>
            private readonly EventingBasicConsumer consumer;
            /// <summary>
            /// Replies whose correlation id matched, handed from the consumer callback to <see cref="Call"/>.
            /// </summary>
            private readonly BlockingCollection<string> resQueue = new BlockingCollection<string>();
            /// <summary>
            /// Message properties (correlation id and reply-to) sent with every request.
            /// </summary>
            private readonly IBasicProperties props;
            #endregion

            /// <summary>
            /// Connects to the broker on localhost, declares the reply queue and starts consuming from it.
            /// </summary>
            public RpcClient()
            {
                var factory = new ConnectionFactory() { HostName = "localhost" };
                connection = factory.CreateConnection();
                channel = connection.CreateModel();
                replyQueueName = channel.QueueDeclare().QueueName;
                consumer = new EventingBasicConsumer(channel);
                props = channel.CreateBasicProperties();

                // Tie the response to the request and tell the server where to reply.
                var correlationID = Guid.NewGuid().ToString();
                props.CorrelationId = correlationID;
                props.ReplyTo = replyQueueName;

                consumer.Received += (model, ea) =>
                {
                    var response = Encoding.UTF8.GetString(ea.Body);
                    // Only accept a response that belongs to this client's request.
                    if (ea.BasicProperties.CorrelationId == correlationID)
                    {
                        resQueue.Add(response);
                    }
                };

                // Register the consumer once, here, so repeated Call invocations
                // do not register the same consumer again.
                channel.BasicConsume(
                    consumer: consumer,
                    queue: replyQueueName,
                    noAck: true);
            }

            /// <summary>
            /// Publishes <paramref name="msg"/> to "rpc_queue" and blocks until a matching reply arrives.
            /// </summary>
            /// <param name="msg">Request text, sent UTF-8 encoded.</param>
            /// <returns>The reply text.</returns>
            public string Call(string msg)
            {
                var msgBytes = Encoding.UTF8.GetBytes(msg);
                channel.BasicPublish(exchange: "",
                    routingKey: "rpc_queue",
                    basicProperties: props,
                    body: msgBytes);

                // Blocks with no timeout until the consumer callback adds a reply.
                return resQueue.Take();
            }

            /// <summary>
            /// Closes the connection to the broker.
            /// </summary>
            public void Close()
            {
                connection.Close();
            }
        }//class
    }
}
服务端代码:
namespace RpcServer
{
    /// <summary>
    /// RPC server form: consumes requests from the "rpc_queue" queue, displays them
    /// and publishes a reply to the queue named in each request's ReplyTo property.
    /// </summary>
    public partial class RpcServer : Form
    {
        private readonly static RpcServer _RpcServer;

        /// <summary>
        /// Delegate used to append received text to a text box through Control.Invoke.
        /// </summary>
        Action<string, TextBox> SetText;

        // Kept in fields so they can be closed when the form closes.
        private IConnection _connection;
        private IModel _channel;

        static RpcServer()
        {
            _RpcServer = new RpcServer();
        }

        /// <summary>
        /// Singleton instance of the form.
        /// </summary>
        public static RpcServer SingleForm { get { return _RpcServer; } }

        private RpcServer()
        {
            CheckForIllegalCrossThreadCalls = false;
            InitializeComponent();
            // Wire the delegate BEFORE consuming starts: the Received callback passes
            // SetText to box.Invoke, and a delivery may arrive as soon as BasicConsume returns.
            SetText += OnSetText;
            FormClosed += CloseBrokerConnection;
            ReceiveMsg(txtRpcServer);
        }

        /// <summary>
        /// Connects to the broker on localhost and starts consuming requests from "rpc_queue".
        /// Any exception raised while setting up is shown in a message box.
        /// </summary>
        /// <param name="box">Text box that displays each received request.</param>
        private void ReceiveMsg(TextBox box)
        {
            try
            {
                var factory = new ConnectionFactory() { HostName = "localhost" };
                var connection = factory.CreateConnection();
                _connection = connection;
                var channel = connection.CreateModel();
                _channel = channel;

                // Declare the request queue.
                channel.QueueDeclare(queue: "rpc_queue",
                    durable: false,
                    exclusive: false,
                    autoDelete: false,
                    arguments: null);

                // At most one unacknowledged message per consumer: no new delivery
                // until the previous one has been acknowledged.
                channel.BasicQos(0, 1, false);

                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var props = ea.BasicProperties;
                    var replyProps = channel.CreateBasicProperties();
                    // Echo the correlation id so the client can match the reply to its request.
                    replyProps.CorrelationId = props.CorrelationId;

                    var msg = Encoding.UTF8.GetString(ea.Body);
                    // Show the request on the server form.
                    box.Invoke(SetText, msg, box);

                    string response = "我将给你回复:已收到消息-" + msg;
                    var responseBytes = Encoding.UTF8.GetBytes(response);
                    channel.BasicPublish(exchange: "",
                        routingKey: props.ReplyTo,
                        basicProperties: replyProps,
                        body: responseBytes);

                    // Manually acknowledge the request to RabbitMQ.
                    channel.BasicAck(deliveryTag: ea.DeliveryTag,
                        multiple: false);
                };

                channel.BasicConsume(queue: "rpc_queue",
                    noAck: false, // manual acknowledgement
                    consumer: consumer);
            }
            catch (Exception ex)
            {
                MessageBox.Show(ex.ToString());
            }
        }

        /// <summary>
        /// Closes the channel and connection opened by <see cref="ReceiveMsg"/> when the form closes.
        /// </summary>
        private void CloseBrokerConnection(object sender, FormClosedEventArgs e)
        {
            if (_channel != null && _channel.IsOpen)
            {
                _channel.Close();
            }

            if (_connection != null && _connection.IsOpen)
            {
                _connection.Close();
            }
        }

        /// <summary>
        /// Appends <paramref name="text"/> followed by a line break to <paramref name="box"/>.
        /// </summary>
        private void OnSetText(string text, TextBox box)
        {
            box.Text += string.Format("{0}\r\n", text);
        }
    }
}
界面:
大概流程:
客户端模拟发送一个请求到请求队列(rpc_queue);服务端从该队列消费消息,并把响应发送到请求中 ReplyTo 指定的回复队列;客户端再从回复队列消费该响应,并通过 CorrelationId 确认它对应自己发出的请求。(测试时新建 2 个 WinForm 程序:一个客户端,一个服务端。)
在 VS 中同时启动两个 WinForm 程序:右键点击解决方案 → 属性 → 多个启动项目 → 将两个项目的“操作”都改为“启动” → 确定即可。
测试结果: