Async Video Processing Pipeline with Python asyncio

python dev.to

Why Async Matters for Video Metadata

When DailyWatch fetches trending videos from 8 regions, each API call takes 200-500ms due to network latency. Done sequentially, that is 1.6-4 seconds just for the fetches. With asyncio, all 8 requests fire simultaneously and complete in the time of the slowest single request.

The Basic Pattern

Here is the simplest async fetch pattern:

import asyncio
import aiohttp
from dataclasses import dataclass

@dataclass
class VideoResult:
    video_id: str
    title: str
    views: int
    region: str

async def fetch_trending(session: aiohttp.ClientSession, region: str, api_key: str) -> list[VideoResult]:
    url = "https://www.googleapis.com/youtube/v3/videos"
    params = {
        "part": "snippet,statistics",
        "chart": "mostPopular",
        "regionCode": region,
        "maxResults": 50,
        "key": api_key,
    }
    async with session.get(url, params=params) as resp:
        data = await resp.json()
        return [
            VideoResult(
                video_id=item["id"],
                title=item["snippet"]["title"],
                views=int(item["statistics"].get("viewCount", 0)),
                region=region,
            )
            for item in data.get("items", [])
        ]

async def main():
    regions = ["US", "GB", "DE", "FR", "IN", "BR", "AU", "CA"]
    api_key = "YOUR_API_KEY"

    async with aiohttp.ClientSession() as session:
        tasks = [fetch_trending(session, r, api_key) for r in regions]
        results = await asyncio.gather(*tasks, return_exceptions=True)

    for region, result in zip(regions, results):
        if isinstance(result, Exception):
            print(f"[{region}] Error: {result}")
        else:
            print(f"[{region}] Fetched {len(result)} videos")

asyncio.run(main())
Enter fullscreen mode Exit fullscreen mode

asyncio.gather runs all tasks concurrently. The return_exceptions=True flag prevents one failed region from killing the entire batch.

Adding Rate Limiting with a Semaphore

APIs have rate limits. A semaphore controls how many requests run simultaneously:

import asyncio
import aiohttp
from asyncio import Semaphore

class RateLimitedFetcher:
    def __init__(self, api_key: str, max_concurrent: int = 3):
        self.api_key = api_key
        self.semaphore = Semaphore(max_concurrent)
        self.request_count = 0

    async def fetch_region(self, session: aiohttp.ClientSession, region: str) -> list[dict]:
        async with self.semaphore:
            self.request_count += 1
            url = "https://www.googleapis.com/youtube/v3/videos"
            params = {
                "part": "snippet,statistics,contentDetails",
                "chart": "mostPopular",
                "regionCode": region,
                "maxResults": 50,
                "key": self.api_key,
            }
            try:
                async with session.get(url, params=params, timeout=aiohttp.ClientTimeout(total=10)) as resp:
                    if resp.status == 429:
                        retry_after = int(resp.headers.get("Retry-After", 5))
                        print(f"[{region}] Rate limited, waiting {retry_after}s")
                        await asyncio.sleep(retry_after)
                        return await self.fetch_region(session, region)
                    resp.raise_for_status()
                    data = await resp.json()
                    return data.get("items", [])
            except asyncio.TimeoutError:
                print(f"[{region}] Timeout after 10s")
                return []
            except aiohttp.ClientError as e:
                print(f"[{region}] HTTP error: {e}")
                return []

    async def fetch_all_regions(self, regions: list[str]) -> dict[str, list]:
        async with aiohttp.ClientSession() as session:
            tasks = {r: self.fetch_region(session, r) for r in regions}
            results = {}
            for region, coro in tasks.items():
                results[region] = await coro
            return results
Enter fullscreen mode Exit fullscreen mode

With max_concurrent=3, at most 3 API calls run at once. The 429 handler automatically retries after the server-specified delay.

Building a Processing Pipeline

Fetching is only step one. We also need to normalize, deduplicate, and store. Here is a pipeline with separate stages:

import asyncio
from collections import defaultdict

async def normalize(raw_items: list[dict], region: str) -> list[dict]:
    """Transform API response into our internal format."""
    normalized = []
    for item in raw_items:
        snippet = item.get("snippet", {})
        stats = item.get("statistics", {})
        normalized.append({
            "video_id": item["id"],
            "title": snippet.get("title", ""),
            "channel": snippet.get("channelTitle", ""),
            "views": int(stats.get("viewCount", 0)),
            "likes": int(stats.get("likeCount", 0)),
            "region": region,
            "category_id": int(snippet.get("categoryId", 0)),
        })
    return normalized

async def deduplicate(all_videos: list[dict]) -> list[dict]:
    """Merge duplicate videos from different regions."""
    seen: dict[str, dict] = {}
    for video in all_videos:
        vid = video["video_id"]
        if vid in seen:
            seen[vid]["regions"].append(video["region"])
            seen[vid]["views"] = max(seen[vid]["views"], video["views"])
        else:
            video["regions"] = [video["region"]]
            seen[vid] = video
    return list(seen.values())

async def run_pipeline(api_key: str):
    regions = ["US", "GB", "DE", "FR", "IN", "BR", "AU", "CA"]
    fetcher = RateLimitedFetcher(api_key, max_concurrent=3)

    # Stage 1: Fetch
    raw_results = await fetcher.fetch_all_regions(regions)
    print(f"Fetched from {len(raw_results)} regions, {fetcher.request_count} API calls")

    # Stage 2: Normalize (parallel per region)
    normalize_tasks = [
        normalize(items, region) for region, items in raw_results.items()
    ]
    normalized_batches = await asyncio.gather(*normalize_tasks)

    # Stage 3: Flatten and deduplicate
    all_videos = [v for batch in normalized_batches for v in batch]
    unique_videos = await deduplicate(all_videos)
    print(f"Total: {len(all_videos)} raw, {len(unique_videos)} after dedup")

    return unique_videos
Enter fullscreen mode Exit fullscreen mode

Each stage is async-compatible. The normalization runs in parallel across regions with asyncio.gather, and the deduplication merges region tags for videos trending in multiple countries.

Error Handling Patterns

In production at DailyWatch, we wrap the entire pipeline with structured error handling:

async def safe_pipeline(api_key: str) -> dict:
    stats = {"fetched": 0, "errors": [], "deduplicated": 0}
    try:
        videos = await run_pipeline(api_key)
        stats["fetched"] = len(videos)
        stats["deduplicated"] = len(videos)
    except Exception as e:
        stats["errors"].append(str(e))
    return stats
Enter fullscreen mode Exit fullscreen mode

The key insight is using return_exceptions=True in gather calls so one region failure does not abort the others, and logging errors per-region for debugging.

Performance Comparison

On an 8-region fetch for DailyWatch:

Approach Time
Sequential (for loop) 3.2s
asyncio.gather (unlimited) 0.5s
asyncio + semaphore(3) 0.9s

The semaphore approach is the sweet spot — 3.5x faster than sequential while respecting rate limits.


This article is part of the Building DailyWatch series. Check out DailyWatch to see these techniques in action.

Source: dev.to

arrow_back Back to Tutorials