## Why Async Matters for Video Metadata
When DailyWatch fetches trending videos from 8 regions, each API call takes 200-500ms due to network latency. Done sequentially, that is 1.6-4 seconds just for the fetches. With asyncio, all 8 requests fire simultaneously and complete in the time of the slowest single request.
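A toy measurement makes the difference concrete. This sketch uses `asyncio.sleep` in place of real HTTP calls, so the numbers only illustrate the shape of the speedup, not actual API latency:

```python
import asyncio
import time


async def fake_fetch(region: str, delay: float = 0.1) -> str:
    await asyncio.sleep(delay)  # stand-in for the network round trip
    return region


async def compare() -> tuple[float, float]:
    regions = ["US", "GB", "DE", "FR", "IN", "BR", "AU", "CA"]

    start = time.perf_counter()
    for r in regions:
        await fake_fetch(r)  # one at a time: roughly 8 x delay
    sequential = time.perf_counter() - start

    start = time.perf_counter()
    await asyncio.gather(*(fake_fetch(r) for r in regions))  # overlapped: ~1 x delay
    concurrent = time.perf_counter() - start
    return sequential, concurrent


sequential, concurrent = asyncio.run(compare())
print(f"sequential: {sequential:.2f}s  concurrent: {concurrent:.2f}s")
```

The sequential loop pays the full delay eight times; the gathered version pays it roughly once, because every sleep (like every network wait) overlaps with the others.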
## The Basic Pattern
Here is the simplest async fetch pattern:
```python
import asyncio
from dataclasses import dataclass

import aiohttp


@dataclass
class VideoResult:
    video_id: str
    title: str
    views: int
    region: str


async def fetch_trending(
    session: aiohttp.ClientSession, region: str, api_key: str
) -> list[VideoResult]:
    url = "https://www.googleapis.com/youtube/v3/videos"
    params = {
        "part": "snippet,statistics",
        "chart": "mostPopular",
        "regionCode": region,
        "maxResults": 50,
        "key": api_key,
    }
    async with session.get(url, params=params) as resp:
        data = await resp.json()
        return [
            VideoResult(
                video_id=item["id"],
                title=item["snippet"]["title"],
                views=int(item["statistics"].get("viewCount", 0)),
                region=region,
            )
            for item in data.get("items", [])
        ]


async def main():
    regions = ["US", "GB", "DE", "FR", "IN", "BR", "AU", "CA"]
    api_key = "YOUR_API_KEY"
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_trending(session, r, api_key) for r in regions]
        results = await asyncio.gather(*tasks, return_exceptions=True)
    for region, result in zip(regions, results):
        if isinstance(result, Exception):
            print(f"[{region}] Error: {result}")
        else:
            print(f"[{region}] Fetched {len(result)} videos")


asyncio.run(main())
```
`asyncio.gather` runs all tasks concurrently. The `return_exceptions=True` flag prevents one failed region from killing the entire batch.
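To see what `return_exceptions=True` does in isolation, here is a minimal sketch with a simulated fetch that fails for one region (the failure itself is made up for illustration):

```python
import asyncio


async def fetch(region: str) -> str:
    if region == "DE":
        # Simulated failure for one region (made up for illustration).
        raise RuntimeError("quota exceeded")
    await asyncio.sleep(0.01)
    return f"{region}: ok"


async def main() -> list:
    # With return_exceptions=True, a raised exception is returned in place of
    # that task's result instead of propagating and cancelling the siblings.
    return await asyncio.gather(
        *(fetch(r) for r in ["US", "DE", "GB"]), return_exceptions=True
    )


results = asyncio.run(main())
print(results)
```

The US and GB results come back normally, and the DE slot holds the `RuntimeError` object, which is why the `isinstance(result, Exception)` check in `main` above works.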
## Adding Rate Limiting with a Semaphore
APIs have rate limits. A semaphore controls how many requests run simultaneously:
```python
import asyncio

import aiohttp


class RateLimitedFetcher:
    def __init__(self, api_key: str, max_concurrent: int = 3):
        self.api_key = api_key
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.request_count = 0

    async def fetch_region(
        self, session: aiohttp.ClientSession, region: str
    ) -> list[dict]:
        url = "https://www.googleapis.com/youtube/v3/videos"
        params = {
            "part": "snippet,statistics,contentDetails",
            "chart": "mostPopular",
            "regionCode": region,
            "maxResults": 50,
            "key": self.api_key,
        }
        async with self.semaphore:
            # Retry in a loop rather than recursing: a recursive call would
            # try to acquire a second semaphore slot while still holding this
            # one, which can deadlock once every slot is held by a retry.
            while True:
                self.request_count += 1
                try:
                    async with session.get(
                        url, params=params, timeout=aiohttp.ClientTimeout(total=10)
                    ) as resp:
                        if resp.status == 429:
                            retry_after = int(resp.headers.get("Retry-After", 5))
                            print(f"[{region}] Rate limited, waiting {retry_after}s")
                            await asyncio.sleep(retry_after)
                            continue
                        resp.raise_for_status()
                        data = await resp.json()
                        return data.get("items", [])
                except asyncio.TimeoutError:
                    print(f"[{region}] Timeout after 10s")
                    return []
                except aiohttp.ClientError as e:
                    print(f"[{region}] HTTP error: {e}")
                    return []

    async def fetch_all_regions(self, regions: list[str]) -> dict[str, list]:
        async with aiohttp.ClientSession() as session:
            # gather runs the fetches concurrently; awaiting each coroutine in
            # a plain for loop would serialize them and lose the speedup.
            results = await asyncio.gather(
                *(self.fetch_region(session, r) for r in regions)
            )
        return dict(zip(regions, results))
```
With `max_concurrent=3`, at most three API calls run at once. The 429 handler waits for the server-specified delay, then retries the request.
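A small standalone sketch shows the cap in action: eight tasks contend for a `Semaphore(3)`, and a pair of counters records how many ever run at once. The counters are illustrative, not part of the fetcher:

```python
import asyncio

active = 0  # tasks currently inside the semaphore
peak = 0    # highest value active ever reached


async def limited_call(sem: asyncio.Semaphore) -> None:
    global active, peak
    async with sem:
        active += 1
        peak = max(peak, active)
        await asyncio.sleep(0.01)  # stand-in for the HTTP request
        active -= 1


async def main() -> None:
    sem = asyncio.Semaphore(3)
    await asyncio.gather(*(limited_call(sem) for _ in range(8)))


asyncio.run(main())
print(f"peak concurrency: {peak}")
```

All eight tasks start immediately, but the semaphore holds five of them at the `async with` line until a slot frees up, so the peak never exceeds three.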
## Building a Processing Pipeline
Fetching is only step one. We also need to normalize, deduplicate, and store. Here is a pipeline with separate stages:
```python
import asyncio


async def normalize(raw_items: list[dict], region: str) -> list[dict]:
    """Transform the API response into our internal format."""
    normalized = []
    for item in raw_items:
        snippet = item.get("snippet", {})
        stats = item.get("statistics", {})
        normalized.append({
            "video_id": item["id"],
            "title": snippet.get("title", ""),
            "channel": snippet.get("channelTitle", ""),
            "views": int(stats.get("viewCount", 0)),
            "likes": int(stats.get("likeCount", 0)),
            "region": region,
            "category_id": int(snippet.get("categoryId", 0)),
        })
    return normalized


async def deduplicate(all_videos: list[dict]) -> list[dict]:
    """Merge duplicate videos that trend in more than one region."""
    seen: dict[str, dict] = {}
    for video in all_videos:
        vid = video["video_id"]
        if vid in seen:
            seen[vid]["regions"].append(video["region"])
            seen[vid]["views"] = max(seen[vid]["views"], video["views"])
        else:
            video["regions"] = [video["region"]]
            seen[vid] = video
    return list(seen.values())


async def run_pipeline(api_key: str):
    regions = ["US", "GB", "DE", "FR", "IN", "BR", "AU", "CA"]
    fetcher = RateLimitedFetcher(api_key, max_concurrent=3)

    # Stage 1: Fetch
    raw_results = await fetcher.fetch_all_regions(regions)
    print(f"Fetched from {len(raw_results)} regions, {fetcher.request_count} API calls")

    # Stage 2: Normalize, one task per region
    normalize_tasks = [
        normalize(items, region) for region, items in raw_results.items()
    ]
    normalized_batches = await asyncio.gather(*normalize_tasks)

    # Stage 3: Flatten and deduplicate
    all_videos = [v for batch in normalized_batches for v in batch]
    unique_videos = await deduplicate(all_videos)
    print(f"Total: {len(all_videos)} raw, {len(unique_videos)} after dedup")
    return unique_videos
```
Each stage is async-compatible. `asyncio.gather` dispatches one `normalize` task per region; since normalization never awaits anything, those tasks effectively run back-to-back on the event loop, but keeping the stage async means it can later await a database or cache without restructuring the pipeline. The deduplication stage merges region tags for videos trending in multiple countries.
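The merge behavior is easiest to see on toy data. This standalone sketch repeats the dedup logic from the pipeline (minus the async wrapper) on a video that trends in two regions; the video IDs and view counts are made up:

```python
def merge_duplicates(all_videos: list[dict]) -> list[dict]:
    # Same merge logic as the deduplicate stage, minus the async wrapper.
    seen: dict[str, dict] = {}
    for video in all_videos:
        vid = video["video_id"]
        if vid in seen:
            seen[vid]["regions"].append(video["region"])
            seen[vid]["views"] = max(seen[vid]["views"], video["views"])
        else:
            video["regions"] = [video["region"]]
            seen[vid] = video
    return list(seen.values())


videos = [
    {"video_id": "abc", "views": 1_000_000, "region": "US"},
    {"video_id": "abc", "views": 1_200_000, "region": "GB"},
    {"video_id": "xyz", "views": 50_000, "region": "GB"},
]
unique = merge_duplicates(videos)
print(unique)
```

The two `abc` entries collapse into one record with `regions == ["US", "GB"]` and the higher of the two view counts, while `xyz` passes through untouched.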
## Error Handling Patterns
In production at DailyWatch, we wrap the entire pipeline with structured error handling:
```python
async def safe_pipeline(api_key: str) -> dict:
    # run_pipeline returns the deduplicated list, so track one count for it.
    stats = {"videos": 0, "errors": []}
    try:
        videos = await run_pipeline(api_key)
        stats["videos"] = len(videos)
    except Exception as e:
        stats["errors"].append(str(e))
    return stats
```
The key insight is using `return_exceptions=True` in `gather` calls so that one region's failure does not abort the others, and logging errors per region for debugging.
## Performance Comparison
On an 8-region fetch for DailyWatch:
| Approach | Time |
|---|---|
| Sequential (for loop) | 3.2s |
| asyncio.gather (unlimited) | 0.5s |
| asyncio + semaphore(3) | 0.9s |
The semaphore approach is the sweet spot: roughly 3.5x faster than sequential while respecting rate limits.
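The shape of that table can be reproduced with a toy benchmark in which `asyncio.sleep` stands in for the 200-500ms API call. The timings here are simulated, not the real DailyWatch numbers:

```python
import asyncio
import time


async def fake_fetch(delay: float = 0.05) -> None:
    await asyncio.sleep(delay)  # stand-in for one API call


async def timed(coro) -> float:
    start = time.perf_counter()
    await coro
    return time.perf_counter() - start


async def sequential() -> None:
    for _ in range(8):
        await fake_fetch()


async def unlimited() -> None:
    await asyncio.gather(*(fake_fetch() for _ in range(8)))


async def limited(slots: int = 3) -> None:
    sem = asyncio.Semaphore(slots)

    async def one() -> None:
        async with sem:
            await fake_fetch()

    await asyncio.gather(*(one() for _ in range(8)))


async def main() -> tuple[float, float, float]:
    return (
        await timed(sequential()),
        await timed(unlimited()),
        await timed(limited()),
    )


t_seq, t_all, t_sem = asyncio.run(main())
print(f"sequential={t_seq:.2f}s  gather={t_all:.2f}s  semaphore(3)={t_sem:.2f}s")
```

The ordering matches the table: sequential pays the delay eight times, unlimited gather pays it roughly once, and `Semaphore(3)` lands in between because the eight calls drain in ceil(8/3) = 3 batches.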
This article is part of the Building DailyWatch series. Check out DailyWatch to see these techniques in action.