How to Stream Data¶
Learn how to stream data from your application to Zelos at various rates and patterns.
Basic Streaming¶
The simplest way to stream data is to log events in a loop:
import time
import random # For demo purposes
import zelos_sdk
zelos_sdk.init()
source = zelos_sdk.TraceSource("sensors")
# Define schema (optional but recommended)
temperature_event = source.add_event("temperature", [
zelos_sdk.TraceEventFieldMetadata("value", zelos_sdk.DataType.Float64, "°C")
])
while True:
# Read your sensor (simulated here)
temperature = 20.0 + random.uniform(-2, 2)
# Log to Zelos
temperature_event.log(value=temperature)
# Control rate
time.sleep(1.0) # 1 Hz
use std::time::Duration;
use tokio::time;
use rand::Rng; // For demo purposes
// Define schema (required in Rust)
let temp_event = source
.build_event("temperature")
.add_f64_field("value", Some("°C".to_string()))
.build()?;
let mut interval = time::interval(Duration::from_secs(1));
let mut rng = rand::thread_rng();
loop {
interval.tick().await;
// Read your sensor (simulated here)
let temperature = 20.0 + rng.gen_range(-2.0..2.0);
temp_event.build()
.try_insert_f64("value", temperature)?
.emit()?;
}
import (
"math/rand"
"time"
)
// Define schema (required in Go)
unitC := "°C"
tempEvent, _ := source.BuildEvent("temperature").
AddFloat64Field("value", &unitC).
Build()
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for range ticker.C {
// Read your sensor (simulated here)
temperature := 20.0 + rand.Float64()*4 - 2
builder, _ := tempEvent.Build().TryInsertFloat64("value", temperature)
builder.Emit()
}
High-Frequency Streaming¶
For rates above 100 Hz, use precise timing:
import time
import math
import zelos_sdk
zelos_sdk.init()
source = zelos_sdk.TraceSource("high_freq")
# Define schema for better performance
data_event = source.add_event("data", [
zelos_sdk.TraceEventFieldMetadata("value", zelos_sdk.DataType.Float64, "V")
])
# 1 kHz = 1ms period
period = 0.001
start_time = time.time()
next_time = start_time
while True:
current_time = time.time() - start_time
# Generate signal (example: 100 Hz sine wave)
value = math.sin(2 * math.pi * 100 * current_time)
# Log data
data_event.log(value=value)
# Precise sleep to maintain rate
next_time += period
sleep_duration = next_time - time.time()
if sleep_duration > 0:
time.sleep(sleep_duration)
use tokio::time::{interval, MissedTickBehavior};
use std::time::{Duration, Instant};
let data_event = source
.build_event("data")
.add_f64_field("value", Some("V".to_string()))
.build()?;
// 1 kHz = 1ms period
let mut interval = interval(Duration::from_millis(1));
// Skip missed ticks instead of bursting
interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
let start = Instant::now();
loop {
interval.tick().await;
let elapsed = start.elapsed().as_secs_f64();
// Generate signal (example: 100 Hz sine wave)
let value = (2.0 * std::f64::consts::PI * 100.0 * elapsed).sin();
data_event.build()
.try_insert_f64("value", value)?
.emit()?;
}
import (
"math"
"time"
)
unitVolt := "V"
dataEvent, _ := source.BuildEvent("data").
AddFloat64Field("value", &unitVolt).
Build()
// 1 kHz = 1ms period
ticker := time.NewTicker(time.Millisecond)
defer ticker.Stop()
start := time.Now()
for range ticker.C {
elapsed := time.Since(start).Seconds()
// Generate signal (example: 100 Hz sine wave)
value := math.Sin(2 * math.Pi * 100 * elapsed)
builder, _ := dataEvent.Build().TryInsertFloat64("value", value)
builder.Emit()
}
Burst Streaming¶
Stream data in bursts when events occur:
import time
import zelos_sdk
zelos_sdk.init()
source = zelos_sdk.TraceSource("events")
# Define event schemas
source.add_event("event_start", [
zelos_sdk.TraceEventFieldMetadata("trigger", zelos_sdk.DataType.String),
zelos_sdk.TraceEventFieldMetadata("timestamp_ms", zelos_sdk.DataType.Int64)
])
source.add_event("event_sample", [
zelos_sdk.TraceEventFieldMetadata("value", zelos_sdk.DataType.Float64),
zelos_sdk.TraceEventFieldMetadata("index", zelos_sdk.DataType.UInt32)
])
source.add_event("event_end", [
zelos_sdk.TraceEventFieldMetadata("duration_ms", zelos_sdk.DataType.Float64),
zelos_sdk.TraceEventFieldMetadata("sample_count", zelos_sdk.DataType.UInt32)
])
def handle_burst_event(trigger_name: str, samples: list):
"""Log a burst of data when an event occurs"""
start_time = time.time()
# Log event start
source.log("event_start", {
"trigger": trigger_name,
"timestamp_ms": int(start_time * 1000)
})
# Log burst of samples
for i, value in enumerate(samples):
source.log("event_sample", {
"value": value,
"index": i
})
# Log event end
duration_ms = (time.time() - start_time) * 1000
source.log("event_end", {
"duration_ms": duration_ms,
"sample_count": len(samples)
})
# Example usage
while True:
# Wait for trigger condition
time.sleep(5)
# Generate burst data (simulated)
burst_data = [i * 0.1 for i in range(100)]
handle_burst_event("threshold_exceeded", burst_data)
use std::time::{Duration, Instant};
use tokio::time;
// Define event schemas
let start_event = source
.build_event("event_start")
.add_string_field("trigger", None)
.add_i64_field("timestamp_ms", None)
.build()?;
let sample_event = source
.build_event("event_sample")
.add_f64_field("value", None)
.add_u32_field("index", None)
.build()?;
let end_event = source
.build_event("event_end")
.add_f64_field("duration_ms", None)
.add_u32_field("sample_count", None)
.build()?;
async fn handle_burst_event(
trigger_name: &str,
samples: Vec<f64>,
start_event: &TraceEvent,
sample_event: &TraceEvent,
end_event: &TraceEvent,
) -> Result<()> {
let start_time = Instant::now();
// Log event start
start_event.build()
.try_insert_string("trigger", trigger_name.to_string())?
.try_insert_i64("timestamp_ms", start_time.elapsed().as_millis() as i64)?
.emit()?;
// Log burst of samples
for (i, value) in samples.iter().enumerate() {
sample_event.build()
.try_insert_f64("value", *value)?
.try_insert_u32("index", i as u32)?
.emit()?;
}
// Log event end
let duration_ms = start_time.elapsed().as_secs_f64() * 1000.0;
end_event.build()
.try_insert_f64("duration_ms", duration_ms)?
.try_insert_u32("sample_count", samples.len() as u32)?
.emit()?;
Ok(())
}
// Example usage
loop {
// Wait for trigger condition
time::sleep(Duration::from_secs(5)).await;
// Generate burst data (simulated)
let burst_data: Vec<f64> = (0..100).map(|i| i as f64 * 0.1).collect();
handle_burst_event("threshold_exceeded", burst_data, &start_event, &sample_event, &end_event).await?;
}
import (
"time"
)
// Define event schemas
startEvent, _ := source.BuildEvent("event_start").
AddStringField("trigger", nil).
AddInt64Field("timestamp_ms", nil).
Build()
sampleEvent, _ := source.BuildEvent("event_sample").
AddFloat64Field("value", nil).
AddUint32Field("index", nil).
Build()
endEvent, _ := source.BuildEvent("event_end").
AddFloat64Field("duration_ms", nil).
AddUint32Field("sample_count", nil).
Build()
func handleBurstEvent(triggerName string, samples []float64) error {
startTime := time.Now()
// Log event start
builder, _ := startEvent.Build().TryInsertString("trigger", triggerName)
builder, _ = builder.TryInsertInt64("timestamp_ms", startTime.UnixMilli())
if err := builder.Emit(); err != nil {
return err
}
// Log burst of samples
for i, value := range samples {
b, _ := sampleEvent.Build().TryInsertFloat64("value", value)
b, _ = b.TryInsertUint32("index", uint32(i))
if err := b.Emit(); err != nil {
return err
}
}
// Log event end
durationMs := float64(time.Since(startTime).Milliseconds())
builder, _ = endEvent.Build().TryInsertFloat64("duration_ms", durationMs)
builder, _ = builder.TryInsertUint32("sample_count", uint32(len(samples)))
return builder.Emit()
}
// Example usage
for {
// Wait for trigger condition
time.Sleep(5 * time.Second)
// Generate burst data (simulated)
burstData := make([]float64, 100)
for i := range burstData {
burstData[i] = float64(i) * 0.1
}
if err := handleBurstEvent("threshold_exceeded", burstData); err != nil {
log.Printf("Error handling burst: %v", err)
}
}
Multiple Sources in Parallel¶
Stream from different components at different rates:
import threading
import time
import random
import zelos_sdk
zelos_sdk.init()
def stream_motor():
"""100 Hz motor telemetry"""
source = zelos_sdk.TraceSource("motor")
# Define schema
telemetry = source.add_event("telemetry", [
zelos_sdk.TraceEventFieldMetadata("rpm", zelos_sdk.DataType.Float64, "rpm"),
zelos_sdk.TraceEventFieldMetadata("torque", zelos_sdk.DataType.Float64, "Nm")
])
while True:
telemetry.log(
rpm=2000 + random.uniform(-100, 100),
torque=50 + random.uniform(-5, 5)
)
time.sleep(0.01) # 100 Hz
def stream_battery():
"""1 Hz battery status"""
source = zelos_sdk.TraceSource("battery")
# Define schema
status = source.add_event("status", [
zelos_sdk.TraceEventFieldMetadata("voltage", zelos_sdk.DataType.Float64, "V"),
zelos_sdk.TraceEventFieldMetadata("current", zelos_sdk.DataType.Float64, "A"),
zelos_sdk.TraceEventFieldMetadata("soc", zelos_sdk.DataType.Float64, "%")
])
soc = 85.0 # State of charge
while True:
# Simulate battery discharge
soc = max(20.0, soc - 0.1)
status.log(
voltage=48.0 + random.uniform(-0.5, 0.5),
current=random.uniform(-10, 50),
soc=soc
)
time.sleep(1.0) # 1 Hz
def stream_gps():
"""10 Hz GPS updates"""
source = zelos_sdk.TraceSource("gps")
# Define schema
position = source.add_event("position", [
zelos_sdk.TraceEventFieldMetadata("lat", zelos_sdk.DataType.Float64, "deg"),
zelos_sdk.TraceEventFieldMetadata("lon", zelos_sdk.DataType.Float64, "deg"),
zelos_sdk.TraceEventFieldMetadata("alt", zelos_sdk.DataType.Float64, "m")
])
# Base coordinates (Palo Alto, CA)
base_lat, base_lon = 37.4419, -122.1430
while True:
position.log(
lat=base_lat + random.uniform(-0.001, 0.001),
lon=base_lon + random.uniform(-0.001, 0.001),
alt=30 + random.uniform(-1, 1)
)
time.sleep(0.1) # 10 Hz
# Run all streams in parallel
threads = [
threading.Thread(target=stream_motor, daemon=True),
threading.Thread(target=stream_battery, daemon=True),
threading.Thread(target=stream_gps, daemon=True)
]
for t in threads:
t.start()
# Keep main thread alive
try:
for t in threads:
t.join()
except KeyboardInterrupt:
print("Stopping streams...")
use tokio;
use rand::Rng;
use std::time::Duration;
async fn stream_motor(source: TraceSource) -> Result<()> {
let telemetry = source
.build_event("telemetry")
.add_f64_field("rpm", Some("rpm".to_string()))
.add_f64_field("torque", Some("Nm".to_string()))
.build()?;
let mut interval = tokio::time::interval(Duration::from_millis(10)); // 100 Hz
let mut rng = rand::thread_rng();
loop {
interval.tick().await;
telemetry.build()
.try_insert_f64("rpm", 2000.0 + rng.gen_range(-100.0..100.0))?
.try_insert_f64("torque", 50.0 + rng.gen_range(-5.0..5.0))?
.emit()?;
}
}
async fn stream_battery(source: TraceSource) -> Result<()> {
let status = source
.build_event("status")
.add_f64_field("voltage", Some("V".to_string()))
.add_f64_field("current", Some("A".to_string()))
.add_f64_field("soc", Some("%".to_string()))
.build()?;
let mut interval = tokio::time::interval(Duration::from_secs(1)); // 1 Hz
let mut rng = rand::thread_rng();
let mut soc = 85.0;
loop {
interval.tick().await;
// Simulate battery discharge
soc = (soc - 0.1).max(20.0);
status.build()
.try_insert_f64("voltage", 48.0 + rng.gen_range(-0.5..0.5))?
.try_insert_f64("current", rng.gen_range(-10.0..50.0))?
.try_insert_f64("soc", soc)?
.emit()?;
}
}
async fn stream_gps(source: TraceSource) -> Result<()> {
let position = source
.build_event("position")
.add_f64_field("lat", Some("deg".to_string()))
.add_f64_field("lon", Some("deg".to_string()))
.add_f64_field("alt", Some("m".to_string()))
.build()?;
let mut interval = tokio::time::interval(Duration::from_millis(100)); // 10 Hz
let mut rng = rand::thread_rng();
// Base coordinates (Palo Alto, CA)
let base_lat = 37.4419;
let base_lon = -122.1430;
loop {
interval.tick().await;
position.build()
.try_insert_f64("lat", base_lat + rng.gen_range(-0.001..0.001))?
.try_insert_f64("lon", base_lon + rng.gen_range(-0.001..0.001))?
.try_insert_f64("alt", 30.0 + rng.gen_range(-1.0..1.0))?
.emit()?;
}
}
#[tokio::main]
async fn main() -> Result<()> {
// Setup router and client (see quickstart for full setup)
// ...
// Create sources
let motor = TraceSource::new("motor", router.sender());
let battery = TraceSource::new("battery", router.sender());
let gps = TraceSource::new("gps", router.sender());
// Run all streams in parallel
tokio::select! {
res = stream_motor(motor) => res?,
res = stream_battery(battery) => res?,
res = stream_gps(gps) => res?,
}
}
import (
"math"
"math/rand"
"sync"
"time"
)
func streamMotor(source *zelos.TraceSource, wg *sync.WaitGroup) {
defer wg.Done()
unitRpm := "rpm"
unitNm := "Nm"
telemetry, _ := source.BuildEvent("telemetry").
AddFloat64Field("rpm", &unitRpm).
AddFloat64Field("torque", &unitNm).
Build()
ticker := time.NewTicker(10 * time.Millisecond) // 100 Hz
defer ticker.Stop()
for range ticker.C {
builder, _ := telemetry.Build().
TryInsertFloat64("rpm", 2000+rand.Float64()*200-100)
builder, _ = builder.
TryInsertFloat64("torque", 50+rand.Float64()*10-5)
builder.Emit()
}
}
func streamBattery(source *zelos.TraceSource, wg *sync.WaitGroup) {
defer wg.Done()
unitV := "V"
unitA := "A"
unitPct := "%"
status, _ := source.BuildEvent("status").
AddFloat64Field("voltage", &unitV).
AddFloat64Field("current", &unitA).
AddFloat64Field("soc", &unitPct).
Build()
ticker := time.NewTicker(time.Second) // 1 Hz
defer ticker.Stop()
soc := 85.0
for range ticker.C {
// Simulate battery discharge
soc = math.Max(20.0, soc-0.1)
builder, _ := status.Build().
TryInsertFloat64("voltage", 48.0+rand.Float64()-0.5)
builder, _ = builder.
TryInsertFloat64("current", rand.Float64()*60-10)
builder, _ = builder.
TryInsertFloat64("soc", soc)
builder.Emit()
}
}
func streamGPS(source *zelos.TraceSource, wg *sync.WaitGroup) {
defer wg.Done()
unitDeg := "deg"
unitM := "m"
position, _ := source.BuildEvent("position").
AddFloat64Field("lat", &unitDeg).
AddFloat64Field("lon", &unitDeg).
AddFloat64Field("alt", &unitM).
Build()
ticker := time.NewTicker(100 * time.Millisecond) // 10 Hz
defer ticker.Stop()
// Base coordinates (Palo Alto, CA)
baseLat := 37.4419
baseLon := -122.1430
for range ticker.C {
builder, _ := position.Build().
TryInsertFloat64("lat", baseLat+rand.Float64()*0.002-0.001)
builder, _ = builder.
TryInsertFloat64("lon", baseLon+rand.Float64()*0.002-0.001)
builder, _ = builder.
TryInsertFloat64("alt", 30+rand.Float64()*2-1)
builder.Emit()
}
}
func main() {
// Setup router and client (see quickstart for full setup)
// ...
// Create sources
motor, _ := zelos.NewTraceSource("motor", sender)
battery, _ := zelos.NewTraceSource("battery", sender)
gps, _ := zelos.NewTraceSource("gps", sender)
defer motor.Close()
defer battery.Close()
defer gps.Close()
// Run all streams in parallel
var wg sync.WaitGroup
wg.Add(3)
go streamMotor(motor, &wg)
go streamBattery(battery, &wg)
go streamGPS(gps, &wg)
wg.Wait()
}
Stream with Custom Timestamps¶
Use specific timestamps for replay or synchronization:
import time
import zelos_sdk
zelos_sdk.init()
source = zelos_sdk.TraceSource("replay")
# Define schema
measurement = source.add_event("measurement", [
zelos_sdk.TraceEventFieldMetadata("value", zelos_sdk.DataType.Float64)
])
# Method 1: Current timestamp (automatic)
measurement.log(value=1.0)
# Method 2: Specific timestamp using log_at
specific_time_ns = 1699564234567890123
measurement.log_at(specific_time_ns, value=2.0)
# Method 3: Calculated timestamp
past_time_ns = time.time_ns() - (60 * 1_000_000_000) # 1 minute ago
measurement.log_at(past_time_ns, value=3.0)
# Method 4: Synchronized timestamps
def stream_synchronized(sync_offset_ns: int):
"""Stream with synchronized timestamps"""
while True:
# Calculate synchronized timestamp
timestamp_ns = time.time_ns() + sync_offset_ns
# Read sensor value
value = read_sensor() # Your sensor reading function
# Log with specific timestamp
measurement.log_at(timestamp_ns, value=value)
time.sleep(0.1)
# Method 5: Replay historical data
def replay_historical_data(records: list):
"""Replay data with original timestamps"""
for record in records:
# Use the original timestamp from the record
measurement.log_at(
record['timestamp_ns'],
value=record['value']
)
# Optional: add delay to simulate real-time replay
time.sleep(0.001)
use zelos_trace::time::now_time_ns;
use std::time::Duration;
use tokio::time;
let measurement = source
.build_event("measurement")
.add_f64_field("value", None)
.build()?;
// Method 1: Current timestamp (automatic)
measurement.build()
.try_insert_f64("value", 1.0)?
.emit()?;
// Method 2: Specific timestamp using emit_at
let specific_time_ns = 1699564234567890123i64;
measurement.build()
.try_insert_f64("value", 2.0)?
.emit_at(specific_time_ns)?;
// Method 3: Calculated timestamp
let past_time_ns = now_time_ns() - (60 * 1_000_000_000);
measurement.build()
.try_insert_f64("value", 3.0)?
.emit_at(past_time_ns)?;
// Method 4: Synchronized timestamps
async fn stream_synchronized(
sync_offset_ns: i64,
measurement: &TraceEvent
) -> Result<()> {
let mut interval = time::interval(Duration::from_millis(100));
loop {
interval.tick().await;
// Calculate synchronized timestamp
let timestamp_ns = now_time_ns() + sync_offset_ns;
// Read sensor value
let value = read_sensor(); // Your sensor reading function
// Log with specific timestamp
measurement.build()
.try_insert_f64("value", value)?
.emit_at(timestamp_ns)?;
}
}
// Method 5: Replay historical data
struct Record {
timestamp_ns: i64,
value: f64,
}
async fn replay_historical_data(
records: Vec<Record>,
measurement: &TraceEvent
) -> Result<()> {
for record in records {
// Use the original timestamp from the record
measurement.build()
.try_insert_f64("value", record.value)?
.emit_at(record.timestamp_ns)?;
// Optional: add delay to simulate real-time replay
time::sleep(Duration::from_millis(1)).await;
}
Ok(())
}
import (
"time"
)
measurement, _ := source.BuildEvent("measurement").
AddFloat64Field("value", nil).
Build()
// Method 1: Current timestamp (automatic)
builder, _ := measurement.Build().TryInsertFloat64("value", 1.0)
builder.Emit()
// Method 2: Specific timestamp using EmitAt
specificTimeNs := int64(1699564234567890123)
builder, _ = measurement.Build().TryInsertFloat64("value", 2.0)
builder.EmitAt(specificTimeNs)
// Method 3: Calculated timestamp
pastTimeNs := time.Now().Add(-1 * time.Minute).UnixNano()
builder, _ = measurement.Build().TryInsertFloat64("value", 3.0)
builder.EmitAt(pastTimeNs)
// Method 4: Synchronized timestamps
func streamSynchronized(syncOffsetNs int64, measurement *zelos.TraceSourceEvent) {
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
for range ticker.C {
// Calculate synchronized timestamp
timestampNs := time.Now().UnixNano() + syncOffsetNs
// Read sensor value
value := readSensor() // Your sensor reading function
// Log with specific timestamp
builder, _ := measurement.Build().TryInsertFloat64("value", value)
builder.EmitAt(timestampNs)
}
}
// Method 5: Replay historical data
type Record struct {
TimestampNs int64
Value float64
}
func replayHistoricalData(records []Record, measurement *zelos.TraceSourceEvent) error {
for _, record := range records {
// Use the original timestamp from the record
builder, _ := measurement.Build().
TryInsertFloat64("value", record.Value)
if err := builder.EmitAt(record.TimestampNs); err != nil {
return err
}
// Optional: add delay to simulate real-time replay
time.Sleep(time.Millisecond)
}
return nil
}
Performance Tips¶
1. Define Schemas Upfront¶
Defining schemas improves performance and provides type safety:
# Good: Schema defined once, reused many times
event = source.add_event("data", [
zelos_sdk.TraceEventFieldMetadata("value", zelos_sdk.DataType.Float64)
])
for i in range(1000000):
event.log(value=i * 0.1)
# Less efficient: Dynamic typing on every call
for i in range(1000000):
source.log("data", {"value": i * 0.1})
2. Use Appropriate Data Types¶
Choose the smallest data type that fits your needs:
# Use Int8 for small values (-128 to 127)
source.add_event("status", [
zelos_sdk.TraceEventFieldMetadata("state", zelos_sdk.DataType.Int8)
])
# Use Float32 when Float64 precision isn't needed
source.add_event("sensor", [
zelos_sdk.TraceEventFieldMetadata("temperature", zelos_sdk.DataType.Float32, "°C")
])
3. Batch Related Fields¶
Log related fields together to reduce overhead:
# Good: Single event with multiple fields
source.log("imu", {
"accel_x": ax, "accel_y": ay, "accel_z": az,
"gyro_x": gx, "gyro_y": gy, "gyro_z": gz
})
# Less efficient: Multiple events
source.log("accel", {"x": ax, "y": ay, "z": az})
source.log("gyro", {"x": gx, "y": gy, "z": gz})
4. Control Timing Precisely¶
For high-frequency streaming, use precise timing methods shown in the examples above rather than simple sleep()
calls.
Common Patterns¶
State Machine Monitoring¶
state_event = source.add_event("state", [
zelos_sdk.TraceEventFieldMetadata("current", zelos_sdk.DataType.UInt8),
zelos_sdk.TraceEventFieldMetadata("previous", zelos_sdk.DataType.UInt8),
zelos_sdk.TraceEventFieldMetadata("transition_time_ms", zelos_sdk.DataType.Float64)
])
# Add value table for readable state names
source.add_value_table("state", "current", {
0: "IDLE", 1: "INIT", 2: "RUNNING", 3: "ERROR"
})
source.add_value_table("state", "previous", {
0: "IDLE", 1: "INIT", 2: "RUNNING", 3: "ERROR"
})