Files
AlarmAnalysis/alarm_analyzer.py
andy f08a1a9bf5 Initial commit: alarm analysis project
Python project for analyzing alarm data from building monitoring systems.
Includes alarm analyzer, plotting, tests, and source data files.
2026-02-26 09:03:54 -05:00

1884 lines
94 KiB
Python

import pandas as pd
import numpy as np
import re
from datetime import datetime
from openpyxl import load_workbook
import warnings
warnings.filterwarnings('ignore')
# Import matplotlib and seaborn only when needed for visualizations
def _import_viz_libs():
    """
    Import the plotting libraries on demand and hand them back to the caller.

    Returns:
        tuple: the ``matplotlib.pyplot`` module followed by the ``seaborn`` module.

    Raises:
        ImportError: propagated from the import statements when either
            library cannot be imported.
    """
    from matplotlib import pyplot
    import seaborn
    return pyplot, seaborn
class AlarmAnalyzer:
def __init__(self, csv_file_path, xlsx_file_path=None, exclusion_file_path=None):
    """
    Set up the analyzer with its input file locations.

    Args:
        csv_file_path: path of the alarm CSV file.
        xlsx_file_path: optional path of the sensor report workbook.
        exclusion_file_path: optional path of a file listing groups to
            exclude; when given, it is loaded immediately.
    """
    # Input locations.
    self.csv_file_path = csv_file_path
    self.xlsx_file_path = xlsx_file_path
    self.exclusion_file_path = exclusion_file_path

    # State filled in by the later loading / processing steps.
    self.alarm_data = None
    self.sensor_data = None
    self.processed_events = None
    self.sensor_mapping = None

    # Must exist before load_excluded_groups() runs, which replaces it.
    self.excluded_groups = set()

    if not self.exclusion_file_path:
        return
    self.load_excluded_groups()
def load_excluded_groups(self):
    """
    Load groups to exclude from analysis from a configuration file.

    Supported formats:
      * JSON object: {"excluded_groups": ["Group1", "Group2"]}
        (a single string value is treated as one group name)
      * JSON array of group names: ["Group1", "Group2"]
      * plain text: one group name per line, blank lines ignored

    Content that starts with '{' or '[' is parsed as JSON first; if that
    fails, the file is treated as plain text. The result is stored in
    ``self.excluded_groups``. Problems are reported with ``print`` and never
    raised; in that case ``self.excluded_groups`` is left unchanged.
    """
    import json
    import os

    path = self.exclusion_file_path
    if not os.path.exists(path):
        print(f"Warning: Exclusion file {path} does not exist. No groups will be excluded.")
        return

    print(f"Loading excluded groups from {path}...")
    try:
        # Read the file once; both the JSON and the text interpretation
        # work from this single string.
        with open(path, 'r') as f:
            content = f.read().strip()

        text_groups = [line.strip() for line in content.split('\n') if line.strip()]

        if content.startswith(('{', '[')):
            try:
                config = json.loads(content)
            except json.JSONDecodeError:
                # e.g. a text file whose first group name starts with '['
                print(f"JSON parsing failed, treating {path} as text file...")
                groups = text_groups
            else:
                if isinstance(config, dict) and 'excluded_groups' in config:
                    groups = config['excluded_groups']
                elif isinstance(config, list):
                    groups = config
                else:
                    print(f"Warning: Invalid JSON format in {path}. Expected object with 'excluded_groups' key or array of group names.")
                    return
                # set("Group1") would yield single characters, so wrap a
                # lone string into a one-element list.
                if isinstance(groups, str):
                    groups = [groups]
        else:
            groups = text_groups

        self.excluded_groups = set(groups)
        print(f"Loaded {len(self.excluded_groups)} groups to exclude: {list(self.excluded_groups)}")
    except Exception as e:
        # Best-effort loader: unreadable files or unusable values (e.g. a
        # non-iterable 'excluded_groups') are reported, not raised.
        print(f"Error loading exclusion file: {e}")
def load_data(self):
    """
    Load alarm data from CSV and sensor descriptions from XLSX.

    The CSV is read into ``self.alarm_data`` and its 'Date' and 'LogTime'
    columns are converted with ``pd.to_datetime``. When an XLSX path is
    configured, the sensor report is read into ``self.sensor_data``, a
    sensor-id mapping is built into ``self.sensor_mapping``, sensor details
    are attached to the alarm rows and excluded groups are filtered out.

    Any failure while handling the XLSX file is printed (with traceback) and
    leaves ``self.sensor_data`` as None and ``self.sensor_mapping`` empty;
    failures reading the CSV propagate to the caller.

    Returns:
        tuple: ``(self.alarm_data, self.sensor_data)``.
    """
    print("Loading alarm data from CSV...")
    self.alarm_data = pd.read_csv(self.csv_file_path)
    for column in ('Date', 'LogTime'):
        self.alarm_data[column] = pd.to_datetime(self.alarm_data[column])
    print(f"Loaded {len(self.alarm_data)} alarm records")
    print(f"Date range: {self.alarm_data['Date'].min()} to {self.alarm_data['Date'].max()}")

    if not self.xlsx_file_path:
        print("No XLSX file provided for sensor descriptions")
        self.sensor_data = None
        self.sensor_mapping = {}
        return self.alarm_data, self.sensor_data

    print("Loading sensor descriptions from XLSX...")
    try:
        # Peek at the first rows with header=0: if any expected column name
        # shows up, the report is in the new format; otherwise fall back to
        # the old layout whose header sits on row index 4.
        preview = pd.read_excel(self.xlsx_file_path, header=0, nrows=5)
        expected_cols = ['ID', 'Remote', 'Group', 'Type', 'Serial No', 'Name']
        if any(col in preview.columns for col in expected_cols):
            self.sensor_data = pd.read_excel(self.xlsx_file_path, header=0)
            print("Detected new sensor report format (header=0)")
            processed_sensor_data = self.sensor_data
        else:
            self.sensor_data = pd.read_excel(self.xlsx_file_path, header=4)
            print("Detected old sensor report format (header=4)")
            # Old format: group names apply to all rows below them.
            processed_sensor_data = self._process_hierarchical_sensor_data(self.sensor_data)

        print(f"Loaded sensor data with {len(self.sensor_data)} records")
        print(f"Sensor data columns: {list(self.sensor_data.columns)}")

        if 'ID' in self.sensor_data.columns:
            self.sensor_mapping = self._build_sensor_mapping(processed_sensor_data)
            print(f"Created sensor mapping for {len(self.sensor_mapping)} sensors")
            self.add_sensor_info_to_alarms()
            self.filter_excluded_groups()
            if self.excluded_groups:
                print(f"Excluded groups summary: {len(self.excluded_groups)} groups were excluded from analysis")
                print(f"Excluded groups: {', '.join(sorted(self.excluded_groups))}")
        else:
            print("Warning: 'ID' column not found in sensor report. Attempting to find alternative mapping...")
            # Only reports candidate columns; no alternative mapping is built.
            for col in self.sensor_data.columns:
                if 'SN' in str(col) or 'Remote' in str(col) or 'Id' in str(col):
                    print(f"Found potential ID column: {col}")
            self.sensor_mapping = {}
    except Exception as e:
        print(f"Could not load XLSX file: {e}")
        import traceback
        traceback.print_exc()
        self.sensor_data = None
        self.sensor_mapping = {}

    return self.alarm_data, self.sensor_data

def _build_sensor_mapping(self, sensor_df):
    """
    Build ``{int sensor id: {'name', 'group', 'type', 'serial_no'}}`` from a sensor report.

    Rows whose 'ID' is missing or cannot be converted to ``int`` are skipped.
    A missing column or empty cell yields 'Unknown' (the name falls back to
    the 'Remote' value first), so a report lacking an optional column no
    longer aborts the whole load with a KeyError.
    """
    def cell(record, column):
        # None for both "column absent" and "cell empty/NaN".
        value = record.get(column)
        return value if pd.notna(value) else None

    def cell_or_unknown(record, column):
        value = cell(record, column)
        return 'Unknown' if value is None else value

    mapping = {}
    for record in sensor_df.to_dict('records'):
        raw_id = cell(record, 'ID')
        if raw_id is None:
            continue
        try:
            # int() so the key matches numeric Sensor_Id values in the alarm data
            sensor_id = int(raw_id)
        except (ValueError, TypeError, OverflowError):
            continue

        name = cell(record, 'Name')
        if name is None:
            name = cell_or_unknown(record, 'Remote')

        mapping[sensor_id] = {
            'name': name,
            'group': cell_or_unknown(record, 'Group'),
            'type': cell_or_unknown(record, 'Type'),
            'serial_no': cell_or_unknown(record, 'Serial No'),
        }
    return mapping
def _process_hierarchical_sensor_data(self, sensor_df):
    """
    Return a copy of the sensor report with hierarchical columns filled down.

    In the hierarchical layout a value such as a group name is written once
    and applies to every row beneath it until the next value appears. Each of
    the columns 'Group', 'Remote', 'Name', 'Type' and 'Serial No' that is
    present is forward-filled; the input frame itself is not modified.
    """
    filled = sensor_df.copy()
    present = [
        name
        for name in ('Group', 'Remote', 'Name', 'Type', 'Serial No')
        if name in filled.columns
    ]
    for name in present:
        # Carry the last non-null value down into the empty cells below it.
        filled[name] = filled[name].ffill()
    return filled
def add_sensor_info_to_alarms(self):
    """
    Attach 'Sensor_Name', 'Sensor_Group' and 'Sensor_Type' columns to the alarm data.

    Each value is looked up in ``self.sensor_mapping`` by the row's
    'Sensor_Id'; ids without a mapping entry get 'Unknown'. Nothing is added
    when the mapping is empty or unset.
    """
    if not self.sensor_mapping:
        print("No sensor mapping available, skipping sensor info addition")
        return

    def lookup(field):
        # One lookup function per target column, bound to its mapping field.
        def resolve(sensor_id):
            return self.sensor_mapping.get(sensor_id, {}).get(field, 'Unknown')
        return resolve

    targets = (
        ('Sensor_Name', 'name'),
        ('Sensor_Group', 'group'),
        ('Sensor_Type', 'type'),
    )
    for column, field in targets:
        self.alarm_data[column] = self.alarm_data['Sensor_Id'].map(lookup(field))

    record_count = len(self.alarm_data)
    name_count = self.alarm_data['Sensor_Name'].nunique()
    group_count = self.alarm_data['Sensor_Group'].nunique()
    print(f"Added sensor information to {record_count} alarm records")
    print(f"Found sensor names for {name_count} unique sensors")
    print(f"Found sensor groups for {group_count} unique sensors")
def filter_excluded_groups(self):
    """
    Filter out alarm data for excluded groups.

    Rows of ``self.alarm_data`` whose 'Sensor_Group' is in
    ``self.excluded_groups`` are dropped. The method is a no-op (with a
    printed note) when there are no excluded groups, or when the alarm data
    is missing or has no 'Sensor_Group' column to filter on.
    """
    if not self.excluded_groups:
        print("No groups to exclude, skipping filtering")
        return

    # 'Sensor_Group' only exists once sensor info has been attached; without
    # it there is nothing to match against, so skip instead of raising.
    if self.alarm_data is None or 'Sensor_Group' not in self.alarm_data.columns:
        print("No sensor group information available, skipping group exclusion")
        return

    initial_count = len(self.alarm_data)
    keep = ~self.alarm_data['Sensor_Group'].isin(self.excluded_groups)
    # .copy() so later column assignments act on an independent frame
    # rather than on a slice of the unfiltered data.
    self.alarm_data = self.alarm_data[keep].copy()
    final_count = len(self.alarm_data)
    excluded_count = initial_count - final_count
    print(f"Filtered out {excluded_count} alarm records from excluded groups: {list(self.excluded_groups)}")
    print(f"Remaining alarm records: {final_count}")
def parse_alarm_description(self, description):
    """
    Parse alarm description to extract alarm type, value, threshold, and unit.

    Args:
        description: free-text alarm description such as
            "Hi Alarm: 51.3>=46.0F"; may be NaN/None.

    Returns:
        dict with keys:
          * 'type': 'Error', 'Alarm', 'Warning', 'Normal', 'Other', or
            'Unknown' for a missing description. Matching is by substring,
            checked in that order ('Alarm' only when 'Warning' is absent).
          * 'value': first number for Normal events, or the first of two or
            more numbers for Alarm/Warning/Error; otherwise None.
          * 'threshold': second number for Alarm/Warning/Error, or the only
            number when just one is present; otherwise None.
          * 'unit': unit token directly following a number (e.g. 'F', 'C',
            '%RH', 'min.'), or None when no number is followed by one.
    """
    if pd.isna(description):
        return {'type': 'Unknown', 'value': None, 'threshold': None, 'unit': None}

    desc = str(description).strip()

    if 'Error' in desc:
        alarm_type = 'Error'
    elif 'Alarm' in desc and 'Warning' not in desc:
        alarm_type = 'Alarm'
    elif 'Warning' in desc:
        alarm_type = 'Warning'
    elif 'Normal' in desc:
        alarm_type = 'Normal'
    else:
        alarm_type = 'Other'

    # All numbers in order of appearance, e.g. "51.3>=46.0F" -> [51.3, 46.0]
    numeric_values = [float(token) for token in re.findall(r'[+-]?\d*\.?\d+', desc)]

    # The unit must directly follow a number (optionally after whitespace or
    # a degree sign) and must not run into a word. Searching the unit
    # characters anywhere in the text would pick up the "H" of "Hi Alarm"
    # or the "R" of "Return to Normal" instead of the real unit.
    unit_match = re.search(r'\d\s*\u00b0?\s*([CF%RH"|]+|min\.)(?![A-Za-z])', desc)
    unit = unit_match.group(1) if unit_match else None

    value = None
    threshold = None
    if alarm_type == 'Normal':
        # For Normal events the number is the current reading
        if numeric_values:
            value = numeric_values[0]
    elif alarm_type in ('Alarm', 'Warning', 'Error'):
        if len(numeric_values) >= 2:
            value, threshold = numeric_values[0], numeric_values[1]
        elif len(numeric_values) == 1:
            # A lone number is taken to be the threshold
            threshold = numeric_values[0]

    return {
        'type': alarm_type,
        'value': value,
        'threshold': threshold,
        'unit': unit,
    }
def categorize_alarms(self):
    """
    Parse every alarm description and store the results as new columns.

    Adds 'AlarmType', 'Value', 'Threshold' and 'Unit' to ``self.alarm_data``
    from the dict returned by ``parse_alarm_description`` for each row's
    'Description', prints the distribution of alarm types, and attaches
    sensor details if they are not present yet and a sensor mapping exists.

    Returns:
        The updated ``self.alarm_data`` frame.
    """
    print("Categorizing alarms...")

    parsed = self.alarm_data['Description'].apply(self.parse_alarm_description)

    # Target column -> key in the parsed dict.
    column_keys = (
        ('AlarmType', 'type'),
        ('Value', 'value'),
        ('Threshold', 'threshold'),
        ('Unit', 'unit'),
    )
    for column, key in column_keys:
        self.alarm_data[column] = parsed.apply(lambda record, key=key: record[key])

    print("Alarm type distribution:")
    for alarm_type, count in self.alarm_data['AlarmType'].value_counts().items():
        print(f" {alarm_type}: {count}")

    needs_sensor_info = 'Sensor_Name' not in self.alarm_data.columns
    if needs_sensor_info and self.sensor_mapping:
        self.add_sensor_info_to_alarms()

    return self.alarm_data
def pair_events_and_calculate_durations(self):
    """
    Pair alarm start events with corresponding end events and calculate durations.

    ``self.alarm_data`` is sorted by 'Sensor_Id' and 'Date' (and stored back
    sorted), then each sensor's events are walked in order:

      * an 'Alarm', 'Warning' or 'Error' event opens an alarm period;
      * the next 'Normal' event closes it (End_Reason 'Normal');
      * the next 'Alarm'/'Warning'/'Error' event also closes it
        (End_Reason 'Transition to <type>') and itself opens the next
        period, so a Warning -> Alarm -> Normal sequence yields two records;
      * events of any other type are ignored;
      * a period still open after the sensor's last event is recorded with
        End_Reason 'Unresolved' and no end time or duration.

    Requires the 'AlarmType', 'Value' and 'Threshold' columns produced by
    ``categorize_alarms``.

    Returns:
        DataFrame of paired events, also stored in ``self.processed_events``
        (an empty frame without columns when nothing was paired).
    """
    print("Pairing alarm events and calculating durations...")

    alarm_states = ('Alarm', 'Warning', 'Error')
    # Tolerate a missing mapping (e.g. load_data was not run with a sensor report)
    sensor_mapping = self.sensor_mapping or {}

    self.alarm_data = self.alarm_data.sort_values(['Sensor_Id', 'Date']).reset_index(drop=True)

    def build_record(sensor_id, sensor_info, start, end, end_reason):
        # One output row; `end` is None for unresolved periods.
        if end is None:
            end_time = duration_minutes = None
            end_description = end_value = end_alarm_id = None
        else:
            end_time = end['Date']
            duration_minutes = (end_time - start['Date']).total_seconds() / 60.0
            end_description = end['Description']
            end_value = end['Value']
            end_alarm_id = end['Alarm_Id']
        return {
            'Sensor_Id': sensor_id,
            'Sensor_Name': sensor_info.get('name', 'Unknown'),
            'Sensor_Group': sensor_info.get('group', 'Unknown'),
            'Sensor_Type': sensor_info.get('type', 'Unknown'),
            'Start_Time': start['Date'],
            'End_Time': end_time,
            'Duration_Minutes': duration_minutes,
            'Alarm_Type': start['AlarmType'],
            'Start_Description': start['Description'],
            'End_Description': end_description,
            'Start_Value': start['Value'],
            'Threshold': start['Threshold'],
            'End_Value': end_value,
            'Start_Alarm_Id': start['Alarm_Id'],
            'End_Alarm_Id': end_alarm_id,
            'End_Reason': end_reason,
        }

    paired_events = []
    # sort=False keeps the sensors in the order they appear in the sorted data
    for sensor_id, sensor_rows in self.alarm_data.groupby('Sensor_Id', sort=False):
        sensor_info = sensor_mapping.get(sensor_id, {})
        open_event = None  # the alarm period currently waiting for its end

        for event in sensor_rows.to_dict('records'):
            event_type = event['AlarmType']
            if event_type in alarm_states:
                if open_event is not None:
                    paired_events.append(build_record(
                        sensor_id, sensor_info, open_event, event,
                        f'Transition to {event_type}'))
                # The new alarm condition becomes the start of the next period
                open_event = event
            elif event_type == 'Normal' and open_event is not None:
                paired_events.append(build_record(
                    sensor_id, sensor_info, open_event, event, 'Normal'))
                open_event = None

        if open_event is not None:
            paired_events.append(build_record(
                sensor_id, sensor_info, open_event, None, 'Unresolved'))

    self.processed_events = pd.DataFrame(paired_events)
    print(f"Paired {len(self.processed_events)} events")
    if len(self.processed_events) > 0:
        has_duration = self.processed_events['Duration_Minutes'].notna()
        print(f"Events with duration: {int(has_duration.sum())}")
        print(f"Unresolved events: {int((~has_duration).sum())}")
    return self.processed_events
def basic_analysis(self):
    """
    Print and return basic statistics over the paired alarm events.

    Covers event counts per alarm type / sensor / group and count, min, max
    and mean of 'Duration_Minutes' (rounded to 2 decimals) for the resolved
    events, i.e. those with a duration.

    Returns:
        dict of DataFrames with keys 'count_by_type_sensor',
        'count_by_sensor', 'count_by_group', 'duration_stats',
        'overall_duration_stats', 'all_sensor_stats' and
        'group_duration_stats' (group entries are empty frames when no
        'Sensor_Group' column exists); None when there are no processed
        events or none of them has a duration.
    """
    print("Performing basic analysis...")

    events = self.processed_events
    if events is None or len(events) == 0:
        print("No processed events available for analysis. Run pair_events_and_calculate_durations first.")
        return

    has_groups = 'Sensor_Group' in events.columns

    def tally(keys):
        # Number of events per key combination, as a flat frame.
        return events.groupby(keys).size().reset_index(name='Count')

    print("\n--- ALARM COUNTS BY TYPE AND SENSOR ---")
    count_by_type_sensor = tally(['Alarm_Type', 'Sensor_Id'])
    count_by_sensor = tally(['Sensor_Id', 'Alarm_Type'])

    print("\nTop 10 sensors with most alarms:")
    per_sensor_total = count_by_sensor.groupby('Sensor_Id')['Count'].sum()
    print(per_sensor_total.sort_values(ascending=False).head(10))

    count_by_group = pd.DataFrame()
    if has_groups:
        print("\n--- ALARM COUNTS BY GROUP ---")
        count_by_group = tally(['Alarm_Type', 'Sensor_Group'])
        print("\nTop 10 groups with most alarms:")
        per_group_total = events.groupby('Sensor_Group')['Sensor_Id'].count()
        print(per_group_total.sort_values(ascending=False).head(10))

    print("\n--- DURATION ANALYSIS ---")
    resolved = events[events['Duration_Minutes'].notna()]
    if len(resolved) == 0:
        print("No resolved events with duration data available for analysis.")
        return None

    def duration_summary(keys):
        # count / min / max / mean of the durations per key, 2 decimals.
        grouped = resolved.groupby(keys)['Duration_Minutes']
        return grouped.agg(['count', 'min', 'max', 'mean']).round(2)

    duration_stats = duration_summary(['Alarm_Type', 'Sensor_Id']).reset_index()
    print("\nDuration statistics by alarm type and sensor (top 10 by count):")
    print(duration_stats.sort_values('count', ascending=False).head(10))

    overall_duration_stats = duration_summary('Alarm_Type')
    print("\nOverall duration statistics by alarm type:")
    print(overall_duration_stats)

    all_sensor_stats = duration_summary('Sensor_Id').sort_values('count', ascending=False)
    print("\nTop 10 sensors by alarm count with duration stats:")
    print(all_sensor_stats.head(10))

    group_duration_stats = pd.DataFrame()
    if has_groups:
        print("\n--- GROUP-BASED DURATION ANALYSIS ---")
        group_duration_stats = duration_summary('Sensor_Group').sort_values('count', ascending=False)
        print("\nTop 10 groups by alarm count with duration stats:")
        print(group_duration_stats.head(10))

        type_group_stats = duration_summary(['Alarm_Type', 'Sensor_Group']).sort_values('count', ascending=False)
        print("\nTop 10 alarm type and group combinations by count:")
        print(type_group_stats.head(10))

    return {
        'count_by_type_sensor': count_by_type_sensor,
        'count_by_sensor': count_by_sensor,
        'count_by_group': count_by_group,
        'duration_stats': duration_stats,
        'overall_duration_stats': overall_duration_stats,
        'all_sensor_stats': all_sensor_stats,
        'group_duration_stats': group_duration_stats,
    }
def advanced_analysis(self):
    """
    Perform advanced analysis on the paired events and print the results.

    Sections:
      1. Time-based analysis - alarm frequency by hour, weekday and date.
      2. MTBF - per sensor, the mean gap between consecutive alarm starts
         and (time from first to last start) / (number of alarms); only
         sensors with at least two resolved events are listed.
      3. Alarm correlation - how often two different sensors start an alarm
         within the same clock hour. Each pair is counted once per hour
         and reported with the smaller id first.
      4. Severity - duration weighted by alarm type
         (Error 3, Alarm 2, Warning 1).
      5. Escalation - a Warning directly followed, within 60 minutes, by an
         Alarm or Error on the same sensor (read from ``self.alarm_data``).
      6. Group-based counts, duration statistics, severity and alarm types.

    Sections 1-4 and 6 use only resolved events (those with a duration).

    Returns:
        dict with keys 'hourly_frequency', 'daily_frequency', 'mtbf_data',
        'correlation_data', 'severity_analysis', 'escalation_analysis' and
        'group_analysis'; None when there are no processed or no resolved
        events.
    """
    from itertools import combinations

    print("\n--- ADVANCED ANALYSIS ---")
    if self.processed_events is None or len(self.processed_events) == 0:
        print("No processed events available for analysis. Run pair_events_and_calculate_durations first.")
        return

    duration_events = self.processed_events[self.processed_events['Duration_Minutes'].notna()].copy()
    if len(duration_events) == 0:
        print("No resolved events with duration data available for advanced analysis.")
        return

    # Tolerate a missing mapping; group lookups then fall back to 'Unknown'
    sensor_mapping = self.sensor_mapping or {}

    print("\n1. TIME-BASED ANALYSIS")
    start_times = duration_events['Start_Time']
    duration_events['Start_Hour'] = start_times.dt.hour
    duration_events['Start_DayOfWeek'] = start_times.dt.day_name()
    duration_events['Start_Date'] = start_times.dt.date

    hourly_freq = duration_events.groupby('Start_Hour').size()
    print("\nAlarm frequency by hour of day (Top 5):")
    print(hourly_freq.sort_values(ascending=False).head())

    daily_freq = duration_events.groupby('Start_DayOfWeek').size()
    print("\nAlarm frequency by day of week:")
    print(daily_freq)

    daily_count = duration_events.groupby('Start_Date').size()
    print("\nTotal alarms per day (last 5 days):")
    print(daily_count.tail())

    print("\n2. MTBF (MEAN TIME BETWEEN FAILURES) ANALYSIS")
    mtbf_columns = [
        'Sensor_Id',
        'MTBF_Hours_By_Consecutive',
        'MTBF_Hours_By_Total_Time',
        'Total_Alerts',
        'Total_Op_Time_Hours',
    ]
    mtbf_data = []
    for sensor_id, sensor_events in duration_events.groupby('Sensor_Id', sort=False):
        if len(sensor_events) < 2:
            continue  # a gap between alarms needs at least two of them
        starts = sensor_events['Start_Time'].sort_values()
        gaps_hours = starts.diff().dt.total_seconds() / 3600
        total_op_time = (starts.max() - starts.min()).total_seconds() / 3600
        num_alarms = len(starts)
        mtbf_data.append({
            'Sensor_Id': sensor_id,
            'MTBF_Hours_By_Consecutive': gaps_hours.mean(),
            'MTBF_Hours_By_Total_Time': total_op_time / num_alarms,
            'Total_Alerts': num_alarms,
            'Total_Op_Time_Hours': total_op_time,
        })
    # Explicit columns keep the sort valid when no sensor qualified
    mtbf_df = pd.DataFrame(mtbf_data, columns=mtbf_columns).sort_values(
        'MTBF_Hours_By_Total_Time', ascending=False)
    print("\nTop 10 sensors by MTBF (Mean Time Between Failures):")
    print(mtbf_df.head(10))

    print("\n3. ALARM CORRELATION ANALYSIS")
    # Bucket alarm starts into clock hours ('60min' floors to the hour)
    duration_events['Time_Window'] = duration_events['Start_Time'].dt.floor('60min')
    correlation_data = []
    for window, window_sensors in duration_events.groupby('Time_Window')['Sensor_Id']:
        # Distinct, sorted ids: a sensor is never paired with itself and
        # (A, B) / (B, A) collapse into one pair.
        distinct_sensors = sorted(window_sensors.unique())
        for sensor1, sensor2 in combinations(distinct_sensors, 2):
            correlation_data.append({
                'Sensor1': sensor1,
                'Sensor2': sensor2,
                'Time_Window': window,
            })

    if correlation_data:
        correlation_df = pd.DataFrame(correlation_data)
        correlation_counts = correlation_df.groupby(['Sensor1', 'Sensor2']).size().reset_index(name='Count')
        correlation_counts = correlation_counts.sort_values('Count', ascending=False)
        print("\nTop 10 sensor pairs that alarm together frequently:")
        print(correlation_counts.head(10))
    else:
        correlation_counts = pd.DataFrame()
        print("\nNo correlated alarms found in the same time windows.")

    print("\n4. SEVERITY ANALYSIS")
    # Error: weight 3, Alarm: weight 2, Warning: weight 1 (anything else: 1)
    severity_weights = {'Error': 3, 'Alarm': 2, 'Warning': 1}
    weights = duration_events['Alarm_Type'].map(severity_weights).fillna(1)
    duration_events['Severity_Score'] = weights * duration_events['Duration_Minutes']

    severity_by_sensor = duration_events.groupby('Sensor_Id')['Severity_Score'].agg([
        'sum', 'mean', 'count'
    ]).round(2).reset_index()
    severity_by_sensor = severity_by_sensor.sort_values('sum', ascending=False)
    print("\nTop 10 sensors by total severity score:")
    print(severity_by_sensor.head(10))

    print("\n5. ALARM ESCALATION ANALYSIS")
    escalation_data = []
    for sensor_id, sensor_alarms in self.alarm_data.groupby('Sensor_Id', sort=False):
        ordered = sensor_alarms.sort_values('Date')
        kinds = ordered['AlarmType'].tolist()
        times = ordered['Date'].tolist()
        sensor_group = sensor_mapping.get(sensor_id, {}).get('group', 'Unknown')
        # Walk over directly consecutive events of this sensor
        for kind, when, next_kind, next_when in zip(kinds, times, kinds[1:], times[1:]):
            if kind != 'Warning' or next_kind not in ('Alarm', 'Error'):
                continue
            time_diff = (next_when - when).total_seconds() / 60  # in minutes
            if time_diff <= 60:  # Within 1 hour
                escalation_data.append({
                    'Sensor_Id': sensor_id,
                    'Sensor_Group': sensor_group,
                    'Warning_Time': when,
                    'Escalation_Type': next_kind,
                    'Time_To_Escalation_Minutes': time_diff,
                })

    if escalation_data:
        escalation_df = pd.DataFrame(escalation_data)
        escalation_counts = escalation_df.groupby('Sensor_Id').size().reset_index(name='Escalation_Count')
        escalation_counts = escalation_counts.sort_values('Escalation_Count', ascending=False)
        print(f"\nTotal escalations found: {len(escalation_df)}")
        print("Top 10 sensors with most escalations (Warning -> Alarm/Error):")
        print(escalation_counts.head(10))

        escalation_by_group = escalation_df.groupby('Sensor_Group').size().reset_index(name='Escalation_Count')
        escalation_by_group = escalation_by_group.sort_values('Escalation_Count', ascending=False)
        print("\nTop 10 groups with most escalations (Warning -> Alarm/Error):")
        print(escalation_by_group.head(10))
    else:
        escalation_df = pd.DataFrame()
        print("\nNo alarm escalations found in the data.")

    print("\n6. GROUP-BASED ANALYSIS")
    group_analysis = {}
    if 'Sensor_Group' in duration_events.columns:
        print("\n--- GROUP-BASED ANALYSIS ---")
        by_group = duration_events.groupby('Sensor_Group')

        group_counts = by_group.size().reset_index(name='Alarm_Count')
        group_counts = group_counts.sort_values('Alarm_Count', ascending=False)
        print("\nTop 10 groups by alarm count:")
        print(group_counts.head(10))

        # NOTE: despite the 'mtbf' name (kept for callers), these are
        # statistics of the alarm durations per group.
        mtbf_by_group = by_group['Duration_Minutes'].agg([
            'count', 'mean', 'min', 'max'
        ]).round(2).sort_values('count', ascending=False)
        print("\nMTBF statistics by group (top 10 by count):")
        print(mtbf_by_group.head(10))

        severity_by_group = by_group['Severity_Score'].agg([
            'sum', 'mean', 'count'
        ]).round(2).sort_values('sum', ascending=False)
        print("\nTop 10 groups by total severity score:")
        print(severity_by_group.head(10))

        alarm_type_by_group = duration_events.groupby(['Sensor_Group', 'Alarm_Type']).size().reset_index(name='Count')
        alarm_type_by_group = alarm_type_by_group.sort_values(['Sensor_Group', 'Count'], ascending=[True, False])
        print("\nTop alarm types by group:")
        print(alarm_type_by_group.groupby('Sensor_Group').head(3))

        group_analysis = {
            'group_counts': group_counts,
            'mtbf_by_group': mtbf_by_group,
            'severity_by_group': severity_by_group,
            'alarm_type_by_group': alarm_type_by_group,
        }

    return {
        'hourly_frequency': hourly_freq,
        'daily_frequency': daily_freq,
        'mtbf_data': mtbf_df,
        'correlation_data': correlation_counts,
        'severity_analysis': severity_by_sensor,
        'escalation_analysis': escalation_df,
        'group_analysis': group_analysis,
    }
def create_visualizations(self, save_plots=False, output_dir="plots"):
    """
    Create visualizations for the alarm analysis.

    Only resolved events (rows of ``self.processed_events`` with a non-null
    ``Duration_Minutes``) are plotted. The figures produced are:

    1. Alarm dashboard (counts by type, top sensors, hourly/daily frequency)
    2. Duration distributions by alarm type (box plot and histogram)
    3. Top sensors by count, average/maximum duration and severity score
    4. Group dashboards and group detail charts, only when the events carry
       a ``Sensor_Group`` column

    Args:
        save_plots: When True, each figure is also written as a PNG file
            into ``output_dir`` (created if missing).
        output_dir: Directory that receives the PNG files.

    Returns:
        None. Every figure is displayed with ``plt.show()``. The method
        returns early (after printing a message) when the plotting
        libraries cannot be imported or there are no resolved events.
    """
    import os
    from collections import Counter

    print("\n--- CREATING VISUALIZATIONS ---")
    # Import visualization libraries only when needed
    try:
        plt, sns = _import_viz_libs()
    except ImportError:
        print("Matplotlib or seaborn not available. Skipping visualizations.")
        return
    if self.processed_events is None or len(self.processed_events) == 0:
        print("No processed events available for visualization. Run pair_events_and_calculate_durations first.")
        return
    # Create output directory if needed
    if save_plots:
        os.makedirs(output_dir, exist_ok=True)
    # Filter resolved events for visualization
    duration_events = self.processed_events[self.processed_events['Duration_Minutes'].notna()].copy()
    if len(duration_events) == 0:
        print("No resolved events with duration data available for visualization.")
        return

    # sensor_mapping is None until a mapping has been loaded; fall back to an
    # empty dict so the per-sensor charts still render with 'ID: <id>' labels
    # instead of raising AttributeError.
    sensor_lookup = self.sensor_mapping or {}

    def sensor_labels(sensor_ids, template):
        """Build one tick label per sensor id from its mapped name and group."""
        labels = []
        for sensor_id in sensor_ids:
            info = sensor_lookup.get(sensor_id, {})
            labels.append(template.format(
                name=info.get('name', f'ID: {sensor_id}'),
                group=info.get('group', 'Unknown'),
            ))
        return labels

    def shorten(label, limit):
        """Truncate a label to `limit` characters, marking the cut with '...'."""
        text = str(label)
        return text[:limit] + '...' if len(text) > limit else text

    def annotate_bars(ax, values):
        """Write each bar's value just above a vertical bar."""
        for i, v in enumerate(values):
            ax.text(i, v + v*0.01, str(v), ha='center', va='bottom')

    def horizontal_bars(ax, series, labels, title, xlabel):
        """Draw a labelled horizontal bar chart of `series` on `ax`."""
        positions = range(len(series))
        ax.barh(positions, series.values)
        ax.set_title(title)
        ax.set_xlabel(xlabel)
        ax.set_yticks(positions)
        ax.set_yticklabels(labels)

    def finish_figure(filename):
        """Tighten the layout, optionally save the current figure, then show it."""
        plt.tight_layout()
        if save_plots:
            plt.savefig(os.path.join(output_dir, filename), dpi=300, bbox_inches='tight')
        plt.show()

    # Extract time components for time-based analysis
    duration_events['Start_Hour'] = duration_events['Start_Time'].dt.hour
    duration_events['Start_DayOfWeek'] = duration_events['Start_Time'].dt.day_name()
    duration_events['Start_Date'] = duration_events['Start_Time'].dt.date
    # Set up the plotting style
    plt.style.use('default')
    sns.set_palette("husl")

    # 1. Alarm dashboard
    fig, axes = plt.subplots(2, 2, figsize=(15, 12))
    fig.suptitle('Alarm Analysis Dashboard', fontsize=16, fontweight='bold')
    # Alarm count by type
    alarm_type_counts = duration_events['Alarm_Type'].value_counts()
    axes[0, 0].bar(alarm_type_counts.index, alarm_type_counts.values)
    axes[0, 0].set_title('Alarm Count by Type')
    axes[0, 0].set_ylabel('Count')
    annotate_bars(axes[0, 0], alarm_type_counts.values)
    # Top 10 sensors by alarm count - with sensor names instead of IDs
    top_sensors = duration_events['Sensor_Id'].value_counts().head(10)
    axes[0, 1].bar(range(len(top_sensors)), top_sensors.values)
    axes[0, 1].set_title('Top 10 Sensors by Alarm Count')
    axes[0, 1].set_ylabel('Count')
    axes[0, 1].set_xticks(range(len(top_sensors)))
    axes[0, 1].set_xticklabels(sensor_labels(top_sensors.index, "{name}\n({group})"), rotation=45)
    annotate_bars(axes[0, 1], top_sensors.values)
    # Alarm frequency by hour of day
    hourly_freq = duration_events.groupby('Start_Hour').size()
    axes[1, 0].plot(hourly_freq.index, hourly_freq.values, marker='o')
    axes[1, 0].set_title('Alarm Frequency by Hour of Day')
    axes[1, 0].set_xlabel('Hour of Day')
    axes[1, 0].set_ylabel('Number of Alarms')
    axes[1, 0].grid(True, alpha=0.3)
    # Alarm frequency by day of week (reindexed so empty days show as 0)
    day_order = ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday']
    daily_freq = duration_events.groupby('Start_DayOfWeek').size().reindex(day_order, fill_value=0)
    axes[1, 1].bar(range(len(daily_freq)), daily_freq.values)
    axes[1, 1].set_title('Alarm Frequency by Day of Week')
    axes[1, 1].set_ylabel('Number of Alarms')
    axes[1, 1].set_xticks(range(len(daily_freq)))
    axes[1, 1].set_xticklabels([d[:3] for d in daily_freq.index], rotation=45)
    annotate_bars(axes[1, 1], daily_freq.values)
    finish_figure("alarm_dashboard.png")

    # 2. Duration analysis by alarm type
    plt.figure(figsize=(12, 6))
    # Box plot of durations by alarm type
    plt.subplot(1, 2, 1)
    sns.boxplot(data=duration_events, x='Alarm_Type', y='Duration_Minutes')
    plt.title('Distribution of Alarm Durations by Type')
    plt.xlabel('Alarm Type')
    plt.ylabel('Duration (Minutes)')
    plt.yscale('log')  # Log scale to better visualize the wide range of durations
    # Histogram of durations by alarm type
    plt.subplot(1, 2, 2)
    for alarm_type in duration_events['Alarm_Type'].unique():
        subset = duration_events[duration_events['Alarm_Type'] == alarm_type]
        plt.hist(subset['Duration_Minutes'], alpha=0.6, label=alarm_type, bins=30)
    plt.title('Distribution of Alarm Durations by Type')
    plt.xlabel('Duration (Minutes)')
    plt.ylabel('Frequency')
    plt.legend()
    plt.yscale('log')  # Log scale to better visualize the wide range of durations
    finish_figure("duration_analysis.png")

    # Severity score = type weight * duration; unknown types weigh 1.
    # Computed once here and reused by the sensor and group charts.
    severity_weights = {'Error': 3, 'Alarm': 2, 'Warning': 1}
    duration_events['Severity_Score'] = (
        duration_events['Alarm_Type'].map(severity_weights).fillna(1)
        * duration_events['Duration_Minutes']
    )

    # 3. Top sensors by various metrics
    fig, axes = plt.subplots(2, 2, figsize=(15, 12))
    fig.suptitle('Top Sensors Analysis', fontsize=16, fontweight='bold')
    durations_by_sensor = duration_events.groupby('Sensor_Id')['Duration_Minutes']
    sensor_panels = [
        (axes[0, 0],
         duration_events['Sensor_Id'].value_counts().head(10),
         'Top 10 Sensors by Total Alarm Count', 'Number of Alarms'),
        (axes[0, 1],
         durations_by_sensor.mean().sort_values(ascending=False).head(10),
         'Top 10 Sensors by Average Duration', 'Average Duration (Minutes)'),
        (axes[1, 0],
         durations_by_sensor.max().sort_values(ascending=False).head(10),
         'Top 10 Sensors by Maximum Duration', 'Maximum Duration (Minutes)'),
        (axes[1, 1],
         duration_events.groupby('Sensor_Id')['Severity_Score'].sum().sort_values(ascending=False).head(10),
         'Top 10 Sensors by Total Severity Score', 'Total Severity Score'),
    ]
    for ax, series, title, xlabel in sensor_panels:
        labels = sensor_labels(series.index, "{name} (Group: {group})")
        horizontal_bars(ax, series, labels, title, xlabel)
    finish_figure("sensor_analysis.png")

    # 4. Group-based visualizations if sensor groups are available
    if 'Sensor_Group' in duration_events.columns:
        print("\nCreating group-based visualizations...")
        # First group-based visualization - Dashboard with 4 plots
        fig, axes = plt.subplots(2, 2, figsize=(15, 12))
        fig.suptitle('Group-Based Analysis Dashboard', fontsize=16, fontweight='bold')
        group_alarm_counts = duration_events['Sensor_Group'].value_counts()
        durations_by_group = duration_events.groupby('Sensor_Group')['Duration_Minutes']
        group_panels = [
            (axes[0, 0],
             group_alarm_counts.head(10),
             'Top 10 Groups by Total Alarm Count', 'Number of Alarms'),
            (axes[0, 1],
             durations_by_group.mean().sort_values(ascending=False).head(10),
             'Top 10 Groups by Average Duration', 'Average Duration (Minutes)'),
            (axes[1, 0],
             durations_by_group.max().sort_values(ascending=False).head(10),
             'Top 10 Groups by Maximum Duration', 'Maximum Duration (Minutes)'),
            (axes[1, 1],
             duration_events.groupby('Sensor_Group')['Severity_Score'].sum().sort_values(ascending=False).head(10),
             'Top 10 Groups by Total Severity Score', 'Total Severity Score'),
        ]
        for ax, series, title, xlabel in group_panels:
            horizontal_bars(ax, series, [shorten(label, 20) for label in series.index], title, xlabel)
        finish_figure("group_analysis.png")

        # Additional group-based visualizations
        print("Creating additional group-based visualizations...")
        # Number of mapped sensors per group; shared by the composition and
        # intensity charts below.
        sensors_per_group = Counter(
            info.get('group', 'Unknown') for info in sensor_lookup.values()
        )

        # Group composition analysis - showing number of sensors per group
        if self.sensor_mapping:
            group_sensor_counts = pd.DataFrame(
                list(sensors_per_group.items()),
                columns=['Group', 'Sensor_Count']
            ).sort_values('Sensor_Count', ascending=False).head(15)
            plt.figure(figsize=(14, 8))
            plt.barh(range(len(group_sensor_counts)), group_sensor_counts['Sensor_Count'])
            plt.title('Sensor Count by Group (Top 15 Groups)')
            plt.xlabel('Number of Sensors in Group')
            plt.ylabel('Group')
            plt.yticks(
                range(len(group_sensor_counts)),
                [shorten(label, 30) for label in group_sensor_counts['Group']]
            )
            for i, v in enumerate(group_sensor_counts['Sensor_Count']):
                plt.text(v + v*0.01, i, str(v), va='center')
            finish_figure("group_composition.png")

        # Alarm type distribution by group (stacked bar chart)
        alarm_type_by_group = duration_events.groupby(['Sensor_Group', 'Alarm_Type']).size().unstack(fill_value=0)
        top_10_groups = group_alarm_counts.head(10).index
        alarm_type_by_group_top = alarm_type_by_group.loc[top_10_groups]
        alarm_type_by_group_top.plot(kind='barh', stacked=True, figsize=(14, 8))
        plt.title('Alarm Type Distribution by Group (Top 10 Groups)')
        plt.xlabel('Number of Alarms')
        plt.ylabel('Group')
        plt.legend(title='Alarm Type', bbox_to_anchor=(1.05, 1), loc='upper left')
        finish_figure("alarm_type_by_group.png")

        # Group alarm intensity: alarms per sensor in each group
        if self.sensor_mapping:
            # Groups missing from the mapping are treated as having one
            # sensor so the ratio never divides by zero.
            group_alarm_intensity = {
                group: alarm_count / sensors_per_group.get(group, 1)
                for group, alarm_count in group_alarm_counts.items()
            }
            intensity_df = pd.DataFrame(
                list(group_alarm_intensity.items()),
                columns=['Group', 'Alarms_Per_Sensor']
            ).sort_values('Alarms_Per_Sensor', ascending=False).head(15)
            plt.figure(figsize=(14, 8))
            plt.barh(range(len(intensity_df)), intensity_df['Alarms_Per_Sensor'])
            plt.title('Alarm Intensity: Alarms per Sensor by Group (Top 15 Groups)')
            plt.xlabel('Average Alarms per Sensor')
            plt.ylabel('Group')
            plt.yticks(
                range(len(intensity_df)),
                [shorten(label, 30) for label in intensity_df['Group']]
            )
            for i, v in enumerate(intensity_df['Alarms_Per_Sensor']):
                plt.text(v + v*0.01, i, f"{v:.2f}", va='center')
            finish_figure("group_alarm_intensity.png")

    print("Visualizations created successfully!")
    if save_plots:
        print(f"Plots saved to '{output_dir}' directory.")
def calculate_uptime_metrics(self):
    """
    Calculate uptime/downtime metrics based on two approaches:
    1. Error duration as downtime (communication errors)
    2. Alarm/Warning duration as downtime (operational issues)

    Durations come from the resolved rows of ``self.processed_events``
    (non-null ``Duration_Minutes``); the reference time span is the range
    between the earliest and latest ``Date`` in ``self.alarm_data``.

    Returns:
        None when there are no processed or resolved events. Otherwise a
        dict with:
        - the time span (``total_time_span_minutes``/``_hours``,
          ``total_start_time``, ``total_end_time``)
        - cumulative downtime minutes and downtime/uptime percentages for
          errors, alarm/warnings and both combined
        - ``system_error_uptime_percentage`` and
          ``system_alarm_warning_uptime_percentage``: share of one-hour
          buckets without any such row in ``alarm_data`` (None if that
          calculation fails)
        - ``error_by_sensor`` / ``alarm_warning_by_sensor`` DataFrames
          indexed by sensor id
        - ``error_by_group`` / ``alarm_warning_by_group`` DataFrames
          indexed by group (empty when the events have no ``Sensor_Group``)
    """
    print("\n--- CALCULATING UPTIME/DOWNTIME METRICS ---")
    if self.processed_events is None or len(self.processed_events) == 0:
        print("No processed events available for uptime calculation. Run pair_events_and_calculate_durations first.")
        return None
    # Filter out unresolved events for duration analysis
    duration_events = self.processed_events[self.processed_events['Duration_Minutes'].notna()].copy()
    if len(duration_events) == 0:
        print("No resolved events with duration data available for uptime calculation.")
        return None
    # Calculate total time span from the alarm data (not just processed events)
    total_start_time = self.alarm_data['Date'].min()
    total_end_time = self.alarm_data['Date'].max()
    total_time_span_minutes = (total_end_time - total_start_time).total_seconds() / 60.0
    print(f"Total time period: {total_start_time} to {total_end_time}")
    print(f"Total time span: {total_time_span_minutes:.2f} minutes ({total_time_span_minutes/60:.2f} hours)")

    def share_of_span(minutes):
        """Percentage of the total time span; 0 when the span is empty."""
        return (minutes / total_time_span_minutes) * 100 if total_time_span_minutes > 0 else 0

    def summarize(events, key, all_keys, label, extra_columns):
        """
        Aggregate `events` per `key` and derive downtime/uptime percentages.

        The result has one row per entry of `all_keys`, so sensors/groups
        without any matching event appear with zero counts and 100% uptime.
        `extra_columns` maps additional output column names to
        (source column, aggregation) pairs.
        """
        total_col = f'Total_{label}_Duration'
        named = {
            total_col: ('Duration_Minutes', 'sum'),
            f'{label}_Count': ('Duration_Minutes', 'count'),
            f'Avg_{label}_Duration': ('Duration_Minutes', 'mean'),
        }
        named.update(extra_columns)
        stats = events.groupby(key).agg(**named).round(2)
        table = pd.DataFrame(index=pd.Index(all_keys)).join(stats, how='left')
        # Missing timestamps stay missing; every other statistic becomes 0.
        table = table.fillna({col: 0 for col in table.columns if not col.endswith('_Time')})
        if total_time_span_minutes > 0:
            # Cap at 100% so uptime can never become negative.
            downtime = ((table[total_col] / total_time_span_minutes) * 100).round(4).clip(upper=100.0)
        else:
            # Same convention as the system-wide figures: an empty time
            # span means no measurable downtime (avoids 0/0 -> NaN).
            downtime = pd.Series(0.0, index=table.index)
        table[f'{label}_Downtime_Percentage'] = downtime
        table[f'{label}_Uptime_Percentage'] = (100 - downtime).round(4).clip(lower=0)
        return table

    # 1. Calculate error-based downtime (communication errors)
    error_events = duration_events[duration_events['Alarm_Type'] == 'Error'].copy()
    total_error_duration = error_events['Duration_Minutes'].sum()
    # This represents the total time spent in error state across all sensors
    error_downtime_percentage = share_of_span(total_error_duration)
    print("\nError-based downtime (communication errors):")
    print(f" Total error duration across all sensors: {total_error_duration:.2f} minutes")
    print(f" Error downtime percentage (cumulative): {error_downtime_percentage:.4f}%")
    print(f" Error uptime percentage (communication): {100 - error_downtime_percentage:.4f}%")
    # 2. Calculate alarm/warning-based downtime (operational issues)
    alarm_warning_events = duration_events[duration_events['Alarm_Type'].isin(['Alarm', 'Warning'])].copy()
    total_alarm_warning_duration = alarm_warning_events['Duration_Minutes'].sum()
    alarm_warning_downtime_percentage = share_of_span(total_alarm_warning_duration)
    print("\nAlarm/Warning-based downtime (operational issues):")
    print(f" Total alarm/warning duration across all sensors: {total_alarm_warning_duration:.2f} minutes")
    print(f" Alarm/Warning downtime percentage (cumulative): {alarm_warning_downtime_percentage:.4f}%")
    print(f" Alarm/Warning uptime percentage (operational): {100 - alarm_warning_downtime_percentage:.4f}%")
    # 3. Combined downtime (Error + Alarm + Warning)
    total_operational_duration = total_error_duration + total_alarm_warning_duration
    combined_downtime_percentage = share_of_span(total_operational_duration)
    print("\nCombined system downtime:")
    print(f" Total combined duration across all sensors: {total_operational_duration:.2f} minutes")
    print(f" Combined downtime percentage (cumulative): {combined_downtime_percentage:.4f}%")
    print(f" Combined uptime percentage: {100 - combined_downtime_percentage:.4f}%")

    # 4. Time-bucketed system uptime: share of one-hour buckets in which no
    # error (or alarm/warning) row occurs in the raw alarm data.
    print("\n--- ADDITIONAL SYSTEM-LEVEL UPTIME METRICS ---")
    try:
        # Lower-case 'h' is the alias accepted by both old and new pandas.
        bucket_edges = pd.date_range(
            start=total_start_time.floor('h'),
            end=total_end_time.ceil('h'),
            freq='h'
        )
        bucket_total = len(bucket_edges) - 1
        if bucket_total > 0:
            # Index of the hour bucket each row falls into. Rows landing
            # exactly on the final edge belong to no [start, end) bucket.
            bucket_index = (self.alarm_data['Date'] - bucket_edges[0]) // pd.Timedelta(hours=1)
            in_range = bucket_index < bucket_total
            alarm_types = self.alarm_data['AlarmType']
            error_hits = bucket_index[in_range & (alarm_types == 'Error')].nunique()
            alarm_warning_hits = bucket_index[in_range & alarm_types.isin(['Alarm', 'Warning'])].nunique()
            system_error_uptime_percentage = 100 - (error_hits / bucket_total * 100)
            system_alarm_warning_uptime_percentage = 100 - (alarm_warning_hits / bucket_total * 100)
        else:
            system_error_uptime_percentage = 100.0
            system_alarm_warning_uptime_percentage = 100.0
        print(f"System-level error uptime (time-based): {system_error_uptime_percentage:.4f}%")
        print(f"System-level alarm/warning uptime (time-based): {system_alarm_warning_uptime_percentage:.4f}%")
    except Exception as e:
        # Best-effort metric: report the problem and carry on with the rest.
        print(f"Could not calculate time-based system uptime metrics: {e}")
        system_error_uptime_percentage = None
        system_alarm_warning_uptime_percentage = None

    # 5. Per-sensor metrics, covering every sensor seen in the alarm data
    print("\n--- PER-SENSOR UPTIME/DOWNTIME METRICS ---")
    all_sensors = self.alarm_data['Sensor_Id'].unique()
    error_by_sensor = summarize(
        error_events, 'Sensor_Id', all_sensors, 'Error',
        {
            'First_Error_Time': ('Start_Time', 'min'),
            'Last_Error_Time': ('Start_Time', 'max'),
        },
    )
    alarm_warning_by_sensor = summarize(
        alarm_warning_events, 'Sensor_Id', all_sensors, 'Alarm_Warning',
        {
            'First_Alarm_Warning_Time': ('Start_Time', 'min'),
            'Last_Alarm_Warning_Time': ('Start_Time', 'max'),
        },
    )

    # Add sensor names and groups to per-sensor metrics
    if self.sensor_mapping:
        def with_sensor_labels(table):
            """Add Sensor_Name/Sensor_Group and move them to the front."""
            if len(table) == 0:
                return table
            table['Sensor_Name'] = table.index.map(
                lambda x: self.sensor_mapping.get(x, {}).get('name', 'Unknown')
            )
            table['Sensor_Group'] = table.index.map(
                lambda x: self.sensor_mapping.get(x, {}).get('group', 'Unknown')
            )
            lead = ['Sensor_Name', 'Sensor_Group']
            return table[lead + [col for col in table.columns if col not in lead]]

        # The reordered frames must be assigned back; rebinding a loop
        # variable would silently discard the new column order.
        error_by_sensor = with_sensor_labels(error_by_sensor)
        alarm_warning_by_sensor = with_sensor_labels(alarm_warning_by_sensor)

    # 6. Per-group metrics
    print("\n--- PER-GROUP UPTIME/DOWNTIME METRICS ---")
    if 'Sensor_Group' in duration_events.columns:
        # Prefer the groups seen in the raw alarm data so groups without
        # resolved events are listed too; fall back to the processed events
        # when the raw data carries no group column.
        if 'Sensor_Group' in self.alarm_data.columns:
            all_groups = self.alarm_data['Sensor_Group'].unique()
        else:
            all_groups = duration_events['Sensor_Group'].unique()
        error_by_group = summarize(
            error_events, 'Sensor_Group', all_groups, 'Error',
            {'Unique_Sensors_With_Errors': ('Sensor_Id', 'nunique')},
        )
        alarm_warning_by_group = summarize(
            alarm_warning_events, 'Sensor_Group', all_groups, 'Alarm_Warning',
            {'Unique_Sensors_With_Alarm_Warning': ('Sensor_Id', 'nunique')},
        )
    else:
        error_by_group = pd.DataFrame()
        alarm_warning_by_group = pd.DataFrame()

    # Compile all results
    uptime_results = {
        'total_time_span_minutes': total_time_span_minutes,
        'total_time_span_hours': total_time_span_minutes / 60,
        'total_start_time': total_start_time,
        'total_end_time': total_end_time,
        # System-wide metrics
        'error_downtime_minutes': total_error_duration,
        'error_downtime_percentage': error_downtime_percentage,
        'error_uptime_percentage': 100 - error_downtime_percentage,
        'alarm_warning_downtime_minutes': total_alarm_warning_duration,
        'alarm_warning_downtime_percentage': alarm_warning_downtime_percentage,
        'alarm_warning_uptime_percentage': 100 - alarm_warning_downtime_percentage,
        'combined_downtime_minutes': total_operational_duration,
        'combined_downtime_percentage': combined_downtime_percentage,
        'combined_uptime_percentage': 100 - combined_downtime_percentage,
        # System-level metrics
        'system_error_uptime_percentage': system_error_uptime_percentage,
        'system_alarm_warning_uptime_percentage': system_alarm_warning_uptime_percentage,
        # Per-sensor metrics
        'error_by_sensor': error_by_sensor,
        'alarm_warning_by_sensor': alarm_warning_by_sensor,
        # Per-group metrics
        'error_by_group': error_by_group,
        'alarm_warning_by_group': alarm_warning_by_group
    }
    return uptime_results
def export_results(self, output_dir="output"):
    """
    Export analysis results to CSV files

    Only resolved events are exported, i.e. rows of ``self.processed_events``
    whose ``Duration_Minutes`` is not null. The files written are: the paired
    events, a summary per alarm type, per-sensor statistics, sensor rankings
    (alarm count, average duration, max duration, severity score), alarm
    frequency by hour and by weekday and, when the events carry a
    ``Sensor_Group`` column, the matching per-group statistics and rankings.

    Args:
        output_dir: Directory that receives the CSV files. It is created
            when it does not exist yet.

    Returns:
        None. The method returns early (after printing a message) when there
        are no processed events or none of them has a duration.
    """
    import os

    print("\n--- EXPORTING RESULTS ---")
    os.makedirs(output_dir, exist_ok=True)

    if self.processed_events is None or len(self.processed_events) == 0:
        print("No processed events available for export. Run pair_events_and_calculate_durations first.")
        return

    # Filter resolved events for export; work on a copy because helper
    # columns (severity, hour, weekday) are added further down.
    duration_events = self.processed_events[self.processed_events['Duration_Minutes'].notna()].copy()
    if len(duration_events) == 0:
        print("No resolved events with duration data available for export.")
        return

    def write_csv(frame, filename, description, **csv_options):
        # Write one result table into output_dir and report where it went.
        path = os.path.join(output_dir, filename)
        frame.to_csv(path, **csv_options)
        print(f"Exported {description} to: {path}")

    def with_sensor_info(frame):
        # Prepend Sensor_Name / Sensor_Group columns looked up from
        # self.sensor_mapping by the frame's Sensor_Id index. Without a
        # mapping the frame is returned unchanged.
        if not self.sensor_mapping:
            return frame
        labelled = frame.copy()
        for column, key in (('Sensor_Name', 'name'), ('Sensor_Group', 'group')):
            labelled[column] = labelled.index.map(
                lambda sensor_id, key=key: self.sensor_mapping.get(sensor_id, {}).get(key, 'Unknown')
            )
        leading = ['Sensor_Name', 'Sensor_Group']
        return labelled[leading + [col for col in labelled.columns if col not in leading]]

    def ranking(key, column, how, result_name):
        # Aggregate `column` per `key` and sort the result from highest to lowest.
        return (
            duration_events.groupby(key)[column]
            .agg(how)
            .sort_values(ascending=False)
            .to_frame(result_name)
        )

    # 1. Export all paired events
    write_csv(duration_events, "paired_alarm_events.csv", "paired alarm events", index=False)

    # 2. Export summary by alarm type
    summary_by_type = duration_events.groupby('Alarm_Type').agg({
        'Duration_Minutes': ['count', 'min', 'max', 'mean'],
        'Sensor_Id': 'nunique'
    }).round(2)
    summary_by_type.columns = ['Event_Count', 'Min_Duration', 'Max_Duration', 'Avg_Duration', 'Unique_Sensors']
    write_csv(summary_by_type, "summary_by_alarm_type.csv", "summary by alarm type")

    # 3. Export sensor statistics
    sensor_stats = duration_events.groupby('Sensor_Id').agg({
        'Alarm_Type': ['count', 'nunique'],
        'Duration_Minutes': ['min', 'max', 'mean'],
        'Start_Time': ['min', 'max']
    }).round(2)
    sensor_stats.columns = [
        'Total_Alarm_Count', 'Alarm_Type_Count', 'Min_Duration', 'Max_Duration',
        'Avg_Duration', 'First_Alarm', 'Last_Alarm'
    ]
    sensor_stats = sensor_stats.sort_values('Total_Alarm_Count', ascending=False)
    write_csv(with_sensor_info(sensor_stats), "sensor_statistics.csv", "sensor statistics")

    # 4. Export top sensors by various metrics
    top_by_count = duration_events['Sensor_Id'].value_counts().to_frame('Alarm_Count')
    write_csv(with_sensor_info(top_by_count), "top_sensors_by_alarm_count.csv", "top sensors by alarm count")

    write_csv(
        with_sensor_info(ranking('Sensor_Id', 'Duration_Minutes', 'mean', 'Avg_Duration')),
        "top_sensors_by_avg_duration.csv", "top sensors by average duration"
    )
    write_csv(
        with_sensor_info(ranking('Sensor_Id', 'Duration_Minutes', 'max', 'Max_Duration')),
        "top_sensors_by_max_duration.csv", "top sensors by max duration"
    )

    # Severity score = weight of the alarm type * event duration. Alarm types
    # that are not listed get weight 1. One pass over the Alarm_Type column
    # replaces the former row-wise DataFrame.apply.
    severity_weights = {'Error': 3, 'Alarm': 2, 'Warning': 1}
    event_weights = np.array([severity_weights.get(alarm_type, 1) for alarm_type in duration_events['Alarm_Type']])
    duration_events['Severity_Score'] = duration_events['Duration_Minutes'] * event_weights
    write_csv(
        with_sensor_info(ranking('Sensor_Id', 'Severity_Score', 'sum', 'Total_Severity_Score')),
        "top_sensors_by_severity_score.csv", "top sensors by severity score"
    )

    # 5. Export time-based analysis
    duration_events['Start_Hour'] = duration_events['Start_Time'].dt.hour
    duration_events['Start_DayOfWeek'] = duration_events['Start_Time'].dt.day_name()

    hourly_freq = duration_events.groupby('Start_Hour').size().to_frame('Alarm_Count')
    write_csv(hourly_freq, "alarm_frequency_by_hour.csv", "alarm frequency by hour")

    # Reindex so every weekday appears, in calendar order, even with zero alarms.
    day_order = ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday']
    daily_freq = (
        duration_events.groupby('Start_DayOfWeek').size()
        .reindex(day_order, fill_value=0)
        .to_frame('Alarm_Count')
    )
    write_csv(daily_freq, "alarm_frequency_by_day.csv", "alarm frequency by day")

    # 6. Export group-based analysis if sensor groups are available
    if 'Sensor_Group' in duration_events.columns:
        print("\nExporting group-based analysis...")

        group_stats = duration_events.groupby('Sensor_Group').agg({
            'Sensor_Id': ['count', 'nunique'],
            'Duration_Minutes': ['min', 'max', 'mean', 'sum'],
            'Start_Time': ['min', 'max']
        }).round(2)
        group_stats.columns = [
            'Total_Alarm_Count', 'Unique_Sensors', 'Min_Duration', 'Max_Duration',
            'Avg_Duration', 'Total_Duration', 'First_Alarm', 'Last_Alarm'
        ]
        group_stats = group_stats.sort_values('Total_Alarm_Count', ascending=False)

        # The coverage columns below need the sensor report to know how many
        # sensors each group contains; without it they are left out.
        if self.sensor_data is not None and 'Group' in self.sensor_data.columns:
            processed_sensor_data = self._process_hierarchical_sensor_data(self.sensor_data)
            has_id = processed_sensor_data['ID'].notna()

            # Unique sensor IDs per named group.
            sensors_per_group = (
                processed_sensor_data[has_id].groupby('Group')['ID'].nunique().fillna(0).astype(int)
            )
            # Rows with an ID but no group (or the literal 'Unknown') back the
            # 'Unknown' bucket; computed once instead of inside the lambda.
            ungrouped = processed_sensor_data['Group'].isna() | (processed_sensor_data['Group'] == 'Unknown')
            unknown_sensor_count = int((ungrouped & has_id).sum())

            group_stats['Total_Sensors_In_Group'] = group_stats.index.map(
                lambda group: unknown_sensor_count if group == 'Unknown' else sensors_per_group.get(group, 0)
            ).fillna(0).astype(int)

            # Percentage of monitoring points that experienced alarms.
            # A group with zero known sensors yields inf/NaN, reported as 0.
            group_stats['Percentage_Monitoring_Points_Alarmed'] = (
                (group_stats['Unique_Sensors'] / group_stats['Total_Sensors_In_Group']) * 100
            ).round(2)
            group_stats['Percentage_Monitoring_Points_Alarmed'] = (
                group_stats['Percentage_Monitoring_Points_Alarmed'].fillna(0).replace([np.inf, -np.inf], 0)
            )

            # Overall time range of the alarm log. A missing alarm table is
            # treated like an empty range instead of raising.
            if self.alarm_data is not None and 'Date' in self.alarm_data.columns:
                first_alarm_overall = self.alarm_data['Date'].min()
                last_alarm_overall = self.alarm_data['Date'].max()
            else:
                first_alarm_overall = last_alarm_overall = pd.NaT

            if pd.notna(first_alarm_overall) and pd.notna(last_alarm_overall):
                total_time_span_hours = (last_alarm_overall - first_alarm_overall).total_seconds() / 3600.0
                # Share of the possible sensor-hours (sensors in group * time
                # span) that was spent in alarm.
                total_possible_sensor_hours = group_stats['Total_Sensors_In_Group'] * total_time_span_hours
                actual_alarm_hours = group_stats['Total_Duration'] / 60.0  # Convert minutes to hours
                group_stats['Alarm_Time_Percentage'] = (
                    (actual_alarm_hours / total_possible_sensor_hours) * 100
                ).round(2)
                group_stats['Alarm_Time_Percentage'] = (
                    group_stats['Alarm_Time_Percentage'].fillna(0).replace([np.inf, -np.inf], 0)
                )
            else:
                group_stats['Alarm_Time_Percentage'] = 0.0

        write_csv(group_stats, "group_statistics.csv", "group statistics")

        # Top groups by various metrics
        top_groups_by_count = duration_events['Sensor_Group'].value_counts().to_frame('Alarm_Count')
        write_csv(top_groups_by_count, "top_groups_by_alarm_count.csv", "top groups by alarm count")
        write_csv(
            ranking('Sensor_Group', 'Duration_Minutes', 'mean', 'Avg_Duration'),
            "top_groups_by_avg_duration.csv", "top groups by average duration"
        )
        write_csv(
            ranking('Sensor_Group', 'Duration_Minutes', 'max', 'Max_Duration'),
            "top_groups_by_max_duration.csv", "top groups by max duration"
        )
        write_csv(
            ranking('Sensor_Group', 'Severity_Score', 'sum', 'Total_Severity_Score'),
            "top_groups_by_severity_score.csv", "top groups by severity score"
        )

        # Alarm type distribution by group
        alarm_type_by_group = duration_events.groupby(['Sensor_Group', 'Alarm_Type']).size().unstack(fill_value=0)
        write_csv(alarm_type_by_group, "alarm_type_distribution_by_group.csv", "alarm type distribution by group")

    print(f"\nAll results exported to '{output_dir}' directory successfully!")
def export_uptime_metrics(self, output_dir="output", uptime_results=None):
    """
    Export uptime/downtime metrics to new output files

    Writes ``system_uptime_summary.csv`` (one Metric/Value row per system-wide
    figure) and, for each per-sensor / per-group table in the results that is
    not empty, one additional CSV file.

    Args:
        output_dir: Directory that receives the CSV files; created if missing.
        uptime_results: Result dictionary of ``calculate_uptime_metrics``.
            When omitted, that method is called to obtain it.

    Returns:
        None. Nothing is written when no uptime results are available.
    """
    import os

    print("\n--- EXPORTING UPTIME/DOWNTIME METRICS ---")
    os.makedirs(output_dir, exist_ok=True)

    # Calculate uptime metrics if not provided
    if uptime_results is None:
        uptime_results = self.calculate_uptime_metrics()
        if uptime_results is None:
            print("No uptime results to export.")
            return

    # (CSV label, result key) pairs for figures that must be present.
    required_rows = [
        ('Total_Time_Span_Minutes', 'total_time_span_minutes'),
        ('Total_Time_Span_Hours', 'total_time_span_hours'),
        ('Total_Start_Time', 'total_start_time'),
        ('Total_End_Time', 'total_end_time'),
        ('Total_Error_Duration_Minutes', 'error_downtime_minutes'),
        ('Error_Downtime_Percentage', 'error_downtime_percentage'),
        ('Error_Uptime_Percentage', 'error_uptime_percentage'),
        ('Total_Alarm_Warning_Duration_Minutes', 'alarm_warning_downtime_minutes'),
        ('Alarm_Warning_Downtime_Percentage', 'alarm_warning_downtime_percentage'),
        ('Alarm_Warning_Uptime_Percentage', 'alarm_warning_uptime_percentage'),
        ('Total_Combined_Duration_Minutes', 'combined_downtime_minutes'),
        ('Combined_Downtime_Percentage', 'combined_downtime_percentage'),
        ('Combined_Uptime_Percentage', 'combined_uptime_percentage'),
    ]
    # Figures that may be absent from the results; reported as 'N/A' then.
    optional_rows = [
        ('System_Error_Uptime_Percentage_Time_Based', 'system_error_uptime_percentage'),
        ('System_Alarm_Warning_Uptime_Percentage_Time_Based', 'system_alarm_warning_uptime_percentage'),
    ]

    metric_names = [label for label, _ in required_rows + optional_rows]
    metric_values = [uptime_results[key] for _, key in required_rows]
    metric_values.extend(uptime_results.get(key, 'N/A') for _, key in optional_rows)

    # Export system-wide summary
    summary_path = os.path.join(output_dir, "system_uptime_summary.csv")
    pd.DataFrame({'Metric': metric_names, 'Value': metric_values}).to_csv(summary_path, index=False)
    print(f"Exported system uptime summary to: {summary_path}")

    # (result key, file name, wording for the message, is it a per-group table?)
    detail_exports = [
        ('error_by_sensor', "sensor_error_uptime_metrics.csv", "per-sensor error", False),
        ('alarm_warning_by_sensor', "sensor_alarm_warning_uptime_metrics.csv", "per-sensor alarm/warning", False),
        ('error_by_group', "group_error_uptime_metrics.csv", "per-group error", True),
        ('alarm_warning_by_group', "group_alarm_warning_uptime_metrics.csv", "per-group alarm/warning", True),
    ]
    for result_key, file_name, wording, is_group_table in detail_exports:
        table = uptime_results[result_key]
        if table.empty:
            continue
        target = os.path.join(output_dir, file_name)
        if is_group_table:
            # Turn the index into a regular first column named Sensor_Group.
            flat = table.reset_index()
            flat.columns = ['Sensor_Group'] + list(flat.columns[1:])
            flat.to_csv(target, index=False)
        else:
            table.to_csv(target)
        print(f"Exported {wording} uptime metrics to: {target}")

    print(f"\nUptime/downtime metrics exported to '{output_dir}' directory successfully!")
# Example usage: run the full analysis pipeline on the bundled sample files
if __name__ == "__main__":
    # Input files; the exclusion config is optional and lists groups to skip
    alarm_csv_path = "CardinalAlarmsDec25.csv"
    sensor_report_path = "SensorReport Cardinal 2025-12-23_processed.xlsx"
    excluded_groups_path = "exclusion_config.json"

    analyzer = AlarmAnalyzer(
        alarm_csv_path,
        sensor_report_path,
        exclusion_file_path=excluded_groups_path,
    )

    # Load, categorize and pair the raw alarm records
    analyzer.load_data()

    categorized = analyzer.categorize_alarms()
    print("\nFirst few rows of categorized data:")
    print(categorized.head())

    paired = analyzer.pair_events_and_calculate_durations()
    print("\nFirst few rows of paired events:")
    print(paired.head())

    # Analysis passes and plots
    analyzer.basic_analysis()
    analyzer.advanced_analysis()
    analyzer.create_visualizations(save_plots=True)

    # Uptime metrics are computed once and reused by the uptime export below
    uptime_metrics = analyzer.calculate_uptime_metrics()

    # Write all CSV reports
    analyzer.export_results(output_dir="output")
    analyzer.export_uptime_metrics(output_dir="output", uptime_results=uptime_metrics)