Initial commit: alarm analysis project

Python project for analyzing alarm data from building monitoring systems.
Includes alarm analyzer, plotting, tests, and source data files.
This commit is contained in:
2026-02-26 09:03:54 -05:00
commit f08a1a9bf5
25 changed files with 11350 additions and 0 deletions

25
.gitignore vendored Normal file
View File

@@ -0,0 +1,25 @@
# Python
__pycache__/
*.py[cod]
*$py.class
*.so
# Virtual environment
alarm_analysis_env/
# Generated output
output/
plots/
# IDE
.vscode/
.idea/
*.swp
*.swo
# OS
.DS_Store
Thumbs.db
# Claude Code
.claude/

48
CLAUDE.md Normal file
View File

@@ -0,0 +1,48 @@
# Alarm Analysis
Python project for analyzing alarm data from building monitoring systems (CSV alarm logs + XLSX sensor reports).
## Commands
```bash
# Activate virtual environment
source alarm_analysis_env/Scripts/activate # Windows Git Bash
# or: alarm_analysis_env\Scripts\activate # Windows CMD
# Run full analysis (no plots)
python run_analysis.py
# Generate plots (requires matplotlib display)
python create_plots.py
# Run tests
python test_changes.py
python test_duration_fix.py
python test_mapping.py
python test_enhanced_plotting.py
```
## Architecture
- `alarm_analyzer.py` — Main `AlarmAnalyzer` class (~96KB). Handles data loading, alarm categorization, event pairing, duration calculation, basic/advanced analysis, uptime metrics, and export.
- `run_analysis.py` — Entry point that runs the full pipeline without visualizations.
- `create_plots.py` — Generates alarm dashboard, duration analysis, and sensor analysis plots.
## Data Files
- `CardinalAlarmsDec25.csv` — Raw alarm data (columns: Alarm_Id, Sensor_Id, Date, Description, LogTime)
- `SensorReport Cardinal 2025-12-23_processed.xlsx` — Sensor descriptions and group mappings
- `exclusion_config.json` — JSON format: `{"excluded_groups": ["GroupName1"]}`
- `groups_to_skip.txt` — Text format: one group name per line
## Key Patterns
- Alarm types are parsed from Description field via regex: Hi/Lo Alarm, Hi/Lo Warning, Error, Normal
- Events are paired (alarm start -> Normal end) to calculate durations
- Sensor mapping links Sensor_Id to human-readable names and groups from the XLSX file
- Visualization imports are deferred (`_import_viz_libs()`) so analysis can run headless
- Output goes to `output/` (CSVs) and `plots/` (PNGs)
## Dependencies
Python 3.13 with: pandas, numpy, matplotlib, seaborn, openpyxl

7917
CardinalAlarmsDec25.csv Normal file

File diff suppressed because it is too large Load Diff

174
ProjectPlan.md Normal file
View File

@@ -0,0 +1,174 @@
# Alarm Data Analysis Project Plan
## Overview
This project will develop a Python script to analyze alarm data from CSV files, cross-referencing with sensor descriptions from an XLSX file. The script will provide comprehensive statistics and insights about alarm events across monitoring points.
## Data Structure Analysis
Based on the CSV file structure:
- **Alarm_Id**: Unique identifier for each alarm event
- **Sensor_Id**: Identifies the monitoring point
- **Date**: Timestamp when the alarm/warning/error occurred
- **Description**: Details about the alarm event (e.g., "Hi Alarm: 51.3>=46.0F", "Normal 42.5F", "Error: Comm Loss Error")
- **LogTime**: Timestamp when the event was logged
## Implementation Plan
### Phase 1: Data Loading and Preprocessing
1. Load the CSV alarm data using pandas
2. Load the sensor report XLSX file to get sensor descriptions
3. Parse alarm descriptions to categorize events (Normal, Alarm, Warning, Error)
4. Extract numeric values and thresholds from alarm descriptions
5. Identify alarm start and end events to calculate durations
### Phase 2: Data Processing and Pairing
1. Pair start events (Alarm/Warning/Error) with corresponding end events (Normal)
2. Calculate duration for each alarm event
3. Handle edge cases (unpaired events, overlapping events)
4. Create a structured dataset of complete alarm events
### Phase 3: Basic Analysis
1. Count alarm events by type (Alarm, Warning, Error) for each sensor
2. Calculate min/max/average duration for each alarm type per sensor
3. Generate summary statistics across all sensors
4. Identify most problematic sensors (highest number of events, longest durations)
### Phase 4: Advanced Analysis
1. Time-based analysis:
- Frequency of events by hour of day, day of week
- Trend analysis over time periods
- Seasonal patterns if data spans multiple months
2. Alarm correlation analysis:
- Identify sensors that frequently alarm together
- Determine if specific alarm types lead to others
3. Severity analysis:
- Weighted scoring based on alarm type and duration
- Ranking sensors by overall impact
### Phase 5: Additional Valuable Metrics
1. **MTBF (Mean Time Between Failures)**: Average time between consecutive alarm events for each sensor
2. **Alarm Churn**: Rate of alarm state changes for each sensor
3. **Recovery Time**: Time taken to return to normal state after an alarm
4. **Alarm Escalation**: Percentage of warnings that escalate to alarms
5. **Persistence Analysis**: How long alarms typically last before being resolved
6. **Peak Time Identification**: Time periods with highest alarm frequency
7. **False Alarm Rate**: Estimate of alarms that return to normal quickly
8. **Critical Sensor Identification**: Sensors with highest frequency of high-severity events
### Phase 6: Visualization and Reporting
1. Generate summary reports in console and optionally save to file
2. Create visualizations (matplotlib/seaborn):
- Bar charts for alarm counts by sensor and type
- Box plots for duration analysis
- Time series plots for alarm frequency over time
- Heatmaps for alarm correlation
3. Export detailed analysis results to CSV files
### Phase 7: Output and Export
1. Create summary tables showing:
- Sensor-wise breakdown of alarm types and durations
- Top N problematic sensors
- Time-based trends
2. Export processed data for further analysis
3. Generate a comprehensive report file
## Technical Implementation Details
### Libraries to Use:
- pandas: For data manipulation and analysis
- numpy: For numerical operations
- matplotlib/seaborn: For visualizations
- openpyxl: For reading XLSX files
- re: For parsing alarm descriptions
- datetime: For time-based analysis
### Data Processing Steps:
1. Parse alarm descriptions using regular expressions to identify:
- Alarm type (Hi/Lo Alarm/Warning, Error, Normal)
- Measured value
- Threshold value
- Unit of measurement
2. Create a mapping between Sensor_Id and sensor descriptions from XLSX
3. For each sensor, pair alarm start events with corresponding normal end events
4. Calculate duration between paired events
5. Aggregate statistics by sensor and alarm type
### Alarm Type Classification:
- **Error**: Events containing "Error" in description
- **Alarm**: Events containing "Alarm" but not "Warning"
- **Warning**: Events containing "Warning"
- **Normal**: Events indicating return to normal state
### Key Metrics to Calculate:
For each sensor:
- Count of each alarm type
- Min/Max/Average duration for each alarm type
- Total alarm time percentage
- Alarm frequency rate
- Average time to recovery
- Percentage of events that escalate
## Expected Deliverables
1. Main analysis script (alarm_analyzer.py)
2. Configuration file for customization
3. Sample output files demonstrating analysis results
4. Documentation on how to run the script and interpret results
5. ProjectPlan.md (this document)
## Enhanced Features Implemented
### Enhanced Group-Based Analysis
1. **Total Sensors Per Group**: Added the total number of sensors in each group according to the sensor report
2. **Alarm Coverage Percentage**: Added percentage of monitoring points that experienced alarms
3. **Alarm Time Percentage**: Added percentage of time the group's sensors spent in alarm condition
### Enhanced Output Files
1. All sensor-specific output files now include sensor names and group information:
- `sensor_statistics.csv`
- `top_sensors_by_alarm_count.csv`
- `top_sensors_by_avg_duration.csv`
- `top_sensors_by_max_duration.csv`
- `top_sensors_by_severity_score.csv`
### Enhanced Plotting Functionality
1. All sensor-specific plots now display sensor names instead of just IDs
2. Added comprehensive group-based visualizations:
- Group composition analysis
- Alarm type distribution by group
- Group alarm intensity metrics
## Enhanced Features Implemented
### Uptime/Downtime Metrics
1. **Error-based downtime**: Calculates the total duration of all "Error" events across all sensors as a percentage of the total time period
2. **Alarm/Warning-based downtime**: Calculates the total duration of all "Alarm" and "Warning" events across all sensors as a percentage of the total time period
3. **System-level uptime metrics**: Time-based calculation showing the percentage of time that any sensor was in error or alarm/warning state
4. **Per-sensor and per-group metrics**: Individual sensor and group uptime/downtime percentages
5. **New output files**:
- `system_uptime_summary.csv` - Overall system uptime metrics
- `sensor_error_uptime_metrics.csv` - Per-sensor error-based uptime metrics
- `sensor_alarm_warning_uptime_metrics.csv` - Per-sensor alarm/warning-based uptime metrics
- `group_error_uptime_metrics.csv` - Per-group error-based uptime metrics
- `group_alarm_warning_uptime_metrics.csv` - Per-group alarm/warning-based uptime metrics
6. **Comprehensive group inclusion**: All output files covering groups now include all groups, including those with 0 errors or warnings, allowing for identification of systems with 100% uptime
### Optional Group Exclusion Feature
1. **Create an optional configuration file** (e.g., `exclusion_config.json` or `groups_to_skip.txt`) that allows users to specify groups to exclude from analysis
2. **Implementation approach**:
- Add a new parameter to the AlarmAnalyzer class to accept an exclusion file path
- Parse the exclusion file to get a list of groups to skip
- Filter out sensor data belonging to excluded groups before analysis
- Add logging to indicate which groups were excluded
3. **Configuration file format options**:
- JSON format: `{"excluded_groups": ["GroupName1", "GroupName2"]}`
- Simple text format: one group name per line
- CSV format: for more complex exclusion rules
4. **Benefits**:
- Allows users to exclude groups with known issues or maintenance periods
- Provides cleaner analysis results when certain groups have anomalous data
- Maintains flexibility without permanently modifying the source data
5. **Implementation details**:
- Add preprocessing step to filter out excluded groups before any analysis
- Update all analysis functions to work with the filtered dataset
- Maintain separate statistics for excluded groups if needed for reference
## Future Enhancement Plans

245
README.md Normal file
View File

@@ -0,0 +1,245 @@
# Alarm Analysis
Analyze alarm data from building monitoring systems — pair alarm events, calculate durations, compute uptime metrics, and generate visualizations. Built for CSV alarm logs and XLSX sensor reports exported from systems like Cardinal.
## Table of Contents
- [Quick Start](#quick-start)
- [Inputs](#inputs)
- [Outputs](#outputs)
- [How It Works](#how-it-works)
- [Configuration](#configuration)
- [Visualizations](#visualizations)
- [Testing](#testing)
- [Project Structure](#project-structure)
- [Dependencies](#dependencies)
## Quick Start
```bash
# Set up virtual environment
python -m venv alarm_analysis_env
source alarm_analysis_env/Scripts/activate # Windows Git Bash
# or: alarm_analysis_env\Scripts\activate # Windows CMD
# Install dependencies
pip install pandas numpy matplotlib seaborn openpyxl
# Run the full analysis (outputs CSVs to output/)
python run_analysis.py
# Generate plots (outputs PNGs to plots/)
python create_plots.py
```
## Inputs
### 1. Alarm CSV (`CardinalAlarmsDec25.csv`)
Raw alarm log exported from the monitoring system. Required columns:
| Column | Type | Description | Example |
|--------|------|-------------|---------|
| `Alarm_Id` | int | Unique alarm event ID | `486258` |
| `Sensor_Id` | int | Numeric sensor identifier | `9273` |
| `Date` | datetime | When the alarm occurred | `2025-12-01 00:01:27.000` |
| `Description` | string | Alarm condition text | `Lo Warning: 68.0<=68.0F` |
| `LogTime` | datetime | When the event was logged | `2025-12-01 00:01:32.843` |
**Description patterns** the analyzer recognizes:
| Pattern | Example | Parsed As |
|---------|---------|-----------|
| Hi/Lo Alarm | `Hi Alarm: 51.3>=46.0F` | Type=Alarm, Value=51.3, Threshold=46.0, Unit=F |
| Hi/Lo Warning | `Lo Warning: 68.0<=68.0F` | Type=Warning, Value=68.0, Threshold=68.0, Unit=F |
| Error | `Error: Comm Loss Error 20.4>=20 min.` | Type=Error |
| Normal | `Normal 68.1F` | Type=Normal (resolves prior alarm) |
Supported units: `F`, `C`, `%RH`, `"H2O`
### 2. Sensor Report XLSX (`SensorReport Cardinal 2025-12-23_processed.xlsx`)
Sensor metadata exported from the monitoring system. Expected columns:
| Column | Description |
|--------|-------------|
| `ID` | Sensor ID (matches `Sensor_Id` in the alarm CSV) |
| `Group` | Logical grouping (e.g., room, zone, building area) |
| `Remote` | Remote unit identifier |
| `Name` | Human-readable sensor name |
| `Type` | Sensor type (temperature, humidity, etc.) |
| `Serial No` | Hardware serial number |
The XLSX may use a hierarchical layout where `Group` names appear only in the first row of each group. The analyzer handles this automatically via forward-fill. Both `header=0` (new format) and `header=4` (legacy format) are auto-detected.
### 3. Exclusion Config (optional)
Exclude specific sensor groups from analysis. Provide either format:
**JSON** (`exclusion_config.json`):
```json
{
"excluded_groups": [
"Maintenance Sensors",
"Decommissioned Wing"
]
}
```
**Plain text** (`groups_to_skip.txt`):
```
Maintenance Sensors
Decommissioned Wing
```
Pass the file path when creating the analyzer:
```python
analyzer = AlarmAnalyzer('alarms.csv', 'sensors.xlsx', exclusion_file_path='exclusion_config.json')
```
## Outputs
All outputs are generated in `output/` (CSVs) and `plots/` (PNGs).
### Core Analysis CSVs
| File | Description |
|------|-------------|
| `paired_alarm_events.csv` | Every alarm event paired with its resolution — includes sensor name/group, start/end times, duration, alarm type, values, thresholds, and how the alarm ended |
| `summary_by_alarm_type.csv` | Aggregate counts and duration stats (min/max/avg) per alarm type |
| `sensor_statistics.csv` | Per-sensor stats: alarm count, duration stats, with name and group |
### Rankings
| File | Ranked By |
|------|-----------|
| `top_sensors_by_alarm_count.csv` | Total alarm events per sensor |
| `top_sensors_by_avg_duration.csv` | Average alarm duration |
| `top_sensors_by_max_duration.csv` | Longest single alarm event |
| `top_sensors_by_severity_score.csv` | Severity score (type weight x duration) |
| `top_groups_by_alarm_count.csv` | Total alarm events per group |
| `top_groups_by_avg_duration.csv` | Average alarm duration per group |
| `top_groups_by_max_duration.csv` | Longest single alarm event per group |
| `top_groups_by_severity_score.csv` | Severity score per group |
### Time Analysis
| File | Description |
|------|-------------|
| `alarm_frequency_by_hour.csv` | Alarm count for each hour of day (0-23) |
| `alarm_frequency_by_day.csv` | Alarm count for each day of week |
### Group Analysis
| File | Description |
|------|-------------|
| `group_statistics.csv` | Per-group stats including total sensors, percentage of sensors that alarmed, and alarm time percentage |
| `alarm_type_distribution_by_group.csv` | Crosstab of alarm types per group |
### Uptime Metrics
| File | Description |
|------|-------------|
| `system_uptime_summary.csv` | System-wide uptime: total time span, cumulative downtime percentages, time-based uptime (per-hour bucket analysis) |
| `sensor_error_uptime_metrics.csv` | Per-sensor error-based uptime (communication failures) |
| `sensor_alarm_warning_uptime_metrics.csv` | Per-sensor alarm/warning-based uptime (operational issues) |
| `group_error_uptime_metrics.csv` | Per-group error-based uptime |
| `group_alarm_warning_uptime_metrics.csv` | Per-group alarm/warning-based uptime |
## How It Works
### Pipeline Overview
```
CSV + XLSX ──> Load & Map ──> Categorize ──> Pair Events ──> Analyze ──> Export
│ │ │
├─ Sensor ID → Name/Group │ ├─ Basic stats
└─ Exclude groups │ ├─ Advanced (MTBF, correlation, severity)
│ └─ Uptime metrics
Alarm Start ──> Normal (resolved)
Alarm Start ──> Different Alarm (transition)
Alarm Start ──> [nothing] (unresolved)
```
### Step-by-Step
1. **Load Data** — Read the alarm CSV and sensor report XLSX. Build a mapping from sensor IDs to human-readable names and groups. Enrich alarm records with sensor metadata. Filter out excluded groups.
2. **Categorize Alarms** — Parse each alarm's `Description` field with regex to extract the alarm type (Error, Alarm, Warning, Normal), measured value, threshold, and unit.
3. **Pair Events & Calculate Durations** — For each sensor, walk through events chronologically:
- An alarm-start event (Alarm, Warning, or Error) looks forward for resolution
- If a `Normal` event follows → alarm is **resolved**, duration is calculated
- If a different alarm type follows → recorded as a **transition** (e.g., "Transition to Alarm")
- If nothing follows → marked **unresolved**
4. **Basic Analysis** — Count alarms by type, sensor, and group. Compute duration statistics (min, max, average).
5. **Advanced Analysis**:
- **Hourly/daily frequency** — when alarms tend to occur
- **MTBF** (Mean Time Between Failures) — average time between consecutive alarms per sensor
- **Alarm correlation** — sensor pairs that alarm within 1-hour windows of each other
- **Severity scoring** — weighted by type (Error=3x, Alarm=2x, Warning=1x) multiplied by duration
- **Alarm escalation** — warnings that escalate to Alarm or Error within 1 hour
- **Group aggregates** — all metrics rolled up by sensor group
6. **Uptime Metrics** — Calculate downtime from error events (communication failures) and alarm/warning events (operational issues). Compute both cumulative percentages and time-bucketed system uptime using 1-hour intervals. Include all sensors and groups, even those with zero events.
7. **Export** — Write all results to CSV files in `output/`.
## Visualizations
Run `python create_plots.py` to generate PNG plots in `plots/`:
| Plot | Description |
|------|-------------|
| `alarm_dashboard.png` | 4-panel overview: alarm count by type, top 10 sensors, hourly frequency, daily frequency |
| `duration_analysis.png` | Box plots and histograms of alarm durations by type (log scale) |
| `sensor_analysis.png` | 4-panel: top sensors by count, avg duration, max duration, severity |
Additional group-based plots are generated when group data is available (group dashboard, group composition, alarm type distribution by group, alarm intensity per group).
Visualization imports (matplotlib, seaborn) are deferred so `run_analysis.py` can execute headless without a display.
## Testing
```bash
python test_changes.py # Validates code structure (methods, columns, exports exist)
python test_duration_fix.py # Tests event pairing and duration calculation
python test_mapping.py # Verifies sensor ID → name/group mapping
python test_enhanced_plotting.py # Tests plot data preparation logic (no rendering)
```
## Project Structure
```
AlarmAnalysis/
├── alarm_analyzer.py # Core AlarmAnalyzer class (all analysis logic)
├── run_analysis.py # Entry point: run full analysis, export CSVs
├── create_plots.py # Entry point: generate visualization PNGs
├── exclusion_config.json # Group exclusion config (JSON format)
├── groups_to_skip.txt # Group exclusion config (plain text format)
├── CardinalAlarmsDec25.csv # Input: alarm log data
├── SensorReport *.xlsx # Input: sensor metadata
├── test_changes.py # Test: code structure validation
├── test_duration_fix.py # Test: event pairing logic
├── test_mapping.py # Test: sensor ID mapping
├── test_enhanced_plotting.py # Test: plot data preparation
├── output/ # Generated CSV analysis results
└── plots/ # Generated PNG visualizations
```
## Dependencies
- **Python** 3.13+
- **pandas** — data manipulation and analysis
- **numpy** — numerical operations
- **matplotlib** — plotting (only needed for `create_plots.py`)
- **seaborn** — statistical visualizations (only needed for `create_plots.py`)
- **openpyxl** — reading XLSX sensor reports
Install all dependencies:
```bash
pip install pandas numpy matplotlib seaborn openpyxl
```

Binary file not shown.

1884
alarm_analyzer.py Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,81 @@
#!/usr/bin/env python
# Script to verify the enhanced group statistics
import pandas as pd
import os
def check_enhanced_group_stats():
print("=== ENHANCED GROUP STATISTICS VERIFICATION ===")
print()
# Check if output directory exists
if not os.path.exists('output'):
print("Output directory not found!")
return
# Check if group_statistics.csv exists
group_stats_path = os.path.join('output', 'group_statistics.csv')
if not os.path.exists(group_stats_path):
print(f"Group statistics file not found at {group_stats_path}")
return
# Load the enhanced group statistics
group_stats_df = pd.read_csv(group_stats_path)
print("Enhanced Group Statistics Columns:")
print(list(group_stats_df.columns))
print()
# Verify the new columns exist
required_columns = [
'Total_Sensors_In_Group',
'Percentage_Monitoring_Points_Alarmed',
'Alarm_Time_Percentage'
]
missing_columns = [col for col in required_columns if col not in group_stats_df.columns]
if missing_columns:
print(f"ERROR: Missing columns: {missing_columns}")
return
else:
print("All required enhanced columns are present")
print()
# Display sample of the enhanced data
print("Sample of Enhanced Group Statistics (Top 10 by Alarm Count):")
print(group_stats_df[['Sensor_Group', 'Total_Alarm_Count', 'Unique_Sensors',
'Total_Sensors_In_Group', 'Percentage_Monitoring_Points_Alarmed',
'Alarm_Time_Percentage']].head(10))
print()
# Show some key statistics
print("=== ENHANCED ANALYSIS SUMMARY ===")
# Groups with highest percentage of monitoring points alarmed
print("Top 5 groups with highest percentage of monitoring points that experienced alarms:")
top_alarm_percent = group_stats_df.nlargest(5, 'Percentage_Monitoring_Points_Alarmed')[['Sensor_Group', 'Percentage_Monitoring_Points_Alarmed', 'Unique_Sensors', 'Total_Sensors_In_Group']]
print(top_alarm_percent)
print()
# Groups with highest alarm time percentage
print("Top 5 groups with highest percentage of time spent in alarm condition:")
top_time_percent = group_stats_df.nlargest(5, 'Alarm_Time_Percentage')[['Sensor_Group', 'Alarm_Time_Percentage', 'Total_Duration', 'Total_Sensors_In_Group']]
print(top_time_percent)
print()
# Groups with the most difference between total sensors and unique sensors that alarmed
print("Groups with the highest number of total sensors but lower alarm activity:")
group_stats_df['Sensors_Not_Alarming'] = group_stats_df['Total_Sensors_In_Group'] - group_stats_df['Unique_Sensors']
top_inactive = group_stats_df.nlargest(5, 'Sensors_Not_Alarming')[['Sensor_Group', 'Sensors_Not_Alarming', 'Total_Sensors_In_Group', 'Unique_Sensors', 'Percentage_Monitoring_Points_Alarmed']]
print(top_inactive)
print()
print("Enhanced group statistics analysis completed successfully!")
print()
print("New metrics added:")
print("- Total_Sensors_In_Group: Total number of sensors in the group according to sensor report")
print("- Percentage_Monitoring_Points_Alarmed: Percentage of sensors in the group that experienced alarms")
print("- Alarm_Time_Percentage: Percentage of total possible sensor-time that was spent in alarm condition")
if __name__ == "__main__":
check_enhanced_group_stats()

74
check_mapping.py Normal file
View File

@@ -0,0 +1,74 @@
#!/usr/bin/env python
# Script to check the ID mapping between alarm data and sensor report
import pandas as pd
import numpy as np
def check_mapping():
print("Loading alarm data...")
alarm_df = pd.read_csv('CardinalAlarmsDec25.csv')
print("Loading sensor report...")
# Try to read with header=0 first (new format) then with header=4 (old format)
try:
temp_df = pd.read_excel('SensorReport Cardinal 2025-12-23_processed.xlsx', header=0, nrows=5)
expected_cols = ['ID', 'Remote', 'Group', 'Type', 'Serial No', 'Name']
has_expected_cols = any(col in temp_df.columns for col in expected_cols)
if has_expected_cols:
sensor_df = pd.read_excel('SensorReport Cardinal 2025-12-23_processed.xlsx', header=0)
print("Using new sensor report format (header=0)")
else:
sensor_df = pd.read_excel('SensorReport Cardinal 2025-12-23_processed.xlsx', header=4)
print("Using old sensor report format (header=4)")
except FileNotFoundError:
print("Sensor report file not found. Please ensure 'SensorReport Cardinal 2025-12-23_processed.xlsx' is in the current directory.")
return
print(f"Alarm data shape: {alarm_df.shape}")
print(f"Sensor report shape: {sensor_df.shape}")
print("\nAlarm data Sensor_Id sample (first 10):")
print(alarm_df['Sensor_Id'].head(10).tolist())
print("\nSensor report columns:")
print(sensor_df.columns.tolist())
print("\nSensor report 'Remote SN' column info:")
print(f"Data type: {sensor_df['Remote SN'].dtype}")
print(f"Sample values (first 10): {sensor_df['Remote SN'].head(10).tolist()}")
print(f"Non-null count: {sensor_df['Remote SN'].notna().sum()}")
# Check for potential matches
alarm_sensors = set(alarm_df['Sensor_Id'].unique())
# Clean the Remote SN column to find valid numeric values
valid_remote_sns = []
for sn in sensor_df['Remote SN'].dropna():
try:
# Try to convert to int
valid_remote_sns.append(int(sn))
except (ValueError, TypeError):
print(f"Could not convert to int: {sn}")
continue
sensor_sns = set(valid_remote_sns)
print(f"\nNumber of unique alarm sensors: {len(alarm_sensors)}")
print(f"Number of valid sensor report IDs: {len(sensor_sns)}")
print(f"Common IDs between datasets: {len(alarm_sensors.intersection(sensor_sns))}")
if len(alarm_sensors.intersection(sensor_sns)) > 0:
print(f"Sample common IDs: {list(alarm_sensors.intersection(sensor_sns))[:10]}")
else:
print("No direct matches found. Let's check other potential ID columns in sensor report...")
# Check other columns that might contain IDs
for col in sensor_df.columns:
if col != 'Remote SN':
print(f"\nChecking column: {col}")
non_null_values = sensor_df[col].dropna().head(10).tolist()
print(f"Sample values: {non_null_values}")
if __name__ == "__main__":
check_mapping()

45
check_output.py Normal file
View File

@@ -0,0 +1,45 @@
#!/usr/bin/env python
# Check the output files to confirm sensor names are included
import pandas as pd
def check_output():
try:
print("Loading paired events CSV...")
paired_events = pd.read_csv('output/paired_alarm_events.csv')
print(f'Paired events CSV loaded successfully')
print(f'Shape: {paired_events.shape}')
print('Columns:', list(paired_events.columns))
# Show a few rows to verify sensor names are included
print('\nFirst 5 rows with Sensor_Id, Sensor_Name, Sensor_Group:')
cols_to_show = ['Sensor_Id', 'Sensor_Name', 'Sensor_Group', 'Alarm_Type', 'Duration_Minutes']
available_cols = [col for col in cols_to_show if col in paired_events.columns]
if available_cols:
print(paired_events[available_cols].head())
else:
print("Columns not found in paired events file")
print('\nSample of unique sensor names:')
if 'Sensor_Name' in paired_events.columns:
unique_names = paired_events['Sensor_Name'].unique()
print(f'Number of unique sensor names: {len(unique_names)}')
print('Sample sensor names:', unique_names[:10])
else:
print("Sensor_Name column not found in paired events")
print('\nSample of unique sensor groups:')
if 'Sensor_Group' in paired_events.columns:
unique_groups = paired_events['Sensor_Group'].unique()
print(f'Number of unique sensor groups: {len(unique_groups)}')
print('Sample sensor groups:', unique_groups[:10])
else:
print("Sensor_Group column not found in paired events")
except Exception as e:
print(f'Error reading output file: {e}')
import traceback
traceback.print_exc()
if __name__ == "__main__":
check_output()

50
check_sensor_report.py Normal file
View File

@@ -0,0 +1,50 @@
#!/usr/bin/env python
# Check the sensor report data structure
import pandas as pd
def check_sensor_report():
print("Loading sensor report...")
# Try to read with header=0 first (new format) then with header=4 (old format)
try:
temp_df = pd.read_excel('SensorReport Cardinal 2025-12-23_processed.xlsx', header=0, nrows=5)
expected_cols = ['ID', 'Remote', 'Group', 'Type', 'Serial No', 'Name']
has_expected_cols = any(col in temp_df.columns for col in expected_cols)
if has_expected_cols:
sensor_df = pd.read_excel('SensorReport Cardinal 2025-12-23_processed.xlsx', header=0)
print("Using new sensor report format (header=0)")
else:
sensor_df = pd.read_excel('SensorReport Cardinal 2025-12-23_processed.xlsx', header=4)
print("Using old sensor report format (header=4)")
except FileNotFoundError:
print("Sensor report file not found. Please ensure 'SensorReport Cardinal 2025-12-23_processed.xlsx' is in the current directory.")
return
print(f"Sensor report shape: {sensor_df.shape}")
print(f"Columns: {list(sensor_df.columns)}")
print("\nFirst few rows:")
print(sensor_df.head(10))
print("\nSample of the specific columns we're interested in:")
sample_ids = [9273, 3817, 8963, 7414, 9092, 9105, 7080, 3799]
for col in ['ID', 'Remote', 'Group', 'Type', 'Serial No']:
print(f"\n{col} column:")
if col in sensor_df.columns:
print(sensor_df[sensor_df['ID'].isin(sample_ids)][col].head(10))
else:
print(f"Column {col} not found")
# Check for some of the IDs that should exist
print(f"\nChecking for specific ID values...")
for sensor_id in sample_ids:
matches = sensor_df[sensor_df['ID'] == float(sensor_id)]
if not matches.empty:
print(f"ID {sensor_id}:")
print(matches[['ID', 'Remote', 'Group', 'Type', 'Name']].iloc[0] if not matches.empty else "No match")
print("---")
if __name__ == "__main__":
check_sensor_report()

44
check_unknown_sensors.py Normal file
View File

@@ -0,0 +1,44 @@
import pandas as pd
from alarm_analyzer import AlarmAnalyzer
# Create analyzer instance
analyzer = AlarmAnalyzer(
csv_file_path="C:\\Users\\AndrewConlon\\Documents\\AlarmAnalysis\\CardinalAlarmsDec25.csv",
xlsx_file_path="C:\\Users\\AndrewConlon\\Documents\\AlarmAnalysis\\SensorReport Cardinal 2025-12-23_processed.xlsx"
)
# Load data
alarm_data, sensor_data = analyzer.load_data()
# Check which sensors are mapped to 'Unknown' group
unknown_sensors = analyzer.alarm_data[analyzer.alarm_data['Sensor_Group'] == 'Unknown']
print(f"Number of alarm records with 'Unknown' group: {len(unknown_sensors)}")
print(f"Number of unique sensors with 'Unknown' group: {unknown_sensors['Sensor_Id'].nunique()}")
if len(unknown_sensors) > 0:
print("\nFirst 20 unique sensors with 'Unknown' group:")
unknown_sensor_ids = unknown_sensors['Sensor_Id'].unique()[:20]
print(unknown_sensor_ids)
print("\nSensor details for first few 'Unknown' sensors:")
for sensor_id in unknown_sensor_ids[:10]:
sensor_records = unknown_sensors[unknown_sensors['Sensor_Id'] == sensor_id].iloc[0]
print(f"Sensor ID: {sensor_id}, Name: {sensor_records['Sensor_Name']}, Group: {sensor_records['Sensor_Group']}")
# Check if this sensor exists in the sensor mapping
sensor_info = analyzer.sensor_mapping.get(sensor_id, {})
if sensor_info:
print(f" Sensor mapping info: {sensor_info}")
else:
print(f" Sensor NOT found in mapping")
print()
# Also check which sensors from alarm data are not in the sensor mapping
alarm_sensor_ids = set(analyzer.alarm_data['Sensor_Id'].unique())
mapped_sensor_ids = set(analyzer.sensor_mapping.keys())
unmapped_sensors = alarm_sensor_ids - mapped_sensor_ids
print(f"\nNumber of sensors in alarm data but not in sensor mapping: {len(unmapped_sensors)}")
if unmapped_sensors:
print("First 20 unmapped sensor IDs:", list(unmapped_sensors)[:20])

37
create_plots.py Normal file
View File

@@ -0,0 +1,37 @@
#!/usr/bin/env python
# Script to create visualizations with enhanced group and sensor name information
from alarm_analyzer import AlarmAnalyzer
def main():
print("Creating analyzer instance for visualizations...")
# Create analyzer instance
analyzer = AlarmAnalyzer('CardinalAlarmsDec25.csv', 'SensorReport Cardinal 2025-12-23_processed.xlsx')
print("Loading data...")
# Load data
alarm_data, sensor_data = analyzer.load_data()
print(f"Loaded {len(alarm_data)} alarm records")
if analyzer.sensor_mapping:
print(f"Created sensor mapping for {len(analyzer.sensor_mapping)} sensors")
else:
print("No sensor mapping created - sensor report may not have been processed correctly")
print("Categorizing alarms...")
# Categorize alarms
categorized_data = analyzer.categorize_alarms()
print("Pairing events and calculating durations...")
# Pair events and calculate durations
paired_events = analyzer.pair_events_and_calculate_durations()
print("Creating enhanced visualizations...")
# Create visualizations with enhanced group and sensor name information
analyzer.create_visualizations(save_plots=True, output_dir='plots')
print("Visualizations created successfully!")
print("Plots have been saved to the plots directory.")
if __name__ == '__main__':
main()

54
debug_sensor_report.py Normal file
View File

@@ -0,0 +1,54 @@
import pandas as pd
# Read the sensor report
# Try to read with header=0 first (new format) then with header=4 (old format)
try:
temp_df = pd.read_excel('C:\\Users\\AndrewConlon\\Documents\\AlarmAnalysis\\SensorReport Cardinal 2025-12-23_processed.xlsx', header=0, nrows=5)
expected_cols = ['ID', 'Remote', 'Group', 'Type', 'Serial No', 'Name']
has_expected_cols = any(col in temp_df.columns for col in expected_cols)
if has_expected_cols:
df = pd.read_excel('C:\\Users\\AndrewConlon\\Documents\\AlarmAnalysis\\SensorReport Cardinal 2025-12-23_processed.xlsx', header=0)
print("Using new sensor report format (header=0)")
else:
df = pd.read_excel('C:\\Users\\AndrewConlon\\Documents\\AlarmAnalysis\\SensorReport Cardinal 2025-12-23_processed.xlsx', header=4)
print("Using old sensor report format (header=4)")
except FileNotFoundError:
print("Sensor report file not found. Please ensure 'SensorReport Cardinal 2025-12-23_processed.xlsx' is in the current directory.")
exit(1)
print('Shape:', df.shape)
print('Before forward-fill:')
print('First 10 rows:')
print(df[['ID', 'Group']].head(10))
# Apply the same hierarchical processing as in the code
df_processed = df.copy()
hierarchical_cols = ['Group', 'Remote', 'Name', 'Type', 'Serial No']
for col in hierarchical_cols:
if col in df_processed.columns:
# Forward fill: propagate non-null values down until the next non-null value
df_processed[col] = df_processed[col].ffill()
print()
print('After forward-fill:')
print('First 10 rows:')
print(df_processed[['ID', 'Group']].head(10))
# Check if sensor 7335 now has a group
sensor_7335 = df_processed[pd.to_numeric(df_processed['ID'], errors='coerce') == 7335]
if not sensor_7335.empty:
print()
print('Sensor 7335 after forward-fill:')
print(sensor_7335[['ID', 'Group', 'Name']])
else:
print()
print('Sensor 7335 not found in processed data')
# Let's also check for all sensors that have ID 7335 in the original data
original_sensor_7335 = df[pd.to_numeric(df['ID'], errors='coerce') == 7335]
if not original_sensor_7335.empty:
print()
print('Sensor 7335 in original data:')
print(original_sensor_7335[['ID', 'Group', 'Name']])

View File

@@ -0,0 +1,65 @@
#!/usr/bin/env python
# Final demonstration of enhanced group-based analysis
import pandas as pd
import os
def demonstrate_enhanced_features():
print("=== ENHANCED GROUP-BASED ANALYSIS DEMONSTRATION ===")
print()
# Load the enhanced group statistics
group_stats_path = os.path.join('output', 'group_statistics.csv')
if not os.path.exists(group_stats_path):
print(f"Group statistics file not found at {group_stats_path}")
return
group_stats_df = pd.read_csv(group_stats_path)
print("NEW ENHANCED METRICS ADDED TO GROUP STATISTICS:")
print()
print("1. Total_Sensors_In_Group - Total number of sensors in each group (from sensor report)")
print("2. Percentage_Monitoring_Points_Alarmed - Percentage of sensors in the group that experienced alarms")
print("3. Alarm_Time_Percentage - Percentage of total possible sensor-time that was spent in alarm condition")
print()
print("SAMPLE ENHANCED DATA (Top 5 groups by alarm count):")
print(group_stats_df[['Sensor_Group', 'Total_Alarm_Count', 'Unique_Sensors',
'Total_Sensors_In_Group', 'Percentage_Monitoring_Points_Alarmed',
'Alarm_Time_Percentage']].head())
print()
print("INTERPRETATION OF NEW METRICS:")
print()
print("- Total_Sensors_In_Group: Shows the actual size of each monitoring group")
print("- Percentage_Monitoring_Points_Alarmed: Reveals how widespread alarm events are within each group")
print("- Alarm_Time_Percentage: Indicates how much time the group's sensors spend in alarm condition")
print()
# Example interpretation
print("EXAMPLE ANALYSIS:")
sci_mansfield = group_stats_df[group_stats_df['Sensor_Group'] == 'SCI - Mansfield'].iloc[0]
print(f"- SCI - Mansfield group has {sci_mansfield['Total_Sensors_In_Group']} total sensors,")
print(f" {sci_mansfield['Unique_Sensors']} experienced alarms ({sci_mansfield['Percentage_Monitoring_Points_Alarmed']}% of group),")
print(f" and spent {sci_mansfield['Alarm_Time_Percentage']}% of total possible time in alarm condition.")
print()
snx_trailer = group_stats_df[group_stats_df['Sensor_Group'] == 'SNX Trailer'].iloc[0]
print(f"- SNX Trailer group has {snx_trailer['Total_Sensors_In_Group']} total sensors,")
print(f" all {snx_trailer['Unique_Sensors']} experienced alarms (100% of group),")
print(f" and spent {snx_trailer['Alarm_Time_Percentage']}% of total possible time in alarm condition.")
print()
print("These new metrics provide deeper insights into:")
print("- Group size and coverage")
print("- Alarm distribution within groups")
print("- Overall alarm activity intensity per group")
print()
print("The enhanced analysis provides better visibility into which groups have the most comprehensive")
print("alarm coverage and which groups are experiencing the most persistent alarm conditions.")
if __name__ == "__main__":
demonstrate_enhanced_features()

6
exclusion_config.json Normal file
View File

@@ -0,0 +1,6 @@
{
"excluded_groups": [
"GroupName1",
"GroupName2"
]
}

72
find_matches.py Normal file
View File

@@ -0,0 +1,72 @@
#!/usr/bin/env python
# Script to find matches between alarm IDs and sensor report IDs
import pandas as pd
def find_matches():
print("Loading alarm data...")
alarm_df = pd.read_csv('CardinalAlarmsDec25.csv')
print("Loading sensor report...")
# Try to read with header=0 first (new format) then with header=4 (old format)
try:
temp_df = pd.read_excel('SensorReport Cardinal 2025-12-23_processed.xlsx', header=0, nrows=5)
expected_cols = ['ID', 'Remote', 'Group', 'Type', 'Serial No', 'Name']
has_expected_cols = any(col in temp_df.columns for col in expected_cols)
if has_expected_cols:
sensor_df = pd.read_excel('SensorReport Cardinal 2025-12-23_processed.xlsx', header=0)
print("Using new sensor report format (header=0)")
else:
sensor_df = pd.read_excel('SensorReport Cardinal 2025-12-23_processed.xlsx', header=4)
print("Using old sensor report format (header=4)")
except FileNotFoundError:
print("Sensor report file not found. Please ensure 'SensorReport Cardinal 2025-12-23_processed.xlsx' is in the current directory.")
return
alarm_sensors = set(alarm_df['Sensor_Id'].unique())
sensor_ids = set([int(x) for x in sensor_df['ID'].dropna() if pd.notna(x)])
print(f"Number of unique alarm sensors: {len(alarm_sensors)}")
print(f"Number of unique sensor report IDs: {len(sensor_ids)}")
matches = alarm_sensors.intersection(sensor_ids)
print(f"Number of common IDs: {len(matches)}")
if len(matches) > 0:
print(f"Common IDs: {list(matches)}")
else:
print("No exact matches found between alarm Sensor_Id and sensor report ID column.")
print("\nLet's look for any potential patterns or partial matches...")
# Check if any alarm sensor IDs might be in other columns of the sensor report
print("\nChecking other columns in the sensor report for potential matches...")
for col in sensor_df.columns:
if col != 'ID' and col != 'Remote SN': # Skip columns we already know don't match
print(f"\nChecking column: {col}")
# Look for any numeric values in this column that might match
numeric_values = []
for val in sensor_df[col].dropna():
try:
# Try to extract any numbers from the value
import re
numbers = re.findall(r'\d+', str(val))
for num in numbers:
numeric_values.append(int(num))
except:
continue
if numeric_values:
numeric_set = set(numeric_values)
col_matches = alarm_sensors.intersection(numeric_set)
if col_matches:
print(f" Found {len(col_matches)} matches in {col}: {list(col_matches)[:10]}")
else:
print(f" No matches in {col}")
else:
print(f" No numeric values found in {col}")
if __name__ == "__main__":
find_matches()

2
groups_to_skip.txt Normal file
View File

@@ -0,0 +1,2 @@
GroupName1
GroupName2

View File

@@ -0,0 +1,59 @@
#!/usr/bin/env python
"""
Script to inspect the new sensor report format and compare it with the old one
"""
import pandas as pd
def inspect_new_sensor_report():
print("Inspecting new sensor report: SensorReport Cardinal 2025-12-23_processed.xlsx")
try:
# Try to read the new sensor report with different header options
print("\nTrying to read with header=4 (same as old format)...")
new_sensor_df = pd.read_excel('SensorReport Cardinal 2025-12-23_processed.xlsx', header=4)
print(f"New sensor report shape: {new_sensor_df.shape}")
print(f"New sensor report columns: {list(new_sensor_df.columns)}")
print("\nFirst few rows of new sensor report:")
print(new_sensor_df.head())
print("\nData types of columns:")
print(new_sensor_df.dtypes)
# Check for key columns that are expected by the current code
expected_cols = ['ID', 'Remote', 'Group', 'Type', 'Serial No', 'Name']
print(f"\nChecking for expected columns: {expected_cols}")
for col in expected_cols:
if col in new_sensor_df.columns:
print(f" [OK] {col}: Present")
else:
print(f" [MISSING] {col}: Missing")
# Look at a sample of the data to understand its structure
print(f"\nSample data for first 10 rows:")
sample_cols = [col for col in expected_cols if col in new_sensor_df.columns]
if sample_cols:
print(new_sensor_df[sample_cols].head(10))
# Try different header values to see if the structure is different
print("\nTrying with header=0 (first row)...")
new_sensor_df_h0 = pd.read_excel('SensorReport Cardinal 2025-12-23_processed.xlsx', header=0)
print(f"With header=0 - Shape: {new_sensor_df_h0.shape}, Columns: {list(new_sensor_df_h0.columns[:10])}") # First 10 columns
print("\nTrying with header=3...")
new_sensor_df_h3 = pd.read_excel('SensorReport Cardinal 2025-12-23_processed.xlsx', header=3)
print(f"With header=3 - Shape: {new_sensor_df_h3.shape}, Columns: {list(new_sensor_df_h3.columns[:10])}")
# Also try to see the first few rows without setting a header
print("\nFirst few rows without setting header (to see raw structure):")
raw_df = pd.read_excel('SensorReport Cardinal 2025-12-23_processed.xlsx', header=None)
print(raw_df.head(10))
except Exception as e:
print(f"Error reading new sensor report: {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
inspect_new_sensor_report()

52
run_analysis.py Normal file
View File

@@ -0,0 +1,52 @@
#!/usr/bin/env python
# Simple script to run the alarm analyzer without visualization
from alarm_analyzer import AlarmAnalyzer
def main():
print("Creating analyzer instance...")
# Create analyzer instance
analyzer = AlarmAnalyzer('CardinalAlarmsDec25.csv', 'SensorReport Cardinal 2025-12-23_processed.xlsx')
print("Loading data...")
# Load data
alarm_data, sensor_data = analyzer.load_data()
print(f"Loaded {len(alarm_data)} alarm records")
if analyzer.sensor_mapping:
print(f"Created sensor mapping for {len(analyzer.sensor_mapping)} sensors")
else:
print("No sensor mapping created - sensor report may not have been processed correctly")
print("Categorizing alarms...")
# Categorize alarms
categorized_data = analyzer.categorize_alarms()
print("Pairing events and calculating durations...")
# Pair events and calculate durations
paired_events = analyzer.pair_events_and_calculate_durations()
print("Performing basic analysis...")
# Perform basic analysis
basic_results = analyzer.basic_analysis()
print("Performing advanced analysis...")
# Perform advanced analysis
advanced_results = analyzer.advanced_analysis()
print("Exporting results...")
# Export results (this doesn't require matplotlib)
analyzer.export_results(output_dir='output')
# Perform uptime analysis
print("Performing uptime analysis...")
uptime_results = analyzer.calculate_uptime_metrics()
# Export uptime metrics to new files
analyzer.export_uptime_metrics(output_dir="output", uptime_results=uptime_results)
print("Analysis completed successfully!")
print("Results have been exported to the output directory.")
if __name__ == '__main__':
main()

66
show_results.py Normal file
View File

@@ -0,0 +1,66 @@
#!/usr/bin/env python
# Script to show the enhanced analysis results
import pandas as pd
import os
def show_results():
print("=== ENHANCED ALARM ANALYSIS RESULTS ===")
print()
# Check that output directory exists and show files
if os.path.exists('output'):
print("Output files created:")
for file in sorted(os.listdir('output')):
print(f" - {file}")
print()
else:
print("Output directory not found!")
return
# Show sample from paired_alarm_events.csv
try:
print("Sample from paired_alarm_events.csv (first 5 rows with sensor names and groups):")
paired_df = pd.read_csv('output/paired_alarm_events.csv')
print(paired_df[['Sensor_Id', 'Sensor_Name', 'Sensor_Group', 'Alarm_Type', 'Duration_Minutes']].head())
print()
except Exception as e:
print(f"Could not read paired_alarm_events.csv: {e}")
# Show top groups by alarm count
try:
print("Top groups by alarm count:")
groups_count_df = pd.read_csv('output/top_groups_by_alarm_count.csv')
print(groups_count_df.head(10))
print()
except Exception as e:
print(f"Could not read top_groups_by_alarm_count.csv: {e}")
# Show sample of group statistics
try:
print("Sample of group statistics (top 10 by alarm count):")
group_stats_df = pd.read_csv('output/group_statistics.csv')
print(group_stats_df[['Sensor_Group', 'Total_Alarm_Count', 'Avg_Duration', 'Total_Severity_Score']].head(10))
print()
except Exception as e:
print(f"Could not read group_statistics.csv: {e}")
# Show top sensors by alarm count to compare
try:
print("Top sensors by alarm count (with names):")
sensors_count_df = pd.read_csv('output/top_sensors_by_alarm_count.csv')
print(sensors_count_df.head(10))
print()
except Exception as e:
print(f"Could not read top_sensors_by_alarm_count.csv: {e}")
print("Analysis completed successfully with enhanced group and sensor name information!")
print()
print("Key enhancements:")
print("- Sensor IDs now replaced with meaningful sensor names")
print("- Groups properly mapped using hierarchical structure processing")
print("- Group-based analysis now available throughout the system")
print("- All output files contain enhanced sensor name and group information")
if __name__ == "__main__":
show_results()

74
test_changes.py Normal file
View File

@@ -0,0 +1,74 @@
#!/usr/bin/env python
# Test script to validate the changes made to alarm_analyzer.py
import sys
import os
import pandas as pd
def test_code_structure():
"""Test that the modified code has the correct structure"""
# Read the file to check if our changes were applied correctly
with open('alarm_analyzer.py', 'r') as f:
content = f.read()
print("Testing if new methods were added correctly...")
# Check if the add_sensor_info_to_alarms method exists
if 'def add_sensor_info_to_alarms(self)' in content:
print("[OK] add_sensor_info_to_alarms method exists")
else:
print("[ERROR] add_sensor_info_to_alarms method missing")
# Check if the load_data method was updated
if 'header=4' in content and 'Remote SN' in content:
print("[OK] load_data method updated with proper header reading")
else:
print("[ERROR] load_data method not properly updated")
# Check if sensor info is added to paired events
if 'Sensor_Name' in content and 'Sensor_Group' in content and 'Sensor_Type' in content:
print("[OK] Sensor information added to paired events")
else:
print("[ERROR] Sensor information not properly added to paired events")
# Check if group-based analysis was added
if 'group_counts' in content and 'mtbf_by_group' in content:
print("[OK] Group-based analysis added to basic and advanced analysis")
else:
print("[ERROR] Group-based analysis not properly added")
# Check if group-based visualizations were added
if 'Group-Based Analysis Dashboard' in content:
print("[OK] Group-based visualizations added")
else:
print("[ERROR] Group-based visualizations not properly added")
# Check if group-based exports were added
if 'group_statistics.csv' in content:
print("[OK] Group-based exports added")
else:
print("[ERROR] Group-based exports not properly added")
print("\nAll structural changes have been validated!")
def test_logic():
"""Test the logic of the changes"""
print("\nTesting the logic of the changes...")
# Check that the updated main section uses the correct file name
with open('alarm_analyzer.py', 'r') as f:
content = f.read()
if 'SensorReport Cardinal 2025-12-23_processed.xlsx' in content:
print("[OK] Main section updated with correct sensor report file name")
else:
print("[ERROR] Main section not updated with correct sensor report file name")
print("Logic validation completed!")
if __name__ == "__main__":
print("Validating changes made to alarm_analyzer.py...")
test_code_structure()
test_logic()
print("\nValidation completed successfully!")

81
test_duration_fix.py Normal file
View File

@@ -0,0 +1,81 @@
#!/usr/bin/env python3
"""
Test script to verify the fix for alarm duration calculation
"""
import sys
import os
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from alarm_analyzer import AlarmAnalyzer
def test_duration_calculation():
"""
Test the updated duration calculation with sample data
"""
print("Testing updated duration calculation...")
# Use the existing files
csv_file = "CardinalAlarmsDec25.csv"
xlsx_file = "SensorReport Cardinal 2025-12-23_processed.xlsx"
if not os.path.exists(csv_file):
print(f"CSV file {csv_file} not found. Creating a small test file...")
# Create a minimal test file
test_data = """Alarm_Id,Sensor_Id,Date,Description,LogTime
1,1001,2025-12-01 00:01:00.000,"Lo Warning: 68.0<=68.0F ",2025-12-01 00:01:01.077
2,1001,2025-12-01 00:05:00.000,"Lo Alarm: 67.5<=68.0F ",2025-12-01 00:05:01.077
3,1001,2025-12-01 00:10:00.000,"Normal 68.2F ",2025-12-01 00:10:01.077
4,1002,2025-12-01 00:02:00.000,"Error: Comm Loss Error 20.4>=20 min.",2025-12-01 00:02:01.077
5,1002,2025-12-01 00:07:00.000,"Hi Alarm: 70.0>=68.0F ",2025-12-01 00:07:01.077
6,1002,2025-12-01 00:12:00.000,"Normal 69.5F ",2025-12-01 00:12:01.077"""
with open(csv_file, 'w') as f:
f.write(test_data)
# Create analyzer instance
analyzer = AlarmAnalyzer(csv_file, xlsx_file)
try:
# Load data
alarm_data, sensor_data = analyzer.load_data()
print(f"Loaded {len(alarm_data)} alarm records")
# Categorize alarms
categorized_data = analyzer.categorize_alarms()
print("Categorized alarms successfully")
# Pair events and calculate durations
paired_events = analyzer.pair_events_and_calculate_durations()
if paired_events is not None and len(paired_events) > 0:
print(f"Created {len(paired_events)} paired events")
print("\nFirst few paired events:")
print(paired_events[['Sensor_Id', 'Alarm_Type', 'Start_Time', 'End_Time', 'Duration_Minutes', 'End_Reason']].head(10))
# Check if End_Reason column exists
if 'End_Reason' in paired_events.columns:
print(f"\nEnd reason distribution:")
print(paired_events['End_Reason'].value_counts())
else:
print("ERROR: End_Reason column not found in paired events")
# Check for transitions
if 'End_Reason' in paired_events.columns:
transitions = paired_events[paired_events['End_Reason'].str.contains('Transition', na=False)]
if len(transitions) > 0:
print(f"\nFound {len(transitions)} alarm condition transitions:")
print(transitions[['Sensor_Id', 'Alarm_Type', 'Start_Description', 'End_Description', 'Duration_Minutes', 'End_Reason']])
else:
print("\nNo alarm condition transitions found in this sample.")
else:
print("No paired events created")
print("Test completed successfully!")
except Exception as e:
print(f"Error during test: {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
test_duration_calculation()

144
test_enhanced_plotting.py Normal file
View File

@@ -0,0 +1,144 @@
#!/usr/bin/env python
# Test script to verify enhanced plotting functionality without creating actual plots
from alarm_analyzer import AlarmAnalyzer
import pandas as pd
def test_enhanced_plotting():
print("Testing enhanced plotting functionality...")
# Create analyzer instance
analyzer = AlarmAnalyzer('CardinalAlarmsDec25.csv', 'SensorReport Cardinal 2025-12-23_processed.xlsx')
print("Loading data...")
# Load data
alarm_data, sensor_data = analyzer.load_data()
print(f"Loaded {len(alarm_data)} alarm records")
if analyzer.sensor_mapping:
print(f"Created sensor mapping for {len(analyzer.sensor_mapping)} sensors")
else:
print("No sensor mapping created - sensor report may not have been processed correctly")
print("Categorizing alarms...")
# Categorize alarms
categorized_data = analyzer.categorize_alarms()
print("Pairing events and calculating durations...")
# Pair events and calculate durations
paired_events = analyzer.pair_events_and_calculate_durations()
# Test the sensor name mapping logic without creating plots
print("\n--- TESTING ENHANCED PLOTTING LOGIC ---")
# Filter resolved events for testing
duration_events = analyzer.processed_events[analyzer.processed_events['Duration_Minutes'].notna()].copy()
if len(duration_events) == 0:
print("No resolved events with duration data available for testing.")
return
# Extract time components for time-based analysis
duration_events['Start_Hour'] = duration_events['Start_Time'].dt.hour
duration_events['Start_DayOfWeek'] = duration_events['Start_Time'].dt.day_name()
duration_events['Start_Date'] = duration_events['Start_Time'].dt.date
print("\nTesting sensor name mapping for top sensors by alarm count...")
# Top 10 sensors by alarm count - with sensor names instead of IDs
top_sensors = duration_events['Sensor_Id'].value_counts().head(10)
sensor_names_for_plot = []
for sensor_id in top_sensors.index:
sensor_info = analyzer.sensor_mapping.get(sensor_id, {})
sensor_name = sensor_info.get('name', f'ID: {sensor_id}')
sensor_group = sensor_info.get('group', 'Unknown')
sensor_names_for_plot.append(f"{sensor_name}\n({sensor_group})")
print("Sample of enhanced sensor labels for plotting:")
for i, (sensor_id, count) in enumerate(top_sensors.head(5).items()):
print(f" {sensor_names_for_plot[i]}: {count} alarms")
print("\nTesting sensor name mapping for average duration...")
# Top 10 sensors by average duration - with sensor names instead of IDs
avg_duration_by_sensor = duration_events.groupby('Sensor_Id')['Duration_Minutes'].mean().sort_values(ascending=False).head(10)
sensor_names_for_plot_avg = []
for sensor_id in avg_duration_by_sensor.index:
sensor_info = analyzer.sensor_mapping.get(sensor_id, {})
sensor_name = sensor_info.get('name', f'ID: {sensor_id}')
sensor_group = sensor_info.get('group', 'Unknown')
sensor_names_for_plot_avg.append(f"{sensor_name} (Group: {sensor_group})")
print("Sample of enhanced sensor labels for average duration plotting:")
for i, (sensor_id, avg_duration) in enumerate(avg_duration_by_sensor.head(5).items()):
print(f" {sensor_names_for_plot_avg[i]}: {avg_duration:.2f} minutes")
print("\nTesting group-based visualizations...")
if 'Sensor_Group' in duration_events.columns:
print("Group-based visualizations would be created...")
# Test group composition analysis
if analyzer.sensor_mapping:
# Create a mapping of group to number of sensors
group_to_sensor_count = {}
for sensor_id, sensor_info in analyzer.sensor_mapping.items():
group = sensor_info.get('group', 'Unknown')
if group not in group_to_sensor_count:
group_to_sensor_count[group] = 0
group_to_sensor_count[group] += 1
# Convert to dataframe and sort
group_sensor_counts = pd.DataFrame(
list(group_to_sensor_count.items()),
columns=['Group', 'Sensor_Count']
).sort_values('Sensor_Count', ascending=False).head(10)
print("Sample of group composition data:")
for _, row in group_sensor_counts.head(5).iterrows():
print(f" {row['Group']}: {row['Sensor_Count']} sensors")
# Test alarm type distribution by group
alarm_type_by_group = duration_events.groupby(['Sensor_Group', 'Alarm_Type']).size().unstack(fill_value=0)
top_10_groups = duration_events['Sensor_Group'].value_counts().head(10).index
alarm_type_by_group_top = alarm_type_by_group.loc[top_10_groups]
print("Sample of alarm type distribution by group:")
sample_groups = alarm_type_by_group_top.head(3)
for group in sample_groups.index:
print(f" {group}:")
for alarm_type in sample_groups.columns:
count = sample_groups.loc[group, alarm_type]
if count > 0:
print(f" {alarm_type}: {count} alarms")
# Test group alarm intensity
alarms_per_sensor_by_group = duration_events.groupby('Sensor_Group')['Sensor_Id'].nunique().to_dict()
# Calculate total sensors per group from mapping
group_to_sensor_count = {}
for sensor_id, sensor_info in analyzer.sensor_mapping.items():
group = sensor_info.get('group', 'Unknown')
if group not in group_to_sensor_count:
group_to_sensor_count[group] = 0
group_to_sensor_count[group] += 1
# Calculate alarms per sensor ratio
group_alarm_intensity = {}
for group in set(duration_events['Sensor_Group'].unique()):
total_alarms = len(duration_events[duration_events['Sensor_Group'] == group])
total_sensors = group_to_sensor_count.get(group, 1) # Avoid division by zero
group_alarm_intensity[group] = total_alarms / total_sensors
# Convert to DataFrame and sort
intensity_df = pd.DataFrame(
list(group_alarm_intensity.items()),
columns=['Group', 'Alarms_Per_Sensor']
).sort_values('Alarms_Per_Sensor', ascending=False).head(10)
print("Sample of group alarm intensity:")
for _, row in intensity_df.head(5).iterrows():
print(f" {row['Group']}: {row['Alarms_Per_Sensor']:.2f} alarms per sensor")
print("\nAll enhanced plotting logic tests passed!")
print("The enhanced plotting functionality is ready to use when matplotlib and seaborn are available.")
if __name__ == '__main__':
test_enhanced_plotting()

51
test_mapping.py Normal file
View File

@@ -0,0 +1,51 @@
#!/usr/bin/env python
# Test script to check the mapping functionality
import pandas as pd
from alarm_analyzer import AlarmAnalyzer
def test_mapping():
print("Creating analyzer instance...")
analyzer = AlarmAnalyzer('CardinalAlarmsDec25.csv', 'SensorReport Cardinal 2025-12-23_processed.xlsx')
print("Loading data...")
alarm_data, sensor_data = analyzer.load_data()
print(f"Created sensor mapping for {len(analyzer.sensor_mapping)} sensors")
# Check if specific IDs from the alarm data are in the mapping
sample_alarm_ids = [9273, 3817, 8963, 7414, 9092, 9105, 7080, 9455, 9451, 3799]
print(f"Sample alarm IDs: {sample_alarm_ids}")
found_in_mapping = []
for alarm_id in sample_alarm_ids:
if alarm_id in analyzer.sensor_mapping:
found_in_mapping.append(alarm_id)
print(f" ID {alarm_id}: {analyzer.sensor_mapping[alarm_id]}")
else:
print(f" ID {alarm_id}: NOT FOUND")
print(f"Found {len(found_in_mapping)} out of {len(sample_alarm_ids)} sample IDs in mapping")
# Check alarm data for sensor names and groups
print(f"\nSensor_Name column in alarm data: {'Sensor_Name' in analyzer.alarm_data.columns}")
print(f"Sensor_Group column in alarm data: {'Sensor_Group' in analyzer.alarm_data.columns}")
if 'Sensor_Name' in analyzer.alarm_data.columns:
unique_names = analyzer.alarm_data['Sensor_Name'].unique()
print(f"Unique sensor names: {len(unique_names)} - {unique_names[:10]}")
if 'Sensor_Group' in analyzer.alarm_data.columns:
unique_groups = analyzer.alarm_data['Sensor_Group'].unique()
print(f"Unique sensor groups: {len(unique_groups)} - {unique_groups[:10]}")
# Check a few rows to see the mapping worked
print("\nFirst 10 rows of alarm data with sensor info:")
cols_to_show = ['Sensor_Id', 'Sensor_Name', 'Sensor_Group', 'Description']
if all(col in analyzer.alarm_data.columns for col in cols_to_show):
print(analyzer.alarm_data[cols_to_show].head(10))
else:
print("Some columns not found in alarm data")
if __name__ == "__main__":
test_mapping()