Minimal run

import zeroeval as ze

ze.init()
dataset = ze.Dataset.pull("capital-cities")

@ze.task(outputs=["prediction"])
def predict(row):
    return {"prediction": row.answer}

run = dataset.eval(predict, workers=8)
print(run.id)
print(run.health)

The object returned by dataset.eval(...) is a run (Eval):
print(type(run).__name__)  # Eval
print(run.id)              # backend eval ID
print(run.rows[0])         # task outputs for the first row
print(run.metrics)         # aggregate metrics after scoring

Task requirements

@ze.task functions must:
  • Return a dict
  • Include every output declared in outputs=[...] as a key in that dict
@ze.task(outputs=["prediction"])
def predict(row):
    value = call_model(row.question)
    return {"prediction": value}

If a declared output is missing from the returned dict, the SDK raises a validation error.
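
For example, the task below declares prediction but returns a different key, so every row it processes would fail output validation (a minimal sketch; the SDK's specific validation error type is not shown here):

@ze.task(outputs=["prediction"])
def broken_predict(row):
    # "prediction" was declared in outputs=[...] but is never returned,
    # so the SDK rejects this result with a validation error
    return {"answer": row.answer}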

Task outputs vs signals

  • Return task outputs in the dictionary returned by your @ze.task(...) function.
  • Emit signals during execution for runtime facts you may want to inspect or score later.
@ze.task(outputs=["prediction"])
def predict(row):
    ze.emit_signal("phase", "retrieve")
    docs = retrieve(row.question)
    ze.emit_signal("retrieved_doc_count", len(docs))
    return {"prediction": call_model(row.question, docs)}

Execution controls

Use ExecutionConfig for runtime behavior:
run = dataset.eval(
    predict,
    execution=ze.ExecutionConfig(
        workers=12,
        timeout_s=30,
        retry=ze.RetryPolicy(max_attempts=3),
    ),
)

Key knobs

  • workers: size of the thread pool that executes rows
  • max_in_flight: maximum number of rows queued as concurrent futures
  • timeout_s: per-row timeout, in seconds, applied to each row's future
  • retry: retry policy for transient failures
  • failure.on_row_error: whether to "continue" or "stop" the run when a row errors
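
Combining these knobs, a fuller configuration might look like the sketch below. The guide does not show exactly where max_in_flight and failure.on_row_error are set, so their placement here is an assumption (flagged in comments); check the SDK reference for the exact names.

run = dataset.eval(
    predict,
    execution=ze.ExecutionConfig(
        workers=12,
        max_in_flight=24,  # assumed to live on ExecutionConfig alongside workers
        timeout_s=30,
        retry=ze.RetryPolicy(max_attempts=3),
    ),
    # failure.on_row_error ("continue" or "stop") decides whether a single row
    # error aborts the run; its exact home is not shown in this guide.
)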

Checkpointing

Enable incremental persistence for long runs:
run = dataset.eval(
    predict,
    checkpoint=ze.CheckpointConfig(
        enabled=True,
        flush_every_rows=50,
        flush_every_seconds=10.0,
    ),
)

Checkpointing reduces the work lost when a run is interrupted and lets the run resume from the last persisted checkpoint.

Emit runtime signals

Tasks can emit runtime observations while they execute:
@ze.task(outputs=["prediction"])
def predict(row):
    ze.emit_signal("phase", "retrieve")
    docs = retrieve(row.question)
    ze.emit_signal("retrieved_doc_count", len(docs))
    ze.emit_signal("retrieval_hit", bool(docs))
    return {"prediction": call_model(row.question, docs)}

Signals are attached to the current task span by default and can later be inspected in trace-aware eval views. Use signals for execution facts, then use evaluations to convert those facts into scores.

Attach run metadata

Use parameters to persist contextual metadata with the run:
run = dataset.eval(
    predict,
    parameters={
        "model": "gpt-4o-mini",
        "experiment": "capital-baseline",
        "dataset_version": dataset.version_number,
    },
)

Keep runs self-describing by always storing model name, dataset version, and subset in run parameters.
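
The parameters example above omits a subset label; if you evaluate only a slice of the dataset, record it the same way (the "subset" key below is an arbitrary user-defined label, not a reserved name):

run = dataset.eval(
    predict,
    parameters={
        "model": "gpt-4o-mini",
        "experiment": "capital-baseline",
        "dataset_version": dataset.version_number,
        "subset": "smoke-50",  # arbitrary user-defined label for the evaluated slice
    },
)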

Health and execution summary

After execution, inspect run.health to understand whether the run completed cleanly:
print(run.health["status"])
print(run.health["success_count"])
print(run.health["error_count"])
print(run.health["retry_success_rate"])
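
For example, a post-run guard that fails loudly when any row errored could look like this (a minimal sketch using only the health fields shown above; "completed" is an assumed status value):

health = run.health
if health["status"] != "completed" or health["error_count"] > 0:
    raise RuntimeError(
        f"eval run {run.id}: {health['error_count']} rows errored, "
        f"retry success rate {health['retry_success_rate']}"
    )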