Magento 2 Async Operations and Message Queues: A Performance Deep Dive

Every Magento 2 store has operations that are too slow to run synchronously during a web request. Sending transactional emails. Updating inventory across thousands of products. Re-indexing after bulk catalog changes. Running price calculations for complex customer segments.

The naive approach — doing it all in-process during the HTTP request — results in timeouts, frustrated customers, and overwhelmed PHP workers. Magento 2's message queue framework is the proper solution, and when configured correctly, it can dramatically improve perceived performance while making your architecture more resilient.

This guide covers everything you need to know about Magento 2's async operations and message queue system: the architecture, RabbitMQ setup, consumer tuning, and practical patterns for pushing your own heavy workloads off the critical path.

What Is the Message Queue Framework?

Magento 2's message queue framework (originally limited to the Commerce edition, and available in Open Source since 2.3) is a publish/subscribe system that decouples producers (code that initiates work) from consumers (code that processes it).

Instead of:

Request → Process → Wait → Response

You get:

Request → Publish message → Return immediately
          (Consumer picks up message and processes asynchronously)
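To make the split concrete, here is a minimal sketch in plain PHP. The `InMemoryQueue` class is an illustrative stand-in invented for this example, not part of Magento's API; it only shows that publishing is cheap and that the slow work runs later, in a separate consumer step.

```php
<?php
declare(strict_types=1);

// Illustrative in-memory stand-in for a queue backend; not Magento's API.
final class InMemoryQueue
{
    private SplQueue $messages;

    public function __construct()
    {
        $this->messages = new SplQueue();
    }

    // Producer side: store the message and return right away.
    public function publish(string $payload): void
    {
        $this->messages->enqueue($payload);
    }

    // Consumer side: drain pending messages and report how many were handled.
    public function consume(callable $handler): int
    {
        $handled = 0;
        while (!$this->messages->isEmpty()) {
            $handler($this->messages->dequeue());
            $handled++;
        }
        return $handled;
    }
}

$queue = new InMemoryQueue();
$processed = [];

// "Request" phase: only publishing happens here.
$queue->publish('{"sku":"SKU001"}');
$queue->publish('{"sku":"SKU002"}');

// "Consumer" phase: the slow work runs later, outside the request.
$count = $queue->consume(function (string $payload) use (&$processed): void {
    $processed[] = json_decode($payload, true)['sku'];
});
```

In a real store the two phases run in different processes, which is what the rest of this guide sets up.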

The framework supports two backends:

  • MySQL — Simple, no extra infrastructure, works out of the box. Limited throughput.
  • RabbitMQ — Enterprise-grade, high-throughput, supports complex routing. Recommended for production.

What Magento Already Uses Message Queues For

Before building custom async logic, it's worth knowing what Magento already processes asynchronously:

| Topic | What It Does |
|---|---|
| async.operations.all | Bulk REST API operations (product updates, etc.) |
| inventory.reservations.updateSalabilityStatus | MSI stock updates |
| sales.rule.quote.trigger.recollect | Cart price rule recalculations |
| product_action_attribute.update | Bulk attribute updates from Admin grid |
| async.magento.cataloginventory.api.stockregistryinterface.updatestockitembysku.post | Async stock updates via API |

If you're using bulk operations in Admin or the Async REST API, these queues are already doing work for you.

Architecture Overview

The message queue system has four key components:

  1. Publisher — Sends a message to a topic
  2. Topic — Named channel (e.g., mycompany.product.sync)
  3. Queue — Storage for pending messages (MySQL table or RabbitMQ queue)
  4. Consumer — Long-running process that polls the queue and processes messages

Configuration is split across four XML files in your module:

etc/
├── queue_topology.xml   # Defines exchanges and bindings
├── queue_publisher.xml  # Connects topics to exchanges
├── queue_consumer.xml   # Defines consumer handlers
└── communication.xml    # Defines topic schemas

Setting Up RabbitMQ

For anything beyond development, use RabbitMQ. MySQL queues work but have limitations under load — they use polling, table locks, and don't support priority queues or dead letter exchanges.

Installation

# Ubuntu/Debian
apt install rabbitmq-server
systemctl enable rabbitmq-server
systemctl start rabbitmq-server

# Enable management UI
rabbitmq-plugins enable rabbitmq_management
# Access at http://localhost:15672 (guest/guest)

Magento Configuration

// app/etc/env.php
'queue' => [
    'amqp' => [
        'host' => 'localhost',
        'port' => '5672',
        'user' => 'magento',
        'password' => 'your-password',
        'virtualhost' => '/',
        'ssl' => false,
    ]
]

Create a dedicated RabbitMQ user:

rabbitmqctl add_user magento your-password
rabbitmqctl set_permissions -p / magento ".*" ".*" ".*"
rabbitmqctl set_user_tags magento monitoring

Verify the Connection

bin/magento queue:consumers:list

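This lists the consumers Magento has registered from configuration, which confirms your module's XML is being read. It does not open an AMQP connection by itself. To confirm the connection, run bin/magento setup:upgrade (which declares the exchanges and queues) and check that the queues appear in RabbitMQ, or start one consumer and watch for connection errors.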

Building a Custom Async Operation

Let's walk through a real example: a custom product sync to an external ERP system that currently runs synchronously during the admin product save.

Step 1: Define the Topic Schema

<?xml version="1.0"?>
<!-- etc/communication.xml -->
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:noNamespaceSchemaLocation="urn:magento:framework:Communication/etc/communication.xsd">
    <topic name="mycompany.erp.product.sync" request="MyCompany\ErpSync\Api\Data\SyncRequestInterface"/>
</config>
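The topic above points at `SyncRequestInterface`, which the walkthrough never shows. A minimal sketch of what it might look like follows. The method names come from the handler and observer later in this guide; the `string[]` type for the product data and the `SyncRequest` class name are assumptions. As far as I understand it, Magento's message encoder reads the getter and setter docblocks to work out types, so they are worth keeping even with native type hints. In a real module the class would live under Model/ rather than next to the interface.

```php
<?php
declare(strict_types=1);

namespace MyCompany\ErpSync\Api\Data;

interface SyncRequestInterface
{
    /**
     * @return string
     */
    public function getSku(): string;

    /**
     * @param string $sku
     * @return void
     */
    public function setSku(string $sku): void;

    /**
     * @return string[]
     */
    public function getProductData(): array;

    /**
     * @param string[] $productData
     * @return void
     */
    public function setProductData(array $productData): void;
}

// Assumed implementation; kept in the same file only so the sketch runs standalone.
class SyncRequest implements SyncRequestInterface
{
    private string $sku = '';
    private array $productData = [];

    public function getSku(): string
    {
        return $this->sku;
    }

    public function setSku(string $sku): void
    {
        $this->sku = $sku;
    }

    public function getProductData(): array
    {
        return $this->productData;
    }

    public function setProductData(array $productData): void
    {
        $this->productData = $productData;
    }
}

$request = new SyncRequest();
$request->setSku('SKU001');
$request->setProductData(['name' => 'Widget']);
```

A di.xml preference mapping the interface to the implementation would also be needed so the generated factory can create it.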

Step 2: Configure the Publisher

<?xml version="1.0"?>
<!-- etc/queue_publisher.xml -->
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/publisher.xsd">
    <publisher topic="mycompany.erp.product.sync">
        <connection name="amqp" exchange="magento" disabled="false"/>
        <!-- Alternative MySQL connection, kept disabled: only one connection can be enabled per publisher, and Magento does not fall back automatically -->
        <connection name="db" exchange="magento-db" disabled="true"/>
    </publisher>
</config>

Step 3: Configure Topology (RabbitMQ)

<?xml version="1.0"?>
<!-- etc/queue_topology.xml -->
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/topology.xsd">
    <exchange name="magento" type="topic" connection="amqp">
        <binding id="erpSyncBinding"
                 topic="mycompany.erp.product.sync"
                 destinationType="queue"
                 destination="mycompany.erp.product.sync"/>
    </exchange>
</config>

Step 4: Define the Consumer

<?xml version="1.0"?>
<!-- etc/queue_consumer.xml -->
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/consumer.xsd">
    <consumer name="myCompanyErpProductSync"
              queue="mycompany.erp.product.sync"
              connection="amqp"
              consumerInstance="Magento\Framework\MessageQueue\Consumer"
              handler="MyCompany\ErpSync\Model\ProductSyncHandler::processMessage"
              maxMessages="1000"/>
</config>

Step 5: Implement the Handler

// Model/ProductSyncHandler.php
namespace MyCompany\ErpSync\Model;

use MyCompany\ErpSync\Api\Data\SyncRequestInterface;
use Psr\Log\LoggerInterface;

class ProductSyncHandler
{
    public function __construct(
        private readonly ErpApiClient $erpClient,
        private readonly LoggerInterface $logger
    ) {}

    public function processMessage(SyncRequestInterface $request): void
    {
        try {
            $this->erpClient->syncProduct(
                $request->getSku(),
                $request->getProductData()
            );
        } catch (\Exception $e) {
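            // Swallowing the exception acknowledges the message, so it will NOT be
            // retried or dead-lettered. Re-throw here if the failure should be visible to the queue.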
            $this->logger->error('ERP sync failed: ' . $e->getMessage(), [
                'sku' => $request->getSku(),
            ]);
        }
    }
}
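Catching everything is simple, but it treats a brief ERP timeout the same as a permanently invalid SKU. One possible refinement, sketched below in plain PHP, is to retry transient failures a few times and only skip the permanent ones. The two exception classes and the `handleWithRetry()` function are hypothetical names for this example; a real ERP client would define its own error types.

```php
<?php
declare(strict_types=1);

// Hypothetical exception types; a real ERP client would define its own.
final class TransientErpException extends RuntimeException {}
final class PermanentErpException extends RuntimeException {}

/**
 * Runs $sync, retrying transient failures up to $maxAttempts times.
 * Returns 'ok' or 'skipped'. Re-throws once retries are exhausted so the
 * queue sees the failure instead of a silent acknowledgement.
 */
function handleWithRetry(callable $sync, int $maxAttempts = 3): string
{
    for ($attempt = 1; $attempt <= $maxAttempts; $attempt++) {
        try {
            $sync();
            return 'ok';
        } catch (PermanentErpException $e) {
            // Retrying cannot help here: log it and move on.
            error_log('ERP sync skipped: ' . $e->getMessage());
            return 'skipped';
        } catch (TransientErpException $e) {
            if ($attempt === $maxAttempts) {
                throw $e;
            }
            // A real handler would probably sleep or back off before retrying.
        }
    }
    throw new LogicException('maxAttempts must be at least 1');
}

// Usage: the first two calls fail with a timeout, the third succeeds.
$calls = 0;
$result = handleWithRetry(function () use (&$calls): void {
    $calls++;
    if ($calls < 3) {
        throw new TransientErpException('timeout');
    }
});
```

Inside `processMessage()`, the closure would wrap the `syncProduct()` call from the handler above.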

Step 6: Publish from Your Event Observer

// Observer/ProductSaveAfter.php
namespace MyCompany\ErpSync\Observer;

use Magento\Framework\MessageQueue\PublisherInterface;
use MyCompany\ErpSync\Api\Data\SyncRequestInterfaceFactory;

class ProductSaveAfter implements \Magento\Framework\Event\ObserverInterface
{
    public function __construct(
        private readonly PublisherInterface $publisher,
        private readonly SyncRequestInterfaceFactory $requestFactory
    ) {}

    public function execute(\Magento\Framework\Event\Observer $observer): void
    {
        $product = $observer->getEvent()->getProduct();

        $request = $this->requestFactory->create();
        $request->setSku($product->getSku());
        $request->setProductData($this->extractProductData($product));

        // Non-blocking: returns immediately, consumer picks up async
        $this->publisher->publish('mycompany.erp.product.sync', $request);
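    }

    /**
     * Hypothetical helper: reduces the product to the fields the ERP needs.
     */
    private function extractProductData(\Magento\Catalog\Model\Product $product): array
    {
        return [
            'name' => (string) $product->getName(),
            'price' => (string) $product->getPrice(),
        ];
    }
}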

The admin product save no longer waits on the ERP call; it only pays the small cost of publishing a message. The ERP sync happens in the background.

Running Consumers in Production

Consumer processes are long-running PHP daemons. They need to be managed like a service.

Supervisor Configuration

; /etc/supervisor/conf.d/magento-consumers.conf
[program:magento-erp-sync]
command=/usr/bin/php /var/www/html/bin/magento queue:consumers:start myCompanyErpProductSync --max-messages=10000
directory=/var/www/html
user=www-data
autostart=true
autorestart=true
startretries=10
stderr_logfile=/var/log/supervisor/magento-erp-sync.err.log
stdout_logfile=/var/log/supervisor/magento-erp-sync.out.log
stopasgroup=true
killasgroup=true

[program:magento-inventory-reservations]
command=/usr/bin/php /var/www/html/bin/magento queue:consumers:start inventoryQtyCounter --max-messages=10000
directory=/var/www/html
user=www-data
autostart=true
autorestart=true

Reload Supervisor after changes:

supervisorctl reread
supervisorctl update
supervisorctl status

Key Consumer Options

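# --max-messages:  exit after N messages so the process manager restarts it with fresh memory
# --batch-size:    messages consumed per batch (applies to batch consumers only)
# --single-thread: prevent more than one copy of this consumer from running at once
# --pid-file-path: where to write the consumer's PID file
bin/magento queue:consumers:start <consumer-name> \
  --max-messages=10000 \
  --batch-size=100 \
  --single-thread \
  --pid-file-path=/tmp/consumer.pid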

The --max-messages flag is critical. Long-running PHP processes tend to accumulate memory, because objects created through Magento's object manager stay alive for the life of the process. Setting a reasonable limit (1,000 to 50,000, depending on message size and how heavy the handler is) and letting Supervisor restart the process is safer than running indefinitely.
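The idea behind the flag can be sketched in a few lines of plain PHP. This is not Magento's consumer code; `runConsumerLoop()` is a made-up function that stops after a fixed number of messages, with an extra memory check that the CLI flag itself does not perform.

```php
<?php
declare(strict_types=1);

/**
 * Processes messages until the list is empty, $maxMessages is reached,
 * or memory usage crosses $memoryLimitBytes. Returns the number handled.
 */
function runConsumerLoop(
    array $messages,
    callable $handler,
    int $maxMessages,
    int $memoryLimitBytes
): int {
    $handled = 0;
    foreach ($messages as $message) {
        if ($handled >= $maxMessages) {
            break; // same idea as --max-messages
        }
        if (memory_get_usage(true) > $memoryLimitBytes) {
            break; // extra safety net added for this sketch
        }
        $handler($message);
        $handled++;
    }
    return $handled;
}

// Ten messages are waiting, but the loop exits after four.
$seen = [];
$handled = runConsumerLoop(
    range(1, 10),
    function (int $id) use (&$seen): void {
        $seen[] = $id;
    },
    4,
    PHP_INT_MAX
);
```

When the loop exits, the process ends and the process manager starts a fresh one with a clean heap; the remaining messages simply stay in the queue.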

Scaling Consumers

For high-throughput scenarios, run multiple consumer instances in parallel:

; Multiple parallel consumers for high-volume queue
[program:magento-erp-sync]
command=/usr/bin/php /var/www/html/bin/magento queue:consumers:start myCompanyErpProductSync --max-messages=5000
numprocs=4               ; Run 4 parallel consumers
process_name=%(program_name)s-%(process_num)02d

With RabbitMQ, multiple consumers on the same queue use round-robin distribution automatically. With MySQL queues, Magento handles locking to prevent double-processing, but MySQL queues don't scale as gracefully.

The Async Bulk Operations API

For mass operations (importing thousands of products, bulk price updates, etc.), Magento's Async Bulk REST API is built on top of the message queue framework:

# Async bulk product update
curl -X POST https://yourstore.com/rest/async/bulk/V1/products \
  -H "Authorization: Bearer YOUR_TOKEN" \
  -H "Content-Type: application/json" \
  -d '[
    {"product": {"sku": "SKU001", "price": 29.99}},
    {"product": {"sku": "SKU002", "price": 39.99}},
    ...
  ]'

The response returns immediately with a bulk operation UUID:

{"bulk_uuid":"a1b2c3d4-...","request_items":[...]}

Check status asynchronously:

curl https://yourstore.com/rest/V1/bulk/a1b2c3d4-.../status \
  -H "Authorization: Bearer YOUR_TOKEN"

This approach is dramatically faster for bulk imports than the synchronous REST API because:

  1. The HTTP request returns in milliseconds
  2. Operations are batched and processed by consumers
  3. Multiple consumers can parallelize the work
  4. Failed operations are tracked and retriable without re-running everything
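If you are driving the bulk endpoint from a script, you will probably want to cap how many products go into each request. Here is a rough sketch of building the request bodies in PHP. The function name `buildBulkPayloads()` and the chunk size are assumptions for illustration; the payload shape follows the curl example above.

```php
<?php
declare(strict_types=1);

/**
 * Turns [sku => price] into JSON bodies for the async bulk endpoint,
 * with at most $chunkSize products per request.
 *
 * @param array<string, float> $prices
 * @return string[] one JSON document per HTTP request
 */
function buildBulkPayloads(array $prices, int $chunkSize): array
{
    $payloads = [];
    // preserve_keys = true keeps the SKU keys intact inside each chunk.
    foreach (array_chunk($prices, $chunkSize, true) as $chunk) {
        $items = [];
        foreach ($chunk as $sku => $price) {
            $items[] = ['product' => ['sku' => (string) $sku, 'price' => $price]];
        }
        $payloads[] = json_encode($items, JSON_THROW_ON_ERROR);
    }
    return $payloads;
}

$payloads = buildBulkPayloads(
    ['SKU001' => 29.99, 'SKU002' => 39.99, 'SKU003' => 9.5],
    2
);
```

Each string in `$payloads` would be sent as the body of one POST request, and each response carries its own bulk UUID to poll.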

Monitoring Queue Health

RabbitMQ Management Console

The RabbitMQ management UI (http://your-server:15672) gives you:

  • Queue depths (are messages piling up?)
  • Consumer counts (are consumers running?)
  • Message rates (throughput per second)
  • Dead letter queue contents (failed messages, if you have configured a dead letter exchange)

CLI Monitoring

# List all queues and their message counts
rabbitmqctl list_queues name messages consumers

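# List the consumers Magento knows about (registered, not necessarily running)
bin/magento queue:consumers:list

# Check which consumer processes are actually running under Supervisor
supervisorctl status

# Show the options supported when starting a consumer
bin/magento queue:consumers:start --help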

Alert on Queue Depth

Set up monitoring to alert when queue depth exceeds thresholds:

# Simple bash check (add to cron or monitoring system)
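QUEUE="mycompany.erp.product.sync"
# -q suppresses informational output; awk matches the queue name exactly, not as a regex
DEPTH=$(rabbitmqctl -q list_queues name messages | awk -v q="$QUEUE" '$1 == q {print $2}')
if [ "${DEPTH:-0}" -gt 1000 ]; then
  echo "WARNING: ERP sync queue depth is $DEPTH" | mail -s "Queue Alert" ops@yourcompany.com
fi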
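If your monitoring already runs PHP, the same check can be done there with per-queue thresholds. The sketch below only parses text in the whitespace-separated shape that `rabbitmqctl list_queues name messages` prints; the function names and the sample output are made up for illustration, and actually running the command is left out.

```php
<?php
declare(strict_types=1);

/**
 * Parses whitespace-separated "name messages" lines into [queueName => depth].
 * Lines whose second column is not an integer (such as headers) are skipped.
 */
function parseQueueDepths(string $output): array
{
    $depths = [];
    foreach (preg_split('/\R/', trim($output)) as $line) {
        $columns = preg_split('/\s+/', trim($line));
        if (count($columns) >= 2 && ctype_digit($columns[1])) {
            $depths[$columns[0]] = (int) $columns[1];
        }
    }
    return $depths;
}

/** Returns the queues whose depth exceeds their configured threshold. */
function queuesOverThreshold(array $depths, array $thresholds): array
{
    $alerts = [];
    foreach ($thresholds as $queue => $limit) {
        if (($depths[$queue] ?? 0) > $limit) {
            $alerts[$queue] = $depths[$queue];
        }
    }
    return $alerts;
}

// Sample output, invented for this example.
$sample = "name\tmessages\nmycompany.erp.product.sync\t1500\nasync.operations.all\t12\n";
$alerts = queuesOverThreshold(
    parseQueueDepths($sample),
    ['mycompany.erp.product.sync' => 1000, 'async.operations.all' => 5000]
);
```

Anything left in `$alerts` can then be handed to whatever notification channel you already use.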

Common Pitfalls and How to Avoid Them

Pitfall 1: Not Handling Failures

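If your handler throws an unhandled exception, Magento rejects the message without requeueing it. On RabbitMQ it ends up in a dead letter queue only if you have configured a dead letter exchange for that queue; otherwise it is discarded. The MySQL backend has no dead letter support at all. Catch exceptions deliberately in your handler, and implement retry logic or alerting for persistent failures.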

Pitfall 2: Running Consumers via Cron Only

Magento's default setup uses cron to start consumers. This adds up to a 1-minute delay before a consumer starts processing new messages. For latency-sensitive operations, run consumers as persistent daemons via Supervisor instead.
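Once Supervisor owns your consumers, you will usually want to stop cron from starting its own copies. Magento reads a `cron_consumers_runner` section from env.php for this. The fragment below is a sketch; the values are examples, so check them against the documentation for your version.

```php
// app/etc/env.php (fragment)
'cron_consumers_runner' => [
    'cron_run' => false,       // do not let cron spawn consumers
    'max_messages' => 10000,   // only relevant when cron_run is true
    'consumers' => [],         // only relevant when cron_run is true
],
```

If you would rather keep cron for low-priority queues, leave `cron_run` enabled and list only those consumer names under `consumers`, then run the latency-sensitive ones under Supervisor.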

Pitfall 3: Large Message Payloads

Message queues are optimized for small messages. Avoid putting large data blobs (full product objects, images) in messages. Instead, pass identifiers:

// Bad: serialize the entire product
$this->publisher->publish('topic', $product->getData()); // Could be 50KB+

// Good: pass just the ID
$this->publisher->publish('topic', ['product_id' => $product->getId()]); // Bytes
// Consumer loads the product from DB when processing
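Here is a small plain-PHP sketch of the ID-only pattern end to end. It assumes the topic is declared with a plain string request type so the payload can be a JSON string; `encodeSyncMessage()`, `handleSyncMessage()`, and the array-backed loader are hypothetical names standing in for your publisher, handler, and product repository.

```php
<?php
declare(strict_types=1);

// Publisher side: reduce the message to an identifier.
function encodeSyncMessage(int $productId): string
{
    return json_encode(['product_id' => $productId], JSON_THROW_ON_ERROR);
}

// Consumer side: decode, validate, then load fresh data through the loader.
function handleSyncMessage(string $payload, callable $loadProduct): array
{
    $data = json_decode($payload, true, 512, JSON_THROW_ON_ERROR);
    if (!isset($data['product_id']) || !is_int($data['product_id'])) {
        throw new InvalidArgumentException('Message is missing an integer product_id');
    }
    return $loadProduct($data['product_id']);
}

// Hypothetical loader standing in for a repository lookup.
$catalog = [42 => ['sku' => 'SKU042', 'name' => 'Widget']];

$message = encodeSyncMessage(42);
$product = handleSyncMessage($message, fn (int $id): array => $catalog[$id]);
```

A side benefit of loading in the consumer is that it sees the product as it is at processing time, rather than a snapshot taken when the message was published.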

Pitfall 4: Ignoring Memory Limits

Long-running consumers will eventually exhaust PHP memory without --max-messages. The Supervisor restart approach is the standard pattern — don't fight it.

Quick Reference: When to Use Message Queues

| Scenario | Async? | Why |
|---|---|---|
| Sending transactional emails | ✅ Yes | No customer-facing impact from delay |
| Syncing to external ERP/PIM | ✅ Yes | External API latency shouldn't block save |
| Bulk product/price updates | ✅ Yes | Can take minutes; use Async Bulk API |
| Re-indexing after bulk import | ✅ Yes | Resource-intensive, not time-critical |
| Updating cart totals | ❌ No | Customer expects immediate feedback |
| Inventory check at checkout | ❌ No | Must be synchronous for accuracy |
| Generating invoices | ⚠️ Depends | Async OK if email delivery is the only output |

Conclusion

Magento 2's message queue framework is one of the platform's most underutilized performance tools. Most stores that struggle with slow admin saves, timeout-prone bulk operations, or overloaded PHP workers could benefit significantly from pushing work off the critical path.

The investment is real — you need RabbitMQ in production, Supervisor to manage consumers, and careful attention to message schema design. But the payoff is equally real: admin product saves that return in milliseconds, bulk imports that don't time out, and an architecture that handles traffic spikes gracefully because expensive work is queued rather than executed inline.

Start with Magento's built-in async operations (the Async Bulk REST API is available today with no custom code), then identify the top 2-3 synchronous operations in your codebase that are causing the most pain, and implement them as async consumers. Your customers — and your PHP workers — will thank you.

Source: dev.to
