Creating Device Drivers for OpenAVC

OpenAVC supports three ways to create device drivers, from easiest to most powerful:

  1. Driver Builder UI. Visual wizard in the Programmer IDE. No code required.
  2. Driver Definition File (.avcdriver). Write a YAML file by hand. No code required.
  3. Python Driver. Full Python class for advanced protocols.

All three methods produce drivers that work identically at runtime. Choose the simplest method that covers your device’s protocol.

Method | Skill Level | Best For
Driver Builder UI | Beginner | Text-based protocols (Extron SIS, Kramer, generic RS-232)
.avcdriver File | Intermediate | Text-based protocols, sharing drivers as files
Python Driver | Advanced | Binary protocols, authentication handshakes, complex state

Quick Decision Guide

Can the device be controlled with text commands over TCP or serial? (e.g., sending "POWR ON\r" and getting back "POWR=ON\r")

  • Yes: Use the Driver Builder UI or a .avcdriver definition.
  • No, it uses HTTP/REST: Use a .avcdriver definition with transport: http. HTTP commands use method, path, and body fields instead of raw command strings. See the HTTP section below.
  • No, it uses a binary protocol: Use a Python driver.
  • No, it uses UDP broadcast: Use a Python driver (see the Wake-on-LAN driver as an example).

Method 1: Driver Builder UI

The Driver Builder is a visual tool inside the Programmer IDE. Open it by clicking the Driver Library icon (hard drive) in the sidebar.

Step-by-step Walkthrough

1. Create a new driver

Click Create New Driver in the left panel. The editor opens with seven tabs.

2. General tab: Name your driver

Field | Example | Notes
Driver ID | extron_sw4 | Lowercase, no spaces. Cannot be changed later.
Driver Name | Extron SW4 HD 4K | Human-readable. Shown in the “Add Device” dialog.
Manufacturer | Extron |
Category | Switcher | Pick from the dropdown.
Version | 1.0.0 |
Author | Your Name |
Description | Controls Extron SW4 HD 4K HDMI switcher via RS-232 or TCP. |

3. Transport tab: How to connect

  • Transport Type: TCP (network) or Serial (RS-232/RS-485).
  • Message Delimiter: The character(s) that mark the end of every message. Check your device’s protocol guide. Common values:
    • \r for most AV devices (Extron, Kramer, PJLink)
    • \r\n for some network devices
    • \n is rare
  • Default Port (TCP) or Default Baud Rate (Serial): Pre-filled when adding this device.

4. State Variables tab: What to track

Define the properties you want to read from the device. Each state variable becomes visible in the Devices view and available for macros, scripts, and UI bindings.

Variable ID | Label | Help Text | Type
input | Current Input | Active input number | Integer
volume | Volume | Volume level 0-100 | Integer
mute | Mute | Audio mute state | Boolean

Types: string, integer, boolean, enum.

The Help Text column is optional but recommended. It’s shown in the Driver Builder and used by the AI assistant to understand what each variable represents.

5. Commands tab: What to send

Click Add Command, then fill in:

Field | Example | Notes
Command ID | set_input | Used in macros and scripts.
Display Label | Set Input | Shown in the UI.
Help Text | Switch the active input source. | Shown when selecting this command in the IDE and used by the AI assistant.
Command String | {input}!\r | The raw bytes to send. Use {param_name} for parameter placeholders.
Parameters | input (Integer) | Defines what the user fills in when using this command.

Parameter placeholders: The string {input}!\r with parameter input=3 becomes 3!\r when sent.

Escape sequences: The following escape sequences are supported in command strings and delimiters: \r (carriage return), \n (newline), \t (tab), \\ (literal backslash), \xHH (hex byte, e.g. \x1B for ESC).
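
The placeholder and escape rules above can be sketched in a few lines of Python. This is only an illustration: the helper names decode_escapes and render_command are hypothetical, and the order of operations (placeholders first, then escapes) is an assumption rather than a description of OpenAVC's internals.

```python
import re

# Escape sequences listed above: \r, \n, \t, \\ and \xHH.
_SIMPLE_ESCAPES = {"r": "\r", "n": "\n", "t": "\t", "\\": "\\"}
_ESCAPE_RE = re.compile(r"\\(x[0-9A-Fa-f]{2}|[rnt\\])")


def decode_escapes(text: str) -> bytes:
    """Turn literal escape sequences (as typed in the UI) into raw bytes."""
    def _replace(match: re.Match) -> str:
        token = match.group(1)
        if token.startswith("x"):
            return chr(int(token[1:], 16))  # \xHH -> single byte value
        return _SIMPLE_ESCAPES[token]

    # latin-1 maps code points 0-255 one-to-one onto byte values.
    return _ESCAPE_RE.sub(_replace, text).encode("latin-1")


def render_command(template: str, params: dict) -> bytes:
    """Fill {param_name} placeholders, then decode escape sequences."""
    return decode_escapes(template.format(**params))


print(render_command(r"{input}!\r", {"input": 3}))
```

Here the template `{input}!\r` with input=3 produces the three bytes `3`, `!`, and a carriage return.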

Example commands for an Extron switcher:

Command ID | Label | String | Parameters
set_input | Set Input | {input}!\r | input (Integer)
set_volume | Set Volume | {level}V\r | level (Integer)
mute_on | Mute On | 1Z\r | (none)
mute_off | Mute Off | 0Z\r | (none)
query_input | Query Input | !\r | (none)

6. Responses tab: How to parse replies

Each response pattern is a regular expression (regex) that matches a line the device sends back. When a match is found, capture groups are mapped to state variables.

Example: The Extron switcher responds In3 All when input 3 is selected.

Regex Pattern | Group | State Variable | Type
In(\d+) All | 1 | input | Integer

  • Group 1 means the first (\d+) capture group, the number.
  • Type tells the system how to convert the captured string: integer parses it as a number, boolean treats 1/true/on as true, string keeps it as-is.

Value maps: For devices that return codes instead of readable values, you can add a value map. For example, if a projector returns POWR=0 for off and POWR=1 for on:

Regex Pattern | Group | State Variable | Type | Map
POWR=(\d) | 1 | power | String | {"0": "off", "1": "on"}

Value maps are configured in the driver definition file (see Method 2); the UI itself only shows a type dropdown.

7. Polling tab: Automatic status queries

If the device doesn’t push status changes on its own, you can poll it periodically.

  • Poll Interval: How often to send queries (seconds). Set to 0 to disable. Typical: 10 to 30 seconds.
  • Poll Queries: The command strings to send each cycle. For example, !\r to query the current input.

8. Live Test tab: Try it out

Enter a device’s IP address and port, type a command string, and hit Send. You’ll see the raw response from the device. Use this to verify your command strings and response patterns before saving.

9. Save

Click Save. The driver is immediately available in the “Add Device” dialog. The definition file is saved to the driver_repo/ directory.

Importing and Exporting Drivers

Exporting a driver to share it

You can export any driver as an .avcdriver file:

  • From the list: Click the download icon next to a driver in the left panel.
  • From the editor: Click the Export button in the editor header (next to Save).

This downloads an .avcdriver file you can share with other OpenAVC users, commit to a git repo, or back up.

Importing a driver someone shared with you

Click Import from File in the left panel. You have two options:

  • Choose a file: Click “Choose a .avcdriver file” to pick a driver definition file from your computer.
  • Paste JSON/YAML: Paste the definition text directly into the text area and click Import.

The driver is validated, saved to driver_repo/, and immediately available for use.

What about Python drivers?

Python drivers (.py files) can also be imported through the UI or installed from the community repository. They are saved to driver_repo/ and loaded automatically at startup.

Community Driver Repository

The Browse Community tab in the Driver Builder view connects to the OpenAVC Community Driver Library on GitHub. From there you can:

  • Search for drivers by manufacturer, model, or device type
  • Browse community-contributed drivers (both YAML and Python)
  • Filter by category (Projector, Display, Switcher, Audio, Camera, etc.)
  • Install with one click

Installed drivers are saved to driver_repo/ and immediately available in the “Add Device” dialog.


Method 2: Driver Definition File (.avcdriver)

A driver definition is a YAML file with the .avcdriver extension. It’s what the Driver Builder UI creates under the hood. Writing one by hand is useful for sharing drivers, version-controlling them, or when you want to work in a text editor.

YAML was chosen over JSON because it supports comments (essential for documenting protocol details from manufacturer manuals) and doesn’t require double-escaping regex patterns.

Where to put .avcdriver files

Directory | Purpose
server/drivers/definitions/ | Built-in drivers (shipped with OpenAVC)
driver_repo/ | Community and user drivers

Both directories are scanned at startup. Files are loaded, validated, and registered automatically.

You can also import an .avcdriver file through the Driver Builder UI (click Import from File), which copies it into driver_repo/ for you.

Full Example: Extron SIS Switcher

# Extron SIS Switcher Driver
# Reference: Extron SIS Command/Response Reference, Section 3
# Protocol: text-based over TCP (port 23) or RS-232 (9600 8N1)
id: extron_sis_switcher
name: Extron SIS Switcher
manufacturer: Extron
category: switcher
version: 1.0.0
author: OpenAVC Community
description: Controls Extron SIS-compatible switchers over TCP or serial.
transport: tcp
delimiter: "\r\n"

help:
  overview: >
    Controls Extron SIS-compatible switchers. Supports input routing,
    volume control, and mute.
  setup: >
    1. Connect the switcher to the network or via RS-232 (9600 8N1).
    2. For TCP, use port 23 (default Extron telnet port).

default_config:
  host: ""
  port: 23
  poll_interval: 15

config_schema:
  host:
    type: string
    required: true
    label: IP Address
  port:
    type: integer
    default: 23
    label: Port
  poll_interval:
    type: integer
    default: 15
    min: 0
    label: Poll Interval (sec)

state_variables:
  input:
    type: integer
    label: Current Input
  volume:
    type: integer
    label: Volume
  mute:
    type: boolean
    label: Mute

commands:
  set_input:
    label: Set Input
    string: "{input}!\r\n"  # e.g., "3!\r\n" to select input 3
    help: Route a specific input to all outputs.
    params:
      input: { type: integer, required: true, help: "Input number (1-based)" }
  set_volume:
    label: Set Volume
    string: "{level}V\r\n"  # e.g., "45V\r\n" to set volume to 45
    help: Set the audio volume level.
    params:
      level: { type: integer, required: true, help: "Volume level 0-100" }
  mute_on:
    label: Mute On
    string: "1Z\r\n"
    help: Mute the audio output.
    params: {}
  mute_off:
    label: Mute Off
    string: "0Z\r\n"
    help: Unmute the audio output.
    params: {}
  query_input:
    label: Query Input
    string: "!\r\n"  # Response: "In3 All"
    params: {}
  query_volume:
    label: Query Volume
    string: "V\r\n"  # Response: "Vol45"
    params: {}

responses:
  # "In3 All" -> input = 3
  - pattern: 'In(\d+) All'
    mappings:
      - { group: 1, state: input, type: integer }
  # "Vol45" -> volume = 45
  - pattern: 'Vol(\d+)'
    mappings:
      - { group: 1, state: volume, type: integer }
  # "Amt1" -> mute = true, "Amt0" -> mute = false
  - pattern: 'Amt(\d+)'
    mappings:
      - { group: 1, state: mute, type: boolean }

polling:
  interval: 15
  queries:
    - "!\r\n"  # Query current input
    - "V\r\n"  # Query current volume

Notice how much cleaner this is compared to JSON: comments explain the protocol, regex patterns don’t need double-escaping, and the structure is easy to scan.

Definition Reference

Top-level fields

Field | Required | Description
id | Yes | Unique driver identifier. Lowercase, underscores.
name | Yes | Human-readable display name.
transport | Yes | "tcp", "serial", or "http".
manufacturer | No | Manufacturer name. Default: "Generic".
category | No | One of: projector, display, switcher, scaler, audio, camera, lighting, relay, utility, other.
version | No | Semantic version. Default: "1.0.0".
author | No | Who wrote this driver.
description | No | Brief description.
help | No | Help text object: {overview: "...", setup: "..."}. Shown in the Add Device dialog and available to the AI assistant.
delimiter | No | Message delimiter. Default: "\\r". Use "\\r\\n" for CRLF.
default_config | No | Default values for config fields.
config_schema | No | Describes config fields shown in “Add Device” dialog.
device_settings | No | Configurable settings that live on the device. See below.
state_variables | No | State properties this driver exposes.
commands | No | Commands this driver can send.
responses | No | Regex patterns for parsing device replies.
polling | No | Periodic status query configuration.
frame_parser | No | Advanced: custom framing (see below).
protocols | No | Protocol names this driver speaks (e.g., ["pjlink"], ["extron_sis"]). Helps discovery match devices to drivers.
discovery | No | Discovery hints for network scanning. See Discovery Hints below.
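
As a rough illustration of how the required fields and documented defaults fit together, the sketch below checks an already-parsed definition (a plain dict) and fills in defaults. The function name normalize_definition and the exact ID rule (lowercase letters, digits, underscores) are assumptions for this example; OpenAVC's own validator may be stricter or behave differently.

```python
import re

REQUIRED_FIELDS = ("id", "name", "transport")
# Defaults taken from the table above.
DEFAULTS = {"manufacturer": "Generic", "version": "1.0.0", "delimiter": "\r"}
# Assumed ID rule: lowercase letters, digits, and underscores only.
ID_PATTERN = re.compile(r"^[a-z0-9_]+$")


def normalize_definition(definition: dict) -> dict:
    """Validate required fields and return a copy with defaults applied."""
    missing = [key for key in REQUIRED_FIELDS if not definition.get(key)]
    if missing:
        raise ValueError(f"Missing required field(s): {', '.join(missing)}")
    if not ID_PATTERN.match(definition["id"]):
        raise ValueError(f"Invalid driver id: {definition['id']!r}")
    # Values present in the definition override the defaults.
    return {**DEFAULTS, **definition}


driver = normalize_definition(
    {"id": "extron_sw4", "name": "Extron SW4 HD 4K", "transport": "tcp"}
)
print(driver["manufacturer"], driver["version"])
```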

config_schema entry

"host": {
  "type": "string",
  "required": true,
  "label": "IP Address",
  "default": "",
  "description": "Help text shown below the field"
}

Types: string, integer, number, boolean, enum, object. For enum, add a "values" array.

device_settings entry

Device settings are configurable values that live on the device, readable via polling and writable by the driver. Unlike config (which is stored in the project file), device settings are pushed directly to the device hardware. Examples: NDI source name, device hostname, tally mode, video format.

device_settings:
  ndi_name:
    type: string
    label: NDI Source Name
    help: >
      The name other devices use to subscribe to this NDI source
      on the network. Must be unique across all NDI devices.
    state_key: ndi_name
    default: BIRDDOG
    setup: true
    unique: true
    write:
      method: POST
      path: /encodesetup
      body: '{"NDIName": "{value}"}'

Field | Required | Description
type | Yes | string, integer, number, boolean, or enum.
label | Yes | Human-readable label shown in the Programmer IDE.
help | Yes | Inline help text explaining what the setting does.
state_key | No | Which state variable provides the current value. Defaults to the setting key.
default | Yes | Default value for new devices.
setup | No | If true, the setting is prompted during add-to-project. Default: false.
unique | No | If true, the system generates a non-clashing default (appends device ID). Default: false.
values | No | For enum type: array of allowed values.
min / max | No | For integer / number types: value range.
regex | No | Optional regex for string validation.
write | No | How to write the setting to the device (YAML drivers only, see below).

Write definitions (YAML drivers):

For HTTP drivers, the write section specifies the HTTP request to send:

write:
  method: POST
  path: /encodesetup
  body: '{"NDIName": "{value}"}'

For TCP/serial drivers, use a send string:

write:
  send: 'SET HOSTNAME {value}\r'

The {value} placeholder is replaced with the new setting value at runtime. Config values like {host} are also available.

Python drivers override set_device_setting(key, value) instead of using write definitions:

async def set_device_setting(self, key: str, value: Any) -> Any:
    match key:
        case "ndi_name":
            await self._api_post("encodesetup", {"NDIName": str(value)})
            self.set_state("ndi_name", str(value))
        case _:
            raise ValueError(f"Unknown device setting: {key}")

In the Programmer IDE, device settings appear in a dedicated “Device Settings” section in the device detail view, separate from commands. Each setting shows its current value (from state polling), label, and help text. Users can click to edit and the new value is pushed directly to the device.

state_variables entry

"power": {
  "type": "enum",
  "values": ["off", "on", "warming", "cooling"],
  "label": "Power State",
  "help": "Current power state. 'warming' and 'cooling' are transitional."
}

Types: string, integer, boolean, enum.

The optional help field provides a description shown in the Driver Builder UI and available to the AI assistant.

commands entry

"set_input": {
  "label": "Set Input",
  "string": "{input}!\\r",
  "help": "Switch the active input source on the switcher.",
  "params": {
    "input": { "type": "integer", "required": true, "help": "Input number (1-based)" }
  }
}

  • string: The raw bytes to send. {param_name} placeholders are substituted at runtime. Supported escape sequences: \r, \n, \t, \\, \xHH (hex byte).
  • help: Optional description of what the command does. Shown in the Programmer IDE command testing panel, macro editor, UI builder, and used by the AI assistant to understand commands.
  • params: Parameter definitions. Each key matches a {placeholder} in the string. Each parameter can include an optional help field describing what values are expected.

responses entry

{
  "pattern": "POWR=(\\d)",
  "mappings": [
    {
      "group": 1,
      "state": "power",
      "type": "string",
      "map": { "0": "off", "1": "on" }
    }
  ]
}

  • pattern: A regular expression. Use capture groups () to extract values.
  • mappings[].group: Which regex capture group (1-based).
  • mappings[].state: Which state variable to update.
  • mappings[].type: How to convert the captured text: string, integer, float, boolean.
  • mappings[].map (optional): A lookup table. If the captured value is a key in this object, the mapped value is used instead.

Responses are checked in order. The first matching pattern wins.
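
The matching rules above (ordered patterns, first match wins, capture group converted by type, optional value map) can be sketched as follows. The function apply_responses is hypothetical, and two details are assumptions: patterns are applied with re.search, and the value map is consulted on the raw captured text before any type conversion.

```python
import re
from typing import Any


def _to_bool(text: str) -> bool:
    # 1/true/on count as true, everything else as false.
    return text.strip().lower() in {"1", "true", "on"}


CONVERTERS = {"string": str, "integer": int, "float": float, "boolean": _to_bool}

RESPONSES = [
    {"pattern": r"In(\d+) All",
     "mappings": [{"group": 1, "state": "input", "type": "integer"}]},
    {"pattern": r"POWR=(\d)",
     "mappings": [{"group": 1, "state": "power", "type": "string",
                   "map": {"0": "off", "1": "on"}}]},
    {"pattern": r"Amt(\d+)",
     "mappings": [{"group": 1, "state": "mute", "type": "boolean"}]},
]


def apply_responses(line: str, responses: list[dict]) -> dict[str, Any]:
    """Return the state updates produced by the first matching pattern."""
    for response in responses:
        match = re.search(response["pattern"], line)
        if match is None:
            continue
        updates: dict[str, Any] = {}
        for mapping in response["mappings"]:
            raw = match.group(mapping["group"])
            value_map = mapping.get("map", {})
            if raw in value_map:
                updates[mapping["state"]] = value_map[raw]
            else:
                convert = CONVERTERS[mapping.get("type", "string")]
                updates[mapping["state"]] = convert(raw)
        return updates  # first match wins
    return {}


print(apply_responses("In3 All", RESPONSES))
print(apply_responses("POWR=1", RESPONSES))
```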

polling section

"polling": {
  "interval": 15,
  "queries": ["!\\r\\n", "V\\r\\n"]
}

  • interval: Seconds between poll cycles. Also set via poll_interval in device config.
  • queries: Command strings sent each cycle.
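
Conceptually, a poll cycle sends every query and then waits for the interval. The asyncio sketch below illustrates that idea only; poll_loop, the send callback, and the cycles argument (added so the example terminates) are hypothetical and not part of OpenAVC.

```python
import asyncio
from typing import Awaitable, Callable, Optional


async def poll_loop(
    send: Callable[[bytes], Awaitable[None]],
    queries: list[bytes],
    interval: float,
    cycles: Optional[int] = None,
) -> None:
    """Send each query once per cycle; an interval of 0 disables polling."""
    if interval <= 0:
        return
    completed = 0
    while cycles is None or completed < cycles:
        for query in queries:
            await send(query)
        completed += 1
        await asyncio.sleep(interval)


async def _demo() -> list[bytes]:
    sent: list[bytes] = []

    async def fake_send(data: bytes) -> None:
        sent.append(data)  # stand-in for a real transport

    await poll_loop(fake_send, [b"!\r\n", b"V\r\n"], interval=0.01, cycles=2)
    return sent


print(asyncio.run(_demo()))
```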

frame_parser (advanced)

For protocols that don’t use a simple delimiter, you can specify a frame parser:

"frame_parser": {
  "type": "length_prefix",
  "header_size": 2,
  "header_offset": 0,
  "include_header": false
}

Types: length_prefix (reads a length header then N bytes), fixed_length (messages are always N bytes). For anything more complex, use a Python driver.
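
To show what length-prefix framing does to a byte stream, here is a minimal sketch. It assumes a big-endian length field that counts payload bytes only, and it treats header_offset as the number of bytes preceding the length field; the source does not specify these details, and split_length_prefixed is a hypothetical helper.

```python
def split_length_prefixed(
    buffer: bytes,
    header_size: int = 2,
    header_offset: int = 0,
    include_header: bool = False,
) -> tuple[list[bytes], bytes]:
    """Split complete frames off the front of buffer; return (frames, leftover)."""
    frames: list[bytes] = []
    prefix_len = header_offset + header_size
    while len(buffer) >= prefix_len:
        # Assumed: big-endian length that counts only the payload bytes.
        length = int.from_bytes(buffer[header_offset:prefix_len], "big")
        total = prefix_len + length
        if len(buffer) < total:
            break  # frame incomplete, wait for more data
        frames.append(buffer[:total] if include_header else buffer[prefix_len:total])
        buffer = buffer[total:]
    return frames, buffer


frames, leftover = split_length_prefixed(b"\x00\x03abc\x00\x02de\x00")
print(frames, leftover)
```

The leftover bytes are kept and prepended to the next chunk read from the device.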

Discovery Hints

The discovery section helps OpenAVC’s network discovery system identify devices on the network and match them to your driver. When a user runs a discovery scan, these hints improve how accurately your driver is suggested for detected devices.

discovery:
  ports: [23]
  mac_prefixes: ["00:05:a6"]

Field | Type | Description
ports | list[int] | TCP ports this device typically listens on. Used to match against open ports found during scanning.
mac_prefixes | list[str] | IEEE OUI prefixes (first 3 bytes of MAC address) for this manufacturer’s devices. Format: "00:05:a6".
mdns_services | list[str] | mDNS/Bonjour service types the device advertises (e.g., "_pjlink._tcp.local.").
upnp_types | list[str] | UPnP device type URNs the device advertises.
hostname_patterns | list[str] | Regex patterns to match against the device’s hostname (e.g., "^DTP-.*").

All fields are optional. Even without a discovery section, the driver’s manufacturer, category, and default_config.port are used as basic hints. Adding explicit discovery hints makes matching more accurate.

Protocol Declaration

If your device speaks a known protocol that OpenAVC’s discovery probes can identify (PJLink, Extron SIS, Biamp Tesira, QSC Q-SYS, Kramer P3000, Samsung MDC, VISCA, etc.), declare it with the top-level protocols field:

protocols: ["pjlink"]

This lets the discovery system match your driver directly when it identifies the protocol on a device, without relying on the built-in fallback mapping.

Where to find MAC prefixes: Look at the MAC address of your device (shown in device network settings or arp -a). The first three octets (e.g., 00:05:a6) identify the manufacturer. You can verify at IEEE OUI lookup.
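
A MAC prefix check comes down to comparing the first three octets while ignoring case and separator style (arp -a prints hyphens on some systems and colons on others). The sketch below shows one way to do that; mac_matches is a hypothetical helper, not OpenAVC's discovery code.

```python
import re


def _hex_digits(mac: str) -> str:
    """Lowercase the address and drop separators such as ':' or '-'."""
    return re.sub(r"[^0-9a-f]", "", mac.lower())


def mac_matches(mac: str, mac_prefixes: list[str]) -> bool:
    """True if the MAC address starts with any of the given OUI prefixes."""
    digits = _hex_digits(mac)
    return any(
        digits.startswith(_hex_digits(prefix))
        for prefix in mac_prefixes
        if _hex_digits(prefix)  # ignore empty prefixes
    )


print(mac_matches("00-05-A6-12-34-56", ["00:05:a6"]))
```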

HTTP/REST Drivers (.avcdriver)

For devices controlled via HTTP/REST APIs (Panasonic PTZ cameras, Sony Bravia displays, Crestron DM NVX, Zoom Rooms, etc.), set transport: http and use HTTP-specific command fields.

HTTP commands use method, path, and body instead of string/send:

# Panasonic AW-series PTZ Camera (HTTP CGI control)
id: panasonic_aw_ptz
name: Panasonic AW PTZ Camera
manufacturer: Panasonic
category: camera
transport: http

default_config:
  host: ""
  port: 80
  poll_interval: 5

config_schema:
  host:
    type: string
    required: true
    label: IP Address
  port:
    type: integer
    default: 80
    label: Port
  auth_type:
    type: enum
    values: ["none", "basic", "digest"]
    default: "none"
    label: Authentication
  username:
    type: string
    default: "admin"
    label: Username
  password:
    type: string
    default: ""
    label: Password
    secret: true
  verify_ssl:
    type: boolean
    default: false
    label: Verify SSL Certificate

state_variables:
  power:
    type: enum
    values: ["off", "on"]
    label: Power State
  pan:
    type: string
    label: Pan Position
  tilt:
    type: string
    label: Tilt Position

commands:
  power_on:
    label: Power On
    method: GET
    path: "/cgi-bin/aw_ptz?cmd=%23O1&res=1"
  power_off:
    label: Power Off
    method: GET
    path: "/cgi-bin/aw_ptz?cmd=%23O0&res=1"
  recall_preset:
    label: Recall Preset
    method: GET
    path: "/cgi-bin/aw_ptz?cmd=%23R{preset:02d}&res=1"
    params:
      preset:
        type: integer
        required: true
        label: Preset Number
        min: 1
        max: 100
  set_pan_tilt:
    label: Set Pan/Tilt
    method: GET
    path: "/cgi-bin/aw_ptz?cmd=%23APC{pan}{tilt}&res=1"
    params:
      pan:
        type: string
        required: true
        label: Pan (hex, 4 chars)
      tilt:
        type: string
        required: true
        label: Tilt (hex, 4 chars)

responses:
  # Power query response contains "p1" (on) or "p0" (off)
  - match: 'p1'
    set: { power: "on" }
  - match: 'p0'
    set: { power: "off" }

polling:
  interval: 5
  queries:
    - "/cgi-bin/aw_ptz?cmd=%23O&res=1"

HTTP command fields

Field | Required | Description
method | No | HTTP method: GET, POST, PUT, DELETE. Default: GET.
path | Yes | URL path (appended to http://host:port). Supports {param} substitution.
body | No | Request body (JSON string). Supports {param} substitution. Used with POST/PUT.
query_params | No | Query parameters as key-value pairs. Supports {param} substitution.
params | No | Parameter definitions (same as TCP/serial commands).

HTTP config fields

These fields in config_schema are recognized by the HTTP transport:

Field | Description
host | Device IP or hostname (required)
port | Port number (default: 80)
ssl | Use HTTPS (default: false)
auth_type | "none", "basic", "bearer", "api_key", "digest"
username | For basic/digest auth
password | For basic/digest auth
token | For bearer auth
api_key | For API key auth
api_key_header | Header name for API key (default: X-API-Key)
verify_ssl | Verify HTTPS certificates (default: true, set false for self-signed)
timeout | Request timeout in seconds (default: 10)
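
To make the relationship between these config fields concrete, the sketch below derives a base URL and authentication headers from a config dict. It is an illustration under stated assumptions, not the HTTP transport's actual code: build_http_context is a hypothetical name, the bearer and basic header formats follow the usual HTTP conventions, and digest auth is left out because it needs a challenge/response exchange with the device.

```python
import base64


def build_http_context(config: dict) -> tuple[str, dict[str, str]]:
    """Return (base_url, headers) derived from an HTTP device config."""
    scheme = "https" if config.get("ssl", False) else "http"
    base_url = f"{scheme}://{config['host']}:{config.get('port', 80)}"

    headers: dict[str, str] = {}
    auth_type = config.get("auth_type", "none")
    if auth_type == "basic":
        credentials = f"{config.get('username', '')}:{config.get('password', '')}"
        encoded = base64.b64encode(credentials.encode()).decode()
        headers["Authorization"] = f"Basic {encoded}"
    elif auth_type == "bearer":
        headers["Authorization"] = f"Bearer {config.get('token', '')}"
    elif auth_type == "api_key":
        header_name = config.get("api_key_header", "X-API-Key")
        headers[header_name] = config.get("api_key", "")
    # "digest" is negotiated per request and is not covered by this sketch.
    return base_url, headers


print(build_http_context({"host": "10.0.0.5", "auth_type": "api_key", "api_key": "k1"}))
```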

HTTP polling

For HTTP drivers, polling queries can be:

  • Command names: executes that command (e.g., "get_status")
  • URL paths: sends a GET request to that path (e.g., "/api/status")

Response text from polled endpoints is matched against responses patterns, same as TCP/serial.

JSON body with parameter substitution

For REST APIs that expect JSON bodies, use the body field. Parameter placeholders {name} are substituted, and literal JSON braces are preserved:

commands:
  set_volume:
    label: Set Volume
    method: POST
    path: "/api/audio"
    body: '{"channel": "program", "level": {level}}'
    params:
      level:
        type: integer
        required: true
        min: 0
        max: 100

With level=75, this sends POST /api/audio with body {"channel": "program", "level": 75}.
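
One way to substitute placeholders while leaving JSON braces alone is to replace only `{name}` tokens whose name is a known parameter. The sketch below does that and also honours an optional format spec such as `{preset:02d}`, as used in the Panasonic example. The function substitute is hypothetical; OpenAVC's real substitution logic may differ.

```python
import json
import re

# Matches {name} or {name:format_spec}; JSON braces like {"key": ...} do not match.
_PLACEHOLDER = re.compile(r"\{(\w+)(?::([^{}]+))?\}")


def substitute(template: str, params: dict) -> str:
    """Replace known {param} placeholders and leave everything else untouched."""
    def _replace(match: re.Match) -> str:
        name, spec = match.group(1), match.group(2)
        if name not in params:
            return match.group(0)  # unknown name: keep the text as-is
        return format(params[name], spec or "")

    return _PLACEHOLDER.sub(_replace, template)


body = substitute('{"channel": "program", "level": {level}}', {"level": 75})
print(json.loads(body))
print(substitute("/cgi-bin/aw_ptz?cmd=%23R{preset:02d}&res=1", {"preset": 5}))
```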

Managing Drivers via API

Endpoint | Method | Description
/api/driver-definitions | GET | List all driver definitions
/api/driver-definitions/{id} | GET | Get a single definition
/api/driver-definitions | POST | Create a new definition
/api/driver-definitions/{id} | PUT | Update a definition
/api/driver-definitions/{id}/test-command | POST | Test a command against live hardware
/api/driver-definitions/{id} | DELETE | Delete a definition
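
The endpoints above can be called from any HTTP client. The sketch below only builds the requests with Python's standard library so the shape is visible. The base URL is a placeholder, the JSON payload is illustrative, and any authentication the server may require is not shown, since none of those details are given here.

```python
import json
import urllib.request
from typing import Optional

# Placeholder address: replace with wherever your OpenAVC server is reachable.
BASE_URL = "http://localhost:8080"


def build_request(method: str, path: str, payload: Optional[dict] = None) -> urllib.request.Request:
    """Build (but do not send) a request for the driver-definition API."""
    data = json.dumps(payload).encode() if payload is not None else None
    headers = {"Content-Type": "application/json"} if data is not None else {}
    return urllib.request.Request(BASE_URL + path, data=data, headers=headers, method=method)


list_req = build_request("GET", "/api/driver-definitions")
update_req = build_request(
    "PUT",
    "/api/driver-definitions/extron_sw4",
    {"id": "extron_sw4", "name": "Extron SW4 HD 4K", "transport": "tcp"},
)
print(list_req.get_method(), list_req.full_url)
print(update_req.get_method(), update_req.full_url)
```

Passing one of these objects to urllib.request.urlopen() would send it.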

Method 3: Python Driver

Python drivers give you full control. Use this method when:

  • The device uses a binary protocol (bytes, checksums, length headers).
  • The device requires an authentication handshake on connect.
  • You need complex state logic that can’t be expressed as regex patterns.
  • The device uses a transport that a definition file doesn’t cover (UDP, for example).

Minimal Example: Simple TCP Device

If your device uses a text protocol over TCP with \r delimiters, you can rely on the auto-transport system: implement send_command(), and override on_data_received() and poll() only if you need to parse replies or send status queries:

# server/drivers/my_switcher.py
from server.drivers.base import BaseDriver
from typing import Any


class MySwitcherDriver(BaseDriver):
    """Controls my custom video switcher."""

    DRIVER_INFO = {
        "id": "my_switcher",
        "name": "My Video Switcher",
        "manufacturer": "Custom",
        "category": "switcher",
        "version": "1.0.0",
        "author": "Your Name",
        "description": "Controls my custom switcher via TCP.",
        "transport": "tcp",
        "default_config": {
            "host": "",
            "port": 23,
            "poll_interval": 15,
        },
        "config_schema": {
            "host": {"type": "string", "required": True, "label": "IP Address"},
            "port": {"type": "integer", "default": 23, "label": "Port"},
            "poll_interval": {"type": "integer", "default": 15, "label": "Poll Interval (sec)"},
        },
        "state_variables": {
            "input": {"type": "integer", "label": "Current Input"},
        },
        "commands": {
            "set_input": {
                "label": "Set Input",
                "params": {
                    "input": {"type": "integer", "required": True},
                },
            },
            "query_input": {"label": "Query Input", "params": {}},
        },
    }

    async def send_command(self, command: str, params: dict[str, Any] | None = None) -> Any:
        params = params or {}
        if not self.transport or not self.transport.connected:
            raise ConnectionError(f"[{self.device_id}] Not connected")
        match command:
            case "set_input":
                input_num = params.get("input", 1)
                await self.transport.send(f"{input_num}!\r".encode())
            case "query_input":
                await self.transport.send(b"!\r")

    async def on_data_received(self, data: bytes) -> None:
        text = data.decode("ascii", errors="ignore").strip()
        # Parse "In3 All" style responses
        if text.startswith("In") and "All" in text:
            try:
                input_num = int(text[2:].split()[0])
                self.set_state("input", input_num)
            except ValueError:
                pass

    async def poll(self) -> None:
        if self.transport and self.transport.connected:
            await self.transport.send(b"!\r")

What’s happening here:

  • connect() and disconnect() are not defined. The base class handles them automatically using DRIVER_INFO["transport"] and the device config.
  • The default delimiter (\r) is used for message framing.
  • Polling is started automatically if poll_interval > 0 in the device config.
  • on_data_received() is called with complete, delimiter-stripped messages.
  • set_state("input", 3) writes to device.<device_id>.input in the state store.
  • All sent and received data is automatically logged in the device log. No logging code needed. See “Device Log” below.

Installing a Python Driver

Place your .py driver file in the driver_repo/ directory. OpenAVC scans this directory at startup and dynamically loads any Python file that contains a BaseDriver subclass with a valid DRIVER_INFO dict.

You can also install Python drivers through the Programmer IDE:

  • Browse Community tab: click Install on any Python driver
  • Import from File: upload a .py file from your computer

After installation or restart, the driver appears in the “Add Device” dialog.

Full Example: Binary Protocol (Samsung MDC)

For binary protocols, you override _create_frame_parser() and _resolve_delimiter() to tell the transport how to split the byte stream into messages. This example is the actual Samsung MDC driver included with OpenAVC.

# server/drivers/samsung_mdc.py
from server.drivers.base import BaseDriver
from server.transport.binary_helpers import checksum_sum
from server.transport.frame_parsers import CallableFrameParser, FrameParser
from typing import Any, Optional

# MDC command constants
CMD_POWER = 0x11
CMD_VOLUME = 0x12


# Frame builder helper
def _build_mdc_frame(cmd: int, display_id: int, data: bytes = b"") -> bytes:
    frame = bytes([cmd, display_id, len(data)]) + data
    cs = checksum_sum(frame)
    return bytes([0xAA]) + frame + bytes([cs])


# Frame parser helper
def _parse_mdc_frame(buffer: bytes) -> tuple[bytes | None, bytes]:
    start = buffer.find(0xAA)
    if start == -1:
        return None, b""
    if start > 0:
        buffer = buffer[start:]
    if len(buffer) < 4:
        return None, buffer
    data_len = buffer[3]
    total_len = 4 + data_len + 1
    if len(buffer) < total_len:
        return None, buffer
    frame = buffer[1 : total_len - 1]
    return frame, buffer[total_len:]


class SamsungMDCDriver(BaseDriver):
    DRIVER_INFO = {
        "id": "samsung_mdc",
        "name": "Samsung MDC Display",
        "manufacturer": "Samsung",
        "category": "display",
        "transport": "tcp",
        "default_config": {"host": "", "port": 1515, "display_id": 1, "poll_interval": 15},
        "config_schema": {
            "host": {"type": "string", "required": True, "label": "IP Address"},
            "port": {"type": "integer", "default": 1515, "label": "Port"},
            "display_id": {"type": "integer", "default": 1, "label": "Display ID"},
        },
        "state_variables": {
            "power": {"type": "enum", "values": ["off", "on"], "label": "Power"},
            "volume": {"type": "integer", "label": "Volume"},
        },
        "commands": {
            "power_on": {"label": "Power On", "params": {}},
            "set_volume": {"label": "Set Volume", "params": {"level": {"type": "integer"}}},
        },
    }

    def _create_frame_parser(self) -> Optional[FrameParser]:
        # Use a callable parser for custom binary framing
        return CallableFrameParser(_parse_mdc_frame)

    def _resolve_delimiter(self) -> Optional[bytes]:
        # Binary protocol -- no delimiter
        return None

    async def send_command(self, command: str, params: dict[str, Any] | None = None) -> Any:
        params = params or {}
        if not self.transport or not self.transport.connected:
            raise ConnectionError(f"[{self.device_id}] Not connected")
        display_id = self.config.get("display_id", 1)
        match command:
            case "power_on":
                await self.transport.send(_build_mdc_frame(CMD_POWER, display_id, bytes([1])))
            case "set_volume":
                level = max(0, min(100, int(params.get("level", 0))))
                await self.transport.send(_build_mdc_frame(CMD_VOLUME, display_id, bytes([level])))

    async def on_data_received(self, data: bytes) -> None:
        if len(data) < 3:
            return
        cmd = data[0]
        payload = data[3:] if len(data) > 3 else b""
        if cmd == CMD_POWER and payload:
            self.set_state("power", "on" if payload[0] else "off")
        elif cmd == CMD_VOLUME and payload:
            self.set_state("volume", payload[0])

Key differences from the text-protocol example:

  • _create_frame_parser() returns a CallableFrameParser with a custom function that knows how to find message boundaries in the binary stream.
  • _resolve_delimiter() returns None because there’s no text delimiter.
  • on_data_received() gets complete binary frames (header and checksum already stripped by the parser).
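
The framing logic can be tried outside OpenAVC with a standalone round trip. The sketch below mirrors the helper functions from the example but replaces checksum_sum with a local sum-modulo-256, which is an assumption about what that helper computes. It also verifies the checksum on receive, which the example above does not do.

```python
from typing import Optional

HEADER = 0xAA


def build_frame(cmd: int, display_id: int, data: bytes = b"") -> bytes:
    """Header + (cmd, id, length, data) + checksum over everything after the header."""
    body = bytes([cmd, display_id, len(data)]) + data
    return bytes([HEADER]) + body + bytes([sum(body) & 0xFF])


def extract_frame(buffer: bytes) -> tuple[Optional[bytes], bytes]:
    """Return (frame_without_header_and_checksum, remaining_bytes)."""
    start = buffer.find(HEADER)
    if start == -1:
        return None, b""          # no header yet: discard noise
    buffer = buffer[start:]
    if len(buffer) < 4:
        return None, buffer       # need header, cmd, id, length
    total = 4 + buffer[3] + 1     # fixed part + data + checksum
    if len(buffer) < total:
        return None, buffer       # frame incomplete
    frame, checksum = buffer[1 : total - 1], buffer[total - 1]
    if sum(frame) & 0xFF != checksum:
        return None, buffer[1:]   # bad checksum: skip this header and resync later
    return frame, buffer[total:]


power_on = build_frame(0x11, 1, b"\x01")
print(power_on.hex())
print(extract_frame(b"\x00" + power_on + b"\xaa"))
```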

Custom connect(): Authentication Handshake

If a device requires a handshake on connect (like PJLink’s greeting), override connect():

import asyncio


async def connect(self) -> None:
    from server.transport.tcp import TCPTransport

    host = self.config.get("host", "")
    port = self.config.get("port", 4352)
    self.transport = await TCPTransport.create(
        host=host,
        port=port,
        on_data=self.on_data_received,
        on_disconnect=self._handle_transport_disconnect,
        delimiter=b"\r",
        name=self.device_id,  # identifies this device in the log
    )

    # Wait for the device's greeting message
    await asyncio.sleep(0.1)

    # Send authentication if needed
    password = self.config.get("password", "")
    if password:
        await self.transport.send(f"AUTH {password}\r".encode())
        await asyncio.sleep(0.1)

    self._connected = True
    self.set_state("connected", True)
    await self.events.emit(f"device.connected.{self.device_id}")

    poll_interval = self.config.get("poll_interval", 15)
    if poll_interval > 0:
        await self.start_polling(poll_interval)

Important: Always pass name=self.device_id when creating a transport manually. This ensures all sent and received data appears in the device log with the correct device name for filtering.
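
The AUTH line in the example above is a generic placeholder. For PJLink specifically, a closer sketch might look like the following. It assumes the PJLink Class 1 scheme, in which the projector greets with "PJLINK 0" (no authentication) or "PJLINK 1 <random>" and the client prefixes its first command with the MD5 hex digest of the random value followed by the password. The helper name pjlink_auth_prefix is hypothetical; check the PJLink specification before relying on this.

```python
import hashlib
from typing import Optional


def pjlink_auth_prefix(greeting: str, password: str) -> Optional[str]:
    """Return the prefix for the first command, or None if the greeting is not understood."""
    parts = greeting.strip().split()
    if len(parts) < 2 or parts[0] != "PJLINK":
        return None
    if parts[1] == "0":
        return ""  # authentication disabled: send commands as-is
    if parts[1] == "1" and len(parts) >= 3:
        # Assumed scheme: MD5(random + password), sent as 32 hex characters.
        return hashlib.md5((parts[2] + password).encode("ascii")).hexdigest()
    return None


prefix = pjlink_auth_prefix("PJLINK 1 498e4a67\r", "JBMIAProjectorLink")
first_command = f"{prefix}%1POWR ?\r".encode()
print(len(prefix), first_command[-9:])
```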

Custom Transport: UDP / Wake-on-LAN

For devices that don’t use persistent connections, override both connect() and disconnect():

class WakeOnLANDriver(BaseDriver):
    DRIVER_INFO = {
        "id": "wake_on_lan",
        "transport": "udp",
        # ...
    }

    async def connect(self) -> None:
        # No persistent connection needed
        self._connected = True
        self.set_state("connected", True)
        await self.events.emit(f"device.connected.{self.device_id}")

    async def disconnect(self) -> None:
        self._connected = False
        self.set_state("connected", False)
        await self.events.emit(f"device.disconnected.{self.device_id}")

    async def send_command(self, command: str, params=None) -> Any:
        if command == "wake":
            # Create a temporary UDP socket, send, close.
            # magic_packet (bytes) is assumed to have been built from the target MAC address.
            udp = UDPTransport(name=self.device_id)
            await udp.open(allow_broadcast=True)
            await udp.send(magic_packet, "255.255.255.255", 9)
            udp.close()
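
The excerpt above leaves out how magic_packet is built. A standard Wake-on-LAN magic packet is six 0xFF bytes followed by the target MAC address repeated sixteen times. The helper below is a minimal sketch of that; build_magic_packet is a hypothetical name, and where the MAC address comes from (for example a config field) is left open.

```python
def build_magic_packet(mac: str) -> bytes:
    """Build a Wake-on-LAN magic packet for a MAC like '00:11:22:33:44:55'."""
    digits = mac.replace(":", "").replace("-", "")
    if len(digits) != 12:
        raise ValueError(f"Expected 6 octets, got: {mac!r}")
    mac_bytes = bytes.fromhex(digits)  # raises ValueError on non-hex input
    # 6 bytes of 0xFF, then the 6-byte MAC repeated 16 times (102 bytes total).
    return b"\xff" * 6 + mac_bytes * 16


magic_packet = build_magic_packet("00:11:22:33:44:55")
print(len(magic_packet))
```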

BaseDriver Hooks Reference

These methods can be overridden in your driver subclass:

Method | Required | Default Behavior
send_command(command, params) | Yes | (abstract, must implement)
connect() | No | Auto-creates TCP or serial transport from DRIVER_INFO and config
disconnect() | No | Stops polling, closes transport, updates state
on_data_received(data) | No | No-op. Override to parse device responses.
poll() | No | No-op. Override to send status queries.
_create_frame_parser() | No | Returns None (uses delimiter framing). Override for binary protocols.
_resolve_delimiter() | No | Checks DRIVER_INFO, then config, then defaults to b"\r".
_handle_transport_disconnect() | No | Sets connected=False, emits disconnect event.

Convenience Methods

These are available on every driver via the BaseDriver base class:

Method | Description
self.set_state("power", "on") | Sets device.<device_id>.power in the state store
self.get_state("power") | Gets the current value of device.<device_id>.power
await self.start_polling(15) | Starts calling self.poll() every 15 seconds
await self.stop_polling() | Stops the polling loop
await self.transport.send(data) | Send raw bytes to the device
await self.transport.send_and_wait(data, timeout=5) | Send and wait for the next response
self.device_id | The device’s ID (e.g., "projector1")
self.config | The device’s config dict from project.avc
self.events | The EventBus instance (for emitting custom events)

Device Log

All transport types (TCP, serial, HTTP, UDP) automatically log every send and receive at INFO level, tagged with the device ID. This means:

  • You do not need to add any logging code for protocol traffic. It’s built into the transport layer.
  • Every TX (sent) and RX (received) message appears in the Programmer IDE’s device log, filterable by device.
  • Text data is shown decoded. Binary data is shown as hex.

Example log output (automatic, no code needed):

[my_projector] TX: %1POWR 1
[my_projector] RX: %1POWR=OK
[my_projector] TX: %1POWR ?
[my_projector] RX: %1POWR=3

When to add your own logging: Use log.info(f"[{self.device_id}] ...") for semantic events that add meaning beyond the raw protocol. For example, interpreting a power state code into a human-readable value:

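from server.utils.logger import get_logger

log = get_logger(__name__)

# Method of your driver class:
async def on_data_received(self, data: bytes) -> None:
    # The transport already logged: [my_projector] RX: %1POWR=3
    text = data.decode("ascii", errors="replace").strip()
    code, _, value = text[2:].partition("=")  # drop the "%1" prefix
    # Add a semantic log for the interpreted meaning:
    if code == "POWR":
        state = {"0": "off", "1": "on", "2": "cooling", "3": "warming"}.get(value)
        if state is not None:  # skip acknowledgements such as "OK"
            self.set_state("power", state)
            log.info(f"[{self.device_id}] Power: {state}")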

If you override connect(): Always pass name=self.device_id when creating a transport manually. If you forget, the log will show the IP address instead of the device name, and device log filtering won’t work.

Available Frame Parsers

Import from server.transport.frame_parsers:

| Parser | Use Case |
| --- | --- |
| DelimiterFrameParser(b"\r") | Text protocols with a line ending |
| LengthPrefixFrameParser(header_size=2) | Protocols with a length byte/word before the payload |
| FixedLengthFrameParser(length=8) | Protocols where every message is exactly N bytes |
| CallableFrameParser(your_function) | Custom protocols where you write the parsing logic |

All frame parsers accept an optional max_buffer parameter (default: 65536 bytes / 64 KB). If the internal buffer exceeds this limit (for example, when a device sends garbage data without proper delimiters), the buffer is automatically cleared to prevent unbounded memory growth.
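
To make the buffering behavior concrete, here is a minimal, self-contained sketch of delimiter framing with a buffer limit. SketchDelimiterParser and its feed() method are hypothetical stand-ins written for illustration. They are not OpenAVC's DelimiterFrameParser, whose internals and method names may differ.

```python
from typing import List


class SketchDelimiterParser:
    """Illustrative stand-in, not the real DelimiterFrameParser."""

    def __init__(self, delimiter: bytes = b"\r", max_buffer: int = 65536) -> None:
        self.delimiter = delimiter
        self.max_buffer = max_buffer
        self._buffer = bytearray()

    def feed(self, chunk: bytes) -> List[bytes]:
        """Append received bytes and return every complete frame found so far."""
        self._buffer.extend(chunk)
        frames: List[bytes] = []
        while True:
            index = self._buffer.find(self.delimiter)
            if index < 0:
                break
            frames.append(bytes(self._buffer[:index]))
            del self._buffer[: index + len(self.delimiter)]
        # What remains has no delimiter. Drop it once it exceeds the limit
        # so garbage input cannot grow the buffer without bound.
        if len(self._buffer) > self.max_buffer:
            self._buffer.clear()
        return frames


parser = SketchDelimiterParser(b"\r", max_buffer=16)
print(parser.feed(b"POWR="))       # no complete frame yet
print(parser.feed(b"ON\rVOL"))     # one frame; "VOL" stays buffered
print(parser.feed(b"x" * 32))      # over the limit, buffer is cleared
print(parser.feed(b"OK\r"))        # parsing resumes cleanly
```

A partial message split across two reads is reassembled, while an oversized run without a delimiter is discarded rather than kept in memory.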

Available Binary Helpers

Import from server.transport.binary_helpers:

| Function | Description |
| --- | --- |
| checksum_xor(data) | XOR all bytes together |
| checksum_sum(data) | Sum all bytes, masked to 0xFF |
| crc16_ccitt(data) | CRC-16/CCITT-FALSE |
| hex_dump(data) | Format bytes as a hex dump string for logging |
| escape_bytes(data, escape_char, special) | Escape special bytes |
| unescape_bytes(data, escape_char, special) | Reverse escape |
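
For reference, the three checksum algorithms named above can be sketched in plain Python as shown below. These are standalone illustrations under different names, not the library's source. It is an assumption that the real helpers return an int in the same way.

```python
from functools import reduce


def xor_checksum(data: bytes) -> int:
    """XOR every byte together (what checksum_xor is described as doing)."""
    return reduce(lambda acc, byte: acc ^ byte, data, 0)


def sum_checksum(data: bytes) -> int:
    """Sum every byte and keep the low 8 bits."""
    return sum(data) & 0xFF


def crc16_ccitt_false(data: bytes) -> int:
    """CRC-16/CCITT-FALSE: polynomial 0x1021, initial value 0xFFFF, no reflection."""
    crc = 0xFFFF
    for byte in data:
        crc ^= byte << 8
        for _ in range(8):
            if crc & 0x8000:
                crc = ((crc << 1) ^ 0x1021) & 0xFFFF
            else:
                crc = (crc << 1) & 0xFFFF
    return crc


print(hex(xor_checksum(b"\x01\x02\x03")))      # 0x0
print(hex(sum_checksum(b"\xff\x02")))          # 0x1
print(hex(crc16_ccitt_false(b"123456789")))    # 0x29b1, the standard check value
```

Comparing a device's documented example frame against these functions is a quick way to confirm which checksum the protocol uses before wiring it into a driver.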

DRIVER_INFO Reference

Every driver, whether written in Python, as a .avcdriver definition, or in the Driver Builder, defines the same metadata structure. Here's the complete reference:

DRIVER_INFO = {
    # --- Required ---
    "id": "unique_driver_id",  # Lowercase, underscores only
    "name": "Human-Readable Name",
    "transport": "tcp",  # "tcp", "serial", "http", or "udp"

    # --- Optional metadata ---
    "manufacturer": "Generic",
    "category": "utility",  # projector, display, switcher, etc.
    "version": "1.0.0",
    "author": "Your Name",
    "description": "What this driver does.",

    # --- Help text (shown in UI and used by AI assistant) ---
    "help": {
        "overview": "Brief description of what this driver controls.",
        "setup": "Step-by-step setup instructions for connecting the device.",
    },

    # --- Connection defaults ---
    "default_config": {
        "host": "",
        "port": 23,
        "poll_interval": 15,
    },

    # --- Config fields shown in "Add Device" dialog ---
    "config_schema": {
        "host": {
            "type": "string",
            "required": True,
            "label": "IP Address",
        },
        "port": {
            "type": "integer",
            "default": 23,
            "label": "Port",
        },
    },

    # --- State properties this driver exposes ---
    "state_variables": {
        "power": {
            "type": "enum",
            "values": ["off", "on"],
            "label": "Power State",
            "help": "Current power state of the device.",
        },
        "volume": {
            "type": "integer",
            "label": "Volume",
            "help": "Current volume level (0-100).",
        },
    },

    # --- Commands this driver accepts ---
    "commands": {
        "power_on": {
            "label": "Power On",
            "params": {},
            "help": "Turn on the device.",
        },
        "set_volume": {
            "label": "Set Volume",
            "params": {
                "level": {
                    "type": "integer",
                    "min": 0,
                    "max": 100,
                    "required": True,
                    "help": "Volume level 0-100.",
                },
            },
            "help": "Set the audio volume level.",
        },
    },

    # --- Protocol declarations (optional, improves discovery matching) ---
    "protocols": ["extron_sis"],

    # --- Discovery hints (optional, improves network scanning) ---
    "discovery": {
        "ports": [23],
        "mac_prefixes": ["00:05:a6"],
    },
}
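
The params entries under "commands" describe each argument's type, range, and whether it is required. As a rough sketch of how that schema can be read, the hypothetical validate_params helper below checks a params dict against it inside a driver's send_command(). This is an illustration only. Whether and how OpenAVC validates params itself is not covered here.

```python
from typing import Any, Dict, List

# Trimmed copy of the "commands" section from the reference above.
DRIVER_INFO: Dict[str, Any] = {
    "commands": {
        "power_on": {"label": "Power On", "params": {}},
        "set_volume": {
            "label": "Set Volume",
            "params": {
                "level": {"type": "integer", "min": 0, "max": 100, "required": True},
            },
        },
    },
}


def validate_params(driver_info: Dict[str, Any], command: str,
                    params: Dict[str, Any]) -> List[str]:
    """Return a list of problems; an empty list means the params look valid.

    Hypothetical helper for illustration; not part of the OpenAVC API.
    """
    commands = driver_info.get("commands", {})
    if command not in commands:
        return [f"unknown command: {command}"]
    errors: List[str] = []
    for name, spec in commands[command].get("params", {}).items():
        if name not in params:
            if spec.get("required"):
                errors.append(f"missing required param: {name}")
            continue
        value = params[name]
        if spec.get("type") == "integer":
            # bool is a subclass of int in Python, so reject it explicitly.
            if not isinstance(value, int) or isinstance(value, bool):
                errors.append(f"{name} must be an integer")
                continue
            if "min" in spec and value < spec["min"]:
                errors.append(f"{name} below minimum {spec['min']}")
            if "max" in spec and value > spec["max"]:
                errors.append(f"{name} above maximum {spec['max']}")
    return errors


print(validate_params(DRIVER_INFO, "set_volume", {"level": 50}))
print(validate_params(DRIVER_INFO, "set_volume", {"level": 150}))
```

Returning a list of messages rather than raising on the first problem makes it easy to log every issue at once.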

Testing Your Driver

Without hardware (simulation mode)

For serial drivers, use the SIM: prefix as the port name (e.g., SIM:test). This creates a simulated serial connection that accepts sends without error.

For TCP drivers, you can run a simple echo server or one of the included simulators:

# PJLink simulator (built-in)
python -m tests.simulators.pjlink_simulator
# Samsung MDC simulator (built-in)
python -m tests.simulators.samsung_mdc_simulator
# vMix simulator (built-in)
python -m tests.simulators.vmix_simulator
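
For a device that has no bundled simulator, a small stand-in can be written with asyncio. The sketch below answers the "POWR ON\r" / "POWR=ON\r" style of exchange used as an example earlier in this guide. The "POWR OFF" and "POWR ?" commands, the "ERR" reply, and port 4999 are assumptions made for illustration.

```python
import asyncio
from typing import Dict


def respond(line: str, state: Dict[str, str]) -> str:
    """Map one command line to a reply, updating the simulated device state."""
    if line == "POWR ON":
        state["power"] = "ON"
    elif line == "POWR OFF":
        state["power"] = "OFF"
    elif line != "POWR ?":
        return "ERR"  # hypothetical error reply for unknown commands
    return f"POWR={state['power']}"


async def handle_client(reader: asyncio.StreamReader,
                        writer: asyncio.StreamWriter) -> None:
    """Serve one connection: read \\r-terminated commands, write replies."""
    state = {"power": "OFF"}
    try:
        while True:
            raw = await reader.readuntil(b"\r")
            line = raw.rstrip(b"\r").decode("ascii", errors="replace")
            writer.write(respond(line, state).encode("ascii") + b"\r")
            await writer.drain()
    except (asyncio.IncompleteReadError, ConnectionResetError):
        pass  # client disconnected
    finally:
        writer.close()


async def main(host: str = "127.0.0.1", port: int = 4999) -> None:
    """Listen until interrupted. The port number is an arbitrary example."""
    server = await asyncio.start_server(handle_client, host, port)
    async with server:
        await server.serve_forever()

# To start the simulator, call: asyncio.run(main())
```

Keeping the protocol logic in a plain function such as respond() means it can be unit-tested without opening a socket, and the same function can back a pytest simulator fixture.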

With the dev server

python dev.py

Then open the Programmer UI at http://localhost:8080/programmer:

  1. Go to Devices > Add Device.
  2. Select your new driver from the dropdown.
  3. Enter the connection details.
  4. Use the Command Testing section to send commands and see state updates.

Writing automated tests

See tests/test_pjlink_driver.py or tests/test_samsung_mdc_driver.py for examples. The pattern is:

  1. Create a simulator fixture that listens on a test port.
  2. Create a driver fixture that connects to the simulator.
  3. Send commands and assert state changes.

import asyncio

async def test_power_on(my_driver, state):
    await my_driver.send_command("power_on")
    await asyncio.sleep(0.2)  # Wait for response
    assert my_driver.get_state("power") == "on"

Run tests with:

pytest tests/test_my_driver.py -v