Quick Start¶
Get your first data streaming to Zelos.
Prerequisites¶
- Zelos App or Agent running (default: localhost:2300)
- Python 3.10+, Rust 1.70+, or Go 1.21+
Step 1: Install¶
Step 2: Stream Data¶
Create a simple program that streams sinusoidal voltage and current:
stream.py
import math
import time
import zelos_sdk
# Connect to local agent (auto-reconnects on failure)
zelos_sdk.init()
# Create a source for our power data
source = zelos_sdk.TraceSource("power_monitor")
# Stream at 1 kHz
print("Streaming power data... Press Ctrl+C to stop")
start_time = time.time()
while True:
    t = time.time() - start_time
    # Generate test signals
    voltage = 12.0 + 0.5 * math.sin(2 * math.pi * 50 * t)
    current = 2.0 + 0.2 * math.sin(2 * math.pi * 50 * t + math.pi/4)
    # Log as a single event (all fields get same timestamp)
    source.log("measurements", {
        "voltage": voltage,
        "current": current,
        "power": voltage * current
    })
    time.sleep(0.001)  # 1ms = 1kHz
main.rs
use std::time::{Duration, Instant};
use tokio_util::sync::CancellationToken;
use zelos::{TraceRouter, TraceSource};
use zelos::trace_grpc::publish::{TracePublishClient, TracePublishClientConfig};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Setup infrastructure
    let cancel = CancellationToken::new();
    let (router, router_task) = TraceRouter::new(cancel.clone());
    tokio::spawn(router_task);
    // Connect to agent
    let config = TracePublishClientConfig::default(); // localhost:2300
    let (client, client_task) = TracePublishClient::new(router.clone(), config);
    tokio::spawn(client_task);
    // Wait for connection
    client.wait_until_connected(Duration::from_secs(5)).await?;
    println!("Connected to agent");
    // Create source and define event schema
    let source = TraceSource::new("power_monitor", router.sender());
    let event = source
        .build_event("measurements")
        .add_f64_field("voltage", Some("V".to_string()))
        .add_f64_field("current", Some("A".to_string()))
        .add_f64_field("power", Some("W".to_string()))
        .build()?;
    // Stream at 1 kHz
    println!("Streaming power data... Press Ctrl+C to stop");
    let start = Instant::now();
    loop {
        let t = start.elapsed().as_secs_f64();
        // Generate test signals
        let voltage = 12.0 + 0.5 * (2.0 * std::f64::consts::PI * 50.0 * t).sin();
        let current = 2.0 + 0.2 * (2.0 * std::f64::consts::PI * 50.0 * t + std::f64::consts::FRAC_PI_4).sin();
        // Log as single event
        event.build()
            .try_insert_f64("voltage", voltage)?
            .try_insert_f64("current", current)?
            .try_insert_f64("power", voltage * current)?
            .emit()?;
        tokio::time::sleep(Duration::from_millis(1)).await;
    }
}
main.go
package main
import (
	"context"
	"fmt"
	"log"
	"math"
	"time"

	zelos "github.com/zeloscloud/zelos/go"
)

func main() {
	ctx := context.Background()
	// Setup infrastructure
	router, sender, receiver := zelos.NewTraceRouter(ctx)
	// Connect to agent
	config := zelos.DefaultTracePublishClientConfig() // localhost:2300
	client := zelos.NewTracePublishClient(ctx, receiver, config)
	go func() {
		if err := client.Run(); err != nil {
			log.Printf("Client error: %v", err)
		}
	}()
	// Wait for connection
	if err := client.WaitUntilConnected(5 * time.Second); err != nil {
		log.Fatal(err)
	}
	fmt.Println("Connected to agent")
	// Create source and define event schema
	source, err := zelos.NewTraceSource("power_monitor", sender)
	if err != nil {
		log.Fatal(err)
	}
	defer source.Close()
	unitV := "V"
	unitA := "A"
	unitW := "W"
	event, err := source.BuildEvent("measurements").
		AddFloat64Field("voltage", &unitV).
		AddFloat64Field("current", &unitA).
		AddFloat64Field("power", &unitW).
		Build()
	if err != nil {
		log.Fatal(err)
	}
	// Stream at 1 kHz
	fmt.Println("Streaming power data... Press Ctrl+C to stop")
	start := time.Now()
	for {
		t := time.Since(start).Seconds()
		// Generate test signals
		voltage := 12.0 + 0.5*math.Sin(2*math.Pi*50*t)
		current := 2.0 + 0.2*math.Sin(2*math.Pi*50*t+math.Pi/4)
		// Log as single event
		builder, _ := event.Build().TryInsertFloat64("voltage", voltage)
		builder, _ = builder.TryInsertFloat64("current", current)
		builder, _ = builder.TryInsertFloat64("power", voltage*current)
		if err := builder.Emit(); err != nil {
			log.Printf("Emit failed: %v", err)
		}
		time.Sleep(time.Millisecond)
	}
}
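The loops above sleep for a fixed 1 ms after every iteration, so the time spent generating and logging each sample is added on top of the sleep and the real rate lands somewhat below 1 kHz. If the rate matters, one common alternative is to schedule each iteration against a monotonic deadline. The following is a minimal sketch of that idea; `run_at_rate` and its `emit` callback are hypothetical helpers for illustration, not part of the Zelos SDK.

```python
import time


def run_at_rate(emit, rate_hz, count):
    """Call emit(t) `count` times, aiming for `rate_hz` calls per second.

    Each iteration targets an absolute deadline, so time spent inside
    emit() does not accumulate as drift.
    """
    period = 1.0 / rate_hz
    start = time.monotonic()
    for i in range(count):
        emit(time.monotonic() - start)
        # Sleep until the next absolute deadline; skip sleeping if we are late.
        remaining = start + (i + 1) * period - time.monotonic()
        if remaining > 0:
            time.sleep(remaining)


if __name__ == "__main__":
    samples = []
    run_at_rate(samples.append, rate_hz=1000, count=100)
    print(f"emitted {len(samples)} samples, last at t={samples[-1]:.4f} s")
```

In the Python example, `emit` would wrap the signal generation and the `source.log(...)` call. Operating-system sleep granularity can still limit how closely a 1 ms period is met.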
Step 3: View Your Data¶
- Run your program in a terminal
- Open Zelos App and navigate to the Signals view
- See your data streaming in real-time:
  - power_monitor/measurements.voltage
  - power_monitor/measurements.current
  - power_monitor/measurements.power
Connection Options¶
Remote Agent¶
Key Concepts¶
Sources¶
A source represents a logical component (e.g., "motor_controller", "battery_monitor"). Create multiple sources to organize your data.
Events¶
An event is a collection of related fields logged together with the same timestamp (e.g., voltage + current + power).
Signals¶
A signal is an individual data stream, identified by the path source/event.field (e.g., power_monitor/measurements.voltage).
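To make the naming concrete, the sketch below builds and splits paths of the source/event.field form. Both helpers are hypothetical and written only for illustration; they are not SDK functions, and they assume that source and event names contain no "/" or "." characters.

```python
def signal_path(source, event, field):
    """Build a signal path in the source/event.field form."""
    return f"{source}/{event}.{field}"


def parse_signal_path(path):
    """Split a source/event.field path back into its three parts."""
    source, _, rest = path.partition("/")
    event, _, field = rest.partition(".")
    return source, event, field


if __name__ == "__main__":
    # One event with three fields yields three signals.
    for field in ("voltage", "current", "power"):
        print(signal_path("power_monitor", "measurements", field))
```

Run on the Quick Start example, this prints the three signal paths listed in Step 3.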
Common Patterns¶
Define Schema Explicitly¶
Defining your event schema provides type safety and units:
# Define schema with types and units
event = source.add_event("telemetry", [
    zelos_sdk.TraceEventFieldMetadata("speed", zelos_sdk.DataType.Float64, "m/s"),
    zelos_sdk.TraceEventFieldMetadata("temperature", zelos_sdk.DataType.Float32, "°C"),
    zelos_sdk.TraceEventFieldMetadata("status", zelos_sdk.DataType.String)
])
# Type-safe logging
event.log(speed=15.5, temperature=72.3, status="nominal")
Multiple Components¶
Organize data from different subsystems:
# Create sources for each component
motor = zelos_sdk.TraceSource("motor")
battery = zelos_sdk.TraceSource("battery")
sensors = zelos_sdk.TraceSource("sensors")
# Log to appropriate source
motor.log("control", {"rpm": 2500, "torque": 35.5})
battery.log("status", {"voltage": 48.2, "soc": 85})
sensors.log("imu", {"accel_x": 0.1, "gyro_z": 0.02})
Error Handling¶
The SDK handles transient errors automatically:
- Auto-reconnect: Reconnects to agent if connection drops
- Buffering: Queues data during disconnection (up to buffer limit)
- Backpressure: Slows down if agent can't keep up
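As an illustration of what "up to buffer limit" can mean in practice, here is a small sketch of a bounded queue that discards the oldest event once it is full. This is only one plausible policy: how the SDK actually buffers, and which events it drops at the limit, is not specified here. `BoundedBuffer` is a hypothetical class, not an SDK type.

```python
from collections import deque


class BoundedBuffer:
    """Hold at most `limit` events; the oldest is discarded when full."""

    def __init__(self, limit):
        self._events = deque(maxlen=limit)
        self.dropped = 0

    def push(self, event):
        if len(self._events) == self._events.maxlen:
            self.dropped += 1  # the deque evicts the oldest entry on append
        self._events.append(event)

    def drain(self):
        """Return all buffered events in order and empty the buffer."""
        events = list(self._events)
        self._events.clear()
        return events


if __name__ == "__main__":
    buf = BoundedBuffer(limit=3)
    for i in range(5):
        buf.push({"seq": i})
    print(buf.drain(), "dropped:", buf.dropped)
```

The practical point is that a long disconnection at a high logging rate can exceed any finite buffer, so some data loss is possible.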
You should handle initialization errors:
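A minimal sketch of one way to do this, assuming that a failed initialization raises an exception (the exact exception type is not stated here, so the sketch catches `Exception`). `init_with_retry` is a hypothetical helper, not part of the SDK; it takes the initializer as a callable so it can be run without the SDK installed.

```python
import time


def init_with_retry(init_fn, attempts=3, delay_s=1.0):
    """Call init_fn() until it succeeds or `attempts` tries are used up.

    Assumption: init_fn signals failure by raising an exception.
    """
    last_error = None
    for attempt in range(1, attempts + 1):
        try:
            return init_fn()
        except Exception as exc:  # exact exception type is an assumption
            last_error = exc
            print(f"init attempt {attempt}/{attempts} failed: {exc}")
            if attempt < attempts:
                time.sleep(delay_s)
    raise RuntimeError("could not initialize after retries") from last_error
```

With the SDK installed, the call would look like `init_with_retry(zelos_sdk.init)`, with the final `RuntimeError` caught at the top level of the program so it can exit with a clear message.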