Every Magento 2 store has operations that are too slow to run synchronously during a web request. Sending transactional emails. Updating inventory across thousands of products. Re-indexing after bulk catalog changes. Running price calculations for complex customer segments.
The naive approach — doing it all in-process during the HTTP request — results in timeouts, frustrated customers, and overwhelmed PHP workers. Magento 2's message queue framework is the proper solution, and when configured correctly, it can dramatically improve perceived performance while making your architecture more resilient.
This guide covers everything you need to know about Magento 2's async operations and message queue system: the architecture, RabbitMQ setup, consumer tuning, and practical patterns for pushing your own heavy workloads off the critical path.
What Is the Message Queue Framework?
Magento 2's message queue framework (originally a Commerce-only feature, available in all editions since Magento 2.3) is a pub/sub system that decouples producers (code that initiates work) from consumers (code that processes it).
Instead of:
Request → Process → Wait → Response
You get:
Request → Publish message → Return immediately
(Consumer picks up message and processes asynchronously)
The framework supports two backends:
- MySQL — Simple, no extra infrastructure, works out of the box. Limited throughput.
- RabbitMQ — Enterprise-grade, high-throughput, supports complex routing. Recommended for production.
What Magento Already Uses Message Queues For
Before building custom async logic, it's worth knowing what Magento already processes asynchronously:
| Topic | What It Does |
|---|---|
| async.operations.all | Bulk REST API operations (product updates, etc.) |
| inventory.reservations.updateSalabilityStatus | MSI stock updates |
| sales.rule.quote.trigger.recollect | Cart price rule recalculations |
| product_action_attribute.update | Bulk attribute updates from Admin grid |
| async.magento.cataloginventory.api.stockregistryinterface.updatestockitembysku.post | Async stock updates via API |
If you're using bulk operations in Admin or the Async REST API, these queues are already doing work for you.
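You can confirm which of these consumers exist on your installation, and run one in the foreground to watch it work, with the standard CLI (the bulk-operations consumer shares its topic's name):

```shell
# List every consumer known to this installation
bin/magento queue:consumers:list

# Run the bulk-operations consumer in the foreground (Ctrl+C to stop)
bin/magento queue:consumers:start async.operations.all
```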
Architecture Overview
The message queue system has four key components:
- Publisher — Sends a message to a topic
- Topic — Named channel (e.g., mycompany.product.sync)
- Queue — Storage for pending messages (MySQL table or RabbitMQ queue)
- Consumer — Long-running process that polls the queue and processes messages
Configuration is split across three XML files in your module:
etc/
├── queue_topology.xml # Defines exchanges and bindings
├── queue_publisher.xml # Connects topics to exchanges
├── queue_consumer.xml # Defines consumer handlers
└── communication.xml # Defines topic schemas
Setting Up RabbitMQ
For anything beyond development, use RabbitMQ. MySQL queues work but have limitations under load — they use polling, table locks, and don't support priority queues or dead letter exchanges.
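If you do stay on the MySQL backend for a while, the backlog is visible directly in the database. A rough query, assuming core Magento's queue and queue_message_status tables (verify the schema on your version before relying on it):

```sql
-- Message rows per queue on the MySQL backend.
-- Filter on s.status to isolate unprocessed rows; status codes vary by version.
SELECT q.name, COUNT(*) AS messages
FROM queue_message_status s
JOIN queue q ON q.id = s.queue_id
GROUP BY q.name
ORDER BY messages DESC;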
Installation
# Ubuntu/Debian
apt install rabbitmq-server
systemctl enable rabbitmq-server
systemctl start rabbitmq-server
# Enable management UI
rabbitmq-plugins enable rabbitmq_management
# Access at http://localhost:15672 (guest/guest; the default guest account only works from localhost)
Magento Configuration
// app/etc/env.php
'queue' => [
'amqp' => [
'host' => 'localhost',
'port' => '5672',
'user' => 'magento',
'password' => 'your-password',
'virtualhost' => '/',
'ssl' => false,
]
]
Create a dedicated RabbitMQ user:
rabbitmqctl add_user magento your-password
rabbitmqctl set_permissions -p / magento ".*" ".*" ".*"
rabbitmqctl set_user_tags magento monitoring
Verify the Connection
bin/magento queue:consumers:list
If this returns a list of consumers without errors, the AMQP connection is working.
Building a Custom Async Operation
Let's walk through a real example: a custom product sync to an external ERP system that currently runs synchronously during the admin product save.
Step 1: Define the Topic Schema
<!-- etc/communication.xml -->
<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:noNamespaceSchemaLocation="urn:magento:framework:Communication/etc/communication.xsd">
<topic name="mycompany.erp.product.sync" request="MyCompany\ErpSync\Api\Data\SyncRequestInterface"/>
</config>
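The topic schema references a message data interface that the walkthrough doesn't show. A minimal sketch of what it might look like (the field names beyond getSku/getProductData are illustrative, taken from how the handler below uses them):

```php
// Api/Data/SyncRequestInterface.php (illustrative sketch)
namespace MyCompany\ErpSync\Api\Data;

interface SyncRequestInterface
{
    public function getSku(): string;
    public function setSku(string $sku): self;
    public function getProductData(): array;
    public function setProductData(array $data): self;
}
```

A concrete Model\SyncRequest class implements this, and a di.xml preference maps the interface to it so the code-generated SyncRequestInterfaceFactory can build instances. Keep the schema to scalar/array fields so the framework can serialize it cleanly.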
Step 2: Configure the Publisher
<!-- etc/queue_publisher.xml -->
<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/publisher.xsd">
<publisher topic="mycompany.erp.product.sync">
<connection name="amqp" exchange="magento" disabled="false"/>
<!-- Alternative MySQL transport; there is no automatic failover, so enable this
(and disable amqp) only if RabbitMQ is not available in your environment -->
<connection name="db" exchange="magento-db" disabled="true"/>
</publisher>
</config>
Step 3: Configure Topology (RabbitMQ)
<!-- etc/queue_topology.xml -->
<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/topology.xsd">
<exchange name="magento" type="topic" connection="amqp">
<binding id="erpSyncBinding"
topic="mycompany.erp.product.sync"
destinationType="queue"
destination="mycompany.erp.product.sync"/>
</exchange>
</config>
Step 4: Define the Consumer
<!-- etc/queue_consumer.xml -->
<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/consumer.xsd">
<consumer name="myCompanyErpProductSync"
queue="mycompany.erp.product.sync"
connection="amqp"
consumerInstance="Magento\Framework\MessageQueue\Consumer"
handler="MyCompany\ErpSync\Model\ProductSyncHandler::processMessage"
maxMessages="1000"/>
</config>
Step 5: Implement the Handler
// Model/ProductSyncHandler.php
namespace MyCompany\ErpSync\Model;
use MyCompany\ErpSync\Api\Data\SyncRequestInterface;
use Psr\Log\LoggerInterface;
class ProductSyncHandler
{
public function __construct(
private readonly ErpApiClient $erpClient,
private readonly LoggerInterface $logger
) {}
public function processMessage(SyncRequestInterface $request): void
{
try {
$this->erpClient->syncProduct(
$request->getSku(),
$request->getProductData()
);
} catch (\Exception $e) {
// Swallowing the exception acknowledges the message, so it will not be retried.
// Re-throw instead if you want the framework to reject the message.
$this->logger->error('ERP sync failed: ' . $e->getMessage(), [
'sku' => $request->getSku(),
]);
}
}
}
Step 6: Publish from Your Event Observer
// Observer/ProductSaveAfter.php
namespace MyCompany\ErpSync\Observer;
use Magento\Framework\MessageQueue\PublisherInterface;
use MyCompany\ErpSync\Api\Data\SyncRequestInterfaceFactory;
class ProductSaveAfter implements \Magento\Framework\Event\ObserverInterface
{
public function __construct(
private readonly PublisherInterface $publisher,
private readonly SyncRequestInterfaceFactory $requestFactory
) {}
public function execute(\Magento\Framework\Event\Observer $observer): void
{
$product = $observer->getEvent()->getProduct();
$request = $this->requestFactory->create();
$request->setSku($product->getSku());
$request->setProductData($this->extractProductData($product)); // private helper (not shown)
// Non-blocking: returns immediately, consumer picks up async
$this->publisher->publish('mycompany.erp.product.sync', $request);
}
}
The admin product save now returns instantly. The ERP sync happens in the background.
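Before wiring up Supervisor, it's worth exercising the pipeline by hand. Roughly (standard Magento CLI; the consumer name comes from Step 4):

```shell
# Register the new XML config and declare the exchange/queue in RabbitMQ
bin/magento setup:upgrade

# Save a product in Admin, then drain the queue once in the foreground
bin/magento queue:consumers:start myCompanyErpProductSync --max-messages=10

# Confirm the message was published and consumed
rabbitmqctl list_queues name messages consumers
```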
Running Consumers in Production
Consumer processes are long-running PHP daemons. They need to be managed like a service.
Supervisor Configuration
; /etc/supervisor/conf.d/magento-consumers.conf
[program:magento-erp-sync]
command=/usr/bin/php /var/www/html/bin/magento queue:consumers:start myCompanyErpProductSync --max-messages=10000
directory=/var/www/html
user=www-data
autostart=true
autorestart=true
startretries=10
stderr_logfile=/var/log/supervisor/magento-erp-sync.err.log
stdout_logfile=/var/log/supervisor/magento-erp-sync.out.log
stopasgroup=true
killasgroup=true
[program:magento-inventory-reservations]
command=/usr/bin/php /var/www/html/bin/magento queue:consumers:start inventoryQtyCounter --max-messages=10000
directory=/var/www/html
user=www-data
autostart=true
autorestart=true
Reload Supervisor after changes:
supervisorctl reread
supervisorctl update
supervisorctl status
Key Consumer Options
bin/magento queue:consumers:start <consumer-name> \
--max-messages=10000 \ # Restart after N messages (prevents memory leaks)
--batch-size=100 \ # Process N messages per iteration
--single-thread \ # Prevent parallel copies of this consumer (PID-file guard)
--pid-file-path=/tmp/consumer.pid
The --max-messages flag is critical. Long-running PHP processes accumulate memory leaks from Magento's object manager. Setting a reasonable limit (1000-50000 depending on message size) and letting Supervisor restart the process is safer than running indefinitely.
Scaling Consumers
For high-throughput scenarios, run multiple consumer instances in parallel:
; Multiple parallel consumers for high-volume queue
[program:magento-erp-sync]
command=/usr/bin/php /var/www/html/bin/magento queue:consumers:start myCompanyErpProductSync --max-messages=5000
numprocs=4 ; Run 4 parallel consumers
process_name=%(program_name)s-%(process_num)02d
With RabbitMQ, multiple consumers on the same queue use round-robin distribution automatically. With MySQL queues, Magento handles locking to prevent double-processing, but MySQL queues don't scale as gracefully.
The Async Bulk Operations API
For mass operations (importing thousands of products, bulk price updates, etc.), Magento's Async Bulk REST API is built on top of the message queue framework:
# Async bulk product update
curl -X POST https://yourstore.com/rest/async/bulk/V1/products \
-H "Authorization: Bearer YOUR_TOKEN" \
-H "Content-Type: application/json" \
-d '[
{"product": {"sku": "SKU001", "price": 29.99}},
{"product": {"sku": "SKU002", "price": 39.99}},
...
]'
The response returns immediately with a bulk operation UUID:
{"bulk_uuid":"a1b2c3d4-...","request_items":[...]}
Check status asynchronously:
curl https://yourstore.com/rest/V1/bulk/a1b2c3d4-.../status \
-H "Authorization: Bearer YOUR_TOKEN"
This approach is dramatically faster for bulk imports than the synchronous REST API because:
- The HTTP request returns in milliseconds
- Operations are batched and processed by consumers
- Multiple consumers can parallelize the work
- Failed operations are tracked and retriable without re-running everything
Monitoring Queue Health
RabbitMQ Management Console
The RabbitMQ management UI (http://your-server:15672) gives you:
- Queue depths (are messages piling up?)
- Consumer counts (are consumers running?)
- Message rates (throughput per second)
- Dead letter queue contents (failed messages)
CLI Monitoring
# List all queues and their message counts
rabbitmqctl list_queues name messages consumers
# Check if consumers are running
bin/magento queue:consumers:list
# Review runtime options for consumers
bin/magento queue:consumers:start --help
Alert on Queue Depth
Set up monitoring to alert when queue depth exceeds thresholds:
# Simple bash check (add to cron or monitoring system)
DEPTH=$(rabbitmqctl list_queues name messages | grep "mycompany.erp.product.sync" | awk '{print $2}')
if [ "${DEPTH:-0}" -gt 1000 ]; then
echo "WARNING: ERP sync queue depth is $DEPTH" | mail -s "Queue Alert" ops@yourcompany.com
fi
Common Pitfalls and How to Avoid Them
Pitfall 1: Not Handling Failures
If your consumer throws an unhandled exception, the message may be lost (MySQL) or moved to a dead letter queue (RabbitMQ). Always catch exceptions in your handler and implement retry logic or alerting for persistent failures.
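One hedged retry pattern is bounded re-publishing: instead of silently swallowing the failure the way the handler above does, track an attempt count on the message and re-queue it until a limit is hit. This assumes you add an attempts field to the message schema; MAX_ATTEMPTS and the getAttempts()/setAttempts() names are illustrative, not part of the original module:

```php
// Illustrative bounded-retry version of processMessage()
public function processMessage(SyncRequestInterface $request): void
{
    try {
        $this->erpClient->syncProduct($request->getSku(), $request->getProductData());
    } catch (\Exception $e) {
        if ($request->getAttempts() < self::MAX_ATTEMPTS) {
            $request->setAttempts($request->getAttempts() + 1);
            // Re-queue for another try; consider a delay queue to avoid hot retry loops
            $this->publisher->publish('mycompany.erp.product.sync', $request);
        } else {
            // Give up: alert a human rather than retrying forever
            $this->logger->critical('ERP sync permanently failed', ['sku' => $request->getSku()]);
        }
    }
}
```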
Pitfall 2: Running Consumers via Cron Only
Magento's default setup uses cron to start consumers. This adds up to a 1-minute delay before a consumer starts processing new messages. For latency-sensitive operations, run consumers as persistent daemons via Supervisor instead.
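When Supervisor owns the consumers, tell Magento's cron not to spawn competing copies. The cron_consumers_runner section of app/etc/env.php controls this:

```php
// app/etc/env.php
'cron_consumers_runner' => [
    'cron_run' => false,   // Supervisor manages consumers; don't start them from cron
    'max_messages' => 10000,
    'consumers' => []      // empty list = setting applies to all consumers
]
```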
Pitfall 3: Large Message Payloads
Message queues are optimized for small messages. Avoid putting large data blobs (full product objects, images) in messages. Instead, pass identifiers:
// Bad: serialize the entire product
$this->publisher->publish('topic', $product->getData()); // Could be 50KB+
// Good: pass just the ID
$this->publisher->publish('topic', ['product_id' => $product->getId()]); // Bytes
// Consumer loads the product from DB when processing
Pitfall 4: Ignoring Memory Limits
Long-running consumers will eventually exhaust PHP memory without --max-messages. The Supervisor restart approach is the standard pattern — don't fight it.
Quick Reference: When to Use Message Queues
| Scenario | Async? | Why |
|---|---|---|
| Sending transactional emails | ✅ Yes | No customer-facing impact from delay |
| Syncing to external ERP/PIM | ✅ Yes | External API latency shouldn't block save |
| Bulk product/price updates | ✅ Yes | Can take minutes; use Async Bulk API |
| Re-indexing after bulk import | ✅ Yes | Resource-intensive, not time-critical |
| Updating cart totals | ❌ No | Customer expects immediate feedback |
| Inventory check at checkout | ❌ No | Must be synchronous for accuracy |
| Generating invoices | ⚠️ Depends | Async OK if email delivery is the only output |
Conclusion
Magento 2's message queue framework is one of the platform's most underutilized performance tools. Most stores that struggle with slow admin saves, timeout-prone bulk operations, or overloaded PHP workers could benefit significantly from pushing work off the critical path.
The investment is real — you need RabbitMQ in production, Supervisor to manage consumers, and careful attention to message schema design. But the payoff is equally real: admin product saves that return in milliseconds, bulk imports that don't time out, and an architecture that handles traffic spikes gracefully because expensive work is queued rather than executed inline.
Start with Magento's built-in async operations (the Async Bulk REST API is available today with no custom code), then identify the top 2-3 synchronous operations in your codebase that are causing the most pain, and implement them as async consumers. Your customers — and your PHP workers — will thank you.