# start_episode — explicit lifecycle
`log_episode(...)` is the right tool for the 95% case: files on disk,
upload them all, finalize. When that doesn't fit — you want to stream
uploads as your robot finishes them, react to per-artifact failures,
or use a context manager to guarantee the run closes cleanly even on
exceptions — drop down to `start_episode(...)`.
## When to use which
| You want to… | Reach for |
|---|---|
| Log a run after it's done with all files on disk | `log_episode` |
| Upload artifacts as soon as they're produced | `start_episode` + manual `upload_*` calls |
| Auto-mark the run failed if your code crashes mid-flow | `start_episode` as a context manager |
| Skip artifacts entirely (metadata-only run) | `start_episode(..., artifacts=[])` |
| Catch and recover from a failed upload of one specific kind | `start_episode` + try/except per upload |
## The two-step flow
`start_episode` opens a run on the server (status `"recording"`) and
returns an `Episode` handle with the signed PUT URLs (when R2 is
wired). You upload artifacts at your leisure and call `finalize` when
done.
```python
import robotrace as rt

ep = rt.start_episode(
    name="streaming demo",
    source="real",
    policy_version="v1.2.3",
    env_version="cell-A",
    git_sha="abc1234",
    seed=42,
    fps=30,
    # Only request signed URLs for the slots you'll actually fill.
    # Defaults to all three; pass [] for a metadata-only run.
    artifacts=["video", "sensors"],
)

ep.upload_video("/tmp/run.mp4")
ep.upload_sensors("/tmp/sensors.bin")

ep.finalize(
    status="ready",
    duration_s=47.2,
    fps=30,
    metadata={"task": "pick"},
)
```

`finalize` rolls up `bytes_total` automatically from the upload
helpers' running total, so you don't have to call `os.path.getsize`
yourself. Pass `bytes_total=` explicitly if you want to override it.
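If you do pass it yourself, the value is just the sum of the on-disk sizes of the files you uploaded, which is what the helpers track internally. A minimal sketch (the helper name is ours, not part of the SDK):

```python
import os

def total_artifact_bytes(paths):
    """Sum the on-disk sizes of the artifact files, matching the
    running total the upload helpers keep for bytes_total.
    Hypothetical helper, not part of the SDK."""
    return sum(os.path.getsize(p) for p in paths)

# Then, e.g.:
# ep.finalize(status="ready",
#             bytes_total=total_artifact_bytes(["/tmp/run.mp4",
#                                               "/tmp/sensors.bin"]))
```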
## Context manager — recommended default
Wrap the whole flow in `with` and the SDK guarantees the episode is
finalized exactly once:

- Clean exit → `status="ready"`
- Exception → `status="failed"`, with the exception type recorded in `metadata.failure_reason` so the admin can triage from the episode detail page
```python
with rt.start_episode(
    name="ctx demo",
    policy_version="v1.2.3",
    artifacts=["video"],
) as ep:
    record_for(seconds=60, output="/tmp/run.mp4")
    ep.upload_video("/tmp/run.mp4")
# No explicit finalize — the context manager handles it.
```

The `with` block re-raises any user exception after marking the run
failed, so your existing error handling keeps working:
```python
try:
    with rt.start_episode(name="risky", policy_version="v1") as ep:
        ep.upload_video(record_unstable_run())
except RuntimeError as exc:
    # Episode is already marked failed on the server.
    alerts.notify("run failed", error=str(exc))
```

## Streaming uploads
Each `upload_*` call streams the file from disk through `httpx` to
the signed R2 URL. Memory stays flat regardless of file size, so a
12 GB video uses the same RAM as a 12 KB one. There's no progress
callback in 0.1.x — coming in 0.2.
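The flat-memory behaviour is the usual chunked-read pattern: the file is consumed in fixed-size pieces rather than loaded whole. A sketch of that pattern (the chunk size and function name are illustrative assumptions, not the SDK's actual internals):

```python
def iter_file_chunks(path, chunk_size=1024 * 1024):
    """Yield a file as fixed-size byte chunks so peak memory stays
    near chunk_size however large the file is. Illustrative sketch,
    not the SDK's real internals."""
    with open(path, "rb") as f:
        while chunk := f.read(chunk_size):
            yield chunk
```

`httpx` accepts an iterator like this directly as a request body (`client.put(url, content=iter_file_chunks(path))`), which is the standard way to stream a PUT without buffering the whole file.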
You can interleave uploads with your own work:
```python
with rt.start_episode(name="interleave", artifacts=["video", "sensors"]) as ep:
    ep.upload_video("/tmp/run.mp4")        # ~30 s for 4K @ 60 s
    post_process_sensors()                 # do CPU-bound work here
    ep.upload_sensors("/tmp/sensors.bin")  # ~5 s
```

## Per-artifact error handling
Sometimes you want to recover from a single upload failure without killing the whole episode — e.g. the sensor blob is the important one and the side-camera video is nice-to-have:
```python
with rt.start_episode(
    name="best effort video",
    policy_version="v1",
    artifacts=["video", "sensors"],
) as ep:
    metadata = {}
    try:
        ep.upload_video("/tmp/run.mp4")
    except rt.RobotraceError as exc:
        # Video upload failed (network, disk full, R2 5xx, etc.).
        # Record the issue but keep the run going.
        metadata["video_upload_skipped"] = str(exc)
    # Sensors are required — let any failure here propagate; the
    # context manager will mark the run failed.
    ep.upload_sensors("/tmp/sensors.bin")
    # Finalize explicitly to attach the note; the context manager
    # won't finalize a second time.
    ep.finalize(status="ready", metadata=metadata)
```

## Explicit Client
When you need multiple deployments at once (logging the same run to
staging + production, or a dependency-injected client for tests),
use `rt.Client` directly:
```python
with rt.Client(api_key="rt_…", base_url="https://app.robotrace.dev") as client:
    with client.start_episode(name="…", policy_version="…") as ep:
        ep.upload_video("/tmp/run.mp4")
```

`Client` holds an HTTP connection pool — construct it once at
process startup, reuse it across many episodes, and `close()` it (or
use it as a context manager) on shutdown.
## Metadata-only runs
When R2 isn't wired on your deployment yet (the response has
`storage="unconfigured"`) or you just want to record reproducibility
metadata without artifacts, pass `artifacts=[]`:
```python
ep = rt.start_episode(
    name="metadata only",
    policy_version="v1.2.3",
    seed=42,
    artifacts=[],
)
ep.finalize(status="ready", duration_s=12.6)
```

The episode appears in the portal with the reproducibility fields
filled in but no artifact URLs. Calling `ep.upload_video(...)` on a
metadata-only episode raises `ConfigurationError`.
## What's returned
`start_episode` returns an `Episode` dataclass:

```python
@dataclass
class Episode:
    id: str                                   # uuid, as str
    status: str                               # starts "recording"
    storage: Literal["r2", "unconfigured"]    # whether uploads will work
    upload_urls: dict[ArtifactKind, UploadUrl]
```

`UploadUrl` carries the signed PUT URL, its expiry, and the canonical
public URL the server persisted (when `R2_PUBLIC_URL` is configured):
```python
@dataclass
class UploadUrl:
    kind: ArtifactKind      # "video" | "sensors" | "actions"
    url: str
    expires_at: str         # ISO 8601, server clock
    public_url: str | None
```

You generally don't need to touch these — `ep.upload_video(...)`
finds the right URL by kind.