Skip to content

Serial Codec

The serial codec is included in the zeloscloud package, and requires extras serial to be installed.

Introduction

The Serial Codec is designed to streamline communication over serial interfaces. It simplifies interactions with devices by abstracting the complexity of command/query interfaces, and wrapping them into signal/message objects.

The key components are:

  • Serial Codec: Manages the encoding and decoding of data sent over a serial link.
  • Serial Link: Represents the physical connection over which data is transmitted.
  • Signals/Messages: Abstractions for sending commands and receiving data.

Read these!

To get a full understanding of about this, please read through the serial schema and serial link documentation.

How the Codec Works

The SerialCodec operates by utilizing a set of predefined messages/signals and a configuration that dictates their behavior. Signals, represented by classes such as CmdSignal, QuerySignal, and CmdQuerySignal, are the means through which the codec sends commands and queries to the connected device.

When you initialize the codec, you provide a configuration that includes: - The encoding for data transmission. - Timeouts and termination characters for commands and queries. - Definitions for each signal, outlining how to set/get specific values.

Quick Start Guide

This section will walk through creating and using a SerialCodec, a SerialLink, and the corresponding Signals/ Messages

Basic Pytest Setup

To set up a basic codec, you need to initialize it with a name, link, and configuration file. Here's a quick example to get you started:

conftest.py

import pytest
import yaml
from zeloscloud.links.serial_link import SerialLink
from zeloscloud.codecs.serial import SerialCodec


@pytest.fixture(scope="session")
def serial_link():
    """Initialize and open a serial link."""
    with SerialLink("my_link", "/dev/ttyUSB0", 115200) as link:
        yield link


@pytest.fixture(scope="session")
def serial_config():
    """Load the serial config yaml which defines the cmd/queries."""
    with open("cli.yml", "r") as f:
        config = yaml.safe_load(f)
    yield config


@pytest.fixture(scope="session")
def serial_codec(serial_link, serial_config):
    """Create a serial codec with and generate all the cmd/query objects."""
    with SerialCodec(name="my_codec", link=serial_link, config=serial_config) as codec:
        yield codec

Pytest Fixtures

We are utilizing pytest fixtures to inject dependencies into our tests and setups. This method ensures that our components are modular and can be tested independently. The serial_link and serial_codec fixtures can also be used as standalone components.

Example Configuration

This section will describe how to create a configuration file for the SerialCodec. The configuration file, in YAML format, defines the commands and queries along with other settings for the codec. The only required field is signals, but for copy pasta sake, we'll define extra config options.

cli.yml

# NOTE: This is meant to serve as an example configuration for a serial codec.
# More information about the attributes and schema in this file can be found in the docs:
#   https://docs.zeloscloud.io/py/codecs/serial/serial-schema/
encoding: "utf-8"
prompt_pattern: '\x1b\[[0-9;]*muart:~\$ \x1b\[[0-9;]*m$' # Example zephyr-shell prompt pattern -> 'uart:~$'
prompt_timeout_s: 2
echo: True
cmd_termination: "\r\n"
query_termination: "\r\n"

signals:
  # This is an example CmdSignal. It contains only a command, no query
  device_restart:
    cmd: "restart now"

  # This is an example CmdSignal that takes a value. It uses format specifiers to determine
  # where the command you want to place the value. Additionally, you send multiple values -> "led {} {}"
  led_enable:
    cmd: "led enable {}"

  # This is an example QuerySignal. It contains only a query, no command
  wifi_status:
    query: "status get wifi"
    # This will convert the decoded response to a int. You can do this for any signal
    data_type: int

  # This is an example QuerySignal with pattern matching. It contains only a query and a regex pattern.
  device_uptime:
    query:
      cmd: "uptime get"
      pattern: (\\d+\\.\\d+\\.\\d+)"

  # This is an example CmdQuerySignal. It contains both a command and a query.
  wifi_psk:
    cmd: "settings set wifi/psk {}"
    query: "settings get wifi/psk"

  # This is an example CmdQuerySignal. It contains both a command and a query, with pattern matching.
  hardware_revision:
    cmd: "hwrev set {}"
    query:
      cmd: "hwrev get"
      pattern: "(?<=Current hardware revision: ).*"

# These define messages that can contain multiple points of interest
messages:
  # Message example: Contains multiple signals
  device_info:
    query: "info get all"
    signals:
      firmware_version:
        pattern: "Firmware: (\\d+\\.\\d+\\.\\d+)"
      battery_level:
        pattern: "Battery: (\\d+\\.\\d+)%"

        # This will convert the decoded response to a float. You can do this for each signal
        data_type: float

Configuration Details

This example cli.yml includes: - Basic settings like encoding, prompt patterns, and command/query terminations. - Signal configurations for commands and queries. - Message configurations for grouping multiple signals under a single query.

Example Usage

After setting up the codec with the configuration, you can use it in your application to send commands and receive data. Here's an example showing how to interact with the SerialCodec.

Using the Codec in an Application

def test_wifi_psk(serial_codec, check):
    """An example test for the wifi signal."""

    # Sending a command
    serial_codec.wifi_psk.set("super_secret_psk")

    # Queries and check the data
    check.that(serial_codec.wifi_psk, "is", "super_secret_psk")


def test_temperature_info(serial_codec, check):
    """
    An example test for the temperature message.

    Assumes there is a serial device connected that responds:

        Temperature Info
            Ambient: 25.5
            CPU: 47.3
            GPU: 50.1

    on the given serial link.
    """
    # NOTE: This is how you send query for the message and recv the signals in the payload
    serial_codec.temp_info.recv()

    # Check the signal values from the message. This does not actively send a query, but represents
    # the last value updated from the message
    check.that(serial_codec.temp_info.ambient_temp, "==", 25.5)
    check.that(serial_codec.temp_info.cpu_temp, "==", 47.3)
    check.that(serial_codec.temp_info.gpu_temp, "==", 50.1)

Using the Codec

In this example, the SerialCodec is used to send a command to set the WiFi PSK and then query it back. It also demonstrates how to use the check plugin for making assertions on the data.

Accessing Signals within Messages

To access a signal that is part of a message by referencing by name as a signal through the message.

Example:

Consider the following message configuration in your schema:

messages:
  device_info:
    query: "info get all"
    signals:
      firmware_version:
        pattern: "Firmware: (\\d+\\.\\d+\\.\\d+)"
      battery_level:
        pattern: "Battery: (\\d+\\.\\d+)%"
        data_type: float

In this setup, the signals firmware_version and battery_level are part of the device_info message.

Here's how you might use this feature in practice:

# Accessing from the message
codec.device_info.firmware_version.get()
codec.device_info.battery_level.get()

Iterating Through Codec and Message Signals

The SerialCodec is designed to allow direct iteration through its signals, as well as through the signals contained in each message. This feature facilitates operations on multiple signals, whether to gather data or to apply certain actions.

Iterating Codec-Level Signals

You can directly iterate through all signals/messages registered with the codec. This includes both standalone signals and those that are part of messages.

Example Usage:

# Assuming codec is an instance of SerialCodec

for signal in serial_codec.signals.values():
    print(f"Signal Name: {signal.name}, Signal Value: {signal.get()}")
    # Here you can perform operations with each signal

Iterating Signals within a Message

Each message within the codec also allows iteration through its associated signals, enabling you to focus on a specific group of related signals.

Example Usage:

# Assuming 'device_info' is a message within the codec
for signal in serial_codec.device_info:
    print(f"Signal Name: {signal.name}, Signal Value: {signal.get()}")
    # This loop gives you access to each signal within the 'device_info' message

Troubleshooting

Permissions

If you see an issue with respect to permissions opening your serial device port (from the serial link), you might want to try this:

sudo chmod 666 /dev/ttyUSB0
Note: Replace /dev/ttyUSB0 with your actual device.

Debugging

You might find it useful to interact with your serial connection live as you implement your config. You can do this by launching a screen session. This example is for doing this on linux/mac in the terminal, but there a plenty of application support for any OS with many differing interfaces.

screen /dev/ttyUSB0 115200
Note: Replace /dev/ttyUSB0 with your actual device, and 115200 with your actual baud-rate.

Once you close the session (option-a + d on mac), you may run into issues with the serial connection being busy. Try the follow to kill all the active serial sessions.

pkill screen

API Reference

See zeloscloud.codecs.serial in the API Reference.