The Concurrency Problem
DailyWatch processes video metadata from 8 regions. Each region returns up to 50 videos, and each video may need thumbnail validation, duration parsing, and category enrichment. Processing 400 videos sequentially takes over 30 seconds. With Go's goroutines and channels, we brought that down to under 4 seconds.
Worker Pool Pattern
The worker pool is the most practical concurrency pattern for batch processing:
```go
package pipeline

import (
	"context"
	"fmt"
	"sync"
)

type Video struct {
	ID       string
	Title    string
	Region   string
	Views    int64
	ThumbURL string
	Valid    bool
}

type Result struct {
	Video Video
	Err   error
}

func ProcessVideos(ctx context.Context, videos []Video, workers int) []Result {
	jobs := make(chan Video, len(videos))
	results := make(chan Result, len(videos))

	// Start workers.
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func(workerID int) {
			defer wg.Done()
			for video := range jobs {
				select {
				case <-ctx.Done():
					return
				default:
					processed, err := processOne(video)
					results <- Result{Video: processed, Err: err}
				}
			}
		}(i)
	}

	// Send jobs, then close so workers exit their range loop.
	for _, v := range videos {
		jobs <- v
	}
	close(jobs)

	// Close results once every worker has finished.
	go func() {
		wg.Wait()
		close(results)
	}()

	// Collect results.
	var out []Result
	for r := range results {
		out = append(out, r)
	}
	return out
}

func processOne(v Video) (Video, error) {
	// Fill in a default thumbnail URL when missing.
	if v.ThumbURL == "" {
		v.ThumbURL = fmt.Sprintf("https://i.ytimg.com/vi/%s/mqdefault.jpg", v.ID)
	}
	v.Valid = true
	return v, nil
}
```
The jobs channel distributes work; workers pull from it concurrently and push output to the results channel. The sync.WaitGroup ensures results is closed only after every worker has finished, so the collection loop terminates cleanly. Context cancellation lets us abort early if needed.
Rate-Limited API Fetcher
When fetching from external APIs, you need to control the request rate. A ticker-based rate limiter works well:
```go
package fetcher

import (
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"time"
)

type RegionFetcher struct {
	apiKey    string
	client    *http.Client
	rateLimit time.Duration
}

func NewRegionFetcher(apiKey string, requestsPerSecond int) *RegionFetcher {
	return &RegionFetcher{
		apiKey:    apiKey,
		client:    &http.Client{Timeout: 10 * time.Second},
		rateLimit: time.Second / time.Duration(requestsPerSecond),
	}
}

type FetchResult struct {
	Region string
	Videos []Video
	Err    error
}

func (f *RegionFetcher) FetchAll(ctx context.Context, regions []string) []FetchResult {
	results := make(chan FetchResult, len(regions))
	ticker := time.NewTicker(f.rateLimit)
	defer ticker.Stop()

	for _, region := range regions {
		<-ticker.C // wait for the rate limiter before launching
		go func(r string) {
			videos, err := f.fetchRegion(ctx, r)
			results <- FetchResult{Region: r, Videos: videos, Err: err}
		}(region)
	}

	var out []FetchResult
	for i := 0; i < len(regions); i++ {
		select {
		case r := <-results:
			out = append(out, r)
			fmt.Printf("[%s] fetched %d videos\n", r.Region, len(r.Videos))
		case <-ctx.Done():
			return out
		}
	}
	return out
}

func (f *RegionFetcher) fetchRegion(ctx context.Context, region string) ([]Video, error) {
	url := fmt.Sprintf(
		"https://www.googleapis.com/youtube/v3/videos?part=snippet,statistics&chart=mostPopular&regionCode=%s&maxResults=50&key=%s",
		region, f.apiKey,
	)
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		return nil, err
	}
	resp, err := f.client.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("API returned %d for region %s", resp.StatusCode, region)
	}

	var data struct {
		Items []struct {
			ID      string `json:"id"`
			Snippet struct {
				Title   string `json:"title"`
				Channel string `json:"channelTitle"`
			} `json:"snippet"`
			Statistics struct {
				Views string `json:"viewCount"`
			} `json:"statistics"`
		} `json:"items"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&data); err != nil {
		return nil, err
	}

	videos := make([]Video, 0, len(data.Items))
	for _, item := range data.Items {
		videos = append(videos, Video{
			ID:     item.ID,
			Title:  item.Snippet.Title,
			Region: region,
		})
	}
	return videos, nil
}
```
The ticker ensures we never exceed the launch rate: each region fetch runs in its own goroutine, but the ticker gates how quickly those goroutines start. http.NewRequestWithContext ensures in-flight requests are cancelled if the parent context expires.
Error Handling with errgroup
For cleaner error propagation, use golang.org/x/sync/errgroup:
```go
import (
	"context"
	"fmt"
	"sync"

	"golang.org/x/sync/errgroup"
)

func FetchWithErrGroup(ctx context.Context, regions []string, apiKey string) (map[string][]Video, error) {
	g, ctx := errgroup.WithContext(ctx)
	g.SetLimit(3) // at most 3 goroutines in flight

	var mu sync.Mutex
	results := make(map[string][]Video)
	fetcher := NewRegionFetcher(apiKey, 5)

	for _, region := range regions {
		r := region // capture loop variable (required before Go 1.22)
		g.Go(func() error {
			videos, err := fetcher.fetchRegion(ctx, r)
			if err != nil {
				return fmt.Errorf("region %s: %w", r, err)
			}
			mu.Lock()
			results[r] = videos
			mu.Unlock()
			return nil
		})
	}

	if err := g.Wait(); err != nil {
		return results, err // partial results plus the first error
	}
	return results, nil
}
```
g.SetLimit(3) acts like a counting semaphore: at most 3 goroutines run simultaneously, and g.Go blocks until a slot frees up. If any goroutine returns an error, errgroup.WithContext cancels the shared context, signalling the remaining fetches to stop, and g.Wait returns that first error.
Performance Results
Processing 400 videos across 8 regions for DailyWatch:
| Approach | Time | Memory |
|---|---|---|
| Sequential | 32s | 12MB |
| Goroutines (unlimited) | 2.1s | 18MB |
| Worker pool (8 workers) | 3.8s | 14MB |
| errgroup (limit 3) | 5.2s | 13MB |
The worker pool with 8 workers (one per region) gives the best balance of speed and resource usage. The errgroup approach is slightly slower but provides cleaner error handling.
Go's goroutines are lightweight. Each starts with about 2KB of stack, growing as needed, so spawning thousands is routine. The channel-based communication keeps data races at bay without complex locking.
This article is part of the Building DailyWatch series. Check out DailyWatch to see these techniques in action.