fix(da12): {J} decode drops empty/bad timestamps; rename _count_long (review)

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-05 09:03:24 -04:00
parent 8e110df87b
commit 5f5fb211c0
2 changed files with 15 additions and 4 deletions

View File

@@ -22,7 +22,7 @@ def _byte(reader: c.FieldReader) -> int:
return 0
def _byte_long(s: str) -> int:
def _count_long(s: str) -> int:
"""Parse an unsigned big-endian count field (the {J} record count)."""
s = s.strip()
if not s:
@@ -132,7 +132,7 @@ def decode(body: str) -> m.Message | None:
# tenths (signed). FLAGGED: whether a large reply arrives as one {J} or
# several frames is hardware-unverified (docs/HARDWARE-VERIFICATION.md);
# we decode one frame = one batch and accumulate in the domain layer.
count = _byte_long(reader.next())
count = _count_long(reader.next())
points: list[m.PlotPoint] = []
while len(reader):
ts = reader.next().strip()
@@ -140,9 +140,11 @@ def decode(body: str) -> m.Message | None:
break # lone trailing timestamp with no value -> drop
val = c.decode_scaled(reader.next(), 1)
try:
timestamp = int(ts, 16) if ts else 0
timestamp = int(ts, 16) if ts else None
except ValueError:
continue
timestamp = None
if timestamp is None:
continue # empty/unparseable timestamp -> drop the point
points.append(m.PlotPoint(timestamp=timestamp, value=val))
return m.PlotData(count=count, points=points)

View File

@@ -114,6 +114,7 @@ def test_decode_plot_data_two_points():
assert len(msg.points) == 2
assert msg.points[0].timestamp == 0x60D54E80
assert abs(msg.points[0].value - 31.0) < 1e-9 # 0x0136 = 310 -> 31.0
assert msg.points[1].timestamp == 0x60D54EBC
assert abs(msg.points[1].value - 31.5) < 1e-9 # 0x013B = 315 -> 31.5
@@ -137,6 +138,14 @@ def test_decode_plot_data_truncated_pair_ignored():
assert len(msg.points) == 1
def test_decode_plot_data_empty_timestamp_dropped():
# An empty timestamp field (still followed by a value) drops that point
# rather than emitting a 1970 epoch-0 point.
msg = decode("J00000001\t\t0136")
assert isinstance(msg, m.PlotData)
assert msg.points == []
def test_decode_plot_data_long_frame_under_buffer():
# A long (but <4096) J frame decodes fully (framer cap is 4096).
pairs = "".join("\t60D54E80\t0136" for _ in range(100))