Files
cimtechniques-service-suite/tests/test_simulator_integration.py
2026-06-05 09:22:08 -04:00

155 lines
4.7 KiB
Python

from cim_suite.modules.da12.protocol import messages as m
from cim_suite.modules.da12.protocol.decoder import decode
from cim_suite.modules.da12.protocol.framing import StreamFramer
from cim_suite.modules.da12.transport.simulator import SimulatedStation
def _bodies(chunk: str) -> list[str]:
out, buf, inframe = [], [], False
for ch in chunk:
if ch == "{":
buf, inframe = [], True
elif ch == "}" and inframe:
out.append("".join(buf))
inframe = False
elif inframe:
buf.append(ch)
return out
def _sink(sim):
"""Wire the simulator's output through a framer and collect decoded messages."""
framer = StreamFramer()
msgs = []
def on_data(text):
for body in framer.feed(text):
msg = decode(body)
if msg is not None:
msgs.append(msg)
sim.data_callback = on_data
return msgs
def test_refresh_emits_one_record_per_sensor():
sim = SimulatedStation(sensors=2)
msgs = _sink(sim)
sim.start()
sim.write("{Y}")
records = [x for x in msgs if isinstance(x, m.SensorRecord)]
assert len(records) == 2
assert {r.id for r in records} == {1, 2}
def test_refresh_emits_limits_and_stats_and_status():
sim = SimulatedStation(sensors=1)
msgs = _sink(sim)
sim.start()
sim.write("{Y}")
assert any(isinstance(x, m.AlarmLimits) for x in msgs)
assert any(isinstance(x, m.Statistics) for x in msgs)
assert any(isinstance(x, m.StationStatus) for x in msgs)
def test_set_name_roundtrip():
sim = SimulatedStation(sensors=1)
msgs = _sink(sim)
sim.start()
sim.write("{B701Hello}") # set name of sensor 1
sim.write("{Y}")
rec = [x for x in msgs if isinstance(x, m.SensorRecord)][-1]
assert rec.name == "Hello"
def test_set_scale_roundtrip():
sim = SimulatedStation(sensors=1)
msgs = _sink(sim)
sim.start()
sim.write("{C40100002710}") # scale = 1.0 (x10000) for sensor 1
sim.write("{Y}")
rec = [x for x in msgs if isinstance(x, m.SensorRecord)][-1]
assert rec.scale == 1.0
def test_remove_sensor():
sim = SimulatedStation(sensors=3)
msgs = _sink(sim)
sim.start()
sim.write("{R002}") # remove sensor 2
sim.write("{Y}")
records = [x for x in msgs if isinstance(x, m.SensorRecord)]
assert {r.id for r in records} == {1, 3}
def test_add_sensor_emits_record():
sim = SimulatedStation(sensors=1)
msgs = _sink(sim)
sim.start()
sim.write("{S700ABCDEF}")
records = [x for x in msgs if isinstance(x, m.SensorRecord)]
assert any(r.serial == "ABCDEF" for r in records)
def test_tick_emits_current_values():
sim = SimulatedStation(sensors=2)
msgs = _sink(sim)
sim.start()
sim.tick()
currents = [x for x in msgs if isinstance(x, m.CurrentValue)]
assert len(currents) == 2
def test_status_frame_reflects_server_connected_flag():
sim = SimulatedStation(sensors=1)
msgs = _sink(sim)
sim.start()
sim.server_connected = True
sim._emit_status()
sim.server_connected = False
sim._emit_status()
statuses = [x for x in msgs if isinstance(x, m.StationStatus)]
assert len(statuses) == 2
assert statuses[0].incoming == 1
assert statuses[1].incoming == 0
# disconnected frame reports enough minutes-since-last to trip any timeout
assert (statuses[1].time_since_last_min or 0) * 60 > 120
def test_tick_streams_status_frame():
sim = SimulatedStation(sensors=1)
msgs = _sink(sim)
sim.start()
sim.tick()
assert any(isinstance(x, m.StationStatus) for x in msgs)
def test_status_frame_carries_named_health_fields():
sim = SimulatedStation(sensors=3)
msgs = _sink(sim)
sim.start()
msgs.clear()
sim.tick() # each tick emits exactly one {F}; advances comm_activity + drains backlog
sim.tick()
statuses = [x for x in msgs if isinstance(x, m.StationStatus)]
assert len(statuses) == 2
assert statuses[0].active_sensors == 3
assert statuses[0].record_count is not None
# comm_activity is free-running: it must advance between the two frames.
assert statuses[1].comm_activity != statuses[0].comm_activity
def test_simulator_replies_to_request_history_with_J():
captured: list[str] = []
station = SimulatedStation(sensors=3)
station.data_callback = captured.append
station.start()
station.write("{Y}") # ensure sensor 1 exists
captured.clear()
station.write("{c001000001F4}") # request 500 records for sensor 1
bodies = [b for chunk in captured for b in _bodies(chunk)]
plots = [decode(b) for b in bodies if b and b[0] == "J"]
assert plots and isinstance(plots[0], m.PlotData)
assert plots[0].points # non-empty synthesized history