How to Read Trace Files¶
Python Only
The following APIs are only available in the Python SDK.
Read .trz trace files for offline analysis, debugging, or post-processing.
PyArrow
The TraceReader API returns data as PyArrow tables. Use pyarrow.compute for analysis operations.
Basic Reading¶
Use TraceReader to query metadata and data from trace files:
import zelos_sdk
import pyarrow as pa
# Open the trace; the context manager closes the file on exit
with zelos_sdk.TraceReader("output.trz") as reader:
    # Overall time span covered by the trace
    time_range = reader.time_range()
    print(f"Trace from {time_range.start} to {time_range.end}")

    # Enumerate the data segments stored in the file
    segments = reader.list_data_segments()
    print(f"Found {len(segments)} data segments")

    # Query every segment across the full time span
    # (field format: */source/event.field)
    segment_ids = [segment.id for segment in segments]
    query_result = reader.query(
        data_segment_ids=segment_ids,
        fields=["*/experiment/measurement.value"],
        start=time_range.start,
        end=time_range.end,
    )

    # Decode the Arrow IPC stream into a PyArrow table
    table = pa.ipc.open_stream(query_result.to_arrow()).read_all()
    print(f"Retrieved {table.num_rows} rows with columns: {table.column_names}")
Reading Methods¶
Context Manager (Recommended)¶
with zelos_sdk.TraceReader("data.trz") as reader:
    segments = reader.list_data_segments()
    # No explicit close() needed: the file is closed automatically when the with block exits
Manual Control¶
# Manual lifecycle: construct the reader, open it, and close it explicitly.
reader = zelos_sdk.TraceReader("data.trz")
try:
    # NOTE(review): open() is inside the try, so close() in the finally clause
    # also runs when open() itself raises — confirm close() is safe to call on
    # a reader that was never opened.
    reader.open()
    segments = reader.list_data_segments()
finally:
    reader.close()  # Always close!
Common Patterns¶
Field Discovery¶
Discover available fields before querying:
import zelos_sdk
import pyarrow as pa
def find_by_name(items, name):
    """Return the first element of ``items`` whose ``name`` attribute equals ``name``.

    Raises:
        LookupError: If nothing matches, so a missing source, event, or field
            is reported by name instead of as a bare StopIteration.
    """
    match = next((item for item in items if item.name == name), None)
    if match is None:
        raise LookupError(f"{name!r} not found in trace")
    return match


# 1. Open trace file for reading
with zelos_sdk.TraceReader("recording.trz") as reader:
    # 2. Discover available segments
    segments = reader.list_data_segments()
    if not segments:
        # Explicit check instead of assert: asserts are stripped under `python -O`
        raise ValueError("recording.trz contains no data segments")
    # 3. Discover all available fields hierarchically
    sources = reader.list_fields()
    # Navigate hierarchy: source → event → field
    for source in sources:
        print(f"Source: {source.name}")
        for event in source.events:
            print(f"  Event: {event.name}")
            for field in event.fields:
                print(f"    Field: {field.name} → {field.path}")
    # 4. Query discovered field
    can_source = find_by_name(sources, "can")
    speed_event = find_by_name(can_source.events, "VehicleSpeed")
    speed_field = find_by_name(speed_event.fields, "speed")
    time_range = reader.time_range()
    result = reader.query(
        data_segment_ids=[s.id for s in segments],
        fields=[speed_field.path],  # "*/can/VehicleSpeed.speed"
        start=time_range.start,
        end=time_range.end,
    )
    # Convert to Arrow table
    arrow_reader = pa.ipc.open_stream(result.to_arrow())
    table = arrow_reader.read_all()
    print(f"Retrieved {table.num_rows} rows")
Query and Analyze Data¶
Query trace data and perform analysis using PyArrow:
import zelos_sdk
import pyarrow as pa
import pyarrow.compute as pc
with zelos_sdk.TraceReader("trace.trz") as reader:
    segments = reader.list_data_segments()
    time_range = reader.time_range()
    # Query field data
    result = reader.query(
        data_segment_ids=[s.id for s in segments],
        fields=["*/sensor/temperature.value"],
        start=time_range.start,
        end=time_range.end,
    )
    # Convert to Arrow table
    arrow_reader = pa.ipc.open_stream(result.to_arrow())
    table = arrow_reader.read_all()
    # Locate the temperature column, stopping at the first match
    temp_col = next((c for c in table.column_names if "temperature" in c), None)
    if temp_col is None:
        raise KeyError("no temperature column in query result")
    values = table.column(temp_col)
    # Analyze with PyArrow.
    # Aggregates are null (None after as_py()) when the column has no non-null
    # values, and None cannot be formatted with ":.2f", so print "n/a" instead.
    stats = {
        "Mean": pc.mean(values),
        "Min": pc.min(values),
        "Max": pc.max(values),
    }
    for label, scalar in stats.items():
        stat = scalar.as_py()
        print(f"{label}: {stat:.2f}" if stat is not None else f"{label}: n/a")
Metadata Inspection¶
Query trace metadata without loading data:
import zelos_sdk
def inspect_trace(filename):
    """Print a summary of a trace file: time range, data segments, and traces."""
    with zelos_sdk.TraceReader(filename) as reader:
        # Overall time span covered by the file
        time_range = reader.time_range()
        print(f"Time Range: {time_range.start} to {time_range.end}")

        # One entry per data segment
        segments = reader.list_data_segments()
        print(f"\nData Segments ({len(segments)}):")
        for seg in segments:
            print(f"  {seg.id}")
            print(f"    Producer: {seg.producer}")
            if not seg.start_date:
                continue  # no start date set; skip the time line
            print(f"    Time: {seg.start_date} to {seg.end_date}")

        # Traces are optional; nothing more to print when there are none
        traces = reader.list_traces()
        if not traces:
            return
        print(f"\nTraces ({len(traces)}):")
        for trace in traces:
            print(f"  {trace.name}")
            print(f"    ID: {trace.id}")
            print(f"    Segments: {len(trace.data_segments)}")


# Usage
inspect_trace("session_20240115_143022.trz")
Basic Analysis¶
Analyze trace data using PyArrow:
import zelos_sdk
import pyarrow as pa
import pyarrow.compute as pc
def load_fields(filename, field_patterns):
    """
    Read the requested fields from a trace file into an Arrow table.

    Every data segment in the file is queried over the file's full time range.

    Args:
        filename: Path to .trz file
        field_patterns: List of field paths, passed unchanged to
            reader.query() (e.g., ["source/event/field"])
    Returns:
        PyArrow Table with time_s and requested fields
    """
    with zelos_sdk.TraceReader(filename) as reader:
        time_range = reader.time_range()
        segment_ids = [segment.id for segment in reader.list_data_segments()]
        query_result = reader.query(
            data_segment_ids=segment_ids,
            fields=field_patterns,
            start=time_range.start,
            end=time_range.end,
        )
        # Decode the Arrow IPC stream while the reader is still open
        return pa.ipc.open_stream(query_result.to_arrow()).read_all()
# Usage
# NOTE(review): these paths use "/" before the field name and have no "*/"
# prefix, while the other examples on this page use "*/source/event.field" —
# confirm which form reader.query() expects.
table = load_fields("test.trz", [
    "experiment/measurement/value",
    "experiment/measurement/index"
])
# Find the value column (includes UUID prefix)
# The trailing [0] raises IndexError if no column name ends with this suffix.
value_col = [c for c in table.column_names if c.endswith('/experiment/measurement/value')][0]
# Compute statistics using PyArrow
column = table.column(value_col)
print(f"Mean: {pc.mean(column).as_py()}")
print(f"Max: {pc.max(column).as_py()}")
print(f"Min: {pc.min(column).as_py()}")
print(f"Rows: {table.num_rows}")
Note on Column Names: Arrow table columns use the full field path including the data segment ID (UUID). When you query with a wildcard pattern like */experiment/measurement/value, the resulting column name will be <uuid>/experiment/measurement/value. Use .endswith() to find the actual column name.
Time Range Filtering¶
Query specific time windows:
import zelos_sdk
import pyarrow as pa
def load_time_window(filename, start_time, end_time, fields):
    """Return the requested fields between start_time and end_time as an Arrow table.

    Prints a message and returns None when no data segments are found for the
    requested window.
    """
    with zelos_sdk.TraceReader(filename) as reader:
        # Only the segments reported for this window are queried
        segments = reader.list_data_segments_in_time_range(start_time, end_time)
        if not segments:
            print("No data in specified time range")
            return None

        segment_ids = [segment.id for segment in segments]
        window_result = reader.query(
            data_segment_ids=segment_ids,
            fields=fields,
            start=start_time,
            end=end_time,
        )
        # Decode the Arrow IPC stream into a table
        return pa.ipc.open_stream(window_result.to_arrow()).read_all()


# Usage - query one hour of data
table = load_time_window(
    "long_recording.trz",
    start_time="2024-01-15T14:00:00Z",
    end_time="2024-01-15T15:00:00Z",
    fields=["*/sensor/temperature.value"],
)
Querying Enumeration Data¶
Query enum fields and convert integers to strings using stored value tables:
import zelos_sdk
import pyarrow as pa
with zelos_sdk.TraceReader("controller_log.trz") as reader:
    segments = reader.list_data_segments()
    time_range = reader.time_range()
    # Query enum field (returns integer values)
    result = reader.query(
        data_segment_ids=[s.id for s in segments],
        fields=["*/controller/state.status"],
        start=time_range.start,
        end=time_range.end,
    )
    # Extract integer values
    arrow_reader = pa.ipc.open_stream(result.to_arrow())
    table = arrow_reader.read_all()
    status_col = next(c for c in table.column_names if "status" in c)
    status_ints = table.column(status_col).to_pylist()  # [0, 1, 2]
    # Retrieve the stored enum value table and convert to strings
    status_map = reader.get_value_table(segments[0].id, "controller/state.status")
    if status_map:
        # Fall back to the raw value for null rows or integers missing from the
        # value table, instead of aborting the whole conversion with a KeyError.
        status_strs = [
            status_map[v] if v in status_map else str(v)
            for v in status_ints
        ]
        print(status_strs)  # ["IDLE", "RUNNING", "ERROR"]
See Also¶
- Recording Files - How to create trace files