Creating a Virtual Load Balancer for RabbitMQ

Fateh Ali Aamir
4 min readOct 8, 2024

--

Photo by Gary Bendig on Unsplash

What is RabbitMQ

RabbitMQ is an open-source message broker software that facilitates communication between different components of a distributed system. It implements the Advanced Message Queuing Protocol (AMQP), enabling applications to communicate with each other asynchronously by sending and receiving messages. RabbitMQ is used to manage and route messages, ensuring they are delivered to the right place at the right time, even when components are distributed across different servers or are offline for a period.

How to Install RabbitMQ

Step 1: Install Docker
First, ensure that you have Docker installed on your system. You can download Docker from the official Docker website and follow the installation instructions for your operating system (Windows, macOS, or Linux).

Step 2: Pull the RabbitMQ Docker Image
Once Docker is installed, you can pull the official RabbitMQ image from Docker Hub. You can choose the RabbitMQ image with the management plugin enabled, which gives you a web-based UI for managing RabbitMQ.

Open a terminal and run the following command:

docker pull rabbitmq:management

This command pulls the RabbitMQ Docker image with the management plugin included.

Step 3: Run RabbitMQ in a Docker Container
To start RabbitMQ in a Docker container, run the following command:

docker run -d --name rabbitmq-server -p 5672:5672 -p 15672:15672 rabbitmq:management

Explanation of the options used:

  • -d: Run the container in detached mode (in the background).
  • --name rabbitmq-server: Assign the name rabbitmq-server to the container.
  • -p 5672:5672: Map port 5672 (the RabbitMQ server port for messaging) from the container to your host.
  • -p 15672:15672: Map port 15672 (the RabbitMQ management web UI port) from the container to your host.

The Virtual Load Balancer

What we’re building today is an unorthodox take on the load balancer. This isn’t a typical load balancer that makes decisions based on the CPU or memory consumption and instead balances the load within the queues which is quite opposite to how load balancing is traditionally envisioned. In this implementation we present below, we dynamically pick up queues that have at least one message and then we choose the queue with the least amount of messages, something like the Shortest-Job-First paradigm. We want to clear up queues as soon as possible. You can alter the logic of how you want the queues to be served using this template.

The Listing Function

async listQueues(): Promise<ConsumerSetting[]> {
try {
const username = process.env.AMAZON_MQ_USERNAME;
const password = process.env.AMAZON_MQ_PASSWORD;
const host = process.env.AMAZON_MQ_HOST;
const port = process.env.AMAZON_MQ_PORT;

const response = await axios.get(`http://${host}:${port}/api/queues`, {
auth: {
username,
password,
},
});

const formattedQueues: ConsumerSetting[] = response.data.map(
(queue: { name: string; messages: number }) => ({
name: queue.name,
messages: queue.messages,
}),
);

return formattedQueues;
} catch (error: unknown) {
if (axios.isAxiosError(error)) {
this.logger.error('Error fetching queues:', error.message);
} else {
this.logger.error('Unexpected error:', (error as Error).message);
}

return [];
}
}

The first function, listQueues, is responsible for fetching all queues from a RabbitMQ server. It retrieves the necessary credentials (username, password, host, and port) from environment variables and makes an authenticated GET request to the RabbitMQ API to retrieve the list of queues. The response from the API contains data about the queues, including their names and the number of messages in each queue. The function then processes this response by mapping the queue data into an array of ConsumerSetting objects, which hold the name of each queue and the number of messages it contains. If the request fails, it logs the error and returns an empty array.

The Choosing Function

async chooseQueue(channel: Channel): Promise<null> {
let queues: ConsumerSetting[];
let currentQueue: ConsumerSetting | null = null;

while (true) {
this.logger.log('Refreshing queue list...');
queues = await listQueues();

const validQueues = queues
.filter(queue => queue.messages > 1)
.sort((a, b) => a.messages - b.messages);

currentQueue = validQueues.length > 0 ? validQueues[0] : null;

if (currentQueue) {
await this.consumeMessages(channel, currentQueue);
currentQueue = null;
await this.delay(5000);
} else {
this.logger.log('No queues available with messages greater than one.');
await this.delay(5000);
}
}
}

The second function, chooseQueue, continuously monitors the queues to find one with more than one message and selects the queue with the fewest messages for message consumption. It starts by calling the listQueues function to get an updated list of queues, then filters the queues based on the condition that they must have more than one message. The queue with the fewest messages is selected, and the function calls consumeMessages to process the messages from the specified queue. If no valid queues are available, it waits for 5 seconds and retries the operation. This function ensures efficient queue processing by dynamically selecting the least busy queue for consumption.

Conclusion

In conclusion, creating a virtual load balancer for RabbitMQ offers a powerful approach to managing message queues efficiently. By dynamically selecting the queue with the fewest messages, this solution ensures that tasks are processed quickly, reducing latency and preventing bottlenecks. Unlike traditional load balancers that focus on resource allocation like CPU or memory, this method prioritizes clearing queues with the least amount of work, making it an ideal choice for optimizing message-driven systems. With this setup, you can customize the logic to fit your specific needs, ensuring a flexible and scalable RabbitMQ infrastructure.

--

--