RabbitMQ

RabbitMQ is a robust message broker that supports multiple messaging protocols, enabling reliable communication between distributed applications.

Overview

  • Versions: 4.0, 3.13, 3.12
  • Cluster Support: ❌ No (Single node only)
  • Use Cases: Message queuing, async tasks, event streaming, microservices communication
  • Features: Multiple protocols, routing, federation, reliability

Key Features

  • Multiple Protocols: AMQP, MQTT, STOMP, HTTP
  • Flexible Routing: Direct, topic, fanout, and header exchanges
  • Message Reliability: Persistent messages, acknowledgments, publisher confirms
  • High Availability: Quorum queues (and classic mirrored queues on 3.x)
  • Management UI: Built-in web interface for monitoring and management
  • Plugin System: Extensible with plugins
  • Dead Letter Exchanges: Handle failed message processing
  • Priority Queues: Process high-priority messages first
  • Delayed Messages: Schedule message delivery

Resource Tiers

Tier    CPU  Memory  Disk   Best For
Small   0.5  1GB     10GB   Development, testing
Medium  1    2GB     25GB   Small production apps
Large   2    4GB     50GB   Production workloads
XLarge  4    8GB     100GB  High-throughput messaging

Creating a RabbitMQ Add-on

  1. Navigate to Add-ons → Create Add-on
  2. Select RabbitMQ as the type
  3. Choose a version (4.0, 3.13, or 3.12)
  4. Configure:
    • Label: Descriptive name (e.g., "Task Queue")
    • Description: Purpose and notes
    • Environment: Development or Production
    • Resource Tier: Based on your workload requirements
  5. Configure backups:
    • Schedule: Daily recommended for production
    • Retention: 7+ days for production
  6. Click Create Add-on

Connection Information

After deployment, connection details are available in the add-on details page and automatically injected into your apps via STRONGLY_SERVICES.

Connection String Format

amqp://username:password@host:5672/vhost

Accessing Connection Details

import os
import json
import pika

# Parse STRONGLY_SERVICES
services = json.loads(os.environ.get('STRONGLY_SERVICES', '{}'))

# Get RabbitMQ add-on connection ('addon-id' is your add-on's ID)
rabbitmq_addon = services['addons']['addon-id']

# Connect using connection string
params = pika.URLParameters(rabbitmq_addon['connectionString'])
connection = pika.BlockingConnection(params)
channel = connection.channel()

# Or connect using individual parameters
credentials = pika.PlainCredentials(
    rabbitmq_addon['username'],
    rabbitmq_addon['password']
)
params = pika.ConnectionParameters(
    host=rabbitmq_addon['host'],
    port=rabbitmq_addon['port'],
    virtual_host=rabbitmq_addon.get('vhost', '/'),
    credentials=credentials
)
connection = pika.BlockingConnection(params)
channel = connection.channel()

# Declare a queue
channel.queue_declare(queue='tasks', durable=True)

# Publish a message
channel.basic_publish(
    exchange='',
    routing_key='tasks',
    body='Hello World!',
    properties=pika.BasicProperties(
        delivery_mode=2,  # Make message persistent
    )
)

connection.close()

Core Concepts

Exchanges

Exchanges receive messages from publishers and route them to queues based on rules.

Exchange Types:

  • Direct: Route to queues based on exact routing key match
  • Topic: Route to queues based on pattern matching (wildcards)
  • Fanout: Route to all bound queues (broadcast)
  • Headers: Route based on message header attributes

# Declare exchange
channel.exchange_declare(
    exchange='logs',
    exchange_type='fanout',
    durable=True
)

# Bind queue to exchange
channel.queue_bind(
    queue='queue_name',
    exchange='logs',
    routing_key=''
)

Queues

Queues store messages until they are consumed by applications.

# Declare durable queue
channel.queue_declare(
    queue='tasks',
    durable=True,       # Survive broker restart
    exclusive=False,    # Not restricted to this connection
    auto_delete=False   # Don't delete when consumer disconnects
)

Messages

Messages contain data and metadata.

import time

# Publish with properties
channel.basic_publish(
    exchange='',
    routing_key='tasks',
    body=json.dumps({'task': 'process_data', 'data': {...}}),
    properties=pika.BasicProperties(
        delivery_mode=2,                  # Persistent
        content_type='application/json',
        content_encoding='utf-8',
        priority=5,                       # 0-9 priority
        expiration='60000',               # TTL in milliseconds
        message_id='msg-123',
        timestamp=int(time.time()),
        headers={'x-custom': 'value'}
    )
)
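
The reliability features above include publisher confirms. With pika's BlockingConnection, confirms are enabled per channel; a minimal sketch, reusing the channel and 'tasks' queue from above:

channel.confirm_delivery()

try:
    # With confirms enabled, basic_publish raises if the broker nacks
    # the message or returns it as unroutable (mandatory=True)
    channel.basic_publish(
        exchange='',
        routing_key='tasks',
        body='Confirmed message',
        properties=pika.BasicProperties(delivery_mode=2),
        mandatory=True
    )
    print("Broker confirmed the message")
except pika.exceptions.UnroutableError:
    print("Message returned as unroutable")
except pika.exceptions.NackError:
    print("Broker rejected (nacked) the message")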

Common Patterns

Work Queue (Task Distribution)

Distribute time-consuming tasks among multiple workers.

Producer:

import pika
import json

connection = pika.BlockingConnection(pika.URLParameters(url))
channel = connection.channel()

channel.queue_declare(queue='tasks', durable=True)

task = {'action': 'send_email', 'to': 'user@example.com'}
channel.basic_publish(
    exchange='',
    routing_key='tasks',
    body=json.dumps(task),
    properties=pika.BasicProperties(
        delivery_mode=2,  # Persistent
    )
)

connection.close()

Consumer (Worker):

import pika
import json
import time

def callback(ch, method, properties, body):
    task = json.loads(body)
    print(f"Processing: {task}")

    # Simulate work
    time.sleep(5)

    print("Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)

connection = pika.BlockingConnection(pika.URLParameters(url))
channel = connection.channel()

channel.queue_declare(queue='tasks', durable=True)

# Fair dispatch - don't give worker more than 1 task at a time
channel.basic_qos(prefetch_count=1)

channel.basic_consume(
    queue='tasks',
    on_message_callback=callback
)

print('Waiting for messages...')
channel.start_consuming()

Pub/Sub (Fanout Exchange)

Broadcast messages to multiple consumers.

Publisher:

channel.exchange_declare(exchange='logs', exchange_type='fanout', durable=True)

message = "System event occurred"
channel.basic_publish(
    exchange='logs',
    routing_key='',  # Ignored for fanout
    body=message
)

Subscriber:

channel.exchange_declare(exchange='logs', exchange_type='fanout', durable=True)

# Create exclusive queue (deleted when connection closes)
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange='logs', queue=queue_name)

def callback(ch, method, properties, body):
    print(f"Received: {body.decode()}")

channel.basic_consume(
    queue=queue_name,
    on_message_callback=callback,
    auto_ack=True
)

channel.start_consuming()

Routing (Direct Exchange)

Route messages based on routing key.

Publisher:

channel.exchange_declare(exchange='direct_logs', exchange_type='direct', durable=True)

severities = ['info', 'warning', 'error']
severity = 'error'
message = "Critical error occurred"

channel.basic_publish(
    exchange='direct_logs',
    routing_key=severity,
    body=message
)

Consumer:

channel.exchange_declare(exchange='direct_logs', exchange_type='direct', durable=True)

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

# Bind to multiple routing keys
for severity in ['error', 'warning']:
    channel.queue_bind(
        exchange='direct_logs',
        queue=queue_name,
        routing_key=severity
    )

def callback(ch, method, properties, body):
    print(f"[{method.routing_key}] {body.decode()}")

channel.basic_consume(
    queue=queue_name,
    on_message_callback=callback,
    auto_ack=True
)

channel.start_consuming()

Topics (Pattern Matching)

Route messages based on wildcard patterns.

# Declare topic exchange
channel.exchange_declare(exchange='topic_logs', exchange_type='topic', durable=True)

# Publish with routing key pattern
routing_key = 'user.login.success'
channel.basic_publish(
    exchange='topic_logs',
    routing_key=routing_key,
    body='User logged in successfully'
)

# Consumer binds with wildcards
# * matches exactly one word
# # matches zero or more words
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

# Listen to all user events
channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key='user.*.*')

# Or listen to all login events for any entity
channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key='*.login.*')

# Or listen to all error events
channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key='*.*.error')

RPC (Request/Reply)

Implement RPC pattern using RabbitMQ.

RPC Server:

def fib(n):
    if n <= 1:
        return n
    return fib(n - 1) + fib(n - 2)

def on_request(ch, method, properties, body):
    n = int(body)
    response = fib(n)

    ch.basic_publish(
        exchange='',
        routing_key=properties.reply_to,
        properties=pika.BasicProperties(
            correlation_id=properties.correlation_id
        ),
        body=str(response)
    )
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.queue_declare(queue='rpc_queue')
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)

print('Awaiting RPC requests')
channel.start_consuming()

RPC Client:

import uuid

class RpcClient:
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.URLParameters(url))
        self.channel = self.connection.channel()

        result = self.channel.queue_declare(queue='', exclusive=True)
        self.callback_queue = result.method.queue

        self.channel.basic_consume(
            queue=self.callback_queue,
            on_message_callback=self.on_response,
            auto_ack=True
        )

        self.response = None
        self.corr_id = None

    def on_response(self, ch, method, properties, body):
        if self.corr_id == properties.correlation_id:
            self.response = body

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())

        self.channel.basic_publish(
            exchange='',
            routing_key='rpc_queue',
            properties=pika.BasicProperties(
                reply_to=self.callback_queue,
                correlation_id=self.corr_id,
            ),
            body=str(n)
        )

        while self.response is None:
            self.connection.process_data_events()

        return int(self.response)

# Usage
rpc = RpcClient()
result = rpc.call(10)
print(f"fib(10) = {result}")

Advanced Features

Dead Letter Exchange

Handle failed messages by routing them to a dead letter exchange.

# Declare main queue with DLX
channel.queue_declare(
    queue='tasks',
    durable=True,
    arguments={
        'x-dead-letter-exchange': 'dlx',
        'x-dead-letter-routing-key': 'failed_tasks'
    }
)

# Declare dead letter exchange and queue
channel.exchange_declare(exchange='dlx', exchange_type='direct', durable=True)
channel.queue_declare(queue='failed_tasks', durable=True)
channel.queue_bind(exchange='dlx', queue='failed_tasks', routing_key='failed_tasks')

# Messages are moved to DLX when:
# - Message is rejected with requeue=False
# - Message TTL expires
# - Queue length limit is exceeded
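
A consumer can trigger dead-lettering explicitly by rejecting a message without requeueing; a minimal sketch, where process() stands in for your (hypothetical) handler:

def callback(ch, method, properties, body):
    try:
        process(body)  # hypothetical processing function
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception:
        # requeue=False routes the message to the queue's DLX
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)

channel.basic_consume(queue='tasks', on_message_callback=callback)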

Message TTL (Time To Live)

Set expiration time for messages.

# Per-message TTL
channel.basic_publish(
    exchange='',
    routing_key='tasks',
    body='Expires in 60 seconds',
    properties=pika.BasicProperties(
        expiration='60000'  # milliseconds
    )
)

# Queue-level TTL (queue arguments are fixed at declaration time;
# use a new queue name if 'tasks' already exists without this argument)
channel.queue_declare(
    queue='tasks',
    arguments={
        'x-message-ttl': 60000  # All messages expire in 60 seconds
    }
)

Priority Queues

Process high-priority messages first.

# Declare priority queue
channel.queue_declare(
    queue='priority_tasks',
    arguments={
        'x-max-priority': 10  # Max priority level
    }
)

# Publish with priority
channel.basic_publish(
    exchange='',
    routing_key='priority_tasks',
    body='High priority task',
    properties=pika.BasicProperties(
        priority=9
    )
)

Delayed Messages

Schedule message delivery (requires rabbitmq_delayed_message_exchange plugin).

# Declare delayed exchange
channel.exchange_declare(
    exchange='delayed',
    exchange_type='x-delayed-message',
    arguments={'x-delayed-type': 'direct'}
)

# Publish with delay
channel.basic_publish(
    exchange='delayed',
    routing_key='tasks',
    body='Delayed message',
    properties=pika.BasicProperties(
        headers={'x-delay': 5000}  # 5 second delay
    )
)

Backup & Restore

RabbitMQ add-ons use rabbitmqctl export_definitions for backups, which exports the broker's configuration (definitions) as JSON.

Backup Configuration

  • Tool: rabbitmqctl export_definitions
  • Format: .json
  • Includes: Exchanges, queues, bindings, vhosts, users, permissions
  • Storage: AWS S3 (s3://strongly-backups/backups/<addon-id>/)
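
Independent of the scheduled backups, you can pull the same definitions yourself via the management API's /api/definitions endpoint; a minimal sketch, assuming host/username/password from the add-on's connection details:

import requests

# Export broker definitions (exchanges, queues, bindings, users, ...)
response = requests.get(
    f'http://{host}:15672/api/definitions',
    auth=(username, password)
)
response.raise_for_status()

with open('definitions-backup.json', 'w') as f:
    f.write(response.text)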

Manual Backup

  1. Go to add-on details page
  2. Click Backup Now
  3. Monitor progress in job logs
  4. Backup saved as backup-YYYYMMDDHHMMSS.json

Backup Scope

RabbitMQ backups include configuration (exchanges, queues, bindings) but not message contents. For message persistence, use durable queues and persistent messages.

Scheduled Backups

Configure during add-on creation or in settings:

  • Daily backups: Recommended for production
  • Retention: 7-14 days minimum
  • Custom cron: For specific schedules (e.g., 0 2 * * * for daily at 2:00 AM)

Restore Process

  1. Navigate to Backups tab
  2. Select backup from list
  3. Click Restore
  4. Confirm (add-on will stop temporarily)
  5. Definitions imported using rabbitmqctl import_definitions
  6. Add-on automatically restarts

Monitoring

Monitor your RabbitMQ add-on through the Strongly platform:

  • CPU Usage: Track CPU utilization
  • Memory Usage: Monitor memory consumption
  • Disk Space: Watch disk utilization
  • Message Rate: Messages published/consumed per second
  • Queue Depth: Number of messages in queues
  • Connection Count: Active connections

Management API

import requests

# Connection details from the add-on (see Accessing Connection Details)
host = rabbitmq_addon['host']
username = rabbitmq_addon['username']
password = rabbitmq_addon['password']

# Get queue info (%2F is the URL-encoded default vhost "/")
response = requests.get(
    f'http://{host}:15672/api/queues/%2F/tasks',
    auth=(username, password)
)
queue_info = response.json()
print(f"Messages: {queue_info['messages']}")
print(f"Consumers: {queue_info['consumers']}")

Best Practices

  1. Use Durable Queues: Survive broker restarts
  2. Persistent Messages: Set delivery_mode=2 for important messages
  3. Acknowledge Messages: Use manual acknowledgments for reliability
  4. Fair Dispatch: Set prefetch_count to distribute load evenly
  5. Handle Failures: Use dead letter exchanges for failed messages
  6. Monitor Queue Depth: Alert on growing queues
  7. Use Connection Pooling: Reuse connections and channels
  8. Set Message TTL: Prevent queue buildup with expiration
  9. Implement Retries: Retry failed messages with exponential backoff (see the sketch after this list)
  10. Separate Concerns: Use different queues/exchanges for different purposes
  11. Use Priority Queues Sparingly: Only when truly needed (adds overhead)
  12. Test Backups: Verify backup/restore procedures
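
For retries with exponential backoff (item 9), one common approach is to track an attempt count in a message header and republish through a retry queue whose per-message TTL grows with each attempt. A sketch, assuming a hypothetical handle() function and a tasks_retry queue declared with a DLX that routes expired messages back to tasks:

MAX_RETRIES = 5

def callback(ch, method, properties, body):
    headers = properties.headers or {}
    retries = headers.get('x-retry-count', 0)
    try:
        handle(body)  # hypothetical processing function
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception:
        if retries >= MAX_RETRIES:
            # Give up: dead-letter the message (requires a DLX on 'tasks')
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
        else:
            # Republish with backoff: TTL doubles each attempt (1s, 2s, 4s, ...)
            ch.basic_publish(
                exchange='',
                routing_key='tasks_retry',  # assumed retry queue (TTL + DLX)
                body=body,
                properties=pika.BasicProperties(
                    delivery_mode=2,
                    headers={'x-retry-count': retries + 1},
                    expiration=str((2 ** retries) * 1000)  # milliseconds
                )
            )
            ch.basic_ack(delivery_tag=method.delivery_tag)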

Troubleshooting

Connection Issues

# Test connection
try:
    connection = pika.BlockingConnection(pika.URLParameters(url))
    print("Connected successfully")
    connection.close()
except pika.exceptions.AMQPConnectionError as e:
    print(f"Connection failed: {e}")

Queue Depth Issues

# Check queue depth
queue = channel.queue_declare(queue='tasks', passive=True)
message_count = queue.method.message_count
print(f"Messages in queue: {message_count}")

if message_count > 10000:
    print("Warning: Queue depth is high")

Message Not Being Consumed

Common issues:

  • Consumer not acknowledging messages
  • Prefetch count too high
  • Consumer crashed without nack/reject
  • Queue not bound to exchange correctly

# Purge queue (delete all messages)
channel.queue_purge('tasks')

# Check bindings
# Use management UI or API
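
For the binding check, here is a minimal sketch against the management API's per-queue bindings endpoint (host/username/password as in the Management API section; %2F is the URL-encoded default vhost):

import requests

response = requests.get(
    f'http://{host}:15672/api/queues/%2F/tasks/bindings',
    auth=(username, password)
)
for binding in response.json():
    print(binding['source'], '->', binding['destination'],
          'key:', binding['routing_key'])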

Support

For issues or questions:

  • Check add-on logs in the Strongly dashboard
  • Review RabbitMQ official documentation
  • Use RabbitMQ Management UI (available at port 15672)
  • Contact Strongly support through the platform