Skip to content

Quick Start

Get your first data streaming to Zelos.

Prerequisites

  • Zelos App or Agent running (default: localhost:2300)
  • Python 3.10+, Rust 1.70+, or Go 1.21+

Step 1: Install

pip install zelos-sdk
cargo add zelos
go get github.com/zeloscloud/zelos/go@latest

Step 2: Stream Data

Create a simple program that streams sinusoidal voltage and current:

stream.py
import math
import time
import zelos_sdk

# Connect to local agent (auto-reconnects on failure)
zelos_sdk.init()

# Create a source for our power data
source = zelos_sdk.TraceSource("power_monitor")

# Stream at 1 kHz
print("Streaming power data... Press Ctrl+C to stop")
start_time = time.time()

while True:
    t = time.time() - start_time

    # Generate test signals
    voltage = 12.0 + 0.5 * math.sin(2 * math.pi * 50 * t)
    current = 2.0 + 0.2 * math.sin(2 * math.pi * 50 * t + math.pi/4)

    # Log as a single event (all fields get same timestamp)
    source.log("measurements", {
        "voltage": voltage,
        "current": current,
        "power": voltage * current
    })

    time.sleep(0.001)  # 1ms = 1kHz
main.rs
use std::time::{Duration, Instant};
use tokio_util::sync::CancellationToken;
use zelos::{TraceRouter, TraceSource};
use zelos::trace_grpc::publish::{TracePublishClient, TracePublishClientConfig};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Setup infrastructure
    let cancel = CancellationToken::new();
    let (router, router_task) = TraceRouter::new(cancel.clone());
    tokio::spawn(router_task);

    // Connect to agent
    let config = TracePublishClientConfig::default(); // localhost:2300
    let (client, client_task) = TracePublishClient::new(router.clone(), config);
    tokio::spawn(client_task);

    // Wait for connection
    client.wait_until_connected(Duration::from_secs(5)).await?;
    println!("Connected to agent");

    // Create source and define event schema
    let source = TraceSource::new("power_monitor", router.sender());
    let event = source
        .build_event("measurements")
        .add_f64_field("voltage", Some("V".to_string()))
        .add_f64_field("current", Some("A".to_string()))
        .add_f64_field("power", Some("W".to_string()))
        .build()?;

    // Stream at 1 kHz
    println!("Streaming power data... Press Ctrl+C to stop");
    let start = Instant::now();

    loop {
        let t = start.elapsed().as_secs_f64();

        // Generate test signals
        let voltage = 12.0 + 0.5 * (2.0 * std::f64::consts::PI * 50.0 * t).sin();
        let current = 2.0 + 0.2 * (2.0 * std::f64::consts::PI * 50.0 * t + std::f64::consts::FRAC_PI_4).sin();

        // Log as single event
        event.build()
            .try_insert_f64("voltage", voltage)?
            .try_insert_f64("current", current)?
            .try_insert_f64("power", voltage * current)?
            .emit()?;

        tokio::time::sleep(Duration::from_millis(1)).await;
    }
}
main.go
package main

import (
    "context"
    "fmt"
    "log"
    "math"
    "time"

    zelos "github.com/zeloscloud/zelos/go"
)

func main() {
    ctx := context.Background()

    // Setup infrastructure
    router, sender, receiver := zelos.NewTraceRouter(ctx)

    // Connect to agent
    config := zelos.DefaultTracePublishClientConfig() // localhost:2300
    client := zelos.NewTracePublishClient(ctx, receiver, config)
    go func() {
        if err := client.Run(); err != nil {
            log.Printf("Client error: %v", err)
        }
    }()

    // Wait for connection
    if err := client.WaitUntilConnected(5 * time.Second); err != nil {
        log.Fatal(err)
    }
    fmt.Println("Connected to agent")

    // Create source and define event schema
    source, err := zelos.NewTraceSource("power_monitor", sender)
    if err != nil {
        log.Fatal(err)
    }
    defer source.Close()

    unitV := "V"
    unitA := "A"
    unitW := "W"
    event, err := source.BuildEvent("measurements").
        AddFloat64Field("voltage", &unitV).
        AddFloat64Field("current", &unitA).
        AddFloat64Field("power", &unitW).
        Build()
    if err != nil {
        log.Fatal(err)
    }

    // Stream at 1 kHz
    fmt.Println("Streaming power data... Press Ctrl+C to stop")
    start := time.Now()

    for {
        t := time.Since(start).Seconds()

        // Generate test signals
        voltage := 12.0 + 0.5*math.Sin(2*math.Pi*50*t)
        current := 2.0 + 0.2*math.Sin(2*math.Pi*50*t+math.Pi/4)

        // Log as single event
        builder, _ := event.Build().TryInsertFloat64("voltage", voltage)
        builder, _ = builder.TryInsertFloat64("current", current)
        builder, _ = builder.TryInsertFloat64("power", voltage*current)

        if err := builder.Emit(); err != nil {
            log.Printf("Emit failed: %v", err)
        }

        time.Sleep(time.Millisecond)
    }
}

Step 3: View Your Data

  1. Run your program in a terminal
  2. Open Zelos App and navigate to the Signals view
  3. See your data streaming in real-time:

  4. power_monitor/measurements.voltage

  5. power_monitor/measurements.current
  6. power_monitor/measurements.power

Connection Options

Remote Agent

# Via init parameter
zelos_sdk.init(url="grpc://192.168.1.100:2300")

# Via environment variable
# export ZELOS_URL=grpc://192.168.1.100:2300
zelos_sdk.init()  # Uses ZELOS_URL
// Via config
let config = TracePublishClientConfig {
    url: "grpc://192.168.1.100:2300".to_string(),
    ..Default::default()
};

// Via environment variable
let url = std::env::var("ZELOS_URL")
    .unwrap_or_else(|_| "grpc://127.0.0.1:2300".to_string());
// Via config
config := zelos.DefaultTracePublishClientConfig()
config.URL = "grpc://192.168.1.100:2300"

// Via environment variable
url := os.Getenv("ZELOS_URL")
if url == "" {
    url = "grpc://127.0.0.1:2300"
}

Key Concepts

Sources

A source represents a logical component (e.g., "motor_controller", "battery_monitor"). Create multiple sources to organize your data.

Events

An event is a collection of related fields logged together with the same timestamp (e.g., voltage + current + power).

Signals

A signal is an individual data stream, identified by the path source/event.field (e.g., power_monitor/measurements.voltage).

Common Patterns

Define Schema Explicitly

Defining your event schema provides type safety and units:

# Define schema with types and units
event = source.add_event("telemetry", [
    zelos_sdk.TraceEventFieldMetadata("speed", zelos_sdk.DataType.Float64, "m/s"),
    zelos_sdk.TraceEventFieldMetadata("temperature", zelos_sdk.DataType.Float32, "°C"),
    zelos_sdk.TraceEventFieldMetadata("status", zelos_sdk.DataType.String)
])

# Type-safe logging
event.log(speed=15.5, temperature=72.3, status="nominal")
let event = source
    .build_event("telemetry")
    .add_f64_field("speed", Some("m/s".to_string()))
    .add_f32_field("temperature", Some("°C".to_string()))
    .add_string_field("status", None)
    .build()?;
unitMs := "m/s"
unitC := "°C"
event, _ := source.BuildEvent("telemetry").
    AddFloat64Field("speed", &unitMs).
    AddFloat32Field("temperature", &unitC).
    AddStringField("status", nil).
    Build()

Multiple Components

Organize data from different subsystems:

# Create sources for each component
motor = zelos_sdk.TraceSource("motor")
battery = zelos_sdk.TraceSource("battery")
sensors = zelos_sdk.TraceSource("sensors")

# Log to appropriate source
motor.log("control", {"rpm": 2500, "torque": 35.5})
battery.log("status", {"voltage": 48.2, "soc": 85})
sensors.log("imu", {"accel_x": 0.1, "gyro_z": 0.02})
let motor = TraceSource::new("motor", router.sender());
let battery = TraceSource::new("battery", router.sender());
let sensors = TraceSource::new("sensors", router.sender());
motor, _ := zelos.NewTraceSource("motor", sender)
battery, _ := zelos.NewTraceSource("battery", sender)
sensors, _ := zelos.NewTraceSource("sensors", sender)

Error Handling

The SDK handles transient errors automatically:

  • Auto-reconnect: Reconnects to agent if connection drops
  • Buffering: Queues data during disconnection (up to buffer limit)
  • Backpressure: Slows down if agent can't keep up

You should handle initialization errors:

try:
    zelos_sdk.init()
except Exception as e:
    print(f"Failed to initialize Zelos: {e}")
    # Decide whether to continue without tracing
// Connection errors are handled in the spawned task
// Use ? operator for other errors
let event = source.build_event("data")
    .add_f64_field("value", None)
    .build()?;  // Propagate error
if err := client.WaitUntilConnected(5 * time.Second); err != nil {
    log.Printf("Warning: Could not connect to agent: %v", err)
    // Decide whether to continue
}

Next Steps