The main idea behind Work Queues (aka: Task Queues) is to avoid doing a resource-intensive task immediately and having to wait for it to complete. Instead we schedule the task to be done later. We encapsulate a task as a message and send it to a queue. A worker process running in the background will pop the tasks and eventually execute the job. When you run many workers the tasks will be shared between them.
1. creater a publisher to create a queue
var factory = new ConnectionFactory() { HostName = "localhost" }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.QueueDeclare(queue: "task_queue", durable: true, exclusive: false, autoDelete: false, arguments: null); var message = GetMessage(args); var properties = channel.CreateBasicProperties(); properties.Persistent = true; for (int i = 0; i < 20; i++) { var body = Encoding.UTF8.GetBytes(message + i.ToString()); channel.BasicPublish(exchange: "", routingKey: "task_queue", basicProperties: properties, body: body); Console.WriteLine(" [x] Sent {0}", message + i.ToString()); } }
2. create two consumers
var factory = new ConnectionFactory() { HostName = "localhost" }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.QueueDeclare(queue: "task_queue", durable: true, exclusive: false, autoDelete: false, arguments: null); channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); Console.WriteLine(" [*] Waiting for messages."); var consumer = new EventingBasicConsumer(channel); consumer.Received += (sender, ea) => { var body = ea.Body.ToArray(); var message = Encoding.UTF8.GetString(body); Console.WriteLine(" Consumer1 Received {0}", message); int dots = message.Split('.').Length - 1; Thread.Sleep(1000); Console.WriteLine(" [x] Done"); // Note: it is possible to access the channel via // ((EventingBasicConsumer)sender).Model here channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); }; channel.BasicConsume(queue: "task_queue", autoAck: false, consumer: consumer);
3. attentions
4. Testing (1th sleep 2 seconds, 2th sleep 1 second )
By default, RabbitMQ will send items to consumers in a average way. Of cause based on the processing speed.