feat(protocol): pure framing/codecs/decoder/encoder + tests

Phase 1: hardware-free protocol core for the DA-12 station serial format.
- codecs: hex widths, signed-16/32 parsing, fixed-point scaling, alarm bitmask,
  duration/clock helpers, FieldReader (tab-delimited, type-letter-adjacent).
- framing: StreamFramer extracts {...} bodies with overflow protection.
- messages/decoder: A/B/C/I/D/E/F/G/H message types.
- encoder: refresh/reset/clock/clear/remove/add + SendSetting (scaled/string/IP).
- 33 passing tests with vectors derived from the VB6 source.
- Station clock epoch + sensor type/calc codes FLAGGED for hardware verification.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-01 17:08:38 -04:00
parent df8646a74b
commit ae159c1e80
13 changed files with 774 additions and 0 deletions

3
da12_service/__init__.py Normal file
View File

@@ -0,0 +1,3 @@
"""DA-12 Monitoring Station Service Tool — modern rebuild."""
__version__ = "3.0.0"

View File

@@ -0,0 +1,5 @@
"""Pure DA-12 serial protocol: framing, codecs, decoder, encoder.
This package contains NO I/O. Everything here is deterministic and unit-tested
so the reverse-engineered wire format can be validated without hardware.
"""

View File

@@ -0,0 +1,164 @@
"""Pure hex / scaling / time codecs mirroring the VB6 wire conventions.
Key facts derived from the legacy source (Main.frm / Support.bas):
* ``CInt("&H"+s)`` parses a **signed 16-bit** integer; ``CLng("&H"+s)`` parses a
**signed 32-bit** integer. So a 4-hex field ``FFFF`` == -1 and ``8000`` == -32768.
* ``8000`` (or an empty field) is the "no value" sentinel; the outbound empty
payload is ``FFFF8000``.
* Stored values are fixed-point: ``stored = round(real * 10**fmt)``.
* The station clock behaves like C ``time_t`` (seconds since 1970, local time).
This is FLAGGED for hardware verification (see docs/HARDWARE-VERIFICATION.md).
"""
from __future__ import annotations
from datetime import datetime
# --------------------------------------------------------------------------- #
# Fixed-width hex encoders (VB Hex2/Hex4/Hex8)
# --------------------------------------------------------------------------- #
def to_hex2(value: int) -> str:
return format(value & 0xFF, "02X")
def to_hex4(value: int) -> str:
return format(value & 0xFFFF, "04X")
def to_hex8(value: int) -> str:
return format(value & 0xFFFFFFFF, "08X")
# --------------------------------------------------------------------------- #
# Signed hex decoders (VB CInt / CLng on "&H..." strings)
# --------------------------------------------------------------------------- #
def hex_signed16(s: str) -> int:
v = int(s, 16) & 0xFFFF
return v - 0x10000 if v & 0x8000 else v
def hex_signed32(s: str) -> int:
v = int(s, 16) & 0xFFFFFFFF
return v - 0x100000000 if v & 0x80000000 else v
# --------------------------------------------------------------------------- #
# Fixed-point scaling
# --------------------------------------------------------------------------- #
def decode_scaled(s: str, pow10: int) -> float | None:
"""Decode a fixed-point hex field to a float, honoring the NaN sentinel.
Fields up to 4 hex chars are signed-16; wider fields are signed-32, matching
the legacy ``AddValue`` / ``AddFloat`` behavior.
"""
s = s.strip()
if s == "" or s == "8000":
return None
raw = hex_signed16(s) if len(s) <= 4 else hex_signed32(s)
return raw / (10**pow10)
def encode_scaled(value: float, pow10: int) -> str:
"""Encode a float as an 8-hex fixed-point value (VB SendSetting fmt 0-6)."""
return to_hex8(int(value * (10**pow10)))
# --------------------------------------------------------------------------- #
# Alarm bitmask (VB AddAlarm) — precedence ER > HA > LA > LW > HW
# --------------------------------------------------------------------------- #
_ALARM_BITS = ((0x80, "ER"), (0x08, "HA"), (0x01, "LA"), (0x02, "LW"), (0x04, "HW"))
def decode_alarm(byte: int) -> str:
for mask, label in _ALARM_BITS:
if byte & mask:
return label
return "OK"
# --------------------------------------------------------------------------- #
# Duration / time helpers
# --------------------------------------------------------------------------- #
def decode_minutes(s: str) -> str:
"""Seconds (hex) -> 'MM:SS' (VB AddMinutes)."""
s = s.strip()
if not s:
return ""
n = int(s, 16)
m, sec = divmod(n, 60)
return f"{m:02d}:{sec:02d}"
def decode_clock_time(s: str) -> str:
"""Station clock long (hex8, Unix seconds) -> 'HH:MM:SS' local. FLAGGED."""
s = s.strip()
if not s:
return ""
try:
dt = datetime.fromtimestamp(int(s, 16) & 0xFFFFFFFF)
except (ValueError, OverflowError, OSError):
return ""
return dt.strftime("%H:%M:%S")
def decode_clock_datetime(s: str) -> str:
"""Station clock long (hex8) -> 'MM/DD/YY HH:MM:SS' local. FLAGGED."""
s = s.strip()
if not s:
return ""
try:
dt = datetime.fromtimestamp(int(s, 16) & 0xFFFFFFFF)
except (ValueError, OverflowError, OSError):
return ""
return dt.strftime("%m/%d/%y %H:%M:%S")
def station_time_hex8(dt: datetime) -> str:
"""datetime -> hex8 Unix seconds for the {T...} set-clock command. FLAGGED."""
return to_hex8(int(dt.timestamp()))
# --------------------------------------------------------------------------- #
# FieldReader — consuming reader for tab-delimited message bodies
# (VB GetChar / GetNext). The type letter is adjacent to the first field;
# tabs separate the remaining fields: "<T><field0>\t<field1>\t..."
# --------------------------------------------------------------------------- #
class FieldReader:
def __init__(self, body: str) -> None:
self._s = body
def char(self) -> str:
if not self._s:
return ""
ch, self._s = self._s[0], self._s[1:]
return ch
def next(self) -> str:
if "\t" not in self._s:
out, self._s = self._s, ""
return out
out, _, self._s = self._s.partition("\t")
return out
def next_bytes(self, count: int) -> str:
"""Consume exactly ``count`` chars (VB GetHexByte uses 2-char reads)."""
out, self._s = self._s[:count], self._s[count:]
return out
def remaining(self) -> str:
return self._s
def __len__(self) -> int:
return len(self._s)

View File

@@ -0,0 +1,152 @@
"""Decode a station message body (no braces) into a structured message.
Field orders mirror the legacy ``UpdateService`` (Main.frm). The wire format is
``<TypeLetter><field0>\\t<field1>\\t...`` — the type letter is adjacent to the
first field; tabs separate the remaining fields.
"""
from __future__ import annotations
from . import codecs as c
from . import messages as m
def _byte(reader: c.FieldReader) -> int:
"""Read a tab field as a byte (VB GetByte: CInt('&H'+s) And &HFF)."""
s = reader.next().strip()
if not s:
return 0
try:
return int(s, 16) & 0xFF
except ValueError:
return 0
def _decode_by_type(s: str, type_code: int) -> str:
"""Format a station-setting value per its type code (VB AddByType)."""
s = s.strip()
if type_code in (0,):
try:
return str(c.hex_signed16(s)) if s else ""
except ValueError:
return s
if 1 <= type_code <= 6:
v = c.decode_scaled(s, type_code)
return "" if v is None else f"{v:.{type_code}f}"
if type_code == 8: # IP: 4 octets, each 2 hex -> decimal
return ".".join(str(int(s[i : i + 2], 16)) for i in range(0, min(len(s), 8), 2))
if type_code == 9: # MAC: 6 octets joined with '-'
return "-".join(s[i : i + 2] for i in range(0, min(len(s), 12), 2)).upper()
if type_code == 10: # version: 'maj.min'
return ".".join(str(int(s[i : i + 2], 16)) for i in range(0, min(len(s), 4), 2))
# type 7 (string) and anything else: raw
return s
def decode(body: str) -> m.Message | None:
"""Decode a single message body. Returns None for unknown/blank types."""
if not body:
return None
reader = c.FieldReader(body)
t = reader.char()
if t == "A":
sid = _byte(reader)
serial = reader.next().strip()
name = reader.next().strip()
type_text = reader.next().strip()
scale = c.decode_scaled(reader.next(), 4)
offset = c.decode_scaled(reader.next(), 3)
time = c.decode_clock_time(reader.next())
average = c.decode_scaled(reader.next(), 1)
value = c.decode_scaled(reader.next(), 1)
alarm = c.decode_alarm(_byte(reader))
disp = _byte(reader)
dp = _byte(reader)
calc = _byte(reader)
return m.SensorRecord(
sid, serial, name, type_text, scale, offset, time,
average, value, alarm, disp, dp, calc,
)
if t == "B":
sid = _byte(reader)
time = c.decode_clock_time(reader.next())
average = c.decode_scaled(reader.next(), 1)
alarm = c.decode_alarm(_byte(reader))
return m.AverageUpdate(sid, time, average, alarm)
if t == "C":
sid = _byte(reader)
return m.CurrentValue(sid, c.decode_scaled(reader.next(), 1))
if t == "I":
sid = _byte(reader)
return m.CurrentValue(sid, c.decode_scaled(reader.next(), 3))
if t in ("D", "E"):
row = _byte(reader)
label = reader.next().strip()
type_code = _byte(reader)
value = _decode_by_type(reader.next(), type_code)
text = f"{label}\t{value}" if value else label
return m.SettingLine(row, text, type_code if t == "E" else -1)
if t == "F":
return _decode_status(reader)
if t == "G":
sid = _byte(reader)
enable = _byte(reader)
delay = _byte(reader)
lo_alarm = c.decode_scaled(reader.next(), 1)
lo_warn = c.decode_scaled(reader.next(), 1)
hi_warn = c.decode_scaled(reader.next(), 1)
hi_alarm = c.decode_scaled(reader.next(), 1)
return m.AlarmLimits(sid, enable, delay, lo_alarm, lo_warn, hi_warn, hi_alarm)
if t == "H":
sid = _byte(reader)
maximum = c.decode_scaled(reader.next(), 1)
max_time = c.decode_minutes(reader.next())
minimum = c.decode_scaled(reader.next(), 1)
min_time = c.decode_minutes(reader.next())
average = c.decode_scaled(reader.next(), 1)
calculated = c.decode_scaled(reader.next(), 1)
return m.Statistics(sid, maximum, max_time, minimum, min_time, average, calculated)
return None
def _decode_status(reader: c.FieldReader) -> m.StationStatus:
"""Decode the 'F' status frame. Counter/clock layout is FLAGGED for
hardware verification (legacy mixes 2-char byte reads with tab fields)."""
counters: list[int] = []
for _ in range(8):
b = reader.next_bytes(2)
if len(b) < 2:
break
try:
counters.append(int(b, 16))
except ValueError:
break
activity: int | None = None
ab = reader.next_bytes(2)
if len(ab) == 2:
try:
activity = int(ab, 16)
except ValueError:
activity = None
station_time: str | None = None
buffered: int | None = None
long_field = reader.next().strip()
if long_field:
station_time = c.decode_clock_datetime(long_field) or None
tail = reader.next_bytes(4)
if len(tail) == 4:
try:
lo, hi = int(tail[0:2], 16), int(tail[2:4], 16)
buffered = lo + hi * 256
except ValueError:
buffered = None
return m.StationStatus(counters, activity, station_time, buffered)

View File

@@ -0,0 +1,71 @@
"""Build outbound command strings (including braces) for the station.
Mirrors the legacy ``SendCommand`` / ``SendSetting`` (Main.frm). Every function
returns a ready-to-write string like ``"{Y}"``.
"""
from __future__ import annotations
from datetime import datetime
from . import codecs as c
EMPTY_PAYLOAD = "FFFF8000"
def _wrap(body: str) -> str:
return "{" + body + "}"
def refresh_all() -> str:
return _wrap("Y")
def refresh_stats() -> str:
return _wrap("X")
def station_reset() -> str:
return _wrap("V")
def set_clock(dt: datetime | None = None) -> str:
dt = dt or datetime.now()
return _wrap("T000" + c.station_time_hex8(dt))
def clear_average(sensor_id: int) -> str:
return _wrap("N0" + c.to_hex2(sensor_id))
def clear_stats(sensor_id: int) -> str:
return _wrap("O0" + c.to_hex2(sensor_id))
def remove_sensor(sensor_id: int) -> str:
return _wrap("R0" + c.to_hex2(sensor_id))
def add_sensors(serials: list[str]) -> list[str]:
return [_wrap("S700" + s.strip()) for s in serials if s.strip()]
def set_setting(letter: str, sensor_id: int, value: str, fmt: int) -> str:
"""Build a settings command: ``{<letter><fmt><id><payload>}``.
fmt 0-6 -> fixed-point scaled hex8; fmt 7 -> raw string; fmt 8 -> dotted IP.
An empty value sends the no-value payload.
"""
value = "" if value is None else str(value)
if value.strip() == "":
payload = EMPTY_PAYLOAD
elif 0 <= fmt <= 6:
payload = c.encode_scaled(float(value), fmt)
elif fmt == 7:
payload = value.strip()
elif fmt == 8:
octets = value.strip().split(".")
payload = "".join(c.to_hex2(int(o)) for o in octets)
else:
payload = value.strip()
return _wrap(letter + format(fmt, "X") + c.to_hex2(sensor_id) + payload)

View File

@@ -0,0 +1,41 @@
"""StreamFramer: extract complete ``{...}`` message bodies from a byte/char stream.
Mirrors the message-extraction half of the legacy ``MSComm1_OnComm``: the station
emits ``{`` ... ``}`` messages on a CR-terminated stream. We return the body
between each matched pair of braces (the type letter + fields). OP-05 (non-braced)
and debug (Chr 17/18) handling from the original are intentionally dropped.
"""
from __future__ import annotations
class StreamFramer:
"""Accumulates incoming text and yields complete ``{...}`` message bodies."""
def __init__(self, max_buffer: int = 4096) -> None:
self._buf = ""
self._max = max_buffer
def feed(self, data: str) -> list[str]:
self._buf += data
out: list[str] = []
while True:
i = self._buf.find("{")
if i < 0:
# No open brace pending; don't let noise grow unbounded.
if len(self._buf) > self._max:
self._buf = self._buf[-self._max :]
break
j = self._buf.find("}", i + 1)
if j < 0:
# Incomplete frame: keep from the open brace onward, cap runaway.
self._buf = self._buf[i:]
if len(self._buf) > self._max:
self._buf = self._buf[-self._max :]
break
out.append(self._buf[i + 1 : j])
self._buf = self._buf[j + 1 :]
return out
def reset(self) -> None:
self._buf = ""

View File

@@ -0,0 +1,89 @@
"""Structured message types decoded from station ``{...}`` bodies.
Field semantics follow the legacy ``UpdateService`` (Main.frm). ``None`` for a
numeric field means the station sent the no-value sentinel.
"""
from __future__ import annotations
from dataclasses import dataclass, field
@dataclass
class SensorRecord: # type 'A' — full sensor record
id: int
serial: str
name: str
type_text: str # raw Type field (string in legacy; exact code meaning FLAGGED)
scale: float | None # /10000
offset: float | None # /1000
time: str # 'HH:MM:SS' (station clock; FLAGGED)
average: float | None # /10
value: float | None # current scaled value, /10
alarm: str # OK/LA/LW/HW/HA/ER
disp: int
dp: int
calc: int
@dataclass
class AverageUpdate: # type 'B'
id: int
time: str
average: float | None
alarm: str
@dataclass
class CurrentValue: # types 'C' (/10) and 'I' (/1000)
id: int
value: float | None
@dataclass
class SettingLine: # types 'D' (type_code=-1) and 'E'
row: int
text: str
type_code: int
@dataclass
class StationStatus: # type 'F'
counters: list[int] = field(default_factory=list)
activity: int | None = None
station_time: str | None = None # 'MM/DD/YY HH:MM:SS' (FLAGGED)
buffered: int | None = None
@dataclass
class AlarmLimits: # type 'G'
id: int
enable: int
delay: int
lo_alarm: float | None
lo_warn: float | None
hi_warn: float | None
hi_alarm: float | None
@dataclass
class Statistics: # type 'H'
id: int
maximum: float | None
max_time: str
minimum: float | None
min_time: str
average: float | None
calculated: float | None
# Union of everything decode() can return.
Message = (
SensorRecord
| AverageUpdate
| CurrentValue
| SettingLine
| StationStatus
| AlarmLimits
| Statistics
)

26
pyproject.toml Normal file
View File

@@ -0,0 +1,26 @@
[project]
name = "da12-service"
version = "3.0.0"
description = "DA-12 Monitoring Station Service Tool (modern rebuild)"
requires-python = ">=3.11"
dependencies = ["PySide6>=6.7", "pyserial>=3.5"]
[project.optional-dependencies]
dev = ["pytest>=8", "pytest-qt>=4.4", "ruff>=0.6", "pyinstaller>=6.10"]
[project.scripts]
da12-service = "da12_service.app:main"
[build-system]
requires = ["setuptools>=68"]
build-backend = "setuptools.build_meta"
[tool.setuptools.packages.find]
include = ["da12_service*"]
[tool.pytest.ini_options]
testpaths = ["tests"]
qt_api = "pyside6"
[tool.ruff]
line-length = 100

0
tests/__init__.py Normal file
View File

66
tests/test_codecs.py Normal file
View File

@@ -0,0 +1,66 @@
from da12_service.protocol import codecs as c
def test_to_hex_widths():
assert c.to_hex2(0) == "00"
assert c.to_hex2(255) == "FF"
assert c.to_hex2(256) == "00" # wraps to a byte
assert c.to_hex4(1) == "0001"
assert c.to_hex8(0xABCD) == "0000ABCD"
def test_hex_signed16():
assert c.hex_signed16("FFFF") == -1
assert c.hex_signed16("8000") == -32768
assert c.hex_signed16("0064") == 100
def test_hex_signed32():
assert c.hex_signed32("FFFFFFFF") == -1
assert c.hex_signed32("000003E8") == 1000
def test_decode_scaled_sentinel_and_value():
assert c.decode_scaled("8000", 1) is None
assert c.decode_scaled("", 1) is None
assert c.decode_scaled("0064", 1) == 10.0 # 100 / 10
assert c.decode_scaled("000003E8", 3) == 1.0 # 1000 / 1000 (32-bit path)
assert c.decode_scaled("FFFF", 1) == -0.1 # signed-16: -1 / 10
def test_encode_scaled():
assert c.encode_scaled(10.0, 1) == "00000064" # 10*10 = 100 = 0x64
assert c.encode_scaled(1.0, 4) == "00002710" # 1*10000 = 10000 = 0x2710
def test_decode_alarm():
assert c.decode_alarm(0x00) == "OK"
assert c.decode_alarm(0x80) == "ER"
assert c.decode_alarm(0x08) == "HA"
assert c.decode_alarm(0x01) == "LA"
assert c.decode_alarm(0x02) == "LW"
assert c.decode_alarm(0x04) == "HW"
assert c.decode_alarm(0x88) == "ER" # ER (0x80) wins over HA (0x08)
assert c.decode_alarm(0x09) == "HA" # HA (0x08) wins over LA (0x01)
def test_decode_minutes():
assert c.decode_minutes("003C") == "01:00" # 60s
assert c.decode_minutes("0000") == "00:00"
assert c.decode_minutes("") == ""
def test_field_reader_adjacent_first_field():
# Wire format: type letter is adjacent to field0; tabs separate the rest.
r = c.FieldReader("A0064\thello")
assert r.char() == "A"
assert r.next() == "0064"
assert r.next() == "hello"
assert r.next() == "" # exhausted
def test_field_reader_next_bytes():
r = c.FieldReader("0001020304")
assert r.next_bytes(2) == "00"
assert r.next_bytes(2) == "01"
assert r.remaining() == "020304"

45
tests/test_encoder.py Normal file
View File

@@ -0,0 +1,45 @@
from datetime import datetime
from da12_service.protocol import encoder as e
def test_simple_commands():
assert e.refresh_all() == "{Y}"
assert e.refresh_stats() == "{X}"
assert e.station_reset() == "{V}"
def test_per_sensor_commands():
assert e.clear_average(5) == "{N005}"
assert e.clear_stats(5) == "{O005}"
assert e.remove_sensor(16) == "{R010}" # hex2(16) == '10'
def test_add_sensors():
assert e.add_sensors(["ABC", "DEF"]) == ["{S700ABC}", "{S700DEF}"]
assert e.add_sensors(["", " "]) == [] # blanks skipped
def test_set_setting_scaled():
# scale = letter C, fmt 4 (x10000), id 3, value 1.0
assert e.set_setting("C", 3, "1.0", 4) == "{C40300002710}"
def test_set_setting_offset():
# offset = letter D, fmt 3 (x1000), id 3, value 1.0
assert e.set_setting("D", 3, "1.0", 3) == "{D303000003E8}"
def test_set_setting_string_and_empty():
assert e.set_setting("B", 3, "Name", 7) == "{B703Name}"
assert e.set_setting("C", 3, "", 4) == "{C403FFFF8000}"
def test_set_setting_ip():
assert e.set_setting("A", 0, "192.168.1.10", 8) == "{A800C0A8010A}"
def test_set_clock_shape():
cmd = e.set_clock(datetime(2020, 1, 1, 12, 0, 0))
assert cmd.startswith("{T000") and cmd.endswith("}")
assert len(cmd) == 14 # '{T000' + 8 hex + '}'

34
tests/test_framing.py Normal file
View File

@@ -0,0 +1,34 @@
from da12_service.protocol.framing import StreamFramer
def test_single_frame():
f = StreamFramer()
assert f.feed("{Y}\r") == ["Y"]
def test_split_frame_across_feeds():
f = StreamFramer()
assert f.feed("{A0") == []
assert f.feed("1\tfoo}\r") == ["A01\tfoo"]
def test_multiple_frames_one_feed():
f = StreamFramer()
assert f.feed("{C051F}\r{C0620}\r") == ["C051F", "C0620"]
def test_ignores_noise_between_frames():
f = StreamFramer()
assert f.feed("garbage{B051F}\rmore") == ["B051F"]
def test_buffer_cap_resets_on_overflow():
f = StreamFramer(max_buffer=8)
assert f.feed("{" + "x" * 20) == [] # never closed
assert len(f._buf) <= 8
def test_noise_only_does_not_grow_unbounded():
f = StreamFramer(max_buffer=8)
assert f.feed("z" * 50) == []
assert len(f._buf) <= 8

View File

@@ -0,0 +1,78 @@
from da12_service.protocol import messages as m
from da12_service.protocol.decoder import decode
def test_decode_current_value_C():
msg = decode("C05\t0064")
assert isinstance(msg, m.CurrentValue)
assert msg.id == 5 and msg.value == 10.0
def test_decode_current_value_I_scaled():
msg = decode("I05\t000003E8")
assert isinstance(msg, m.CurrentValue)
assert msg.value == 1.0
def test_decode_alarm_limits_G():
msg = decode("G02\t01\t0A\t0032\t0064\t00C8\t012C")
assert isinstance(msg, m.AlarmLimits)
assert msg.id == 2 and msg.enable == 1 and msg.delay == 10
assert msg.lo_alarm == 5.0 and msg.lo_warn == 10.0
assert msg.hi_warn == 20.0 and msg.hi_alarm == 30.0
def test_decode_statistics_H():
msg = decode("H03\t00C8\t003C\t000A\t0078\t0064\t0064")
assert isinstance(msg, m.Statistics)
assert msg.id == 3
assert msg.maximum == 20.0 and msg.max_time == "01:00"
assert msg.minimum == 1.0 and msg.min_time == "02:00"
assert msg.average == 10.0 and msg.calculated == 10.0
def test_decode_sensor_record_A():
body = (
"A01\tABC123\tTemp\t01\t00002710\t000003E8\t"
"1A2B3C4D\t0064\t0065\t00\t01\t02\t00"
)
msg = decode(body)
assert isinstance(msg, m.SensorRecord)
assert msg.id == 1 and msg.serial == "ABC123" and msg.name == "Temp"
assert msg.type_text == "01"
assert msg.scale == 1.0 and msg.offset == 1.0
assert msg.average == 10.0 and msg.value == 10.1
assert msg.alarm == "OK"
assert msg.disp == 1 and msg.dp == 2 and msg.calc == 0
def test_decode_average_update_B():
msg = decode("B07\t1A2B3C4D\t00C8\t04")
assert isinstance(msg, m.AverageUpdate)
assert msg.id == 7 and msg.average == 20.0 and msg.alarm == "HW"
def test_decode_setting_line_E_editable():
msg = decode("E02\tSetpoint\t01\t00C8")
assert isinstance(msg, m.SettingLine)
assert msg.row == 2 and msg.type_code == 1
assert msg.text == "Setpoint\t20.0"
def test_decode_setting_line_D_readonly():
msg = decode("D02\tFirmware\t0A\t0103")
assert isinstance(msg, m.SettingLine)
assert msg.type_code == -1 # 'D' is read-only
assert msg.text == "Firmware\t1.3" # version type formats as maj.min
def test_decode_status_F_counters():
msg = decode("F000102030405060708")
assert isinstance(msg, m.StationStatus)
assert msg.counters == [0, 1, 2, 3, 4, 5, 6, 7]
assert msg.activity == 8
def test_decode_unknown_returns_none():
assert decode("Z00") is None
assert decode("") is None