代码:
namespace RabbitMQDemo
{
    /// <summary>
    /// Publish/subscribe demo form. The publisher text box sends a message to a
    /// fanout exchange; two consumers, each bound to its own server-named queue,
    /// receive a copy and append it to their own text box.
    /// </summary>
    public partial class PublishSubscribe : Form
    {
        private readonly string exchangeName = "logs";
        private readonly string exchangeType = ExchangeType.Fanout; // fanout = broadcast to every bound queue

        // Delegate used to marshal received text onto the UI thread via Control.Invoke.
        private Action<string, TextBox> SetText;

        private static readonly PublishSubscribe _PublishSubscribe;

        // Channels and connections opened by ReceiveMsg; released when the form closes.
        private readonly System.Collections.Generic.List<IDisposable> _consumerResources =
            new System.Collections.Generic.List<IDisposable>();

        static PublishSubscribe()
        {
            _PublishSubscribe = new PublishSubscribe();
        }

        /// <summary>
        /// The single shared instance of this form (singleton).
        /// </summary>
        public static PublishSubscribe SingleForm { get { return _PublishSubscribe; } }

        private PublishSubscribe()
        {
            // NOTE(review): this is a process-wide switch. This form marshals its own
            // updates through Control.Invoke and does not need it, but it is kept
            // because other forms in the application may rely on it - confirm before removing.
            CheckForIllegalCrossThreadCalls = false;
            InitializeComponent();

            // Assign the delegate BEFORE starting the consumers so that a delivery
            // arriving immediately never sees a null delegate.
            SetText = OnSetText;
            FormClosed += OnFormClosed;

            ReceiveMsg(txtConsumer1); // consumer 1
            ReceiveMsg(txtConsumer2); // consumer 2
        }

        private void btnSendMsg_Click(object sender, EventArgs e)
        {
            SendMsg();
        }

        /// <summary>
        /// Publishes the publisher text box content to the fanout exchange.
        /// Nothing is published when the text is empty or whitespace.
        /// </summary>
        private void SendMsg()
        {
            string message = txtPublisher.Text;
            if (string.IsNullOrWhiteSpace(message))
            {
                MessageBox.Show("请输入要发送的消息");
                return; // do not publish an empty message
            }

            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(
                    exchange: exchangeName,
                    type: exchangeType);

                var body = Encoding.UTF8.GetBytes(message);
                channel.BasicPublish(
                    exchange: exchangeName,
                    routingKey: "",
                    basicProperties: null,
                    body: body);
            }
        }

        /// <summary>
        /// Starts a consumer that appends every message received from the exchange
        /// to <paramref name="box"/>. Any failure while setting up the consumer is
        /// shown in a message box and the partially opened resources are released.
        /// </summary>
        /// <param name="box">Text box that displays the messages of this consumer.</param>
        private void ReceiveMsg(TextBox box)
        {
            IConnection connection = null;
            IModel channel = null;
            try
            {
                var factory = new ConnectionFactory() { HostName = "localhost" };
                connection = factory.CreateConnection();
                channel = connection.CreateModel();

                // Declare the exchange.
                channel.ExchangeDeclare(
                    exchange: exchangeName,
                    type: exchangeType);

                // Let RabbitMQ generate the queue name (durable=false, exclusive=true,
                // autodelete=true), so each consumer always receives on a fresh queue.
                var queueName = channel.QueueDeclare().QueueName;

                // The queue must be bound to the exchange, otherwise it receives nothing.
                channel.QueueBind(
                    queue: queueName,
                    exchange: exchangeName,
                    routingKey: "");

                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var msg = Encoding.UTF8.GetString(ea.Body);

                    // Deliveries arrive on a client thread. Invoke requires a live
                    // window handle, so skip the update while the form is not shown
                    // yet or has already been closed.
                    if (box.IsDisposed || !box.IsHandleCreated)
                    {
                        return;
                    }

                    try
                    {
                        box.Invoke(SetText, msg, box);
                    }
                    catch (InvalidOperationException)
                    {
                        // The control was disposed between the check above and Invoke
                        // (ObjectDisposedException derives from InvalidOperationException);
                        // the form is going away, so the message is dropped.
                    }
                };

                channel.BasicConsume(
                    queue: queueName,
                    noAck: true,
                    consumer: consumer);

                // Keep the channel and connection alive for the lifetime of the form;
                // the channel is registered first so it is disposed before its connection.
                _consumerResources.Add(channel);
                _consumerResources.Add(connection);
            }
            catch (Exception ex)
            {
                DisposeQuietly(channel);
                DisposeQuietly(connection);
                MessageBox.Show(ex.ToString());
            }
        }

        /// <summary>
        /// Appends <paramref name="text"/> followed by a line break to <paramref name="box"/>.
        /// </summary>
        private void OnSetText(string text, TextBox box)
        {
            box.Text += string.Format("{0}\r\n", text);
        }

        /// <summary>
        /// Releases the consumer channels and connections when the form is closed.
        /// </summary>
        private void OnFormClosed(object sender, FormClosedEventArgs e)
        {
            foreach (var resource in _consumerResources)
            {
                DisposeQuietly(resource);
            }
            _consumerResources.Clear();
        }

        /// <summary>
        /// Disposes <paramref name="resource"/> as best-effort cleanup; a failure is
        /// written to the debug output instead of being propagated.
        /// </summary>
        private static void DisposeQuietly(IDisposable resource)
        {
            if (resource == null)
            {
                return;
            }

            try
            {
                resource.Dispose();
            }
            catch (Exception ex)
            {
                System.Diagnostics.Debug.WriteLine(ex);
            }
        }
    }
}
界面:
大概流程:
生产者将一条消息发送到 fanout 类型的 exchange(交换机),交换机把这条消息广播到与它绑定的 2 个队列,因此两个队列中各有一份该消息;消费者1、消费者2 分别从各自的队列中取出消息并进行处理。
测试结果: