RabbitMQ / AMQP: single queue, multiple consumers for same message?

Can I have each consumer receive the same messages? I.e., do both consumers get messages 1, 2, 3, 4, 5, 6? What is this called in AMQP/RabbitMQ speak? How is it normally configured?

No, not if the consumers are on the same queue. From RabbitMQ’s AMQP Concepts guide:

it is important to understand that, in AMQP 0-9-1, messages are load balanced between consumers.

This seems to imply that round-robin behavior within a queue is a given and is not configurable. In other words, separate queues are required in order to have the same message ID handled by multiple consumers.
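To make the load-balancing point concrete, here is a minimal sketch of what a single queue does with two consumers. It is a toy in-memory model, not RabbitMQ or node-amqp code; the function name `roundRobin` is made up for illustration, and a real broker's dispatch also depends on prefetch and acknowledgements.

```javascript
// Toy in-memory model (NOT the RabbitMQ API): a single queue hands each
// message to exactly one of its consumers, in round-robin order.
function roundRobin(messages, consumerCount) {
  // One inbox per consumer.
  const inboxes = Array.from({ length: consumerCount }, () => []);
  messages.forEach((msg, i) => {
    // Message i goes to consumer (i mod consumerCount) and to nobody else.
    inboxes[i % consumerCount].push(msg);
  });
  return inboxes;
}

const singleQueue = roundRobin([1, 2, 3, 4, 5, 6], 2);
console.log(singleQueue); // [ [ 1, 3, 5 ], [ 2, 4, 6 ] ]
```

Neither consumer sees all six messages, which is why one shared queue cannot give every consumer its own copy.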

Is this commonly done? Should I just have the exchange route the message into two separate queues, each with a single consumer, instead?

No, it’s not. A single queue with multiple consumers, each handling the same message ID, isn’t possible. Having the exchange route the message into two separate queues is indeed the better approach.

As I don’t require complex routing, a fanout exchange handles this nicely. I didn’t focus much on exchanges earlier because node-amqp has the concept of a ‘default exchange’, which lets you publish messages to a connection directly; however, most AMQP messages are published to a specific exchange.
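For intuition, a fanout exchange can be sketched as something that copies every published message into each queue bound to it. Again, this is only a toy in-memory model under my own assumptions; the class `FanoutExchange` and the arrays standing in for queues are hypothetical and are not part of node-amqp.

```javascript
// Toy in-memory model (NOT the RabbitMQ API): a fanout exchange copies
// every published message into each bound queue and ignores the routing key.
class FanoutExchange {
  constructor() {
    this.queues = []; // queues are plain arrays in this sketch
  }
  bind(queue) {
    this.queues.push(queue);
  }
  publish(routingKey, message) {
    // routingKey is accepted but unused, mirroring fanout semantics.
    for (const queue of this.queues) {
      queue.push(message);
    }
  }
}

const fanout = new FanoutExchange();
const queueA = []; // would be read by consumer A
const queueB = []; // would be read by consumer B
fanout.bind(queueA);
fanout.bind(queueB);
[1, 2, 3].forEach(function (m) { fanout.publish('', m); });
console.log(queueA, queueB); // [ 1, 2, 3 ] [ 1, 2, 3 ]
```

With one queue per consumer, each consumer sees every message, which is the behavior the question asks for.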

Here’s my fanout exchange, both sending and receiving:

var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
var count = 1;

connection.on('ready', function () {
  // Declare a fanout exchange: every bound queue gets a copy of each message.
  connection.exchange("my_exchange", { type: 'fanout' }, function (exchange) {

    var sendMessage = function (exchange, payload) {
      console.log('about to publish');
      var encoded_payload = JSON.stringify(payload);
      // Fanout exchanges ignore the routing key, so it is left empty.
      exchange.publish('', encoded_payload, {});
    };

    // Receive messages. For a second consumer to get the same messages,
    // give it its own queue name and bind that queue to the same exchange.
    connection.queue("my_queue_name", function (queue) {
      console.log('Created queue');
      queue.bind(exchange, '');
      queue.subscribe(function (message) {
        // message.data holds the raw body; decode it before parsing.
        var encoded_payload = message.data.toString();
        var payload = JSON.parse(encoded_payload);
        console.log('Received a message:');
        console.log(payload);
      });
      console.log('subscribed to queue');
    });

    // Publish a test message every two seconds.
    setInterval(function () {
      var test_message = "TEST " + count;
      sendMessage(exchange, test_message);
      count += 1;
    }, 2000);
  });
});
