Query Live Data¶
Read live data from a running agent: full samples, latest values, change snapshots, and live ticks. Saved .trz files use the same tools through Trace.query() — see Open Trace Files.
Snippets assume an agent connected via connect(), with paths from zelos live demo.
Browse the catalog¶
agent.signals() returns a SignalCatalog of every signal that produced data in the lookback window. It's iterable, sliceable (catalog[:50]), and supports membership tests with in.
catalog = agent.signals(lookback=60.0)
print(len(catalog), "signal(s)")
for sig in catalog[:5]:
    print(sig.path, sig.unit or "—", sig.data_type)
Filter and look up¶
search() does a case-insensitive substring match across path, source, message, name, and unit. For glob patterns, use fnmatch. For exact lookup, use by_path():
from fnmatch import fnmatch
voltage = catalog.search("voltage")
bms = [s for s in catalog if fnmatch(s.path, "bus0/*BMS*.*")]
sig = catalog.by_path("bus0/BMS_message/status.cell_voltage")
print(sig.unit, sig.data_type, sig.producer)
by_path() raises SignalNotFound if nothing matches, or AmbiguousSignal if the path collides across producers/segments. Pass the resolved Signal to disambiguate downstream calls.
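To make the difference between substring search and glob matching concrete, here is a small stand-alone sketch over plain path strings. It does not call the SDK: PATHS, search_paths, and glob_paths are hypothetical names, and the real search() also looks at source, message, name, and unit rather than the path alone.

```python
from fnmatch import fnmatchcase

# Hypothetical catalog paths, modeled on the demo signals used on this page.
PATHS = [
    "bus0/BMS_message/status.cell_voltage",
    "bus0/BMS_message/status.pack_current",
    "bus1/inverter_status.mode",
    "eth0/sensor_data.sensor_value",
]


def search_paths(paths, needle):
    """Case-insensitive substring match (path only, for illustration)."""
    needle = needle.lower()
    return [p for p in paths if needle in p.lower()]


def glob_paths(paths, pattern):
    """Shell-style glob match; fnmatchcase is case-sensitive on every platform."""
    return [p for p in paths if fnmatchcase(p, pattern)]


print(search_paths(PATHS, "VOLTAGE"))
print(glob_paths(PATHS, "bus0/*BMS*.*"))
```

fnmatch.fnmatch normalizes case on Windows, so fnmatchcase may be the safer choice when path case matters.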
Span all producers¶
producers=("localhost",) is the default. Pass producers=() to include every connected producer, or name a specific connection (e.g., producers=("rover-1",)):
all_signals = agent.signals(producers=())
print(sorted({s.producer for s in all_signals if s.producer}))
Segments¶
Each Segment carries id, producer, start, end, duration, sources, connection, and trace_path. Per-producer issues surface as RuntimeWarning.
Run a query¶
agent.query() returns a SignalFrame — one time column plus one column per matched signal.
frame = agent.query(
    [
        "bus0/BMS_message/status.cell_voltage",
        "bus0/BMS_message/status.pack_current",
    ],
    start="-30s",
    end="now",
)
print(frame) # SignalFrame(rows=N, columns=2, time_unit='ns')
print(frame.head(3).to_pandas())
start and end accept a datetime (UTC-coerced if naive), "now", ISO 8601, or a relative offset ("-30s", "-2m", "-1.5h", "-1d"). end defaults to "now".
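As an illustration of how those time shapes could be normalized, the sketch below resolves each one to an aware UTC datetime. resolve_time is a hypothetical helper, not an SDK function, and it assumes that "UTC-coerced" means a naive datetime is treated as already being in UTC.

```python
import re
from datetime import datetime, timedelta, timezone

# Seconds per relative-offset suffix: "-30s", "-2m", "-1.5h", "-1d".
_UNIT_SECONDS = {"s": 1.0, "m": 60.0, "h": 3600.0, "d": 86400.0}
_RELATIVE = re.compile(r"^-(\d+(?:\.\d+)?)([smhd])$")


def resolve_time(value, now=None):
    """Resolve a datetime, "now", a relative offset, or ISO 8601 to UTC."""
    now = now or datetime.now(timezone.utc)
    if isinstance(value, datetime):
        if value.tzinfo is None:
            # Assumption: naive datetimes are interpreted as UTC.
            return value.replace(tzinfo=timezone.utc)
        return value.astimezone(timezone.utc)
    if value == "now":
        return now
    match = _RELATIVE.match(value)
    if match:
        seconds = float(match.group(1)) * _UNIT_SECONDS[match.group(2)]
        return now - timedelta(seconds=seconds)
    # Anything else is treated as an ISO 8601 string.
    return resolve_time(datetime.fromisoformat(value), now)


fixed_now = datetime(2024, 5, 1, 12, 0, tzinfo=timezone.utc)
print(resolve_time("-30s", fixed_now))
print(resolve_time("-1.5h", fixed_now))
```

Passing now explicitly keeps the start and end of one query anchored to the same instant.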
Downsample for charts¶
By default, query() returns every sample. For charts, set downsample=N for about N points per signal. max_rows and sort apply only to raw queries and are mutually exclusive with downsample.
plot_frame = agent.query("bus0/*", start="-5m", downsample=400)
print(plot_frame.downsampled) # True
Wildcards¶
Any of the three positions in source/message.name can be *. Mix patterns and exact paths in one call. An empty expansion raises SignalNotFound with up to three "did you mean" suggestions.
frame = agent.query(
    [
        "bus0/BMS_message/status.*",
        "eth0/sensor_data.sensor_value",
    ],
    start="-30s",
)
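The expand-or-suggest behavior can be approximated in plain Python. This is only a sketch: KNOWN, expand, and SignalNotFoundSketch are hypothetical, fnmatch lets * cross the / and . separators (the SDK's per-position wildcard may be stricter), and difflib stands in for whatever matching the SDK uses to build its suggestions.

```python
from difflib import get_close_matches
from fnmatch import fnmatchcase

# Hypothetical set of known signal paths.
KNOWN = [
    "bus0/BMS_message/status.cell_voltage",
    "bus0/BMS_message/status.pack_current",
    "eth0/sensor_data.sensor_value",
]


class SignalNotFoundSketch(LookupError):
    """Stand-in for the SDK error; carries up to three suggestions."""

    def __init__(self, pattern, suggestions):
        super().__init__(f"no signal matches {pattern!r}")
        self.suggestions = suggestions


def expand(patterns, known):
    """Expand patterns and exact paths into a de-duplicated, ordered list."""
    matched = []
    for pattern in patterns:
        hits = [p for p in known if fnmatchcase(p, pattern)]
        if not hits:
            close = get_close_matches(pattern, known, n=3)
            raise SignalNotFoundSketch(pattern, close)
        for hit in hits:
            if hit not in matched:
                matched.append(hit)
    return matched


print(expand(["bus0/BMS_message/status.*", "eth0/sensor_data.sensor_value"], KNOWN))
```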
Frame metadata¶
frame.time_range # TimeRange (or None if empty)
frame.query_duration_s # seconds
frame.downsampled # True if downsampled
frame.truncated # True if max_rows or an agent cap kicked in
frame.warnings # list[str] — per-producer warnings
A truncated frame still contains rows; check frame.truncated before assuming the window is complete.
Get one column¶
voltage = frame["bus0/BMS_message/status.cell_voltage"]
print(voltage.name, voltage.unit, len(voltage))
If the same path appears under multiple producers or segments, indexing by string raises AmbiguousSignal — look up the Signal and pass it: frame[catalog.by_path(...)].
Hand off to pandas / Arrow¶
df = frame.to_pandas() # DataFrame, UTC DatetimeIndex
table = frame.to_arrow() # pyarrow.Table
arr = voltage.to_arrow() # pa.Array of values
pandas + numpy live behind the [analysis] extra
pip install zelos-sdk[analysis] adds pandas and numpy. Without them, only to_arrow() and basic aggregations work — the SDK never silently downloads the scientific stack at runtime.
Bound the result size¶
frame = agent.query(
    "bus0/BMS_message/status.*",
    start="-1h",
    max_rows=100_000,  # 0 = no limit (default)
    sort="desc",       # newest first; "asc" is the default
)
Iterate columns and metadata¶
for sig in frame:                    # Signal objects in column order
    print(sig.path, sig.unit)
for series in frame.values():        # SignalSeries
    print(series.legend, len(series))
for path, series in frame.items():   # (path, SignalSeries)
    print(path, series.min(), series.max())
for col in frame.columns[1:]:        # ColumnMetadata (column 0 is time)
    print(col.path, col.producer, col.data_segment_id)
Two columns can share a path when multiple producers emit the same signal; items() yields each column once.
Read latest values¶
agent.latest() returns the most recent value within [now − lookback, now]. A single path returns a LatestValue; a list returns dict[str, LatestValue]:
v = agent.latest("bus0/BMS_message/status.cell_voltage")
print(v.value, v.signal.unit, v.time)
print(v.formatted()) # "12.83 V"
latest = agent.latest([
    "bus0/BMS_message/status.cell_voltage",
    "bus0/BMS_message/status.pack_current",
    "bus0/BMS_message/status.battery_level",
])
for path, lv in latest.items():
    print(path, lv.formatted(), "@", lv.time)
LatestValue carries:
| field | meaning |
|---|---|
| value | typed scalar (int, float, bool, bytes, or str) |
| raw | display string from the agent |
| time | datetime (UTC), or None if unreported |
| time_ns | matching epoch nanoseconds |
| signal | full Signal metadata |
| label | signal.value_table[int(value)] if applicable, else None |
formatted() renders for display, trying each form in order of precedence: enum label, number with unit, boolean, hex (for binary), raw string. digits controls float precision (digits=2 gives two decimal places).
event = agent.latest("eth0/sensor_events.event_type")
print(event.value, "→", event.formatted()) # "INFO" → "INFO"
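The precedence chain for formatted() can be sketched as a plain function. Everything here is illustrative: format_value and its parameters are hypothetical, value_table is assumed to be a dict keyed by int, and the exact text used for booleans and hex output is a guess rather than documented SDK behavior.

```python
def format_value(value, *, unit=None, value_table=None, raw="", digits=2):
    """Render a value for display, following the precedence described above."""
    is_number = isinstance(value, (int, float)) and not isinstance(value, bool)

    # 1. Enum label wins when the integer value maps into the value table.
    if value_table and isinstance(value, int) and not isinstance(value, bool):
        label = value_table.get(value)
        if label is not None:
            return label

    # 2. Numbers, with the unit appended when one is known.
    if is_number:
        text = f"{value:.{digits}f}" if isinstance(value, float) else str(value)
        return f"{text} {unit}" if unit else text

    # 3. Booleans (assumed rendering).
    if isinstance(value, bool):
        return "true" if value else "false"

    # 4. Binary payloads as hex (assumed rendering).
    if isinstance(value, (bytes, bytearray)):
        return value.hex()

    # 5. Fall back to the agent's display string.
    return raw or str(value)


print(format_value(12.8312, unit="V"))
print(format_value(2, value_table={2: "CHARGING"}))
```

Note the explicit bool checks: in Python, bool is a subclass of int, so without them True would be rendered as a number.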
The single-path arm raises SignalNotFound, NoData, or AmbiguousSignal. The list arm silently omits missing keys; only per-key AmbiguousSignal raises. Pass a resolved Signal to disambiguate.
Read at a cursor time¶
agent.at() returns the value at-or-before a specific instant. It always returns a dict.
from datetime import datetime, timezone
cursor = datetime(2024, 5, 1, 12, 30, tzinfo=timezone.utc)
values = agent.at(["bus0/BMS_message/status.cell_voltage"], cursor)
time accepts the same shapes as query() — datetime, "now", ISO 8601, or relative offset.
Pass min_time to stay inside the current segment
at() looks across all recorded segments by default. If a path has historical data in older segments, the call raises AmbiguousSignal; pass min_time to bound the search.
lookback (default 60.0s) controls the catalog window used to expand wildcards and attach unit metadata.
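For intuition, an at-or-before lookup over sorted timestamps is a binary search. The sketch below is not the SDK implementation: value_at is a hypothetical helper, and it assumes min_time acts as a lower bound that rejects samples older than it.

```python
from bisect import bisect_right


def value_at(times_ns, values, cursor_ns, min_time_ns=None):
    """Return (time_ns, value) for the last sample at or before cursor_ns.

    times_ns must be sorted ascending. Returns None when no sample
    qualifies, including when the best match is older than min_time_ns.
    """
    idx = bisect_right(times_ns, cursor_ns) - 1
    if idx < 0:
        return None  # every sample is after the cursor
    if min_time_ns is not None and times_ns[idx] < min_time_ns:
        return None  # best match falls outside the bounded search
    return times_ns[idx], values[idx]


times = [100, 200, 300]
volts = [3.9, 4.0, 4.1]
print(value_at(times, volts, 250))
print(value_at(times, volts, 250, min_time_ns=210))
```

bisect_right returns the insertion point after any equal timestamps, so subtracting one lands on a sample taken exactly at the cursor when one exists.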
Snapshot a window of changes¶
agent.window() returns a ReplayWindow: an opening snapshot plus one entry per value change over [start, start + duration]. Reach for it instead of query() for transitions — state machines, mode flags, gear positions yield one entry per change instead of thousands of duplicates.
window = agent.window(
    "bus1/inverter_status.*",
    start="-5s",
    duration=5.0,
)
print(len(window.snapshot), "opening value(s)")
print(len(window.changes), "change(s)")
print(window.start, "→", window.end)
window.snapshot and window.changes are both list[LatestValue]. Use window.start_ns / window.end_ns (or .start / .end) as the actual bounds — they may differ slightly from your input if the agent aligns to sample boundaries.
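The snapshot-plus-changes shape can be modeled in a few lines of plain Python. This is a sketch of the idea only: replay_window is hypothetical, samples are simple (time_ns, path, value) tuples rather than LatestValue objects, and treating a sample exactly at start as part of the opening snapshot is an assumption.

```python
def replay_window(samples, start_ns, end_ns):
    """Split time-sorted samples into an opening snapshot and value changes."""
    opening = {}   # path -> (time_ns, value) at or before start_ns
    current = {}   # path -> last value seen, used to detect changes
    changes = []
    for time_ns, path, value in samples:
        if time_ns > end_ns:
            break
        if time_ns <= start_ns:
            opening[path] = (time_ns, value)
            current[path] = value
        elif path not in current or current[path] != value:
            # Only record a sample when the value differs from the last one.
            changes.append((time_ns, path, value))
            current[path] = value
    snapshot = [(t, p, v) for p, (t, v) in opening.items()]
    return snapshot, changes


samples = [
    (0, "mode", "IDLE"), (10, "mode", "IDLE"), (20, "mode", "RUN"),
    (30, "mode", "RUN"), (40, "mode", "FAULT"), (60, "mode", "IDLE"),
]
snapshot, changes = replay_window(samples, start_ns=10, end_ns=50)
print(snapshot)
print(changes)
```

Six raw samples collapse to one opening value and two transitions, which is the reduction that makes this shape useful for state-like signals.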
Watch live values¶
agent.watch() polls latest() on an interval and yields a dict[str, LatestValue] per tick. until accepts a datetime / ISO string deadline, or a float seconds-from-now. until=None runs forever.
paths = [
    "bus0/BMS_message/status.cell_voltage",
    "bus0/BMS_message/status.battery_level",
]
for tick in agent.watch(paths, interval=1.0, until=30.0):
    soc = tick.get("bus0/BMS_message/status.battery_level")
    if soc and float(soc.value) < 20:
        print("low battery!", soc.formatted())
paths=None snapshots the catalog once at entry — signals that come online later are not picked up. interval is clamped to 0.05s. Lower lookback to drop stale values faster.
Surviving transient errors¶
on_error="skip" yields an empty dict for the failing tick and tolerates up to max_consecutive_errors consecutive failures; it re-raises only once the run of failures is strictly greater than that limit, and a successful tick resets the counter. on_error="raise" (default) propagates the first AgentError.
The empty-dict yield is your only signal — there's no warning emitted today. Inspect the next tick or instrument your loop:
skipped = 0
for tick in agent.watch(
    paths,
    interval=1.0,
    on_error="skip",
    max_consecutive_errors=5,
):
    if not tick:
        skipped += 1
        continue
    skipped = 0
    ...
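The skip-and-count semantics can be modeled with a small generator. This is a stand-alone sketch, not the SDK's code: skip_errors, ok, and fail are hypothetical, and RuntimeError stands in for AgentError.

```python
def skip_errors(polls, max_consecutive_errors):
    """Yield one dict per poll, tolerating a bounded run of failures.

    polls is an iterable of zero-argument callables that either return
    a dict or raise RuntimeError (standing in for AgentError).
    """
    consecutive = 0
    for poll in polls:
        try:
            result = poll()
        except RuntimeError:
            consecutive += 1
            if consecutive > max_consecutive_errors:
                raise  # the run of failures exceeded the limit
            yield {}   # the empty dict marks a skipped tick
        else:
            consecutive = 0  # any success resets the counter
            yield result


def ok():
    return {"battery_level": 87}


def fail():
    raise RuntimeError("agent unreachable")


print(list(skip_errors([ok, fail, fail, ok], max_consecutive_errors=2)))
```

With a limit of 2, two failures in a row are absorbed and a third consecutive failure propagates.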
Compute on series with units¶
Every column you pull off a SignalFrame is a SignalSeries. Series carry their units and compose through arithmetic, integration, and differentiation — no manual bookkeeping.
frame = agent.query(
    [
        "bus0/BMS_message/status.cell_voltage",
        "bus0/BMS_message/status.pack_current",
    ],
    start="-1m",
)
voltage = frame["bus0/BMS_message/status.cell_voltage"] # unit "V"
current = frame["bus0/BMS_message/status.pack_current"] # unit "A"
power = voltage * current # SignalSeries, unit "V·A"
energy = power.integrate() # SignalSeries, unit "V·A·s"
print(power.legend) # "(...cell_voltage * ...pack_current) (V·A)"
print(power.max(), power.mean()) # NamedScalar(...) — a float subclass
Aggregations¶
.min(), .max(), .mean(), .std(), .median() each return a NamedScalar: a float that also carries the series name. .median() is approximate; for exact, go through .to_pandas().
Operators¶
+, -, *, / work between two series sharing the same time index (the common case when both come from the same frame), and between a series and a scalar.
delta = current - 1.5 # subtract a setpoint, unit "A"
gain = 2.0 * voltage # scalar × series, unit "V"
ratio = power / current # unit becomes "V·A/A"
neg = -voltage # unary negation, unit preserved
Unit composition:
- +/-: units must match. One-sided None propagates; mismatched units raise TypeError. Call .derive(label=..., unit=...) to assert the unit.
- *: None propagates; otherwise f"{a}·{b}".
- /: matching units cancel to None; otherwise f"{a}/{b}".
For series from different frames or with different lengths, drop into pandas and re-enter via .derive().
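The unit-composition rules can be written out as three small functions over unit strings. This is a sketch of the rules as stated, with hypothetical names; it reads "None propagates" as "the result has no unit", and returning None when only one side of a division has a unit is an assumption the rules above do not spell out.

```python
def add_units(a, b):
    """Unit of a + b or a - b."""
    if a is None or b is None:
        return None  # a missing unit on either side propagates
    if a != b:
        raise TypeError(f"unit mismatch: {a!r} vs {b!r}")
    return a


def mul_units(a, b):
    """Unit of a * b."""
    if a is None or b is None:
        return None
    return f"{a}·{b}"


def div_units(a, b):
    """Unit of a / b."""
    if a == b:
        return None  # matching units cancel
    if a is None or b is None:
        return None  # assumption: not specified by the rules above
    return f"{a}/{b}"


print(mul_units("V", "A"))
print(div_units("V·A", "A"))
```

Units are composed as strings and not simplified, which is why power / current yields "V·A/A" rather than "V".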
Integrate, differentiate, clip, mask¶
energy = power.integrate() # cumulative trapezoidal; unit gains "·s"; first sample = 0
slew = current.derivative() # gradient over time; unit gains "/s"
limited = current.clip(lower=-100, upper=100) # unit and identity preserved
# Mask out charging samples (current < 0) — NaN where False.
import pyarrow as pa, pyarrow.compute as pc
discharging = pc.greater(current.to_arrow(), pa.scalar(0.0))
power_when_discharging = power.where(discharging)
where() requires a numeric series. The mask length must match len(series). For advanced quadrature, drop into pandas/numpy and use .derive() to come back.
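For reference, cumulative trapezoidal integration with a zero first sample looks like this in plain Python. The function name is hypothetical, and the sketch assumes nanosecond timestamps (matching the time_unit='ns' shown earlier) converted to seconds so the result picks up a "·s" unit.

```python
def integrate_trapezoid(times_ns, values):
    """Cumulative trapezoidal integral over time in seconds; first sample is 0."""
    if not values:
        return []
    out = [0.0]
    for i in range(1, len(values)):
        dt_s = (times_ns[i] - times_ns[i - 1]) / 1e9
        # Area of the trapezoid between consecutive samples.
        out.append(out[-1] + 0.5 * (values[i] + values[i - 1]) * dt_s)
    return out


times_ns = [0, 1_000_000_000, 2_000_000_000, 4_000_000_000]
power = [0.0, 2.0, 2.0, 4.0]
print(integrate_trapezoid(times_ns, power))
```

The output has the same length as the input, so it stays aligned with the original time index.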
Escape hatch back to a SignalSeries¶
When operators don't cover what you need (rolling windows, resampling, FFTs), drop into pandas / numpy, then re-enter via derive():
rolling = voltage.to_pandas().rolling("5s").mean()
smoothed = voltage.derive(
    rolling.to_numpy(),
    label="cell_voltage (5s rolling avg)",
    unit="V",
)
print(smoothed.legend) # "cell_voltage (5s rolling avg) (V)"
derive() requires the new values to match the original time-index length.
Use the legend on charts¶
.legend is "name (unit)" — drop it into axis labels so units flow into the plot:
import altair as alt
df = power.to_pandas().rename("power").reset_index()
chart = (
    alt.Chart(df)
    .mark_line()
    .encode(x="time:T", y=alt.Y("power:Q", title=power.legend))
)
Errors¶
Every SDK error inherits from AgentError. Branch on the granular subclasses:
from zelos_sdk.agent import (
    AgentError, SignalNotFound, AmbiguousSignal, NoData,
)
try:
    v = agent.latest("bus0/BMS_message/status.cell_volt")
except SignalNotFound as exc:
    print("typo? did you mean:", exc.suggestions)
except AmbiguousSignal:
    # Path matches >1 producer or segment: pass a resolved Signal.
    # A single Signal keeps v a LatestValue; a list would return a dict.
    matches = list(agent.signals().search("cell_voltage"))
    v = agent.latest(matches[0])
except NoData:
    # Catalog hit, but no samples within `lookback`.
    v = None
except AgentError:
    raise
QueryRangeTooLarge is raised by agent.query() (not latest()) when the requested time range exceeds the agent's per-query budget. Shrink the window or pass downsample=N.
SignalNotFound.suggestions is up to three close paths — surface them in your CLI / notebook errors.
Internal is not an AgentError
zelos_sdk.agent.Internal is a RuntimeError subclass reserved for SDK invariant violations (e.g. the SDK couldn't decode something the agent sent). It deliberately lives outside the AgentError hierarchy so a broad except AgentError: doesn't accidentally swallow what should be a bug report. If you want to catch every SDK-side problem in one branch, use except (AgentError, Internal): and please file an issue when you hit Internal.
Tips¶
- Mix strings and Signals freely. Anywhere paths is accepted, you can pass "bus0/*", a Signal, or a list mixing both.
- Pass a Signal to disambiguate. If latest("...") keeps raising AmbiguousSignal, look it up in the catalog and pass the resolved Signal.
- producers=() spans the whole agent. The default ("localhost",) only sees local signals.
- downsample=N for charts, raw for analysis. downsample=400 typically renders in milliseconds with no perceptible quality loss.
- Guard with max_rows. When the time range is user-supplied, set max_rows=100_000 to keep a typo from pulling a multi-hour window.
- watch(paths=None) snapshots the catalog at entry. Restart the iterator to pick up signals that come online later.
What's next¶
- Use query(), at(), and window() on saved .trz files.
- Inspect schemas, execute typed Actions, and handle results.
- Connect to an agent and run your first query in under 2 minutes.