Every cron library I looked at pulled in a pile of dependencies for what is, fundamentally, a string-to-schedule conversion. So I decided to write one from scratch — and then kept going until I had a complete task scheduler.
CronLite is ~5,300 lines of Python. Zero external packages. Everything from the cron parser down to the SQLite storage layer uses nothing beyond the standard library.
What it does
- Full POSIX cron parsing (wildcards, ranges, steps, lists, month/day names, presets)
- Priority-queue scheduler with a fixed-size thread pool
- Retry strategies: fixed, linear, exponential backoff
- DAG job dependencies with cycle detection
- SQLite-backed persistence (WAL mode for concurrent reads)
- REST API (~15 endpoints) built on
http.server - CLI for everything: add, remove, trigger, explain, history, stats
The interesting part: next_run()
The most satisfying function to write was next_run(). Given a cron expression and a starting datetime, find the next moment that matches all five fields.
The naive approach — scan forward minute by minute — works but is painfully slow for something like 0 0 1 1 * (only runs once a year).
Instead, I walk fields from most-significant to least-significant. When a field doesn't match, I advance to the next matching value and reset all less-significant fields. Carries propagate naturally: if no minute matches in the current hour, advance the hour; if no hour matches today, advance the day.
def next_run(self, after: datetime) -> datetime | None:
candidate = _truncate_to_minute(after) + timedelta(minutes=1)
deadline = after + timedelta(days=_MAX_SEARCH_YEARS * 366)
while candidate <= deadline:
# Month
if not self.month.matches(candidate.month):
candidate = self._advance_month(candidate)
if candidate is None or candidate > deadline:
return None
continue
# Day (POSIX union semantics for DOM + DOW)
if not self._day_matches(candidate):
candidate = candidate.replace(hour=0, minute=0) + timedelta(days=1)
continue
# Hour
if not self.hour.matches(candidate.hour):
next_hour = self._next_value(candidate.hour, self.hour.values)
if next_hour is not None and next_hour > candidate.hour:
candidate = candidate.replace(hour=next_hour, minute=0)
if self.minute.matches(0):
return candidate
next_min = self._next_value(0, self.minute.values)
if next_min is not None:
return candidate.replace(minute=next_min)
candidate = candidate.replace(hour=0, minute=0) + timedelta(days=1)
continue
# Minute
if not self.minute.matches(candidate.minute):
next_min = self._next_value(candidate.minute, self.minute.values)
if next_min is not None and next_min > candidate.minute:
return candidate.replace(minute=next_min)
# No more minutes this hour — advance
next_hour = self._next_value(candidate.hour + 1, self.hour.values)
if next_hour is not None:
candidate = candidate.replace(
hour=next_hour, minute=min(self.minute.values)
)
continue
candidate = candidate.replace(hour=0, minute=0) + timedelta(days=1)
continue
return candidate
return None
The POSIX spec has a subtle rule: if both day-of-month and day-of-week are restricted (not wildcards), a day matches if either field matches — it's a union, not an intersection. Most tutorials get this wrong.
Retry engine
When a job fails, the retry policy decides what happens next. I implemented three strategies, each computing the delay before the next attempt:
class RetryPolicy:
def compute_delay(self, attempt: int) -> float:
if self.strategy == RetryStrategy.FIXED:
delay = self.base_delay
elif self.strategy == RetryStrategy.LINEAR:
delay = self.base_delay * attempt
elif self.strategy == RetryStrategy.EXPONENTIAL:
delay = self.base_delay * (2 ** (attempt - 1))
return min(delay, self.max_delay)
For a base delay of 10 seconds and exponential backoff: 10s → 20s → 40s → 80s → 160s, capped at whatever max_delay is set to. The cap prevents a job from waiting hours between retries after a dozen failures.
DAG dependencies
Jobs can declare dependencies. Before firing a job, the engine checks that all upstream jobs have completed successfully:
class JobDAG:
def add_dependency(self, job_id: str, depends_on: str) -> None:
# Check for cycles using DFS 3-color algorithm
self._edges[job_id].add(depends_on)
if self._has_cycle():
self._edges[job_id].discard(depends_on)
raise CyclicDependencyError(f"Adding {depends_on} → {job_id} creates a cycle")
Cycle detection uses the standard three-color DFS: white (unvisited), gray (in current path), black (fully explored). If we reach a gray node, there's a cycle. Topological sort uses Kahn's algorithm — start from nodes with zero in-degree, peel them off one at a time.
Storage backends
Both MemoryStore and SQLiteStore implement the same Store ABC, so swapping between them is a one-line change. The SQLite backend uses WAL journal mode for concurrent reads and parameterized queries throughout:
class SQLiteStore(Store):
def __init__(self, db_path: str):
self._conn = sqlite3.connect(db_path)
self._conn.execute("PRAGMA journal_mode=WAL")
self._create_tables()
def save_job(self, job: Job) -> None:
self._conn.execute(
"INSERT OR REPLACE INTO jobs VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)",
self._job_to_row(job),
)
self._conn.commit()
Running it
git clone https://github.com/hajirufai/cronlite.git
cd cronlite
# Explain a schedule
python -m cronlite explain "*/15 9-17 * * MON-FRI"
# → Every 15 minutes, between 09:00 and 17:59, on weekdays
# Add and trigger a job
python -m cronlite add backup "0 2 * * *" "pg_dump mydb > backup.sql"
python -m cronlite trigger backup
# Start the scheduler with API
python -m cronlite start --workers 4 --port 8080
Or use the HTTP API:
curl -X POST http://localhost:8080/api/v1/jobs \
-H "Content-Type: application/json" \
-d '{
"name": "etl-pipeline",
"expression": "0 */6 * * *",
"command": "python run_etl.py",
"max_retries": 3,
"retry_strategy": "exponential"
}'
Numbers
- 22 source modules
- 219 passing tests
- 5 example scripts
- 0 external dependencies
- Python 3.11+ only
The test suite covers parsing edge cases (February 30, leap years, step patterns), executor behavior (timeouts, zombie processes, output truncation), storage persistence, DAG cycle detection, retry delay calculations, and full end-to-end scheduling scenarios.
Things I learned
POSIX cron semantics are full of edge cases. Day-of-month + day-of-week union caught me off guard. February 30 expressions parse successfully but next_run() correctly returns None within the search window.
Signal handlers only work in the main thread. The scheduler's graceful shutdown handler needs a try/except around signal.signal() for when it's used as a library in a non-main thread.
Python's or with containers is subtle. empty_list or default returns default because empty containers are falsy. Had to use explicit is not None checks when distinguishing between "no value" and "empty value."
SQLite WAL mode matters. Without it, concurrent reads during a write transaction block. With WAL, readers and writers don't interfere with each other.
Source: github.com/hajirufai/cronlite