RabbitMQ
RabbitMQ is a robust message broker that supports multiple messaging protocols, enabling reliable communication between distributed applications.
Overview
- Versions: 4.0, 3.13, 3.12
- Cluster Support: ❌ No (Single node only)
- Use Cases: Message queuing, async tasks, event streaming, microservices communication
- Features: Multiple protocols, routing, federation, reliability
Key Features
- Multiple Protocols: AMQP, MQTT, STOMP, HTTP
- Flexible Routing: Direct, topic, fanout, and header exchanges
- Message Reliability: Persistent messages, acknowledgments, publisher confirms
- High Availability: Quorum queues for replicated, fault-tolerant messaging (classic mirrored queues are removed in RabbitMQ 4.0; replication requires a cluster)
- Management UI: Built-in web interface for monitoring and management
- Plugin System: Extensible with plugins
- Dead Letter Exchanges: Handle failed message processing
- Priority Queues: Process high-priority messages first
- Delayed Messages: Schedule message delivery
Resource Tiers
| Tier | CPU | Memory | Disk | Best For |
|---|---|---|---|---|
| Small | 0.5 | 1GB | 10GB | Development, testing |
| Medium | 1 | 2GB | 25GB | Small production apps |
| Large | 2 | 4GB | 50GB | Production workloads |
| XLarge | 4 | 8GB | 100GB | High-throughput messaging |
Creating a RabbitMQ Add-on
- Navigate to Add-ons → Create Add-on
- Select RabbitMQ as the type
- Choose a version (4.0, 3.13, or 3.12)
- Configure:
- Label: Descriptive name (e.g., "Task Queue")
- Description: Purpose and notes
- Environment: Development or Production
- Resource Tier: Based on your workload requirements
- Configure backups:
- Schedule: Daily recommended for production
- Retention: 7+ days for production
- Click Create Add-on
Connection Information
After deployment, connection details are available in the add-on details page and automatically injected into your apps via STRONGLY_SERVICES.
Connection String Format
amqp://username:password@host:5672/vhost
Accessing Connection Details
Python:
import os
import json
import pika

# Parse STRONGLY_SERVICES
services = json.loads(os.environ.get('STRONGLY_SERVICES', '{}'))

# Get RabbitMQ add-on connection
rabbitmq_addon = services['addons']['addon-id']

# Connect using connection string
params = pika.URLParameters(rabbitmq_addon['connectionString'])
connection = pika.BlockingConnection(params)
channel = connection.channel()

# Or connect using individual parameters
credentials = pika.PlainCredentials(
    rabbitmq_addon['username'],
    rabbitmq_addon['password']
)
params = pika.ConnectionParameters(
    host=rabbitmq_addon['host'],
    port=rabbitmq_addon['port'],
    virtual_host=rabbitmq_addon.get('vhost', '/'),
    credentials=credentials
)
connection = pika.BlockingConnection(params)
channel = connection.channel()

# Declare a queue
channel.queue_declare(queue='tasks', durable=True)

# Publish a message
channel.basic_publish(
    exchange='',
    routing_key='tasks',
    body='Hello World!',
    properties=pika.BasicProperties(
        delivery_mode=2,  # Make message persistent
    )
)

connection.close()
Node.js:
const amqp = require('amqplib');

// Wrap in an async function (top-level await is not available in CommonJS)
async function main() {
  // Parse STRONGLY_SERVICES
  const services = JSON.parse(process.env.STRONGLY_SERVICES || '{}');
  const rabbitmqAddon = services.addons['addon-id'];

  // Connect using connection string
  const connection = await amqp.connect(rabbitmqAddon.connectionString);
  const channel = await connection.createChannel();

  // Declare a queue
  await channel.assertQueue('tasks', { durable: true });

  // Publish a message
  channel.sendToQueue('tasks', Buffer.from('Hello World!'), {
    persistent: true
  });

  // Consume messages
  await channel.consume('tasks', (msg) => {
    console.log('Received:', msg.content.toString());
    channel.ack(msg);
  }, { noAck: false });

  // Close connection when done
  // await connection.close();
}

main().catch(console.error);
Go:
package main

import (
    "encoding/json"
    "fmt"
    "log"
    "os"

    // streadway/amqp is no longer maintained; github.com/rabbitmq/amqp091-go is its maintained fork
    "github.com/streadway/amqp"
)

type Services struct {
    Addons map[string]Addon `json:"addons"`
}

type Addon struct {
    ConnectionString string `json:"connectionString"`
}

func main() {
    var services Services
    json.Unmarshal([]byte(os.Getenv("STRONGLY_SERVICES")), &services)
    rabbitmqAddon := services.Addons["addon-id"]

    // Connect
    conn, err := amqp.Dial(rabbitmqAddon.ConnectionString)
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()

    // Create channel
    ch, err := conn.Channel()
    if err != nil {
        log.Fatal(err)
    }
    defer ch.Close()

    // Declare queue
    q, err := ch.QueueDeclare(
        "tasks", // name
        true,    // durable
        false,   // delete when unused
        false,   // exclusive
        false,   // no-wait
        nil,     // arguments
    )
    if err != nil {
        log.Fatal(err)
    }

    // Publish message
    err = ch.Publish(
        "",     // exchange
        q.Name, // routing key
        false,  // mandatory
        false,  // immediate
        amqp.Publishing{
            DeliveryMode: amqp.Persistent,
            ContentType:  "text/plain",
            Body:         []byte("Hello World!"),
        },
    )
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println("Message sent")
}
Core Concepts
Exchanges
Exchanges receive messages from publishers and route them to queues based on rules.
Exchange Types:
- Direct: Route to queues based on exact routing key match
- Topic: Route to queues based on pattern matching (wildcards)
- Fanout: Route to all bound queues (broadcast)
- Headers: Route based on message header attributes
# Declare exchange
channel.exchange_declare(
    exchange='logs',
    exchange_type='fanout',
    durable=True
)

# Bind queue to exchange
channel.queue_bind(
    queue='queue_name',
    exchange='logs',
    routing_key=''
)
Queues
Queues store messages until they are consumed by applications.
# Declare durable queue
channel.queue_declare(
    queue='tasks',
    durable=True,      # Survive broker restart
    exclusive=False,   # Not restricted to this connection
    auto_delete=False  # Don't delete when consumer disconnects
)
Messages
Messages contain data and metadata.
import json
import time

# Publish with properties
channel.basic_publish(
    exchange='',
    routing_key='tasks',
    body=json.dumps({'task': 'process_data', 'data': {...}}),
    properties=pika.BasicProperties(
        delivery_mode=2,      # Persistent
        content_type='application/json',
        content_encoding='utf-8',
        priority=5,           # 0-9 priority
        expiration='60000',   # TTL in milliseconds
        message_id='msg-123',
        timestamp=int(time.time()),
        headers={'x-custom': 'value'}
    )
)
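Publisher confirms, listed under Message Reliability above, can be enabled per channel; a minimal sketch with pika's BlockingConnection, where basic_publish then raises if the broker rejects the message or, with mandatory=True, cannot route it:
# Enable publisher confirms on this channel
channel.confirm_delivery()

try:
    channel.basic_publish(
        exchange='',
        routing_key='tasks',
        body='Confirmed message',
        properties=pika.BasicProperties(delivery_mode=2),
        mandatory=True  # Raise if the message cannot be routed to any queue
    )
    print("Broker confirmed delivery")
except pika.exceptions.UnroutableError:
    print("Message could not be routed to a queue")
except pika.exceptions.NackError:
    print("Broker rejected the message")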
Common Patterns
Work Queue (Task Distribution)
Distribute time-consuming tasks among multiple workers.
Producer:
import pika
import json

# url is the add-on connection string from STRONGLY_SERVICES
connection = pika.BlockingConnection(pika.URLParameters(url))
channel = connection.channel()
channel.queue_declare(queue='tasks', durable=True)

task = {'action': 'send_email', 'to': 'user@example.com'}
channel.basic_publish(
    exchange='',
    routing_key='tasks',
    body=json.dumps(task),
    properties=pika.BasicProperties(
        delivery_mode=2,
    )
)
connection.close()
Consumer (Worker):
import pika
import json
import time

def callback(ch, method, properties, body):
    task = json.loads(body)
    print(f"Processing: {task}")
    # Simulate work
    time.sleep(5)
    print("Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)

connection = pika.BlockingConnection(pika.URLParameters(url))
channel = connection.channel()
channel.queue_declare(queue='tasks', durable=True)

# Fair dispatch - don't give a worker more than 1 task at a time
channel.basic_qos(prefetch_count=1)

channel.basic_consume(
    queue='tasks',
    on_message_callback=callback
)

print('Waiting for messages...')
channel.start_consuming()
Pub/Sub (Fanout Exchange)
Broadcast messages to multiple consumers.
Publisher:
channel.exchange_declare(exchange='logs', exchange_type='fanout', durable=True)

message = "System event occurred"
channel.basic_publish(
    exchange='logs',
    routing_key='',  # Ignored for fanout
    body=message
)
Subscriber:
channel.exchange_declare(exchange='logs', exchange_type='fanout', durable=True)

# Create exclusive queue (deleted when connection closes)
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='logs', queue=queue_name)

def callback(ch, method, properties, body):
    print(f"Received: {body.decode()}")

channel.basic_consume(
    queue=queue_name,
    on_message_callback=callback,
    auto_ack=True
)
channel.start_consuming()
Routing (Direct Exchange)
Route messages based on routing key.
Publisher:
channel.exchange_declare(exchange='direct_logs', exchange_type='direct', durable=True)

severities = ['info', 'warning', 'error']
severity = 'error'
message = "Critical error occurred"
channel.basic_publish(
    exchange='direct_logs',
    routing_key=severity,
    body=message
)
Consumer:
channel.exchange_declare(exchange='direct_logs', exchange_type='direct', durable=True)

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

# Bind to multiple routing keys
for severity in ['error', 'warning']:
    channel.queue_bind(
        exchange='direct_logs',
        queue=queue_name,
        routing_key=severity
    )

def callback(ch, method, properties, body):
    print(f"[{method.routing_key}] {body.decode()}")

channel.basic_consume(
    queue=queue_name,
    on_message_callback=callback,
    auto_ack=True
)
channel.start_consuming()
Topics (Pattern Matching)
Route messages based on wildcard patterns.
# Declare topic exchange
channel.exchange_declare(exchange='topic_logs', exchange_type='topic', durable=True)

# Publish with routing key pattern
routing_key = 'user.login.success'
channel.basic_publish(
    exchange='topic_logs',
    routing_key=routing_key,
    body='User logged in successfully'
)

# Consumer binds with wildcards
# * matches exactly one word
# # matches zero or more words
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

# Listen to all user events
channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key='user.*.*')
# Or listen to all login events for any entity
channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key='*.login.*')
# Or listen to all error events
channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key='*.*.error')
RPC (Request/Reply)
Implement RPC pattern using RabbitMQ.
RPC Server:
def fib(n):
    if n <= 1:
        return n
    return fib(n - 1) + fib(n - 2)

def on_request(ch, method, properties, body):
    n = int(body)
    response = fib(n)
    ch.basic_publish(
        exchange='',
        routing_key=properties.reply_to,
        properties=pika.BasicProperties(
            correlation_id=properties.correlation_id
        ),
        body=str(response)
    )
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.queue_declare(queue='rpc_queue')
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)

print('Awaiting RPC requests')
channel.start_consuming()
RPC Client:
import uuid

class RpcClient:
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.URLParameters(url))
        self.channel = self.connection.channel()
        result = self.channel.queue_declare(queue='', exclusive=True)
        self.callback_queue = result.method.queue
        self.channel.basic_consume(
            queue=self.callback_queue,
            on_message_callback=self.on_response,
            auto_ack=True
        )
        self.response = None
        self.corr_id = None

    def on_response(self, ch, method, properties, body):
        if self.corr_id == properties.correlation_id:
            self.response = body

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(
            exchange='',
            routing_key='rpc_queue',
            properties=pika.BasicProperties(
                reply_to=self.callback_queue,
                correlation_id=self.corr_id,
            ),
            body=str(n)
        )
        while self.response is None:
            self.connection.process_data_events()
        return int(self.response)

# Usage
rpc = RpcClient()
result = rpc.call(10)
print(f"fib(10) = {result}")
Advanced Features
Dead Letter Exchange
Handle failed messages by routing them to a dead letter exchange.
# Declare main queue with DLX
channel.queue_declare(
    queue='tasks',
    durable=True,
    arguments={
        'x-dead-letter-exchange': 'dlx',
        'x-dead-letter-routing-key': 'failed_tasks'
    }
)

# Declare dead letter exchange and queue
channel.exchange_declare(exchange='dlx', exchange_type='direct', durable=True)
channel.queue_declare(queue='failed_tasks', durable=True)
channel.queue_bind(exchange='dlx', queue='failed_tasks', routing_key='failed_tasks')

# Messages are moved to DLX when:
# - Message is rejected with requeue=False
# - Message TTL expires
# - Queue length limit is exceeded
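A consumer triggers dead-lettering by rejecting a message without requeueing it; a minimal callback sketch, where process() stands in for your actual handler:
def callback(ch, method, properties, body):
    try:
        process(body)  # process() is a hypothetical handler
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception:
        # requeue=False routes the message to the dead letter exchange
        ch.basic_reject(delivery_tag=method.delivery_tag, requeue=False)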
Message TTL (Time To Live)
Set expiration time for messages.
# Per-message TTL
channel.basic_publish(
    exchange='',
    routing_key='tasks',
    body='Expires in 60 seconds',
    properties=pika.BasicProperties(
        expiration='60000'  # milliseconds
    )
)

# Queue-level TTL
channel.queue_declare(
    queue='tasks',
    arguments={
        'x-message-ttl': 60000  # All messages expire in 60 seconds
    }
)
Priority Queues
Process high-priority messages first.
# Declare priority queue
channel.queue_declare(
    queue='priority_tasks',
    arguments={
        'x-max-priority': 10  # Max priority level
    }
)

# Publish with priority
channel.basic_publish(
    exchange='',
    routing_key='priority_tasks',
    body='High priority task',
    properties=pika.BasicProperties(
        priority=9
    )
)
Delayed Messages
Schedule message delivery (requires rabbitmq_delayed_message_exchange plugin).
# Declare delayed exchange
channel.exchange_declare(
    exchange='delayed',
    exchange_type='x-delayed-message',
    arguments={'x-delayed-type': 'direct'}
)

# Publish with delay
channel.basic_publish(
    exchange='delayed',
    routing_key='tasks',
    body='Delayed message',
    properties=pika.BasicProperties(
        headers={'x-delay': 5000}  # 5 second delay
    )
)
Backup & Restore
RabbitMQ add-ons use rabbitmqctl export_definitions for backups, exporting configuration and definitions.
Backup Configuration
- Tool: rabbitmqctl export_definitions
- Format: .json
- Includes: Exchanges, queues, bindings, vhosts, users, permissions
- Storage: AWS S3 (s3://strongly-backups/backups/<addon-id>/)
Manual Backup
- Go to add-on details page
- Click Backup Now
- Monitor progress in job logs
- Backup saved as backup-YYYYMMDDHHMMSS.json
RabbitMQ backups include configuration (exchanges, queues, bindings) but not message contents. For message persistence, use durable queues and persistent messages.
Scheduled Backups
Configure during add-on creation or in settings:
- Daily backups: Recommended for production
- Retention: 7-14 days minimum
- Custom cron: For specific schedules (e.g., 0 3 * * * for daily at 03:00)
Restore Process
- Navigate to Backups tab
- Select backup from list
- Click Restore
- Confirm (add-on will stop temporarily)
- Definitions imported using rabbitmqctl import_definitions
- Add-on automatically restarts
Monitoring
Monitor your RabbitMQ add-on through the Strongly platform:
- CPU Usage: Track CPU utilization
- Memory Usage: Monitor memory consumption
- Disk Space: Watch disk utilization
- Message Rate: Messages published/consumed per second
- Queue Depth: Number of messages in queues
- Connection Count: Active connections
Management API
import requests

# Get queue info (%2F is the URL-encoded default vhost "/")
response = requests.get(
    f'http://{host}:15672/api/queues/%2F/tasks',
    auth=(username, password)
)
queue_info = response.json()
print(f"Messages: {queue_info['messages']}")
print(f"Consumers: {queue_info['consumers']}")
Best Practices
- Use Durable Queues: Survive broker restarts
- Persistent Messages: Set delivery_mode=2 for important messages
- Acknowledge Messages: Use manual acknowledgments for reliability
- Fair Dispatch: Set prefetch_count to distribute load evenly
- Handle Failures: Use dead letter exchanges for failed messages
- Monitor Queue Depth: Alert on growing queues
- Use Connection Pooling: Reuse connections and channels
- Set Message TTL: Prevent queue buildup with expiration
- Implement Retries: Retry failed messages with exponential backoff (see the sketch after this list)
- Separate Concerns: Use different queues/exchanges for different purposes
- Use Priority Queues Sparingly: Only when truly needed (adds overhead)
- Test Backups: Verify backup/restore procedures
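A minimal sketch of the retry practice above, assuming retries are tracked in a custom x-retry-count header (our own convention, not a RabbitMQ built-in) and process() stands in for your handler:
import time

MAX_RETRIES = 3

def callback(ch, method, properties, body):
    headers = properties.headers or {}
    retries = headers.get('x-retry-count', 0)
    try:
        process(body)  # process() is a hypothetical handler
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception:
        if retries < MAX_RETRIES:
            time.sleep(2 ** retries)  # Exponential backoff: 1s, 2s, 4s
            ch.basic_publish(
                exchange='',
                routing_key='tasks',
                body=body,
                properties=pika.BasicProperties(
                    delivery_mode=2,
                    headers={'x-retry-count': retries + 1}
                )
            )
        # Ack the original either way; exhausted messages could instead
        # be rejected to a dead letter exchange
        ch.basic_ack(delivery_tag=method.delivery_tag)
Note that sleeping inside a consumer blocks the channel; production setups often implement the delay with a TTL queue and dead letter exchange instead.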
Troubleshooting
Connection Issues
# Test connection
try:
    connection = pika.BlockingConnection(pika.URLParameters(url))
    print("Connected successfully")
    connection.close()
except pika.exceptions.AMQPConnectionError as e:
    print(f"Connection failed: {e}")
Queue Depth Issues
# Check queue depth (passive=True inspects the queue without creating it)
queue = channel.queue_declare(queue='tasks', passive=True)
message_count = queue.method.message_count
print(f"Messages in queue: {message_count}")

if message_count > 10000:
    print("Warning: Queue depth is high")
Message Not Being Consumed
Common issues:
- Consumer not acknowledging messages
- Prefetch count too high
- Consumer crashed without nack/reject
- Queue not bound to exchange correctly
# Purge queue (delete all messages)
channel.queue_purge('tasks')
# Check bindings
# Use management UI or API
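Bindings can also be inspected programmatically through the management HTTP API; a sketch using requests, with host, username, and password taken from the add-on details:
import requests

# List bindings for the 'tasks' queue (%2F is the URL-encoded default vhost "/")
response = requests.get(
    f'http://{host}:15672/api/queues/%2F/tasks/bindings',
    auth=(username, password)
)
for binding in response.json():
    source = binding['source'] or '(default exchange)'
    print(f"{source} -> {binding['destination']} (key: {binding['routing_key']})")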
Support
For issues or questions:
- Check add-on logs in the Strongly dashboard
- Review RabbitMQ official documentation
- Use RabbitMQ Management UI (available at port 15672)
- Contact Strongly support through the platform