Python SDK Reference
This section is generated from the installed zelos_sdk package.
Package Overview
zelos_sdk
DataSegment
Python wrapper for a data segment.
Represents a segment of trace data with metadata about its time range and producer.
QueryResult
Python wrapper for query results.
Contains the results of a trace data query, including field names, the raw Arrow data as Python bytes, and the SQL query that was executed.
to_arrow()
Convert the Arrow data to a Python object that can be read by PyArrow.
Returns: bytes: Arrow IPC stream data
Examples:
    >>> import pyarrow as pa
    >>> result = reader.query(...)
    >>> arrow_bytes = result.to_arrow()
    >>> # Use a separate name so the TraceReader is not shadowed
    >>> arrow_reader = pa.ipc.open_stream(arrow_bytes)
    >>> table = arrow_reader.read_all()
TimeRange
Python wrapper for time range.
Represents a time interval with start and end timestamps.
TraceEventFieldMetadata
Metadata describing a field in a trace event schema.
This class defines the structure of a field within an event schema, including its name, data type, and optional unit of measurement.
Args:
    name (str): The field name.
    data_type (DataType): The data type for the field.
    unit (Optional[str]): Optional unit of measurement.
Examples:
    >>> # Define a field for HTTP status code
    >>> status_field = TraceEventFieldMetadata("status_code", DataType.Int32)
    >>>
    >>> # Define a field with a unit of measurement
    >>> duration_field = TraceEventFieldMetadata(
    ...     "duration_ms", DataType.Float64, "milliseconds")
TraceMetadata
Python wrapper for trace metadata.
Contains information about a complete trace including its time range, producer, and associated data segments.
TraceNamespace
A namespace that manages and organizes TraceSources.
TraceNamespace provides a centralized registry for TraceSources with an isolated router. Each namespace has its own router, allowing complete isolation between different namespaces for testing or multi-tenant scenarios.
Examples:
    >>> # Create an isolated namespace
    >>> ns = TraceNamespace("my_app")
    >>> source = TraceSource("service", namespace=ns)
    >>> with TraceWriter("data.trz", namespace=ns) as writer:
    ...     source.log("event", value=42)
TraceReadEvent
Python wrapper for a trace read event.
Represents an event containing multiple fields.
TraceReadEventField
Python wrapper for a trace read event field.
Represents a single field within an event.
TraceReadSource
Python wrapper for a trace read source.
Represents a source (e.g., "can") containing multiple events.
TraceReader
Python wrapper for the TraceReader.
This reader provides read-only access to trace files, allowing you to query metadata and retrieve trace data programmatically. It supports listing data segments, querying time ranges, and retrieving raw or downsampled data.
The reader is a context manager and should be used in a with statement to ensure proper resource cleanup.
Complete End-to-End Workflow
This example demonstrates opening a trace file, discovering available fields, and querying specific data:
import zelos_sdk
import pyarrow as pa

# Open trace file for reading
with zelos_sdk.TraceReader("recording.trz") as reader:
    # Discover available segments
    segments = reader.list_data_segments()
    assert len(segments) > 0

    # Discover available fields hierarchically
    sources = reader.list_fields()
    assert len(sources) > 0

    # Navigate hierarchy: source → event → field
    can_source = next(s for s in sources if s.name == "can")
    msg_event = next(e for e in can_source.events if e.name == "VehicleSpeed")
    speed_field = next(f for f in msg_event.fields if f.name == "speed")

    # Query discovered field
    time_range = reader.time_range()
    result = reader.query(
        data_segment_ids=[s.id for s in segments],
        fields=[speed_field.path],  # "*/can/VehicleSpeed.speed"
        start=time_range.start,
        end=time_range.end,
    )

    # Verify data received
    arrow_reader = pa.ipc.open_stream(result.to_arrow())
    table = arrow_reader.read_all()
    assert table.num_rows > 0
    assert speed_field.path in result.fields or any(speed_field.path in s for s in result.fields)
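The workflow above navigates the hierarchy with bare next(...) calls, which raise StopIteration when a source, event, or field is missing. A minimal sketch of a more forgiving lookup follows. find_by_name is a hypothetical helper, not part of zelos_sdk, and the SimpleNamespace objects are stand-ins that only mimic the .name attribute used above.

```python
from types import SimpleNamespace
from typing import Iterable, Optional, TypeVar

T = TypeVar("T")


def find_by_name(items: Iterable[T], name: str) -> Optional[T]:
    """Return the first item whose .name equals `name`, or None if absent."""
    return next((item for item in items if item.name == name), None)


# Stand-in objects (assumption): real code would pass reader.list_fields().
sources = [SimpleNamespace(name="can"), SimpleNamespace(name="gps")]

found = find_by_name(sources, "can")
missing = find_by_name(sources, "lidar")  # None instead of StopIteration
```

The same helper could be applied to the events of a source and the fields of an event, letting the caller decide how to report a missing name.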
__repr__()
String representation of the reader.
close()
Close the trace reader.
This method closes the trace file and releases resources. It's automatically called when exiting the context manager.
Returns: None
get_value_table(data_segment_id, field_path)
Get the value table (enum mapping) for a specific field.
Args:
    data_segment_id (str): Data segment ID to query.
    field_path (str): Field path in format "source/event.field" (without the "*/" prefix).
Returns: dict: Mapping of integer keys to string values, or None if no value table exists.
Raises: RuntimeError: If the reader is not open or query fails.
Examples:
    >>> with TraceReader("my_trace.trz") as reader:
    ...     segments = reader.list_data_segments()
    ...     # Get enum mapping for a status field
    ...     status_map = reader.get_value_table(
    ...         segments[0].id,
    ...         "controller/state.status"
    ...     )
    ...     if status_map:
    ...         print(status_map)  # {0: "IDLE", 1: "RUNNING", 2: "ERROR"}
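Because get_value_table returns either a dict or None, code that decodes raw values has to handle both cases. A minimal sketch, assuming the mapping has the int-to-str shape described above: decode_values is a hypothetical helper, not part of zelos_sdk, and the sample mapping is illustrative data.

```python
from typing import Dict, Iterable, List, Optional, Union


def decode_values(
    raw_values: Iterable[int],
    value_table: Optional[Dict[int, str]],
) -> List[Union[int, str]]:
    """Replace raw integers with their labels where a mapping exists.

    `value_table` stands in for the return value of get_value_table(),
    which is documented as a dict or None.
    """
    if not value_table:
        # No value table for this field: keep the raw integers.
        return list(raw_values)
    # Keys missing from the table fall back to the raw integer.
    return [value_table.get(value, value) for value in raw_values]


status_map = {0: "IDLE", 1: "RUNNING", 2: "ERROR"}  # sample data (assumption)
decoded = decode_values([0, 1, 7], status_map)
undecoded = decode_values([0, 1, 7], None)
```

Falling back to the raw integer for unknown keys keeps out-of-range values visible instead of raising KeyError; whether that is the right policy depends on the application.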
list_data_segments()
List all data segments in the trace.
Returns: List[DataSegment]: List of data segment metadata.
Raises: RuntimeError: If the reader is not open or query fails.
Examples:
    >>> with TraceReader("my_trace.trz") as reader:
    ...     segments = reader.list_data_segments()
    ...     for seg in segments:
    ...         print(f"Segment {seg.id}: {seg.producer}")
list_data_segments_in_time_range(start, end)
List data segments within a specific time range.
Args:
    start (datetime): Start of time range (inclusive).
    end (datetime): End of time range (inclusive).
Returns: List[DataSegment]: List of data segments overlapping the time range.
Raises: RuntimeError: If the reader is not open or query fails.
Examples:
    >>> from datetime import datetime, timezone
    >>> start = datetime(2024, 1, 1, tzinfo=timezone.utc)
    >>> end = datetime(2024, 1, 2, tzinfo=timezone.utc)
    >>> with TraceReader("my_trace.trz") as reader:
    ...     segments = reader.list_data_segments_in_time_range(start, end)
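The method is documented as returning segments that overlap a range whose bounds are both inclusive. The sketch below is a conceptual model of that overlap test using plain datetimes. It is not the SDK's implementation, and overlaps is a hypothetical function name.

```python
from datetime import datetime, timezone


def overlaps(seg_start: datetime, seg_end: datetime,
             start: datetime, end: datetime) -> bool:
    """True if [seg_start, seg_end] and [start, end] share at least one instant."""
    return seg_start <= end and seg_end >= start


utc = timezone.utc
query_start = datetime(2024, 1, 1, tzinfo=utc)
query_end = datetime(2024, 1, 2, tzinfo=utc)

# A segment ending exactly at query_start still overlaps (inclusive bounds).
touching = overlaps(datetime(2023, 12, 31, tzinfo=utc), query_start,
                    query_start, query_end)

# A segment that lies entirely after the range does not overlap.
disjoint = overlaps(datetime(2024, 1, 3, tzinfo=utc),
                    datetime(2024, 1, 4, tzinfo=utc),
                    query_start, query_end)
```

Using timezone-aware datetimes throughout, as in the example above, avoids the TypeError Python raises when comparing naive and aware values.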
list_fields(data_segment_id=...)
List all fields in the trace organized by source and event.
This method discovers all available fields in the trace by querying the database schema and organizing them hierarchically by source and event.
Args: data_segment_id (str, optional): Specific data segment ID to query. If None, queries all segments.
Returns: List[TraceReadSource]: List of sources, each containing events and fields.
Raises: RuntimeError: If the reader is not open or query fails.
Example: Discover and Query Fields
from zelos_sdk import TraceReader

with TraceReader("recording.trz") as reader:
    # Discover all available fields
    sources = reader.list_fields()
    assert len(sources) > 0

    # Navigate the hierarchy
    for source in sources:
        for event in source.events:
            for field in event.fields:
                # field.path is the field path for queries (e.g., "*/can/VehicleSpeed.speed")
                assert field.path.startswith("*/") and "." in field.path
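Query paths carry a "*/" prefix, while get_value_table documents its field_path argument without it. A minimal sketch of converting between the two and splitting a path into its parts follows. parse_field_path and to_value_table_path are hypothetical helpers, not part of zelos_sdk. The sketch assumes source and event names contain no "/" or "." and that anything after the first "." belongs to the field.

```python
from typing import NamedTuple


class FieldPath(NamedTuple):
    source: str
    event: str
    field: str


def to_value_table_path(path: str) -> str:
    """Drop a leading "*/", giving the "source/event.field" form."""
    return path[2:] if path.startswith("*/") else path


def parse_field_path(path: str) -> FieldPath:
    """Split "*/source/event.field" (or "source/event.field") into parts."""
    bare = to_value_table_path(path)
    source, slash, rest = bare.partition("/")
    event, dot, field = rest.partition(".")
    if not (slash and dot and source and event and field):
        raise ValueError(f"unexpected field path: {path!r}")
    return FieldPath(source, event, field)


parsed = parse_field_path("*/can/VehicleSpeed.speed")
bare_path = to_value_table_path("*/can/VehicleSpeed.speed")
```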
list_traces()
List all traces in the trace file.
Returns: List[TraceMetadata]: List of trace metadata.
Raises: RuntimeError: If the reader is not open or query fails.
Examples:
    >>> with TraceReader("my_trace.trz") as reader:
    ...     traces = reader.list_traces()
    ...     for trace in traces:
    ...         print(f"Trace {trace.name}: {trace.start_date} to {trace.end_date}")
open()
Open the trace file for reading.
This method initializes the reader and opens the trace file in read-only mode. It's automatically called when entering the context manager (with statement).
Returns: None
Raises: RuntimeError: If the trace file cannot be opened.
query(data_segment_ids, fields, start, end)
Query data for specified fields within a time range.
This returns raw, unsampled data for the requested fields.
Args:
    data_segment_ids (List[str]): List of data segment IDs to query.
    fields (List[str]): List of field paths (e.g., "bus0/msg1/sig1").
    start (datetime): Start of time range (inclusive).
    end (datetime): End of time range (inclusive).
Returns: QueryResult: Query results with Arrow data.
Raises: RuntimeError: If the reader is not open or query fails.
Examples:
    >>> with TraceReader("my_trace.trz") as reader:
    ...     segments = reader.list_data_segments()
    ...     time_range = reader.time_range()
    ...     result = reader.query(
    ...         data_segment_ids=[s.id for s in segments],
    ...         fields=["bus0/msg1/sig1", "bus0/msg2/sig3"],
    ...         start=time_range.start,
    ...         end=time_range.end,
    ...     )
    ...     # Convert to PyArrow table
    ...     import pyarrow as pa
    ...     arrow_reader = pa.ipc.open_stream(result.to_arrow())
    ...     table = arrow_reader.read_all()
    ...     df = table.to_pandas()
time_range()
Get the time range covered by the trace.
Returns: TimeRange: Object containing start and end timestamps.
Raises: RuntimeError: If the reader is not open or query fails.
Examples:
    >>> with TraceReader("my_trace.trz") as reader:
    ...     time_range = reader.time_range()
    ...     print(f"Start: {time_range.start}")
    ...     print(f"End: {time_range.end}")
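For long traces it may be convenient to query the returned range in smaller windows rather than in one call. The sketch below splits a start/end pair into fixed-size windows using only the standard library. iter_windows is a hypothetical helper, not part of zelos_sdk, and the datetimes stand in for time_range.start and time_range.end.

```python
from datetime import datetime, timedelta, timezone
from typing import Iterator, Tuple


def iter_windows(start: datetime, end: datetime,
                 step: timedelta) -> Iterator[Tuple[datetime, datetime]]:
    """Yield consecutive (window_start, window_end) pairs covering [start, end]."""
    if step <= timedelta(0):
        raise ValueError("step must be positive")
    cursor = start
    while cursor < end:
        window_end = min(cursor + step, end)  # the last window may be shorter
        yield cursor, window_end
        cursor = window_end


start = datetime(2024, 1, 1, 0, 0, tzinfo=timezone.utc)
end = datetime(2024, 1, 1, 0, 25, tzinfo=timezone.utc)
windows = list(iter_windows(start, end, timedelta(minutes=10)))
```

Since query documents both bounds as inclusive, a sample that falls exactly on a window boundary could be returned by two adjacent queries, so deduplication may be needed when concatenating results.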
TraceSender
Communication channel for sending trace events.
This class is typically obtained from a TracePublishClient and passed to a TraceSource during creation. It handles the underlying message transport.
Note: Users generally don't need to interact with this class directly; it's used internally to connect TraceSource to TracePublishClient.
TraceSourceCache(name, namespace=None)
A TraceSource wrapper that caches the last value of each field.
Uses a Rust core for cache storage, condition evaluation, and emit decisions. The Python layer provides Pythonic attribute navigation.
Example:
    source = TraceSourceCache("motor_controller")
    source.add_event("motor_stats", [
        TraceEventFieldMetadata("rpm", DataType.Float64),
        TraceEventFieldMetadata("torque", DataType.Float64, "Nm"),
    ])
    source.log("motor_stats", {"rpm": 3500.0, "torque": 42.8})
    assert source.motor_stats.rpm.get() == 3500.0
add_event(name, schema, conditions=None)
Register an event schema.
add_value_table(name, field_name, data)
Register a value table (enum mapping).
event_field_names(name)
Get field names for a registered event.
field_data_type(event_name, field_name)
Get field data type.
get_source()
Not supported — use the cache API directly.
get_value(event_name, field_name)
Get the cached value for a field, or None.
has_event(name)
Check if an event is registered.
log(name, data)
Log data and update cache. Auto-registers event if not yet registered.
log_at(time_ns, name, data)
Log data at a specific timestamp.
set_default_log_condition(condition=None)
Set default log condition. Pass a Python LogCondition or None.
TraceSourceCacheLastEvent(name, cache, source, conditions=None)
A cached event with attribute access to fields and submessages.
Example:
    event = source.motor_stats
    event.rpm.get()           # field access
    event.thermal.temp.get()  # submessage access
    event.log(rpm=3500)       # log via event
TraceSourceCacheLastField(name, full_path, data_type, cache, event_name, condition=None, uses_default=False)
A cached field that stores the last logged value.
Example:
    field = event.rpm
    field.get()  # Get cached value
    field.name   # Full path: "motor_stats.rpm"
TraceSourceEvent
__repr__()
String representation of the event.
log(**kwargs)
Log an event with fields passed as keyword arguments.
Args: **kwargs: Field names and values to log.
Returns: None
Examples:
    >>> event = source.add_event("motor_stats")
    >>> event.log(rpm=3500, torque=42.8)
log_at(time_ns, **kwargs)
Log an event with data provided as a dictionary at a specific time. Performs type checking based on the schema.
Args:
    time_ns (int): Timestamp in nanoseconds since Unix epoch.
    fields (dict[str, Any]): Dictionary of field names to values.
Raises:
    ValueError: If a field is not in the schema.
    TypeError: If a value's type doesn't match the schema.
    RuntimeError: If sending the event fails internally.
Examples:
    >>> event = source.get_event("motor_stats")
    >>> # Log with custom timestamp
    >>> event.log_at(1625097600000000000, {
    ...     "rpm": 3500.0,
    ...     "torque": 42.8,
    ...     "temperature": 75.5
    ... })
TraceStdout
Python wrapper for the stdout trace sink.
This sink outputs trace events to stdout with configurable log levels. It subscribes to all trace events from the router and formats them as structured log messages.
The sink is a context manager and should be used in a with statement to ensure proper resource cleanup and automatic start/stop of trace capture.
Examples:
    >>> # Basic usage with default settings (info level)
    >>> with TraceStdout() as sink:
    ...     # Trace events will be logged to stdout
    ...     pass
    >>>
    >>> # Custom log level and batch configuration
    >>> with TraceStdout(log_level="debug", batch_size=500, batch_timeout_ms=2000) as sink:
    ...     # Trace events will be logged with custom settings
    ...     pass
__repr__()
String representation of the sink.
close()
Stop the stdout sink and finalize trace capture.
This method gracefully shuts down the sink and cancels background tasks. It's automatically called when exiting the context manager.
Returns: None
open()
Start the stdout sink and begin capturing events.
This method subscribes to the trace router and starts a background task to process and output trace events to stdout. It's automatically called when entering the context manager (with statement).
Returns: None
Raises: RuntimeError: If the sink cannot be initialized.
TraceWriter
Python wrapper for the TraceWriter.
This writer manages writing trace events to a local file, with support for batching and buffering. It can be used with a TraceSource to capture events for later analysis.
The writer is a context manager and should be used in a with statement to ensure proper resource cleanup and automatic start/stop of trace capture.
Examples:
    >>> # Basic usage with default settings
    >>> with TraceWriter("my_trace.trz") as writer:
    ...     # Trace events will be captured automatically
    ...     pass
    >>>
    >>> # Custom batch configuration
    >>> with TraceWriter("my_trace.trz", batch_size=500, batch_timeout_ms=2000) as writer:
    ...     # Trace events will be captured with custom batch settings
    ...     pass
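The batch_size and batch_timeout_ms parameters suggest a size-or-timeout batching policy. The sketch below is a generic illustration of that idea, not the zelos_sdk implementation, whose internals are not documented here. SizeOrTimeoutBatcher, its poll method, and the injectable clock are all assumptions made for the example.

```python
import time
from typing import Any, Callable, List


class SizeOrTimeoutBatcher:
    """Generic size-or-timeout batching (illustrative, not the SDK's code)."""

    def __init__(self, flush: Callable[[List[Any]], None],
                 batch_size: int = 500, batch_timeout_ms: int = 2000,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._flush = flush
        self._batch_size = batch_size
        self._timeout_s = batch_timeout_ms / 1000.0
        self._clock = clock
        self._buffer: List[Any] = []
        self._first_at = 0.0

    def add(self, event: Any) -> None:
        if not self._buffer:
            self._first_at = self._clock()  # age is measured from the oldest event
        self._buffer.append(event)
        if len(self._buffer) >= self._batch_size:
            self.flush()

    def poll(self) -> None:
        """Flush a partial batch once its oldest event exceeds the timeout."""
        if self._buffer and self._clock() - self._first_at >= self._timeout_s:
            self.flush()

    def flush(self) -> None:
        if self._buffer:
            batch, self._buffer = self._buffer, []
            self._flush(batch)


batches: List[List[int]] = []
fake_now = [0.0]  # fake clock so the example does not need to sleep
batcher = SizeOrTimeoutBatcher(batches.append, batch_size=3,
                               batch_timeout_ms=2000, clock=lambda: fake_now[0])
for i in range(4):
    batcher.add(i)   # the third add completes a full batch
fake_now[0] = 2.5    # advance past the 2 s timeout
batcher.poll()       # flushes the remaining partial batch
```

Under this policy a larger batch_size trades latency for fewer writes, while batch_timeout_ms bounds how long a partial batch can wait.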
__repr__()
String representation of the writer.
close()
Stop the trace writer and finalize trace capture.
This method gracefully shuts down the writer, cancels background tasks, and ensures all buffered events are written to the trace file. It's automatically called when exiting the context manager.
Returns: None
Note: This method is called automatically by __exit__ when using the context manager pattern.
open()
Start the trace writer and begin capturing events.
This method initializes the writer and starts background tasks for batching and writing trace events. It's automatically called when entering the context manager (with statement).
Returns: None
Raises: RuntimeError: If the writer cannot be initialized.
Note: This method is called automatically by __enter__ when using the context manager pattern.
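The relationship between open(), close(), and the with statement can be sketched with a stand-in class. ManagedResource below is hypothetical and only mirrors the pattern the notes describe: entering the context calls open(), and leaving it calls close() even when the body raises.

```python
class ManagedResource:
    """Stand-in showing the open()/close() context-manager pattern."""

    def __init__(self) -> None:
        self.is_open = False

    def open(self) -> None:
        self.is_open = True

    def close(self) -> None:
        self.is_open = False

    def __enter__(self) -> "ManagedResource":
        self.open()   # entering the with block opens the resource
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        self.close()  # runs on normal exit and on exceptions
        return False  # do not suppress exceptions from the body


resource = ManagedResource()
with resource as r:
    opened_inside = r.is_open
closed_after = not resource.is_open
```

Calling open() and close() by hand is equivalent but puts the burden of cleanup on the caller, typically through a try/finally block.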
get_global_router_sender()
Get the global default trace router sender (from the global namespace).
Returns: TraceSender: The global namespace's router sender.
Examples:
    >>> sender = get_global_router_sender()
init(name=None, *, url=None, client_config=None, log_level=None, trace=True, actions=False, block=False)
Initialize the Zelos SDK tracing and actions systems.
Args:
    name: A unique identifier for your application. Defaults to "python".
    client_config: Configuration options for the TracePublishClient. Can include: url, batch_size, batch_timeout_ms.
    log_level: Logging level to enable, None disables logging.
    trace: Whether to initialize the trace system. Defaults to True.
    actions: Whether to initialize the actions system. Defaults to False.
    block: Whether to block the current thread until exit. Defaults to False.
Examples:
    >>> # Initialize with defaults
    >>> init()
    >>>
    >>> # Initialize with custom name
    >>> init("my_app")
    >>>
    >>> # Initialize with custom config
    >>> init(
    ...     "my_app",
    ...     url="grpc://localhost:2300",
    ...     log_level="debug"
    ... )
    >>>
    >>> # Initialize only logging, no tracing or actions
    >>> init(log_level="debug", trace=False)
    >>>
    >>> # Initialize actions and block until exit
    >>> init(actions=True, block=True)