Background Tasks¶
Run long-running or scheduled work outside the request cycle using Vibetuner's background task system, powered by Streaq and Redis.
Package name convention
In all examples, app refers to your project's Python package (the directory
under src/). The actual name depends on your project slug (e.g.,
src/myproject/ for a project named "myproject").
Prerequisites¶
Background tasks require Redis. Set the REDIS_URL environment variable in
your .env file:
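```bash
# .env — a typical local connection string; adjust host/port/db for your setup
REDIS_URL=redis://localhost:6379/0
```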
When using Docker development (just dev), Redis is started automatically.
For local development, install and start Redis separately or use the Docker
Compose services.
Quick Start¶
1. Create a Task¶
Create a task file in src/app/tasks/:
```python
# src/app/tasks/emails.py
from vibetuner.tasks.worker import get_worker

worker = get_worker()


@worker.task()
async def send_welcome_email(user_id: str) -> dict[str, str]:
    """Send a welcome email to a newly registered user."""
    from vibetuner.models import UserModel
    from vibetuner.services.email import send_email

    user = await UserModel.get(user_id)
    if user:
        await send_email(
            to_email=user.email,
            subject="Welcome!",
            html_content="<h1>Welcome!</h1>",
        )
        return {"status": "sent", "email": user.email}
    return {"status": "skipped"}
```
2. Register the Task¶
List the task function in your tune.py:
```python
# src/app/tune.py
from vibetuner import VibetunerApp

from app.tasks.emails import send_welcome_email

app = VibetunerApp(
    tasks=[send_welcome_email],
)
```
3. Enqueue from a Route¶
```python
# src/app/frontend/routes/auth.py
from app.tasks.emails import send_welcome_email


@router.post("/signup")
async def signup(email: str):
    user = await create_user(email)
    task = await send_welcome_email.enqueue(str(user.id))
    return {"message": "Welcome email queued", "task_id": task.id}
```
4. Start the Worker¶
```bash
# Recommended: using justfile
just worker-dev

# Or manually:
uv run vibetuner run dev worker

# With Docker, Redis + app start automatically:
just dev
```
The worker picks up queued tasks and executes them in the background.
Creating Tasks¶
Basic Task¶
Every task is an async function decorated with @worker.task():
```python
# src/app/tasks/reports.py
from vibetuner.tasks.worker import get_worker

worker = get_worker()


@worker.task()
async def generate_report(account_id: str) -> dict:
    """Generate a monthly report for an account."""
    # ... long-running work ...
    return {"status": "complete", "account_id": account_id}
```
Task with Timeout¶
Set a maximum execution time for a task:
```python
from datetime import timedelta


@worker.task(timeout=timedelta(minutes=5))
async def process_large_file(file_id: str) -> dict:
    # Aborted if it takes longer than 5 minutes
    ...
```
Task Organization¶
Group related tasks in separate modules under src/app/tasks/:
```
src/app/tasks/
├── __init__.py
├── emails.py         # Email-related tasks
├── reports.py        # Report generation
└── notifications.py  # Push notifications
```
Register all task functions in tune.py:
```python
# src/app/tune.py
from vibetuner import VibetunerApp

from app.tasks.emails import send_welcome_email, send_digest
from app.tasks.reports import generate_report
from app.tasks.notifications import send_push

app = VibetunerApp(
    tasks=[send_welcome_email, send_digest, generate_report, send_push],
)
```
Worker Dependency Injection¶
Use WorkerDepends() from Streaq to inject the worker context into your tasks.
The context provides access to shared resources initialized during the worker
lifespan (such as HTTP clients and database connections):
```python
from streaq import WorkerDepends

from vibetuner.tasks.worker import get_worker

worker = get_worker()


@worker.task()
async def fetch_external_data(url: str, ctx=WorkerDepends()) -> dict:
    """Fetch data using the shared HTTP client from worker context."""
    response = await ctx.http_client.get(url)
    return {"status": response.status_code, "data": response.text[:100]}
```
The ctx parameter receives the context object yielded by the worker lifespan.
By default, this is a Context instance with project metadata. You can extend
it with a custom worker lifespan (see Worker Lifecycle).
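Note that the `http_client` attribute used in the example above is not part of the default `Context`; your worker lifespan has to provide it. A minimal sketch, assuming `Context` permits attribute assignment (see Worker Lifecycle below for the full pattern):

```python
from contextlib import asynccontextmanager
from typing import AsyncGenerator

import httpx

from vibetuner.context import Context
from vibetuner.tasks.lifespan import base_lifespan


@asynccontextmanager
async def worker_lifespan() -> AsyncGenerator[Context, None]:
    """Attach a shared HTTP client to the worker context (sketch)."""
    async with base_lifespan() as context:
        async with httpx.AsyncClient() as client:
            # Assumes Context allows attribute assignment; tasks can then
            # read the client as ctx.http_client
            context.http_client = client
            yield context
```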
The @robust_task() Decorator¶
For tasks that need automatic retries and dead letter tracking, use the
@robust_task() decorator instead of @worker.task():
```python
from vibetuner.tasks.robust import robust_task


@robust_task(max_retries=5, backoff_max=600)
async def send_webhook(payload: dict) -> dict:
    """Send a webhook with automatic retries on failure."""
    import httpx

    async with httpx.AsyncClient() as client:
        resp = await client.post("https://example.com/hook", json=payload)
        resp.raise_for_status()
        return {"status": "delivered"}
```
Parameters¶
| Parameter | Type | Default | Description |
|---|---|---|---|
| `max_retries` | `int` | `3` | Maximum attempts before giving up |
| `backoff_base` | `float` | `2.0` | Base for exponential backoff (`delay = base ** tries`) |
| `backoff_max` | `float` | `300.0` | Maximum backoff delay in seconds |
| `timeout` | `timedelta \| int \| None` | `None` | Task timeout (forwarded to `worker.task()`) |
| `on_failure` | `Callable \| None` | `None` | Called on permanent failure (sync or async) |
| `**task_kwargs` | — | — | Extra keyword arguments forwarded to `worker.task()` |
Retry Behavior¶
When a task raises an exception:
- If `tries < max_retries`, the task is retried after an exponential backoff delay of `min(backoff_base ** tries, backoff_max)` seconds.
- If all retries are exhausted, the task is recorded in the `dead_letters` MongoDB collection and the optional `on_failure` callback is invoked.
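With the defaults, the schedule works out as follows (a worked example of the formula above):

```python
# With max_retries=3, backoff_base=2.0, backoff_max=300.0, a task that keeps
# failing is retried after min(2.0 ** 1, 300.0) = 2s and min(2.0 ** 2, 300.0)
# = 4s, then dead-lettered after the third failed attempt.
for tries in range(1, 3):
    print(f"attempt {tries} failed -> retry in {min(2.0 ** tries, 300.0):.0f}s")
```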
Dead Letter Collection¶
Permanently failed tasks are stored in the dead_letters MongoDB collection
with the following fields:
| Field | Type | Description |
|---|---|---|
| `task_name` | `str` | Function name of the failed task |
| `task_id` | `str` | Unique task identifier |
| `error` | `str` | Error message |
| `error_type` | `str` | Exception class name |
| `tries` | `int` | Number of attempts made |
| `failed_at` | `datetime` | Timestamp of permanent failure |
Failure Callback¶
The on_failure callback receives three arguments and can be sync or async:
```python
async def alert_on_failure(task_name: str, task_id: str, exc: Exception):
    """Send an alert when a task permanently fails."""
    await notify_ops_team(f"Task {task_name} ({task_id}) failed: {exc}")


@robust_task(max_retries=3, on_failure=alert_on_failure)
async def critical_sync(data: dict):
    ...
```
Scheduled Tasks (Cron)¶
Use @worker.cron() to run tasks on a schedule:
```python
from vibetuner.tasks.worker import get_worker

worker = get_worker()


@worker.cron("0 9 * * *")  # Every day at 9:00 AM UTC
async def daily_digest():
    """Send daily digest emails to all subscribed users."""
    ...


@worker.cron("*/15 * * * *")  # Every 15 minutes
async def check_expired_sessions():
    """Clean up expired sessions."""
    ...
```
The cron expression follows standard crontab syntax:
```
┌───────────── minute (0-59)
│ ┌───────────── hour (0-23)
│ │ ┌───────────── day of month (1-31)
│ │ │ ┌───────────── month (1-12)
│ │ │ │ ┌───────────── day of week (0-6, Sun=0)
│ │ │ │ │
* * * * *
```
Register cron tasks in tune.py the same way as regular tasks:
```python
# src/app/tune.py
from vibetuner import VibetunerApp

from app.tasks.maintenance import daily_digest, check_expired_sessions

app = VibetunerApp(
    tasks=[daily_digest, check_expired_sessions],
)
```
Note
Cron tasks are only executed by the worker process. They will not run if the worker is not started.
Enqueueing and Monitoring Tasks¶
Enqueue a Task¶
```python
from app.tasks.emails import send_welcome_email

# Enqueue with arguments
task = await send_welcome_email.enqueue(user_id)
```
Check Task Status¶
```python
# Get current status
status = await task.status()

# Wait for result with timeout
task_result = await task.result(timeout=30)
if task_result.success:
    value = task_result.result
else:
    error = task_result.exception
```
Cancel a Task¶
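A queued or running task can be cancelled through its handle (a sketch assuming Streaq's `task.abort()` API):

```python
task = await send_welcome_email.enqueue(user_id)

# Assumes Streaq's task.abort(): cancels the task if it has not finished
# and returns whether the abort took effect.
aborted = await task.abort()
```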
SSE Integration¶
Broadcast real-time updates from background tasks to connected clients using Server-Sent Events. This is useful for progress indicators, live feeds, and notifications.
```python
from vibetuner.sse import broadcast
from vibetuner.tasks.worker import get_worker

worker = get_worker()


@worker.task()
async def process_upload(file_id: str, user_id: str) -> dict:
    """Process an uploaded file and broadcast progress."""
    channel = f"upload:{user_id}"
    await broadcast(channel, "progress", data="Processing started...")
    # ... do work ...
    await broadcast(channel, "progress", data="50% complete...")
    # ... more work ...
    await broadcast(
        channel,
        "complete",
        data="<div class='alert alert-success'>Upload complete!</div>",
    )
    return {"status": "complete"}
```
Warning
Broadcasting from a task requires Redis. The `broadcast()` function
uses Redis pub/sub to relay events across processes (from the worker
process to the frontend process). Without Redis, broadcasts from
tasks will not reach connected clients.
On the frontend, subscribe with HTMX:
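```html
<!-- Sketch using the HTMX SSE extension; the Jinja-style user_id
     interpolation is an assumption about your template setup -->
<div hx-ext="sse" sse-connect="/events/upload/{{ user_id }}">
  <div sse-swap="progress"></div>
  <div sse-swap="complete"></div>
</div>
```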
With a corresponding SSE endpoint:
```python
from fastapi import Request

from vibetuner.sse import sse_endpoint


@sse_endpoint("/events/upload/{user_id}", router=router)
async def upload_stream(request: Request, user_id: str):
    return f"upload:{user_id}"
```
For more details on SSE, see the SSE / Real-Time Streaming section in the development guide.
Worker Lifecycle¶
Default Behavior¶
The worker automatically initializes MongoDB and SQLModel connections on startup
and tears them down on shutdown. The default lifespan yields a Context object
with project metadata.
Custom Worker Lifespan¶
To add custom startup/shutdown logic or extend the worker context, create a
custom lifespan and pass it to `VibetunerApp` in your `tune.py`:
```python
# src/app/tasks/lifespan.py
from contextlib import asynccontextmanager
from typing import AsyncGenerator

from vibetuner.context import Context
from vibetuner.tasks.lifespan import base_lifespan


@asynccontextmanager
async def worker_lifespan() -> AsyncGenerator[Context, None]:
    """Custom worker lifespan with additional setup."""
    async with base_lifespan() as context:
        # Custom startup logic
        print("Worker starting with custom setup")
        yield context
        # Custom shutdown logic
        print("Worker shutting down")
```

```python
# src/app/tune.py
from vibetuner import VibetunerApp

from app.tasks.lifespan import worker_lifespan

app = VibetunerApp(
    worker_lifespan=worker_lifespan,
)
```
Note
The base lifespan handles MongoDB and SQLModel initialization. Always
wrap it with `async with base_lifespan() as context:` when providing
a custom lifespan to retain database connectivity.
Streaq Task Dashboard¶
When workers are configured, a built-in monitoring UI is available at
/debug/tasks. This dashboard shows:
- Active, queued, and completed tasks
- Task execution times and results
- Failed tasks and error details
- Worker health and concurrency
The dashboard is protected by the same debug access controls as other
/debug/* endpoints. In development mode (DEBUG=true), it is accessible
without authentication. In production, use the
/_unlock-debug?token=<DEBUG_ACCESS_TOKEN> endpoint to gain access.
A standalone worker monitoring web UI also starts automatically on
port 11111 when you run the worker. Access it at
http://localhost:11111.
Running Workers¶
Development¶
```bash
# Worker only
just worker-dev

# Everything: server + assets + worker
just local-all-with-worker

# Worker only (with hot reload, manual)
uv run vibetuner run dev worker

# Custom port for the monitoring UI
uv run vibetuner run dev worker --port 12000
```
In development mode, the worker runs with hot reload enabled. Code changes automatically restart the worker process.
Production¶
The production compose file (compose.prod.yml) runs the worker as a
separate service:
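The service definition lives in your generated `compose.prod.yml`; the sketch below is illustrative only, with assumed service names, build context, and command:

```yaml
# Sketch only — check your generated compose.prod.yml for the real definition
services:
  worker:
    build: .
    command: uv run vibetuner run prod worker  # assumed invocation
    environment:
      REDIS_URL: redis://redis:6379/0
    depends_on:
      - redis
```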
Or run manually without Docker:
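Assuming the production invocation mirrors the development one shown earlier (the exact subcommand is an assumption; see the CLI Reference):

```bash
# Assumed to mirror the dev command; verify against the CLI Reference
uv run vibetuner run prod worker
```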
Configuration¶
| Setting | Environment Variable | Default | Description |
|---|---|---|---|
| Redis URL | `REDIS_URL` | None | Redis connection string (required) |
| Concurrency | `WORKER_CONCURRENCY` | `16` | Max concurrent tasks per worker |
| Queue name | — | Project slug | Derived from `REDIS_KEY_PREFIX` |
| Monitoring port | `--port` flag | `11111` | Port for the worker web UI |
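For example, in `.env` (values illustrative):

```bash
# Illustrative values — tune concurrency to your workload
REDIS_URL=redis://localhost:6379/0
WORKER_CONCURRENCY=32
```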
Testing Tasks¶
Use the mock_tasks fixture to test task enqueuing without Redis:
```python
from unittest.mock import patch


async def test_signup_queues_email(vibetuner_client, mock_tasks):
    with patch(
        "app.tasks.emails.send_welcome_email",
        mock_tasks.send_welcome_email,
    ):
        resp = await vibetuner_client.post(
            "/signup", data={"email": "test@example.com"}
        )
    assert mock_tasks.send_welcome_email.enqueue.called
```
For testing task logic directly (without enqueuing), call the function:
```python
async def test_send_welcome_email(vibetuner_db):
    from app.tasks.emails import send_welcome_email

    # Create a test user first
    user = await create_test_user()

    result = await send_welcome_email(str(user.id))
    assert result["status"] == "sent"
```
Troubleshooting¶
"Redis not configured" Error¶
The worker requires REDIS_URL to be set. Add it to your .env:
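```bash
# .env
REDIS_URL=redis://localhost:6379/0
```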
If using Docker, Redis starts automatically with just dev.
Tasks Not Executing¶
- Is the worker running? Check with `just worker-dev` or look for the worker process in Docker logs.
- Is the task registered? Make sure the task function is listed in `VibetunerApp.tasks` in `tune.py`.
- Is Redis reachable? Run `vibetuner doctor` to check service connectivity.
Tasks Failing Silently¶
Check the worker logs. In development mode, set DEBUG=true for verbose
output. For tasks using @robust_task(), check the dead_letters MongoDB
collection for permanently failed tasks.
Cron Tasks Not Running¶
Cron tasks only execute inside the worker process. If you only have
`just local-all` running (without the worker), cron tasks will not fire.
Use `just local-all-with-worker` or start the worker separately with
`just worker-dev`.
Next Steps¶
- SSE / Real-Time Streaming — Push live updates to the browser
- Deployment — Run workers in production
- CLI Reference — Full `vibetuner run` command details
- Architecture — System design overview