How to Record to Files¶
Python Only
The following APIs are only available in the python API
Record trace data to .trz
files for offline analysis, debugging, or archival.
Basic Recording¶
Use TraceWriter
to capture all events to a file:
import zelos_sdk
import random
zelos_sdk.init()
source = zelos_sdk.TraceSource("experiment")
# Record to file using context manager
with zelos_sdk.TraceWriter("output.trz") as writer:
# All events are automatically captured
for i in range(1000):
source.log("measurement", {
"index": i,
"value": random.random() * 100,
"valid": True
})
print("Data saved to output.trz")
Recording Methods¶
Context Manager (Recommended)¶
with zelos_sdk.TraceWriter("data.trz") as writer:
source.log("event", {"value": 123})
# File automatically closed
Manual Control¶
writer = zelos_sdk.TraceWriter("data.trz")
try:
writer.open()
source.log("event", {"value": 123})
finally:
writer.close() # Always close!
Common Patterns¶
Timestamped Filenames¶
import datetime
import zelos_sdk
def record_session(duration_seconds=60):
"""Record with timestamp in filename"""
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"session_{timestamp}.trz"
zelos_sdk.init()
source = zelos_sdk.TraceSource("session")
with zelos_sdk.TraceWriter(filename) as writer:
start = time.time()
count = 0
while time.time() - start < duration_seconds:
source.log("data", {"value": random.random()})
count += 1
time.sleep(0.01)
print(f"Recorded {count} events to {filename}")
return filename
Conditional Recording¶
Start/stop recording based on conditions:
import zelos_sdk
import time
import random
class ConditionalRecorder:
"""Record only when threshold exceeded"""
def __init__(self, threshold=80.0):
zelos_sdk.init()
self.source = zelos_sdk.TraceSource("monitor")
self.threshold = threshold
self.writer = None
self.recording = False
def update(self, value):
"""Check value and manage recording"""
# Always log for real-time viewing
self.source.log("measurement", {"value": value})
# Start recording if threshold exceeded
if value > self.threshold and not self.recording:
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
self.writer = zelos_sdk.TraceWriter(f"event_{timestamp}.trz")
self.writer.open()
self.recording = True
self.source.log("trigger", {
"threshold": self.threshold,
"value": value
})
print(f"Started recording: value {value} > {self.threshold}")
# Stop recording when value returns to normal
elif value < self.threshold * 0.8 and self.recording:
self.source.log("recovered", {"value": value})
self.writer.close()
self.recording = False
print(f"Stopped recording: value {value} back to normal")
def close(self):
"""Ensure file is closed"""
if self.recording and self.writer:
self.writer.close()
# Usage
recorder = ConditionalRecorder(threshold=75.0)
try:
for _ in range(1000):
# Simulate sensor with occasional spikes
value = 50 + random.gauss(0, 10)
if random.random() < 0.01: # 1% chance of spike
value += 40
recorder.update(value)
time.sleep(0.1)
finally:
recorder.close()
Ring Buffer Recording¶
Keep a rolling buffer and save on trigger:
import collections
import time
import random
import zelos_sdk
class RingBufferRecorder:
"""Continuously buffer data, save on trigger"""
def __init__(self, buffer_seconds=30):
zelos_sdk.init()
self.source = zelos_sdk.TraceSource("buffer")
self.buffer_seconds = buffer_seconds
self.buffer = collections.deque()
def add_sample(self, data):
"""Add to ring buffer"""
timestamp = time.time()
# Add new sample
self.buffer.append((timestamp, data))
# Remove old samples
cutoff = timestamp - self.buffer_seconds
while self.buffer and self.buffer[0][0] < cutoff:
self.buffer.popleft()
# Always log for real-time
self.source.log("sample", data)
return len(self.buffer)
def save_buffer(self, filename=None):
"""Save buffer contents to file"""
if not filename:
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"buffer_{timestamp}.trz"
with zelos_sdk.TraceWriter(filename) as writer:
# Log metadata
self.source.log("buffer_saved", {
"samples": len(self.buffer),
"duration": self.buffer_seconds if self.buffer else 0
})
# Replay buffer with original timestamps
for timestamp, data in self.buffer:
timestamp_ns = int(timestamp * 1e9)
self.source.log_at(timestamp_ns, "sample", data)
print(f"Saved {len(self.buffer)} samples to {filename}")
return filename
# Usage
buffer = RingBufferRecorder(buffer_seconds=10)
for i in range(500):
data = {
"index": i,
"value": random.random() * 100,
"status": "OK" if random.random() > 0.1 else "ERROR"
}
buffer.add_sample(data)
# Save buffer on error
if data["status"] == "ERROR":
buffer.save_buffer()
time.sleep(0.1)
Multi-File Recording¶
Split large recordings across multiple files:
import os
import zelos_sdk
class MultiFileRecorder:
"""Split recording across files by size or time"""
def __init__(self, prefix="trace", max_mb=100, max_seconds=3600):
zelos_sdk.init()
self.source = zelos_sdk.TraceSource("multi")
self.prefix = prefix
self.max_bytes = max_mb * 1024 * 1024
self.max_seconds = max_seconds
self.file_index = 0
self.writer = None
self.current_file = None
self.file_start_time = None
def _rotate_file(self):
"""Start a new file"""
# Close current
if self.writer:
self.writer.close()
# Open new file
self.file_index += 1
self.current_file = f"{self.prefix}_{self.file_index:04d}.trz"
self.writer = zelos_sdk.TraceWriter(self.current_file)
self.writer.open()
self.file_start_time = time.time()
self.source.log("file_started", {
"filename": self.current_file,
"index": self.file_index
})
print(f"Recording to {self.current_file}")
def _should_rotate(self):
"""Check if rotation needed"""
if not self.current_file:
return True
# Check file size
if os.path.exists(self.current_file):
if os.path.getsize(self.current_file) >= self.max_bytes:
return True
# Check time limit
if time.time() - self.file_start_time >= self.max_seconds:
return True
return False
def log(self, event_name, data):
"""Log with automatic rotation"""
if self._should_rotate():
self._rotate_file()
self.source.log(event_name, data)
def close(self):
"""Close current file"""
if self.writer:
self.writer.close()
print(f"Closed {self.current_file}")
# Usage
recorder = MultiFileRecorder(prefix="long_test", max_mb=50, max_seconds=300)
try:
for i in range(100000):
recorder.log("data", {
"index": i,
"timestamp": time.time(),
"value": random.random()
})
time.sleep(0.01)
finally:
recorder.close()
Experiment Recording¶
Record experiments with metadata:
import json
import hashlib
import zelos_sdk
def record_experiment(config, run_function, duration_s=60):
"""
Record an experiment with metadata.
Args:
config: Dictionary of experiment parameters
run_function: Function that generates data
duration_s: Max duration in seconds
Returns:
Filename of recorded data
"""
# Generate unique filename
config_hash = hashlib.md5(
json.dumps(config, sort_keys=True).encode()
).hexdigest()[:8]
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"{config['name']}_{timestamp}_{config_hash}.trz"
zelos_sdk.init()
source = zelos_sdk.TraceSource("experiment")
with zelos_sdk.TraceWriter(filename) as writer:
# Log experiment metadata
source.log("metadata", {
"config": config,
"duration_planned": duration_s,
"start_time": time.time()
})
# Run experiment
start = time.time()
samples = 0
while time.time() - start < duration_s:
try:
data = run_function(time.time() - start)
if data is None: # Function signals completion
break
source.log("measurement", data)
samples += 1
except Exception as e:
source.log("error", {"message": str(e)})
break
# Log summary
actual_duration = time.time() - start
source.log("summary", {
"duration_actual": actual_duration,
"samples": samples,
"rate_hz": samples / actual_duration if actual_duration > 0 else 0
})
print(f"Experiment complete: {filename}")
print(f" Samples: {samples}")
print(f" Duration: {actual_duration:.1f}s")
print(f" Rate: {samples/actual_duration:.1f} Hz")
return filename
# Usage
def sine_wave_generator(elapsed_time):
"""Example data generator"""
if elapsed_time > 30: # Stop after 30 seconds
return None
return {
"time": elapsed_time,
"sine": math.sin(2 * math.pi * elapsed_time),
"cosine": math.cos(2 * math.pi * elapsed_time),
"noise": random.gauss(0, 0.1)
}
config = {
"name": "waveform_test",
"frequency": 1.0,
"amplitude": 1.0,
"noise_level": 0.1
}
filename = record_experiment(config, sine_wave_generator, duration_s=60)
Best Practices¶
1. Always Close Files¶
Use context managers or try/finally:
# Automatic cleanup
with zelos_sdk.TraceWriter("data.trz") as writer:
# Your code
# Explicit cleanup
writer = zelos_sdk.TraceWriter("data.trz")
try:
writer.open()
# Your code
finally:
writer.close()
2. Use Descriptive Filenames¶
Include timestamp and context:
3. Log Metadata First¶
Always record context at the start:
# Context for analysis
source.log("metadata", {
"version": "1.2.3",
"config": experiment_config,
"operator": username,
"timestamp": time.time()
})
4. Handle Errors Gracefully¶
Don't let recording failures crash your app: