Python project for analyzing alarm data from building monitoring systems. Includes alarm analyzer, plotting, tests, and source data files.
144 lines
6.7 KiB
Python
144 lines
6.7 KiB
Python
#!/usr/bin/env python
|
|
# Test script to verify enhanced plotting functionality without creating actual plots
|
|
|
|
from alarm_analyzer import AlarmAnalyzer
|
|
import pandas as pd
|
|
|
|
def test_enhanced_plotting():
|
|
print("Testing enhanced plotting functionality...")
|
|
|
|
# Create analyzer instance
|
|
analyzer = AlarmAnalyzer('CardinalAlarmsDec25.csv', 'SensorReport Cardinal 2025-12-23_processed.xlsx')
|
|
|
|
print("Loading data...")
|
|
# Load data
|
|
alarm_data, sensor_data = analyzer.load_data()
|
|
print(f"Loaded {len(alarm_data)} alarm records")
|
|
|
|
if analyzer.sensor_mapping:
|
|
print(f"Created sensor mapping for {len(analyzer.sensor_mapping)} sensors")
|
|
else:
|
|
print("No sensor mapping created - sensor report may not have been processed correctly")
|
|
|
|
print("Categorizing alarms...")
|
|
# Categorize alarms
|
|
categorized_data = analyzer.categorize_alarms()
|
|
|
|
print("Pairing events and calculating durations...")
|
|
# Pair events and calculate durations
|
|
paired_events = analyzer.pair_events_and_calculate_durations()
|
|
|
|
# Test the sensor name mapping logic without creating plots
|
|
print("\n--- TESTING ENHANCED PLOTTING LOGIC ---")
|
|
|
|
# Filter resolved events for testing
|
|
duration_events = analyzer.processed_events[analyzer.processed_events['Duration_Minutes'].notna()].copy()
|
|
|
|
if len(duration_events) == 0:
|
|
print("No resolved events with duration data available for testing.")
|
|
return
|
|
|
|
# Extract time components for time-based analysis
|
|
duration_events['Start_Hour'] = duration_events['Start_Time'].dt.hour
|
|
duration_events['Start_DayOfWeek'] = duration_events['Start_Time'].dt.day_name()
|
|
duration_events['Start_Date'] = duration_events['Start_Time'].dt.date
|
|
|
|
print("\nTesting sensor name mapping for top sensors by alarm count...")
|
|
# Top 10 sensors by alarm count - with sensor names instead of IDs
|
|
top_sensors = duration_events['Sensor_Id'].value_counts().head(10)
|
|
sensor_names_for_plot = []
|
|
for sensor_id in top_sensors.index:
|
|
sensor_info = analyzer.sensor_mapping.get(sensor_id, {})
|
|
sensor_name = sensor_info.get('name', f'ID: {sensor_id}')
|
|
sensor_group = sensor_info.get('group', 'Unknown')
|
|
sensor_names_for_plot.append(f"{sensor_name}\n({sensor_group})")
|
|
|
|
print("Sample of enhanced sensor labels for plotting:")
|
|
for i, (sensor_id, count) in enumerate(top_sensors.head(5).items()):
|
|
print(f" {sensor_names_for_plot[i]}: {count} alarms")
|
|
|
|
print("\nTesting sensor name mapping for average duration...")
|
|
# Top 10 sensors by average duration - with sensor names instead of IDs
|
|
avg_duration_by_sensor = duration_events.groupby('Sensor_Id')['Duration_Minutes'].mean().sort_values(ascending=False).head(10)
|
|
sensor_names_for_plot_avg = []
|
|
for sensor_id in avg_duration_by_sensor.index:
|
|
sensor_info = analyzer.sensor_mapping.get(sensor_id, {})
|
|
sensor_name = sensor_info.get('name', f'ID: {sensor_id}')
|
|
sensor_group = sensor_info.get('group', 'Unknown')
|
|
sensor_names_for_plot_avg.append(f"{sensor_name} (Group: {sensor_group})")
|
|
|
|
print("Sample of enhanced sensor labels for average duration plotting:")
|
|
for i, (sensor_id, avg_duration) in enumerate(avg_duration_by_sensor.head(5).items()):
|
|
print(f" {sensor_names_for_plot_avg[i]}: {avg_duration:.2f} minutes")
|
|
|
|
print("\nTesting group-based visualizations...")
|
|
if 'Sensor_Group' in duration_events.columns:
|
|
print("Group-based visualizations would be created...")
|
|
|
|
# Test group composition analysis
|
|
if analyzer.sensor_mapping:
|
|
# Create a mapping of group to number of sensors
|
|
group_to_sensor_count = {}
|
|
for sensor_id, sensor_info in analyzer.sensor_mapping.items():
|
|
group = sensor_info.get('group', 'Unknown')
|
|
if group not in group_to_sensor_count:
|
|
group_to_sensor_count[group] = 0
|
|
group_to_sensor_count[group] += 1
|
|
|
|
# Convert to dataframe and sort
|
|
group_sensor_counts = pd.DataFrame(
|
|
list(group_to_sensor_count.items()),
|
|
columns=['Group', 'Sensor_Count']
|
|
).sort_values('Sensor_Count', ascending=False).head(10)
|
|
|
|
print("Sample of group composition data:")
|
|
for _, row in group_sensor_counts.head(5).iterrows():
|
|
print(f" {row['Group']}: {row['Sensor_Count']} sensors")
|
|
|
|
# Test alarm type distribution by group
|
|
alarm_type_by_group = duration_events.groupby(['Sensor_Group', 'Alarm_Type']).size().unstack(fill_value=0)
|
|
top_10_groups = duration_events['Sensor_Group'].value_counts().head(10).index
|
|
alarm_type_by_group_top = alarm_type_by_group.loc[top_10_groups]
|
|
|
|
print("Sample of alarm type distribution by group:")
|
|
sample_groups = alarm_type_by_group_top.head(3)
|
|
for group in sample_groups.index:
|
|
print(f" {group}:")
|
|
for alarm_type in sample_groups.columns:
|
|
count = sample_groups.loc[group, alarm_type]
|
|
if count > 0:
|
|
print(f" {alarm_type}: {count} alarms")
|
|
|
|
# Test group alarm intensity
|
|
alarms_per_sensor_by_group = duration_events.groupby('Sensor_Group')['Sensor_Id'].nunique().to_dict()
|
|
|
|
# Calculate total sensors per group from mapping
|
|
group_to_sensor_count = {}
|
|
for sensor_id, sensor_info in analyzer.sensor_mapping.items():
|
|
group = sensor_info.get('group', 'Unknown')
|
|
if group not in group_to_sensor_count:
|
|
group_to_sensor_count[group] = 0
|
|
group_to_sensor_count[group] += 1
|
|
|
|
# Calculate alarms per sensor ratio
|
|
group_alarm_intensity = {}
|
|
for group in set(duration_events['Sensor_Group'].unique()):
|
|
total_alarms = len(duration_events[duration_events['Sensor_Group'] == group])
|
|
total_sensors = group_to_sensor_count.get(group, 1) # Avoid division by zero
|
|
group_alarm_intensity[group] = total_alarms / total_sensors
|
|
|
|
# Convert to DataFrame and sort
|
|
intensity_df = pd.DataFrame(
|
|
list(group_alarm_intensity.items()),
|
|
columns=['Group', 'Alarms_Per_Sensor']
|
|
).sort_values('Alarms_Per_Sensor', ascending=False).head(10)
|
|
|
|
print("Sample of group alarm intensity:")
|
|
for _, row in intensity_df.head(5).iterrows():
|
|
print(f" {row['Group']}: {row['Alarms_Per_Sensor']:.2f} alarms per sensor")
|
|
|
|
print("\nAll enhanced plotting logic tests passed!")
|
|
print("The enhanced plotting functionality is ready to use when matplotlib and seaborn are available.")
|
|
|
|
if __name__ == '__main__':
|
|
test_enhanced_plotting() |