Building a resilient microservice cache with optimistic locking and event-driven invalidation

typescript dev.to

Caching is essential for performance in distributed systems, but it becomes tricky when you have multiple services mutating shared data. This tutorial walks you through designing and implementing a resilient, scalable cache layer for a microservice architecture. You’ll learn how to combine optimistic locking, versioned cache entries, and event-driven invalidation to keep cache correctness high without sacrificing throughput.

What you’ll build

  • A simple user profile service with a read-heavy cache path
  • A cache store that supports optimistic locking using version stamps
  • An event bus (simulated) to propagate invalidation events
  • A small service that consumes events to invalidate or refresh cached data
  • A runnable example in Python, with an optional TypeScript sketch for a frontend consumer

Prerequisites

  • Basic familiarity with HTTP APIs, RESTful design, and eventual consistency concepts
  • Understanding of caching (TTL, eviction) and cache coherence challenges
  • Python 3.8+ or your preferred modern runtime
  • Optional: Node.js for a frontend example

High-level design

  • Read path: check the cache first. If an unexpired entry is present, return it. On a miss, fetch the record from the canonical store and cache it together with its version field, so later writes and invalidation events can tell which copy is current.
  • Write path: when mutating data, perform an atomic compare-and-update. The write succeeds only if the stored version still equals the version the caller read, and it increments the version. If the versions differ, reject the write or retry with fresh data.
  • Invalidation path: when a write occurs, publish an invalidation event with the affected keys and new versions. Listeners refresh or clear stale cache entries.
  • Concurrency control: use optimistic locking via a version field in the data model. This minimizes lock contention while preserving correctness.
  • Consistency model: read-through cache with versioned entries. Caches converge on the latest data through invalidation events (eventual consistency), with TTL expiry as a backstop if an event is lost.
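
The invalidation path above mentions that events carry the new version. One reason that matters: a slow reader can fetch version 1, lose a race with a writer that commits version 2 and publishes an invalidation, and then put version 1 back into the cache. The sketch below shows one way to close that gap, assuming the invalidation payload includes the new version. `TombstoneCache`, `fill`, and the `new_version` argument are hypothetical names, not part of the tutorial's cache.py.

```python
from typing import Any, Dict, Optional

Record = Dict[str, Any]


class TombstoneCache:
    """Hypothetical versioned cache: an invalidation leaves a version floor behind."""

    def __init__(self) -> None:
        self._values: Dict[str, Record] = {}
        self._floors: Dict[str, int] = {}  # key -> lowest version we will accept

    def get(self, key: str) -> Optional[Record]:
        value = self._values.get(key)
        return dict(value) if value is not None else None

    def fill(self, key: str, record: Record) -> bool:
        """Cache a record read from the store unless it is older than what we know about."""
        if record["version"] < self._floors.get(key, 0):
            return False  # stale read that lost a race with a newer write
        cached = self._values.get(key)
        if cached is not None and record["version"] < cached["version"]:
            return False  # never replace a newer cached copy with an older one
        self._values[key] = dict(record)
        return True

    def invalidate(self, key: str, new_version: int) -> None:
        """Handle an invalidation event that carries the record's new version."""
        self._values.pop(key, None)
        self._floors[key] = max(self._floors.get(key, 0), new_version)


cache = TombstoneCache()
v1 = {"id": "user-123", "name": "Alex Doe", "version": 1}
# A slow reader fetched v1; meanwhile a writer committed v2 and published an event.
cache.invalidate("user-123", new_version=2)
stale_accepted = cache.fill("user-123", v1)
fresh_accepted = cache.fill("user-123", {**v1, "name": "Alexandra Doe", "version": 2})
```

The floors dictionary grows with every invalidated key, so a fuller version would likely expire floors too (for example, using the same TTL as cache entries).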

Code overview

  • cache.py: a minimal in-memory TTL cache whose entries carry the record's version
  • store.py: the canonical data store (simulated in-memory) with versioned records
  • event_bus.py: a simple publish/subscribe mechanism to simulate events
  • service.py: the user profile service exposing get and update endpoints with cache integration
  • main.py: an example runner that wires the pieces together
  • test_cli.py: simple tests to exercise read/write paths and invalidation

Step 1: The data model and store

  • Data model includes id, name, email, and version (integer)
  • The store supports get_by_id, update_with_optimistic_lock, and create
  • Version increments on each successful update

Python implementation (store.py)

  • A thread-safe in-memory store using a dictionary and a Lock
  • get_by_id returns a copy (snapshot) of the record, including its version
  • update_with_optimistic_lock(id, expected_version, new_data) verifies version before applying

Code (store.py)

from threading import Lock
from typing import Any, Dict, Optional


class DataStore:
    def __init__(self) -> None:
        self._lock = Lock()  # guards every read and write of _store
        self._store: Dict[str, Dict[str, Any]] = {}

    def create(self, id: str, data: Dict[str, Any]) -> Dict[str, Any]:
        with self._lock:
            if id in self._store:
                raise ValueError("Item already exists")
            item = data.copy()
            item["version"] = 1  # every record starts at version 1
            self._store[id] = item
            return item.copy()

    def get_by_id(self, id: str) -> Optional[Dict[str, Any]]:
        with self._lock:
            item = self._store.get(id)
            # Hand out a copy so callers cannot mutate stored state
            return item.copy() if item is not None else None

    def update_with_optimistic_lock(self, id: str, expected_version: int, new_data: Dict[str, Any]) -> Dict[str, Any]:
        with self._lock:
            if id not in self._store:
                raise KeyError("Item not found")
            current = self._store[id]
            # Reject the write if someone else updated the record since the caller read it
            if current["version"] != expected_version:
                raise ValueError("Version mismatch")
            # Apply updates to a copy, then bump the version
            updated = current.copy()
            updated.update(new_data)
            updated["version"] = current["version"] + 1
            self._store[id] = updated
            return updated.copy()
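
A caller that hits "Version mismatch" usually wants to re-read and try again rather than fail outright. Here is a minimal sketch of that retry loop against a trimmed-down stand-in for DataStore. `MiniStore`, `compare_and_set`, `VersionConflict`, and `update_with_retry` are hypothetical names; the tutorial's store raises a plain ValueError on mismatch, so a loop around it would catch that instead.

```python
from threading import Lock
from typing import Any, Callable, Dict, Optional

Record = Dict[str, Any]


class VersionConflict(Exception):
    """Stored version no longer matches the version the caller read."""


class MiniStore:
    """Trimmed-down stand-in for DataStore, just enough to show a retry loop."""

    def __init__(self) -> None:
        self._lock = Lock()
        self._rows: Dict[str, Record] = {}

    def put(self, id: str, data: Record) -> None:
        with self._lock:
            self._rows[id] = {**data, "version": 1}

    def get(self, id: str) -> Record:
        with self._lock:
            return dict(self._rows[id])

    def compare_and_set(self, id: str, expected_version: int, changes: Record) -> Record:
        with self._lock:
            current = self._rows[id]
            if current["version"] != expected_version:
                raise VersionConflict(f"expected v{expected_version}, found v{current['version']}")
            updated = {**current, **changes, "version": current["version"] + 1}
            self._rows[id] = updated
            return dict(updated)


def update_with_retry(store: MiniStore, id: str,
                      mutate: Callable[[Record], Record], max_attempts: int = 3) -> Record:
    """Read, compute changes from a fresh copy, and retry if another writer won the race."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    last_error: Optional[VersionConflict] = None
    for _ in range(max_attempts):
        current = store.get(id)
        try:
            return store.compare_and_set(id, current["version"], mutate(current))
        except VersionConflict as exc:
            last_error = exc  # someone else committed first: re-read and try again
    assert last_error is not None
    raise last_error


store = MiniStore()
store.put("user-123", {"name": "Alex Doe", "login_count": 0})
seen_versions = []


def record_login(profile: Record) -> Record:
    seen_versions.append(profile["version"])
    if len(seen_versions) == 1:
        # Simulate a concurrent writer committing between our read and our write
        store.compare_and_set("user-123", profile["version"], {"name": "Alexandra Doe"})
    return {"login_count": profile["login_count"] + 1}


result = update_with_retry(store, "user-123", record_login)
```

The first attempt conflicts, the second re-reads version 2 and succeeds, so both the name change and the login count survive. Retrying like this only makes sense when `mutate` can safely be recomputed from fresh data; for user-facing edits it is often better to return the conflict to the client.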

Step 2: The cache with optimistic locking

  • Cache entries store data and version
  • get: return a copy of the entry if it is present and not expired; otherwise return None so the caller fetches from the store
  • set: store the record (including its version field) with an expiry time
  • invalidate: remove the entry when an invalidation event arrives

Code (cache.py)

from time import monotonic
from typing import Any, Dict, Optional


class CacheEntry:
    def __init__(self, value: Dict[str, Any], expires_at: float) -> None:
        self.value = value
        self.expires_at = expires_at


class VersionedCache:
    def __init__(self, ttl_seconds: float = 60.0) -> None:
        self._ttl = ttl_seconds
        self._store: Dict[str, CacheEntry] = {}

    def get(self, key: str) -> Optional[Dict[str, Any]]:
        ent = self._store.get(key)
        if ent is None:
            return None
        # monotonic() is unaffected by wall-clock adjustments, which suits TTLs
        if ent.expires_at < monotonic():
            # Entry has expired: evict it lazily and report a miss
            del self._store[key]
            return None
        return ent.value.copy()

    def set(self, key: str, value: Dict[str, Any]) -> None:
        # The value is the full record, so its "version" field is cached with it
        expires_at = monotonic() + self._ttl
        self._store[key] = CacheEntry(value.copy(), expires_at)

    def invalidate(self, key: str) -> None:
        # Removing a key that is not cached is a no-op
        self._store.pop(key, None)
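
Because VersionedCache reads the clock directly, TTL expiry is awkward to test without sleeping. One common approach, sketched here as an assumption rather than the tutorial's code, is to inject the clock. `ClockedCache` and `FakeClock` are hypothetical names; the expiry rule matches the one above (an entry is dropped once its expiry time is in the past).

```python
import time
from typing import Any, Callable, Dict, Optional, Tuple

Record = Dict[str, Any]


class ClockedCache:
    """TTL cache like VersionedCache, but with an injectable clock for tests."""

    def __init__(self, ttl_seconds: float = 60.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._ttl = ttl_seconds
        self._clock = clock
        self._entries: Dict[str, Tuple[Record, float]] = {}  # key -> (value, expires_at)

    def get(self, key: str) -> Optional[Record]:
        entry = self._entries.get(key)
        if entry is None:
            return None
        value, expires_at = entry
        if expires_at < self._clock():
            del self._entries[key]  # lazily evict the expired entry
            return None
        return dict(value)

    def set(self, key: str, value: Record) -> None:
        self._entries[key] = (dict(value), self._clock() + self._ttl)

    def invalidate(self, key: str) -> None:
        self._entries.pop(key, None)


class FakeClock:
    """Manually advanced clock: tests set `now` instead of sleeping."""

    def __init__(self) -> None:
        self.now = 0.0

    def __call__(self) -> float:
        return self.now


clock = FakeClock()
cache = ClockedCache(ttl_seconds=30, clock=clock)
cache.set("user-123", {"id": "user-123", "version": 1})
clock.now = 29.9
before_expiry = cache.get("user-123")
clock.now = 30.1
after_expiry = cache.get("user-123")
```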

Step 3: The event bus

  • Lightweight pub/sub to propagate invalidation events
  • Each event has a type (for example, "invalidate_profile") and a payload dict (for example, {"id": "user-123"})

Code (event_bus.py)

from typing import Any, Callable, Dict, List

Handler = Callable[[Dict[str, Any]], None]


class EventBus:
    def __init__(self) -> None:
        self._subscribers: Dict[str, List[Handler]] = {}

    def publish(self, event_type: str, payload: Dict[str, Any]) -> None:
        # Deliver synchronously, in subscription order, to every handler for this type
        for cb in list(self._subscribers.get(event_type, [])):
            cb(payload)

    def subscribe(self, event_type: str, callback: Handler) -> None:
        self._subscribers.setdefault(event_type, []).append(callback)
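
As written, publish() calls subscribers in order, so a handler that raises stops the event from reaching the rest, which can leave some caches stale. A variant that isolates failures might look like this. `IsolatingEventBus`, `cache_invalidator`, and the replica caches are hypothetical, and returning the collected errors is just one design choice; a real message broker would typically add retries or a dead-letter queue.

```python
import logging
from typing import Any, Callable, Dict, List

Record = Dict[str, Any]
Handler = Callable[[Record], None]
log = logging.getLogger("event_bus")


class IsolatingEventBus:
    """Variant of EventBus in which one failing subscriber cannot block the others."""

    def __init__(self) -> None:
        self._subscribers: Dict[str, List[Handler]] = {}

    def subscribe(self, event_type: str, callback: Handler) -> None:
        self._subscribers.setdefault(event_type, []).append(callback)

    def publish(self, event_type: str, payload: Record) -> List[Exception]:
        errors: List[Exception] = []
        for callback in list(self._subscribers.get(event_type, [])):
            try:
                callback(payload)
            except Exception as exc:
                # Record the failure and keep delivering to the remaining subscribers
                log.exception("subscriber failed for %s", event_type)
                errors.append(exc)
        return errors


def cache_invalidator(local_cache: Dict[str, Record]) -> Handler:
    """Build a handler that drops the affected key from one replica's local cache."""
    def handle(payload: Record) -> None:
        local_cache.pop(payload["id"], None)
    return handle


def unreachable_replica(payload: Record) -> None:
    raise RuntimeError("replica A is unreachable")


replica_b: Dict[str, Record] = {
    "user-123": {"name": "Alex Doe", "version": 1},
    "user-456": {"name": "Sam Lee", "version": 4},
}

bus = IsolatingEventBus()
bus.subscribe("invalidate_profile", unreachable_replica)  # registered first, and fails
bus.subscribe("invalidate_profile", cache_invalidator(replica_b))
errors = bus.publish("invalidate_profile", {"id": "user-123"})
```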
    

Step 4: The service layer

  • get_profile(id): check cache; if miss, fetch from store, cache it
  • update_profile(id, partial): perform optimistic update on store; on success, publish invalidation event for that id
  • A simple CLI or HTTP interface is optional; here we call the service methods directly to keep the example focused

Code (service.py)

from typing import Any, Dict

from cache import VersionedCache
from event_bus import EventBus
from store import DataStore


class UserService:
    def __init__(self, store: DataStore, cache: VersionedCache, bus: EventBus) -> None:
        self.store = store
        self.cache = cache  # the TTL is configured on VersionedCache itself
        self.bus = bus
        # Drop our cached copy whenever any writer announces a change
        self.bus.subscribe("invalidate_profile", self._handle_invalidation)

    def _handle_invalidation(self, payload: Dict[str, Any]) -> None:
        self.cache.invalidate(payload["id"])

    def get_profile(self, id: str) -> Dict[str, Any]:
        cached = self.cache.get(id)
        if cached is not None:
            return cached
        # Cache miss: read through to the canonical store
        item = self.store.get_by_id(id)
        if item is None:
            raise KeyError("Profile not found")
        self.cache.set(id, item)
        return item

    def update_profile(self, id: str, updates: Dict[str, Any]) -> Dict[str, Any]:
        # Read the current version to use as the optimistic-lock token
        current = self.store.get_by_id(id)
        if current is None:
            raise KeyError("Profile not found")
        expected_version = current["version"]
        # Raises ValueError if another writer bumped the version in the meantime
        updated = self.store.update_with_optimistic_lock(id, expected_version, updates)
        # Publish invalidation so every subscribed cache drops its stale copy
        self.bus.publish("invalidate_profile", {"id": id})
        return updated
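
Note that update_profile reads the current version immediately before writing, so it only detects a conflict when another write lands between those two calls. A client that edited a copy it read earlier can still overwrite someone else's change. A common remedy, similar in spirit to HTTP's ETag and If-Match headers, is to let the caller pass the version it originally read. This self-contained sketch uses hypothetical names (`ProfileStore`, `Conflict`, an `expected_version` parameter) and is not the tutorial's service API.

```python
from typing import Any, Dict, Optional

Record = Dict[str, Any]


class Conflict(Exception):
    """The record changed since the caller read it."""


class ProfileStore:
    def __init__(self) -> None:
        self.rows: Dict[str, Record] = {}

    def update(self, id: str, expected_version: int, updates: Record) -> Record:
        current = self.rows[id]
        if current["version"] != expected_version:
            raise Conflict(f"{id}: expected v{expected_version}, found v{current['version']}")
        self.rows[id] = {**current, **updates, "version": current["version"] + 1}
        return dict(self.rows[id])


def update_profile(store: ProfileStore, id: str, updates: Record,
                   expected_version: Optional[int] = None) -> Record:
    # expected_version=None reproduces the tutorial's read-then-write behaviour
    if expected_version is None:
        expected_version = store.rows[id]["version"]
    return store.update(id, expected_version, updates)


store = ProfileStore()
store.rows["user-123"] = {"name": "Alex Doe", "email": "alex@example.com", "version": 1}

client_copy = dict(store.rows["user-123"])  # a client reads version 1
update_profile(store, "user-123", {"name": "Alex D."})  # another writer commits v2

try:
    # The client submits its edit together with the version it originally read
    update_profile(store, "user-123", {"name": "Alexandra Doe"},
                   expected_version=client_copy["version"])
    outcome = "applied"
except Conflict:
    outcome = "rejected"  # the client should re-read v2 and reapply its edit
```

Called without `expected_version`, the same edit would succeed and silently replace "Alex D." with the client's value, which is exactly the lost update optimistic locking is meant to catch.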

Step 5: Wiring it together (example usage)

  • Initialize store, cache, and event bus
  • Create a sample profile
  • Retrieve to populate cache
  • Update profile, observe cache invalidation and refresh on next read

Example runner (main.py)

from cache import VersionedCache
from event_bus import EventBus
from service import UserService
from store import DataStore


def main() -> None:
    store = DataStore()
    bus = EventBus()
    cache = VersionedCache(ttl_seconds=30)
    svc = UserService(store, cache, bus)

    # Create initial data (version 1)
    store.create("user-123", {"id": "user-123", "name": "Alex Doe", "email": "alex@example.com"})

    # Read (cache miss: populates the cache)
    print("Initial read:", svc.get_profile("user-123"))

    # Read again (served from the cache)
    print("Cached read:", svc.get_profile("user-123"))

    # Update (the store bumps the version to 2 and an invalidation event is published)
    updated = svc.update_profile("user-123", {"name": "Alexandra Doe"})
    print("Updated:", updated)

    # Read after update (the cache was invalidated, so this reads through to the store)
    print("After invalidation read:", svc.get_profile("user-123"))


if __name__ == "__main__":
    main()

Step 6: Testing the flow

  • Test scenarios:
    • Cache miss followed by successful store fetch
    • Concurrent-style update with version mismatch (simulate by mutating store outside cache)
    • Invalidation propagation triggers a refresh on next read
  • Simple pytest-style tests can be added to verify:
    • Version increments on update
    • Cache invalidation occurs after update
    • Retrieval path prefers cache when valid
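
One way to turn the three checks above into pytest-style tests is sketched below. To keep the file runnable on its own, it inlines compressed stand-ins for the Step 1, 3 and 4 classes (no TTL, a plain dict as the cache, and a `reads` counter on the store, all assumptions made for testing); in a real project you would import DataStore, EventBus and UserService from their modules instead.

```python
from typing import Any, Callable, Dict, List, Optional

Record = Dict[str, Any]


# Compressed stand-ins for store.py, event_bus.py and service.py (TTL omitted).
class DataStore:
    def __init__(self) -> None:
        self.rows: Dict[str, Record] = {}
        self.reads = 0  # counts store lookups so tests can spot cache hits

    def create(self, id: str, data: Record) -> None:
        self.rows[id] = {**data, "version": 1}

    def get_by_id(self, id: str) -> Optional[Record]:
        self.reads += 1
        row = self.rows.get(id)
        return dict(row) if row is not None else None

    def update_with_optimistic_lock(self, id: str, expected_version: int, new_data: Record) -> Record:
        current = self.rows[id]
        if current["version"] != expected_version:
            raise ValueError("Version mismatch")
        self.rows[id] = {**current, **new_data, "version": current["version"] + 1}
        return dict(self.rows[id])


class EventBus:
    def __init__(self) -> None:
        self._subs: Dict[str, List[Callable[[Record], None]]] = {}

    def subscribe(self, event_type: str, callback: Callable[[Record], None]) -> None:
        self._subs.setdefault(event_type, []).append(callback)

    def publish(self, event_type: str, payload: Record) -> None:
        for callback in list(self._subs.get(event_type, [])):
            callback(payload)


class UserService:
    def __init__(self, store: DataStore, bus: EventBus) -> None:
        self.store, self.bus = store, bus
        self.cache: Dict[str, Record] = {}
        bus.subscribe("invalidate_profile", self._on_invalidate)

    def _on_invalidate(self, payload: Record) -> None:
        self.cache.pop(payload["id"], None)

    def get_profile(self, id: str) -> Record:
        if id not in self.cache:
            item = self.store.get_by_id(id)
            if item is None:
                raise KeyError("Profile not found")
            self.cache[id] = item
        return dict(self.cache[id])

    def update_profile(self, id: str, updates: Record) -> Record:
        current = self.store.get_by_id(id)
        if current is None:
            raise KeyError("Profile not found")
        updated = self.store.update_with_optimistic_lock(id, current["version"], updates)
        self.bus.publish("invalidate_profile", {"id": id})
        return updated


def make_service() -> UserService:
    store = DataStore()
    store.create("user-123", {"id": "user-123", "name": "Alex Doe"})
    return UserService(store, EventBus())


def test_version_increments_on_update() -> None:
    svc = make_service()
    assert svc.update_profile("user-123", {"name": "Alexandra Doe"})["version"] == 2


def test_retrieval_prefers_cache() -> None:
    svc = make_service()
    svc.get_profile("user-123")
    svc.get_profile("user-123")
    assert svc.store.reads == 1  # the second read never reached the store


def test_invalidation_after_update_forces_refresh() -> None:
    svc = make_service()
    svc.get_profile("user-123")  # cache now holds version 1
    svc.update_profile("user-123", {"name": "Alexandra Doe"})
    assert "user-123" not in svc.cache
    refreshed = svc.get_profile("user-123")
    assert (refreshed["name"], refreshed["version"]) == ("Alexandra Doe", 2)
```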

Illustration: a quick mental model

  • Imagine your data as a library book with a version stamp. The cache holds a stamped copy. When someone updates the master copy, its stamp changes. If a cached copy's stamp no longer matches, the cache fetches a fresh copy along with the new stamp. Invalidation events are like a librarian calling out “Quick, replace this copy!” to every cache.

Practical tips and gotchas

  • Use a reliable versioning scheme in the store: every successful write should increment a monotonically increasing version, so conflicting writes are detected and stale copies can be told apart from fresh ones.
  • Prefer read-through with explicit invalidation: it reduces the chance of stale reads while keeping write throughput high.
  • Tune the TTL: a longer TTL reduces cache churn but widens the window for stale reads (for example, when an invalidation event is lost); a shorter TTL improves freshness but puts more load on the store.
  • If you have multiple cache layers (edge, service, database), propagate invalidation across layers to minimize stale data windows.
  • Monitor cache miss rate and invalidation events to detect synchronization problems early.
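
Monitoring can start very small. The sketch below wraps a dict-backed cache with counters for hits, misses and invalidations and derives a miss rate from them. `InstrumentedCache` and `CacheStats` are hypothetical names, and in production you would likely export these numbers to your metrics system rather than keep them in process memory.

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional

Record = Dict[str, Any]


@dataclass
class CacheStats:
    hits: int = 0
    misses: int = 0
    invalidations: int = 0

    @property
    def miss_rate(self) -> float:
        lookups = self.hits + self.misses
        return self.misses / lookups if lookups else 0.0


class InstrumentedCache:
    """Hypothetical dict-backed cache that counts hits, misses and invalidations."""

    def __init__(self) -> None:
        self._data: Dict[str, Record] = {}
        self.stats = CacheStats()

    def get(self, key: str) -> Optional[Record]:
        value = self._data.get(key)
        if value is None:
            self.stats.misses += 1
            return None
        self.stats.hits += 1
        return dict(value)

    def set(self, key: str, value: Record) -> None:
        self._data[key] = dict(value)

    def invalidate(self, key: str) -> None:
        self.stats.invalidations += 1
        self._data.pop(key, None)


cache = InstrumentedCache()
cache.get("user-123")                                    # miss
cache.set("user-123", {"id": "user-123", "version": 1})
cache.get("user-123")                                    # hit
cache.get("user-123")                                    # hit
cache.invalidate("user-123")                             # e.g. triggered by an update
cache.get("user-123")                                    # miss until refilled
```

A miss rate that rises right after bursts of invalidations is expected; one that stays high without them may point to a TTL that is too short or to keys that are rarely reused.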

Extensions you can add

  • A distributed cache backend such as Redis (using Lua scripts or WATCH/MULTI/EXEC transactions for atomic version checks) or Memcached (using its CAS operations)
  • Event-driven architecture using a real message broker (Kafka, NATS) for cross-service invalidation
  • Frontend integration: a small TypeScript client that uses the same version semantics to decide when to refetch data

Sample TypeScript snippet for a frontend consumer (optional)

  • It calls an API to get profile data, which returns { id, name, email, version }
  • On cache miss, it fetches from API and caches with the provided version
  • On receiving a cache invalidation event via WebSocket or SSE, it invalidates local cache and fetches fresh data

TypeScript sketch (frontend-cache.ts)

type Profile = { id: string; name: string; email: string; version: number };

class FrontendCache {
  private store = new Map<string, Profile>();
  private apiBase = "/api/profile";

  async getProfile(id: string): Promise<Profile> {
    const cached = this.store.get(id);
    if (cached) return cached;
    // Cache miss: fetch from the API and keep the returned version with the data
    const res = await fetch(`${this.apiBase}/${encodeURIComponent(id)}`);
    if (!res.ok) throw new Error(`Failed to load profile ${id}: ${res.status}`);
    const fresh = (await res.json()) as Profile;
    this.store.set(id, fresh);
    return fresh;
  }

  invalidate(id: string): void {
    this.store.delete(id);
  }
}

// A WebSocket or SSE listener would call invalidate() for the affected IDs.

Conclusion
This tutorial presented a practical pattern for building a resilient microservice cache using optimistic locking and event-driven invalidation. The core ideas (versioned cache entries, a simple pub/sub invalidation mechanism, and a read-through cache path) help maintain correctness without sacrificing performance in a distributed setting. The Python scaffolding here is a starting point; you can swap in real-world components (Redis, Kafka, gRPC) as needed to scale to production workloads.


-

Rizwan Saleem | https://rizwansaleem.co

Source: dev.to
