Python SDK Reference¶
This section is generated from the installed zelos_sdk package.
Use the sidebar to browse modules, or jump directly:
Package Overview¶
zelos_sdk¶
DataSegment¶
Python wrapper for a data segment.
Represents a segment of trace data with metadata about its time range and producer.
QueryResult¶
Python wrapper for query results.
Contains the results of a trace data query, including field names, the raw Arrow data as Python bytes, and the SQL query that was executed.
to_arrow()¶
Return the Arrow data as bytes that can be read by PyArrow.
Returns:
    bytes: Arrow IPC stream data.
Examples:
    >>> import pyarrow as pa
    >>> result = reader.query(...)
    >>> arrow_bytes = result.to_arrow()
    >>> arrow_reader = pa.ipc.open_stream(arrow_bytes)
    >>> table = arrow_reader.read_all()
TimeRange¶
Python wrapper for a time range.
Represents a time interval with start and end timestamps.
TraceEventFieldMetadata¶
Metadata describing a field in a trace event schema.
This class defines the structure of a field within an event schema, including its name, data type, and optional unit of measurement.
Args:
    name (str): The field name.
    data_type (DataType): The data type for the field.
    unit (Optional[str]): Optional unit of measurement.
Examples:
    >>> # Define a field for HTTP status code
    >>> status_field = TraceEventFieldMetadata("status_code", DataType.Int32)
    >>>
    >>> # Define a field with a unit of measurement
    >>> duration_field = TraceEventFieldMetadata(
    ...     "duration_ms", DataType.Float64, "milliseconds")
TraceMetadata¶
Python wrapper for trace metadata.
Contains information about a complete trace, including its time range, producer, and associated data segments.
TraceNamespace¶
A namespace that manages and organizes TraceSources.
TraceNamespace provides a centralized registry for TraceSources with an isolated router. Each namespace has its own router, allowing complete isolation between namespaces for testing or multi-tenant scenarios.
Examples:
    >>> # Create an isolated namespace
    >>> ns = TraceNamespace("my_app")
    >>> source = TraceSource("service", namespace=ns)
    >>> with TraceWriter("data.trz", namespace=ns) as writer:
    ...     source.log("event", value=42)
TraceReadEvent¶
Python wrapper for a trace read event.
Represents an event containing multiple fields.
TraceReadEventField¶
Python wrapper for a trace read event field.
Represents a single field within an event.
TraceReadSource¶
Python wrapper for a trace read source.
Represents a source (e.g., "can") containing multiple events.
TraceReader¶
Python wrapper for the TraceReader.
This reader provides read-only access to trace files, allowing you to query metadata and retrieve trace data programmatically. It supports listing data segments, querying time ranges, and retrieving raw or downsampled data.
The reader supports context management and should be used with a with statement to ensure proper resource cleanup.
Complete End-to-End Workflow¶
This example demonstrates opening a trace file, discovering available fields, and querying specific data:
import zelos_sdk
import pyarrow as pa

# Open trace file for reading
with zelos_sdk.TraceReader("recording.trz") as reader:
    # Discover available segments
    segments = reader.list_data_segments()
    assert len(segments) > 0

    # Discover available fields hierarchically
    sources = reader.list_fields()
    assert len(sources) > 0

    # Navigate the hierarchy: source → event → field
    can_source = next(s for s in sources if s.name == "can")
    msg_event = next(e for e in can_source.events if e.name == "VehicleSpeed")
    speed_field = next(f for f in msg_event.fields if f.name == "speed")

    # Query the discovered field
    time_range = reader.time_range()
    result = reader.query(
        data_segment_ids=[s.id for s in segments],
        fields=[speed_field.path],  # "*/can/VehicleSpeed.speed"
        start=time_range.start,
        end=time_range.end,
    )

    # Verify data was received
    arrow_reader = pa.ipc.open_stream(result.to_arrow())
    table = arrow_reader.read_all()
    assert table.num_rows > 0
    assert speed_field.path in result.fields or any(speed_field.path in s for s in result.fields)
__repr__()¶
String representation of the reader.
close()¶
Close the trace reader.
This method closes the trace file and releases resources. It's automatically called when exiting the context manager.
Returns:
    None
get_value_table(data_segment_id, field_path)¶
Get the value table (enum mapping) for a specific field.
Args:
    data_segment_id (str): Data segment ID to query.
    field_path (str): Field path in the format "source/event.field" (without the "*/" prefix).
Returns:
    dict: Mapping of integer keys to string values, or None if no value table exists.
Raises:
    RuntimeError: If the reader is not open or the query fails.
Examples:
    >>> with TraceReader("my_trace.trz") as reader:
    ...     segments = reader.list_data_segments()
    ...     # Get the enum mapping for a status field
    ...     status_map = reader.get_value_table(
    ...         segments[0].id,
    ...         "controller/state.status"
    ...     )
    ...     if status_map:
    ...         print(status_map)  # {0: "IDLE", 1: "RUNNING", 2: "ERROR"}
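A returned value table can be applied directly to raw integer samples from a query. A stdlib-only sketch using a hypothetical status mapping (keys, labels, and sample values are illustrative):

```python
# A value table as get_value_table() might return it (hypothetical values).
status_map = {0: "IDLE", 1: "RUNNING", 2: "ERROR"}

# Raw integer samples as they would appear in queried data.
raw_samples = [0, 1, 1, 2, 1]

# Map each integer to its label, falling back to the raw value if unmapped.
decoded = [status_map.get(v, str(v)) for v in raw_samples]
assert decoded == ["IDLE", "RUNNING", "RUNNING", "ERROR", "RUNNING"]
```

The `.get(v, str(v))` fallback keeps unmapped integers visible rather than raising on values the table doesn't cover.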
list_data_segments()¶
List all data segments in the trace.
Returns:
    List[DataSegment]: List of data segment metadata.
Raises:
    RuntimeError: If the reader is not open or the query fails.
Examples:
    >>> with TraceReader("my_trace.trz") as reader:
    ...     segments = reader.list_data_segments()
    ...     for seg in segments:
    ...         print(f"Segment {seg.id}: {seg.producer}")
list_data_segments_in_time_range(start, end)¶
List data segments within a specific time range.
Args:
    start (datetime): Start of the time range (inclusive).
    end (datetime): End of the time range (inclusive).
Returns:
    List[DataSegment]: List of data segments overlapping the time range.
Raises:
    RuntimeError: If the reader is not open or the query fails.
Examples:
    >>> from datetime import datetime, timezone
    >>> start = datetime(2024, 1, 1, tzinfo=timezone.utc)
    >>> end = datetime(2024, 1, 2, tzinfo=timezone.utc)
    >>> with TraceReader("my_trace.trz") as reader:
    ...     segments = reader.list_data_segments_in_time_range(start, end)
list_fields(data_segment_id=...)¶
List all fields in the trace, organized by source and event.
This method discovers all available fields in the trace by querying the database schema and organizing them hierarchically by source and event.
Args:
    data_segment_id (str, optional): Specific data segment ID to query. If None, queries all segments.
Returns:
    List[TraceReadSource]: List of sources, each containing events and fields.
Raises:
    RuntimeError: If the reader is not open or the query fails.
Example: Discover and Query Fields¶
with TraceReader("recording.trz") as reader:
    # Discover all available fields
    sources = reader.list_fields()
    assert len(sources) > 0

    # Navigate the hierarchy
    for source in sources:
        for event in source.events:
            for field in event.fields:
                # field.path is the field path for queries (e.g., "*/can/VehicleSpeed.speed")
                assert field.path.startswith("*/") and "." in field.path
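The paths returned by list_fields() follow the "*/source/event.field" shape shown above. A stdlib-only helper that takes one apart, offered purely as an illustration (split_field_path is not part of the SDK, and nested event paths are not handled):

```python
def split_field_path(path: str) -> tuple[str, str, str]:
    """Split a "*/source/event.field" path into (source, event, field).

    Illustrative helper only; not part of zelos_sdk. Assumes a flat
    source/event hierarchy.
    """
    # Drop the leading "*/" wildcard segment if present.
    body = path[2:] if path.startswith("*/") else path
    # First "/" separates the source from the rest.
    source, _, rest = body.partition("/")
    # Last "." separates the event from the field name.
    event, _, field = rest.rpartition(".")
    return source, event, field

parts = split_field_path("*/can/VehicleSpeed.speed")
assert parts == ("can", "VehicleSpeed", "speed")
```

Using `rpartition(".")` keeps any dots earlier in the path attached to the event name rather than the field.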
list_traces()¶
List all traces in the trace file.
Returns:
    List[TraceMetadata]: List of trace metadata.
Raises:
    RuntimeError: If the reader is not open or the query fails.
Examples:
    >>> with TraceReader("my_trace.trz") as reader:
    ...     traces = reader.list_traces()
    ...     for trace in traces:
    ...         print(f"Trace {trace.name}: {trace.start_date} to {trace.end_date}")
open()¶
Open the trace file for reading.
This method initializes the reader and opens the trace file in read-only mode. It's automatically called when entering the context manager (with statement).
Returns:
    None
Raises:
    RuntimeError: If the trace file cannot be opened.
query(data_segment_ids, fields, start, end)¶
Query data for the specified fields within a time range.
This returns raw, unsampled data for the requested fields.
Args:
    data_segment_ids (List[str]): List of data segment IDs to query.
    fields (List[str]): List of field paths (e.g., "bus0/msg1/sig1").
    start (datetime): Start of the time range (inclusive).
    end (datetime): End of the time range (inclusive).
Returns:
    QueryResult: Query results with Arrow data.
Raises:
    RuntimeError: If the reader is not open or the query fails.
Examples:
    >>> with TraceReader("my_trace.trz") as reader:
    ...     segments = reader.list_data_segments()
    ...     time_range = reader.time_range()
    ...     result = reader.query(
    ...         data_segment_ids=[s.id for s in segments],
    ...         fields=["bus0/msg1/sig1", "bus0/msg2/sig3"],
    ...         start=time_range.start,
    ...         end=time_range.end,
    ...     )
    ...     # Convert to a PyArrow table
    ...     import pyarrow as pa
    ...     arrow_reader = pa.ipc.open_stream(result.to_arrow())
    ...     table = arrow_reader.read_all()
    ...     df = table.to_pandas()
time_range()¶
Get the time range covered by the trace.
Returns:
    TimeRange: Object containing start and end timestamps.
Raises:
    RuntimeError: If the reader is not open or the query fails.
Examples:
    >>> with TraceReader("my_trace.trz") as reader:
    ...     time_range = reader.time_range()
    ...     print(f"Start: {time_range.start}")
    ...     print(f"End: {time_range.end}")
TraceSender¶
Communication channel for sending trace events.
This class is typically obtained from a TracePublishClient and passed to a TraceSource during creation. It handles the underlying message transport.
Note: Users generally don't need to interact with this class directly; it's used internally to connect a TraceSource to a TracePublishClient.
TraceSourceCacheLast(name)¶
A TraceSource wrapper that caches the last value of each field.
Example:
    source = TraceSourceCacheLast("motor_controller")
    source.add_event("motor_stats", [
        TraceEventFieldMetadata("rpm", DataType.Float64),
        TraceEventFieldMetadata("torque", DataType.Float64, "Nm")
    ])

    # Log some data
    source.log("motor_stats", {"rpm": 3500.0, "torque": 42.8})

    # Access cached values
    assert source.motor_stats.rpm.get() == 3500.0
    assert source.motor_stats.torque.get() == 42.8

    # Dictionary-style access
    assert source["motor_stats"].rpm.get() == 3500.0
    assert source["motor_stats/rpm"] == 3500.0

    # Log via the event object
    source.motor_stats.log(rpm=3250.0, torque=45.2)
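To illustrate the last-value semantics (and only that; this is not the SDK's implementation), a minimal dict-based stand-in: each log() overwrites just the fields it names, and reads return the most recently logged value per field.

```python
class LastValueCache:
    """Minimal stand-in illustrating last-value caching (not the SDK class)."""

    def __init__(self) -> None:
        self._values: dict[str, float] = {}

    def log(self, event: str, data: dict) -> None:
        # Each log overwrites only the fields it names.
        for field, value in data.items():
            self._values[f"{event}/{field}"] = value

    def get(self, path: str) -> float:
        return self._values[path]

cache = LastValueCache()
cache.log("motor_stats", {"rpm": 3500.0, "torque": 42.8})
cache.log("motor_stats", {"rpm": 3250.0})  # only rpm is updated

assert cache.get("motor_stats/rpm") == 3250.0
assert cache.get("motor_stats/torque") == 42.8  # unchanged by the second log
```

The point of the sketch is that a partial log leaves the other cached fields intact, which is why stale-but-valid values remain readable between updates.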
__getattr__(name)¶
Get an event by attribute access. Only returns existing events.
__getitem__(key)¶
Support dictionary-style access for events and fields.
Examples:
    source["my_event"]           # Returns a TraceSourceCacheLastEvent
    source["my_event/subevent"]  # Returns a nested TraceSourceCacheLastEvent
    source["my_event/field"]     # Returns a TraceSourceCacheLastField object
    source["event/sub/field"]    # Returns a deeply nested TraceSourceCacheLastField object
add_event(name, schema, conditions=None)¶
Add an event to the source and create a cached version.
add_event_from_dict(name, data)¶
Add an event from a dictionary and create a cached version.
Args:
    name: The name of the event.
    data: A dictionary representing the event data.
Returns:
    TraceSourceCacheLastEvent: The cached event.
add_value_table(name, field_name, data)¶
Add a value table (enum mapping) to the underlying TraceSource.
Args:
    name: The name of the value table (typically the event name).
    field_name: The field name this value table applies to.
    data: A dictionary mapping integer values to string labels.
Examples:
    >>> source.add_value_table("motor_status", "state", {0: "stopped", 1: "running"})
get_event(name)¶
Get a cached event by name.
Args:
    name: The name of the event.
Returns:
    TraceSourceCacheLastEvent: The cached event.
Raises:
    KeyError: If the event doesn't exist.
get_source()¶
Get the underlying TraceSource.
log(name, data)¶
Log data to an event and update the cache.
log_at(time_ns, name, data)¶
Log data to an event at a specific time and update the cache.
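The time_ns argument is a nanosecond Unix timestamp, which the standard library provides directly. A short sketch; the source.log_at call itself is only shown as a comment, since it needs a live source:

```python
import time

# Nanosecond wall-clock timestamp, the resolution log_at() expects for time_ns.
time_ns = time.time_ns()

# A plausible call shape, matching log() but with an explicit timestamp:
# source.log_at(time_ns, "motor_stats", {"rpm": 3100.0})

# time.time_ns() counts nanoseconds since the Unix epoch, so current
# values are on the order of 10**18.
assert time_ns > 10**18
```

Explicit timestamps let you backfill or replay data at its original capture time instead of the moment of the call.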
log_dict(name, data)¶
Log data to an event using dictionary format and update the cache.
This is an alias for log() to match the TraceSource API.
Args:
    name: The name of the event.
    data: A dictionary of field values to log.
log_dict_at(time_ns, name, data)¶
Log data to an event at a specific time using dictionary format and update the cache.
This is an alias for log_at() to match the TraceSource API.
Args:
    time_ns: The timestamp in nanoseconds.
    name: The name of the event.
    data: A dictionary of field values to log.
set_default_log_condition(condition=DefaultLogCondition())¶
Set a default log condition that applies to all fields without explicit conditions.
This also updates existing fields that currently use the default condition to use the new default.
Args:
    condition: The default logging condition to use, or None to disable default conditions.
TraceSourceCacheLastEvent(name, event, source, conditions=None)¶
A cached event that provides access to fields and submessages.
Example:
    event = source.motor_stats
    event.rpm.get()           # Get a field value
    event.thermal.temp.get()  # Get a nested field value
    event.log(rpm=3500)       # Log new values
TraceSourceCacheLastField(name, metadata, full_path, condition=None, uses_default=False)¶
A cached field that stores the last logged value.
Example:
    field = event.rpm  # Get the field
    field.get()        # Get the cached value
    field.name         # The full path, like "motor_stats.rpm"
TraceStdout¶
Python wrapper for the stdout trace sink.
This sink outputs trace events to stdout with configurable log levels. It subscribes to all trace events from the router and formats them as structured log messages.
The sink supports context management and should be used with a with statement to ensure proper resource cleanup and automatic start/stop of trace capture.
Examples:
    >>> # Basic usage with default settings (info level)
    >>> with TraceStdout() as sink:
    ...     # Trace events will be logged to stdout
    ...     pass
    >>>
    >>> # Custom log level and batch configuration
    >>> with TraceStdout(log_level="debug", batch_size=500, batch_timeout_ms=2000) as sink:
    ...     # Trace events will be logged with custom settings
    ...     pass
__repr__()¶
String representation of the sink.
close()¶
Stop the stdout sink and finalize trace capture.
This method gracefully shuts down the sink and cancels background tasks. It's automatically called when exiting the context manager.
Returns:
    None
open()¶
Start the stdout sink and begin capturing events.
This method subscribes to the trace router and starts a background task to process and output trace events to stdout. It's automatically called when entering the context manager (with statement).
Returns:
    None
Raises:
    RuntimeError: If the sink cannot be initialized.
TraceWriter¶
Python wrapper for the TraceWriter.
This writer manages writing trace events to a local file, with support for batching and buffering. It can be used with a TraceSource to capture events for later analysis.
The writer supports context management and should be used with a with statement to ensure proper resource cleanup and automatic start/stop of trace capture.
Examples:
    >>> # Basic usage with default settings
    >>> with TraceWriter("my_trace.trz") as writer:
    ...     # Trace events will be captured automatically
    ...     pass
    >>>
    >>> # Custom batch configuration
    >>> with TraceWriter("my_trace.trz", batch_size=500, batch_timeout_ms=2000) as writer:
    ...     # Trace events will be captured with custom batch settings
    ...     pass
__repr__()¶
String representation of the writer.
close()¶
Stop the trace writer and finalize trace capture.
This method gracefully shuts down the writer, cancels background tasks, and ensures all buffered events are written to the trace file. It's automatically called when exiting the context manager.
Returns:
    None
Note: This method is called automatically by __exit__ when using the context manager pattern.
open()¶
Start the trace writer and begin capturing events.
This method initializes the writer and starts background tasks for batching and writing trace events. It's automatically called when entering the context manager (with statement).
Returns:
    None
Raises:
    RuntimeError: If the writer cannot be initialized.
Note: This method is called automatically by __enter__ when using the context manager pattern.
get_global_router_sender()¶
Get the global default trace router sender (from the global namespace).
Returns:
    TraceSender: The global namespace's router sender.
Examples:
    >>> sender = get_global_router_sender()
init(name=None, *, url=None, client_config=None, log_level=None, trace=True, actions=False, block=False)¶
Initialize the Zelos SDK tracing and actions systems.
Args:
    name: A unique identifier for your application. Defaults to "python".
    url: The server URL to publish traces to (e.g., "grpc://localhost:2300"); may also be set via client_config.
    client_config: Configuration options for the TracePublishClient. Can include: url, batch_size, batch_timeout_ms.
    log_level: Logging level to enable; None disables logging.
    trace: Whether to initialize the trace system. Defaults to True.
    actions: Whether to initialize the actions system. Defaults to False.
    block: Whether to block the current thread until exit. Defaults to False.
Examples:
    >>> # Initialize with defaults
    >>> init()
    >>>
    >>> # Initialize with a custom name
    >>> init("my_app")
    >>>
    >>> # Initialize with custom config
    >>> init(
    ...     "my_app",
    ...     url="grpc://localhost:2300",
    ...     log_level="debug"
    ... )
    >>>
    >>> # Initialize only logging, no tracing or actions
    >>> init(log_level="debug", trace=False)
    >>>
    >>> # Initialize actions and block until exit
    >>> init(actions=True, block=True)