Skip to content

How to Stream Data

Learn how to stream data from your application to Zelos at various rates and patterns.

Basic Streaming

The simplest way to stream data is to log events in a loop:

import time
import random  # For demo purposes
import zelos_sdk

zelos_sdk.init()
source = zelos_sdk.TraceSource("sensors")

# Define schema (optional but recommended)
temperature_event = source.add_event("temperature", [
    zelos_sdk.TraceEventFieldMetadata("value", zelos_sdk.DataType.Float64, "°C")
])

while True:
    # Read your sensor (simulated here)
    temperature = 20.0 + random.uniform(-2, 2)

    # Log to Zelos
    temperature_event.log(value=temperature)

    # Control rate
    time.sleep(1.0)  # 1 Hz
use std::time::Duration;
use tokio::time;
use rand::Rng;  // For demo purposes

// Define schema (required in Rust)
let temp_event = source
    .build_event("temperature")
    .add_f64_field("value", Some("°C".to_string()))
    .build()?;

let mut interval = time::interval(Duration::from_secs(1));
let mut rng = rand::thread_rng();

loop {
    interval.tick().await;

    // Read your sensor (simulated here)
    let temperature = 20.0 + rng.gen_range(-2.0..2.0);

    temp_event.build()
        .try_insert_f64("value", temperature)?
        .emit()?;
}
import (
    "math/rand"
    "time"
)

// Define schema (required in Go)
unitC := "°C"
tempEvent, _ := source.BuildEvent("temperature").
    AddFloat64Field("value", &unitC).
    Build()

ticker := time.NewTicker(time.Second)
defer ticker.Stop()

for range ticker.C {
    // Read your sensor (simulated here)
    temperature := 20.0 + rand.Float64()*4 - 2

    builder, _ := tempEvent.Build().TryInsertFloat64("value", temperature)
    builder.Emit()
}

High-Frequency Streaming

For rates above 100 Hz, use precise timing:

import time
import math
import zelos_sdk

zelos_sdk.init()
source = zelos_sdk.TraceSource("high_freq")

# Define schema for better performance
data_event = source.add_event("data", [
    zelos_sdk.TraceEventFieldMetadata("value", zelos_sdk.DataType.Float64, "V")
])

# 1 kHz = 1ms period
period = 0.001
start_time = time.time()
next_time = start_time

while True:
    current_time = time.time() - start_time

    # Generate signal (example: 100 Hz sine wave)
    value = math.sin(2 * math.pi * 100 * current_time)

    # Log data
    data_event.log(value=value)

    # Precise sleep to maintain rate
    next_time += period
    sleep_duration = next_time - time.time()
    if sleep_duration > 0:
        time.sleep(sleep_duration)
use tokio::time::{interval, MissedTickBehavior};
use std::time::{Duration, Instant};

let data_event = source
    .build_event("data")
    .add_f64_field("value", Some("V".to_string()))
    .build()?;

// 1 kHz = 1ms period
let mut interval = interval(Duration::from_millis(1));
// Skip missed ticks instead of bursting
interval.set_missed_tick_behavior(MissedTickBehavior::Skip);

let start = Instant::now();

loop {
    interval.tick().await;

    let elapsed = start.elapsed().as_secs_f64();
    // Generate signal (example: 100 Hz sine wave)
    let value = (2.0 * std::f64::consts::PI * 100.0 * elapsed).sin();

    data_event.build()
        .try_insert_f64("value", value)?
        .emit()?;
}
import (
    "math"
    "time"
)

unitVolt := "V"
dataEvent, _ := source.BuildEvent("data").
    AddFloat64Field("value", &unitVolt).
    Build()

// 1 kHz = 1ms period
ticker := time.NewTicker(time.Millisecond)
defer ticker.Stop()

start := time.Now()

for range ticker.C {
    elapsed := time.Since(start).Seconds()
    // Generate signal (example: 100 Hz sine wave)
    value := math.Sin(2 * math.Pi * 100 * elapsed)

    builder, _ := dataEvent.Build().TryInsertFloat64("value", value)
    builder.Emit()
}

Burst Streaming

Stream data in bursts when events occur:

import time
import zelos_sdk

zelos_sdk.init()
source = zelos_sdk.TraceSource("events")

# Define event schemas
source.add_event("event_start", [
    zelos_sdk.TraceEventFieldMetadata("trigger", zelos_sdk.DataType.String),
    zelos_sdk.TraceEventFieldMetadata("timestamp_ms", zelos_sdk.DataType.Int64)
])

source.add_event("event_sample", [
    zelos_sdk.TraceEventFieldMetadata("value", zelos_sdk.DataType.Float64),
    zelos_sdk.TraceEventFieldMetadata("index", zelos_sdk.DataType.UInt32)
])

source.add_event("event_end", [
    zelos_sdk.TraceEventFieldMetadata("duration_ms", zelos_sdk.DataType.Float64),
    zelos_sdk.TraceEventFieldMetadata("sample_count", zelos_sdk.DataType.UInt32)
])

def handle_burst_event(trigger_name: str, samples: list):
    """Log a burst of data when an event occurs"""
    start_time = time.time()

    # Log event start
    source.log("event_start", {
        "trigger": trigger_name,
        "timestamp_ms": int(start_time * 1000)
    })

    # Log burst of samples
    for i, value in enumerate(samples):
        source.log("event_sample", {
            "value": value,
            "index": i
        })

    # Log event end
    duration_ms = (time.time() - start_time) * 1000
    source.log("event_end", {
        "duration_ms": duration_ms,
        "sample_count": len(samples)
    })

# Example usage
while True:
    # Wait for trigger condition
    time.sleep(5)

    # Generate burst data (simulated)
    burst_data = [i * 0.1 for i in range(100)]
    handle_burst_event("threshold_exceeded", burst_data)
use std::time::{Duration, Instant};
use tokio::time;

// Define event schemas
let start_event = source
    .build_event("event_start")
    .add_string_field("trigger", None)
    .add_i64_field("timestamp_ms", None)
    .build()?;

let sample_event = source
    .build_event("event_sample")
    .add_f64_field("value", None)
    .add_u32_field("index", None)
    .build()?;

let end_event = source
    .build_event("event_end")
    .add_f64_field("duration_ms", None)
    .add_u32_field("sample_count", None)
    .build()?;

async fn handle_burst_event(
    trigger_name: &str,
    samples: Vec<f64>,
    start_event: &TraceEvent,
    sample_event: &TraceEvent,
    end_event: &TraceEvent,
) -> Result<()> {
    let start_time = Instant::now();

    // Log event start
    start_event.build()
        .try_insert_string("trigger", trigger_name.to_string())?
        .try_insert_i64("timestamp_ms", start_time.elapsed().as_millis() as i64)?
        .emit()?;

    // Log burst of samples
    for (i, value) in samples.iter().enumerate() {
        sample_event.build()
            .try_insert_f64("value", *value)?
            .try_insert_u32("index", i as u32)?
            .emit()?;
    }

    // Log event end
    let duration_ms = start_time.elapsed().as_secs_f64() * 1000.0;
    end_event.build()
        .try_insert_f64("duration_ms", duration_ms)?
        .try_insert_u32("sample_count", samples.len() as u32)?
        .emit()?;

    Ok(())
}

// Example usage
loop {
    // Wait for trigger condition
    time::sleep(Duration::from_secs(5)).await;

    // Generate burst data (simulated)
    let burst_data: Vec<f64> = (0..100).map(|i| i as f64 * 0.1).collect();
    handle_burst_event("threshold_exceeded", burst_data, &start_event, &sample_event, &end_event).await?;
}
import (
    "time"
)

// Define event schemas
startEvent, _ := source.BuildEvent("event_start").
    AddStringField("trigger", nil).
    AddInt64Field("timestamp_ms", nil).
    Build()

sampleEvent, _ := source.BuildEvent("event_sample").
    AddFloat64Field("value", nil).
    AddUint32Field("index", nil).
    Build()

endEvent, _ := source.BuildEvent("event_end").
    AddFloat64Field("duration_ms", nil).
    AddUint32Field("sample_count", nil).
    Build()

func handleBurstEvent(triggerName string, samples []float64) error {
    startTime := time.Now()

    // Log event start
    builder, _ := startEvent.Build().TryInsertString("trigger", triggerName)
    builder, _ = builder.TryInsertInt64("timestamp_ms", startTime.UnixMilli())
    if err := builder.Emit(); err != nil {
        return err
    }

    // Log burst of samples
    for i, value := range samples {
        b, _ := sampleEvent.Build().TryInsertFloat64("value", value)
        b, _ = b.TryInsertUint32("index", uint32(i))
        if err := b.Emit(); err != nil {
            return err
        }
    }

    // Log event end
    durationMs := float64(time.Since(startTime).Milliseconds())
    builder, _ = endEvent.Build().TryInsertFloat64("duration_ms", durationMs)
    builder, _ = builder.TryInsertUint32("sample_count", uint32(len(samples)))
    return builder.Emit()
}

// Example usage
for {
    // Wait for trigger condition
    time.Sleep(5 * time.Second)

    // Generate burst data (simulated)
    burstData := make([]float64, 100)
    for i := range burstData {
        burstData[i] = float64(i) * 0.1
    }

    if err := handleBurstEvent("threshold_exceeded", burstData); err != nil {
        log.Printf("Error handling burst: %v", err)
    }
}

Multiple Sources in Parallel

Stream from different components at different rates:

import threading
import time
import random
import zelos_sdk

zelos_sdk.init()

def stream_motor():
    """100 Hz motor telemetry"""
    source = zelos_sdk.TraceSource("motor")

    # Define schema
    telemetry = source.add_event("telemetry", [
        zelos_sdk.TraceEventFieldMetadata("rpm", zelos_sdk.DataType.Float64, "rpm"),
        zelos_sdk.TraceEventFieldMetadata("torque", zelos_sdk.DataType.Float64, "Nm")
    ])

    while True:
        telemetry.log(
            rpm=2000 + random.uniform(-100, 100),
            torque=50 + random.uniform(-5, 5)
        )
        time.sleep(0.01)  # 100 Hz

def stream_battery():
    """1 Hz battery status"""
    source = zelos_sdk.TraceSource("battery")

    # Define schema
    status = source.add_event("status", [
        zelos_sdk.TraceEventFieldMetadata("voltage", zelos_sdk.DataType.Float64, "V"),
        zelos_sdk.TraceEventFieldMetadata("current", zelos_sdk.DataType.Float64, "A"),
        zelos_sdk.TraceEventFieldMetadata("soc", zelos_sdk.DataType.Float64, "%")
    ])

    soc = 85.0  # State of charge

    while True:
        # Simulate battery discharge
        soc = max(20.0, soc - 0.1)

        status.log(
            voltage=48.0 + random.uniform(-0.5, 0.5),
            current=random.uniform(-10, 50),
            soc=soc
        )
        time.sleep(1.0)  # 1 Hz

def stream_gps():
    """10 Hz GPS updates"""
    source = zelos_sdk.TraceSource("gps")

    # Define schema
    position = source.add_event("position", [
        zelos_sdk.TraceEventFieldMetadata("lat", zelos_sdk.DataType.Float64, "deg"),
        zelos_sdk.TraceEventFieldMetadata("lon", zelos_sdk.DataType.Float64, "deg"),
        zelos_sdk.TraceEventFieldMetadata("alt", zelos_sdk.DataType.Float64, "m")
    ])

    # Base coordinates (Palo Alto, CA)
    base_lat, base_lon = 37.4419, -122.1430

    while True:
        position.log(
            lat=base_lat + random.uniform(-0.001, 0.001),
            lon=base_lon + random.uniform(-0.001, 0.001),
            alt=30 + random.uniform(-1, 1)
        )
        time.sleep(0.1)  # 10 Hz

# Run all streams in parallel
threads = [
    threading.Thread(target=stream_motor, daemon=True),
    threading.Thread(target=stream_battery, daemon=True),
    threading.Thread(target=stream_gps, daemon=True)
]

for t in threads:
    t.start()

# Keep main thread alive
try:
    for t in threads:
        t.join()
except KeyboardInterrupt:
    print("Stopping streams...")
use tokio;
use rand::Rng;
use std::time::Duration;

async fn stream_motor(source: TraceSource) -> Result<()> {
    let telemetry = source
        .build_event("telemetry")
        .add_f64_field("rpm", Some("rpm".to_string()))
        .add_f64_field("torque", Some("Nm".to_string()))
        .build()?;

    let mut interval = tokio::time::interval(Duration::from_millis(10)); // 100 Hz
    let mut rng = rand::thread_rng();

    loop {
        interval.tick().await;

        telemetry.build()
            .try_insert_f64("rpm", 2000.0 + rng.gen_range(-100.0..100.0))?
            .try_insert_f64("torque", 50.0 + rng.gen_range(-5.0..5.0))?
            .emit()?;
    }
}

async fn stream_battery(source: TraceSource) -> Result<()> {
    let status = source
        .build_event("status")
        .add_f64_field("voltage", Some("V".to_string()))
        .add_f64_field("current", Some("A".to_string()))
        .add_f64_field("soc", Some("%".to_string()))
        .build()?;

    let mut interval = tokio::time::interval(Duration::from_secs(1)); // 1 Hz
    let mut rng = rand::thread_rng();
    let mut soc = 85.0;

    loop {
        interval.tick().await;

        // Simulate battery discharge
        soc = (soc - 0.1).max(20.0);

        status.build()
            .try_insert_f64("voltage", 48.0 + rng.gen_range(-0.5..0.5))?
            .try_insert_f64("current", rng.gen_range(-10.0..50.0))?
            .try_insert_f64("soc", soc)?
            .emit()?;
    }
}

async fn stream_gps(source: TraceSource) -> Result<()> {
    let position = source
        .build_event("position")
        .add_f64_field("lat", Some("deg".to_string()))
        .add_f64_field("lon", Some("deg".to_string()))
        .add_f64_field("alt", Some("m".to_string()))
        .build()?;

    let mut interval = tokio::time::interval(Duration::from_millis(100)); // 10 Hz
    let mut rng = rand::thread_rng();

    // Base coordinates (Palo Alto, CA)
    let base_lat = 37.4419;
    let base_lon = -122.1430;

    loop {
        interval.tick().await;

        position.build()
            .try_insert_f64("lat", base_lat + rng.gen_range(-0.001..0.001))?
            .try_insert_f64("lon", base_lon + rng.gen_range(-0.001..0.001))?
            .try_insert_f64("alt", 30.0 + rng.gen_range(-1.0..1.0))?
            .emit()?;
    }
}

#[tokio::main]
async fn main() -> Result<()> {
    // Setup router and client (see quickstart for full setup)
    // ...

    // Create sources
    let motor = TraceSource::new("motor", router.sender());
    let battery = TraceSource::new("battery", router.sender());
    let gps = TraceSource::new("gps", router.sender());

    // Run all streams in parallel
    tokio::select! {
        res = stream_motor(motor) => res?,
        res = stream_battery(battery) => res?,
        res = stream_gps(gps) => res?,
    }
}
import (
    "math"
    "math/rand"
    "sync"
    "time"
)

func streamMotor(source *zelos.TraceSource, wg *sync.WaitGroup) {
    defer wg.Done()

    unitRpm := "rpm"
    unitNm := "Nm"
    telemetry, _ := source.BuildEvent("telemetry").
        AddFloat64Field("rpm", &unitRpm).
        AddFloat64Field("torque", &unitNm).
        Build()

    ticker := time.NewTicker(10 * time.Millisecond) // 100 Hz
    defer ticker.Stop()

    for range ticker.C {
        builder, _ := telemetry.Build().
            TryInsertFloat64("rpm", 2000+rand.Float64()*200-100)
        builder, _ = builder.
            TryInsertFloat64("torque", 50+rand.Float64()*10-5)
        builder.Emit()
    }
}

func streamBattery(source *zelos.TraceSource, wg *sync.WaitGroup) {
    defer wg.Done()

    unitV := "V"
    unitA := "A"
    unitPct := "%"
    status, _ := source.BuildEvent("status").
        AddFloat64Field("voltage", &unitV).
        AddFloat64Field("current", &unitA).
        AddFloat64Field("soc", &unitPct).
        Build()

    ticker := time.NewTicker(time.Second) // 1 Hz
    defer ticker.Stop()

    soc := 85.0

    for range ticker.C {
        // Simulate battery discharge
        soc = math.Max(20.0, soc-0.1)

        builder, _ := status.Build().
            TryInsertFloat64("voltage", 48.0+rand.Float64()-0.5)
        builder, _ = builder.
            TryInsertFloat64("current", rand.Float64()*60-10)
        builder, _ = builder.
            TryInsertFloat64("soc", soc)
        builder.Emit()
    }
}

func streamGPS(source *zelos.TraceSource, wg *sync.WaitGroup) {
    defer wg.Done()

    unitDeg := "deg"
    unitM := "m"
    position, _ := source.BuildEvent("position").
        AddFloat64Field("lat", &unitDeg).
        AddFloat64Field("lon", &unitDeg).
        AddFloat64Field("alt", &unitM).
        Build()

    ticker := time.NewTicker(100 * time.Millisecond) // 10 Hz
    defer ticker.Stop()

    // Base coordinates (Palo Alto, CA)
    baseLat := 37.4419
    baseLon := -122.1430

    for range ticker.C {
        builder, _ := position.Build().
            TryInsertFloat64("lat", baseLat+rand.Float64()*0.002-0.001)
        builder, _ = builder.
            TryInsertFloat64("lon", baseLon+rand.Float64()*0.002-0.001)
        builder, _ = builder.
            TryInsertFloat64("alt", 30+rand.Float64()*2-1)
        builder.Emit()
    }
}

func main() {
    // Setup router and client (see quickstart for full setup)
    // ...

    // Create sources
    motor, _ := zelos.NewTraceSource("motor", sender)
    battery, _ := zelos.NewTraceSource("battery", sender)
    gps, _ := zelos.NewTraceSource("gps", sender)
    defer motor.Close()
    defer battery.Close()
    defer gps.Close()

    // Run all streams in parallel
    var wg sync.WaitGroup
    wg.Add(3)

    go streamMotor(motor, &wg)
    go streamBattery(battery, &wg)
    go streamGPS(gps, &wg)

    wg.Wait()
}

Stream with Custom Timestamps

Use specific timestamps for replay or synchronization:

import time
import zelos_sdk

zelos_sdk.init()
source = zelos_sdk.TraceSource("replay")

# Define schema
measurement = source.add_event("measurement", [
    zelos_sdk.TraceEventFieldMetadata("value", zelos_sdk.DataType.Float64)
])

# Method 1: Current timestamp (automatic)
measurement.log(value=1.0)

# Method 2: Specific timestamp using log_at
specific_time_ns = 1699564234567890123
measurement.log_at(specific_time_ns, value=2.0)

# Method 3: Calculated timestamp
past_time_ns = time.time_ns() - (60 * 1_000_000_000)  # 1 minute ago
measurement.log_at(past_time_ns, value=3.0)

# Method 4: Synchronized timestamps
def stream_synchronized(sync_offset_ns: int):
    """Stream with synchronized timestamps"""
    while True:
        # Calculate synchronized timestamp
        timestamp_ns = time.time_ns() + sync_offset_ns

        # Read sensor value
        value = read_sensor()  # Your sensor reading function

        # Log with specific timestamp
        measurement.log_at(timestamp_ns, value=value)

        time.sleep(0.1)

# Method 5: Replay historical data
def replay_historical_data(records: list):
    """Replay data with original timestamps"""
    for record in records:
        # Use the original timestamp from the record
        measurement.log_at(
            record['timestamp_ns'],
            value=record['value']
        )

        # Optional: add delay to simulate real-time replay
        time.sleep(0.001)
use zelos_trace::time::now_time_ns;
use std::time::Duration;
use tokio::time;

let measurement = source
    .build_event("measurement")
    .add_f64_field("value", None)
    .build()?;

// Method 1: Current timestamp (automatic)
measurement.build()
    .try_insert_f64("value", 1.0)?
    .emit()?;

// Method 2: Specific timestamp using emit_at
let specific_time_ns = 1699564234567890123i64;
measurement.build()
    .try_insert_f64("value", 2.0)?
    .emit_at(specific_time_ns)?;

// Method 3: Calculated timestamp
let past_time_ns = now_time_ns() - (60 * 1_000_000_000);
measurement.build()
    .try_insert_f64("value", 3.0)?
    .emit_at(past_time_ns)?;

// Method 4: Synchronized timestamps
async fn stream_synchronized(
    sync_offset_ns: i64,
    measurement: &TraceEvent
) -> Result<()> {
    let mut interval = time::interval(Duration::from_millis(100));

    loop {
        interval.tick().await;

        // Calculate synchronized timestamp
        let timestamp_ns = now_time_ns() + sync_offset_ns;

        // Read sensor value
        let value = read_sensor(); // Your sensor reading function

        // Log with specific timestamp
        measurement.build()
            .try_insert_f64("value", value)?
            .emit_at(timestamp_ns)?;
    }
}

// Method 5: Replay historical data
struct Record {
    timestamp_ns: i64,
    value: f64,
}

async fn replay_historical_data(
    records: Vec<Record>,
    measurement: &TraceEvent
) -> Result<()> {
    for record in records {
        // Use the original timestamp from the record
        measurement.build()
            .try_insert_f64("value", record.value)?
            .emit_at(record.timestamp_ns)?;

        // Optional: add delay to simulate real-time replay
        time::sleep(Duration::from_millis(1)).await;
    }
    Ok(())
}
import (
    "time"
)

measurement, _ := source.BuildEvent("measurement").
    AddFloat64Field("value", nil).
    Build()

// Method 1: Current timestamp (automatic)
builder, _ := measurement.Build().TryInsertFloat64("value", 1.0)
builder.Emit()

// Method 2: Specific timestamp using EmitAt
specificTimeNs := int64(1699564234567890123)
builder, _ = measurement.Build().TryInsertFloat64("value", 2.0)
builder.EmitAt(specificTimeNs)

// Method 3: Calculated timestamp
pastTimeNs := time.Now().Add(-1 * time.Minute).UnixNano()
builder, _ = measurement.Build().TryInsertFloat64("value", 3.0)
builder.EmitAt(pastTimeNs)

// Method 4: Synchronized timestamps
func streamSynchronized(syncOffsetNs int64, measurement *zelos.TraceSourceEvent) {
    ticker := time.NewTicker(100 * time.Millisecond)
    defer ticker.Stop()

    for range ticker.C {
        // Calculate synchronized timestamp
        timestampNs := time.Now().UnixNano() + syncOffsetNs

        // Read sensor value
        value := readSensor() // Your sensor reading function

        // Log with specific timestamp
        builder, _ := measurement.Build().TryInsertFloat64("value", value)
        builder.EmitAt(timestampNs)
    }
}

// Method 5: Replay historical data
type Record struct {
    TimestampNs int64
    Value       float64
}

func replayHistoricalData(records []Record, measurement *zelos.TraceSourceEvent) error {
    for _, record := range records {
        // Use the original timestamp from the record
        builder, _ := measurement.Build().
            TryInsertFloat64("value", record.Value)
        if err := builder.EmitAt(record.TimestampNs); err != nil {
            return err
        }

        // Optional: add delay to simulate real-time replay
        time.Sleep(time.Millisecond)
    }
    return nil
}

Performance Tips

1. Define Schemas Upfront

Defining schemas improves performance and provides type safety:

# Good: Schema defined once, reused many times
event = source.add_event("data", [
    zelos_sdk.TraceEventFieldMetadata("value", zelos_sdk.DataType.Float64)
])
for i in range(1000000):
    event.log(value=i * 0.1)

# Less efficient: Dynamic typing on every call
for i in range(1000000):
    source.log("data", {"value": i * 0.1})

2. Use Appropriate Data Types

Choose the smallest data type that fits your needs:

# Use Int8 for small values (-128 to 127)
source.add_event("status", [
    zelos_sdk.TraceEventFieldMetadata("state", zelos_sdk.DataType.Int8)
])

# Use Float32 when Float64 precision isn't needed
source.add_event("sensor", [
    zelos_sdk.TraceEventFieldMetadata("temperature", zelos_sdk.DataType.Float32, "°C")
])

Log related fields together to reduce overhead:

# Good: Single event with multiple fields
source.log("imu", {
    "accel_x": ax, "accel_y": ay, "accel_z": az,
    "gyro_x": gx, "gyro_y": gy, "gyro_z": gz
})

# Less efficient: Multiple events
source.log("accel", {"x": ax, "y": ay, "z": az})
source.log("gyro", {"x": gx, "y": gy, "z": gz})

4. Control Timing Precisely

For high-frequency streaming, use precise timing methods shown in the examples above rather than simple sleep() calls.

Common Patterns

State Machine Monitoring

state_event = source.add_event("state", [
    zelos_sdk.TraceEventFieldMetadata("current", zelos_sdk.DataType.UInt8),
    zelos_sdk.TraceEventFieldMetadata("previous", zelos_sdk.DataType.UInt8),
    zelos_sdk.TraceEventFieldMetadata("transition_time_ms", zelos_sdk.DataType.Float64)
])

# Add value table for readable state names
source.add_value_table("state", "current", {
    0: "IDLE", 1: "INIT", 2: "RUNNING", 3: "ERROR"
})
source.add_value_table("state", "previous", {
    0: "IDLE", 1: "INIT", 2: "RUNNING", 3: "ERROR"
})

Sensor Array Streaming

# Define schema for array of sensors
sensor_array = source.add_event("array", [
    zelos_sdk.TraceEventFieldMetadata(f"sensor_{i}", zelos_sdk.DataType.Float32)
    for i in range(16)
])

# Log all sensors with single timestamp
sensor_array.log(**{f"sensor_{i}": values[i] for i in range(16)})