Giving Kafka-Style Topics the Routing Power of RabbitMQ

java dev.to

One of the most common complaints about Kafka is its routing model. You have topics. You have partitions. That's it. If you want to route order events differently based on region, priority, or event type, you're stuck with two bad options: create a separate topic for every permutation (orders-us-high, orders-eu-low, ...) or have every consumer receive everything and filter client-side.

RabbitMQ solved this years ago with its exchange model. But RabbitMQ trades routing flexibility for throughput — it can't match Kafka's append-only log performance.

TitanMQ has had both systems from the start: a Kafka-style commit log for storage and RabbitMQ-style exchanges for routing. The problem was they weren't connected. The exchange implementations sat in titanmq-routing, fully tested, doing nothing. Every PRODUCE request went straight to TopicManager.append(). The routing engine was dead code.

We just fixed that. Here's how.


The Design Problem

The naive approach would be: every message goes through an exchange, always. But that breaks backward compatibility and adds latency for the common case where you just want topic-partition semantics.

The approach we chose: exchanges are opt-in and overlay the existing topic model. The broker checks if the target topic name matches a declared exchange. If it does, route through the exchange. If it doesn't, write directly to the topic. Zero overhead for users who don't use exchanges.

Producer sends to "order-events"
         │
         ▼
  ┌──────────────────┐
  │ Exchange exists?  │
  │ "order-events"   │
  └──────┬───────────┘
     yes │        no
         │         │
         ▼         ▼
  ┌────────────┐  ┌──────────────┐
  │ Route via  │  │ Direct write │
  │ exchange   │  │ to topic     │
  └────┬───────┘  └──────────────┘
       │
       ▼
  Write to each destination topic
  (fulfillment, analytics, ...)
Enter fullscreen mode Exit fullscreen mode

This means existing applications that use plain topics continue to work without any changes. Exchanges are a layer you add when you need them.

ExchangeManager

The central piece is ExchangeManager, which manages the lifecycle of named exchanges:

public class ExchangeManager {

    private final ConcurrentHashMap<String, Exchange> exchanges = new ConcurrentHashMap<>();

    public void declareExchange(String name, ExchangeType type) {
        Exchange exchange = switch (type) {
            case DIRECT -> new DirectExchange(name);
            case TOPIC -> new TopicExchange(name);
            case FANOUT -> new FanoutExchange(name);
            case CONTENT_BASED -> new ContentBasedExchange(name);
        };
        exchanges.put(name, exchange);
    }

    public void bind(String exchangeName, String destinationTopic, String routingKey) {
        exchanges.get(exchangeName).bind(destinationTopic, routingKey);
    }

    public List<String> route(String exchangeName, TitanMessage message, String routingKey) {
        Exchange exchange = exchanges.get(exchangeName);
        if (exchange == null) return List.of();
        return exchange.route(message, routingKey);
    }
}
Enter fullscreen mode Exit fullscreen mode

Declaration is idempotent — declaring the same exchange with the same type twice is a no-op. Declaring with a different type throws an error. This matches RabbitMQ's behavior and prevents accidental misconfiguration.

The Four Exchange Types

Direct

Exact routing key match. The simplest and fastest.

mgr.declareExchange("order-events", ExchangeType.DIRECT);
mgr.bind("order-events", "fulfillment", "order.created");
mgr.bind("order-events", "billing", "order.paid");

// Message with routingKey="order.created" → fulfillment only
// Message with routingKey="order.paid" → billing only
// Message with routingKey="order.cancelled" → nowhere (error returned)
Enter fullscreen mode Exit fullscreen mode

Topic

Wildcard pattern matching. * matches one word, # matches zero or more.

mgr.declareExchange("events", ExchangeType.TOPIC);
mgr.bind("events", "us-handler", "order.us.*");
mgr.bind("events", "all-errors", "#.error");

// "order.us.created" → us-handler
// "payment.processing.error" → all-errors
// "error" → all-errors (# matches zero words too)
Enter fullscreen mode Exit fullscreen mode

The pattern matching uses a recursive algorithm that handles the # wildcard's zero-or-more semantics:

private static boolean matchParts(String[] routing, int ri, String[] pattern, int pi) {
    if (ri == routing.length && pi == pattern.length) return true;
    if (pi == pattern.length) return false;

    if (pattern[pi].equals("#")) {
        if (pi == pattern.length - 1) return true;
        for (int i = ri; i <= routing.length; i++) {
            if (matchParts(routing, i, pattern, pi + 1)) return true;
        }
        return false;
    }

    if (ri == routing.length) return false;
    if (pattern[pi].equals("*") || pattern[pi].equals(routing[ri])) {
        return matchParts(routing, ri + 1, pattern, pi + 1);
    }
    return false;
}
Enter fullscreen mode Exit fullscreen mode

Fanout

Broadcast to all bound destinations. Routing key is ignored.

mgr.declareExchange("notifications", ExchangeType.FANOUT);
mgr.bind("notifications", "email-service", "");
mgr.bind("notifications", "sms-service", "");
mgr.bind("notifications", "push-service", "");

// Any message → all three services
Enter fullscreen mode Exit fullscreen mode

Content-Based

This is where TitanMQ goes beyond RabbitMQ. Instead of matching routing keys, it evaluates predicates against message headers.

mgr.declareExchange("smart-router", ExchangeType.CONTENT_BASED);
mgr.bind("smart-router", "us-queue", "region=us");
mgr.bind("smart-router", "eu-priority", "region=eu,priority=high");

// Message with header region=us → us-queue
// Message with headers region=eu, priority=high → eu-priority
// Message with header region=eu, priority=low → nowhere
Enter fullscreen mode Exit fullscreen mode

The wire protocol uses a simple key=value,key=value format for binding rules. But programmatically, you can pass arbitrary Predicate<Map<String, String>> lambdas:

ContentBasedExchange exchange = (ContentBasedExchange) mgr.getExchange("smart-router");
exchange.bind("vip-queue", headers ->
    Integer.parseInt(headers.getOrDefault("priority", "0")) > 8 &&
    "premium".equals(headers.get("tier"))
);
Enter fullscreen mode Exit fullscreen mode

Integration into the Broker

The key change is in BrokerRequestHandler.handleProduce(). The original code was a straight write:

// Before: direct write, no routing
TitanMessage message = TitanMessage.deserialize(buffer);
long offset = topicManager.append(message);
Enter fullscreen mode Exit fullscreen mode

The new code checks for an exchange first:

if (exchangeManager.hasExchange(message.topic())) {
    String routingKey = message.headers().getOrDefault("routingKey", message.topic());
    List<String> destinations = exchangeManager.route(message.topic(), message, routingKey);

    for (String destTopic : destinations) {
        TitanMessage routed = TitanMessage.builder()
                .id(message.id())
                .topic(destTopic)
                .key(message.key())
                .payload(message.payload())
                .headers(message.headers())
                .timestamp(message.timestamp())
                .build();
        topicManager.append(routed);
    }
} else {
    topicManager.append(message);
}
Enter fullscreen mode Exit fullscreen mode

The routing key comes from the message header routingKey. If the header isn't set, the topic name itself is used as the routing key. This keeps the producer API simple — you don't need a separate routing key parameter.

Each destination topic gets its own copy of the message with the topic field rewritten. The message ID, key, payload, headers, and timestamp are preserved. This means consumers on different destination topics see the same message content.

Wire Protocol

Three new commands were added:

Command Code Payload
DECLARE_EXCHANGE 0x24 [4B nameLen][NB name][1B type]
BIND_EXCHANGE 0x25 [4B exchangeLen][NB exchange][4B destLen][NB dest][4B keyLen][NB key]
UNBIND_EXCHANGE 0x26 Same as BIND

The exchange type is encoded as a single byte: 0=DIRECT, 1=TOPIC, 2=FANOUT, 3=CONTENT_BASED.

All three commands return ADMIN_RESPONSE with a success byte on completion.

What This Enables

With the routing engine live, TitanMQ can now handle patterns that previously required either multiple topics or client-side filtering:

Event-driven microservices: Declare a topic exchange for domain events. Each service binds to the patterns it cares about. The order service gets order.*, the analytics service gets # (everything), the error handler gets #.error.

Multi-region routing: Declare a content-based exchange. Bind regional queues based on the region header. Messages are routed at the broker — no wasted bandwidth sending US events to EU consumers.

Notification fanout: Declare a fanout exchange. Every notification channel (email, SMS, push) gets every message. Add a new channel by adding a binding — no producer changes needed.

Priority routing: Use content-based exchange with priority header predicates. High-priority messages go to a fast-path queue with more consumer instances. Low-priority messages go to a batch queue.

All of this on top of Kafka-style append-only logs with offset tracking, consumer groups, and replay capability. The routing happens at write time, so consumers still get the full commit log semantics — they can seek, replay, and track offsets on their destination topics.

Trade-offs

Routing adds a small amount of work per PRODUCE request: one ConcurrentHashMap.get() to check if an exchange exists, plus the routing logic itself. For direct and fanout exchanges, this is O(1). For topic exchanges, it's O(n) where n is the number of bindings (pattern matching against each binding). For content-based exchanges, it's O(n) predicate evaluations.

Messages routed to multiple destinations are written multiple times — once per destination topic. This increases disk I/O proportionally. If a fanout exchange has 10 bindings, each message is written 10 times. This is the same trade-off RabbitMQ makes, and it's the correct one: consumers on different topics need independent offset tracking and retention.

The routing engine is entirely in-memory. Exchange declarations and bindings are not persisted to disk yet. A broker restart loses all exchange configuration. This is a known limitation tracked in a separate issue — the fix will persist exchange state to the Raft log so it survives restarts and is replicated across the cluster.


GitHub: https://github.com/iwtxokhtd83/TitanMQ
Release: v1.2.0
Issue: #10

Source: dev.to

arrow_back Back to Tutorials