How to Filter Events¶
Python Only
The following APIs are available only in the Python API.
Reduce data volume and improve performance by filtering events before they're sent.
Quick Start¶
Enable smart filtering with one line:
import zelos_sdk
zelos_sdk.init()
# Use TraceSourceCacheLast for filtering
source = zelos_sdk.TraceSourceCacheLast("filtered")
# Enable default filtering: log on change or every 1 second
source.set_default_log_condition()
# Repeated identical values are now skipped; per the default above, a log is
# still emitted once the 1-second timeout elapses
source.log("sensor", {"temperature": 25.0}) # Logs
source.log("sensor", {"temperature": 25.0}) # Skipped (no change)
source.log("sensor", {"temperature": 25.1}) # Logs (changed)
Condition Types¶
ValueLogCondition¶
Logs when the value changes:
import zelos_sdk
zelos_sdk.init()
source = zelos_sdk.TraceSourceCacheLast("state")
# Only log when state changes
# The per-field condition is attached when the "machine" event is registered;
# the returned `event` handle is not used below (logging goes through
# source.log by event name)
event = source.add_event("machine", [
zelos_sdk.TraceEventFieldMetadata("mode", zelos_sdk.DataType.String)
], conditions={
"mode": zelos_sdk.ValueLogCondition()
})
# Usage
source.log("machine", {"mode": "IDLE"}) # Logs (first value)
source.log("machine", {"mode": "IDLE"}) # Skipped (same)
source.log("machine", {"mode": "RUNNING"}) # Logs (changed)
EpsilonLogCondition¶
Logs when numeric values change by more than epsilon:
# Only log if temperature changes by >0.5°C
# `source` is assumed to be an existing zelos_sdk.TraceSourceCacheLast
event = source.add_event("sensor", [
zelos_sdk.TraceEventFieldMetadata("temperature", zelos_sdk.DataType.Float64, "°C")
], conditions={
"temperature": zelos_sdk.EpsilonLogCondition(epsilon=0.5)
})
# Example
# NOTE(review): the Δ values below are measured from 25.0, the last value that
# was logged (25.6 - 25.3 alone would be only 0.3) — confirm this reference
# point matches the SDK's EpsilonLogCondition behaviour
source.log("sensor", {"temperature": 25.0}) # Logs (first)
source.log("sensor", {"temperature": 25.3}) # Skipped (Δ=0.3 < 0.5)
source.log("sensor", {"temperature": 25.6}) # Logs (Δ=0.6 > 0.5)
DeltaLogCondition¶
Prevents drift by comparing to last logged value:
# Log if position changes by >0.0001° (~11 meters)
# `source` is assumed to be an existing zelos_sdk.TraceSourceCacheLast
event = source.add_event("gps", [
zelos_sdk.TraceEventFieldMetadata("latitude", zelos_sdk.DataType.Float64, "deg")
], conditions={
"latitude": zelos_sdk.DeltaLogCondition(delta=0.0001)
})
# Prevents drift accumulation
# Every Δ below is measured against the last logged value (37.0000), not
# against the previous sample, so small successive steps still add up
# Last logged: 37.0000
source.log("gps", {"latitude": 37.00005}) # Skipped (Δ=0.00005)
source.log("gps", {"latitude": 37.00008}) # Skipped (Δ=0.00008)
source.log("gps", {"latitude": 37.00011}) # Logs! (Δ=0.00011)
# New reference: 37.00011
TimeLogCondition¶
Logs at fixed intervals regardless of value:
# Log every 30 seconds
# Value changes are not considered; only the 30 s interval triggers a log
# `source` is assumed to be an existing zelos_sdk.TraceSourceCacheLast
event = source.add_event("heartbeat", [
zelos_sdk.TraceEventFieldMetadata("uptime", zelos_sdk.DataType.Float64, "s")
], conditions={
"uptime": zelos_sdk.TimeLogCondition(time_threshold_s=30.0)
})
Composite Conditions¶
Combine conditions with OR logic:
# Each constructor below builds a condition object; pass it per field through
# `conditions={...}` in add_event, or source-wide through
# set_default_log_condition
# Log on change OR timeout (heartbeat)
zelos_sdk.ValueOrTimeLogCondition(time_threshold_s=60.0)
# Log if change > epsilon OR timeout
zelos_sdk.EpsilonOrTimeLogCondition(epsilon=0.5, time_threshold_s=30.0)
# Log if change > delta OR timeout
zelos_sdk.DeltaOrTimeLogCondition(delta=1.0, time_threshold_s=10.0)
Common Patterns¶
High-Frequency Sensor Filtering¶
Reduce data from noisy sensors:
import time
import random
import math
import zelos_sdk

zelos_sdk.init()
source = zelos_sdk.TraceSourceCacheLast("vibration")

# Aggressive filtering for high-rate sensor
source.set_default_log_condition(
    zelos_sdk.DeltaOrTimeLogCondition(
        delta=0.1,             # Log if change >0.1g
        time_threshold_s=1.0,  # Heartbeat every second
    )
)

event = source.add_event("accel", [
    zelos_sdk.TraceEventFieldMetadata("x", zelos_sdk.DataType.Float32, "g"),
    zelos_sdk.TraceEventFieldMetadata("y", zelos_sdk.DataType.Float32, "g"),
    zelos_sdk.TraceEventFieldMetadata("z", zelos_sdk.DataType.Float32, "g"),
])

# Simulate 1kHz sensor with noise
samples_sent = 0

# Snapshot the source's event counter so the number of events that passed the
# filter can be derived afterwards. Previously `samples_logged` was set to 0
# and never updated, so the script always reported a 100% reduction.
# NOTE(review): `_trace_source._event_count` is a private attribute (the same
# one the "Test Filter Effectiveness" snippet reads) — confirm it is available
# in the SDK version in use.
events_before = source._trace_source._event_count

# monotonic() is unaffected by wall-clock adjustments during the run
start_time = time.monotonic()
while time.monotonic() - start_time < 10:  # Run for 10 seconds
    # Simulate vibration with noise
    t = time.monotonic() - start_time
    x = 0.5 * math.sin(2 * math.pi * 10 * t) + random.gauss(0, 0.05)
    y = 0.3 * math.cos(2 * math.pi * 10 * t) + random.gauss(0, 0.05)
    z = 9.8 + random.gauss(0, 0.05)
    source.log("accel", {"x": x, "y": y, "z": z})
    samples_sent += 1
    time.sleep(0.001)  # 1kHz

samples_logged = source._trace_source._event_count - events_before

print(f"Samples generated: {samples_sent}")
print(f"Samples logged: ~{samples_logged}")
if samples_sent:
    print(f"Reduction: {(1 - samples_logged/samples_sent)*100:.1f}%")
else:
    # Nothing was generated, so the ratio is undefined
    print("Reduction: n/a (no samples generated)")
State Machine Monitoring¶
Log only state transitions:
from enum import IntEnum
import random
import time
import zelos_sdk


class State(IntEnum):
    """States the simulated machine can be in."""

    IDLE = 0
    RUNNING = 1
    ERROR = 2


zelos_sdk.init()
source = zelos_sdk.TraceSourceCacheLast("machine")

# Schema of the "status" event
status_fields = [
    zelos_sdk.TraceEventFieldMetadata("state", zelos_sdk.DataType.UInt8),
    zelos_sdk.TraceEventFieldMetadata("cycles", zelos_sdk.DataType.UInt32),
    zelos_sdk.TraceEventFieldMetadata("temperature", zelos_sdk.DataType.Float32, "°C"),
]

# Per-field filters: state changes, every cycle, and temperature with heartbeat
status_conditions = {
    "state": zelos_sdk.ValueLogCondition(),   # Any state change
    "cycles": zelos_sdk.ValueLogCondition(),  # Every cycle
    "temperature": zelos_sdk.EpsilonOrTimeLogCondition(
        epsilon=2.0,            # Log if temp changes >2°C
        time_threshold_s=60.0,  # Or every minute
    ),
}
status = source.add_event("status", status_fields, conditions=status_conditions)

# Human-readable names for the numeric state values, taken from the enum
source.add_value_table(
    "status", "state", {member.value: member.name for member in State}
)

# Simulation state
current_state = State.IDLE
cycles = 0
temp = 25.0

for _ in range(1000):
    # Roughly 1% of ticks pick a new state at random
    if random.random() < 0.01:
        current_state = random.choice(list(State))

    # A cycle is counted on every tick spent in RUNNING
    if current_state is State.RUNNING:
        cycles += 1

    # Slow random walk, clamped to the 20..80 range
    temp += random.uniform(-0.3, 0.3)
    temp = max(20, min(80, temp))

    # Hand the sample to the source; the conditions decide what is logged
    sample = {
        "state": current_state.value,
        "cycles": cycles,
        "temperature": temp,
    }
    source.log("status", sample)
    time.sleep(0.1)

print(f"Final state: {current_state.name}, Cycles: {cycles}, Temp: {temp:.1f}°C")
Battery Monitoring¶
Different sensitivities for different parameters:
import random
import time
import zelos_sdk

zelos_sdk.init()
source = zelos_sdk.TraceSourceCacheLast("battery")

# Schema of the battery "status" event
battery_fields = [
    zelos_sdk.TraceEventFieldMetadata("voltage", zelos_sdk.DataType.Float32, "V"),
    zelos_sdk.TraceEventFieldMetadata("current", zelos_sdk.DataType.Float32, "A"),
    zelos_sdk.TraceEventFieldMetadata("soc", zelos_sdk.DataType.Float32, "%"),
    zelos_sdk.TraceEventFieldMetadata("temperature", zelos_sdk.DataType.Float32, "°C"),
    zelos_sdk.TraceEventFieldMetadata("charging", zelos_sdk.DataType.Boolean),
]

# One filter per parameter, tuned to how that parameter behaves
battery_conditions = {
    # Voltage: sensitive (0.1V changes)
    "voltage": zelos_sdk.EpsilonOrTimeLogCondition(epsilon=0.1, time_threshold_s=10.0),
    # Current: less sensitive (1A changes)
    "current": zelos_sdk.DeltaOrTimeLogCondition(delta=1.0, time_threshold_s=10.0),
    # SOC: 1% changes
    "soc": zelos_sdk.DeltaLogCondition(delta=1.0),
    # Temperature: 0.5°C changes or every minute
    "temperature": zelos_sdk.EpsilonOrTimeLogCondition(epsilon=0.5, time_threshold_s=60.0),
    # Charging: state changes only
    "charging": zelos_sdk.ValueLogCondition(),
}
battery = source.add_event("status", battery_fields, conditions=battery_conditions)

# Initial simulated battery state
voltage, current, soc, temp = 12.6, 0.0, 100.0, 25.0
charging = False

for _ in range(100):
    if charging:
        current = random.uniform(1, 5)  # Charging current
        soc = min(100, soc + 0.1)
        voltage = 12.0 + (soc / 100) * 2.6
        # Keep charging until the battery is full
        charging = soc < 100
    else:
        current = random.uniform(-10, -1)  # Discharge current
        soc = max(0, soc - 0.05)
        voltage = 11.0 + (soc / 100) * 2.6
        # Start charging once the battery drops to 20%
        charging = soc <= 20

    # Measurement noise on top of the ideal values
    voltage += random.gauss(0, 0.05)
    current += random.gauss(0, 0.2)
    temp += random.uniform(-0.1, 0.1)

    # Hand the sample to the source; each field is filtered by its own condition
    reading = {
        "voltage": voltage,
        "current": current,
        "soc": soc,
        "temperature": temp,
        "charging": charging,
    }
    source.log("status", reading)
    time.sleep(0.1)

print(f"Final SOC: {soc:.1f}%, Charging: {charging}")
Adaptive Filtering¶
Adjust filtering based on conditions:
import zelos_sdk
class AdaptiveMonitor:
    """Monitor whose default log filter can be toggled between two modes."""

    def __init__(self):
        """Initialise the SDK, create the source and register the event."""
        zelos_sdk.init()
        self.source = zelos_sdk.TraceSourceCacheLast("adaptive")
        # Begin in normal (aggressively filtered) mode
        self.set_precision(high=False)
        # The "data" schema is registered a single time, here
        data_fields = [
            zelos_sdk.TraceEventFieldMetadata("value", zelos_sdk.DataType.Float64),
            zelos_sdk.TraceEventFieldMetadata("quality", zelos_sdk.DataType.Float32),
        ]
        self.event = self.source.add_event("data", data_fields)

    def set_precision(self, high=False):
        """Install the default log condition for the requested mode.

        Args:
            high: truthy selects the high-precision condition, falsy the
                normal one.
        """
        if not high:
            # Normal operation: filter aggressively
            condition = zelos_sdk.DeltaOrTimeLogCondition(
                delta=1.0,
                time_threshold_s=10.0,
            )
            print("Normal mode: aggressive filtering")
        else:
            # Critical operations: filter as little as possible
            condition = zelos_sdk.EpsilonOrTimeLogCondition(
                epsilon=0.01,
                time_threshold_s=0.1,
            )
            print("High precision mode: minimal filtering")
        self.source.set_default_log_condition(condition)

    def log(self, value, quality):
        """Send one "data" sample through the currently installed filter."""
        sample = {"value": value, "quality": quality}
        self.source.log("data", sample)
# Usage
monitor = AdaptiveMonitor()

# Phase 1: normal operation
monitor.set_precision(high=False)
for step in range(100):
    monitor.log(value=step * 0.1, quality=0.95)

# Phase 2: critical measurement, so filtering is relaxed
monitor.set_precision(high=True)
for step in range(100):
    monitor.log(value=50 + step * 0.01, quality=0.99)

# Phase 3: return to normal filtering
monitor.set_precision(high=False)
Using Cached Values¶
Access previously logged values:
import random
import time  # needed for time.sleep() below; it was missing, causing a NameError
import zelos_sdk

zelos_sdk.init()
source = zelos_sdk.TraceSourceCacheLast("control")

# Define PID controller data
pid = source.add_event("pid", [
    zelos_sdk.TraceEventFieldMetadata("setpoint", zelos_sdk.DataType.Float64),
    zelos_sdk.TraceEventFieldMetadata("measured", zelos_sdk.DataType.Float64),
    zelos_sdk.TraceEventFieldMetadata("error", zelos_sdk.DataType.Float64),
    zelos_sdk.TraceEventFieldMetadata("output", zelos_sdk.DataType.Float64),
])

# Simple PID simulation
kp, ki, kd = 0.5, 0.1, 0.05
dt = 0.01  # Loop time step in seconds; also used as the sleep interval
integral = 0.0
setpoint = 50.0
measured = 30.0

for _ in range(100):
    # Calculate error
    error = setpoint - measured

    # Access previous error for derivative. Read the cache once and fall back
    # to 0.0 when it returns None (presumably before anything has been logged)
    cached_error = source.pid.error.get()
    prev_error = cached_error if cached_error is not None else 0.0

    # PID calculation
    integral += error * dt
    derivative = (error - prev_error) / dt
    output = kp * error + ki * integral + kd * derivative

    # Log current values
    source.log("pid", {
        "setpoint": setpoint,
        "measured": measured,
        "error": error,
        "output": output,
    })

    # Simulate system response
    measured += output * 0.1  # System gain
    measured += random.gauss(0, 0.5)  # Noise
    time.sleep(dt)

print(f"Final: Setpoint={setpoint:.1f}, Measured={measured:.1f}, Error={error:.1f}")
Performance Comparison¶
Scenario | Without Filtering | With Filtering | Reduction |
---|---|---|---|
1kHz noisy sensor | 1,000 events/s | ~10 events/s | 99% |
State machine | Every cycle | State changes only | 90% |
Battery monitor | 10 Hz all params | Variable per param | 80% |
GPS tracking | Every reading | Significant movement | 95% |
Best Practices¶
1. Match Conditions to Data¶
# Appropriate conditions
# Map each field to the condition that suits its kind of data. The thresholds
# are passed by keyword (epsilon=, delta=), matching every other example on
# this page
conditions = {
    "discrete_state": zelos_sdk.ValueLogCondition(),           # Discrete
    "noisy_sensor": zelos_sdk.EpsilonLogCondition(epsilon=0.5),  # Noisy
    "slow_drift": zelos_sdk.DeltaLogCondition(delta=1.0),        # Drift
    "counter": zelos_sdk.ValueLogCondition(),                  # Every change
}
2. Always Include Heartbeats¶
# Ensure liveness
# Pairing the change threshold with a timeout means a steady signal still
# produces an event periodically
condition = zelos_sdk.DeltaOrTimeLogCondition(
delta=1.0,
time_threshold_s=30.0 # Max 30s between events
)
3. Document Filter Behavior¶
# Clear documentation
# `[...]` stands in for the event's field metadata list
# NOTE(review): this page describes comparison against the last logged value
# as DeltaLogCondition behaviour — confirm the wording below is accurate for
# EpsilonOrTimeLogCondition
event = source.add_event("temp", [...], conditions={
# Logs if temperature changes >0.5°C from last logged value
# Also sends heartbeat every 60s regardless
"value": zelos_sdk.EpsilonOrTimeLogCondition(
epsilon=0.5,
time_threshold_s=60.0
)
})
4. Test Filter Effectiveness¶
# Monitor reduction ratio
def test_filtering():
    """Replay ``test_data`` through ``source`` and print the reduction ratio.

    Uses two names from the enclosing scope: ``source`` (the trace source
    under test) and ``test_data`` (an iterable of payloads for the "event"
    event). The number of events that passed the filter is the change in
    ``source._trace_source._event_count`` across the replay; that is a
    private attribute, so it may differ between SDK versions.
    """
    events_before = source._trace_source._event_count

    events_generated = 0
    for data in test_data:
        source.log("event", data)
        events_generated += 1

    events_logged = source._trace_source._event_count - events_before

    print(f"Generated: {events_generated}")
    print(f"Logged: {events_logged}")
    if events_generated == 0:
        # Empty input: the ratio is undefined, so report that instead of
        # raising ZeroDivisionError
        print("Reduction: n/a (no events generated)")
        return
    print(f"Reduction: {(1 - events_logged/events_generated)*100:.1f}%")