import pandas as pd import numpy as np import re from datetime import datetime from openpyxl import load_workbook import warnings warnings.filterwarnings('ignore') # Import matplotlib and seaborn only when needed for visualizations def _import_viz_libs(): import matplotlib.pyplot as plt import seaborn as sns return plt, sns class AlarmAnalyzer: def __init__(self, csv_file_path, xlsx_file_path=None, exclusion_file_path=None): """ Initialize the Alarm Analyzer with CSV and optional XLSX file paths """ self.csv_file_path = csv_file_path self.xlsx_file_path = xlsx_file_path self.exclusion_file_path = exclusion_file_path self.excluded_groups = set() self.alarm_data = None self.sensor_data = None self.processed_events = None self.sensor_mapping = None # Load excluded groups if exclusion file is provided if self.exclusion_file_path: self.load_excluded_groups() def load_excluded_groups(self): """ Load groups to exclude from analysis from a configuration file Supports both JSON format: {"excluded_groups": ["Group1", "Group2"]} and simple text format: one group name per line """ import json import os if not os.path.exists(self.exclusion_file_path): print(f"Warning: Exclusion file {self.exclusion_file_path} does not exist. No groups will be excluded.") return print(f"Loading excluded groups from {self.exclusion_file_path}...") try: # Try to parse as JSON first with open(self.exclusion_file_path, 'r') as f: content = f.read().strip() # Check if it's a JSON file by attempting to parse if content.startswith('{') or content.startswith('['): # It's a JSON file with open(self.exclusion_file_path, 'r') as f: config = json.load(f) if isinstance(config, dict) and 'excluded_groups' in config: self.excluded_groups = set(config['excluded_groups']) elif isinstance(config, list): # If the JSON is just an array of group names self.excluded_groups = set(config) else: print(f"Warning: Invalid JSON format in {self.exclusion_file_path}. Expected object with 'excluded_groups' key or array of group names.") return else: # It's a text file - read line by line with open(self.exclusion_file_path, 'r') as f: groups = [line.strip() for line in f if line.strip()] self.excluded_groups = set(groups) print(f"Loaded {len(self.excluded_groups)} groups to exclude: {list(self.excluded_groups)}") except json.JSONDecodeError: # If JSON parsing fails, treat as text file print(f"JSON parsing failed, treating {self.exclusion_file_path} as text file...") try: with open(self.exclusion_file_path, 'r') as f: groups = [line.strip() for line in f if line.strip()] self.excluded_groups = set(groups) print(f"Loaded {len(self.excluded_groups)} groups to exclude: {list(self.excluded_groups)}") except Exception as e: print(f"Error reading exclusion file: {e}") except Exception as e: print(f"Error loading exclusion file: {e}") def load_data(self): """ Load alarm data from CSV and sensor descriptions from XLSX """ print("Loading alarm data from CSV...") self.alarm_data = pd.read_csv(self.csv_file_path) # Convert Date and LogTime to datetime self.alarm_data['Date'] = pd.to_datetime(self.alarm_data['Date']) self.alarm_data['LogTime'] = pd.to_datetime(self.alarm_data['LogTime']) print(f"Loaded {len(self.alarm_data)} alarm records") print(f"Date range: {self.alarm_data['Date'].min()} to {self.alarm_data['Date'].max()}") # Load sensor descriptions if XLSX file is provided if self.xlsx_file_path: print("Loading sensor descriptions from XLSX...") try: # Read the sensor report - check if it's the new format (header=0) or old format (header=4) # First, try to read with header=0 (new format) temp_df = pd.read_excel(self.xlsx_file_path, header=0, nrows=5) # Check if the first row contains expected column names (new format) expected_cols = ['ID', 'Remote', 'Group', 'Type', 'Serial No', 'Name'] has_expected_cols = any(col in temp_df.columns for col in expected_cols) if has_expected_cols: # New format - use header=0 self.sensor_data = pd.read_excel(self.xlsx_file_path, header=0) print("Detected new sensor report format (header=0)") # For new format, no hierarchical processing needed processed_sensor_data = self.sensor_data else: # Old format - use header=4 with hierarchical processing self.sensor_data = pd.read_excel(self.xlsx_file_path, header=4) print("Detected old sensor report format (header=4)") # Process the sensor data to handle hierarchical structure where group names # apply to all rows below until the next group name processed_sensor_data = self._process_hierarchical_sensor_data(self.sensor_data) print(f"Loaded sensor data with {len(self.sensor_data)} records") print(f"Sensor data columns: {list(self.sensor_data.columns)}") # Create a mapping from Sensor_Id to sensor details # The 'ID' column appears to have numeric values that could match Sensor_Id in alarm data if 'ID' in self.sensor_data.columns: # Create a mapping from ID to other details self.sensor_mapping = {} for _, row in processed_sensor_data.iterrows(): sensor_id_raw = row['ID'] if pd.notna(sensor_id_raw): # Only map non-null values # Convert to int if it's numeric to match the alarm data try: sensor_id = int(sensor_id_raw) except (ValueError, TypeError): continue # Skip if conversion fails self.sensor_mapping[sensor_id] = { 'name': row['Name'] if pd.notna(row['Name']) else (row['Remote'] if pd.notna(row['Remote']) else 'Unknown'), 'group': row['Group'] if pd.notna(row['Group']) else 'Unknown', 'type': row['Type'] if pd.notna(row['Type']) else 'Unknown', 'serial_no': row['Serial No'] if pd.notna(row['Serial No']) else 'Unknown' } print(f"Created sensor mapping for {len(self.sensor_mapping)} sensors") # Add sensor information to alarm data self.add_sensor_info_to_alarms() # Filter out excluded groups if any are specified self.filter_excluded_groups() # Log summary of excluded groups if self.excluded_groups: print(f"Excluded groups summary: {len(self.excluded_groups)} groups were excluded from analysis") print(f"Excluded groups: {', '.join(sorted(self.excluded_groups))}") else: print("Warning: 'ID' column not found in sensor report. Attempting to find alternative mapping...") # Try to find other potential ID columns that might match Sensor_Id for col in self.sensor_data.columns: if 'SN' in str(col) or 'Remote' in str(col) or 'Id' in str(col): print(f"Found potential ID column: {col}") self.sensor_mapping = {} except Exception as e: print(f"Could not load XLSX file: {e}") import traceback traceback.print_exc() self.sensor_data = None self.sensor_mapping = {} else: print("No XLSX file provided for sensor descriptions") self.sensor_data = None self.sensor_mapping = {} return self.alarm_data, self.sensor_data def _process_hierarchical_sensor_data(self, sensor_df): """ Process sensor data to handle hierarchical structure where group names apply to all rows below until the next group name is specified. Same logic applies to other columns that follow this pattern. """ # Make a copy to avoid modifying the original dataframe df = sensor_df.copy() # Forward fill hierarchical columns to propagate values to empty cells below # This handles the case where a group name applies to all rows below it hierarchical_cols = ['Group', 'Remote', 'Name', 'Type', 'Serial No'] for col in hierarchical_cols: if col in df.columns: # Forward fill: propagate non-null values down until the next non-null value df[col] = df[col].ffill() return df def add_sensor_info_to_alarms(self): """ Add sensor information (Name, Group, Type) to alarm data using the sensor mapping """ if not self.sensor_mapping: print("No sensor mapping available, skipping sensor info addition") return # Add sensor name, group, and type to alarm data self.alarm_data['Sensor_Name'] = self.alarm_data['Sensor_Id'].map( lambda x: self.sensor_mapping.get(x, {}).get('name', 'Unknown') ) self.alarm_data['Sensor_Group'] = self.alarm_data['Sensor_Id'].map( lambda x: self.sensor_mapping.get(x, {}).get('group', 'Unknown') ) self.alarm_data['Sensor_Type'] = self.alarm_data['Sensor_Id'].map( lambda x: self.sensor_mapping.get(x, {}).get('type', 'Unknown') ) print(f"Added sensor information to {len(self.alarm_data)} alarm records") print(f"Found sensor names for {self.alarm_data['Sensor_Name'].nunique()} unique sensors") print(f"Found sensor groups for {self.alarm_data['Sensor_Group'].nunique()} unique sensors") def filter_excluded_groups(self): """ Filter out alarm data for excluded groups """ if not self.excluded_groups: print("No groups to exclude, skipping filtering") return initial_count = len(self.alarm_data) # Filter out rows where Sensor_Group is in excluded_groups self.alarm_data = self.alarm_data[~self.alarm_data['Sensor_Group'].isin(self.excluded_groups)] final_count = len(self.alarm_data) excluded_count = initial_count - final_count print(f"Filtered out {excluded_count} alarm records from excluded groups: {list(self.excluded_groups)}") print(f"Remaining alarm records: {final_count}") def parse_alarm_description(self, description): """ Parse alarm description to extract alarm type, value, threshold, and unit """ if pd.isna(description): return {'type': 'Unknown', 'value': None, 'threshold': None, 'unit': None} desc = str(description).strip() # Determine alarm type if 'Error' in desc: alarm_type = 'Error' elif 'Alarm' in desc and 'Warning' not in desc: alarm_type = 'Alarm' elif 'Warning' in desc: alarm_type = 'Warning' elif 'Normal' in desc: alarm_type = 'Normal' else: alarm_type = 'Other' # Extract value and threshold using regex value = None threshold = None unit = None # Pattern to match values in descriptions like "Hi Alarm: 51.3>=46.0F" value_pattern = r'([+-]?\d*\.?\d+)' unit_pattern = r'([CF%RH"|]+|min\.|%)' # Extract all numeric values numeric_matches = re.findall(value_pattern, desc) numeric_values = [float(x) for x in numeric_matches if x] # Extract unit unit_match = re.search(unit_pattern, desc) if unit_match: unit = unit_match.group(1) # Determine value and threshold based on alarm type if alarm_type == 'Normal': # For Normal events, the value is the current reading if len(numeric_values) >= 1: value = numeric_values[0] elif alarm_type in ['Alarm', 'Warning', 'Error']: # For alarm events, we typically have both current value and threshold if len(numeric_values) >= 2: value = numeric_values[0] threshold = numeric_values[1] elif len(numeric_values) == 1: # Sometimes only threshold is provided threshold = numeric_values[0] return { 'type': alarm_type, 'value': value, 'threshold': threshold, 'unit': unit } def categorize_alarms(self): """ Add parsed alarm information to the dataset """ print("Categorizing alarms...") # Apply parsing to each description parsed_data = self.alarm_data['Description'].apply(self.parse_alarm_description) # Create new columns for parsed information self.alarm_data['AlarmType'] = parsed_data.apply(lambda x: x['type']) self.alarm_data['Value'] = parsed_data.apply(lambda x: x['value']) self.alarm_data['Threshold'] = parsed_data.apply(lambda x: x['threshold']) self.alarm_data['Unit'] = parsed_data.apply(lambda x: x['unit']) # Count alarm types alarm_counts = self.alarm_data['AlarmType'].value_counts() print("Alarm type distribution:") for alarm_type, count in alarm_counts.items(): print(f" {alarm_type}: {count}") # Add sensor name, group, and type if not already added if 'Sensor_Name' not in self.alarm_data.columns and self.sensor_mapping: self.add_sensor_info_to_alarms() return self.alarm_data def pair_events_and_calculate_durations(self): """ Pair alarm start events with corresponding end events and calculate durations Updated to handle transitions between different alarm conditions (e.g., warning -> alarm) """ print("Pairing alarm events and calculating durations...") # Sort the data by Sensor_Id and Date to ensure proper chronological order self.alarm_data = self.alarm_data.sort_values(['Sensor_Id', 'Date']).reset_index(drop=True) # Create a new DataFrame to store paired events paired_events = [] # Group by Sensor_Id to process each sensor separately for sensor_id in self.alarm_data['Sensor_Id'].unique(): sensor_data = self.alarm_data[self.alarm_data['Sensor_Id'] == sensor_id].copy() sensor_data = sensor_data.sort_values('Date').reset_index(drop=True) # Find alarm start events (Alarm, Warning, Error) and pair with next Normal event or different alarm condition i = 0 while i < len(sensor_data): current_event = sensor_data.iloc[i] # Check if current event is an alarm start (not Normal or Other) if current_event['AlarmType'] in ['Alarm', 'Warning', 'Error']: start_time = current_event['Date'] start_type = current_event['AlarmType'] start_description = current_event['Description'] start_value = current_event['Value'] start_threshold = current_event['Threshold'] start_alarm_id = current_event['Alarm_Id'] # Get sensor information for the current sensor sensor_info = self.sensor_mapping.get(sensor_id, {}) sensor_name = sensor_info.get('name', 'Unknown') sensor_group = sensor_info.get('group', 'Unknown') sensor_type = sensor_info.get('type', 'Unknown') # Look for the next Normal event OR a different alarm condition for this sensor j = i + 1 end_found = False while j < len(sensor_data): next_event = sensor_data.iloc[j] # End condition 1: Normal event ends any alarm condition if next_event['AlarmType'] == 'Normal': end_time = next_event['Date'] end_description = next_event['Description'] end_value = next_event['Value'] end_alarm_id = next_event['Alarm_Id'] # Calculate duration duration = end_time - start_time duration_minutes = duration.total_seconds() / 60.0 # Add to paired events paired_events.append({ 'Sensor_Id': sensor_id, 'Sensor_Name': sensor_name, 'Sensor_Group': sensor_group, 'Sensor_Type': sensor_type, 'Start_Time': start_time, 'End_Time': end_time, 'Duration_Minutes': duration_minutes, 'Alarm_Type': start_type, 'Start_Description': start_description, 'End_Description': end_description, 'Start_Value': start_value, 'Threshold': start_threshold, 'End_Value': end_value, 'Start_Alarm_Id': start_alarm_id, 'End_Alarm_Id': end_alarm_id, 'End_Reason': 'Normal' }) # Move index to the end event and break to find next start event i = j end_found = True break # End condition 2: Different alarm condition (transition from warning to alarm, etc.) elif next_event['AlarmType'] in ['Alarm', 'Warning', 'Error']: # If the next event is a different alarm condition, end the current one # Calculate duration up to the next alarm condition end_time = next_event['Date'] end_description = next_event['Description'] end_value = next_event['Value'] end_alarm_id = next_event['Alarm_Id'] # Calculate duration duration = end_time - start_time duration_minutes = duration.total_seconds() / 60.0 # Add to paired events paired_events.append({ 'Sensor_Id': sensor_id, 'Sensor_Name': sensor_name, 'Sensor_Group': sensor_group, 'Sensor_Type': sensor_type, 'Start_Time': start_time, 'End_Time': end_time, 'Duration_Minutes': duration_minutes, 'Alarm_Type': start_type, 'Start_Description': start_description, 'End_Description': end_description, 'Start_Value': start_value, 'Threshold': start_threshold, 'End_Value': end_value, 'Start_Alarm_Id': start_alarm_id, 'End_Alarm_Id': end_alarm_id, 'End_Reason': f'Transition to {next_event["AlarmType"]}' }) # Move index to the next event (the new alarm condition becomes the start) i = j end_found = True break j += 1 # If no corresponding end event (Normal or different alarm condition) was found, record as unresolved if not end_found: duration = None paired_events.append({ 'Sensor_Id': sensor_id, 'Sensor_Name': sensor_name, 'Sensor_Group': sensor_group, 'Sensor_Type': sensor_type, 'Start_Time': start_time, 'End_Time': None, 'Duration_Minutes': None, 'Alarm_Type': start_type, 'Start_Description': start_description, 'End_Description': None, 'Start_Value': start_value, 'Threshold': start_threshold, 'End_Value': None, 'Start_Alarm_Id': start_alarm_id, 'End_Alarm_Id': None, 'End_Reason': 'Unresolved' }) i += 1 # Convert to DataFrame self.processed_events = pd.DataFrame(paired_events) print(f"Paired {len(self.processed_events)} events") if len(self.processed_events) > 0: print(f"Events with duration: {len(self.processed_events[self.processed_events['Duration_Minutes'].notna()])}") print(f"Unresolved events: {len(self.processed_events[self.processed_events['Duration_Minutes'].isna()])}") return self.processed_events def basic_analysis(self): """ Perform basic analysis: counts, min/max/average durations by sensor and alarm type """ print("Performing basic analysis...") if self.processed_events is None or len(self.processed_events) == 0: print("No processed events available for analysis. Run pair_events_and_calculate_durations first.") return # Analysis by alarm type and sensor print("\n--- ALARM COUNTS BY TYPE AND SENSOR ---") # Count events by alarm type and sensor count_by_type_sensor = self.processed_events.groupby(['Alarm_Type', 'Sensor_Id']).size().reset_index(name='Count') # Get top sensors by alarm count count_by_sensor = self.processed_events.groupby(['Sensor_Id', 'Alarm_Type']).size().reset_index(name='Count') print("\nTop 10 sensors with most alarms:") sensor_totals = count_by_sensor.groupby('Sensor_Id')['Count'].sum().sort_values(ascending=False) print(sensor_totals.head(10)) # If sensor groups are available, also analyze by group if 'Sensor_Group' in self.processed_events.columns: print("\n--- ALARM COUNTS BY GROUP ---") count_by_group = self.processed_events.groupby(['Alarm_Type', 'Sensor_Group']).size().reset_index(name='Count') print("\nTop 10 groups with most alarms:") group_totals = self.processed_events.groupby('Sensor_Group')['Sensor_Id'].count().sort_values(ascending=False) print(group_totals.head(10)) print("\n--- DURATION ANALYSIS ---") # Filter out unresolved events for duration analysis duration_events = self.processed_events[self.processed_events['Duration_Minutes'].notna()].copy() if len(duration_events) > 0: # Calculate min, max, avg duration by alarm type and sensor duration_stats = duration_events.groupby(['Alarm_Type', 'Sensor_Id'])['Duration_Minutes'].agg([ 'count', 'min', 'max', 'mean' ]).round(2).reset_index() print("\nDuration statistics by alarm type and sensor (top 10 by count):") print(duration_stats.sort_values('count', ascending=False).head(10)) # Calculate overall statistics by alarm type overall_duration_stats = duration_events.groupby('Alarm_Type')['Duration_Minutes'].agg([ 'count', 'min', 'max', 'mean' ]).round(2) print("\nOverall duration statistics by alarm type:") print(overall_duration_stats) # Calculate statistics for all sensors combined all_sensor_stats = duration_events.groupby('Sensor_Id')['Duration_Minutes'].agg([ 'count', 'min', 'max', 'mean' ]).round(2).sort_values('count', ascending=False) print("\nTop 10 sensors by alarm count with duration stats:") print(all_sensor_stats.head(10)) # If sensor groups are available, calculate group statistics if 'Sensor_Group' in duration_events.columns: print("\n--- GROUP-BASED DURATION ANALYSIS ---") # Calculate statistics by group group_duration_stats = duration_events.groupby('Sensor_Group')['Duration_Minutes'].agg([ 'count', 'min', 'max', 'mean' ]).round(2).sort_values('count', ascending=False) print("\nTop 10 groups by alarm count with duration stats:") print(group_duration_stats.head(10)) # Calculate statistics by alarm type and group type_group_stats = duration_events.groupby(['Alarm_Type', 'Sensor_Group'])['Duration_Minutes'].agg([ 'count', 'min', 'max', 'mean' ]).round(2).sort_values('count', ascending=False) print("\nTop 10 alarm type and group combinations by count:") print(type_group_stats.head(10)) return { 'count_by_type_sensor': count_by_type_sensor, 'count_by_sensor': count_by_sensor, 'count_by_group': count_by_group if 'Sensor_Group' in self.processed_events.columns else pd.DataFrame(), 'duration_stats': duration_stats, 'overall_duration_stats': overall_duration_stats, 'all_sensor_stats': all_sensor_stats, 'group_duration_stats': group_duration_stats if 'Sensor_Group' in duration_events.columns else pd.DataFrame() } else: print("No resolved events with duration data available for analysis.") return None def advanced_analysis(self): """ Perform advanced analysis including: - Time-based analysis - MTBF (Mean Time Between Failures) - Alarm correlation analysis - Severity analysis """ print("\n--- ADVANCED ANALYSIS ---") if self.processed_events is None or len(self.processed_events) == 0: print("No processed events available for analysis. Run pair_events_and_calculate_durations first.") return # Filter resolved events for time-based analysis duration_events = self.processed_events[self.processed_events['Duration_Minutes'].notna()].copy() if len(duration_events) == 0: print("No resolved events with duration data available for advanced analysis.") return print("\n1. TIME-BASED ANALYSIS") # Extract time components duration_events['Start_Hour'] = duration_events['Start_Time'].dt.hour duration_events['Start_DayOfWeek'] = duration_events['Start_Time'].dt.day_name() duration_events['Start_Date'] = duration_events['Start_Time'].dt.date # Alarm frequency by hour of day hourly_freq = duration_events.groupby('Start_Hour').size() print(f"\nAlarm frequency by hour of day (Top 5):") print(hourly_freq.sort_values(ascending=False).head()) # Alarm frequency by day of week daily_freq = duration_events.groupby('Start_DayOfWeek').size() print(f"\nAlarm frequency by day of week:") print(daily_freq) # Alarm frequency by date daily_count = duration_events.groupby('Start_Date').size() print(f"\nTotal alarms per day (last 5 days):") print(daily_count.tail()) print("\n2. MTBF (MEAN TIME BETWEEN FAILURES) ANALYSIS") # Calculate MTBF for each sensor # MTBF = Total operational time / Number of failures # For each sensor, we'll calculate the time between consecutive alarm starts mtbf_data = [] for sensor_id in duration_events['Sensor_Id'].unique(): sensor_events = duration_events[duration_events['Sensor_Id'] == sensor_id].sort_values('Start_Time') if len(sensor_events) > 1: # Calculate time between consecutive alarm starts sensor_events = sensor_events.copy() sensor_events['Time_Between_Alerts'] = sensor_events['Start_Time'].diff().dt.total_seconds() / 3600 # in hours # Calculate MTBF (mean time between consecutive alarms) mtbf_hours = sensor_events['Time_Between_Alerts'].mean() # Calculate total operational time (from first to last alarm) total_op_time = (sensor_events['Start_Time'].max() - sensor_events['Start_Time'].min()).total_seconds() / 3600 num_alarms = len(sensor_events) # MTBF = total operational time / number of alarms mtbf_by_total_time = total_op_time / num_alarms if num_alarms > 0 else 0 mtbf_data.append({ 'Sensor_Id': sensor_id, 'MTBF_Hours_By_Consecutive': mtbf_hours, 'MTBF_Hours_By_Total_Time': mtbf_by_total_time, 'Total_Alerts': num_alarms, 'Total_Op_Time_Hours': total_op_time }) mtbf_df = pd.DataFrame(mtbf_data).sort_values('MTBF_Hours_By_Total_Time', ascending=False) print("\nTop 10 sensors by MTBF (Mean Time Between Failures):") print(mtbf_df.head(10)) print("\n3. ALARM CORRELATION ANALYSIS") # Find sensors that frequently alarm together (within a time window) # Group events by time windows (e.g., 1 hour) and see which sensors alarm together duration_events['Time_Window'] = duration_events['Start_Time'].dt.floor('H') time_window_groups = duration_events.groupby('Time_Window')['Sensor_Id'].apply(list).reset_index() # Count how many times each pair of sensors alarms together correlation_data = [] for _, row in time_window_groups.iterrows(): sensors = row['Sensor_Id'] if len(sensors) > 1: # Get all pairs of sensors that alarmed in this time window for i in range(len(sensors)): for j in range(i+1, len(sensors)): sensor1, sensor2 = sensors[i], sensors[j] correlation_data.append({ 'Sensor1': sensor1, 'Sensor2': sensor2, 'Time_Window': row['Time_Window'] }) if correlation_data: correlation_df = pd.DataFrame(correlation_data) correlation_counts = correlation_df.groupby(['Sensor1', 'Sensor2']).size().reset_index(name='Count') correlation_counts = correlation_counts.sort_values('Count', ascending=False) print("\nTop 10 sensor pairs that alarm together frequently:") print(correlation_counts.head(10)) else: print("\nNo correlated alarms found in the same time windows.") print("\n4. SEVERITY ANALYSIS") # Weighted scoring based on alarm type and duration # Error: weight 3, Alarm: weight 2, Warning: weight 1 severity_weights = {'Error': 3, 'Alarm': 2, 'Warning': 1} duration_events['Severity_Score'] = duration_events.apply( lambda row: severity_weights.get(row['Alarm_Type'], 1) * row['Duration_Minutes'], axis=1 ) # Total severity by sensor severity_by_sensor = duration_events.groupby('Sensor_Id')['Severity_Score'].agg([ 'sum', 'mean', 'count' ]).round(2).reset_index() severity_by_sensor = severity_by_sensor.sort_values('sum', ascending=False) print("\nTop 10 sensors by total severity score:") print(severity_by_sensor.head(10)) print("\n5. ALARM ESCALATION ANALYSIS") # Count how many warnings escalate to alarms for each sensor # For this analysis, we'll look for cases where a warning is followed by an alarm for the same sensor escalation_data = [] # Group by sensor and sort by time for sensor_id in self.alarm_data['Sensor_Id'].unique(): sensor_alarms = self.alarm_data[self.alarm_data['Sensor_Id'] == sensor_id].sort_values('Date') for i in range(len(sensor_alarms) - 1): current = sensor_alarms.iloc[i] next_event = sensor_alarms.iloc[i + 1] # Check if current is warning and next is alarm (not Normal) if current['AlarmType'] == 'Warning' and next_event['AlarmType'] in ['Alarm', 'Error']: time_diff = (next_event['Date'] - current['Date']).total_seconds() / 60 # in minutes if time_diff <= 60: # Within 1 hour # Get sensor group information sensor_info = self.sensor_mapping.get(sensor_id, {}) sensor_group = sensor_info.get('group', 'Unknown') escalation_data.append({ 'Sensor_Id': sensor_id, 'Sensor_Group': sensor_group, 'Warning_Time': current['Date'], 'Escalation_Type': next_event['AlarmType'], 'Time_To_Escalation_Minutes': time_diff }) if escalation_data: escalation_df = pd.DataFrame(escalation_data) escalation_counts = escalation_df.groupby('Sensor_Id').size().reset_index(name='Escalation_Count') escalation_counts = escalation_counts.sort_values('Escalation_Count', ascending=False) print(f"\nTotal escalations found: {len(escalation_df)}") print("Top 10 sensors with most escalations (Warning -> Alarm/Error):") print(escalation_counts.head(10)) # Group-based escalation analysis if 'Sensor_Group' in escalation_df.columns: escalation_by_group = escalation_df.groupby('Sensor_Group').size().reset_index(name='Escalation_Count') escalation_by_group = escalation_by_group.sort_values('Escalation_Count', ascending=False) print("\nTop 10 groups with most escalations (Warning -> Alarm/Error):") print(escalation_by_group.head(10)) else: print("\nNo alarm escalations found in the data.") print("\n6. GROUP-BASED ANALYSIS") # Group-based statistics if sensor groups are available if 'Sensor_Group' in duration_events.columns: print("\n--- GROUP-BASED ANALYSIS ---") # Count alarms by group group_counts = duration_events.groupby('Sensor_Group').size().reset_index(name='Alarm_Count') group_counts = group_counts.sort_values('Alarm_Count', ascending=False) print("\nTop 10 groups by alarm count:") print(group_counts.head(10)) # Calculate MTBF by group mtbf_by_group = duration_events.groupby('Sensor_Group')['Duration_Minutes'].agg([ 'count', 'mean', 'min', 'max' ]).round(2).sort_values('count', ascending=False) print("\nMTBF statistics by group (top 10 by count):") print(mtbf_by_group.head(10)) # Severity by group severity_by_group = duration_events.groupby('Sensor_Group')['Severity_Score'].agg([ 'sum', 'mean', 'count' ]).round(2).sort_values('sum', ascending=False) print("\nTop 10 groups by total severity score:") print(severity_by_group.head(10)) # Alarm types by group alarm_type_by_group = duration_events.groupby(['Sensor_Group', 'Alarm_Type']).size().reset_index(name='Count') alarm_type_by_group = alarm_type_by_group.sort_values(['Sensor_Group', 'Count'], ascending=[True, False]) print("\nTop alarm types by group:") print(alarm_type_by_group.groupby('Sensor_Group').head(3)) return { 'hourly_frequency': hourly_freq, 'daily_frequency': daily_freq, 'mtbf_data': mtbf_df, 'correlation_data': correlation_counts if correlation_data else pd.DataFrame(), 'severity_analysis': severity_by_sensor, 'escalation_analysis': escalation_df if escalation_data else pd.DataFrame(), 'group_analysis': { 'group_counts': group_counts if 'Sensor_Group' in duration_events.columns else pd.DataFrame(), 'mtbf_by_group': mtbf_by_group if 'Sensor_Group' in duration_events.columns else pd.DataFrame(), 'severity_by_group': severity_by_group if 'Sensor_Group' in duration_events.columns else pd.DataFrame(), 'alarm_type_by_group': alarm_type_by_group if 'Sensor_Group' in duration_events.columns else pd.DataFrame() } if 'Sensor_Group' in duration_events.columns else {} } def create_visualizations(self, save_plots=False, output_dir="plots"): """ Create visualizations for the alarm analysis """ print("\n--- CREATING VISUALIZATIONS ---") # Import visualization libraries only when needed try: plt, sns = _import_viz_libs() except ImportError: print("Matplotlib or seaborn not available. Skipping visualizations.") return if self.processed_events is None or len(self.processed_events) == 0: print("No processed events available for visualization. Run pair_events_and_calculate_durations first.") return # Create output directory if needed if save_plots: import os os.makedirs(output_dir, exist_ok=True) # Filter resolved events for visualization duration_events = self.processed_events[self.processed_events['Duration_Minutes'].notna()].copy() if len(duration_events) == 0: print("No resolved events with duration data available for visualization.") return # Extract time components for time-based analysis duration_events['Start_Hour'] = duration_events['Start_Time'].dt.hour duration_events['Start_DayOfWeek'] = duration_events['Start_Time'].dt.day_name() duration_events['Start_Date'] = duration_events['Start_Time'].dt.date # Set up the plotting style plt.style.use('default') sns.set_palette("husl") # 1. Alarm count by type fig, axes = plt.subplots(2, 2, figsize=(15, 12)) fig.suptitle('Alarm Analysis Dashboard', fontsize=16, fontweight='bold') # Alarm count by type alarm_type_counts = duration_events['Alarm_Type'].value_counts() axes[0, 0].bar(alarm_type_counts.index, alarm_type_counts.values) axes[0, 0].set_title('Alarm Count by Type') axes[0, 0].set_ylabel('Count') for i, v in enumerate(alarm_type_counts.values): axes[0, 0].text(i, v + v*0.01, str(v), ha='center', va='bottom') # Top 10 sensors by alarm count - with sensor names instead of IDs top_sensors = duration_events['Sensor_Id'].value_counts().head(10) sensor_names_for_plot = [] for sensor_id in top_sensors.index: sensor_info = self.sensor_mapping.get(sensor_id, {}) sensor_name = sensor_info.get('name', f'ID: {sensor_id}') sensor_group = sensor_info.get('group', 'Unknown') sensor_names_for_plot.append(f"{sensor_name}\n({sensor_group})") axes[0, 1].bar(range(len(top_sensors)), top_sensors.values) axes[0, 1].set_title('Top 10 Sensors by Alarm Count') axes[0, 1].set_ylabel('Count') axes[0, 1].set_xticks(range(len(top_sensors))) axes[0, 1].set_xticklabels(sensor_names_for_plot, rotation=45) for i, v in enumerate(top_sensors.values): axes[0, 1].text(i, v + v*0.01, str(v), ha='center', va='bottom') # Alarm frequency by hour of day hourly_freq = duration_events.groupby('Start_Hour').size() axes[1, 0].plot(hourly_freq.index, hourly_freq.values, marker='o') axes[1, 0].set_title('Alarm Frequency by Hour of Day') axes[1, 0].set_xlabel('Hour of Day') axes[1, 0].set_ylabel('Number of Alarms') axes[1, 0].grid(True, alpha=0.3) # Alarm frequency by day of week day_order = ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday'] daily_freq = duration_events.groupby('Start_DayOfWeek').size().reindex(day_order, fill_value=0) axes[1, 1].bar(range(len(daily_freq)), daily_freq.values) axes[1, 1].set_title('Alarm Frequency by Day of Week') axes[1, 1].set_ylabel('Number of Alarms') axes[1, 1].set_xticks(range(len(daily_freq))) axes[1, 1].set_xticklabels([d[:3] for d in daily_freq.index], rotation=45) for i, v in enumerate(daily_freq.values): axes[1, 1].text(i, v + v*0.01, str(v), ha='center', va='bottom') plt.tight_layout() if save_plots: plt.savefig(f"{output_dir}/alarm_dashboard.png", dpi=300, bbox_inches='tight') plt.show() # 2. Duration analysis by alarm type plt.figure(figsize=(12, 6)) # Box plot of durations by alarm type plt.subplot(1, 2, 1) sns.boxplot(data=duration_events, x='Alarm_Type', y='Duration_Minutes') plt.title('Distribution of Alarm Durations by Type') plt.xlabel('Alarm Type') plt.ylabel('Duration (Minutes)') plt.yscale('log') # Log scale to better visualize the wide range of durations # Histogram of durations by alarm type plt.subplot(1, 2, 2) for alarm_type in duration_events['Alarm_Type'].unique(): subset = duration_events[duration_events['Alarm_Type'] == alarm_type] plt.hist(subset['Duration_Minutes'], alpha=0.6, label=alarm_type, bins=30) plt.title('Distribution of Alarm Durations by Type') plt.xlabel('Duration (Minutes)') plt.ylabel('Frequency') plt.legend() plt.yscale('log') # Log scale to better visualize the wide range of durations plt.tight_layout() if save_plots: plt.savefig(f"{output_dir}/duration_analysis.png", dpi=300, bbox_inches='tight') plt.show() # 3. Top sensors by various metrics fig, axes = plt.subplots(2, 2, figsize=(15, 12)) fig.suptitle('Top Sensors Analysis', fontsize=16, fontweight='bold') # Top 10 sensors by total alarms - with sensor names instead of IDs top_sensors_by_count = duration_events['Sensor_Id'].value_counts().head(10) sensor_names_for_plot = [] for sensor_id in top_sensors_by_count.index: sensor_info = self.sensor_mapping.get(sensor_id, {}) sensor_name = sensor_info.get('name', f'ID: {sensor_id}') sensor_group = sensor_info.get('group', 'Unknown') sensor_names_for_plot.append(f"{sensor_name} (Group: {sensor_group})") axes[0, 0].barh(range(len(top_sensors_by_count)), top_sensors_by_count.values) axes[0, 0].set_title('Top 10 Sensors by Total Alarm Count') axes[0, 0].set_xlabel('Number of Alarms') axes[0, 0].set_yticks(range(len(top_sensors_by_count))) axes[0, 0].set_yticklabels(sensor_names_for_plot) # Top 10 sensors by average duration - with sensor names instead of IDs avg_duration_by_sensor = duration_events.groupby('Sensor_Id')['Duration_Minutes'].mean().sort_values(ascending=False).head(10) sensor_names_for_plot_avg = [] for sensor_id in avg_duration_by_sensor.index: sensor_info = self.sensor_mapping.get(sensor_id, {}) sensor_name = sensor_info.get('name', f'ID: {sensor_id}') sensor_group = sensor_info.get('group', 'Unknown') sensor_names_for_plot_avg.append(f"{sensor_name} (Group: {sensor_group})") axes[0, 1].barh(range(len(avg_duration_by_sensor)), avg_duration_by_sensor.values) axes[0, 1].set_title('Top 10 Sensors by Average Duration') axes[0, 1].set_xlabel('Average Duration (Minutes)') axes[0, 1].set_yticks(range(len(avg_duration_by_sensor))) axes[0, 1].set_yticklabels(sensor_names_for_plot_avg) # Top 10 sensors by max duration - with sensor names instead of IDs max_duration_by_sensor = duration_events.groupby('Sensor_Id')['Duration_Minutes'].max().sort_values(ascending=False).head(10) sensor_names_for_plot_max = [] for sensor_id in max_duration_by_sensor.index: sensor_info = self.sensor_mapping.get(sensor_id, {}) sensor_name = sensor_info.get('name', f'ID: {sensor_id}') sensor_group = sensor_info.get('group', 'Unknown') sensor_names_for_plot_max.append(f"{sensor_name} (Group: {sensor_group})") axes[1, 0].barh(range(len(max_duration_by_sensor)), max_duration_by_sensor.values) axes[1, 0].set_title('Top 10 Sensors by Maximum Duration') axes[1, 0].set_xlabel('Maximum Duration (Minutes)') axes[1, 0].set_yticks(range(len(max_duration_by_sensor))) axes[1, 0].set_yticklabels(sensor_names_for_plot_max) # Top 10 sensors by total severity score - with sensor names instead of IDs severity_weights = {'Error': 3, 'Alarm': 2, 'Warning': 1} duration_events['Severity_Score'] = duration_events.apply( lambda row: severity_weights.get(row['Alarm_Type'], 1) * row['Duration_Minutes'], axis=1 ) severity_by_sensor = duration_events.groupby('Sensor_Id')['Severity_Score'].sum().sort_values(ascending=False).head(10) sensor_names_for_plot_severity = [] for sensor_id in severity_by_sensor.index: sensor_info = self.sensor_mapping.get(sensor_id, {}) sensor_name = sensor_info.get('name', f'ID: {sensor_id}') sensor_group = sensor_info.get('group', 'Unknown') sensor_names_for_plot_severity.append(f"{sensor_name} (Group: {sensor_group})") axes[1, 1].barh(range(len(severity_by_sensor)), severity_by_sensor.values) axes[1, 1].set_title('Top 10 Sensors by Total Severity Score') axes[1, 1].set_xlabel('Total Severity Score') axes[1, 1].set_yticks(range(len(severity_by_sensor))) axes[1, 1].set_yticklabels(sensor_names_for_plot_severity) plt.tight_layout() if save_plots: plt.savefig(f"{output_dir}/sensor_analysis.png", dpi=300, bbox_inches='tight') plt.show() # 4. Group-based visualizations if sensor groups are available if 'Sensor_Group' in duration_events.columns: print("\nCreating group-based visualizations...") # First group-based visualization - Dashboard with 4 plots fig, axes = plt.subplots(2, 2, figsize=(15, 12)) fig.suptitle('Group-Based Analysis Dashboard', fontsize=16, fontweight='bold') # Top 10 groups by alarm count top_groups_by_count = duration_events['Sensor_Group'].value_counts().head(10) axes[0, 0].barh(range(len(top_groups_by_count)), top_groups_by_count.values) axes[0, 0].set_title('Top 10 Groups by Total Alarm Count') axes[0, 0].set_xlabel('Number of Alarms') axes[0, 0].set_yticks(range(len(top_groups_by_count))) axes[0, 0].set_yticklabels([str(label)[:20] + '...' if len(str(label)) > 20 else str(label) for label in top_groups_by_count.index]) # Top 10 groups by average duration avg_duration_by_group = duration_events.groupby('Sensor_Group')['Duration_Minutes'].mean().sort_values(ascending=False).head(10) axes[0, 1].barh(range(len(avg_duration_by_group)), avg_duration_by_group.values) axes[0, 1].set_title('Top 10 Groups by Average Duration') axes[0, 1].set_xlabel('Average Duration (Minutes)') axes[0, 1].set_yticks(range(len(avg_duration_by_group))) axes[0, 1].set_yticklabels([str(label)[:20] + '...' if len(str(label)) > 20 else str(label) for label in avg_duration_by_group.index]) # Top 10 groups by max duration max_duration_by_group = duration_events.groupby('Sensor_Group')['Duration_Minutes'].max().sort_values(ascending=False).head(10) axes[1, 0].barh(range(len(max_duration_by_group)), max_duration_by_group.values) axes[1, 0].set_title('Top 10 Groups by Maximum Duration') axes[1, 0].set_xlabel('Maximum Duration (Minutes)') axes[1, 0].set_yticks(range(len(max_duration_by_group))) axes[1, 0].set_yticklabels([str(label)[:20] + '...' if len(str(label)) > 20 else str(label) for label in max_duration_by_group.index]) # Top 10 groups by total severity score severity_weights = {'Error': 3, 'Alarm': 2, 'Warning': 1} duration_events['Severity_Score'] = duration_events.apply( lambda row: severity_weights.get(row['Alarm_Type'], 1) * row['Duration_Minutes'], axis=1 ) severity_by_group = duration_events.groupby('Sensor_Group')['Severity_Score'].sum().sort_values(ascending=False).head(10) axes[1, 1].barh(range(len(severity_by_group)), severity_by_group.values) axes[1, 1].set_title('Top 10 Groups by Total Severity Score') axes[1, 1].set_xlabel('Total Severity Score') axes[1, 1].set_yticks(range(len(severity_by_group))) axes[1, 1].set_yticklabels([str(label)[:20] + '...' if len(str(label)) > 20 else str(label) for label in severity_by_group.index]) plt.tight_layout() if save_plots: plt.savefig(f"{output_dir}/group_analysis.png", dpi=300, bbox_inches='tight') plt.show() # Additional group-based visualizations print("Creating additional group-based visualizations...") # Group composition analysis - showing number of sensors per group if self.sensor_mapping: # Create a mapping of group to number of sensors group_to_sensor_count = {} for sensor_id, sensor_info in self.sensor_mapping.items(): group = sensor_info.get('group', 'Unknown') if group not in group_to_sensor_count: group_to_sensor_count[group] = 0 group_to_sensor_count[group] += 1 # Convert to dataframe and sort group_sensor_counts = pd.DataFrame( list(group_to_sensor_count.items()), columns=['Group', 'Sensor_Count'] ).sort_values('Sensor_Count', ascending=False).head(15) # Plot group composition plt.figure(figsize=(14, 8)) plt.barh(range(len(group_sensor_counts)), group_sensor_counts['Sensor_Count']) plt.title('Sensor Count by Group (Top 15 Groups)') plt.xlabel('Number of Sensors in Group') plt.ylabel('Group') plt.yticks(range(len(group_sensor_counts)), [str(label)[:30] + '...' if len(str(label)) > 30 else str(label) for label in group_sensor_counts['Group']]) for i, v in enumerate(group_sensor_counts['Sensor_Count']): plt.text(v + v*0.01, i, str(v), va='center') plt.tight_layout() if save_plots: plt.savefig(f"{output_dir}/group_composition.png", dpi=300, bbox_inches='tight') plt.show() # Alarm type distribution by group (stacked bar chart) if len(duration_events) > 0: alarm_type_by_group = duration_events.groupby(['Sensor_Group', 'Alarm_Type']).size().unstack(fill_value=0) top_10_groups = duration_events['Sensor_Group'].value_counts().head(10).index alarm_type_by_group_top = alarm_type_by_group.loc[top_10_groups] # Create stacked bar chart ax = alarm_type_by_group_top.plot(kind='barh', stacked=True, figsize=(14, 8)) plt.title('Alarm Type Distribution by Group (Top 10 Groups)') plt.xlabel('Number of Alarms') plt.ylabel('Group') plt.legend(title='Alarm Type', bbox_to_anchor=(1.05, 1), loc='upper left') plt.tight_layout() if save_plots: plt.savefig(f"{output_dir}/alarm_type_by_group.png", dpi=300, bbox_inches='tight') plt.show() # Group alarm intensity: alarms per sensor in each group if self.sensor_mapping: # Calculate alarms per sensor for each group alarms_per_sensor_by_group = duration_events.groupby('Sensor_Group')['Sensor_Id'].nunique().to_dict() # Calculate total sensors per group from mapping group_to_sensor_count = {} for sensor_id, sensor_info in self.sensor_mapping.items(): group = sensor_info.get('group', 'Unknown') if group not in group_to_sensor_count: group_to_sensor_count[group] = 0 group_to_sensor_count[group] += 1 # Calculate alarms per sensor ratio group_alarm_intensity = {} for group in set(duration_events['Sensor_Group'].unique()): total_alarms = len(duration_events[duration_events['Sensor_Group'] == group]) total_sensors = group_to_sensor_count.get(group, 1) # Avoid division by zero group_alarm_intensity[group] = total_alarms / total_sensors # Convert to DataFrame and sort intensity_df = pd.DataFrame( list(group_alarm_intensity.items()), columns=['Group', 'Alarms_Per_Sensor'] ).sort_values('Alarms_Per_Sensor', ascending=False).head(15) # Plot alarm intensity plt.figure(figsize=(14, 8)) plt.barh(range(len(intensity_df)), intensity_df['Alarms_Per_Sensor']) plt.title('Alarm Intensity: Alarms per Sensor by Group (Top 15 Groups)') plt.xlabel('Average Alarms per Sensor') plt.ylabel('Group') plt.yticks(range(len(intensity_df)), [str(label)[:30] + '...' if len(str(label)) > 30 else str(label) for label in intensity_df['Group']]) for i, v in enumerate(intensity_df['Alarms_Per_Sensor']): plt.text(v + v*0.01, i, f"{v:.2f}", va='center') plt.tight_layout() if save_plots: plt.savefig(f"{output_dir}/group_alarm_intensity.png", dpi=300, bbox_inches='tight') plt.show() print("Visualizations created successfully!") if save_plots: print(f"Plots saved to '{output_dir}' directory.") def calculate_uptime_metrics(self): """ Calculate uptime/downtime metrics based on two approaches: 1. Error duration as downtime (communication errors) 2. Alarm/Warning duration as downtime (operational issues) """ print("\n--- CALCULATING UPTIME/DOWNTIME METRICS ---") if self.processed_events is None or len(self.processed_events) == 0: print("No processed events available for uptime calculation. Run pair_events_and_calculate_durations first.") return None # Filter out unresolved events for duration analysis duration_events = self.processed_events[self.processed_events['Duration_Minutes'].notna()].copy() if len(duration_events) == 0: print("No resolved events with duration data available for uptime calculation.") return None # Calculate total time span from the alarm data (not just processed events) total_start_time = self.alarm_data['Date'].min() total_end_time = self.alarm_data['Date'].max() total_time_span_minutes = (total_end_time - total_start_time).total_seconds() / 60.0 print(f"Total time period: {total_start_time} to {total_end_time}") print(f"Total time span: {total_time_span_minutes:.2f} minutes ({total_time_span_minutes/60:.2f} hours)") # 1. Calculate error-based downtime (communication errors) error_events = duration_events[duration_events['Alarm_Type'] == 'Error'].copy() total_error_duration = error_events['Duration_Minutes'].sum() # Calculate error-based system downtime percentage (for communication errors) # This represents the total time spent in error state across all sensors error_downtime_percentage = (total_error_duration / total_time_span_minutes) * 100 if total_time_span_minutes > 0 else 0 print(f"\nError-based downtime (communication errors):") print(f" Total error duration across all sensors: {total_error_duration:.2f} minutes") print(f" Error downtime percentage (cumulative): {error_downtime_percentage:.4f}%") print(f" Error uptime percentage (communication): {100 - error_downtime_percentage:.4f}%") # 2. Calculate alarm/warning-based downtime (operational issues) alarm_warning_events = duration_events[duration_events['Alarm_Type'].isin(['Alarm', 'Warning'])].copy() total_alarm_warning_duration = alarm_warning_events['Duration_Minutes'].sum() # Calculate alarm/warning-based system downtime percentage alarm_warning_downtime_percentage = (total_alarm_warning_duration / total_time_span_minutes) * 100 if total_time_span_minutes > 0 else 0 print(f"\nAlarm/Warning-based downtime (operational issues):") print(f" Total alarm/warning duration across all sensors: {total_alarm_warning_duration:.2f} minutes") print(f" Alarm/Warning downtime percentage (cumulative): {alarm_warning_downtime_percentage:.4f}%") print(f" Alarm/Warning uptime percentage (operational): {100 - alarm_warning_downtime_percentage:.4f}%") # 3. Combined downtime (Error + Alarm + Warning) total_operational_duration = total_error_duration + total_alarm_warning_duration combined_downtime_percentage = (total_operational_duration / total_time_span_minutes) * 100 if total_time_span_minutes > 0 else 0 print(f"\nCombined system downtime:") print(f" Total combined duration across all sensors: {total_operational_duration:.2f} minutes") print(f" Combined downtime percentage (cumulative): {combined_downtime_percentage:.4f}%") print(f" Combined uptime percentage: {100 - combined_downtime_percentage:.4f}%") # Calculate more meaningful system-level uptime metrics # For this, we'll calculate the percentage of sensors in error/alarm state over time print(f"\n--- ADDITIONAL SYSTEM-LEVEL UPTIME METRICS ---") # Calculate time-bucketed system uptime (in 1-hour intervals) try: # Create time buckets and determine if any sensor was in error/alarm state in each bucket all_events = self.alarm_data.copy() all_events = all_events.sort_values('Date') # Create time buckets (1 hour each) time_buckets = pd.date_range( start=total_start_time.floor('H'), end=total_end_time.ceil('H'), freq='H' ) # For each time bucket, calculate if there were any errors or alarm/warnings active error_buckets = [] alarm_warning_buckets = [] for i in range(len(time_buckets)-1): bucket_start = time_buckets[i] bucket_end = time_buckets[i+1] # Find events that overlap with this time bucket bucket_events = all_events[ (all_events['Date'] >= bucket_start) & (all_events['Date'] < bucket_end) ] # Count how many error and alarm/warning events occurred in this bucket error_count = len(bucket_events[bucket_events['AlarmType'] == 'Error']) alarm_warning_count = len(bucket_events[bucket_events['AlarmType'].isin(['Alarm', 'Warning'])]) error_buckets.append(1 if error_count > 0 else 0) alarm_warning_buckets.append(1 if alarm_warning_count > 0 else 0) # Calculate percentage of time buckets with errors or alarm/warnings if len(error_buckets) > 0: system_error_uptime_percentage = 100 - (sum(error_buckets) / len(error_buckets) * 100) else: system_error_uptime_percentage = 100.0 if len(alarm_warning_buckets) > 0: system_alarm_warning_uptime_percentage = 100 - (sum(alarm_warning_buckets) / len(alarm_warning_buckets) * 100) else: system_alarm_warning_uptime_percentage = 100.0 print(f"System-level error uptime (time-based): {system_error_uptime_percentage:.4f}%") print(f"System-level alarm/warning uptime (time-based): {system_alarm_warning_uptime_percentage:.4f}%") except Exception as e: print(f"Could not calculate time-based system uptime metrics: {e}") system_error_uptime_percentage = None system_alarm_warning_uptime_percentage = None # For more meaningful individual sensor uptime, calculate based on the total monitoring time for each sensor # Calculate total monitoring time per sensor based on first and last alarm occurrence sensor_monitoring_time = self.alarm_data.groupby('Sensor_Id').agg({ 'Date': ['min', 'max'] }) sensor_monitoring_time.columns = ['First_Alarm_Time', 'Last_Alarm_Time'] sensor_monitoring_time['Total_Monitoring_Minutes'] = ( (sensor_monitoring_time['Last_Alarm_Time'] - sensor_monitoring_time['First_Alarm_Time']).dt.total_seconds() / 60.0 ) # Ensure minimum monitoring time (at least the time between first and last alarm for that sensor) sensor_monitoring_time['Total_Monitoring_Minutes'] = sensor_monitoring_time['Total_Monitoring_Minutes'].apply( lambda x: max(x, total_time_span_minutes / len(self.alarm_data['Sensor_Id'].unique())) # fallback to avg if needed ) # Calculate per-sensor metrics with proper uptime percentages print(f"\n--- PER-SENSOR UPTIME/DOWNTIME METRICS ---") # Error-based per-sensor metrics error_by_sensor = error_events.groupby('Sensor_Id').agg({ 'Duration_Minutes': ['sum', 'count', 'mean'], 'Start_Time': ['min', 'max'] }).round(2) error_by_sensor.columns = ['Total_Error_Duration', 'Error_Count', 'Avg_Error_Duration', 'First_Error_Time', 'Last_Error_Time'] # Get all unique sensor IDs from the alarm data all_sensors = set(self.alarm_data['Sensor_Id'].unique()) # Calculate downtime percentage based on the total time span across ALL data # Create a complete dataframe with all sensors, including those with 0 errors all_sensors_df = pd.DataFrame(index=list(all_sensors)) error_by_sensor_complete = all_sensors_df.join(error_by_sensor, how='left') # Fill NaN values with 0 for sensors with no errors error_by_sensor_complete = error_by_sensor_complete.fillna({ 'Total_Error_Duration': 0, 'Error_Count': 0, 'Avg_Error_Duration': 0, 'First_Error_Time': pd.NaT, 'Last_Error_Time': pd.NaT }) # Calculate downtime percentage using the total time span for all data error_by_sensor_complete['Error_Downtime_Percentage'] = ( (error_by_sensor_complete['Total_Error_Duration'] / total_time_span_minutes) * 100 ).round(4) # Cap the downtime percentage at 100% to avoid impossible negative uptime values error_by_sensor_complete['Error_Downtime_Percentage'] = error_by_sensor_complete['Error_Downtime_Percentage'].apply( lambda x: min(100.0, x) ) error_by_sensor_complete['Error_Uptime_Percentage'] = ( 100 - error_by_sensor_complete['Error_Downtime_Percentage'] ).round(4) # Ensure uptime doesn't go below 0 error_by_sensor_complete['Error_Uptime_Percentage'] = error_by_sensor_complete['Error_Uptime_Percentage'].apply( lambda x: max(0, round(x, 4)) ) # Update error_by_sensor with the complete data error_by_sensor = error_by_sensor_complete # Alarm/Warning-based per-sensor metrics alarm_warning_by_sensor_raw = alarm_warning_events.groupby('Sensor_Id').agg({ 'Duration_Minutes': ['sum', 'count', 'mean'], 'Start_Time': ['min', 'max'] }).round(2) alarm_warning_by_sensor_raw.columns = ['Total_Alarm_Warning_Duration', 'Alarm_Warning_Count', 'Avg_Alarm_Warning_Duration', 'First_Alarm_Warning_Time', 'Last_Alarm_Warning_Time'] # Create a complete dataframe with all sensors, including those with 0 alarm/warnings alarm_warning_by_sensor = all_sensors_df.join(alarm_warning_by_sensor_raw, how='left') # Fill NaN values with 0 for sensors with no alarm/warnings alarm_warning_by_sensor = alarm_warning_by_sensor.fillna({ 'Total_Alarm_Warning_Duration': 0, 'Alarm_Warning_Count': 0, 'Avg_Alarm_Warning_Duration': 0, 'First_Alarm_Warning_Time': pd.NaT, 'Last_Alarm_Warning_Time': pd.NaT }) # Calculate downtime percentage based on the total time span across ALL data # Calculate downtime percentage using the total time span for all data alarm_warning_by_sensor['Alarm_Warning_Downtime_Percentage'] = ( (alarm_warning_by_sensor['Total_Alarm_Warning_Duration'] / total_time_span_minutes) * 100 ).round(4) # Cap the downtime percentage at 100% to avoid impossible negative uptime values alarm_warning_by_sensor['Alarm_Warning_Downtime_Percentage'] = alarm_warning_by_sensor['Alarm_Warning_Downtime_Percentage'].apply( lambda x: min(100.0, x) ) alarm_warning_by_sensor['Alarm_Warning_Uptime_Percentage'] = ( 100 - alarm_warning_by_sensor['Alarm_Warning_Downtime_Percentage'] ).round(4) # Ensure uptime doesn't go below 0 alarm_warning_by_sensor['Alarm_Warning_Uptime_Percentage'] = alarm_warning_by_sensor['Alarm_Warning_Uptime_Percentage'].apply( lambda x: max(0, round(x, 4)) ) # Add sensor names and groups to per-sensor metrics if self.sensor_mapping: for df in [error_by_sensor, alarm_warning_by_sensor]: if len(df) > 0: df['Sensor_Name'] = df.index.map( lambda x: self.sensor_mapping.get(x, {}).get('name', 'Unknown') ) df['Sensor_Group'] = df.index.map( lambda x: self.sensor_mapping.get(x, {}).get('group', 'Unknown') ) # Reorder columns to put Sensor_Id, Name, Group first cols = ['Sensor_Name', 'Sensor_Group'] + [col for col in df.columns if col not in ['Sensor_Name', 'Sensor_Group']] df = df[cols] # 5. Per-group metrics print(f"\n--- PER-GROUP UPTIME/DOWNTIME METRICS ---") if 'Sensor_Group' in duration_events.columns: # Get all unique sensor groups from the alarm data all_groups = set(self.alarm_data['Sensor_Group'].unique()) # Error-based per-group metrics error_by_group_raw = error_events.groupby('Sensor_Group').agg({ 'Duration_Minutes': ['sum', 'count', 'mean'], 'Sensor_Id': 'nunique' }).round(2) error_by_group_raw.columns = ['Total_Error_Duration', 'Error_Count', 'Avg_Error_Duration', 'Unique_Sensors_With_Errors'] # Create a complete dataframe with all groups, including those with 0 errors all_groups_df = pd.DataFrame(index=list(all_groups)) error_by_group = all_groups_df.join(error_by_group_raw, how='left') # Fill NaN values with 0 for groups with no errors error_by_group = error_by_group.fillna({ 'Total_Error_Duration': 0, 'Error_Count': 0, 'Avg_Error_Duration': 0, 'Unique_Sensors_With_Errors': 0 }) # Calculate downtime percentage based on the total time span across ALL groups error_by_group['Error_Downtime_Percentage'] = ( (error_by_group['Total_Error_Duration'] / total_time_span_minutes) * 100 ).round(4) # Cap the downtime percentage at 100% to avoid impossible negative uptime values error_by_group['Error_Downtime_Percentage'] = error_by_group['Error_Downtime_Percentage'].apply( lambda x: min(100.0, x) ) error_by_group['Error_Uptime_Percentage'] = ( 100 - error_by_group['Error_Downtime_Percentage'] ).round(4) # Ensure uptime doesn't go below 0 error_by_group['Error_Uptime_Percentage'] = error_by_group['Error_Uptime_Percentage'].apply( lambda x: max(0, round(x, 4)) ) # Alarm/Warning-based per-group metrics alarm_warning_by_group_raw = alarm_warning_events.groupby('Sensor_Group').agg({ 'Duration_Minutes': ['sum', 'count', 'mean'], 'Sensor_Id': 'nunique' }).round(2) alarm_warning_by_group_raw.columns = ['Total_Alarm_Warning_Duration', 'Alarm_Warning_Count', 'Avg_Alarm_Warning_Duration', 'Unique_Sensors_With_Alarm_Warning'] # Create a complete dataframe with all groups, including those with no alarm/warnings alarm_warning_by_group = all_groups_df.join(alarm_warning_by_group_raw, how='left') # Fill NaN values with 0 for groups with no alarm/warnings alarm_warning_by_group = alarm_warning_by_group.fillna({ 'Total_Alarm_Warning_Duration': 0, 'Alarm_Warning_Count': 0, 'Avg_Alarm_Warning_Duration': 0, 'Unique_Sensors_With_Alarm_Warning': 0 }) # Calculate downtime percentage based on the total time span across ALL groups alarm_warning_by_group['Alarm_Warning_Downtime_Percentage'] = ( (alarm_warning_by_group['Total_Alarm_Warning_Duration'] / total_time_span_minutes) * 100 ).round(4) # Cap the downtime percentage at 100% to avoid impossible negative uptime values alarm_warning_by_group['Alarm_Warning_Downtime_Percentage'] = alarm_warning_by_group['Alarm_Warning_Downtime_Percentage'].apply( lambda x: min(100.0, x) ) alarm_warning_by_group['Alarm_Warning_Uptime_Percentage'] = ( 100 - alarm_warning_by_group['Alarm_Warning_Downtime_Percentage'] ).round(4) # Ensure uptime doesn't go below 0 alarm_warning_by_group['Alarm_Warning_Uptime_Percentage'] = alarm_warning_by_group['Alarm_Warning_Uptime_Percentage'].apply( lambda x: max(0, round(x, 4)) ) else: error_by_group = pd.DataFrame() alarm_warning_by_group = pd.DataFrame() # Compile all results uptime_results = { 'total_time_span_minutes': total_time_span_minutes, 'total_time_span_hours': total_time_span_minutes / 60, 'total_start_time': total_start_time, 'total_end_time': total_end_time, # System-wide metrics 'error_downtime_minutes': total_error_duration, 'error_downtime_percentage': error_downtime_percentage, 'error_uptime_percentage': 100 - error_downtime_percentage, 'alarm_warning_downtime_minutes': total_alarm_warning_duration, 'alarm_warning_downtime_percentage': alarm_warning_downtime_percentage, 'alarm_warning_uptime_percentage': 100 - alarm_warning_downtime_percentage, 'combined_downtime_minutes': total_operational_duration, 'combined_downtime_percentage': combined_downtime_percentage, 'combined_uptime_percentage': 100 - combined_downtime_percentage, # System-level metrics 'system_error_uptime_percentage': system_error_uptime_percentage, 'system_alarm_warning_uptime_percentage': system_alarm_warning_uptime_percentage, # Per-sensor metrics 'error_by_sensor': error_by_sensor, 'alarm_warning_by_sensor': alarm_warning_by_sensor, # Per-group metrics 'error_by_group': error_by_group, 'alarm_warning_by_group': alarm_warning_by_group } return uptime_results def export_results(self, output_dir="output"): """ Export analysis results to CSV files """ print("\n--- EXPORTING RESULTS ---") import os os.makedirs(output_dir, exist_ok=True) if self.processed_events is None or len(self.processed_events) == 0: print("No processed events available for export. Run pair_events_and_calculate_durations first.") return # Filter resolved events for export duration_events = self.processed_events[self.processed_events['Duration_Minutes'].notna()].copy() if len(duration_events) == 0: print("No resolved events with duration data available for export.") return # 1. Export all paired events paired_events_path = os.path.join(output_dir, "paired_alarm_events.csv") duration_events.to_csv(paired_events_path, index=False) print(f"Exported paired alarm events to: {paired_events_path}") # 2. Export summary by alarm type summary_by_type = duration_events.groupby('Alarm_Type').agg({ 'Duration_Minutes': ['count', 'min', 'max', 'mean'], 'Sensor_Id': 'nunique' }).round(2) summary_by_type.columns = ['Event_Count', 'Min_Duration', 'Max_Duration', 'Avg_Duration', 'Unique_Sensors'] summary_by_type_path = os.path.join(output_dir, "summary_by_alarm_type.csv") summary_by_type.to_csv(summary_by_type_path) print(f"Exported summary by alarm type to: {summary_by_type_path}") # 3. Export sensor statistics sensor_stats = duration_events.groupby('Sensor_Id').agg({ 'Alarm_Type': ['count', 'nunique'], 'Duration_Minutes': ['min', 'max', 'mean'], 'Start_Time': ['min', 'max'] }).round(2) sensor_stats.columns = ['Total_Alarm_Count', 'Alarm_Type_Count', 'Min_Duration', 'Max_Duration', 'Avg_Duration', 'First_Alarm', 'Last_Alarm'] sensor_stats = sensor_stats.sort_values('Total_Alarm_Count', ascending=False) # Add Group and Name information to sensor statistics if self.sensor_mapping: sensor_stats['Sensor_Name'] = sensor_stats.index.map( lambda x: self.sensor_mapping.get(x, {}).get('name', 'Unknown') ) sensor_stats['Sensor_Group'] = sensor_stats.index.map( lambda x: self.sensor_mapping.get(x, {}).get('group', 'Unknown') ) # Reorder columns to put Sensor_Id, Name, Group first cols = ['Sensor_Name', 'Sensor_Group'] + [col for col in sensor_stats.columns if col not in ['Sensor_Name', 'Sensor_Group']] sensor_stats = sensor_stats[cols] sensor_stats_path = os.path.join(output_dir, "sensor_statistics.csv") sensor_stats.to_csv(sensor_stats_path) print(f"Exported sensor statistics to: {sensor_stats_path}") # 4. Export top sensors by various metrics # Top sensors by alarm count top_by_count = duration_events['Sensor_Id'].value_counts().to_frame('Alarm_Count') # Add Group and Name information to top sensors by alarm count if self.sensor_mapping: top_by_count['Sensor_Name'] = top_by_count.index.map( lambda x: self.sensor_mapping.get(x, {}).get('name', 'Unknown') ) top_by_count['Sensor_Group'] = top_by_count.index.map( lambda x: self.sensor_mapping.get(x, {}).get('group', 'Unknown') ) # Reorder columns to put Sensor_Id, Name, Group first cols = ['Sensor_Name', 'Sensor_Group', 'Alarm_Count'] top_by_count = top_by_count[cols] top_by_count_path = os.path.join(output_dir, "top_sensors_by_alarm_count.csv") top_by_count.to_csv(top_by_count_path) print(f"Exported top sensors by alarm count to: {top_by_count_path}") # Top sensors by average duration avg_duration_by_sensor = duration_events.groupby('Sensor_Id')['Duration_Minutes'].mean().sort_values(ascending=False).to_frame('Avg_Duration') # Add Group and Name information to top sensors by average duration if self.sensor_mapping: avg_duration_by_sensor['Sensor_Name'] = avg_duration_by_sensor.index.map( lambda x: self.sensor_mapping.get(x, {}).get('name', 'Unknown') ) avg_duration_by_sensor['Sensor_Group'] = avg_duration_by_sensor.index.map( lambda x: self.sensor_mapping.get(x, {}).get('group', 'Unknown') ) # Reorder columns to put Sensor_Id, Name, Group first cols = ['Sensor_Name', 'Sensor_Group', 'Avg_Duration'] avg_duration_by_sensor = avg_duration_by_sensor[cols] avg_duration_path = os.path.join(output_dir, "top_sensors_by_avg_duration.csv") avg_duration_by_sensor.to_csv(avg_duration_path) print(f"Exported top sensors by average duration to: {avg_duration_path}") # Top sensors by max duration max_duration_by_sensor = duration_events.groupby('Sensor_Id')['Duration_Minutes'].max().sort_values(ascending=False).to_frame('Max_Duration') # Add Group and Name information to top sensors by max duration if self.sensor_mapping: max_duration_by_sensor['Sensor_Name'] = max_duration_by_sensor.index.map( lambda x: self.sensor_mapping.get(x, {}).get('name', 'Unknown') ) max_duration_by_sensor['Sensor_Group'] = max_duration_by_sensor.index.map( lambda x: self.sensor_mapping.get(x, {}).get('group', 'Unknown') ) # Reorder columns to put Sensor_Id, Name, Group first cols = ['Sensor_Name', 'Sensor_Group', 'Max_Duration'] max_duration_by_sensor = max_duration_by_sensor[cols] max_duration_path = os.path.join(output_dir, "top_sensors_by_max_duration.csv") max_duration_by_sensor.to_csv(max_duration_path) print(f"Exported top sensors by max duration to: {max_duration_path}") # Top sensors by total severity score severity_weights = {'Error': 3, 'Alarm': 2, 'Warning': 1} duration_events['Severity_Score'] = duration_events.apply( lambda row: severity_weights.get(row['Alarm_Type'], 1) * row['Duration_Minutes'], axis=1 ) severity_by_sensor = duration_events.groupby('Sensor_Id')['Severity_Score'].sum().sort_values(ascending=False).to_frame('Total_Severity_Score') # Add Group and Name information to top sensors by severity score if self.sensor_mapping: severity_by_sensor['Sensor_Name'] = severity_by_sensor.index.map( lambda x: self.sensor_mapping.get(x, {}).get('name', 'Unknown') ) severity_by_sensor['Sensor_Group'] = severity_by_sensor.index.map( lambda x: self.sensor_mapping.get(x, {}).get('group', 'Unknown') ) # Reorder columns to put Sensor_Id, Name, Group first cols = ['Sensor_Name', 'Sensor_Group', 'Total_Severity_Score'] severity_by_sensor = severity_by_sensor[cols] severity_path = os.path.join(output_dir, "top_sensors_by_severity_score.csv") severity_by_sensor.to_csv(severity_path) print(f"Exported top sensors by severity score to: {severity_path}") # 5. Export time-based analysis duration_events['Start_Hour'] = duration_events['Start_Time'].dt.hour duration_events['Start_DayOfWeek'] = duration_events['Start_Time'].dt.day_name() duration_events['Start_Date'] = duration_events['Start_Time'].dt.date # Hourly frequency hourly_freq = duration_events.groupby('Start_Hour').size().to_frame('Alarm_Count') hourly_path = os.path.join(output_dir, "alarm_frequency_by_hour.csv") hourly_freq.to_csv(hourly_path) print(f"Exported alarm frequency by hour to: {hourly_path}") # Daily frequency day_order = ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday'] daily_freq = duration_events.groupby('Start_DayOfWeek').size().reindex(day_order, fill_value=0).to_frame('Alarm_Count') daily_path = os.path.join(output_dir, "alarm_frequency_by_day.csv") daily_freq.to_csv(daily_path) print(f"Exported alarm frequency by day to: {daily_path}") # 6. Export group-based analysis if sensor groups are available if 'Sensor_Group' in duration_events.columns: print("\nExporting group-based analysis...") # Group statistics group_stats = duration_events.groupby('Sensor_Group').agg({ 'Sensor_Id': ['count', 'nunique'], 'Duration_Minutes': ['min', 'max', 'mean', 'sum'], 'Start_Time': ['min', 'max'] }).round(2) group_stats.columns = ['Total_Alarm_Count', 'Unique_Sensors', 'Min_Duration', 'Max_Duration', 'Avg_Duration', 'Total_Duration', 'First_Alarm', 'Last_Alarm'] group_stats = group_stats.sort_values('Total_Alarm_Count', ascending=False) # Calculate total sensors per group from sensor report if self.sensor_data is not None and 'Group' in self.sensor_data.columns: # Get the total number of sensors in each group from the sensor report # Process the sensor data to handle hierarchical structure properly processed_sensor_data = self._process_hierarchical_sensor_data(self.sensor_data) # Count unique sensors per group (where ID is not null) sensor_counts_by_group = processed_sensor_data[processed_sensor_data['ID'].notna()].groupby('Group')['ID'].nunique().fillna(0).astype(int) # Add Total_Sensors_In_Group column group_stats['Total_Sensors_In_Group'] = group_stats.index.map( lambda x: sensor_counts_by_group.get(x, 0) if x != 'Unknown' else len(processed_sensor_data[((processed_sensor_data['Group'].isna()) | (processed_sensor_data['Group'] == 'Unknown')) & (processed_sensor_data['ID'].notna())]) ).fillna(0).astype(int) # Calculate percentage of monitoring points that experienced alarms # Avoid division by zero group_stats['Percentage_Monitoring_Points_Alarmed'] = ( (group_stats['Unique_Sensors'] / group_stats['Total_Sensors_In_Group']) * 100 ).round(2) group_stats['Percentage_Monitoring_Points_Alarmed'] = group_stats['Percentage_Monitoring_Points_Alarmed'].fillna(0).replace([np.inf, -np.inf], 0) # Calculate alarm time percentage for each group # Get the overall time range from alarm data first_alarm_overall = self.alarm_data['Date'].min() last_alarm_overall = self.alarm_data['Date'].max() if pd.notna(first_alarm_overall) and pd.notna(last_alarm_overall): total_time_span_hours = (last_alarm_overall - first_alarm_overall).total_seconds() / 3600.0 # Calculate the percentage of total possible sensor-hours that were in alarm # Total possible sensor-hours = total sensors in group * total time span total_possible_sensor_hours = group_stats['Total_Sensors_In_Group'] * total_time_span_hours # Actual alarm-hours = total alarm duration in hours actual_alarm_hours = group_stats['Total_Duration'] / 60.0 # Convert minutes to hours # Calculate percentage group_stats['Alarm_Time_Percentage'] = ( (actual_alarm_hours / total_possible_sensor_hours) * 100 ).round(2) group_stats['Alarm_Time_Percentage'] = group_stats['Alarm_Time_Percentage'].fillna(0).replace([np.inf, -np.inf], 0) else: group_stats['Alarm_Time_Percentage'] = 0.0 group_stats_path = os.path.join(output_dir, "group_statistics.csv") group_stats.to_csv(group_stats_path) print(f"Exported group statistics to: {group_stats_path}") # Top groups by various metrics # Top groups by alarm count top_groups_by_count = duration_events['Sensor_Group'].value_counts().to_frame('Alarm_Count') top_groups_count_path = os.path.join(output_dir, "top_groups_by_alarm_count.csv") top_groups_by_count.to_csv(top_groups_count_path) print(f"Exported top groups by alarm count to: {top_groups_count_path}") # Top groups by average duration avg_duration_by_group = duration_events.groupby('Sensor_Group')['Duration_Minutes'].mean().sort_values(ascending=False).to_frame('Avg_Duration') avg_duration_group_path = os.path.join(output_dir, "top_groups_by_avg_duration.csv") avg_duration_by_group.to_csv(avg_duration_group_path) print(f"Exported top groups by average duration to: {avg_duration_group_path}") # Top groups by max duration max_duration_by_group = duration_events.groupby('Sensor_Group')['Duration_Minutes'].max().sort_values(ascending=False).to_frame('Max_Duration') max_duration_group_path = os.path.join(output_dir, "top_groups_by_max_duration.csv") max_duration_by_group.to_csv(max_duration_group_path) print(f"Exported top groups by max duration to: {max_duration_group_path}") # Top groups by total severity score severity_by_group = duration_events.groupby('Sensor_Group')['Severity_Score'].sum().sort_values(ascending=False).to_frame('Total_Severity_Score') severity_group_path = os.path.join(output_dir, "top_groups_by_severity_score.csv") severity_by_group.to_csv(severity_group_path) print(f"Exported top groups by severity score to: {severity_group_path}") # Alarm type distribution by group alarm_type_by_group = duration_events.groupby(['Sensor_Group', 'Alarm_Type']).size().unstack(fill_value=0) alarm_type_group_path = os.path.join(output_dir, "alarm_type_distribution_by_group.csv") alarm_type_by_group.to_csv(alarm_type_group_path) print(f"Exported alarm type distribution by group to: {alarm_type_group_path}") print(f"\nAll results exported to '{output_dir}' directory successfully!") def export_uptime_metrics(self, output_dir="output", uptime_results=None): """ Export uptime/downtime metrics to new output files """ print("\n--- EXPORTING UPTIME/DOWNTIME METRICS ---") import os os.makedirs(output_dir, exist_ok=True) # Calculate uptime metrics if not provided if uptime_results is None: uptime_results = self.calculate_uptime_metrics() if uptime_results is None: print("No uptime results to export.") return # Export system-wide summary summary_data = { 'Metric': [ 'Total_Time_Span_Minutes', 'Total_Time_Span_Hours', 'Total_Start_Time', 'Total_End_Time', 'Total_Error_Duration_Minutes', 'Error_Downtime_Percentage', 'Error_Uptime_Percentage', 'Total_Alarm_Warning_Duration_Minutes', 'Alarm_Warning_Downtime_Percentage', 'Alarm_Warning_Uptime_Percentage', 'Total_Combined_Duration_Minutes', 'Combined_Downtime_Percentage', 'Combined_Uptime_Percentage', 'System_Error_Uptime_Percentage_Time_Based', 'System_Alarm_Warning_Uptime_Percentage_Time_Based' ], 'Value': [ uptime_results['total_time_span_minutes'], uptime_results['total_time_span_hours'], uptime_results['total_start_time'], uptime_results['total_end_time'], uptime_results['error_downtime_minutes'], uptime_results['error_downtime_percentage'], uptime_results['error_uptime_percentage'], uptime_results['alarm_warning_downtime_minutes'], uptime_results['alarm_warning_downtime_percentage'], uptime_results['alarm_warning_uptime_percentage'], uptime_results['combined_downtime_minutes'], uptime_results['combined_downtime_percentage'], uptime_results['combined_uptime_percentage'], uptime_results.get('system_error_uptime_percentage', 'N/A'), uptime_results.get('system_alarm_warning_uptime_percentage', 'N/A') ] } summary_df = pd.DataFrame(summary_data) summary_path = os.path.join(output_dir, "system_uptime_summary.csv") summary_df.to_csv(summary_path, index=False) print(f"Exported system uptime summary to: {summary_path}") # Export per-sensor error metrics if not uptime_results['error_by_sensor'].empty: error_sensor_path = os.path.join(output_dir, "sensor_error_uptime_metrics.csv") uptime_results['error_by_sensor'].to_csv(error_sensor_path) print(f"Exported per-sensor error uptime metrics to: {error_sensor_path}") # Export per-sensor alarm/warning metrics if not uptime_results['alarm_warning_by_sensor'].empty: alarm_warning_sensor_path = os.path.join(output_dir, "sensor_alarm_warning_uptime_metrics.csv") uptime_results['alarm_warning_by_sensor'].to_csv(alarm_warning_sensor_path) print(f"Exported per-sensor alarm/warning uptime metrics to: {alarm_warning_sensor_path}") # Export per-group error metrics if not uptime_results['error_by_group'].empty: error_group_path = os.path.join(output_dir, "group_error_uptime_metrics.csv") # Reset index to make Sensor_Group a regular column error_by_group_df = uptime_results['error_by_group'].reset_index() # Make sure the index column is named properly error_by_group_df.columns = ['Sensor_Group'] + list(error_by_group_df.columns[1:]) error_by_group_df.to_csv(error_group_path, index=False) print(f"Exported per-group error uptime metrics to: {error_group_path}") # Export per-group alarm/warning metrics if not uptime_results['alarm_warning_by_group'].empty: alarm_warning_group_path = os.path.join(output_dir, "group_alarm_warning_uptime_metrics.csv") # Reset index to make Sensor_Group a regular column alarm_warning_by_group_df = uptime_results['alarm_warning_by_group'].reset_index() # Make sure the index column is named properly alarm_warning_by_group_df.columns = ['Sensor_Group'] + list(alarm_warning_by_group_df.columns[1:]) alarm_warning_by_group_df.to_csv(alarm_warning_group_path, index=False) print(f"Exported per-group alarm/warning uptime metrics to: {alarm_warning_group_path}") print(f"\nUptime/downtime metrics exported to '{output_dir}' directory successfully!") # Example usage if __name__ == "__main__": # Define file paths csv_file = "CardinalAlarmsDec25.csv" xlsx_file = "SensorReport Cardinal 2025-12-23_processed.xlsx" # Updated to the new file name exclusion_file = "exclusion_config.json" # Optional: specify groups to exclude # Create analyzer instance with exclusion file analyzer = AlarmAnalyzer(csv_file, xlsx_file, exclusion_file_path=exclusion_file) # Load data alarm_data, sensor_data = analyzer.load_data() # Categorize alarms categorized_data = analyzer.categorize_alarms() print("\nFirst few rows of categorized data:") print(categorized_data.head()) # Pair events and calculate durations paired_events = analyzer.pair_events_and_calculate_durations() print("\nFirst few rows of paired events:") print(paired_events.head()) # Perform basic analysis basic_results = analyzer.basic_analysis() # Perform advanced analysis advanced_results = analyzer.advanced_analysis() # Create visualizations analyzer.create_visualizations(save_plots=True) # Perform uptime analysis uptime_results = analyzer.calculate_uptime_metrics() # Export results analyzer.export_results(output_dir="output") # Export uptime metrics to new files analyzer.export_uptime_metrics(output_dir="output", uptime_results=uptime_results)