155 lines
4.7 KiB
Python
155 lines
4.7 KiB
Python
from cim_suite.modules.da12.protocol import messages as m
|
|
from cim_suite.modules.da12.protocol.decoder import decode
|
|
from cim_suite.modules.da12.protocol.framing import StreamFramer
|
|
from cim_suite.modules.da12.transport.simulator import SimulatedStation
|
|
|
|
|
|
def _bodies(chunk: str) -> list[str]:
|
|
out, buf, inframe = [], [], False
|
|
for ch in chunk:
|
|
if ch == "{":
|
|
buf, inframe = [], True
|
|
elif ch == "}" and inframe:
|
|
out.append("".join(buf))
|
|
inframe = False
|
|
elif inframe:
|
|
buf.append(ch)
|
|
return out
|
|
|
|
|
|
def _sink(sim):
    """Wire the simulator's output through a framer and collect decoded messages.

    Installs a ``data_callback`` on *sim* that feeds every text chunk to a
    fresh ``StreamFramer``, decodes each body the framer yields, and appends
    every decode result that is not ``None`` to the returned list.

    Returns:
        The (initially empty) list the callback appends to; callers inspect
        it after driving the simulator.
    """
    framer = StreamFramer()
    msgs = []

    def on_data(text):
        for body in framer.feed(text):
            msg = decode(body)
            # decode() results of None are dropped rather than collected.
            if msg is not None:
                msgs.append(msg)

    sim.data_callback = on_data
    return msgs
|
|
|
|
|
|
def test_refresh_emits_one_record_per_sensor():
    """Writing ``{Y}`` to a two-sensor station yields SensorRecords for ids 1 and 2."""
    sim = SimulatedStation(sensors=2)
    msgs = _sink(sim)
    sim.start()
    sim.write("{Y}")
    records = [x for x in msgs if isinstance(x, m.SensorRecord)]
    assert len(records) == 2
    assert {r.id for r in records} == {1, 2}
|
|
|
|
|
|
def test_refresh_emits_limits_and_stats_and_status():
    """Writing ``{Y}`` yields at least one AlarmLimits, Statistics and StationStatus."""
    sim = SimulatedStation(sensors=1)
    msgs = _sink(sim)
    sim.start()
    sim.write("{Y}")
    assert any(isinstance(x, m.AlarmLimits) for x in msgs)
    assert any(isinstance(x, m.Statistics) for x in msgs)
    assert any(isinstance(x, m.StationStatus) for x in msgs)
|
|
|
|
|
|
def test_set_name_roundtrip():
    """A name written with ``{B701Hello}`` is reported by the latest SensorRecord."""
    sim = SimulatedStation(sensors=1)
    msgs = _sink(sim)
    sim.start()
    sim.write("{B701Hello}")  # set name of sensor 1
    sim.write("{Y}")
    records = [x for x in msgs if isinstance(x, m.SensorRecord)]
    # Fail with a readable message instead of an IndexError if nothing came back.
    assert records, "no SensorRecord received after {Y}"
    assert records[-1].name == "Hello"
|
|
|
|
|
|
def test_set_scale_roundtrip():
    """A scale written with ``{C40100002710}`` is reported by the latest SensorRecord."""
    sim = SimulatedStation(sensors=1)
    msgs = _sink(sim)
    sim.start()
    sim.write("{C40100002710}")  # scale = 1.0 (x10000) for sensor 1
    sim.write("{Y}")
    records = [x for x in msgs if isinstance(x, m.SensorRecord)]
    # Fail with a readable message instead of an IndexError if nothing came back.
    assert records, "no SensorRecord received after {Y}"
    assert records[-1].scale == 1.0
|
|
|
|
|
|
def test_remove_sensor():
    """After ``{R002}`` on a three-sensor station, ``{Y}`` reports only ids 1 and 3."""
    sim = SimulatedStation(sensors=3)
    msgs = _sink(sim)
    sim.start()
    sim.write("{R002}")  # remove sensor 2
    sim.write("{Y}")
    records = [x for x in msgs if isinstance(x, m.SensorRecord)]
    assert {r.id for r in records} == {1, 3}
|
|
|
|
|
|
def test_add_sensor_emits_record():
    """Writing ``{S700ABCDEF}`` produces a SensorRecord whose serial is ``ABCDEF``."""
    sim = SimulatedStation(sensors=1)
    msgs = _sink(sim)
    sim.start()
    sim.write("{S700ABCDEF}")
    records = [x for x in msgs if isinstance(x, m.SensorRecord)]
    assert any(r.serial == "ABCDEF" for r in records)
|
|
|
|
|
|
def test_tick_emits_current_values():
    """One ``tick()`` on a two-sensor station yields exactly two CurrentValue messages."""
    sim = SimulatedStation(sensors=2)
    msgs = _sink(sim)
    sim.start()
    sim.tick()
    currents = [x for x in msgs if isinstance(x, m.CurrentValue)]
    assert len(currents) == 2
|
|
|
|
|
|
def test_status_frame_reflects_server_connected_flag():
    """Status frames report ``incoming`` as 1 or 0 following ``server_connected``.

    Two status frames are emitted, the first with the flag set and the second
    with it cleared; the second must also report a large time-since-last.
    """
    sim = SimulatedStation(sensors=1)
    msgs = _sink(sim)
    sim.start()
    sim.server_connected = True
    sim._emit_status()
    sim.server_connected = False
    sim._emit_status()
    statuses = [x for x in msgs if isinstance(x, m.StationStatus)]
    assert len(statuses) == 2
    assert statuses[0].incoming == 1
    assert statuses[1].incoming == 0
    # disconnected frame reports enough minutes-since-last to trip any timeout
    assert (statuses[1].time_since_last_min or 0) * 60 > 120
|
|
|
|
|
|
def test_tick_streams_status_frame():
    """A single ``tick()`` yields at least one StationStatus message."""
    sim = SimulatedStation(sensors=1)
    msgs = _sink(sim)
    sim.start()
    sim.tick()
    assert any(isinstance(x, m.StationStatus) for x in msgs)
|
|
|
|
|
|
def test_status_frame_carries_named_health_fields():
    """Status frames from two ticks expose sensor count, record count and comm activity.

    Messages collected during ``start()`` are cleared first so that only the
    frames produced by the two ``tick()`` calls are inspected.
    """
    sim = SimulatedStation(sensors=3)
    msgs = _sink(sim)
    sim.start()
    msgs.clear()
    sim.tick()  # each tick emits exactly one {F}; advances comm_activity + drains backlog
    sim.tick()
    statuses = [x for x in msgs if isinstance(x, m.StationStatus)]
    assert len(statuses) == 2
    assert statuses[0].active_sensors == 3
    assert statuses[0].record_count is not None
    # comm_activity is free-running: it must advance between the two frames.
    assert statuses[1].comm_activity != statuses[0].comm_activity
|
|
|
|
|
|
def test_simulator_replies_to_request_history_with_J():
    """A ``{c...}`` history request is answered with a ``J`` frame decoding to PlotData.

    The raw text chunks are captured directly (no framer), split into frame
    bodies with ``_bodies``, and only bodies starting with ``J`` are decoded.
    """
    captured: list[str] = []
    station = SimulatedStation(sensors=3)
    station.data_callback = captured.append
    station.start()
    station.write("{Y}")  # ensure sensor 1 exists
    captured.clear()
    station.write("{c001000001F4}")  # request 500 records for sensor 1

    bodies = [b for chunk in captured for b in _bodies(chunk)]
    plots = [decode(b) for b in bodies if b and b[0] == "J"]
    assert plots and isinstance(plots[0], m.PlotData)
    assert plots[0].points  # non-empty synthesized history
|