Skip to content

UDS Codec

Unified Diagnostic Services (UDS) https://www.csselectronics.com/pages/uds-protocol-tutorial-unified-diagnostic-services The UDS (Unified Diagnostic Services) codec provides a high-level interface for diagnostic communication with automotive ECUs using the ISO-14229 protocol.

Introduction

Unified Diagnostic Services (UDS) is a diagnostic communication protocol used in automotive systems. The UdsCodec simplifies interaction with ECUs by providing:

  • High-level API for diagnostic services
  • Thread-safe communication
  • Automatic message formatting and parsing
  • Error handling and logging

For more information about UDS: UDS Protocol Tutorial

Quick Start

Basic Setup

from zeloscloud.links.uds_link import UdsLink
from zeloscloud.links.can_link import CanLink
from zeloscloud.codecs.uds import HexDidCodec, UdsCodec

# Configure and create the CAN link
can_config = {
    "channel": "vcan0",
    "interface": "socketcan",
}
can_link = CanLink(config=can_config)

# Configure UDS addressing
isotp_addr = {
    "txid": 0x123,  # Request ID
    "rxid": 0x456,  # Response ID
}

# Create UDS connection
connection = UdsLink.can_connection(can_link, isotp_addr)

# Configure the codec
codec_config = {
    "client_config": {
        "request_timeout": 0.100,
        "data_identifiers": {"default": HexDidCodec},
        "input_output": {"default": HexDidCodec},
    },
    "tp_period": 1,
    "did_definitions": [
        {
            "identifier": 0x123,
            "did_codec": "<H",
            "name": "my_identifier"
        }
    ]
}

# Create and use the codec
with UdsLink(name="uds-link", connection=connection) as link, \
     UdsCodec(name="uds-codec", link=link, config=codec_config) as codec:

    # Your UDS operations here
    codec.read_data_by_identifier(0x123)

Pytest Setup

import pytest
from zeloscloud.links.uds_link import UdsLink
from zeloscloud.codecs.uds import HexDidCodec, UdsCodec

@pytest.fixture(scope="session")
def uds_link():
    """Create a UDS link for diagnostic communication"""
    can_config = {
        "channel": "vcan0",
        "interface": "socketcan",
    }
    with CanLink(config=can_config) as can_link:
        isotp_addr = {
            "txid": 0x123,
            "rxid": 0x456,
        }
        connection = UdsLink.can_connection(can_link, isotp_addr)
        with UdsLink(name="uds-link", connection=connection) as link:
            yield link

@pytest.fixture(scope="session")
def uds_codec(uds_link):
    """Create a UDS codec for diagnostic services"""
    config = {
        "client_config": {
            "request_timeout": 0.100,
            "data_identifiers": {"default": HexDidCodec},
            "input_output": {"default": HexDidCodec},
        },
        "tp_period": 1,
        "did_definitions": [
            {
                "identifier": 0x123,
                "did_codec": "<H",
                "name": "my_identifier",
            }
        ]
    }
    with UdsCodec(name="uds-codec", link=uds_link, config=config) as codec:
        yield codec

def test_read_vin(uds_codec):
    """Example test reading Vehicle Identification Number"""
    VIN_DID = 0xF190
    response = uds_codec.read_data_by_identifier(VIN_DID)
    assert response is not None
    assert len(response) == 17

Common Operations

Session Control

from udsoncan.services import DiagnosticSessionControl

# Change to different diagnostic sessions using enums
codec.change_session(DiagnosticSessionControl.Session.defaultSession)
codec.change_session(DiagnosticSessionControl.Session.programmingSession)
codec.change_session(DiagnosticSessionControl.Session.extendedDiagnosticSession)
codec.change_session(DiagnosticSessionControl.Session.safetySystemDiagnosticSession)

# Example with session management and error handling
try:
    # Enter programming session
    codec.change_session(DiagnosticSessionControl.Session.programmingSession)

    # Start tester present to maintain session
    codec.start_tp()

    # Perform programming operations
    # ... your code here ...

finally:
    # Always return to default session
    codec.stop_tp()
    codec.change_session(DiagnosticSessionControl.Session.defaultSession)

Security Access

# Request seed and calculate key
response = codec.request_seed(level=0x01)
seed = response.service_data.seed
key = calculate_key(seed)  # Implementation specific

# Send key
codec.send_key(level=0x01, key=key)

Data Reading/Writing

# Read multiple DIDs
response = codec.read_data_by_identifier([0x1234, 0x5678])

# Write DID
codec.write_data_by_identifier(0x1234, b'\x01\x02\x03')

Routine Control

routine_id = 0x123  # Example routine ID

# Start a routine
response = codec.start_routine(routine_id)

# Get routine results
results = codec.get_routine_results(routine_id)

# Stop a routine
codec.stop_routine(routine_id)

Tester Present

# Send single tester present
codec.send_tp()

# Start periodic tester present
codec.start_tp()  # Uses tp_period from config

IO Control

# Reset to default
codec.reset_to_default(0x123)

# Freeze current state
codec.freeze_current_state(0x123)

# Short term adjustment with raw data
codec.short_term_adjustment(0x123, b'\xAA')

# Return control to ECU
codec.return_control_to_ecu(0x123)

The IO Control service provides several control parameters: - returnControlToECU: Returns control of the I/O to the ECU - resetToDefault: Resets the I/O to its default state - freezeCurrentState: Maintains the current state of the I/O - shortTermAdjustment: Temporarily adjusts the I/O to a specified state

When using shortTermAdjustment, you can optionally provide a control mask to specify which bits/values should be adjusted.

File Transfer

# Download data to ECU
data = b'\x01\x02\x03\x04'  # Data to download
address = 0x1000            # Memory address
address_format = 32         # 32-bit addressing
block_size = 4             # Optional: bytes per block

# Perform download sequence
codec.perform_download_sequence(
    data=data,
    address=address,
    address_format=address_format,
    block_size=block_size
)

# Upload data from ECU
size = 1024                # Number of bytes to upload
address = 0x2000           # Memory address to read from
address_format = 32        # 32-bit addressing
block_size = 64            # Optional: bytes per block

# Perform upload sequence
data = codec.perform_upload_sequence(
    size=size,
    address=address,
    address_format=address_format,
    block_size=block_size
)

# Example: Download a file to ECU
with open('firmware.bin', 'rb') as f:
    data = f.read()
    codec.perform_download_sequence(
        data=data,
        address=0x10000,    # Flash memory address
        address_format=32,  # 32-bit addressing
        block_size=512     # 512 bytes per block
    )

# Example: Upload memory contents to file
data = codec.perform_upload_sequence(
    size=8192,            # 8KB of data
    address=0x20000,      # RAM address
    address_format=32,    # 32-bit addressing
    block_size=256       # 256 bytes per block
)
with open('memory_dump.bin', 'wb') as f:
    f.write(data)

The file transfer service provides methods for: - perform_download_sequence: Downloads data to the ECU - perform_upload_sequence: Uploads data from the ECU

Both methods support: - Configurable memory addressing - Automatic block size negotiation - Progress monitoring via logging - Automatic tester present handling - Session management

File Transfer Operations

# Add a new file
response = codec.add_file(
    path_and_name="/path/to/newfile.txt",
    data_format_identifier=0x00,  # Optional: format identifier
    file_size=1024               # Optional: expected size
)

# Delete an existing file
response = codec.delete_file("/path/to/oldfile.txt")

# Replace an existing file
response = codec.replace_file(
    path_and_name="/path/to/file.txt",
    data_format_identifier=0x00,
    file_size=2048
)

# Read a file
response = codec.read_file(
    path_and_name="/path/to/file.txt",
    data_format_identifier=0x00
)

# Read directory contents
response = codec.read_dir("/path/to/directory")

# Example: Complete file transfer sequence
try:
    # 1. Create new file
    response = codec.add_file(
        path_and_name="/firmware.bin",
        file_size=len(firmware_data)
    )

    # 2. Get transfer parameters from response
    max_block_length = response.service_data.max_length

    # 3. Perform the actual data transfer
    codec.perform_download_sequence(
        data=firmware_data,
        block_size=max_block_length
    )

except Exception as e:
    # Clean up on failure
    codec.delete_file("/firmware.bin")
    raise

The RequestFileTransfer service provides methods for: - add_file: Create a new file - delete_file: Remove an existing file - replace_file: Replace an existing file - read_file: Read file contents - read_dir: Read directory contents

All methods support: - Path and filename specification - Optional data format identifiers for compression/encryption - File size specification where applicable - Automatic error handling and logging

Configuration Options

Codec Configuration

Parameter Description Default
request_timeout Overall timeout for requests 5.0
p2_timeout Server response timeout 1.0
exception_on_negative_response Raise exception on negative response True
exception_on_invalid_response Raise exception on invalid response True

Advanced Configuration

config = {
    # Required client configuration
    "client_config": {
        "request_timeout": 0.100,
        "data_identifiers": {"default": HexDidCodec},
        "input_output": {"default": HexDidCodec},

        # Optional client settings
        "p2_timeout": 1.0,
        "p2_star_timeout": 5.0,
        "exception_on_negative_response": True,
        "exception_on_invalid_response": True,
        "exception_on_unexpected_response": True,
        "tolerate_zero_padding": True,
        "use_server_timing": True,
    },

    # Optional codec settings
    "tp_period": 1,  # TesterPresent period in seconds

    # Optional DID definitions
    "did_definitions": [
        {
            "identifier": 0x123,
            "did_codec": "<H",
            "name": "my_identifier"
        }
    ],

    # Optional periodic DID operations
    "did_periodics": [
        {
            "identifier": 0x123,
            "period": 1,
            "log_exception": False,
            # Optional: value field for periodic writes
            # "value": some_value
        }
    ]
}

Error Handling

The codec provides comprehensive error handling:

from udsoncan.exceptions import NegativeResponseException

try:
    codec.read_data_by_identifier(0x1234)
except NegativeResponseException as e:
    print(f"Negative response: {e.response.code_name}")
except TimeoutException as e:
    print(f"Timeout: {e}")

Troubleshooting

Common Issues

  1. Timeouts

    # Increase timeouts in configuration
    config = {
        "client_config": {
            "request_timeout": 5.0,  # Longer overall timeout
            "p2_timeout": 2.0,      # Longer response timeout
        }
    }
    

  2. Session Management

    # Keep session alive with periodic tester present
    codec.start_tp(period=1.0)
    
    # Ensure proper session cleanup
    try:
        codec.change_session(0x02)  # Enter programming session
        # ... perform operations ...
    finally:
        codec.change_session(0x01)  # Return to default session
    

See also: - zelos-uds - UDS Link Documentation

API Reference

See zeloscloud.codecs.uds.UdsCodec in the API Reference.