A Python SDK for building serverless functions for AI inference. Write your function, declare required model refs, publish an endpoint release, and invoke it via Cozy's control plane.
When publishing a tenant worker, Cozy expects a Dockerfile-first project layout.
Build inputs MUST include:
- `cozy.toml` (Cozy manifest; used at build/publish time)
- `Dockerfile` (builds the worker image)
- tenant code (`pyproject.toml`, `uv.lock`, `src/`, etc.)
The built image MUST:
- Install `gen-worker` (so discovery + runtime can run).
- Bake function discovery output (manifest) at build time:

  ```dockerfile
  RUN mkdir -p /app/.cozy && python -m gen_worker.discover > /app/.cozy/manifest.json
  ```

- Use the Cozy worker runtime as the ENTRYPOINT:

  ```dockerfile
  ENTRYPOINT ["python", "-m", "gen_worker.entrypoint"]
  ```

Notes:

- `cozy.toml` is not required to be present in the final image; it is a build-time input.
- The platform reads `/app/.cozy/manifest.json` from the built image and stores it in the Cozy Hub DB for routing/invocation.
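Putting those requirements together, a minimal Dockerfile might look like the sketch below. The discovery and ENTRYPOINT lines are taken from this README; the base image and dependency-install steps are placeholders you should adapt to your own project layout:

```dockerfile
# Sketch only: adjust the install steps to your project's layout.
FROM ghcr.io/astral-sh/uv:python3.12-bookworm-slim

WORKDIR /app
COPY pyproject.toml uv.lock ./
COPY src/ ./src/

# Install the tenant project (and gen-worker) into the system interpreter.
RUN uv pip install --system .

# Bake function discovery output (manifest) at build time.
RUN mkdir -p /app/.cozy && python -m gen_worker.discover > /app/.cozy/manifest.json

# Use the Cozy worker runtime as the ENTRYPOINT.
ENTRYPOINT ["python", "-m", "gen_worker.entrypoint"]
```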
Start a Python project, then run:

```bash
uv add gen-worker
```

With PyTorch support:

```bash
uv add "gen-worker[torch]"
```

```python
import msgspec
from gen_worker import ActionContext, worker_function

class Input(msgspec.Struct):
    prompt: str

class Output(msgspec.Struct):
    text: str

@worker_function()
def generate(ctx: ActionContext, payload: Input) -> Output:
    return Output(text=f"Hello, {payload.prompt}!")
```

- Function discovery - Automatic detection of `@worker_function`-decorated functions
- Schema generation - Input/output schemas extracted from msgspec types
- Model injection - Dependency injection for ML models with caching
- Streaming output - Support for incremental/streaming responses
- Progress reporting - Built-in progress events via `ActionContext`
- Perf metrics - Best-effort per-run metrics emitted to gen-orchestrator (`metrics.*` worker events)
- File handling - Upload/download assets via the Cozy Hub file API
- Model caching - LRU cache with VRAM/disk management and cache-aware routing
```python
import msgspec
from gen_worker import ActionContext, worker_function

class Input(msgspec.Struct):
    prompt: str

class Output(msgspec.Struct):
    result: str

@worker_function()
def my_function(ctx: ActionContext, payload: Input) -> Output:
    return Output(result=f"Processed: {payload.prompt}")
```

Streaming functions yield deltas via an iterator return type (reusing `Input` from above):

```python
from typing import Iterator

class Delta(msgspec.Struct):
    chunk: str

@worker_function()
def stream(ctx: ActionContext, payload: Input) -> Iterator[Delta]:
    for word in payload.prompt.split():
        if ctx.is_canceled():
            raise InterruptedError("canceled")
        yield Delta(chunk=word)
```

Declare your model keyspace in `cozy.toml`:
```toml
[models]
sd15 = "hf:stable-diffusion-v1-5/stable-diffusion-v1-5"
```

```python
from typing import Annotated

from diffusers import DiffusionPipeline
from gen_worker import ActionContext, worker_function
from gen_worker.injection import ModelRef, ModelRefSource as Src

@worker_function()
def generate(
    ctx: ActionContext,
    pipe: Annotated[DiffusionPipeline, ModelRef(Src.FIXED, "sd15")],  # key from cozy.toml [models]
    payload: Input,
) -> Output:
    # Use the injected pipeline (loaded/cached by the worker's model manager).
    return Output(result="done")
```

If you want the client payload to choose which repo to run, declare a short-key
mapping in `cozy.toml` and use `ModelRef(Src.PAYLOAD, ...)`:
```toml
[models]
sd15 = "hf:stable-diffusion-v1-5/stable-diffusion-v1-5"
flux = "hf:black-forest-labs/FLUX.2-klein-4B"
```

```python
from typing import Annotated

import msgspec
from diffusers import DiffusionPipeline
from gen_worker import ActionContext, worker_function
from gen_worker.injection import ModelRef, ModelRefSource as Src

class Input(msgspec.Struct):
    prompt: str
    model: str  # must be one of: "sd15" | "flux"

@worker_function()
def generate(
    ctx: ActionContext,
    pipe: Annotated[DiffusionPipeline, ModelRef(Src.PAYLOAD, "model")],
    payload: Input,
):
    ...
```

Note: by default the worker requires payload model selection to use a known
short-key from [models] (cozy.toml). It will not accept arbitrary repo refs in
the payload. ModelRef(FIXED, ...) is also restricted to keys declared in the
manifest mapping (no inline hf:/cozy: refs).
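The restriction can be pictured with a small standalone sketch. This is not the worker's actual code, and `resolve_model_key` is a hypothetical helper; it only illustrates the rule that payload keys must come from the declared `[models]` mapping:

```python
# Hypothetical sketch of the short-key restriction described above;
# the real enforcement lives inside the worker runtime.
MODELS = {
    "sd15": "hf:stable-diffusion-v1-5/stable-diffusion-v1-5",
    "flux": "hf:black-forest-labs/FLUX.2-klein-4B",
}

def resolve_model_key(key: str) -> str:
    """Map a payload-supplied short key to its declared repo ref."""
    if key.startswith(("hf:", "cozy:")):
        # Inline repo refs in the payload are rejected.
        raise ValueError(f"inline refs not allowed: {key!r}")
    if key not in MODELS:
        raise ValueError(f"unknown model key: {key!r}")
    return MODELS[key]
```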
```python
@worker_function()
def process(ctx: ActionContext, payload: Input) -> Output:
    # Save bytes and get an asset reference
    asset = ctx.save_bytes(f"runs/{ctx.run_id}/outputs/output.png", image_bytes)
    return Output(result=asset.ref)
```

For local testing of a built worker image (without standing up gen-orchestrator), run the dev HTTP runner and write outputs to a mounted local directory.
Container example:

```bash
docker run --rm --gpus all -p 8081:8081 \
  -v "$(pwd)/out:/outputs" \
  -e COZY_HUB_URL='http://host.docker.internal:7777' \
  <your-worker-image> \
  python -m gen_worker.testing.http_runner --listen 0.0.0.0:8081 --outputs /outputs
```

Prefetch a public model (example: SD1.5 on Hugging Face, via Cozy Hub mirror):
```bash
curl -sS -X POST 'http://localhost:8081/v1/models/prefetch' \
  -H 'content-type: application/json' \
  -d '{"models":[{"ref":"hf:runwayml/stable-diffusion-v1-5@main","dtypes":["bf16","fp16"]}]}'
```

Invoke a function:
```bash
curl -sS -X POST 'http://localhost:8081/v1/run/generate' \
  -H 'content-type: application/json' \
  -d '{"payload": {"prompt": "a tiny robot watering a bonsai, macro photo"}}'
```

Outputs are written under `/outputs/runs/<run_id>/outputs/...` (matching Cozy ref semantics).
```toml
schema_version = 1
name = "my-worker"
main = "my_pkg.main"
gen_worker = ">=0.2.0,<0.3.0"

[models]
sdxl = { ref = "hf:stabilityai/stable-diffusion-xl-base-1.0", dtypes = ["fp16","bf16"] }
```

`[models]` entries support two forms:

- String form (defaults to `dtypes = ["fp16","bf16"]`):

  ```toml
  sd15 = "hf:stable-diffusion-v1-5/stable-diffusion-v1-5"
  ```

- Table form:

  ```toml
  flux_fp8 = { ref = "hf:black-forest-labs/FLUX.2-klein-4B", dtypes = ["fp8"] }
  ```
Orchestrator-injected (production contract):

| Variable | Default | Description |
|---|---|---|
| `SCHEDULER_PUBLIC_ADDR` | - | Scheduler address workers should dial |
| `SCHEDULER_ADDRS` | - | Optional comma-separated seed addresses for leader discovery |
| `WORKER_JWT` | - | Worker-connect JWT (required; claims are authoritative) |
Local dev / advanced (not injected by orchestrator):

| Variable | Default | Description |
|---|---|---|
| `SCHEDULER_JWKS_URL` | - | Optional: verify WORKER_JWT locally against scheduler JWKS |
| `SCHEDULER_JWT_ISSUER` | - | Optional: expected `iss` when verifying WORKER_JWT locally |
| `SCHEDULER_JWT_AUDIENCE` | - | Optional: expected `aud` when verifying WORKER_JWT locally |
| `USE_TLS` | false | Local-dev knob for plaintext vs TLS gRPC; production typically terminates TLS upstream |
| `WORKER_MAX_CONCURRENCY` | - | Max concurrent task executions |
| `WORKER_MAX_INPUT_BYTES` | - | Max input payload size |
| `WORKER_MAX_OUTPUT_BYTES` | - | Max output payload size |
| `WORKER_MAX_UPLOAD_BYTES` | - | Max file upload size |
| `WORKER_MAX_VRAM_GB` | Auto | Maximum VRAM for models |
| `WORKER_VRAM_SAFETY_MARGIN_GB` | 3.5 | Reserved VRAM for working memory |
| `TENSORHUB_CACHE_DIR` | `~/.cache/tensorhub` | TensorHub cache root; worker CAS defaults derive from this (`${TENSORHUB_CACHE_DIR}/cas/...`) |
| `WORKER_LOCAL_MODEL_CACHE_DIR` | `/tmp/tensorhub/local-model-cache` | Optional local (non-NFS) cache for snapshot localization |
| `WORKER_REGISTER_TIMEOUT_S` | 90 | Startup watchdog: fail fast if worker never registers with scheduler |
| `WORKER_WARN_MODEL_RESOLVE_S` | 30 | Emit `task.model_resolve.stuck` warning after this duration |
| `WORKER_WARN_MODEL_LOAD_S` | 60 | Emit `task.model_load.stuck` warning after this duration |
| `WORKER_WARN_INFERENCE_S` | 60 | Emit `task.inference.stuck` warning after this duration |
| `WORKER_MAX_CONCURRENT_DOWNLOADS` | 2 | Max parallel model downloads |
| `COZY_HUB_URL` | - | Cozy Hub base URL (used for public model requests and, if enabled, Cozy Hub API resolve) |
| `WORKER_ALLOW_COZY_HUB_API_RESOLVE` | false | Local dev only: allow the worker to call Cozy Hub resolve APIs |
| `COZY_HUB_TOKEN` | - | Cozy Hub bearer token (optional; enables ingest-if-missing for public HF models, if Cozy Hub requires auth) |
| `HF_TOKEN` | - | Hugging Face token (for private `hf:` refs) |
The worker can emit best-effort performance/debug metrics to gen-orchestrator via WorkerEvent messages.
See docs/metrics.md.
See docs/worker-stuck-visibility.md for startup/task watchdog events used to diagnose stuck workers.
By default, `hf:` model refs do not download the full repo. The worker uses `huggingface_hub.snapshot_download(allow_patterns=...)` to avoid pulling huge legacy weights.

Defaults:

- Download only what a diffusers pipeline needs (derived from `model_index.json`).
- Skip `safety_checker` and `feature_extractor` by default.
- Download only reduced-precision safetensors weights (fp16/bf16); never download `.ckpt` or `.bin` by default.
- For sharded safetensors, also download the `*.safetensors.index.json` and the referenced shard files.
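As an illustration of how such an allow-list might be assembled per component — this is not the worker's exact pattern set, and the real logic derives components from `model_index.json` — consider:

```python
# Illustrative only: builds huggingface_hub-style allow_patterns for the
# components/precisions described above.
def build_allow_patterns(components: list[str], precisions: list[str]) -> list[str]:
    patterns = ["model_index.json"]
    for comp in components:
        patterns.append(f"{comp}/*.json")  # component configs
        patterns.append(f"{comp}/*.txt")   # tokenizer vocab files
        for prec in precisions:
            # Only reduced-precision safetensors weights; no .ckpt/.bin.
            patterns.append(f"{comp}/*{prec}*.safetensors")
        patterns.append(f"{comp}/*.safetensors.index.json")  # sharded index
    return patterns

patterns = build_allow_patterns(["unet", "vae"], ["fp16", "bf16"])
```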
Overrides:
COZY_HF_COMPONENTS="unet,vae,text_encoder,tokenizer,scheduler": hard override component list.COZY_HF_INCLUDE_OPTIONAL_COMPONENTS=1: include components likesafety_checker/feature_extractorif present.COZY_HF_WEIGHT_PRECISIONS="fp16,bf16": change which weight suffixes are accepted (addfp32only if you really need it).COZY_HF_ALLOW_ROOT_JSON=1: allow additional small root*.jsonfiles (some repos need extra root config).COZY_HF_FULL_REPO_DOWNLOAD=1: disable filtering and download the entire repo (not recommended; can be 10s of GB).
Cozy snapshot/object file downloads are written to `*.part` and then atomically renamed on success. If a `*.part` file exists from a previous interrupted download, the worker attempts to resume it using HTTP Range requests (if supported by the presigned object-store URL), and falls back to a full re-download if Range is not supported.
```
my-worker/
├── pyproject.toml
├── uv.lock
└── src/
    └── my_module/
        └── main.py
```
For production, use the `cozyctl` CLI to build and deploy worker images to our network. For local testing, you can build images using the provided Dockerfile:

```bash
# Build an example using the same root Dockerfile
docker build -t sd15-worker -f Dockerfile examples/sd15

# Run
docker run \
  -e SCHEDULER_PUBLIC_ADDR=orchestrator:8080 \
  -e WORKER_JWT='<worker-connect-jwt>' \
  sd15-worker
```

Canonical local dev build args (GPU, CUDA 12.6, torch 2.10.x, Python 3.12):
```bash
cd ~/cozy/python-gen-worker
docker build \
  --build-arg PYTHON_VERSION=3.12 \
  --build-arg UV_TORCH_BACKEND=cu126 \
  --build-arg TORCH_SPEC='~=2.10.0' \
  -f Dockerfile \
  -t my-worker:dev \
  examples/sd15
```

Optional build args:
```bash
docker build \
  --build-arg PYTHON_VERSION=3.12 \
  --build-arg UV_TORCH_BACKEND=cu128 \
  --build-arg TORCH_SPEC=">=2.9,<3" \
  -t my-worker -f Dockerfile examples/sd15
```

Worker images build directly from a Python+uv base image:

```
ghcr.io/astral-sh/uv:python3.12-bookworm-slim
```

PyTorch/CUDA dependencies are installed as part of your worker's dependency set during image build.
Control-plane behavior (cozy-hub + orchestrator):

- Every publish creates a new immutable internal `release_id`.
- End users invoke functions by `owner/endpoint/function` (default `prod`) or `owner/endpoint/function:tag`.
- `endpoint` is derived from the `cozy.toml` `name` and normalized to a URL-safe slug.
- `function` names are derived from Python `@worker_function` names and normalized to URL-safe slugs (for example, `medasr_transcribe` -> `medasr-transcribe`).
- Publishing does not move traffic by default.
- Promoting a function tag moves traffic to that release.
- Rollback is just retargeting the tag to an older release.
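The name normalization (e.g. `medasr_transcribe` -> `medasr-transcribe`) can be approximated with a small sketch; `slugify` here is hypothetical, not the control plane's exact implementation:

```python
import re

def slugify(name: str) -> str:
    """Approximate URL-safe slug normalization (illustrative only)."""
    slug = name.strip().lower()
    slug = re.sub(r"[^a-z0-9]+", "-", slug)  # underscores, spaces, etc. -> "-"
    return slug.strip("-")
```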
Workers report model availability for intelligent job routing:
| State | Location | Latency |
|---|---|---|
| Hot | VRAM | Instant |
| Warm | Disk | Seconds |
| Cold | None | Minutes (download required) |
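A routing-side sketch of these three states (illustrative only; the actual availability reporting happens over the worker protocol):

```python
def model_state(ref: str, vram: set[str], disk: set[str]) -> str:
    """Classify a model ref as hot (VRAM), warm (disk), or cold (absent)."""
    if ref in vram:
        return "hot"
    if ref in disk:
        return "warm"
    return "cold"

# Example availability snapshot (hypothetical keys).
vram = {"sd15"}
disk = {"sd15", "flux"}
```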
For local end-to-end tests without standing up gen-orchestrator, use the one-off mock orchestrator invoke command (curl-like workflow). It starts a temporary scheduler, waits for a worker to connect, sends one TaskExecutionRequest, prints the result, and exits.
Start your worker container first:

```bash
docker run --rm \
  --add-host=host.docker.internal:host-gateway \
  -e SCHEDULER_PUBLIC_ADDR=host.docker.internal:8080 \
  -e WORKER_JWT='dev-worker-jwt' \
  <your-worker-image>
```

In another terminal, send one request:

```bash
python -m gen_worker.testing.mock_orchestrator \
  --listen 0.0.0.0:8080 \
  --run hello \
  --payload-json '{"name":"world"}'
```

Run the command again with a different payload whenever you want to send another request.
```python
from gen_worker.model_cache import ModelCache

cache = ModelCache(max_vram_gb=20.0)
cache.mark_loaded_to_vram("model-a", pipeline, size_gb=8.0)
cache.is_in_vram("model-a")  # True
cache.get_vram_models()  # ["model-a"]
```

```python
from gen_worker.errors import RetryableError, ValidationError, FatalError

@worker_function()
def process(ctx: ActionContext, payload: Input) -> Output:
    if not payload.prompt:
        raise ValidationError("prompt is required")  # 400, no retry
    try:
        result = call_external_api()
    except TimeoutError:
        raise RetryableError("API timeout")  # Will be retried
    return Output(result=result)
```

```bash
# Install dev dependencies
uv sync --extra dev

# Run tests
uv run pytest

# Type checking
uv run mypy src/gen_worker

# Build
uv build
```

Requires gen-orchestrator as a sibling repo:
```bash
uv sync --extra dev
python -m grpc_tools.protoc \
  -I../gen-orchestrator/proto \
  --python_out=src/gen_worker/pb \
  --grpc_python_out=src/gen_worker/pb \
  ../gen-orchestrator/proto/*.proto
```

The worker advertises a protocol MAJOR.MINOR in `WorkerRegistration` (`protocol_major`, `protocol_minor`).
- Current runtime constants live in `src/gen_worker/wire_protocol.py`.
- Orchestrator compatibility policy/ranges are documented in `../gen-orchestrator/docs/worker_wire_protocol.md`.
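A common compatibility rule for MAJOR.MINOR protocols — shown here only as an illustration; the authoritative policy is in the gen-orchestrator docs — is "same major, orchestrator minor >= worker minor":

```python
def is_compatible(worker: tuple[int, int], orchestrator: tuple[int, int]) -> bool:
    """Illustrative MAJOR.MINOR check; not the documented gen-orchestrator policy."""
    w_major, w_minor = worker
    o_major, o_minor = orchestrator
    # Majors must match exactly; the orchestrator must speak at least the
    # worker's minor revision.
    return w_major == o_major and w_minor <= o_minor
```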
MIT