Skip to content

Query Live Data

Read live data from a running agent: full samples, latest values, change snapshots, and live ticks. Saved .trz files use the same tools through Trace.query() — see Open Trace Files.

Snippets assume an agent connected via connect(), with paths from zelos live demo:

from zelos_sdk.agent import connect

with connect() as agent:
    ...

Browse the catalog

agent.signals() returns a SignalCatalog of every signal that produced data in the lookback window. It's iterable, sliceable (catalog[:50]), and supports in.

catalog = agent.signals(lookback=60.0)
print(len(catalog), "signal(s)")
for sig in catalog[:5]:
    print(sig.path, sig.unit or "—", sig.data_type)

Filter and look up

search() is case-insensitive substring across path, source, message, name, and unit. For glob patterns, use fnmatch. For exact lookup, use by_path():

from fnmatch import fnmatch

voltage = catalog.search("voltage")
bms = [s for s in catalog if fnmatch(s.path, "bus0/*BMS*.*")]

sig = catalog.by_path("bus0/BMS_message/status.cell_voltage")
print(sig.unit, sig.data_type, sig.producer)

by_path() raises SignalNotFound or AmbiguousSignal if the path collides across producers/segments. Pass the resolved Signal to disambiguate downstream calls.

Span all producers

producers=("localhost",) is the default. Pass producers=() to include every connected producer, or a specific connection by name (e.g., producers=("rover-1",)):

all_signals = agent.signals(producers=())
print(sorted({s.producer for s in all_signals if s.producer}))

Segments

for seg in agent.segments():
    print(seg.id, seg.producer, seg.start, "→", seg.end, seg.duration)

Each Segment carries id, producer, start, end, duration, sources, connection, and trace_path. Per-producer issues surface as RuntimeWarning.

Run a query

agent.query() returns a SignalFrame — one time column plus one column per matched signal.

frame = agent.query(
    [
        "bus0/BMS_message/status.cell_voltage",
        "bus0/BMS_message/status.pack_current",
    ],
    start="-30s",
    end="now",
)
print(frame)                  # SignalFrame(rows=N, columns=2, time_unit='ns')
print(frame.head(3).to_pandas())

start and end accept a datetime (UTC-coerced if naive), "now", ISO 8601, or a relative offset ("-30s", "-2m", "-1.5h", "-1d"). end defaults to "now".

Downsample for charts

By default, query() returns every sample. For charts, set downsample=N for about N points per signal. max_rows and sort are raw-only — mutually exclusive with downsample.

plot_frame = agent.query("bus0/*", start="-5m", downsample=400)
print(plot_frame.downsampled)  # True

Wildcards

Any of the three positions in source/message.name can be *. Mix patterns and exact paths in one call. An empty expansion raises SignalNotFound with up to three "did you mean" suggestions.

frame = agent.query(
    [
        "bus0/BMS_message/status.*",
        "eth0/sensor_data.sensor_value",
    ],
    start="-30s",
)

Frame metadata

frame.time_range        # TimeRange (or None if empty)
frame.query_duration_s  # seconds
frame.downsampled       # True if downsampled
frame.truncated         # True if max_rows or an agent cap kicked in
frame.warnings          # list[str] — per-producer warnings

A truncated frame still contains rows; check frame.truncated before assuming the window is complete.

Get one column

voltage = frame["bus0/BMS_message/status.cell_voltage"]
print(voltage.name, voltage.unit, len(voltage))

If the same path appears under multiple producers or segments, indexing by string raises AmbiguousSignal — look up the Signal and pass it: frame[catalog.by_path(...)].

Hand off to pandas / Arrow

df = frame.to_pandas()        # DataFrame, UTC DatetimeIndex
table = frame.to_arrow()      # pyarrow.Table
arr = voltage.to_arrow()      # pa.Array of values

pandas + numpy live behind the [analysis] extra

pip install zelos-sdk[analysis] adds pandas and numpy. Without them, only to_arrow() and basic aggregations work — the SDK never silently downloads the scientific stack at runtime.

Bound the result size

frame = agent.query(
    "bus0/BMS_message/status.*",
    start="-1h",
    max_rows=100_000,   # 0 = no limit (default)
    sort="desc",        # newest first; "asc" is the default
)

Iterate columns and metadata

for sig in frame:                   # Signal objects in column order
    print(sig.path, sig.unit)

for series in frame.values():       # SignalSeries
    print(series.legend, len(series))

for path, series in frame.items():  # (path, SignalSeries)
    print(path, series.min(), series.max())

for col in frame.columns[1:]:       # ColumnMetadata (column 0 is time)
    print(col.path, col.producer, col.data_segment_id)

Two columns can share a path when multiple producers emit the same signal; items() yields each column once.

Read latest values

agent.latest() returns the most recent value within [now − lookback, now]. Single path returns a LatestValue; a list returns dict[str, LatestValue]:

v = agent.latest("bus0/BMS_message/status.cell_voltage")
print(v.value, v.signal.unit, v.time)
print(v.formatted())     # "12.83 V"

latest = agent.latest([
    "bus0/BMS_message/status.cell_voltage",
    "bus0/BMS_message/status.pack_current",
    "bus0/BMS_message/status.battery_level",
])
for path, lv in latest.items():
    print(path, lv.formatted(), "@", lv.time)

LatestValue carries:

field meaning
value typed scalar (int, float, bool, bytes, or str)
raw display string from the agent
time datetime (UTC), or None if unreported
time_ns matching epoch nanoseconds
signal full Signal metadata
label signal.value_table[int(value)] if applicable, else None

formatted() renders for display: enum label > number with unit > boolean > hex (binary) > raw string. digits=2 controls float precision.

event = agent.latest("eth0/sensor_events.event_type")
print(event.value, "→", event.formatted())   # "INFO" → "INFO"

The single-path arm raises SignalNotFound, NoData, or AmbiguousSignal. The list arm silently omits missing keys; only per-key AmbiguousSignal raises. Pass a resolved Signal to disambiguate.

Read at a cursor time

agent.at() returns the value at-or-before a specific instant. Always returns a dict.

from datetime import datetime, timezone

cursor = datetime(2024, 5, 1, 12, 30, tzinfo=timezone.utc)
values = agent.at(["bus0/BMS_message/status.cell_voltage"], cursor)

time accepts the same shapes as query()datetime, "now", ISO 8601, or relative offset.

Pass min_time to stay inside the current segment

at() looks across all recorded segments by default — if a path has historical data in older segments, the call raises AmbiguousSignal. Bound the search:

values = agent.at(
    "bus0/BMS_message/status.*",
    time="-30s",
    min_time="-1m",
)

lookback (default 60.0s) controls the catalog window used to expand wildcards and attach unit metadata.

Snapshot a window of changes

agent.window() returns a ReplayWindow: an opening snapshot plus one entry per value change over [start, start + duration]. Reach for it instead of query() for transitions — state machines, mode flags, gear positions yield one entry per change instead of thousands of duplicates.

window = agent.window(
    "bus1/inverter_status.*",
    start="-5s",
    duration=5.0,
)
print(len(window.snapshot), "opening value(s)")
print(len(window.changes), "change(s)")
print(window.start, "→", window.end)

window.snapshot and window.changes are both list[LatestValue]. Use window.start_ns / window.end_ns (or .start / .end) as the actual bounds — they may differ slightly from your input if the agent aligns to sample boundaries.

Watch live values

agent.watch() polls latest() on an interval and yields a dict[str, LatestValue] per tick. until accepts a datetime / ISO string deadline, or a float seconds-from-now. until=None runs forever.

paths = [
    "bus0/BMS_message/status.cell_voltage",
    "bus0/BMS_message/status.battery_level",
]

for tick in agent.watch(paths, interval=1.0, until=30.0):
    soc = tick.get("bus0/BMS_message/status.battery_level")
    if soc and float(soc.value) < 20:
        print("low battery!", soc.formatted())

paths=None snapshots the catalog once at entry — signals that come online later are not picked up. interval is clamped to 0.05s. Lower lookback to drop stale values faster.

Surviving transient errors

on_error="skip" yields an empty dict for the failing tick and tolerates up to max_consecutive_errors consecutive failures (strictly >) before re-raising; a successful tick resets the counter. on_error="raise" (default) propagates the first AgentError.

The empty-dict yield is your only signal — there's no warning emitted today. Inspect the next tick or instrument your loop:

skipped = 0
for tick in agent.watch(
    paths,
    interval=1.0,
    on_error="skip",
    max_consecutive_errors=5,
):
    if not tick:
        skipped += 1
        continue
    skipped = 0
    ...

Compute on series with units

Every column you pull off a SignalFrame is a SignalSeries. Series carry their units and compose through arithmetic, integration, and differentiation — no manual bookkeeping.

frame = agent.query(
    [
        "bus0/BMS_message/status.cell_voltage",
        "bus0/BMS_message/status.pack_current",
    ],
    start="-1m",
)
voltage = frame["bus0/BMS_message/status.cell_voltage"]   # unit "V"
current = frame["bus0/BMS_message/status.pack_current"]   # unit "A"

power = voltage * current        # SignalSeries, unit "V·A"
energy = power.integrate()       # SignalSeries, unit "V·A·s"

print(power.legend)              # "(...cell_voltage * ...pack_current) (V·A)"
print(power.max(), power.mean()) # NamedScalar(...) — a float subclass

Aggregations

.min(), .max(), .mean(), .std(), .median() each return a NamedScalar: a float that also carries the series name. .median() is approximate; for exact, go through .to_pandas().

peak = power.max()
print(peak, float(peak))

Operators

+, -, *, / work between two series sharing the same time index (the common case when both come from the same frame), and between a series and a scalar.

delta = current - 1.5            # subtract a setpoint, unit "A"
gain = 2.0 * voltage             # scalar × series, unit "V"
ratio = power / current          # unit becomes "V·A/A"
neg = -voltage                   # unary negation, unit preserved

Unit composition:

  • + / -: units must match. One-sided None propagates; mismatched raise TypeError — call .derive(label=..., unit=...) to assert.
  • *: None propagates; otherwise f"{a}·{b}".
  • /: matching units cancel to None; otherwise f"{a}/{b}".

For series from different frames or with different lengths, drop into pandas and re-enter via .derive().

Integrate, differentiate, clip, mask

energy = power.integrate()                       # cumulative trapezoidal; unit gains "·s"; first sample = 0
slew = current.derivative()                      # gradient over time; unit gains "/s"
limited = current.clip(lower=-100, upper=100)    # unit and identity preserved

# Mask out charging samples (current < 0) — NaN where False.
import pyarrow as pa, pyarrow.compute as pc
discharging = pc.greater(current.to_arrow(), pa.scalar(0.0))
power_when_discharging = power.where(discharging)

where() requires a numeric series. The mask length must match len(series). For advanced quadrature, drop into pandas/numpy and use .derive() to come back.

Escape hatch back to a SignalSeries

When operators don't cover what you need (rolling windows, resampling, FFTs), drop into pandas / numpy, then re-enter via derive():

rolling = voltage.to_pandas().rolling("5s").mean()

smoothed = voltage.derive(
    rolling.to_numpy(),
    label="cell_voltage (5s rolling avg)",
    unit="V",
)
print(smoothed.legend)     # "cell_voltage (5s rolling avg) (V)"

derive() requires the new values to match the original time-index length.

Use the legend on charts

.legend is "name (unit)" — drop it into axis labels so units flow into the plot:

import altair as alt

df = power.to_pandas().rename("power").reset_index()
chart = (
    alt.Chart(df)
    .mark_line()
    .encode(x="time:T", y=alt.Y("power:Q", title=power.legend))
)

Errors

Every SDK error inherits from AgentError. Branch on the granular subclasses:

from zelos_sdk.agent import (
    AgentError, SignalNotFound, AmbiguousSignal, NoData,
)

try:
    v = agent.latest("bus0/BMS_message/status.cell_volt")
except SignalNotFound as exc:
    print("typo? did you mean:", exc.suggestions)
except AmbiguousSignal:
    # Path matches >1 producer or segment — pass a list, or a resolved Signal.
    matches = list(agent.signals().search("cell_voltage"))
    v = agent.latest(matches[:1])
except NoData:
    # Catalog hit, but no samples within `lookback`.
    v = None
except AgentError:
    raise

QueryRangeTooLarge is raised by agent.query() (not latest()) when the requested time range exceeds the agent's per-query budget. Shrink the window or pass downsample=N.

SignalNotFound.suggestions is up to three close paths — surface them in your CLI / notebook errors.

Internal is not an AgentError

zelos_sdk.agent.Internal is a RuntimeError subclass reserved for SDK invariant violations (e.g. the SDK couldn't decode something the agent sent). It deliberately lives outside the AgentError hierarchy so a broad except AgentError: doesn't accidentally swallow what should be a bug report. If you want to catch every SDK-side problem in one branch, use except (AgentError, Internal): and please file an issue when you hit Internal.

Tips

  • Mix strings and Signals freely. Anywhere paths is accepted, you can pass "bus0/*", a Signal, or a list mixing both.
  • Pass a Signal to disambiguate. If latest("...") keeps raising AmbiguousSignal, look it up in the catalog and pass the resolved Signal.
  • producers=() spans the whole agent. The default ("localhost",) only sees local signals.
  • downsample=N for charts, raw for analysis. downsample=400 typically renders in milliseconds with no perceptible quality loss.
  • Guard with max_rows. When the time range is user-supplied, set max_rows=100_000 to keep a typo from pulling a multi-hour window.
  • watch(paths=None) snapshots the catalog at entry. Restart the iterator to pick up signals that come online later.

What's next

  • Open Trace Files

    Use query(), at(), and window() on saved .trz files.

  • Run Actions

    Inspect schemas, execute typed Actions, and handle results.

  • Quick Start

    Connect to an agent and run your first query in under 2 minutes.