Building Concurrency from Scratch: Channels, Thread Pools, and Parallel Iterators


In the journey of creating a new programming language, there's a moment when the focus shifts from parsing syntax and generating code to breathing life into the runtime. For any modern language aspiring to relevance, this inevitably means tackling concurrency. It's not enough to simply support threads; a truly productive language provides developers with safe, efficient, and ergonomic tools to manage parallelism. This isn't just about adding features—it's about defining the language's philosophy on how complex problems should be solved.

Recently, I embarked on this very challenge for a personal project, a new systems language I call nexus-lang. Instead of relying on existing libraries, I chose to build the core concurrency primitives from the ground up. Why? To deeply understand the trade-offs and design decisions that shape a developer's experience. This article chronicles that journey, guiding you through the design and implementation of three fundamental pillars of concurrency: a robust thread pool, safe communication channels, and expressive parallel iterators. We'll explore the 'why' behind the architecture and dive into simplified Rust implementations that capture the core logic, offering lessons applicable to anyone building or working with low-level systems.

The Workhorse: Designing a Robust Thread Pool

At the heart of any scalable concurrency model lies a thread pool. Spawning a new operating system thread for every concurrent task gets expensive quickly when tasks are small and numerous. Each thread consumes system resources for its stack and requires kernel-level context switching. A thread pool mitigates this by creating a fixed number of worker threads upon initialization and reusing them to execute tasks from a job queue. This amortizes the cost of thread creation and provides a natural mechanism for controlling the degree of parallelism, preventing system overload.

Core Components

A thread pool has three main components:

  1. Workers: A collection of long-lived threads waiting for work.
  2. Job Queue: A shared, thread-safe queue where tasks (often closures or function pointers) are submitted.
  3. Dispatcher: The public-facing API that allows code to submit jobs to the queue.

For our implementation, we'll use a channel as the job queue. Rust's standard library provides a Multi-Producer, Single-Consumer (MPSC) channel, which gets us most of the way there. Multiple parts of the application can dispatch jobs (the producers), but the channel has only one receiving end, so the workers share that single Receiver behind an Arc<Mutex<...>> and take turns pulling one job at a time from the queue.

Rust Implementation

Let's start by defining the structure. We need a Job type, which will be a boxed closure that can be sent between threads. Each Worker holds the JoinHandle for its thread, and the ThreadPool holds the workers along with the sending end of the channel.

use std::sync::{mpsc, Arc, Mutex};
use std::thread;

type Job = Box<dyn FnOnce() + Send + 'static>;

enum Message {
    NewJob(Job),
    Terminate,
}

struct Worker {
    id: usize,
    thread: Option<thread::JoinHandle<()>>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) -> Worker {
        let thread = thread::spawn(move || loop {
            let message = receiver.lock().unwrap().recv().unwrap();

            match message {
                Message::NewJob(job) => {
                    println!("Worker {} got a job; executing.", id);
                    job();
                }
                Message::Terminate => {
                    println!("Worker {} was told to terminate.", id);
                    break;
                }
            }
        });

        Worker { id, thread: Some(thread) }
    }
}

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Message>,
}

impl ThreadPool {
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();
        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);
        self.sender.send(Message::NewJob(job)).unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        println!("Sending terminate message to all workers.");

        for _ in &self.workers {
            self.sender.send(Message::Terminate).unwrap();
        }

        println!("Shutting down all workers.");

        for worker in &mut self.workers {
            if let Some(thread) = worker.thread.take() {
                thread.join().unwrap();
            }
        }
    }
}

In this implementation, the ThreadPool::new function initializes the workers, each with a shared reference to the receiving end of the channel. The execute method simply wraps the closure in our Job type and sends it down the channel. The real magic is in the Worker's loop and the Drop implementation for ThreadPool. Each worker blocks on receiver.recv(), waiting for a message. Upon receiving a NewJob, it executes it. The Drop implementation ensures a graceful shutdown by sending a Terminate message for each worker and then joining each thread, waiting for it to finish its current job and exit its loop.
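
One alternative to an explicit Terminate message is to let the channel itself signal shutdown. The sketch below is a variation on the pool above, not the same code: ClosingPool and count_jobs are hypothetical names introduced for illustration. It relies on the documented behavior of std::sync::mpsc, where recv returns an error only after every Sender has been dropped and the buffered messages have been drained.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

type Job = Box<dyn FnOnce() + Send + 'static>;

/// Hypothetical pool variant: shutdown is signalled by closing the channel.
pub struct ClosingPool {
    workers: Vec<thread::JoinHandle<()>>,
    // Wrapped in Option so Drop can take the sender and close the channel.
    sender: Option<mpsc::Sender<Job>>,
}

impl ClosingPool {
    pub fn new(size: usize) -> Self {
        assert!(size > 0);
        let (sender, receiver) = mpsc::channel::<Job>();
        let receiver = Arc::new(Mutex::new(receiver));
        let workers = (0..size)
            .map(|_| {
                let receiver = Arc::clone(&receiver);
                thread::spawn(move || loop {
                    // The guard is a temporary, so the lock is released
                    // before the job starts running.
                    let next = receiver.lock().unwrap().recv();
                    match next {
                        Ok(job) => job(),
                        Err(_) => break, // sender dropped and queue drained
                    }
                })
            })
            .collect();
        ClosingPool { workers, sender: Some(sender) }
    }

    pub fn execute<F: FnOnce() + Send + 'static>(&self, f: F) {
        let sender = self.sender.as_ref().expect("pool is shut down");
        sender.send(Box::new(f)).unwrap();
    }
}

impl Drop for ClosingPool {
    fn drop(&mut self) {
        // Closing the channel lets each worker finish the queue and exit.
        drop(self.sender.take());
        for handle in self.workers.drain(..) {
            handle.join().unwrap();
        }
    }
}

/// Runs `jobs` trivial jobs on `workers` threads and reports how many ran.
pub fn count_jobs(workers: usize, jobs: usize) -> usize {
    let counter = Arc::new(AtomicUsize::new(0));
    {
        let pool = ClosingPool::new(workers);
        for _ in 0..jobs {
            let counter = Arc::clone(&counter);
            pool.execute(move || {
                counter.fetch_add(1, Ordering::SeqCst);
            });
        }
    } // pool dropped here: queue drains, workers exit, threads are joined
    counter.load(Ordering::SeqCst)
}
```

Under those assumptions, count_jobs(4, 100) should report 100, because dropping the pool waits for the queue to empty before joining. The trade-off is that the Message enum disappears, at the cost of an Option around the sender.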

The Lifeline: Building Channels for Safe Communication

While thread pools manage who does the work, channels manage the communication between them. The core principle, popularized by Go and rooted in Communicating Sequential Processes (CSP), is simple: "Do not communicate by sharing memory; instead, share memory by communicating." Channels provide a thread-safe conduit for sending data from one thread to another. Because each value has a single owner at any moment, this style avoids many of the data races that plague hand-rolled, lock-based code, although a careless design can still deadlock.
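
To make the principle concrete, here is a minimal sketch using the standard library's std::sync::mpsc rather than a hand-built channel. The function name sum_in_parallel is hypothetical. Each thread owns its slice of the input and sends back one partial sum, so no state is shared between the workers.

```rust
use std::sync::mpsc;
use std::thread;

/// Sums each part on its own thread and combines the partial sums.
pub fn sum_in_parallel(parts: Vec<Vec<u64>>) -> u64 {
    let (tx, rx) = mpsc::channel();
    let mut handles = Vec::new();

    for part in parts {
        // Each producer gets its own clone of the sender.
        let tx = tx.clone();
        handles.push(thread::spawn(move || {
            let partial: u64 = part.iter().sum();
            tx.send(partial).unwrap();
        }));
    }

    // Drop the original sender so the receiving iterator ends once
    // every worker has finished and dropped its clone.
    drop(tx);

    let total: u64 = rx.iter().sum();
    for handle in handles {
        handle.join().unwrap();
    }
    total
}
```

The only synchronization the caller sees is the channel itself: ownership of each partial result moves from the worker to the receiver.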

Core Components

A basic MPSC channel consists of:

  1. A Shared Buffer: A queue (like VecDeque) to hold the data being sent.
  2. A Synchronization Primitive: A Mutex to ensure only one thread can access the buffer at a time, and a Condvar (Condition Variable) to allow the receiver to sleep when the buffer is empty and be woken up by the sender when data arrives.
  3. Sender (Tx) and Receiver (Rx): Smart pointer-like structs that provide the public API for sending and receiving data. They manage shared ownership of the channel's internal state via an Arc (Atomically Reference Counted pointer).
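
Before assembling the full channel, it may help to see the Mutex and Condvar pairing in isolation. The sketch below is a one-slot handoff, not the channel itself, and wait_for_value is a hypothetical name. It uses Condvar::wait_while from the standard library, which re-checks the predicate after every wakeup and so also covers spurious wakeups.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Hands `value` from a producer thread to the caller through a shared slot.
pub fn wait_for_value(value: u32) -> u32 {
    let pair = Arc::new((Mutex::new(None::<u32>), Condvar::new()));

    let producer = {
        let pair = Arc::clone(&pair);
        thread::spawn(move || {
            let (slot, cvar) = &*pair;
            // Fill the slot while holding the lock, then wake the waiter.
            *slot.lock().unwrap() = Some(value);
            cvar.notify_one();
        })
    };

    let (slot, cvar) = &*pair;
    // Sleep while the slot is empty; the mutex is released during the wait
    // and re-acquired before the predicate is checked again.
    let mut guard = cvar
        .wait_while(slot.lock().unwrap(), |current| current.is_none())
        .unwrap();
    let received = guard.take().expect("predicate guarantees a value");
    drop(guard);

    producer.join().unwrap();
    received
}
```

The channel generalizes this pattern by replacing the single slot with a queue.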

A Simplified Rust Implementation

Let's build a simplified channel to see these pieces in action.

use std::collections::VecDeque;
use std::sync::{Arc, Condvar, Mutex};

// State guarded by the mutex: the buffered values plus a count of live senders.
struct Inner<T> {
    queue: VecDeque<T>,
    senders: usize,
}

struct Shared<T> {
    inner: Mutex<Inner<T>>,
    cvar: Condvar,
}

pub struct Sender<T> {
    shared: Arc<Shared<T>>,
}

impl<T> Clone for Sender<T> {
    fn clone(&self) -> Self {
        self.shared.inner.lock().unwrap().senders += 1;
        Sender { shared: Arc::clone(&self.shared) }
    }
}

impl<T> Drop for Sender<T> {
    fn drop(&mut self) {
        let mut inner = self.shared.inner.lock().unwrap();
        inner.senders -= 1;
        // The last sender wakes the receiver so it can observe the closed channel.
        if inner.senders == 0 {
            self.shared.cvar.notify_one();
        }
    }
}

impl<T> Sender<T> {
    pub fn send(&self, value: T) {
        let mut inner = self.shared.inner.lock().unwrap();
        inner.queue.push_back(value);
        // Notify the waiting receiver that there is new data
        self.shared.cvar.notify_one();
    }
}

pub struct Receiver<T> {
    shared: Arc<Shared<T>>,
}

impl<T> Receiver<T> {
    pub fn recv(&self) -> Option<T> {
        let mut inner = self.shared.inner.lock().unwrap();
        loop {
            match inner.queue.pop_front() {
                Some(value) => return Some(value),
                // Empty queue and no senders left: the channel is closed.
                None if inner.senders == 0 => return None,
                // Otherwise sleep until a value arrives or the last sender drops.
                None => inner = self.shared.cvar.wait(inner).unwrap(),
            }
        }
    }
}

pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
    let shared = Arc::new(Shared {
        inner: Mutex::new(Inner { queue: VecDeque::new(), senders: 1 }),
        cvar: Condvar::new(),
    });

    (
        Sender { shared: Arc::clone(&shared) },
        Receiver { shared },
    )
}

The channel() function is our entry point. It creates the shared state inside an Arc and distributes it to the Sender and Receiver. The Sender::send method locks the state, pushes a value, and crucially, calls cvar.notify_one() to wake up the receiver if it's sleeping. The Receiver::recv method takes the lock and enters a loop. If a value exists, it returns it. If not, it calls cvar.wait(), which atomically unlocks the mutex and puts the thread to sleep until it's notified. Once woken, it re-acquires the lock and checks the queue again; the loop also guards against spurious wakeups. To detect a closed channel, we keep an explicit count of live senders under the same mutex. Checking Arc::strong_count looks tempting, but nothing would wake a receiver that is already asleep when the last sender goes away. Here, Sender's Drop implementation decrements the count and notifies the condition variable when it reaches zero, so recv can return None instead of waiting forever.

The Multiplier: Enabling Parallel Iteration

With a thread pool and channels, we have the building blocks for higher-level abstractions. One of the most powerful is the parallel iterator. The goal is to provide an API that feels as natural as a standard iterator but executes the work in parallel. A developer should be able to transform code like this:

let results = my_vector.iter().map(|x| compute(x)).collect();

Into this, with minimal changes:

let results = my_vector.par_iter().map(|x| compute(x)).collect();

This requires a way to split the data source into independent chunks, process each chunk on the thread pool, and then collect the results in the correct order.

Design Strategy

  1. Chunking: Divide the input collection into roughly equal-sized chunks, one for each worker thread in our pool.
  2. Dispatching: For each chunk, send a job to the thread pool. This job will execute the user-provided operation (e.g., the closure inside map) on every element in its assigned chunk.
  3. Collecting: Use channels to get the results back from the worker threads. Since the jobs may finish out of order, we need a way to reassemble the final collection correctly. We can do this by sending tuples (chunk_index, chunk_results) back to the main thread.
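
The chunking and reassembly steps above can be sketched on their own, without any threads. The helper names chunk_ranges and reassemble are hypothetical. Ceiling division keeps the number of chunks at or below the number of workers, and sorting by chunk index restores the input order no matter which chunk arrived first.

```rust
use std::ops::Range;

/// Splits `0..len` into at most `workers` contiguous ranges of similar size.
pub fn chunk_ranges(len: usize, workers: usize) -> Vec<Range<usize>> {
    if len == 0 || workers == 0 {
        return Vec::new();
    }
    // Ceiling division: the last chunk may be shorter than the others.
    let chunk_size = (len + workers - 1) / workers;
    (0..len)
        .step_by(chunk_size)
        .map(|start| start..(start + chunk_size).min(len))
        .collect()
}

/// Restores input order from (chunk_index, results) pairs that may
/// have arrived in any order.
pub fn reassemble<R>(mut parts: Vec<(usize, Vec<R>)>) -> Vec<R> {
    parts.sort_by_key(|(index, _)| *index);
    parts.into_iter().flat_map(|(_, chunk)| chunk).collect()
}
```

For example, ten items across four workers gives a chunk size of three, so the last range holds a single item.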

Let's imagine a par_map function that takes a slice, a thread pool, and a mapping function.

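// Assumes the ThreadPool from the first section (same module, since it reads `pool.workers`);
// results travel back over a std::sync::mpsc channel.

pub fn par_map<T, R, F>(pool: &ThreadPool, data: &[T], f: F) -> Vec<R>
where
    T: Clone + Send + 'static, // each chunk is cloned into an owned Vec and moved to a worker
    R: Send + 'static,
    F: Fn(T) -> R + Send + Sync + 'static,
{
    let num_items = data.len();
    // Return early on empty input: a chunk size of 0 would make `chunks` panic.
    if num_items == 0 {
        return Vec::new();
    }

    let (tx, rx) = mpsc::channel();
    // Ceiling division so the items land in at most one chunk per worker.
    let chunk_size = (num_items + pool.workers.len() - 1) / pool.workers.len();
    let f = Arc::new(f);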

    let mut job_count = 0;
    for (chunk_index, chunk) in data.chunks(chunk_size).enumerate() {
        let tx_clone = tx.clone();
        let f_clone = Arc::clone(&f);
        // We need to own the data for the thread
        let chunk_data: Vec<T> = chunk.to_vec(); // Simplified for clarity; `Arc<[T]>` is better

        job_count += 1;
        pool.execute(move || {
            let results: Vec<R> = chunk_data.into_iter().map(|item| f_clone(item)).collect();
            tx_clone.send((chunk_index, results)).unwrap();
        });
    }

    // Drop the original sender so the receiver knows when all jobs are done
    drop(tx);

    let mut results_map: std::collections::HashMap<usize, Vec<R>> = std::collections::HashMap::new();
    for (chunk_index, chunk_results) in rx {
        results_map.insert(chunk_index, chunk_results);
    }

    let mut final_results = Vec::with_capacity(num_items);
    for i in 0..job_count {
        if let Some(mut chunk_results) = results_map.remove(&i) {
            final_results.append(&mut chunk_results);
        }
    }

    final_results
}

This function demonstrates the core pattern. It chunks the data, dispatches jobs to the pool, and then collects results from a channel. Storing results in a hash map indexed by chunk_index allows us to reassemble the final vector in the correct order, regardless of which thread finished first. A production-grade library like Rayon uses more sophisticated techniques like work-stealing for better load balancing, but this captures the fundamental logic.
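
If cloning each chunk is a concern, one alternative is to borrow the input instead. The sketch below swaps in a different technique: it uses std::thread::scope from the standard library rather than the thread pool, so it spawns fresh threads on every call. The name scoped_par_map and its num_threads parameter are hypothetical. Scoped threads are guaranteed to finish before scope returns, which is what allows them to borrow data and f directly.

```rust
use std::thread;

/// Maps `f` over `data` in parallel by borrowing chunks, without cloning.
pub fn scoped_par_map<T, R, F>(data: &[T], num_threads: usize, f: F) -> Vec<R>
where
    T: Sync,
    R: Send,
    F: Fn(&T) -> R + Sync,
{
    if data.is_empty() {
        return Vec::new();
    }
    let threads = num_threads.max(1);
    // Ceiling division: at most `threads` chunks.
    let chunk_size = (data.len() + threads - 1) / threads;
    let f = &f;

    thread::scope(|s| {
        // Spawn one scoped thread per chunk; each borrows its slice.
        let handles: Vec<_> = data
            .chunks(chunk_size)
            .map(|chunk| s.spawn(move || chunk.iter().map(f).collect::<Vec<R>>()))
            .collect();

        // Joining in spawn order keeps the results in input order,
        // so no index bookkeeping is needed.
        handles
            .into_iter()
            .flat_map(|handle| handle.join().unwrap())
            .collect::<Vec<R>>()
    })
}
```

Because the handles are joined in the order they were spawned, the hash map from par_map is unnecessary here. The cost is that thread creation is no longer amortized across calls.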

A Python Perspective: Abstractions We Know and Love

While we've been deep in the weeds with Rust, it's enlightening to see how these same concepts manifest in higher-level languages like Python. Understanding the low-level mechanics gives us a profound appreciation for the convenient abstractions we often take for granted.

Python's concurrent.futures module provides a high-level ThreadPoolExecutor that elegantly hides the complexity of managing workers and job queues.

import concurrent.futures
import time

def compute_task(value):
    """A simple task that simulates some work."""
    print(f"Processing value: {value}")
    time.sleep(1)
    return value * value

values = [1, 2, 3, 4, 5, 6, 7, 8]

# The ThreadPoolExecutor is our ThreadPool
# The `map` method is our Parallel Iterator abstraction
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    # `executor.map` submits one task per item and yields the results in input order.
    results = list(executor.map(compute_task, values))

print(f"Final results: {results}")

# Output (order of processing may vary):
# Processing value: 1
# Processing value: 2
# Processing value: 3
# Processing value: 4
# Processing value: 5
# Processing value: 6
# Processing value: 7
# Processing value: 8
# Final results: [1, 4, 9, 16, 25, 36, 49, 64]

Here, ThreadPoolExecutor is our ThreadPool. The executor.map function is a beautiful abstraction over the par_map logic we built manually. It handles job dispatch and result ordering transparently, although it submits one task per item rather than chunking the input (its chunksize parameter has no effect with ThreadPoolExecutor). Similarly, Python's queue.Queue class is a thread-safe implementation of the channel concept, perfect for custom inter-thread communication.

Seeing this Python code after building the primitives in Rust is illuminating. We now know that under the hood, ThreadPoolExecutor is managing a set of persistent threads and an internal queue, and map is performing the complex dance of dispatch and collection we implemented ourselves. This deeper understanding makes us better engineers, even when we're operating at a higher level of abstraction.

Conclusion

Building concurrency primitives from scratch is a formidable but incredibly rewarding endeavor. It forces a deep engagement with the fundamental challenges of parallelism: resource management, safe state sharing, and ergonomic API design. Our journey through building a thread pool, channels, and a parallel map function reveals a clear pattern: a safe communication mechanism (channels) and a simple, robust execution primitive (the thread pool, itself fed by a channel) combine into powerful, high-level abstractions (parallel iterators).

The key takeaways are universal:

  • Amortize Costs: Use pools for expensive resources like threads.
  • Communicate, Don't Share: Prefer message passing via channels over direct memory access with locks to avoid complex synchronization bugs.
  • Build Abstractions: Layer high-level, ergonomic APIs on top of low-level primitives to empower developers and reduce boilerplate.

Whether you're building a new programming language or simply want to become a more effective concurrent programmer, understanding what lies beneath the abstractions you use every day is a step toward mastery.

Source: dev.to
