One of the most common complaints about Kafka is its routing model. You have topics. You have partitions. That's it. If you want to route order events differently based on region, priority, or event type, you're stuck with two bad options: create a separate topic for every permutation (orders-us-high, orders-eu-low, ...) or have every consumer receive everything and filter client-side.
RabbitMQ solved this years ago with its exchange model. But RabbitMQ trades throughput for routing flexibility: it can't match Kafka's append-only log performance.
TitanMQ has had both systems from the start: a Kafka-style commit log for storage and RabbitMQ-style exchanges for routing. The problem was they weren't connected. The exchange implementations sat in titanmq-routing, fully tested, doing nothing. Every PRODUCE request went straight to TopicManager.append(). The routing engine was dead code.
We just fixed that. Here's how.
## The Design Problem
The naive approach would be: every message goes through an exchange, always. But that breaks backward compatibility and adds latency for the common case where you just want topic-partition semantics.
The approach we chose: exchanges are opt-in and overlay the existing topic model. The broker checks if the target topic name matches a declared exchange. If it does, route through the exchange. If it doesn't, write directly to the topic. Zero overhead for users who don't use exchanges.
```
Producer sends to "order-events"
            │
            ▼
  ┌──────────────────┐
  │ Exchange exists? │
  │  "order-events"  │
  └──────┬───────────┘
    yes  │  no
  ┌──────┴─────────┐
  ▼                ▼
┌────────────┐   ┌──────────────┐
│ Route via  │   │ Direct write │
│ exchange   │   │ to topic     │
└────┬───────┘   └──────────────┘
     │
     ▼
Write to each destination topic
(fulfillment, analytics, ...)
```
This means existing applications that use plain topics continue to work without any changes. Exchanges are a layer you add when you need them.
## ExchangeManager
The central piece is ExchangeManager, which manages the lifecycle of named exchanges:
```java
public class ExchangeManager {

    private final ConcurrentHashMap<String, Exchange> exchanges = new ConcurrentHashMap<>();

    public void declareExchange(String name, ExchangeType type) {
        exchanges.compute(name, (n, existing) -> {
            // Idempotent: re-declaring with the same type is a no-op;
            // re-declaring with a different type is an error.
            // (Assumes Exchange exposes its declared type.)
            if (existing != null) {
                if (existing.type() != type) {
                    throw new IllegalStateException(
                            "Exchange '" + n + "' already declared as " + existing.type());
                }
                return existing;
            }
            return switch (type) {
                case DIRECT -> new DirectExchange(n);
                case TOPIC -> new TopicExchange(n);
                case FANOUT -> new FanoutExchange(n);
                case CONTENT_BASED -> new ContentBasedExchange(n);
            };
        });
    }

    public void bind(String exchangeName, String destinationTopic, String routingKey) {
        Exchange exchange = exchanges.get(exchangeName);
        if (exchange == null) {
            throw new IllegalArgumentException("Unknown exchange: " + exchangeName);
        }
        exchange.bind(destinationTopic, routingKey);
    }

    public List<String> route(String exchangeName, TitanMessage message, String routingKey) {
        Exchange exchange = exchanges.get(exchangeName);
        if (exchange == null) return List.of();
        return exchange.route(message, routingKey);
    }
}
```
Declaration is idempotent — declaring the same exchange with the same type twice is a no-op. Declaring with a different type throws an error. This matches RabbitMQ's behavior and prevents accidental misconfiguration.
## The Four Exchange Types
### Direct

Exact routing-key match. The simplest and fastest type.

```java
mgr.declareExchange("order-events", ExchangeType.DIRECT);
mgr.bind("order-events", "fulfillment", "order.created");
mgr.bind("order-events", "billing", "order.paid");

// routingKey="order.created"   → fulfillment only
// routingKey="order.paid"      → billing only
// routingKey="order.cancelled" → nowhere (error returned)
```
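Under the hood, a direct exchange reduces to a map from routing key to bound topics. A minimal self-contained sketch (class and method names here are illustrative, not TitanMQ's actual DirectExchange):

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Illustrative direct exchange: exact routing-key lookup.
class SimpleDirectExchange {

    private final Map<String, List<String>> bindings = new ConcurrentHashMap<>();

    void bind(String destinationTopic, String routingKey) {
        bindings.computeIfAbsent(routingKey, k -> new CopyOnWriteArrayList<>())
                .add(destinationTopic);
    }

    // Returns every topic bound to exactly this routing key; empty if none.
    List<String> route(String routingKey) {
        return bindings.getOrDefault(routingKey, List.of());
    }
}
```

The lookup is a single hash-map access, which is why direct is the fastest of the four types.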
### Topic

Wildcard pattern matching: `*` matches exactly one word, `#` matches zero or more words.

```java
mgr.declareExchange("events", ExchangeType.TOPIC);
mgr.bind("events", "us-handler", "order.us.*");
mgr.bind("events", "all-errors", "#.error");

// "order.us.created"         → us-handler
// "payment.processing.error" → all-errors
// "error"                    → all-errors (# matches zero words too)
```
The pattern matching uses a recursive algorithm that handles the # wildcard's zero-or-more semantics:
```java
private static boolean matchParts(String[] routing, int ri, String[] pattern, int pi) {
    // Both exhausted: full match.
    if (ri == routing.length && pi == pattern.length) return true;
    // Pattern exhausted but routing words remain: no match.
    if (pi == pattern.length) return false;
    if (pattern[pi].equals("#")) {
        // Trailing # matches everything that's left, including nothing.
        if (pi == pattern.length - 1) return true;
        // Otherwise let # absorb zero or more words and try each split point.
        for (int i = ri; i <= routing.length; i++) {
            if (matchParts(routing, i, pattern, pi + 1)) return true;
        }
        return false;
    }
    // Routing exhausted but a non-# pattern word remains: no match.
    if (ri == routing.length) return false;
    // * matches any single word; otherwise the words must be equal.
    if (pattern[pi].equals("*") || pattern[pi].equals(routing[ri])) {
        return matchParts(routing, ri + 1, pattern, pi + 1);
    }
    return false;
}
```
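An equivalent way to implement this matching, shown here purely as an illustrative alternative rather than TitanMQ's actual approach, is to compile each binding pattern to a regex once at bind time: `*` becomes `[^.]+` (exactly one word) and `#` becomes a repeated word group. This trades per-message recursion for a one-time compilation cost per binding:

```java
import java.util.regex.Pattern;

// Illustrative alternative: translate a binding pattern into a regex.
// "*" → exactly one word; "#" → zero or more whole words.
class TopicPatternRegex {

    static String toRegex(String pattern) {
        String[] words = pattern.split("\\.");
        StringBuilder sb = new StringBuilder("^");
        boolean needSep = false;  // does the next word need a preceding "\." ?
        for (int i = 0; i < words.length; i++) {
            String w = words[i];
            boolean last = (i == words.length - 1);
            if (w.equals("#")) {
                if (last) {
                    // Trailing #: zero or more additional words.
                    sb.append(needSep ? "(\\.[^.]+)*" : ".*");
                } else {
                    if (needSep) sb.append("\\.");
                    sb.append("([^.]+\\.)*");  // # absorbs whole words plus their dots
                    needSep = false;           // the next word's separator is consumed
                }
            } else {
                if (needSep) sb.append("\\.");
                sb.append(w.equals("*") ? "[^.]+" : Pattern.quote(w));
                needSep = true;
            }
        }
        return sb.append("$").toString();
    }
}
```

Precompiling one `Pattern` per binding would make repeated routing cheap, at the cost of less obvious zero-or-more `#` semantics than the recursive version.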
### Fanout

Broadcast to all bound destinations. The routing key is ignored.

```java
mgr.declareExchange("notifications", ExchangeType.FANOUT);
mgr.bind("notifications", "email-service", "");
mgr.bind("notifications", "sms-service", "");
mgr.bind("notifications", "push-service", "");

// Any message → all three services
```
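Fanout is the simplest type to implement: a bag of destinations returned wholesale. An illustrative sketch (names assumed, not TitanMQ's actual FanoutExchange):

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Illustrative fanout exchange: every bound topic receives every message.
class SimpleFanoutExchange {

    private final List<String> bindings = new CopyOnWriteArrayList<>();

    void bind(String destinationTopic) {
        bindings.add(destinationTopic);
    }

    // The routing key is accepted for interface parity but deliberately ignored.
    List<String> route(String ignoredRoutingKey) {
        return List.copyOf(bindings);
    }
}
```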
### Content-Based

This is where TitanMQ goes beyond RabbitMQ. Instead of matching routing keys, it evaluates predicates against message headers.

```java
mgr.declareExchange("smart-router", ExchangeType.CONTENT_BASED);
mgr.bind("smart-router", "us-queue", "region=us");
mgr.bind("smart-router", "eu-priority", "region=eu,priority=high");

// Message with header  region=us                → us-queue
// Message with headers region=eu, priority=high → eu-priority
// Message with header  region=eu, priority=low  → nowhere
```
The wire protocol uses a simple `key=value,key=value` format for binding rules. But programmatically, you can pass arbitrary `Predicate<Map<String, String>>` lambdas:

```java
ContentBasedExchange exchange = (ContentBasedExchange) mgr.getExchange("smart-router");
exchange.bind("vip-queue", headers ->
        Integer.parseInt(headers.getOrDefault("priority", "0")) > 8 &&
        "premium".equals(headers.get("tier"))
);
```
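One plausible way to bridge the two forms is to compile the wire-format rule string into that same predicate type. This sketch is an assumption about how such a parser could look, not TitanMQ's actual code:

```java
import java.util.Map;
import java.util.function.Predicate;

// Illustrative parser: compiles a rule like "region=eu,priority=high" into a
// header predicate requiring every listed key to hold exactly the listed value.
class BindingRuleParser {

    static Predicate<Map<String, String>> parse(String rule) {
        Predicate<Map<String, String>> result = headers -> true;
        for (String clause : rule.split(",")) {
            String[] kv = clause.split("=", 2);
            if (kv.length != 2) {
                throw new IllegalArgumentException("Malformed clause: " + clause);
            }
            String key = kv[0].trim();
            String value = kv[1].trim();
            // AND the clauses together: a missing header fails the match.
            result = result.and(headers -> value.equals(headers.get(key)));
        }
        return result;
    }
}
```

Conjoining clauses with `Predicate.and` keeps the semantics identical whether a binding arrives over the wire or as a lambda.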
## Integration into the Broker

The key change is in `BrokerRequestHandler.handleProduce()`. The original code was a straight write:

```java
// Before: direct write, no routing
TitanMessage message = TitanMessage.deserialize(buffer);
long offset = topicManager.append(message);
```
The new code checks for an exchange first:
```java
if (exchangeManager.hasExchange(message.topic())) {
    String routingKey = message.headers().getOrDefault("routingKey", message.topic());
    List<String> destinations = exchangeManager.route(message.topic(), message, routingKey);
    for (String destTopic : destinations) {
        TitanMessage routed = TitanMessage.builder()
                .id(message.id())
                .topic(destTopic)
                .key(message.key())
                .payload(message.payload())
                .headers(message.headers())
                .timestamp(message.timestamp())
                .build();
        topicManager.append(routed);
    }
} else {
    topicManager.append(message);
}
```
The routing key comes from the `routingKey` message header. If the header isn't set, the topic name itself is used as the routing key. This keeps the producer API simple: you don't need a separate routing-key parameter.
Each destination topic gets its own copy of the message with the topic field rewritten. The message ID, key, payload, headers, and timestamp are preserved. This means consumers on different destination topics see the same message content.
## Wire Protocol

Three new commands were added:

| Command | Code | Payload |
|---|---|---|
| `DECLARE_EXCHANGE` | `0x24` | `[4B nameLen][NB name][1B type]` |
| `BIND_EXCHANGE` | `0x25` | `[4B exchangeLen][NB exchange][4B destLen][NB dest][4B keyLen][NB key]` |
| `UNBIND_EXCHANGE` | `0x26` | Same as `BIND_EXCHANGE` |
The exchange type is encoded as a single byte: 0=DIRECT, 1=TOPIC, 2=FANOUT, 3=CONTENT_BASED.
All three commands return ADMIN_RESPONSE with a success byte on completion.
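To make the layout concrete, here's an illustrative encoder for the DECLARE_EXCHANGE payload. The command code and byte layout come from the table above; the class name, UTF-8 encoding, and big-endian ints (ByteBuffer's default order) are assumptions:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Illustrative encoder for the DECLARE_EXCHANGE payload:
// [4B nameLen][NB name][1B type], with 0=DIRECT, 1=TOPIC, 2=FANOUT, 3=CONTENT_BASED.
class DeclareExchangeFrame {

    static final byte COMMAND = 0x24;

    static byte[] encodePayload(String exchangeName, byte exchangeType) {
        byte[] name = exchangeName.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(4 + name.length + 1);
        buf.putInt(name.length);   // 4-byte length prefix
        buf.put(name);             // exchange name bytes
        buf.put(exchangeType);     // 1-byte exchange type
        return buf.array();
    }
}
```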
## What This Enables
With the routing engine live, TitanMQ can now handle patterns that previously required either multiple topics or client-side filtering:
- **Event-driven microservices:** Declare a topic exchange for domain events. Each service binds to the patterns it cares about: the order service gets `order.*`, the analytics service gets `#` (everything), the error handler gets `#.error`.
- **Multi-region routing:** Declare a content-based exchange and bind regional queues on the `region` header. Messages are routed at the broker, so no bandwidth is wasted sending US events to EU consumers.
- **Notification fanout:** Declare a fanout exchange. Every notification channel (email, SMS, push) gets every message. Add a new channel by adding a binding; no producer changes needed.
- **Priority routing:** Use a content-based exchange with `priority` header predicates. High-priority messages go to a fast-path queue with more consumer instances; low-priority messages go to a batch queue.
All of this on top of Kafka-style append-only logs with offset tracking, consumer groups, and replay capability. The routing happens at write time, so consumers still get the full commit log semantics — they can seek, replay, and track offsets on their destination topics.
## Trade-offs
Routing adds a small amount of work per PRODUCE request: one ConcurrentHashMap.get() to check if an exchange exists, plus the routing logic itself. For direct and fanout exchanges, this is O(1). For topic exchanges, it's O(n) where n is the number of bindings (pattern matching against each binding). For content-based exchanges, it's O(n) predicate evaluations.
Messages routed to multiple destinations are written multiple times — once per destination topic. This increases disk I/O proportionally. If a fanout exchange has 10 bindings, each message is written 10 times. This is the same trade-off RabbitMQ makes, and it's the correct one: consumers on different topics need independent offset tracking and retention.
The routing engine is entirely in-memory. Exchange declarations and bindings are not persisted to disk yet. A broker restart loses all exchange configuration. This is a known limitation tracked in a separate issue — the fix will persist exchange state to the Raft log so it survives restarts and is replicated across the cluster.
GitHub: https://github.com/iwtxokhtd83/TitanMQ
Release: v1.2.0
Issue: #10