Concurrent Video Processing with Go Goroutines

go dev.to

The Concurrency Problem

DailyWatch processes video metadata from 8 regions. Each region returns up to 50 videos, and each video may need thumbnail validation, duration parsing, and category enrichment. Processing 400 videos sequentially takes over 30 seconds. With Go's goroutines and channels, we brought that down to under 4 seconds.

Worker Pool Pattern

The worker pool is the most practical concurrency pattern for batch processing:

package pipeline

import (
    "context"
    "fmt"
    "sync"
)

type Video struct {
    ID       string
    Title    string
    Region   string
    Views    int64
    ThumbURL string
    Valid    bool
}

type Result struct {
    Video Video
    Err   error
}

func ProcessVideos(ctx context.Context, videos []Video, workers int) []Result {
    jobs := make(chan Video, len(videos))
    results := make(chan Result, len(videos))

    // Start workers
    var wg sync.WaitGroup
    for i := 0; i < workers; i++ {
        wg.Add(1)
        go func(workerID int) {
            defer wg.Done()
            for video := range jobs {
                select {
                case <-ctx.Done():
                    return
                default:
                    processed, err := processOne(video)
                    results <- Result{Video: processed, Err: err}
                }
            }
        }(i)
    }

    // Send jobs
    for _, v := range videos {
        jobs <- v
    }
    close(jobs)

    // Wait for completion and close results
    go func() {
        wg.Wait()
        close(results)
    }()

    // Collect results
    var out []Result
    for r := range results {
        out = append(out, r)
    }
    return out
}

func processOne(v Video) (Video, error) {
    // Validate thumbnail URL
    if v.ThumbURL == "" {
        v.ThumbURL = fmt.Sprintf("https://i.ytimg.com/vi/%s/mqdefault.jpg", v.ID)
    }
    v.Valid = true
    return v, nil
}
Enter fullscreen mode Exit fullscreen mode

The jobs channel distributes work. Workers pull from it concurrently. The results channel collects output. The sync.WaitGroup ensures we wait for all workers before reading results. Context cancellation lets us abort early if needed.

Rate-Limited API Fetcher

When fetching from external APIs, you need to control the request rate. A ticker-based rate limiter works well:

package fetcher

import (
    "context"
    "encoding/json"
    "fmt"
    "net/http"
    "time"
)

type RegionFetcher struct {
    apiKey     string
    client     *http.Client
    rateLimit  time.Duration
}

func NewRegionFetcher(apiKey string, requestsPerSecond int) *RegionFetcher {
    return &RegionFetcher{
        apiKey:    apiKey,
        client:    &http.Client{Timeout: 10 * time.Second},
        rateLimit: time.Second / time.Duration(requestsPerSecond),
    }
}

type FetchResult struct {
    Region string
    Videos []Video
    Err    error
}

func (f *RegionFetcher) FetchAll(ctx context.Context, regions []string) []FetchResult {
    results := make(chan FetchResult, len(regions))
    ticker := time.NewTicker(f.rateLimit)
    defer ticker.Stop()

    for _, region := range regions {
        <-ticker.C // Wait for rate limit
        go func(r string) {
            videos, err := f.fetchRegion(ctx, r)
            results <- FetchResult{Region: r, Videos: videos, Err: err}
        }(region)
    }

    var out []FetchResult
    for i := 0; i < len(regions); i++ {
        select {
        case r := <-results:
            out = append(out, r)
            fmt.Printf("[%s] fetched %d videos\n", r.Region, len(r.Videos))
        case <-ctx.Done():
            return out
        }
    }
    return out
}

func (f *RegionFetcher) fetchRegion(ctx context.Context, region string) ([]Video, error) {
    url := fmt.Sprintf(
        "https://www.googleapis.com/youtube/v3/videos?part=snippet,statistics&chart=mostPopular&regionCode=%s&maxResults=50&key=%s",
        region, f.apiKey,
    )
    req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
    if err != nil {
        return nil, err
    }
    resp, err := f.client.Do(req)
    if err != nil {
        return nil, err
    }
    defer resp.Body.Close()

    if resp.StatusCode != 200 {
        return nil, fmt.Errorf("API returned %d for region %s", resp.StatusCode, region)
    }

    var data struct {
        Items []struct {
            ID      string `json:"id"`
            Snippet struct {
                Title   string `json:"title"`
                Channel string `json:"channelTitle"`
            } `json:"snippet"`
            Statistics struct {
                Views string `json:"viewCount"`
            } `json:"statistics"`
        } `json:"items"`
    }
    if err := json.NewDecoder(resp.Body).Decode(&data); err != nil {
        return nil, err
    }

    videos := make([]Video, 0, len(data.Items))
    for _, item := range data.Items {
        videos = append(videos, Video{
            ID:     item.ID,
            Title:  item.Snippet.Title,
            Region: region,
        })
    }
    return videos, nil
}
Enter fullscreen mode Exit fullscreen mode

The ticker ensures we never exceed the rate limit. Each region fetch launches a goroutine, but the ticker gates how quickly they start. The http.NewRequestWithContext ensures requests cancel if the parent context expires.

Error Handling with errgroup

For cleaner error propagation, use golang.org/x/sync/errgroup:

import "golang.org/x/sync/errgroup"

func FetchWithErrGroup(ctx context.Context, regions []string, apiKey string) (map[string][]Video, error) {
    g, ctx := errgroup.WithContext(ctx)
    g.SetLimit(3) // Max 3 concurrent goroutines

    mu := sync.Mutex{}
    results := make(map[string][]Video)
    fetcher := NewRegionFetcher(apiKey, 5)

    for _, region := range regions {
        r := region
        g.Go(func() error {
            videos, err := fetcher.fetchRegion(ctx, r)
            if err != nil {
                return fmt.Errorf("region %s: %w", r, err)
            }
            mu.Lock()
            results[r] = videos
            mu.Unlock()
            return nil
        })
    }

    if err := g.Wait(); err != nil {
        return results, err // Return partial results + error
    }
    return results, nil
}
Enter fullscreen mode Exit fullscreen mode

errgroup.SetLimit(3) is like Python's semaphore — at most 3 goroutines run simultaneously. If any returns an error, the context cancels remaining work.

Performance Results

Processing 400 videos across 8 regions for DailyWatch:

Approach Time Memory
Sequential 32s 12MB
Goroutines (unlimited) 2.1s 18MB
Worker pool (8 workers) 3.8s 14MB
errgroup (limit 3) 5.2s 13MB

The worker pool with 8 workers (one per region) gives the best balance of speed and resource usage. The errgroup approach is slightly slower but provides cleaner error handling.

Go's goroutines are lightweight — each uses about 4KB of stack initially, growing as needed. Spawning thousands is routine. The channel-based communication keeps data races at bay without complex locking.


This article is part of the Building DailyWatch series. Check out DailyWatch to see these techniques in action.

Source: dev.to

arrow_back Back to Tutorials