Problem

You might have been in that situation, you just implemented a RabbitMQ consumer in Node.js and it works fine. Next thing you know, you are on-call and are woken up in the middle of the night because the queue is full and it is not being consumed.

On a closer look, you see the logs of the consumer and it says IllegalOperationError: Channel closed.

IllegalOperationError: Channel closed
at ConfirmChannel.<anonymous> (/Users/sebastianscheibe/Code/rabbitmq-reconnect/node_modules/amqplib/lib/channel.js:159:11)
    at ConfirmChannel.ack (/Users/sebastianscheibe/Code/rabbitmq-reconnect/node_modules/amqplib/lib/channel_model.js:207:10)
    at /Users/sebastianscheibe/Code/rabbitmq-reconnect/src/test_timeout.ts:52:19
    at runNextTicks (node:internal/process/task_queues:60:5)
    at listOnTimeout (node:internal/timers:533:9)
    at processTimers (node:internal/timers:507:7)

You restart the consumer and it works again.

How can automatically reconnect to RabbitMQ after an error, such as timeout or channel closed?

Solution

Luckily, the Node.js library amqp-connection-manager has a built-in reconnect strategy. The library is a drop in replacement for the amqplib but is slightly different initialized. Also is the connection and channel creation not awaited, but has instead a callback mechanism.

Besides changing this, I found that adding listeners for the events such as error/blocking are helpful. In the following, is an example on how you can use it.

import amqp from 'amqp-connection-manager'

const AMQP_USERNAME = process.env.AMQP_USERNAME || 'guest'
const AMQP_PASSWORD = process.env.AMQP_PASSWORD || 'guest'
const AMQP_HOST = process.env.AMQP_HOST || 'localhost'
const AMQP_PORT = process.env.AMQP_PORT || 5672

const EXCHANGE = 'jobs'
const ROUTING_KEY = 'jobs.someKey'
const QUEUE = 'jobs.queue'

async function test () {
    // No need for await, the connection is established in the background
    const connection = amqp.connect(`amqp://${AMQP_USERNAME}:${AMQP_PASSWORD}@${AMQP_HOST}:${AMQP_PORT}`)
    
    // You want to define apropiate handlers for each of these
    connection.on('connect', () => { console.log('Connected!') })
    connection.on('disconnect', params => { console.log('Disconnected: ', params.err) })
    connection.on('connectFailed', params => { console.log('connectFailed: ', params) })
    connection.on('unblocked', () => { console.log('Unblocked') })
    connection.on('blocked', params => { console.log('Blocked, reason: ', params.reason) })

    // No need for await, the channel is established in the background
    // The setup function is called each time the connection is established
    const channel = connection.createChannel({
        setup: async (channel: Channel) => {
            await channel.assertExchange(EXCHANGE, 'direct', { durable: true })
            await channel.assertQueue(QUEUE, { durable: true })
            await channel.bindQueue(QUEUE, EXCHANGE, ROUTING_KEY)

            await channel.consume(QUEUE, async (msg) => {
                console.log('Received message: ', msg.content.toString())
                await new Promise(resolve => setTimeout(resolve, 1000))
                try {
                    channel.ack(msg)
                } catch (e) {
                    console.error('Error while acking message: ', e)
                }
            })
        }
    })
    
    
    // Test sending messages
    let i = 0
    while (true) {
        await new Promise(resolve => setTimeout(resolve, 1000))
        i++
        console.log('Publishing data: ' + i)

        await channel.publish(EXCHANGE, ROUTING_KEY, Buffer.from('Test data: ' + i))
    }
}

test().catch(err => { console.error(err) })

Upon running this code, you will see that the connection is getting re-established and the messages are being published.

Publishing data: 11
Received message:  Test data: 11
Publishing data: 12
Received message:  Test data: 12
Disconnected:  Error: Connection closed: 320 (CONNECTION-FORCED) with message "CONNECTION_FORCED - Closed via management plugin"
    at Object.accept (/Users/sebastianscheibe/Code/rabbitmq-reconnect/node_modules/amqplib/lib/connection.js:90:15)
    at Connection.mainAccept (/Users/sebastianscheibe/Code/rabbitmq-reconnect/node_modules/amqplib/lib/connection.js:63:33)
    at Socket.go (/Users/sebastianscheibe/Code/rabbitmq-reconnect/node_modules/amqplib/lib/connection.js:486:48)
    at Socket.emit (node:events:513:28)
    at Socket.emit (node:domain:489:12)
    at emitReadable_ (node:internal/streams/readable:590:12)
    at processTicksAndRejections (node:internal/process/task_queues:81:21) {
  code: 320
}
Error while acking message:  IllegalOperationError: Channel closed
    at ConfirmChannel.<anonymous> (/Users/sebastianscheibe/Code/rabbitmq-reconnect/node_modules/amqplib/lib/channel.js:159:11)
    at ConfirmChannel.ack (/Users/sebastianscheibe/Code/rabbitmq-reconnect/node_modules/amqplib/lib/channel_model.js:207:10)
    at /Users/sebastianscheibe/Code/rabbitmq-reconnect/src/test_timeout2.ts:44:19
    at runNextTicks (node:internal/process/task_queues:60:5)
    at listOnTimeout (node:internal/timers:533:9)
    at processTimers (node:internal/timers:507:7) {
  stackAtStateChange: 'Stack capture: Connection closed: 320 (CONNECTION-FORCED) with message "CONNECTION_FORCED - Closed via management plugin"\n' +
    '    at Object.accept (/Users/sebastianscheibe/Code/rabbitmq-reconnect/node_modules/amqplib/lib/connection.js:89:15)\n' +
    '    at Connection.mainAccept [as accept] (/Users/sebastianscheibe/Code/rabbitmq-reconnect/node_modules/amqplib/lib/connection.js:63:33)\n' +
    '    at Socket.go (/Users/sebastianscheibe/Code/rabbitmq-reconnect/node_modules/amqplib/lib/connection.js:486:48)\n' +
    '    at Socket.emit (node:events:513:28)\n' +
    '    at Socket.emit (node:domain:489:12)\n' +
    '    at emitReadable_ (node:internal/streams/readable:590:12)\n' +
    '    at processTicksAndRejections (node:internal/process/task_queues:81:21)'
}
Publishing data: 13
Connected!
Received message:  Test data: 12
Received message:  Test data: 13
Publishing data: 14
Received message:  Test data: 14

Conclusion

In conclusion, replacing the amqplib with the amqp-connection-manager offers functions such as automatic reconnect in case of errors and keeping messages to be sent in memory until the connection has been restored.

With that, everyone who is looking for a production ready RabbitMQ implementation should have a look at amqp-connect-manager.

References