代码:
namespace RabbitMQDemo
{
    /// <summary>
    /// Demo form for a RabbitMQ topic exchange. The form publishes the text typed by the
    /// user with the routing key chosen in the combo box, and runs two consumers whose
    /// queues are bound to the same exchange with different wildcard binding keys. Each
    /// consumer writes the messages it receives into its own text box.
    /// </summary>
    public partial class Topics : Form
    {
        // Name of the exchange shared by the publisher and both consumers.
        private string exchangeName = "topic_logs";

        // Exchange type.
        private string exchangeType = ExchangeType.Topic;

        private readonly static Topics _Topics;

        // Delegate used to marshal received text onto the UI thread.
        Action<string, TextBox> SetText;

        // Connections and channels opened by ReceiveMsg, kept so that they can be
        // released when the form is disposed instead of leaking for the process lifetime.
        private readonly System.Collections.Generic.List<IDisposable> _consumerResources =
            new System.Collections.Generic.List<IDisposable>();

        static Topics()
        {
            _Topics = new Topics();
        }

        /// <summary>
        /// Singleton instance of the form.
        /// </summary>
        public static Topics SingleForm { get { return _Topics; } }

        /// <summary>
        /// Builds the form and starts the two consumers.
        /// </summary>
        private Topics()
        {
            // NOTE(review): this is a process-wide switch that hides cross-thread UI bugs.
            // This form marshals its updates itself, so it may no longer be needed here;
            // kept because other forms could rely on it - confirm before removing.
            CheckForIllegalCrossThreadCalls = false;
            InitializeComponent();

            // Assign the delegate and the clean-up hook before any consumer can deliver.
            SetText += OnSetText;
            Disposed += OnFormDisposed;

            ReceiveMsg(txtConsumer1);
            ReceiveMsg(txtConsumer2);
        }

        /// <summary>
        /// Click handler of the send button: publishes the current message.
        /// </summary>
        private void btnSendMsg_Click(object sender, EventArgs e)
        {
            SendMsg();
        }

        /// <summary>
        /// Publishes the text of the publisher text box to the exchange, using the
        /// routing key currently shown in the routing-key combo box. Blank input is
        /// rejected with a message box and nothing is published.
        /// </summary>
        private void SendMsg()
        {
            string message = txtPublisher.Text;
            if (string.IsNullOrWhiteSpace(message))
            {
                MessageBox.Show("请输入要发送的消息");
                return; // do not publish a blank message
            }

            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: exchangeName,
                                        type: exchangeType);

                var routingKey = cbBoxRoutingKey.Text;
                var body = Encoding.UTF8.GetBytes(message);
                channel.BasicPublish(exchange: exchangeName,
                                     routingKey: routingKey,
                                     basicProperties: null,
                                     body: body);
            }
        }

        /// <summary>
        /// Starts a consumer whose received messages are appended to <paramref name="box"/>.
        /// A server-named queue is declared and bound to the exchange with the binding
        /// keys that belong to the given text box.
        /// </summary>
        /// <param name="box">Text box that displays the messages; its name selects the binding keys.</param>
        private void ReceiveMsg(TextBox box)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };

            // The connection and channel must outlive this method (the consumer keeps
            // using them), so they are tracked and released in OnFormDisposed.
            var connection = factory.CreateConnection();
            _consumerResources.Add(connection);
            var channel = connection.CreateModel();
            _consumerResources.Add(channel);

            channel.ExchangeDeclare(exchange: exchangeName,
                                    type: exchangeType);
            var queueName = channel.QueueDeclare().QueueName;

            foreach (var bindingKey in GetBindingKeys(box))
            {
                channel.QueueBind(queue: queueName,
                                  exchange: exchangeName,
                                  routingKey: bindingKey);
            }

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                var msg = Encoding.UTF8.GetString(ea.Body);

                // Deliveries can arrive before the form has been shown or after it has
                // been closed; without a window handle the text cannot be marshalled.
                if (box.IsDisposed || !box.IsHandleCreated)
                {
                    return;
                }

                // BeginInvoke queues the update without blocking the consumer thread
                // on the UI thread.
                box.BeginInvoke(SetText, msg, box);
            };

            channel.BasicConsume(queue: queueName,
                                 noAck: true,
                                 consumer: consumer);
        }

        /// <summary>
        /// Returns the binding keys used by the consumer that writes to <paramref name="box"/>.
        /// </summary>
        /// <param name="box">Text box of the consumer.</param>
        /// <returns>The binding keys, or an empty array for an unknown text box.</returns>
        private static string[] GetBindingKeys(TextBox box)
        {
            if (box.Name == "txtConsumer1")
            {
                // Consumer 1
                return new string[] { "*.*.one", "one.#" };
            }

            if (box.Name == "txtConsumer2")
            {
                // Consumer 2
                return new string[] { "*.*.two", "two.#" };
            }

            return new string[0];
        }

        /// <summary>
        /// Appends one received message, followed by a line break, to the given text box.
        /// </summary>
        /// <param name="text">Message text.</param>
        /// <param name="box">Text box to append to.</param>
        private void OnSetText(string text, TextBox box)
        {
            // AppendText avoids re-reading and re-assigning the whole Text on every message.
            box.AppendText(string.Format("{0}\r\n", text));
        }

        /// <summary>
        /// Releases the consumer channels and connections when the form is disposed.
        /// </summary>
        private void OnFormDisposed(object sender, EventArgs e)
        {
            // Dispose in reverse order of creation so each channel goes before its connection.
            for (int i = _consumerResources.Count - 1; i >= 0; i--)
            {
                _consumerResources[i].Dispose();
            }

            _consumerResources.Clear();
        }

        /// <summary>
        /// Load handler: fills the routing-key combo box with sample routing keys and
        /// leaves it editable so that other keys can be typed in.
        /// </summary>
        private void Topics_Load(object sender, EventArgs e)
        {
            string[] dataSource = new string[] { "send.msg.one", "send.msg.two", "one.receive.msg", "two.receive.msg" };
            cbBoxRoutingKey.DataSource = dataSource;
            cbBoxRoutingKey.DropDownStyle = ComboBoxStyle.DropDown;
        }
    }
}
界面:
大概流程:
生产者把消息连同路由键(routingKey)发布到 type='topic' 的交换机;消费者声明各自的队列,并用绑定键(可包含通配符 `*` 和 `#`,其中 `*` 匹配一个单词,`#` 匹配零个或多个单词)把队列绑定到同一个交换机。交换机根据路由键与绑定键的匹配结果把消息投递到相应的队列,消费者再从队列中获取并处理消息
测试结果: