start_episode — explicit lifecycle

log_episode(...) is the right tool for the 95% case: files on disk, upload them all, finalize. When that doesn't fit — you want to stream uploads as your robot finishes them, react to per-artifact failures, or use a context manager to guarantee the run closes cleanly even on exceptions — drop down to start_episode(...).

When to use which

You want to… | Reach for
--- | ---
Log a run after it's done with all files on disk | log_episode
Upload artifacts as soon as they're produced | start_episode + manual upload_* calls
Auto-mark the run failed if your code crashes mid-flow | start_episode as a context manager
Skip artifacts entirely (metadata-only run) | start_episode(..., artifacts=[])
Catch and recover from a failed upload of one specific kind | start_episode + try/except per upload

The two-step flow

start_episode opens a run on the server (status "recording") and returns an Episode handle that carries the signed PUT URLs when R2 is wired. Upload artifacts whenever they are ready, then call finalize to close the run.

import robotrace as rt
 
ep = rt.start_episode(
    name="streaming demo",
    source="real",
    policy_version="v1.2.3",
    env_version="cell-A",
    git_sha="abc1234",
    seed=42,
    fps=30,
    # Only request signed URLs for the slots you'll actually fill.
    # Defaults to all three; pass [] for a metadata-only run.
    artifacts=["video", "sensors"],
)
 
ep.upload_video("/tmp/run.mp4")
ep.upload_sensors("/tmp/sensors.bin")
 
ep.finalize(
    status="ready",
    duration_s=47.2,
    fps=30,
    metadata={"task": "pick"},
)

finalize rolls up bytes_total automatically from the upload helpers' running total, so you don't have to call os.path.getsize yourself. Pass bytes_total= explicitly if you want to override it.
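
To make the roll-up concrete, here is a minimal sketch of how a handle could keep a running byte total as uploads complete. ByteTally and its method names are hypothetical stand-ins, not the SDK's internals; only the override rule (an explicit bytes_total wins over the running total) follows the description above.

```python
import os
import tempfile
from typing import Optional


class ByteTally:
    """Hypothetical stand-in for the running total an Episode might keep."""

    def __init__(self) -> None:
        self.bytes_uploaded = 0

    def record_upload(self, path: str) -> None:
        # Add the on-disk size of each uploaded file to the running total.
        self.bytes_uploaded += os.path.getsize(path)

    def resolve_bytes_total(self, bytes_total: Optional[int] = None) -> int:
        # An explicit value overrides the running total.
        return self.bytes_uploaded if bytes_total is None else bytes_total


def write_temp(payload: bytes) -> str:
    # Demo helper: write bytes to a temp file and return its path.
    with tempfile.NamedTemporaryFile(delete=False) as fh:
        fh.write(payload)
        return fh.name


tally = ByteTally()
video_path = write_temp(b"v" * 1000)
sensors_path = write_temp(b"s" * 250)
tally.record_upload(video_path)
tally.record_upload(sensors_path)

rolled_up = tally.resolve_bytes_total()       # sum of both file sizes
overridden = tally.resolve_bytes_total(9999)  # explicit value wins

os.remove(video_path)
os.remove(sensors_path)
```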

Context manager — recommended default

Wrap the whole flow in with and the SDK guarantees the episode is finalized exactly once:

  • Clean exit → status="ready"
  • Exception → status="failed", with the exception type recorded in metadata.failure_reason so the admin can triage from the episode detail page

with rt.start_episode(
    name="ctx demo",
    policy_version="v1.2.3",
    artifacts=["video"],
) as ep:
    record_for(seconds=60, output="/tmp/run.mp4")
    ep.upload_video("/tmp/run.mp4")
    # No explicit finalize — the context manager handles it.

The with block re-raises any user exception after marking the run failed, so your existing error handling keeps working:

try:
    with rt.start_episode(name="risky", policy_version="v1") as ep:
        ep.upload_video(record_unstable_run())
except RuntimeError as exc:
    # Episode is already marked failed on the server.
    alerts.notify("run failed", error=str(exc))
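
The semantics described in this section (finalize exactly once, mark failed on exception, then re-raise) can be sketched with a plain Python context manager. FakeEpisode below is a hypothetical stand-in that talks to no server, and the guard that turns a second finalize into a no-op is an assumption about how "exactly once" might be enforced, not a statement about the SDK's code.

```python
from typing import Optional


class FakeEpisode:
    """Hypothetical stand-in for an Episode handle; not the SDK class."""

    def __init__(self) -> None:
        self.status = "recording"
        self.metadata = {}
        self.finalize_calls = 0

    def finalize(self, status: str, metadata: Optional[dict] = None) -> None:
        # Assumed guard: once the run has left "recording", do nothing.
        if self.status != "recording":
            return
        self.finalize_calls += 1
        self.status = status
        self.metadata.update(metadata or {})

    def __enter__(self) -> "FakeEpisode":
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        if exc_type is None:
            self.finalize(status="ready")
        else:
            # Record the exception type, mirroring metadata.failure_reason.
            self.finalize(
                status="failed",
                metadata={"failure_reason": exc_type.__name__},
            )
        return False  # returning False re-raises the user's exception


# Clean exit: the run ends up "ready".
with FakeEpisode() as ok_ep:
    pass

# Exception: the run ends up "failed" and the error still reaches the caller.
failed_ep = FakeEpisode()
caught = None
try:
    with failed_ep:
        raise RuntimeError("gripper stalled")
except RuntimeError as exc:
    caught = exc
```

Returning False from __exit__ is what lets the surrounding try/except keep working: Python re-raises the original exception after the cleanup runs.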

Streaming uploads

Each upload_* call streams the file from disk through httpx to the signed R2 URL. Memory stays flat regardless of file size, so a 12 GB video uses the same RAM as a 12 KB one. There's no progress callback in 0.1.x — coming in 0.2.
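
As a rough illustration of why memory stays flat, the sketch below reads a file in fixed-size chunks through a generator, so only one chunk is held in memory at a time. The function name and chunk size are illustrative choices; the source does not say how the SDK chunks its reads.

```python
import os
import tempfile
from typing import Iterator


def iter_file_chunks(path: str, chunk_size: int = 1024 * 1024) -> Iterator[bytes]:
    """Yield a file's contents chunk by chunk instead of reading it whole."""
    with open(path, "rb") as fh:
        while True:
            chunk = fh.read(chunk_size)
            if not chunk:
                break
            yield chunk


# Demo: a 2560-byte file read in 1024-byte chunks.
with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(b"x" * 2560)
    demo_path = tmp.name

chunk_sizes = [len(chunk) for chunk in iter_file_chunks(demo_path, chunk_size=1024)]
os.remove(demo_path)
```

httpx accepts a byte iterator like this as the content= argument of a request, which is one way a client can stream a PUT body without buffering the whole file.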

You can interleave uploads with your own work:

with rt.start_episode(name="interleave", artifacts=["video", "sensors"]) as ep:
    ep.upload_video("/tmp/run.mp4")        # ~30 s for 4K @ 60 s
    post_process_sensors()                 # do CPU-bound work here
    ep.upload_sensors("/tmp/sensors.bin")  # ~5 s

Per-artifact error handling

Sometimes you want to recover from a single upload failure without killing the whole episode — e.g. the sensor blob is the important one and the side-camera video is nice-to-have:

with rt.start_episode(
    name="best effort video",
    policy_version="v1",
    artifacts=["video", "sensors"],
) as ep:
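    extra_metadata = {}
    try:
        ep.upload_video("/tmp/run.mp4")
    except rt.RobotraceError as exc:
        # Video upload failed (network, disk full, R2 5xx, etc.).
        # Record the issue but keep the run going.
        extra_metadata["video_upload_skipped"] = str(exc)

    # Sensors are required, so let any failure here propagate; the
    # context manager will mark the run failed.
    ep.upload_sensors("/tmp/sensors.bin")

    # Assumption: an explicit finalize inside the with block counts as the
    # single finalization, so the context manager will not finalize again.
    ep.finalize(status="ready", metadata=extra_metadata)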

Explicit Client

When you need multiple deployments at once (logging the same run to staging + production, or a dependency-injected client for tests), use rt.Client directly:

with rt.Client(api_key="rt_…", base_url="https://app.robotrace.dev") as client:
    with client.start_episode(name="…", policy_version="…") as ep:
        ep.upload_video("/tmp/run.mp4")

Client holds an HTTP connection pool — construct it once at process startup, reuse across many episodes, and close() (or use as a context manager) on shutdown.
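
For the staging + production case, one possible pattern is to open both clients through contextlib.ExitStack so they are all closed together on exit. The sketch below uses FakeClient, a hypothetical stand-in that only tracks open/closed state, and placeholder URLs; it assumes rt.Client works as a context manager in the same way as the example above.

```python
from contextlib import ExitStack


class FakeClient:
    """Hypothetical stand-in for rt.Client; tracks open/closed state only."""

    def __init__(self, base_url: str) -> None:
        self.base_url = base_url
        self.closed = False
        self.logged = []

    def log(self, name: str) -> None:
        # Placeholder for starting and finalizing an episode on this deployment.
        self.logged.append(name)

    def close(self) -> None:
        self.closed = True

    def __enter__(self) -> "FakeClient":
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        self.close()
        return False


# Placeholder URLs, for illustration only.
urls = ["https://staging.example.invalid", "https://prod.example.invalid"]

with ExitStack() as stack:
    # Each client is registered with the stack and closed when the block exits.
    clients = [stack.enter_context(FakeClient(url)) for url in urls]
    for client in clients:
        client.log("same run")
    closed_inside = [client.closed for client in clients]

closed_after = [client.closed for client in clients]
```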

Metadata-only runs

When R2 isn't wired on your deployment yet (the response has storage="unconfigured") or you just want to record reproducibility metadata without artifacts, pass artifacts=[]:

ep = rt.start_episode(
    name="metadata only",
    policy_version="v1.2.3",
    seed=42,
    artifacts=[],
)
ep.finalize(status="ready", duration_s=12.6)

The episode appears in the portal with the reproducibility fields filled in but no artifact URLs. Calling ep.upload_video(...) on a metadata-only episode raises ConfigurationError.
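
If you would rather check up front than catch ConfigurationError, a small guard over the documented storage and upload_urls fields is one option. EpisodeInfo and can_upload below are hypothetical helpers, and the URL values are plain strings for brevity rather than the SDK's UploadUrl objects.

```python
from dataclasses import dataclass, field


@dataclass
class EpisodeInfo:
    """Hypothetical stand-in carrying two of the documented Episode fields."""

    storage: str                                      # "r2" or "unconfigured"
    upload_urls: dict = field(default_factory=dict)   # artifact kind -> URL info


def can_upload(ep: EpisodeInfo, kind: str) -> bool:
    # An upload needs configured storage and a signed URL for this kind.
    return ep.storage == "r2" and kind in ep.upload_urls


# Placeholder URL, for illustration only.
video_only = EpisodeInfo(
    storage="r2",
    upload_urls={"video": "https://signed.example.invalid/video"},
)
metadata_only = EpisodeInfo(storage="r2")             # artifacts=[] case
unconfigured = EpisodeInfo(storage="unconfigured")    # R2 not wired

results = {
    "video_only/video": can_upload(video_only, "video"),
    "video_only/sensors": can_upload(video_only, "sensors"),
    "metadata_only/video": can_upload(metadata_only, "video"),
    "unconfigured/video": can_upload(unconfigured, "video"),
}
```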

What's returned

start_episode returns an Episode dataclass:

@dataclass
class Episode:
    id: str                                # uuid, as str
    status: str                            # starts "recording"
    storage: Literal["r2", "unconfigured"] # whether uploads will work
    upload_urls: dict[ArtifactKind, UploadUrl]

UploadUrl carries the signed PUT URL, its expiry, and the canonical public URL the server persisted (when R2_PUBLIC_URL is configured):

@dataclass
class UploadUrl:
    kind: ArtifactKind  # "video" | "sensors" | "actions"
    url: str
    expires_at: str     # ISO 8601, server clock
    public_url: str | None

You generally don't need to touch these — ep.upload_video(...) finds the right URL by kind.
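
One case where inspecting an UploadUrl may help is a long recording that could outlive the signed URL. The sketch below checks an ISO 8601 expires_at string against the current time with a safety margin. is_expired is a hypothetical helper, the 30-second margin is an arbitrary choice to absorb clock skew against the server, and treating a timestamp without an offset as UTC is an assumption.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def is_expired(
    expires_at: str,
    now: Optional[datetime] = None,
    margin_s: float = 30.0,
) -> bool:
    """Return True if the signed URL is expired or about to expire."""
    # Older Python versions do not parse a trailing "Z", so normalize it.
    expiry = datetime.fromisoformat(expires_at.replace("Z", "+00:00"))
    if expiry.tzinfo is None:
        # Assumption: timestamps without an offset are in UTC.
        expiry = expiry.replace(tzinfo=timezone.utc)
    current = now if now is not None else datetime.now(timezone.utc)
    return current + timedelta(seconds=margin_s) >= expiry


# Fixed reference time so the demo is deterministic.
reference = datetime(2025, 1, 1, 12, 0, 0, tzinfo=timezone.utc)

still_valid = is_expired("2025-01-01T12:10:00Z", now=reference)
inside_margin = is_expired("2025-01-01T12:00:10Z", now=reference)
already_past = is_expired("2025-01-01T11:59:00+00:00", now=reference)
```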