Python project for analyzing alarm data from building monitoring systems. Includes alarm analyzer, plotting, tests, and source data files.
1884 lines
94 KiB
Python
1884 lines
94 KiB
Python
import pandas as pd
|
|
import numpy as np
|
|
import re
|
|
from datetime import datetime
|
|
from openpyxl import load_workbook
|
|
import warnings
|
|
warnings.filterwarnings('ignore')
|
|
|
|
# Matplotlib and seaborn are only needed for plotting, so they are loaded lazily
def _import_viz_libs():
    """
    Import the plotting libraries on demand.

    Returns:
        tuple: ``(matplotlib.pyplot, seaborn)`` module objects, in that order.

    Raises:
        ImportError: If either library is not installed.
    """
    import matplotlib.pyplot as pyplot
    import seaborn

    return pyplot, seaborn
|
|
|
|
class AlarmAnalyzer:
|
|
def __init__(self, csv_file_path, xlsx_file_path=None, exclusion_file_path=None):
    """
    Set up an analyzer for one alarm export.

    Args:
        csv_file_path: Path to the alarm CSV file.
        xlsx_file_path: Optional path to the sensor description workbook.
        exclusion_file_path: Optional path to a file listing sensor groups to
            leave out of the analysis. When given, it is read immediately via
            ``load_excluded_groups``.
    """
    # Input locations
    self.csv_file_path, self.xlsx_file_path, self.exclusion_file_path = (
        csv_file_path,
        xlsx_file_path,
        exclusion_file_path,
    )

    # Group names to drop; stays empty unless an exclusion file is loaded
    self.excluded_groups = set()

    # Filled in later by the load/pair steps
    self.alarm_data = self.sensor_data = None
    self.processed_events = self.sensor_mapping = None

    if not self.exclusion_file_path:
        return
    self.load_excluded_groups()
|
|
|
|
def load_excluded_groups(self):
    """
    Load groups to exclude from analysis from a configuration file

    Supported formats:
      * JSON object: {"excluded_groups": ["Group1", "Group2"]}
      * JSON array:  ["Group1", "Group2"]
      * Plain text:  one group name per line (blank lines ignored)

    The file is read once. Content that starts with ``{`` or ``[`` is tried as
    JSON first and falls back to the plain-text format when it does not parse.
    A single JSON string given for ``excluded_groups`` is treated as one group
    name rather than being split into characters.

    On success ``self.excluded_groups`` is replaced with the loaded set. On any
    problem (missing file, unreadable file, unsupported JSON shape) a message is
    printed and ``self.excluded_groups`` is left unchanged; nothing is raised.
    """
    import json
    import os

    path = self.exclusion_file_path

    if not os.path.exists(path):
        print(f"Warning: Exclusion file {path} does not exist. No groups will be excluded.")
        return

    print(f"Loading excluded groups from {path}...")

    # utf-8-sig strips a leading BOM, which would otherwise hide the '{' / '['
    # that the JSON sniffing below relies on.
    try:
        with open(path, 'r', encoding='utf-8-sig') as f:
            content = f.read().strip()
    except (OSError, UnicodeError) as e:
        print(f"Error loading exclusion file: {e}")
        return

    groups = None
    use_text_format = True

    if content.startswith(('{', '[')):
        try:
            config = json.loads(content)
        except json.JSONDecodeError:
            # e.g. a text file whose first group name happens to start with '['
            print(f"JSON parsing failed, treating {path} as text file...")
        else:
            use_text_format = False
            if isinstance(config, dict) and 'excluded_groups' in config:
                groups = config['excluded_groups']
            elif isinstance(config, list):
                # The JSON is just an array of group names
                groups = config
            else:
                print(f"Warning: Invalid JSON format in {path}. Expected object with 'excluded_groups' key or array of group names.")
                return

            # set("Lab") would yield {'L', 'a', 'b'}; keep a lone name intact
            if isinstance(groups, str):
                groups = [groups]

    if use_text_format:
        groups = [line.strip() for line in content.split('\n') if line.strip()]

    try:
        self.excluded_groups = set(groups)
    except TypeError as e:
        # Non-iterable value or unhashable entries in the JSON payload
        print(f"Error loading exclusion file: {e}")
        return

    print(f"Loaded {len(self.excluded_groups)} groups to exclude: {list(self.excluded_groups)}")
|
|
|
|
def load_data(self):
    """
    Load alarm data from CSV and sensor descriptions from XLSX

    Steps:
      1. Read the alarm CSV and convert its 'Date' and 'LogTime' columns to
         datetimes.
      2. If an XLSX path was given, read the sensor report. A report whose first
         row already holds column names such as 'ID' or 'Group' is read with
         header=0; otherwise it is read with header=4 and its hierarchical
         columns are forward-filled.
      3. When the report has an 'ID' column, build ``self.sensor_mapping``,
         annotate the alarms with sensor info and drop excluded groups.

    Sensor loading is best effort: any exception raised while reading or
    processing the workbook is printed (with traceback) and leaves
    ``self.sensor_data`` as None and ``self.sensor_mapping`` empty.

    Returns:
        tuple: ``(alarm_data, sensor_data)``; ``sensor_data`` is None when no
        workbook was given or it could not be processed.
    """
    print("Loading alarm data from CSV...")
    self.alarm_data = pd.read_csv(self.csv_file_path)

    # Convert Date and LogTime to datetime
    for column in ('Date', 'LogTime'):
        self.alarm_data[column] = pd.to_datetime(self.alarm_data[column])

    print(f"Loaded {len(self.alarm_data)} alarm records")
    print(f"Date range: {self.alarm_data['Date'].min()} to {self.alarm_data['Date'].max()}")

    if not self.xlsx_file_path:
        print("No XLSX file provided for sensor descriptions")
        self.sensor_data = None
        self.sensor_mapping = {}
        return self.alarm_data, self.sensor_data

    print("Loading sensor descriptions from XLSX...")
    try:
        # Peek at the first rows with header=0 to tell the two report layouts apart
        temp_df = pd.read_excel(self.xlsx_file_path, header=0, nrows=5)
        expected_cols = ['ID', 'Remote', 'Group', 'Type', 'Serial No', 'Name']
        has_expected_cols = any(col in temp_df.columns for col in expected_cols)

        if has_expected_cols:
            # New format: flat table, no hierarchical processing needed
            self.sensor_data = pd.read_excel(self.xlsx_file_path, header=0)
            print("Detected new sensor report format (header=0)")
            processed_sensor_data = self.sensor_data
        else:
            # Old format: column names on the fifth row, group names apply to
            # all rows below until the next group name
            self.sensor_data = pd.read_excel(self.xlsx_file_path, header=4)
            print("Detected old sensor report format (header=4)")
            processed_sensor_data = self._process_hierarchical_sensor_data(self.sensor_data)

        print(f"Loaded sensor data with {len(self.sensor_data)} records")
        print(f"Sensor data columns: {list(self.sensor_data.columns)}")

        if 'ID' in self.sensor_data.columns:
            self.sensor_mapping = self._build_sensor_mapping(processed_sensor_data)
            print(f"Created sensor mapping for {len(self.sensor_mapping)} sensors")

            # Add sensor information to alarm data
            self.add_sensor_info_to_alarms()

            # Filter out excluded groups if any are specified
            self.filter_excluded_groups()

            # Log summary of excluded groups
            if self.excluded_groups:
                print(f"Excluded groups summary: {len(self.excluded_groups)} groups were excluded from analysis")
                print(f"Excluded groups: {', '.join(sorted(self.excluded_groups))}")
        else:
            print("Warning: 'ID' column not found in sensor report. Attempting to find alternative mapping...")
            # Only reports candidate columns; no alternative mapping is built
            for col in self.sensor_data.columns:
                if 'SN' in str(col) or 'Remote' in str(col) or 'Id' in str(col):
                    print(f"Found potential ID column: {col}")

            self.sensor_mapping = {}

    except Exception as e:
        # Deliberate best effort: alarm analysis can proceed without sensor info
        print(f"Could not load XLSX file: {e}")
        import traceback
        traceback.print_exc()
        self.sensor_data = None
        self.sensor_mapping = {}

    return self.alarm_data, self.sensor_data

def _build_sensor_mapping(self, sensor_df):
    """
    Build ``{sensor_id: {'name', 'group', 'type', 'serial_no'}}`` from a sensor table.

    Rows whose 'ID' is missing or cannot be converted with ``int()`` are
    skipped. Descriptive columns ('Name', 'Remote', 'Group', 'Type',
    'Serial No') are optional: an absent column or empty cell yields 'Unknown'
    ('name' falls back to 'Remote' first).
    """
    def cell(record, column):
        # None when the column is absent or the cell is empty/NaN
        value = record.get(column)
        return value if pd.notna(value) else None

    mapping = {}
    for record in sensor_df.to_dict('records'):
        raw_id = record.get('ID')
        if pd.isna(raw_id):
            continue
        # Convert to int so keys match the numeric Sensor_Id in the alarm data
        try:
            sensor_id = int(raw_id)
        except (ValueError, TypeError):
            continue

        name = cell(record, 'Name')
        if name is None:
            name = cell(record, 'Remote')

        group = cell(record, 'Group')
        sensor_type = cell(record, 'Type')
        serial_no = cell(record, 'Serial No')

        mapping[sensor_id] = {
            'name': 'Unknown' if name is None else name,
            'group': 'Unknown' if group is None else group,
            'type': 'Unknown' if sensor_type is None else sensor_type,
            'serial_no': 'Unknown' if serial_no is None else serial_no,
        }
    return mapping
|
|
|
|
def _process_hierarchical_sensor_data(self, sensor_df):
    """
    Fill down the hierarchical columns of an old-format sensor report.

    A value in 'Group', 'Remote', 'Name', 'Type' or 'Serial No' applies to
    every row beneath it until the next non-empty value, so each of those
    columns (when present) is forward-filled.

    Args:
        sensor_df: Sensor report as read from the workbook; not modified.

    Returns:
        A copy of ``sensor_df`` with the hierarchical columns forward-filled.
    """
    filled = sensor_df.copy()

    fill_down = [
        name
        for name in ('Group', 'Remote', 'Name', 'Type', 'Serial No')
        if name in filled.columns
    ]
    for name in fill_down:
        # Carry the last seen value into the empty cells below it
        filled[name] = filled[name].ffill()

    return filled
|
|
|
|
def add_sensor_info_to_alarms(self):
    """
    Annotate each alarm row with its sensor's name, group and type.

    Adds the columns 'Sensor_Name', 'Sensor_Group' and 'Sensor_Type' to
    ``self.alarm_data`` by looking up 'Sensor_Id' in ``self.sensor_mapping``.
    Ids without a mapping entry get 'Unknown'. Does nothing (apart from a
    message) when the mapping is empty or None.
    """
    if not self.sensor_mapping:
        print("No sensor mapping available, skipping sensor info addition")
        return

    lookup = self.sensor_mapping
    sensor_ids = self.alarm_data['Sensor_Id']

    # (new alarm column, key inside each mapping entry)
    targets = (
        ('Sensor_Name', 'name'),
        ('Sensor_Group', 'group'),
        ('Sensor_Type', 'type'),
    )
    for column, field in targets:
        self.alarm_data[column] = sensor_ids.map(
            lambda sid, field=field: lookup.get(sid, {}).get(field, 'Unknown')
        )

    total_rows = len(self.alarm_data)
    distinct_names = self.alarm_data['Sensor_Name'].nunique()
    distinct_groups = self.alarm_data['Sensor_Group'].nunique()
    print(f"Added sensor information to {total_rows} alarm records")
    print(f"Found sensor names for {distinct_names} unique sensors")
    print(f"Found sensor groups for {distinct_groups} unique sensors")
|
|
|
|
def filter_excluded_groups(self):
    """
    Filter out alarm data for excluded groups

    Drops every row of ``self.alarm_data`` whose 'Sensor_Group' is listed in
    ``self.excluded_groups``. Does nothing when no groups are excluded, or
    when the alarm data has not been loaded or carries no 'Sensor_Group'
    column (for example because no sensor mapping was available); in the
    latter case a warning is printed instead of raising KeyError.
    """
    if not self.excluded_groups:
        print("No groups to exclude, skipping filtering")
        return

    if self.alarm_data is None or 'Sensor_Group' not in self.alarm_data.columns:
        print("Warning: No 'Sensor_Group' information available in alarm data, cannot filter excluded groups")
        return

    initial_count = len(self.alarm_data)

    # Keep only rows whose group is not excluded; copy so later column
    # assignments operate on an independent frame rather than a slice
    keep = ~self.alarm_data['Sensor_Group'].isin(self.excluded_groups)
    self.alarm_data = self.alarm_data[keep].copy()

    final_count = len(self.alarm_data)
    excluded_count = initial_count - final_count

    print(f"Filtered out {excluded_count} alarm records from excluded groups: {list(self.excluded_groups)}")
    print(f"Remaining alarm records: {final_count}")
|
|
|
|
def parse_alarm_description(self, description):
    """
    Parse alarm description to extract alarm type, value, threshold, and unit

    Example: "Hi Alarm: 51.3>=46.0F" ->
        {'type': 'Alarm', 'value': 51.3, 'threshold': 46.0, 'unit': 'F'}

    Args:
        description: Raw description text; NaN/None yields type 'Unknown'.

    Returns:
        dict with keys:
          'type'      - 'Error', 'Alarm', 'Warning', 'Normal', 'Other' or 'Unknown'
          'value'     - first number in the text (float) or None
          'threshold' - second number for alarm-like types (or the only number
                        when just one is present), else None
          'unit'      - unit token that directly follows a number, or None
    """
    if pd.isna(description):
        return {'type': 'Unknown', 'value': None, 'threshold': None, 'unit': None}

    desc = str(description).strip()

    # Determine alarm type; 'Error' wins, and a text mentioning both 'Alarm'
    # and 'Warning' counts as a Warning
    if 'Error' in desc:
        alarm_type = 'Error'
    elif 'Alarm' in desc and 'Warning' not in desc:
        alarm_type = 'Alarm'
    elif 'Warning' in desc:
        alarm_type = 'Warning'
    elif 'Normal' in desc:
        alarm_type = 'Normal'
    else:
        alarm_type = 'Other'

    value = None
    threshold = None
    unit = None

    # Numbers such as 51.3, -4, .5
    value_pattern = r'([+-]?\d*\.?\d+)'
    # A unit only counts when it directly follows a digit (optionally after
    # whitespace) and is not the start of a word. Searching the bare character
    # class anywhere in the text would pick up the 'H' of "Hi Alarm" or the 'C'
    # of "Communication Error" as the unit.
    unit_pattern = r'(?<=\d)\s*([CF%RH"|]+|min\.)(?![A-Za-z])'

    # Extract all numeric values
    numeric_values = [float(x) for x in re.findall(value_pattern, desc) if x]

    # Extract unit
    unit_match = re.search(unit_pattern, desc)
    if unit_match:
        unit = unit_match.group(1)

    # Determine value and threshold based on alarm type
    if alarm_type == 'Normal':
        # For Normal events, the value is the current reading
        if len(numeric_values) >= 1:
            value = numeric_values[0]
    elif alarm_type in ['Alarm', 'Warning', 'Error']:
        # For alarm events, we typically have both current value and threshold
        if len(numeric_values) >= 2:
            value = numeric_values[0]
            threshold = numeric_values[1]
        elif len(numeric_values) == 1:
            # Sometimes only threshold is provided
            threshold = numeric_values[0]

    return {
        'type': alarm_type,
        'value': value,
        'threshold': threshold,
        'unit': unit
    }
|
|
|
|
def categorize_alarms(self):
    """
    Parse every alarm description and store the result as new columns.

    Adds 'AlarmType', 'Value', 'Threshold' and 'Unit' to ``self.alarm_data``
    (from ``parse_alarm_description``), prints the distribution of alarm
    types, and adds the sensor name/group/type columns if they are missing
    and a sensor mapping is available.

    Returns:
        The updated ``self.alarm_data`` DataFrame.
    """
    print("Categorizing alarms...")

    # One parsed dict per description
    parsed = self.alarm_data['Description'].apply(self.parse_alarm_description)

    # (new column, key of the parsed dict)
    column_keys = (
        ('AlarmType', 'type'),
        ('Value', 'value'),
        ('Threshold', 'threshold'),
        ('Unit', 'unit'),
    )
    for column, key in column_keys:
        self.alarm_data[column] = parsed.apply(lambda item, key=key: item[key])

    # Report how many rows fall into each alarm type
    print("Alarm type distribution:")
    distribution = self.alarm_data['AlarmType'].value_counts()
    for kind, total in distribution.items():
        print(f"  {kind}: {total}")

    # Sensor details may already have been attached while loading
    needs_sensor_info = 'Sensor_Name' not in self.alarm_data.columns
    if needs_sensor_info and self.sensor_mapping:
        self.add_sensor_info_to_alarms()

    return self.alarm_data
|
|
|
|
def pair_events_and_calculate_durations(self):
    """
    Pair alarm start events with corresponding end events and calculate durations

    For each sensor, events are walked in chronological order. Every
    'Alarm', 'Warning' or 'Error' event opens an interval that is closed by
    the next event of that sensor that is either:
      * 'Normal'                      -> End_Reason 'Normal', or
      * another Alarm/Warning/Error   -> End_Reason 'Transition to <type>'.
    Events of any other type in between are skipped. An interval with no
    closing event is recorded with End_Reason 'Unresolved' and empty end
    fields and duration.

    The event that closes an interval by transition also opens the next
    interval, so Warning -> Alarm -> Normal yields two intervals.

    Requires ``self.alarm_data`` to carry the columns 'Sensor_Id', 'Date',
    'AlarmType', 'Description', 'Value', 'Threshold' and 'Alarm_Id'.
    ``self.alarm_data`` is re-sorted by sensor and date as a side effect.

    Returns:
        DataFrame (also stored in ``self.processed_events``) with one row per
        interval; it has the full set of columns even when no interval exists.
    """
    print("Pairing alarm events and calculating durations...")

    alarm_states = ('Alarm', 'Warning', 'Error')
    columns = [
        'Sensor_Id', 'Sensor_Name', 'Sensor_Group', 'Sensor_Type',
        'Start_Time', 'End_Time', 'Duration_Minutes', 'Alarm_Type',
        'Start_Description', 'End_Description', 'Start_Value', 'Threshold',
        'End_Value', 'Start_Alarm_Id', 'End_Alarm_Id', 'End_Reason',
    ]
    # The mapping is None until load_data has run; treat that as "no sensor info"
    sensor_mapping = self.sensor_mapping or {}

    # Sort by Sensor_Id and Date so every sensor's events are chronological
    self.alarm_data = self.alarm_data.sort_values(['Sensor_Id', 'Date']).reset_index(drop=True)

    paired_events = []

    # Process each sensor separately; rows within a group are already in date order
    for sensor_id, sensor_rows in self.alarm_data.groupby('Sensor_Id', sort=False):
        events = sensor_rows.to_dict('records')
        sensor_info = sensor_mapping.get(sensor_id, {})

        i = 0
        while i < len(events):
            start = events[i]
            if start['AlarmType'] not in alarm_states:
                # Normal/Other events never open an interval
                i += 1
                continue

            # Find the first later event that closes this interval
            j = i + 1
            while j < len(events) and (
                events[j]['AlarmType'] != 'Normal'
                and events[j]['AlarmType'] not in alarm_states
            ):
                j += 1
            end = events[j] if j < len(events) else None

            record = {
                'Sensor_Id': sensor_id,
                'Sensor_Name': sensor_info.get('name', 'Unknown'),
                'Sensor_Group': sensor_info.get('group', 'Unknown'),
                'Sensor_Type': sensor_info.get('type', 'Unknown'),
                'Start_Time': start['Date'],
                'End_Time': None,
                'Duration_Minutes': None,
                'Alarm_Type': start['AlarmType'],
                'Start_Description': start['Description'],
                'End_Description': None,
                'Start_Value': start['Value'],
                'Threshold': start['Threshold'],
                'End_Value': None,
                'Start_Alarm_Id': start['Alarm_Id'],
                'End_Alarm_Id': None,
                'End_Reason': 'Unresolved',
            }

            if end is None:
                # No closing event: keep the unresolved defaults
                next_index = i + 1
            else:
                record['End_Time'] = end['Date']
                record['Duration_Minutes'] = (end['Date'] - start['Date']).total_seconds() / 60.0
                record['End_Description'] = end['Description']
                record['End_Value'] = end['Value']
                record['End_Alarm_Id'] = end['Alarm_Id']
                if end['AlarmType'] == 'Normal':
                    record['End_Reason'] = 'Normal'
                    # Continue after the Normal event
                    next_index = j + 1
                else:
                    record['End_Reason'] = f'Transition to {end["AlarmType"]}'
                    # The new alarm condition becomes the next start, so it
                    # must be revisited rather than skipped
                    next_index = j

            paired_events.append(record)
            i = next_index

    # Convert to DataFrame
    self.processed_events = pd.DataFrame(paired_events, columns=columns)

    print(f"Paired {len(self.processed_events)} events")
    if len(self.processed_events) > 0:
        print(f"Events with duration: {len(self.processed_events[self.processed_events['Duration_Minutes'].notna()])}")
        print(f"Unresolved events: {len(self.processed_events[self.processed_events['Duration_Minutes'].isna()])}")

    return self.processed_events
|
|
|
|
def basic_analysis(self):
    """
    Print and return basic statistics over the paired events.

    Covers event counts per alarm type/sensor (and per group when a
    'Sensor_Group' column exists) and count/min/max/mean of
    'Duration_Minutes' for resolved events, grouped by type, sensor and group.

    Returns:
        dict with the keys 'count_by_type_sensor', 'count_by_sensor',
        'count_by_group', 'duration_stats', 'overall_duration_stats',
        'all_sensor_stats' and 'group_duration_stats' (group entries are empty
        DataFrames when no group column exists), or None when there are no
        processed events or none of them has a duration.
    """
    print("Performing basic analysis...")

    events = self.processed_events
    if events is None or len(events) == 0:
        print("No processed events available for analysis. Run pair_events_and_calculate_durations first.")
        return

    def tally(keys):
        # Number of events per key combination as a flat table
        return events.groupby(keys).size().reset_index(name='Count')

    print("\n--- ALARM COUNTS BY TYPE AND SENSOR ---")
    count_by_type_sensor = tally(['Alarm_Type', 'Sensor_Id'])
    count_by_sensor = tally(['Sensor_Id', 'Alarm_Type'])

    print("\nTop 10 sensors with most alarms:")
    per_sensor = count_by_sensor.groupby('Sensor_Id')['Count'].sum()
    print(per_sensor.sort_values(ascending=False).head(10))

    # Group level counts are only possible when group info was attached
    count_by_group = pd.DataFrame()
    if 'Sensor_Group' in events.columns:
        print("\n--- ALARM COUNTS BY GROUP ---")
        count_by_group = tally(['Alarm_Type', 'Sensor_Group'])
        print("\nTop 10 groups with most alarms:")
        per_group = events.groupby('Sensor_Group')['Sensor_Id'].count()
        print(per_group.sort_values(ascending=False).head(10))

    print("\n--- DURATION ANALYSIS ---")

    # Unresolved events have no duration and are left out from here on
    resolved = events[events['Duration_Minutes'].notna()].copy()
    if len(resolved) == 0:
        print("No resolved events with duration data available for analysis.")
        return None

    def duration_summary(keys):
        # count/min/max/mean of the duration per key combination
        grouped = resolved.groupby(keys)['Duration_Minutes']
        return grouped.agg(['count', 'min', 'max', 'mean']).round(2)

    duration_stats = duration_summary(['Alarm_Type', 'Sensor_Id']).reset_index()
    print("\nDuration statistics by alarm type and sensor (top 10 by count):")
    print(duration_stats.sort_values('count', ascending=False).head(10))

    overall_duration_stats = duration_summary('Alarm_Type')
    print("\nOverall duration statistics by alarm type:")
    print(overall_duration_stats)

    all_sensor_stats = duration_summary('Sensor_Id').sort_values('count', ascending=False)
    print("\nTop 10 sensors by alarm count with duration stats:")
    print(all_sensor_stats.head(10))

    group_duration_stats = pd.DataFrame()
    if 'Sensor_Group' in resolved.columns:
        print("\n--- GROUP-BASED DURATION ANALYSIS ---")

        group_duration_stats = duration_summary('Sensor_Group').sort_values('count', ascending=False)
        print("\nTop 10 groups by alarm count with duration stats:")
        print(group_duration_stats.head(10))

        # Printed only; not part of the returned dict
        type_group_stats = duration_summary(['Alarm_Type', 'Sensor_Group']).sort_values('count', ascending=False)
        print("\nTop 10 alarm type and group combinations by count:")
        print(type_group_stats.head(10))

    return {
        'count_by_type_sensor': count_by_type_sensor,
        'count_by_sensor': count_by_sensor,
        'count_by_group': count_by_group,
        'duration_stats': duration_stats,
        'overall_duration_stats': overall_duration_stats,
        'all_sensor_stats': all_sensor_stats,
        'group_duration_stats': group_duration_stats,
    }
|
|
|
|
def advanced_analysis(self):
    """
    Perform advanced analysis including:
    - Time-based analysis (frequency by hour, weekday and date)
    - MTBF (Mean Time Between Failures) per sensor
    - Alarm correlation analysis (sensors alarming in the same clock hour)
    - Severity analysis (duration weighted by alarm type)
    - Escalation analysis (Warning followed by Alarm/Error within 60 minutes)
    - Group-based summaries

    Only resolved events (non-null 'Duration_Minutes') of
    ``self.processed_events`` are used, except for the escalation section,
    which reads ``self.alarm_data`` and therefore needs its 'AlarmType'
    column (added by ``categorize_alarms``).

    Correlation counts each unordered pair of distinct sensors at most once
    per one-hour window; 'Sensor1' is always the smaller id.

    Returns:
        dict with the keys 'hourly_frequency', 'daily_frequency', 'mtbf_data',
        'correlation_data', 'severity_analysis', 'escalation_analysis' and
        'group_analysis'; sections without data hold empty DataFrames.
        Returns None when there are no processed or no resolved events.
    """
    from itertools import combinations

    print("\n--- ADVANCED ANALYSIS ---")

    if self.processed_events is None or len(self.processed_events) == 0:
        print("No processed events available for analysis. Run pair_events_and_calculate_durations first.")
        return

    # Filter resolved events for time-based analysis
    duration_events = self.processed_events[self.processed_events['Duration_Minutes'].notna()].copy()

    if len(duration_events) == 0:
        print("No resolved events with duration data available for advanced analysis.")
        return

    has_groups = 'Sensor_Group' in duration_events.columns
    # The mapping is None until load_data has run; treat that as "no sensor info"
    sensor_mapping = self.sensor_mapping or {}

    print("\n1. TIME-BASED ANALYSIS")

    # Extract time components
    start_times = duration_events['Start_Time']
    duration_events['Start_Hour'] = start_times.dt.hour
    duration_events['Start_DayOfWeek'] = start_times.dt.day_name()
    duration_events['Start_Date'] = start_times.dt.date

    # Alarm frequency by hour of day
    hourly_freq = duration_events.groupby('Start_Hour').size()
    print(f"\nAlarm frequency by hour of day (Top 5):")
    print(hourly_freq.sort_values(ascending=False).head())

    # Alarm frequency by day of week
    daily_freq = duration_events.groupby('Start_DayOfWeek').size()
    print(f"\nAlarm frequency by day of week:")
    print(daily_freq)

    # Alarm frequency by date
    daily_count = duration_events.groupby('Start_Date').size()
    print(f"\nTotal alarms per day (last 5 days):")
    print(daily_count.tail())

    print("\n2. MTBF (MEAN TIME BETWEEN FAILURES) ANALYSIS")

    # Two estimates per sensor with at least two resolved events:
    #   - mean gap between consecutive alarm starts
    #   - span from first to last alarm start divided by the number of alarms
    mtbf_columns = [
        'Sensor_Id', 'MTBF_Hours_By_Consecutive', 'MTBF_Hours_By_Total_Time',
        'Total_Alerts', 'Total_Op_Time_Hours',
    ]
    mtbf_data = []
    for sensor_id, sensor_events in duration_events.groupby('Sensor_Id', sort=False):
        if len(sensor_events) < 2:
            continue

        starts = sensor_events['Start_Time'].sort_values()
        gaps_hours = starts.diff().dt.total_seconds() / 3600
        total_op_time = (starts.max() - starts.min()).total_seconds() / 3600
        num_alarms = len(starts)

        mtbf_data.append({
            'Sensor_Id': sensor_id,
            'MTBF_Hours_By_Consecutive': gaps_hours.mean(),
            'MTBF_Hours_By_Total_Time': total_op_time / num_alarms,
            'Total_Alerts': num_alarms,
            'Total_Op_Time_Hours': total_op_time
        })

    # Explicit columns keep the sort valid when no sensor qualified
    mtbf_df = pd.DataFrame(mtbf_data, columns=mtbf_columns).sort_values(
        'MTBF_Hours_By_Total_Time', ascending=False
    )
    print("\nTop 10 sensors by MTBF (Mean Time Between Failures):")
    print(mtbf_df.head(10))

    print("\n3. ALARM CORRELATION ANALYSIS")

    # Bucket events into one-hour windows and pair up the distinct sensors
    # that alarmed in the same window
    duration_events['Time_Window'] = start_times.dt.floor('h')

    correlation_data = []
    for window, window_sensor_ids in duration_events.groupby('Time_Window')['Sensor_Id']:
        # Sorted distinct ids: no self-pairs, and (a, b) is never also
        # recorded as (b, a)
        distinct_sensors = sorted(set(window_sensor_ids))
        for sensor1, sensor2 in combinations(distinct_sensors, 2):
            correlation_data.append({
                'Sensor1': sensor1,
                'Sensor2': sensor2,
                'Time_Window': window
            })

    correlation_counts = pd.DataFrame()
    if correlation_data:
        correlation_df = pd.DataFrame(correlation_data)
        correlation_counts = correlation_df.groupby(['Sensor1', 'Sensor2']).size().reset_index(name='Count')
        correlation_counts = correlation_counts.sort_values('Count', ascending=False)

        print("\nTop 10 sensor pairs that alarm together frequently:")
        print(correlation_counts.head(10))
    else:
        print("\nNo correlated alarms found in the same time windows.")

    print("\n4. SEVERITY ANALYSIS")

    # Weighted scoring based on alarm type and duration
    # Error: weight 3, Alarm: weight 2, Warning: weight 1 (also the default)
    severity_weights = {'Error': 3, 'Alarm': 2, 'Warning': 1}
    weights = duration_events['Alarm_Type'].map(severity_weights).fillna(1)
    duration_events['Severity_Score'] = weights * duration_events['Duration_Minutes']

    # Total severity by sensor
    severity_by_sensor = duration_events.groupby('Sensor_Id')['Severity_Score'].agg([
        'sum', 'mean', 'count'
    ]).round(2).reset_index()
    severity_by_sensor = severity_by_sensor.sort_values('sum', ascending=False)

    print("\nTop 10 sensors by total severity score:")
    print(severity_by_sensor.head(10))

    print("\n5. ALARM ESCALATION ANALYSIS")

    # An escalation is a Warning whose immediately following event for the
    # same sensor is an Alarm or Error no more than 60 minutes later
    escalation_data = []
    for sensor_id, sensor_alarms in self.alarm_data.groupby('Sensor_Id', sort=False):
        ordered = sensor_alarms.sort_values('Date')
        kinds = ordered['AlarmType'].tolist()
        dates = ordered['Date'].tolist()
        sensor_group = sensor_mapping.get(sensor_id, {}).get('group', 'Unknown')

        for kind, next_kind, when, next_when in zip(kinds, kinds[1:], dates, dates[1:]):
            if kind != 'Warning' or next_kind not in ('Alarm', 'Error'):
                continue
            time_diff = (next_when - when).total_seconds() / 60  # in minutes
            if time_diff <= 60:  # Within 1 hour
                escalation_data.append({
                    'Sensor_Id': sensor_id,
                    'Sensor_Group': sensor_group,
                    'Warning_Time': when,
                    'Escalation_Type': next_kind,
                    'Time_To_Escalation_Minutes': time_diff
                })

    escalation_df = pd.DataFrame()
    if escalation_data:
        escalation_df = pd.DataFrame(escalation_data)
        escalation_counts = escalation_df.groupby('Sensor_Id').size().reset_index(name='Escalation_Count')
        escalation_counts = escalation_counts.sort_values('Escalation_Count', ascending=False)

        print(f"\nTotal escalations found: {len(escalation_df)}")
        print("Top 10 sensors with most escalations (Warning -> Alarm/Error):")
        print(escalation_counts.head(10))

        # Group-based escalation analysis
        if 'Sensor_Group' in escalation_df.columns:
            escalation_by_group = escalation_df.groupby('Sensor_Group').size().reset_index(name='Escalation_Count')
            escalation_by_group = escalation_by_group.sort_values('Escalation_Count', ascending=False)

            print("\nTop 10 groups with most escalations (Warning -> Alarm/Error):")
            print(escalation_by_group.head(10))
    else:
        print("\nNo alarm escalations found in the data.")

    print("\n6. GROUP-BASED ANALYSIS")

    group_analysis = {}
    if has_groups:
        print("\n--- GROUP-BASED ANALYSIS ---")

        # Count alarms by group
        group_counts = duration_events.groupby('Sensor_Group').size().reset_index(name='Alarm_Count')
        group_counts = group_counts.sort_values('Alarm_Count', ascending=False)
        print("\nTop 10 groups by alarm count:")
        print(group_counts.head(10))

        # NOTE: despite the label and key name, these are statistics of the
        # alarm durations per group, not times between failures
        mtbf_by_group = duration_events.groupby('Sensor_Group')['Duration_Minutes'].agg([
            'count', 'mean', 'min', 'max'
        ]).round(2).sort_values('count', ascending=False)
        print("\nMTBF statistics by group (top 10 by count):")
        print(mtbf_by_group.head(10))

        # Severity by group
        severity_by_group = duration_events.groupby('Sensor_Group')['Severity_Score'].agg([
            'sum', 'mean', 'count'
        ]).round(2).sort_values('sum', ascending=False)
        print("\nTop 10 groups by total severity score:")
        print(severity_by_group.head(10))

        # Alarm types by group
        alarm_type_by_group = duration_events.groupby(['Sensor_Group', 'Alarm_Type']).size().reset_index(name='Count')
        alarm_type_by_group = alarm_type_by_group.sort_values(['Sensor_Group', 'Count'], ascending=[True, False])
        print("\nTop alarm types by group:")
        print(alarm_type_by_group.groupby('Sensor_Group').head(3))

        group_analysis = {
            'group_counts': group_counts,
            'mtbf_by_group': mtbf_by_group,
            'severity_by_group': severity_by_group,
            'alarm_type_by_group': alarm_type_by_group,
        }

    return {
        'hourly_frequency': hourly_freq,
        'daily_frequency': daily_freq,
        'mtbf_data': mtbf_df,
        'correlation_data': correlation_counts,
        'severity_analysis': severity_by_sensor,
        'escalation_analysis': escalation_df,
        'group_analysis': group_analysis,
    }
|
|
|
|
def create_visualizations(self, save_plots=False, output_dir="plots"):
    """
    Create visualizations for the alarm analysis

    Only resolved events are plotted, i.e. rows of ``self.processed_events``
    whose ``Duration_Minutes`` is not null. The following figures are drawn:

    1. Alarm dashboard (count by type, top sensors, hourly and weekday frequency)
    2. Duration distributions by alarm type (box plot and histogram)
    3. Top-10 sensor rankings (count, average/maximum duration, severity score)
    4. Group figures, only when the events carry a ``Sensor_Group`` column
       (top-10 group rankings, alarm type mix per group and, when a sensor
       mapping is loaded, sensors per group and alarms per sensor)

    Args:
        save_plots: When True, every figure is also saved as a PNG (dpi=300)
            inside ``output_dir``; the directory is created if missing.
        output_dir: Directory that receives the PNG files.

    Returns:
        None. The method returns early (after printing a message) when the
        plotting libraries cannot be imported or there is nothing to plot.
    """
    print("\n--- CREATING VISUALIZATIONS ---")

    # Import visualization libraries only when needed
    try:
        plt, sns = _import_viz_libs()
    except ImportError:
        print("Matplotlib or seaborn not available. Skipping visualizations.")
        return

    if self.processed_events is None or len(self.processed_events) == 0:
        print("No processed events available for visualization. Run pair_events_and_calculate_durations first.")
        return

    import os

    # Create output directory if needed
    if save_plots:
        os.makedirs(output_dir, exist_ok=True)

    # Filter resolved events for visualization
    duration_events = self.processed_events[self.processed_events['Duration_Minutes'].notna()].copy()

    if len(duration_events) == 0:
        print("No resolved events with duration data available for visualization.")
        return

    # Extract time components for time-based analysis
    duration_events['Start_Hour'] = duration_events['Start_Time'].dt.hour
    duration_events['Start_DayOfWeek'] = duration_events['Start_Time'].dt.day_name()

    # Severity score = weight of the alarm type (1 for unknown types) times
    # the duration. Computed once, column-wise, for both the sensor and the
    # group rankings.
    severity_weights = {'Error': 3, 'Alarm': 2, 'Warning': 1}
    duration_events['Severity_Score'] = (
        duration_events['Alarm_Type'].map(lambda alarm_type: severity_weights.get(alarm_type, 1))
        * duration_events['Duration_Minutes']
    )

    # sensor_mapping is None until a mapping has been loaded; fall back to an
    # empty dict so sensor labels degrade to "ID: <id>" instead of crashing.
    sensor_mapping = self.sensor_mapping or {}

    def finish_figure(filename):
        """Lay out the current figure, optionally save it, then show it."""
        plt.tight_layout()
        if save_plots:
            plt.savefig(os.path.join(output_dir, filename), dpi=300, bbox_inches='tight')
        plt.show()

    def sensor_labels(sensor_ids, template):
        """Render one tick label per sensor id from its mapped name and group."""
        labels = []
        for sensor_id in sensor_ids:
            info = sensor_mapping.get(sensor_id, {})
            labels.append(template.format(
                name=info.get('name', f'ID: {sensor_id}'),
                group=info.get('group', 'Unknown'),
            ))
        return labels

    def shorten(label, limit):
        """Truncate a label to ``limit`` characters, marking the cut with '...'."""
        text = str(label)
        return text[:limit] + '...' if len(text) > limit else text

    def label_bars(ax, heights):
        """Write each bar's value just above a vertical bar."""
        for position, height in enumerate(heights):
            ax.text(position, height + height*0.01, str(height), ha='center', va='bottom')

    def barh_panel(ax, ranking, tick_labels, title, xlabel):
        """Draw one horizontal-bar ranking into ``ax``."""
        positions = range(len(ranking))
        ax.barh(positions, ranking.values)
        ax.set_title(title)
        ax.set_xlabel(xlabel)
        ax.set_yticks(positions)
        ax.set_yticklabels(tick_labels)

    def ranked_group_figure(frame, value_column, title, xlabel, value_format, filename):
        """Draw a stand-alone horizontal-bar figure of per-group values."""
        positions = range(len(frame))
        plt.figure(figsize=(14, 8))
        plt.barh(positions, frame[value_column])
        plt.title(title)
        plt.xlabel(xlabel)
        plt.ylabel('Group')
        plt.yticks(positions, [shorten(label, 30) for label in frame['Group']])
        for position, value in enumerate(frame[value_column]):
            plt.text(value + value*0.01, position, value_format(value), va='center')
        finish_figure(filename)

    # Set up the plotting style
    plt.style.use('default')
    sns.set_palette("husl")

    # 1. Alarm analysis dashboard
    fig, axes = plt.subplots(2, 2, figsize=(15, 12))
    fig.suptitle('Alarm Analysis Dashboard', fontsize=16, fontweight='bold')

    # Alarm count by type
    alarm_type_counts = duration_events['Alarm_Type'].value_counts()
    axes[0, 0].bar(alarm_type_counts.index, alarm_type_counts.values)
    axes[0, 0].set_title('Alarm Count by Type')
    axes[0, 0].set_ylabel('Count')
    label_bars(axes[0, 0], alarm_type_counts.values)

    # Top 10 sensors by alarm count - with sensor names instead of IDs
    top_sensors = duration_events['Sensor_Id'].value_counts().head(10)
    axes[0, 1].bar(range(len(top_sensors)), top_sensors.values)
    axes[0, 1].set_title('Top 10 Sensors by Alarm Count')
    axes[0, 1].set_ylabel('Count')
    axes[0, 1].set_xticks(range(len(top_sensors)))
    axes[0, 1].set_xticklabels(sensor_labels(top_sensors.index, "{name}\n({group})"), rotation=45)
    label_bars(axes[0, 1], top_sensors.values)

    # Alarm frequency by hour of day
    hourly_freq = duration_events.groupby('Start_Hour').size()
    axes[1, 0].plot(hourly_freq.index, hourly_freq.values, marker='o')
    axes[1, 0].set_title('Alarm Frequency by Hour of Day')
    axes[1, 0].set_xlabel('Hour of Day')
    axes[1, 0].set_ylabel('Number of Alarms')
    axes[1, 0].grid(True, alpha=0.3)

    # Alarm frequency by day of week (days without alarms are shown as 0)
    day_order = ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday']
    daily_freq = duration_events.groupby('Start_DayOfWeek').size().reindex(day_order, fill_value=0)
    axes[1, 1].bar(range(len(daily_freq)), daily_freq.values)
    axes[1, 1].set_title('Alarm Frequency by Day of Week')
    axes[1, 1].set_ylabel('Number of Alarms')
    axes[1, 1].set_xticks(range(len(daily_freq)))
    axes[1, 1].set_xticklabels([day[:3] for day in daily_freq.index], rotation=45)
    label_bars(axes[1, 1], daily_freq.values)

    finish_figure("alarm_dashboard.png")

    # 2. Duration analysis by alarm type
    plt.figure(figsize=(12, 6))

    # Box plot of durations by alarm type
    plt.subplot(1, 2, 1)
    sns.boxplot(data=duration_events, x='Alarm_Type', y='Duration_Minutes')
    plt.title('Distribution of Alarm Durations by Type')
    plt.xlabel('Alarm Type')
    plt.ylabel('Duration (Minutes)')
    plt.yscale('log')  # Log scale to better visualize the wide range of durations

    # Histogram of durations by alarm type
    plt.subplot(1, 2, 2)
    for alarm_type in duration_events['Alarm_Type'].unique():
        subset = duration_events[duration_events['Alarm_Type'] == alarm_type]
        plt.hist(subset['Duration_Minutes'], alpha=0.6, label=alarm_type, bins=30)
    plt.title('Distribution of Alarm Durations by Type')
    plt.xlabel('Duration (Minutes)')
    plt.ylabel('Frequency')
    plt.legend()
    plt.yscale('log')  # Log scale to better visualize the wide range of durations

    finish_figure("duration_analysis.png")

    # 3. Top sensors by various metrics - with sensor names instead of IDs
    fig, axes = plt.subplots(2, 2, figsize=(15, 12))
    fig.suptitle('Top Sensors Analysis', fontsize=16, fontweight='bold')

    sensor_durations = duration_events.groupby('Sensor_Id')['Duration_Minutes']
    sensor_panels = [
        (top_sensors,
         'Top 10 Sensors by Total Alarm Count', 'Number of Alarms'),
        (sensor_durations.mean().sort_values(ascending=False).head(10),
         'Top 10 Sensors by Average Duration', 'Average Duration (Minutes)'),
        (sensor_durations.max().sort_values(ascending=False).head(10),
         'Top 10 Sensors by Maximum Duration', 'Maximum Duration (Minutes)'),
        (duration_events.groupby('Sensor_Id')['Severity_Score'].sum().sort_values(ascending=False).head(10),
         'Top 10 Sensors by Total Severity Score', 'Total Severity Score'),
    ]
    # axes.flat walks the 2x2 grid row by row, matching the panel order above
    for ax, (ranking, title, xlabel) in zip(axes.flat, sensor_panels):
        barh_panel(ax, ranking, sensor_labels(ranking.index, "{name} (Group: {group})"), title, xlabel)

    finish_figure("sensor_analysis.png")

    # 4. Group-based visualizations if sensor groups are available
    if 'Sensor_Group' in duration_events.columns:
        print("\nCreating group-based visualizations...")

        # First group-based visualization - Dashboard with 4 plots
        fig, axes = plt.subplots(2, 2, figsize=(15, 12))
        fig.suptitle('Group-Based Analysis Dashboard', fontsize=16, fontweight='bold')

        alarms_per_group = duration_events['Sensor_Group'].value_counts()
        group_durations = duration_events.groupby('Sensor_Group')['Duration_Minutes']
        group_panels = [
            (alarms_per_group.head(10),
             'Top 10 Groups by Total Alarm Count', 'Number of Alarms'),
            (group_durations.mean().sort_values(ascending=False).head(10),
             'Top 10 Groups by Average Duration', 'Average Duration (Minutes)'),
            (group_durations.max().sort_values(ascending=False).head(10),
             'Top 10 Groups by Maximum Duration', 'Maximum Duration (Minutes)'),
            (duration_events.groupby('Sensor_Group')['Severity_Score'].sum().sort_values(ascending=False).head(10),
             'Top 10 Groups by Total Severity Score', 'Total Severity Score'),
        ]
        for ax, (ranking, title, xlabel) in zip(axes.flat, group_panels):
            barh_panel(ax, ranking, [shorten(label, 20) for label in ranking.index], title, xlabel)

        finish_figure("group_analysis.png")

        # Additional group-based visualizations
        print("Creating additional group-based visualizations...")

        # Number of mapped sensors per group; shared by the composition and
        # the intensity figures below.
        sensors_per_group = {}
        for sensor_info in sensor_mapping.values():
            group = sensor_info.get('group', 'Unknown')
            sensors_per_group[group] = sensors_per_group.get(group, 0) + 1

        # Group composition analysis - showing number of sensors per group
        if self.sensor_mapping:
            group_sensor_counts = pd.DataFrame(
                list(sensors_per_group.items()),
                columns=['Group', 'Sensor_Count']
            ).sort_values('Sensor_Count', ascending=False).head(15)

            ranked_group_figure(
                group_sensor_counts, 'Sensor_Count',
                'Sensor Count by Group (Top 15 Groups)',
                'Number of Sensors in Group',
                str,
                "group_composition.png",
            )

        # Alarm type distribution by group (stacked bar chart)
        alarm_type_by_group = duration_events.groupby(['Sensor_Group', 'Alarm_Type']).size().unstack(fill_value=0)
        top_10_groups = alarms_per_group.head(10).index
        alarm_type_by_group.loc[top_10_groups].plot(kind='barh', stacked=True, figsize=(14, 8))
        plt.title('Alarm Type Distribution by Group (Top 10 Groups)')
        plt.xlabel('Number of Alarms')
        plt.ylabel('Group')
        plt.legend(title='Alarm Type', bbox_to_anchor=(1.05, 1), loc='upper left')

        finish_figure("alarm_type_by_group.png")

        # Group alarm intensity: alarms per sensor in each group
        if self.sensor_mapping:
            # Groups absent from the mapping are treated as having one sensor,
            # which also avoids a division by zero.
            group_alarm_intensity = {
                group: alarm_count / sensors_per_group.get(group, 1)
                for group, alarm_count in alarms_per_group.items()
            }

            intensity_df = pd.DataFrame(
                list(group_alarm_intensity.items()),
                columns=['Group', 'Alarms_Per_Sensor']
            ).sort_values('Alarms_Per_Sensor', ascending=False).head(15)

            ranked_group_figure(
                intensity_df, 'Alarms_Per_Sensor',
                'Alarm Intensity: Alarms per Sensor by Group (Top 15 Groups)',
                'Average Alarms per Sensor',
                lambda value: f"{value:.2f}",
                "group_alarm_intensity.png",
            )

    print("Visualizations created successfully!")
    if save_plots:
        print(f"Plots saved to '{output_dir}' directory.")
|
|
|
|
def calculate_uptime_metrics(self):
    """
    Calculate uptime/downtime metrics based on two approaches:
    1. Error duration as downtime (communication errors)
    2. Alarm/Warning duration as downtime (operational issues)

    Durations come from the resolved rows of ``self.processed_events``
    (``Duration_Minutes`` not null). The reference time span is the distance
    between the earliest and latest ``Date`` in ``self.alarm_data``.

    Returns:
        None when there are no processed or no resolved events. Otherwise a
        dict with:
        - the time span (``total_time_span_minutes``/``_hours``,
          ``total_start_time``, ``total_end_time``);
        - cumulative error, alarm/warning and combined downtime minutes and
          downtime/uptime percentages (durations are summed over all
          sensors, so these percentages can exceed 100 / fall below 0);
        - ``system_error_uptime_percentage`` and
          ``system_alarm_warning_uptime_percentage``: share of one-hour
          buckets in which no such event was logged (None if that
          calculation fails);
        - ``error_by_sensor`` / ``alarm_warning_by_sensor`` and
          ``error_by_group`` / ``alarm_warning_by_group`` DataFrames with one
          row per sensor/group found in the alarm data. The group frames are
          empty when the events have no ``Sensor_Group`` column.
    """
    print("\n--- CALCULATING UPTIME/DOWNTIME METRICS ---")

    if self.processed_events is None or len(self.processed_events) == 0:
        print("No processed events available for uptime calculation. Run pair_events_and_calculate_durations first.")
        return None

    # Filter out unresolved events for duration analysis
    duration_events = self.processed_events[self.processed_events['Duration_Minutes'].notna()].copy()

    if len(duration_events) == 0:
        print("No resolved events with duration data available for uptime calculation.")
        return None

    # Calculate total time span from the alarm data (not just processed events)
    total_start_time = self.alarm_data['Date'].min()
    total_end_time = self.alarm_data['Date'].max()
    total_time_span_minutes = (total_end_time - total_start_time).total_seconds() / 60.0

    def share_of_span(minutes):
        """Express minutes (scalar or Series) as a percentage of the time span.

        An empty span yields 0 so that per-sensor/per-group figures agree
        with the system-wide ones instead of dividing by zero.
        """
        if total_time_span_minutes > 0:
            return (minutes / total_time_span_minutes) * 100
        return minutes * 0.0  # keeps the scalar/Series shape of the input

    def summarize(events, key, all_keys, prefix, extra_agg, extra_columns, zero_columns):
        """Aggregate durations per ``key`` with one row for every known key.

        Keys without matching events get 0 in the duration/count columns and
        in ``zero_columns``; missing timestamps simply stay NaT.
        """
        raw = events.groupby(key).agg({
            'Duration_Minutes': ['sum', 'count', 'mean'],
            **extra_agg
        }).round(2)
        total_column = f'Total_{prefix}_Duration'
        raw.columns = [total_column, f'{prefix}_Count', f'Avg_{prefix}_Duration'] + extra_columns

        complete = pd.DataFrame(index=all_keys).join(raw, how='left')
        complete = complete.fillna({
            column: 0
            for column in [total_column, f'{prefix}_Count', f'Avg_{prefix}_Duration'] + zero_columns
        })

        # Cap downtime at 100% (and uptime at 0%) so a key whose summed
        # durations exceed the time span cannot show negative uptime.
        downtime = share_of_span(complete[total_column]).round(4).clip(upper=100.0)
        complete[f'{prefix}_Downtime_Percentage'] = downtime
        complete[f'{prefix}_Uptime_Percentage'] = (100 - downtime).round(4).clip(lower=0)
        return complete

    def with_sensor_labels(frame):
        """Add Sensor_Name/Sensor_Group from the mapping as the leading columns."""
        if not self.sensor_mapping or len(frame) == 0:
            return frame
        frame['Sensor_Name'] = frame.index.map(
            lambda sensor_id: self.sensor_mapping.get(sensor_id, {}).get('name', 'Unknown')
        )
        frame['Sensor_Group'] = frame.index.map(
            lambda sensor_id: self.sensor_mapping.get(sensor_id, {}).get('group', 'Unknown')
        )
        leading = ['Sensor_Name', 'Sensor_Group']
        # Return the reordered frame: the caller must rebind it, otherwise the
        # new column order is lost.
        return frame[leading + [column for column in frame.columns if column not in leading]]

    print(f"Total time period: {total_start_time} to {total_end_time}")
    print(f"Total time span: {total_time_span_minutes:.2f} minutes ({total_time_span_minutes/60:.2f} hours)")

    # 1. Calculate error-based downtime (communication errors)
    error_events = duration_events[duration_events['Alarm_Type'] == 'Error'].copy()
    total_error_duration = error_events['Duration_Minutes'].sum()

    # This represents the total time spent in error state across all sensors
    error_downtime_percentage = share_of_span(total_error_duration)

    print(f"\nError-based downtime (communication errors):")
    print(f" Total error duration across all sensors: {total_error_duration:.2f} minutes")
    print(f" Error downtime percentage (cumulative): {error_downtime_percentage:.4f}%")
    print(f" Error uptime percentage (communication): {100 - error_downtime_percentage:.4f}%")

    # 2. Calculate alarm/warning-based downtime (operational issues)
    alarm_warning_events = duration_events[duration_events['Alarm_Type'].isin(['Alarm', 'Warning'])].copy()
    total_alarm_warning_duration = alarm_warning_events['Duration_Minutes'].sum()
    alarm_warning_downtime_percentage = share_of_span(total_alarm_warning_duration)

    print(f"\nAlarm/Warning-based downtime (operational issues):")
    print(f" Total alarm/warning duration across all sensors: {total_alarm_warning_duration:.2f} minutes")
    print(f" Alarm/Warning downtime percentage (cumulative): {alarm_warning_downtime_percentage:.4f}%")
    print(f" Alarm/Warning uptime percentage (operational): {100 - alarm_warning_downtime_percentage:.4f}%")

    # 3. Combined downtime (Error + Alarm + Warning)
    total_operational_duration = total_error_duration + total_alarm_warning_duration
    combined_downtime_percentage = share_of_span(total_operational_duration)

    print(f"\nCombined system downtime:")
    print(f" Total combined duration across all sensors: {total_operational_duration:.2f} minutes")
    print(f" Combined downtime percentage (cumulative): {combined_downtime_percentage:.4f}%")
    print(f" Combined uptime percentage: {100 - combined_downtime_percentage:.4f}%")

    # 4. Time-bucketed system uptime (1-hour intervals)
    print(f"\n--- ADDITIONAL SYSTEM-LEVEL UPTIME METRICS ---")

    try:
        # Hour-aligned bucket edges covering the whole alarm data. The
        # lowercase 'h' alias is used because uppercase 'H' is deprecated in
        # recent pandas releases.
        time_buckets = pd.date_range(
            start=total_start_time.floor('h'),
            end=total_end_time.ceil('h'),
            freq='h'
        )
        bucket_count = len(time_buckets) - 1

        # A bucket counts as "down" when at least one matching raw event was
        # logged inside it. Buckets are half-open [start, end), so an event
        # lying exactly on the final edge falls outside every bucket.
        # NOTE(review): this looks at when events were logged, not at how
        # long the alarm state lasted - confirm that is the intended metric.
        event_dates = self.alarm_data['Date']
        in_window = (event_dates >= time_buckets[0]) & (event_dates < time_buckets[-1])
        event_hours = event_dates[in_window].dt.floor('h')
        event_types = self.alarm_data['AlarmType'][in_window]

        def bucket_uptime(type_mask):
            """Percentage of hour buckets without any event selected by the mask."""
            if bucket_count <= 0:
                return 100.0
            return 100 - (event_hours[type_mask].nunique() / bucket_count * 100)

        system_error_uptime_percentage = bucket_uptime(event_types == 'Error')
        system_alarm_warning_uptime_percentage = bucket_uptime(event_types.isin(['Alarm', 'Warning']))

        print(f"System-level error uptime (time-based): {system_error_uptime_percentage:.4f}%")
        print(f"System-level alarm/warning uptime (time-based): {system_alarm_warning_uptime_percentage:.4f}%")

    except Exception as e:
        # Best effort: the remaining metrics are still reported without these
        print(f"Could not calculate time-based system uptime metrics: {e}")
        system_error_uptime_percentage = None
        system_alarm_warning_uptime_percentage = None

    # Per-sensor metrics, one row for every sensor present in the alarm data
    print(f"\n--- PER-SENSOR UPTIME/DOWNTIME METRICS ---")

    # First-appearance order keeps the row order reproducible between runs
    all_sensors = pd.Index(self.alarm_data['Sensor_Id'].unique())

    error_by_sensor = with_sensor_labels(summarize(
        error_events, 'Sensor_Id', all_sensors, 'Error',
        {'Start_Time': ['min', 'max']},
        ['First_Error_Time', 'Last_Error_Time'],
        [],
    ))
    alarm_warning_by_sensor = with_sensor_labels(summarize(
        alarm_warning_events, 'Sensor_Id', all_sensors, 'Alarm_Warning',
        {'Start_Time': ['min', 'max']},
        ['First_Alarm_Warning_Time', 'Last_Alarm_Warning_Time'],
        [],
    ))

    # 5. Per-group metrics
    print(f"\n--- PER-GROUP UPTIME/DOWNTIME METRICS ---")

    if 'Sensor_Group' in duration_events.columns:
        # Prefer the groups seen in the raw alarm data; fall back to the
        # paired events when the raw data has no group column.
        group_source = self.alarm_data if 'Sensor_Group' in self.alarm_data.columns else duration_events
        all_groups = pd.Index(group_source['Sensor_Group'].unique())

        error_by_group = summarize(
            error_events, 'Sensor_Group', all_groups, 'Error',
            {'Sensor_Id': 'nunique'},
            ['Unique_Sensors_With_Errors'],
            ['Unique_Sensors_With_Errors'],
        )
        alarm_warning_by_group = summarize(
            alarm_warning_events, 'Sensor_Group', all_groups, 'Alarm_Warning',
            {'Sensor_Id': 'nunique'},
            ['Unique_Sensors_With_Alarm_Warning'],
            ['Unique_Sensors_With_Alarm_Warning'],
        )
    else:
        error_by_group = pd.DataFrame()
        alarm_warning_by_group = pd.DataFrame()

    # Compile all results
    uptime_results = {
        'total_time_span_minutes': total_time_span_minutes,
        'total_time_span_hours': total_time_span_minutes / 60,
        'total_start_time': total_start_time,
        'total_end_time': total_end_time,
        # System-wide metrics
        'error_downtime_minutes': total_error_duration,
        'error_downtime_percentage': error_downtime_percentage,
        'error_uptime_percentage': 100 - error_downtime_percentage,
        'alarm_warning_downtime_minutes': total_alarm_warning_duration,
        'alarm_warning_downtime_percentage': alarm_warning_downtime_percentage,
        'alarm_warning_uptime_percentage': 100 - alarm_warning_downtime_percentage,
        'combined_downtime_minutes': total_operational_duration,
        'combined_downtime_percentage': combined_downtime_percentage,
        'combined_uptime_percentage': 100 - combined_downtime_percentage,
        # System-level metrics
        'system_error_uptime_percentage': system_error_uptime_percentage,
        'system_alarm_warning_uptime_percentage': system_alarm_warning_uptime_percentage,
        # Per-sensor metrics
        'error_by_sensor': error_by_sensor,
        'alarm_warning_by_sensor': alarm_warning_by_sensor,
        # Per-group metrics
        'error_by_group': error_by_group,
        'alarm_warning_by_group': alarm_warning_by_group
    }

    return uptime_results
|
|
|
|
def export_results(self, output_dir="output"):
    """
    Export analysis results to CSV files

    Only events with a non-null ``Duration_Minutes`` are exported. The
    following files are written into ``output_dir`` (created if missing):

    * ``paired_alarm_events.csv``
    * ``summary_by_alarm_type.csv``
    * ``sensor_statistics.csv``
    * ``top_sensors_by_alarm_count.csv`` / ``..._avg_duration.csv`` /
      ``..._max_duration.csv`` / ``..._severity_score.csv``
    * ``alarm_frequency_by_hour.csv`` / ``alarm_frequency_by_day.csv``
    * when the events carry a ``Sensor_Group`` column:
      ``group_statistics.csv``, ``top_groups_by_*.csv`` and
      ``alarm_type_distribution_by_group.csv``

    Args:
        output_dir: Directory that receives the CSV files.

    Returns:
        None. Prints a message and returns early when there are no
        processed events or none of them has a duration.
    """
    print("\n--- EXPORTING RESULTS ---")

    import os
    os.makedirs(output_dir, exist_ok=True)

    if self.processed_events is None or len(self.processed_events) == 0:
        print("No processed events available for export. Run pair_events_and_calculate_durations first.")
        return

    # Filter resolved events for export
    duration_events = self.processed_events[self.processed_events['Duration_Minutes'].notna()].copy()

    if len(duration_events) == 0:
        print("No resolved events with duration data available for export.")
        return

    def write_csv(frame, filename, description, **to_csv_kwargs):
        # Write one result table and report where it went.
        path = os.path.join(output_dir, filename)
        frame.to_csv(path, **to_csv_kwargs)
        print(f"Exported {description} to: {path}")

    def with_sensor_info(frame):
        # Prepend Sensor_Name / Sensor_Group columns to a frame indexed by
        # sensor id. Presumably sensor_mapping maps id -> {'name', 'group'};
        # ids missing from it are labelled 'Unknown'.
        if not self.sensor_mapping:
            return frame
        mapping = self.sensor_mapping
        frame['Sensor_Name'] = frame.index.map(
            lambda sensor_id: mapping.get(sensor_id, {}).get('name', 'Unknown')
        )
        frame['Sensor_Group'] = frame.index.map(
            lambda sensor_id: mapping.get(sensor_id, {}).get('group', 'Unknown')
        )
        leading = ['Sensor_Name', 'Sensor_Group']
        return frame[leading + [col for col in frame.columns if col not in leading]]

    # 1. Export all paired events (before any helper columns are added)
    write_csv(duration_events, "paired_alarm_events.csv", "paired alarm events", index=False)

    # 2. Export summary by alarm type
    summary_by_type = duration_events.groupby('Alarm_Type').agg(
        Event_Count=('Duration_Minutes', 'count'),
        Min_Duration=('Duration_Minutes', 'min'),
        Max_Duration=('Duration_Minutes', 'max'),
        Avg_Duration=('Duration_Minutes', 'mean'),
        Unique_Sensors=('Sensor_Id', 'nunique'),
    ).round(2)
    write_csv(summary_by_type, "summary_by_alarm_type.csv", "summary by alarm type")

    # 3. Export sensor statistics
    sensor_stats = duration_events.groupby('Sensor_Id').agg(
        Total_Alarm_Count=('Alarm_Type', 'count'),
        Alarm_Type_Count=('Alarm_Type', 'nunique'),
        Min_Duration=('Duration_Minutes', 'min'),
        Max_Duration=('Duration_Minutes', 'max'),
        Avg_Duration=('Duration_Minutes', 'mean'),
        First_Alarm=('Start_Time', 'min'),
        Last_Alarm=('Start_Time', 'max'),
    ).round(2)
    sensor_stats = sensor_stats.sort_values('Total_Alarm_Count', ascending=False)
    write_csv(with_sensor_info(sensor_stats), "sensor_statistics.csv", "sensor statistics")

    # Severity = weight of the alarm type (unlisted types weigh 1) x duration
    severity_weights = {'Error': 3, 'Alarm': 2, 'Warning': 1}
    type_weight = duration_events['Alarm_Type'].map(severity_weights).fillna(1)
    duration_events['Severity_Score'] = type_weight * duration_events['Duration_Minutes']

    # 4. Export top sensors by various metrics
    durations_by_sensor = duration_events.groupby('Sensor_Id')['Duration_Minutes']
    sensor_rankings = [
        # value_counts() is already ordered by descending count
        (duration_events['Sensor_Id'].value_counts(),
         'Alarm_Count', "top_sensors_by_alarm_count.csv", "alarm count"),
        (durations_by_sensor.mean().sort_values(ascending=False),
         'Avg_Duration', "top_sensors_by_avg_duration.csv", "average duration"),
        (durations_by_sensor.max().sort_values(ascending=False),
         'Max_Duration', "top_sensors_by_max_duration.csv", "max duration"),
        (duration_events.groupby('Sensor_Id')['Severity_Score'].sum().sort_values(ascending=False),
         'Total_Severity_Score', "top_sensors_by_severity_score.csv", "severity score"),
    ]
    for ranked, column, filename, label in sensor_rankings:
        write_csv(with_sensor_info(ranked.to_frame(column)), filename, f"top sensors by {label}")

    # 5. Export time-based analysis
    duration_events['Start_Hour'] = duration_events['Start_Time'].dt.hour
    duration_events['Start_DayOfWeek'] = duration_events['Start_Time'].dt.day_name()

    # Hourly frequency
    hourly_freq = duration_events.groupby('Start_Hour').size().to_frame('Alarm_Count')
    write_csv(hourly_freq, "alarm_frequency_by_hour.csv", "alarm frequency by hour")

    # Daily frequency (always lists all seven days, zero-filled)
    day_order = ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday']
    daily_freq = (
        duration_events.groupby('Start_DayOfWeek').size()
        .reindex(day_order, fill_value=0)
        .to_frame('Alarm_Count')
    )
    write_csv(daily_freq, "alarm_frequency_by_day.csv", "alarm frequency by day")

    # 6. Export group-based analysis if sensor groups are available
    if 'Sensor_Group' in duration_events.columns:
        print("\nExporting group-based analysis...")

        # Group statistics
        group_stats = duration_events.groupby('Sensor_Group').agg(
            Total_Alarm_Count=('Sensor_Id', 'count'),
            Unique_Sensors=('Sensor_Id', 'nunique'),
            Min_Duration=('Duration_Minutes', 'min'),
            Max_Duration=('Duration_Minutes', 'max'),
            Avg_Duration=('Duration_Minutes', 'mean'),
            Total_Duration=('Duration_Minutes', 'sum'),
            First_Alarm=('Start_Time', 'min'),
            Last_Alarm=('Start_Time', 'max'),
        ).round(2)
        group_stats = group_stats.sort_values('Total_Alarm_Count', ascending=False)

        # The sensor-count based columns need the sensor report; without it
        # they are left out rather than computed from a missing column.
        if self.sensor_data is not None and 'Group' in self.sensor_data.columns:
            processed_sensor_data = self._process_hierarchical_sensor_data(self.sensor_data)
            sensors_with_id = processed_sensor_data[processed_sensor_data['ID'].notna()]

            # Count unique sensors per group (where ID is not null)
            sensor_counts_by_group = sensors_with_id.groupby('Group')['ID'].nunique().fillna(0).astype(int)

            # NOTE(review): the 'Unknown' bucket counts rows (group missing
            # or literally 'Unknown'), not distinct IDs like the named groups.
            unknown_sensor_count = len(
                sensors_with_id[sensors_with_id['Group'].isna() | (sensors_with_id['Group'] == 'Unknown')]
            )

            group_stats['Total_Sensors_In_Group'] = group_stats.index.map(
                lambda group: unknown_sensor_count if group == 'Unknown'
                else sensor_counts_by_group.get(group, 0)
            ).fillna(0).astype(int)

            # Percentage of monitoring points that experienced alarms;
            # a zero sensor count yields inf/NaN, which is reported as 0.
            group_stats['Percentage_Monitoring_Points_Alarmed'] = (
                (group_stats['Unique_Sensors'] / group_stats['Total_Sensors_In_Group']) * 100
            ).round(2).fillna(0).replace([np.inf, -np.inf], 0)

            # Alarm time percentage: alarm-hours over possible sensor-hours
            # across the overall time range of the alarm data.
            first_alarm_overall = self.alarm_data['Date'].min()
            last_alarm_overall = self.alarm_data['Date'].max()

            if pd.notna(first_alarm_overall) and pd.notna(last_alarm_overall):
                total_time_span_hours = (last_alarm_overall - first_alarm_overall).total_seconds() / 3600.0
                total_possible_sensor_hours = group_stats['Total_Sensors_In_Group'] * total_time_span_hours
                actual_alarm_hours = group_stats['Total_Duration'] / 60.0  # minutes -> hours
                group_stats['Alarm_Time_Percentage'] = (
                    (actual_alarm_hours / total_possible_sensor_hours) * 100
                ).round(2).fillna(0).replace([np.inf, -np.inf], 0)
            else:
                group_stats['Alarm_Time_Percentage'] = 0.0

        write_csv(group_stats, "group_statistics.csv", "group statistics")

        # Top groups by various metrics
        durations_by_group = duration_events.groupby('Sensor_Group')['Duration_Minutes']
        group_rankings = [
            (duration_events['Sensor_Group'].value_counts(),
             'Alarm_Count', "top_groups_by_alarm_count.csv", "alarm count"),
            (durations_by_group.mean().sort_values(ascending=False),
             'Avg_Duration', "top_groups_by_avg_duration.csv", "average duration"),
            (durations_by_group.max().sort_values(ascending=False),
             'Max_Duration', "top_groups_by_max_duration.csv", "max duration"),
            (duration_events.groupby('Sensor_Group')['Severity_Score'].sum().sort_values(ascending=False),
             'Total_Severity_Score', "top_groups_by_severity_score.csv", "severity score"),
        ]
        for ranked, column, filename, label in group_rankings:
            write_csv(ranked.to_frame(column), filename, f"top groups by {label}")

        # Alarm type distribution by group
        alarm_type_by_group = duration_events.groupby(['Sensor_Group', 'Alarm_Type']).size().unstack(fill_value=0)
        write_csv(alarm_type_by_group, "alarm_type_distribution_by_group.csv", "alarm type distribution by group")

    print(f"\nAll results exported to '{output_dir}' directory successfully!")
|
|
|
|
def export_uptime_metrics(self, output_dir="output", uptime_results=None):
    """
    Export uptime/downtime metrics to new output files

    Writes ``system_uptime_summary.csv`` plus, for every non-empty table in
    ``uptime_results``, one of ``sensor_error_uptime_metrics.csv``,
    ``sensor_alarm_warning_uptime_metrics.csv``,
    ``group_error_uptime_metrics.csv`` and
    ``group_alarm_warning_uptime_metrics.csv``.

    Args:
        output_dir: Directory that receives the CSV files (created if missing).
        uptime_results: Result dict shaped like the return value of
            ``calculate_uptime_metrics``. When ``None`` it is computed by
            calling ``self.calculate_uptime_metrics()``.

    Returns:
        None. Prints a message and returns early when no results are available.

    Raises:
        KeyError: if a required summary key or table is missing from
            ``uptime_results`` (only the two ``system_*`` keys are optional).
    """
    print("\n--- EXPORTING UPTIME/DOWNTIME METRICS ---")

    import os
    os.makedirs(output_dir, exist_ok=True)

    # Calculate uptime metrics if not provided
    if uptime_results is None:
        uptime_results = self.calculate_uptime_metrics()

    if uptime_results is None:
        print("No uptime results to export.")
        return

    # Summary rows in output order: (CSV label, key in uptime_results).
    # Keeping label and key side by side prevents the two from drifting apart.
    required_metrics = [
        ('Total_Time_Span_Minutes', 'total_time_span_minutes'),
        ('Total_Time_Span_Hours', 'total_time_span_hours'),
        ('Total_Start_Time', 'total_start_time'),
        ('Total_End_Time', 'total_end_time'),
        ('Total_Error_Duration_Minutes', 'error_downtime_minutes'),
        ('Error_Downtime_Percentage', 'error_downtime_percentage'),
        ('Error_Uptime_Percentage', 'error_uptime_percentage'),
        ('Total_Alarm_Warning_Duration_Minutes', 'alarm_warning_downtime_minutes'),
        ('Alarm_Warning_Downtime_Percentage', 'alarm_warning_downtime_percentage'),
        ('Alarm_Warning_Uptime_Percentage', 'alarm_warning_uptime_percentage'),
        ('Total_Combined_Duration_Minutes', 'combined_downtime_minutes'),
        ('Combined_Downtime_Percentage', 'combined_downtime_percentage'),
        ('Combined_Uptime_Percentage', 'combined_uptime_percentage'),
    ]
    # These two may be absent from the results and then show up as 'N/A'.
    optional_metrics = [
        ('System_Error_Uptime_Percentage_Time_Based', 'system_error_uptime_percentage'),
        ('System_Alarm_Warning_Uptime_Percentage_Time_Based', 'system_alarm_warning_uptime_percentage'),
    ]

    labels = [label for label, _ in required_metrics + optional_metrics]
    values = [uptime_results[key] for _, key in required_metrics]
    values += [uptime_results.get(key, 'N/A') for _, key in optional_metrics]

    # Export system-wide summary
    summary_df = pd.DataFrame({'Metric': labels, 'Value': values})
    summary_path = os.path.join(output_dir, "system_uptime_summary.csv")
    summary_df.to_csv(summary_path, index=False)
    print(f"Exported system uptime summary to: {summary_path}")

    # Export per-sensor metrics (the frame's index is written as the first column)
    sensor_tables = [
        ('error_by_sensor', "sensor_error_uptime_metrics.csv", "error"),
        ('alarm_warning_by_sensor', "sensor_alarm_warning_uptime_metrics.csv", "alarm/warning"),
    ]
    for key, filename, label in sensor_tables:
        table = uptime_results[key]
        if table.empty:
            continue
        path = os.path.join(output_dir, filename)
        table.to_csv(path)
        print(f"Exported per-sensor {label} uptime metrics to: {path}")

    # Export per-group metrics
    group_tables = [
        ('error_by_group', "group_error_uptime_metrics.csv", "error"),
        ('alarm_warning_by_group', "group_alarm_warning_uptime_metrics.csv", "alarm/warning"),
    ]
    for key, filename, label in group_tables:
        table = uptime_results[key]
        if table.empty:
            continue
        path = os.path.join(output_dir, filename)
        # Reset index to make Sensor_Group a regular column, and force its
        # name in case the index was unnamed.
        flat = table.reset_index()
        flat.columns = ['Sensor_Group'] + list(flat.columns[1:])
        flat.to_csv(path, index=False)
        print(f"Exported per-group {label} uptime metrics to: {path}")

    print(f"\nUptime/downtime metrics exported to '{output_dir}' directory successfully!")
|
|
|
|
# Example usage
def main():
    """Run the full example pipeline: load, categorize, pair, analyze, plot, export."""
    # Define file paths
    csv_file = "CardinalAlarmsDec25.csv"
    xlsx_file = "SensorReport Cardinal 2025-12-23_processed.xlsx"  # Updated to the new file name
    exclusion_file = "exclusion_config.json"  # Optional: specify groups to exclude

    # Create analyzer instance with exclusion file
    analyzer = AlarmAnalyzer(csv_file, xlsx_file, exclusion_file_path=exclusion_file)

    # Load data (the analyzer is used for every later step, so the return
    # value is not needed here)
    analyzer.load_data()

    # Categorize alarms
    categorized_data = analyzer.categorize_alarms()

    print("\nFirst few rows of categorized data:")
    print(categorized_data.head())

    # Pair events and calculate durations
    paired_events = analyzer.pair_events_and_calculate_durations()

    print("\nFirst few rows of paired events:")
    print(paired_events.head())

    # Perform basic and advanced analysis
    analyzer.basic_analysis()
    analyzer.advanced_analysis()

    # Create visualizations
    analyzer.create_visualizations(save_plots=True)

    # Perform uptime analysis
    uptime_results = analyzer.calculate_uptime_metrics()

    # Export results
    analyzer.export_results(output_dir="output")

    # Export uptime metrics to new files, reusing the results computed above
    analyzer.export_uptime_metrics(output_dir="output", uptime_results=uptime_results)


if __name__ == "__main__":
    main()