Files
SmartScanProbeTrack/app/models.py
2025-07-29 10:51:42 -04:00

554 lines
20 KiB
Python

from dataclasses import dataclass
from typing import Optional
from datetime import datetime
@dataclass
class User:
    """Application user record, including the calibrate/review permission flags."""

    id: str  # uuid
    name: str
    email: str
    can_calibrate: bool
    can_review: bool
    signature_image: Optional[str] = None

    @classmethod
    def get_by_email(cls, email: str):
        """Look up one user by e-mail address.

        Returns a ``User`` built from the first matching row of the ``users``
        table, or ``None`` when the query yields no rows.
        """
        client = get_supabase()
        response = client.table('users').select("*").eq('email', email).execute()
        if not response.data:
            return None
        first_match = response.data[0]
        return cls(**first_match)

    @classmethod
    def get_all(cls):
        """Return every row of the ``users`` table as a list of ``User`` objects."""
        client = get_supabase()
        response = client.table('users').select("*").execute()
        users = []
        for record in response.data:
            users.append(cls(**record))
        return users
@dataclass
class Probe:
    """Probe model representing physical measurement probes.

    Instances are built from rows of the ``probes`` table. The timestamp
    columns arrive as ISO-8601 strings and are converted to ``datetime`` by
    every constructor path (``get_by_id``, ``get_all`` and ``create``).
    """

    id: str  # uuid
    model_id: str  # uuid
    serial_number: str
    description: str
    created_at: datetime
    retired_at: Optional[datetime] = None
    model_name: str = ''  # filled from the probe_models join in get_all()

    @classmethod
    def _from_row(cls, row: dict) -> "Probe":
        """Build a Probe from a raw table row, parsing its timestamp strings."""
        data = dict(row)  # copy so the caller's row is left untouched
        for field_name in ('created_at', 'retired_at'):
            value = data.get(field_name)
            # Only non-empty strings need parsing; None / datetime pass through.
            if isinstance(value, str) and value:
                data[field_name] = datetime.fromisoformat(value)
        return cls(**data)

    @classmethod
    def get_by_id(cls, probe_id: str):
        """Get a single probe by ID.

        Returns:
            The matching ``Probe``, or ``None`` if no row has that id.
        """
        supabase = get_supabase()
        result = supabase.table('probes').select("*").eq('id', probe_id).execute()
        if not result.data:
            return None
        return cls._from_row(result.data[0])

    @classmethod
    def get_all(cls, limit: int = 100, active_only: bool = True):
        """List probes, joined with their model name.

        Args:
            limit: Maximum number of rows requested from the database.
            active_only: When true, only probes whose ``retired_at`` is null
                are returned.

        Returns:
            A list of ``Probe`` objects with ``model_name`` set from the
            ``probe_models`` join ('Unknown' when the join is empty).
        """
        supabase = get_supabase()
        query = supabase.table('probes') \
            .select('*, probe_models(model_name)') \
            .limit(limit)
        if active_only:
            query = query.is_('retired_at', 'null')
        result = query.execute()

        probes = []
        for row in result.data or []:
            data = dict(row)
            # The joined record is not a dataclass field, so take it out of
            # the row before constructing the instance.
            joined = data.pop('probe_models', None)
            probe = cls._from_row(data)
            probe.model_name = joined['model_name'] if joined else 'Unknown'
            probes.append(probe)
        return probes

    @classmethod
    def create(cls, model_id: str, serial_number: str, description: str = ''):
        """Insert a new probe and return it.

        The returned instance has its timestamps parsed, matching what
        ``get_by_id`` returns for the same row.

        Raises:
            Exception: If the insert returns no data.
        """
        supabase = get_supabase()
        result = supabase.table('probes').insert({
            'model_id': model_id,
            'serial_number': serial_number,
            'description': description
        }).execute()
        if not result.data:
            raise Exception('Failed to create probe')
        return cls._from_row(result.data[0])

    def retire(self):
        """Mark the probe as retired with the current UTC timestamp.

        Raises:
            Exception: If the update returns no data.
        """
        supabase = get_supabase()
        # Take the timestamp once so the stored value and the in-memory
        # attribute are the same instant.
        retired_at = datetime.utcnow()
        result = supabase.table('probes').update({
            'retired_at': retired_at.isoformat()
        }).eq('id', self.id).execute()
        if not result.data:
            raise Exception('Failed to retire probe')
        self.retired_at = retired_at

    def delete(self):
        """Delete the probe's row from the database.

        Raises:
            Exception: If the delete returns no data.
        """
        supabase = get_supabase()
        result = supabase.table('probes').delete().eq('id', self.id).execute()
        if not result.data:
            raise Exception('Failed to delete probe')
@dataclass
class Channel:
    """Channel model representing individual measurement channels.

    Instances are built from rows of the ``channels`` table; ``created_at``
    is converted from its ISO-8601 string form on every constructor path.
    """

    id: str  # uuid
    probe_id: str  # uuid
    serial_number: str  # [0-9A-F]{16}
    parameter_id: str  # uuid
    created_at: datetime

    @property
    def parameter(self):
        """Get the associated Parameter object (looked up on each access)."""
        return Parameter.get_by_id(self.parameter_id)

    @classmethod
    def _from_row(cls, row: dict) -> "Channel":
        """Build a Channel from a raw table row, parsing ``created_at``."""
        data = dict(row)  # copy so the caller's row is left untouched
        created = data.get('created_at')
        # Only non-empty strings need parsing; None / datetime pass through.
        if isinstance(created, str) and created:
            data['created_at'] = datetime.fromisoformat(created)
        return cls(**data)

    @classmethod
    def get_by_id(cls, channel_id: str):
        """Get a single channel by ID, or ``None`` if no row matches."""
        supabase = get_supabase()
        result = supabase.table('channels').select("*").eq('id', channel_id).execute()
        if not result.data:
            return None
        return cls._from_row(result.data[0])

    @classmethod
    def get_by_probe(cls, probe_id: str):
        """Get all channels for a specific probe as a list."""
        supabase = get_supabase()
        result = supabase.table('channels').select("*").eq('probe_id', probe_id).execute()
        return [cls._from_row(row) for row in result.data or []]

    @classmethod
    def get_by_serial(cls, serial_number: str):
        """Get a channel by serial number, or ``None`` if it does not exist.

        The given serial is upper-cased before the exact-match lookup, so the
        caller may pass it in any letter case.
        """
        serial_upper = serial_number.upper()
        supabase = get_supabase()
        result = supabase.table('channels').select("*").eq('serial_number', serial_upper).execute()
        if not result.data:
            return None
        return cls._from_row(result.data[0])

    @classmethod
    def create(cls, probe_id: str, serial_number: str, parameter_id: str):
        """Insert a new channel and return it.

        The serial number is stored upper-cased so that ``get_by_serial``,
        which upper-cases its argument, can find the row again.

        Raises:
            Exception: If the insert returns no data.
        """
        supabase = get_supabase()
        result = supabase.table('channels').insert({
            'probe_id': probe_id,
            'serial_number': serial_number.upper(),
            'parameter_id': parameter_id
        }).execute()
        if not result.data:
            raise Exception('Failed to create channel')
        return cls._from_row(result.data[0])
@dataclass
class WorkOrder:
    """Work order model representing calibration work orders.

    Instances are built from rows of the ``work_orders`` table; ``due_date``
    is converted from its ISO-8601 string form on every constructor path.
    """

    id: str  # uuid
    order_number: str
    customer_id: str  # uuid
    assigned_to: str  # uuid (FK to users.id)
    due_date: datetime
    status: str
    redmine: Optional[int] = None
    cal_type: Optional[str] = None  # uuid (FK to calibration_types.id)

    @classmethod
    def _from_row(cls, row: dict) -> "WorkOrder":
        """Build a WorkOrder from a raw table row, normalising ``due_date``.

        A string is parsed with ``datetime.fromisoformat``, a ``datetime`` is
        kept, and any other value becomes ``None``.
        """
        data = dict(row)  # copy so the caller's row is left untouched
        if 'due_date' in data:
            due = data['due_date']
            if isinstance(due, str) and due:
                data['due_date'] = datetime.fromisoformat(due)
            elif not isinstance(due, datetime):
                data['due_date'] = None
        return cls(**data)

    @classmethod
    def get_by_id(cls, work_order_id: str):
        """Get a single work order by ID, or ``None`` if no row matches."""
        supabase = get_supabase()
        result = supabase.table('work_orders').select("*").eq('id', work_order_id).execute()
        if not result.data:
            return None
        return cls._from_row(result.data[0])

    @classmethod
    def get_all(cls, limit: int = 100):
        """List work orders, requesting at most ``limit`` rows.

        ``due_date`` is parsed the same way as in ``get_by_id``.
        """
        supabase = get_supabase()
        result = supabase.table('work_orders').select("*").limit(limit).execute()
        return [cls._from_row(row) for row in result.data] if result.data else []

    @classmethod
    def create(cls, order_number: str, customer_id: str, assigned_to: str,
               due_date: str, status: str, redmine: Optional[int] = None,
               cal_type: Optional[str] = None):
        """Insert a new work order and return it.

        Args:
            due_date: Due date as a string. If it parses as ISO-8601 it is
                re-serialised with ``isoformat()``; otherwise it is sent to
                the database unchanged.

        Raises:
            Exception: If the insert returns no data.
        """
        supabase = get_supabase()
        # Convert due_date string to ISO format if not already
        try:
            due_date_iso = datetime.fromisoformat(due_date).isoformat()
        except ValueError:
            due_date_iso = due_date  # Fallback to original if parsing fails
        result = supabase.table('work_orders').insert({
            'order_number': order_number,
            'customer_id': customer_id,
            'assigned_to': assigned_to,
            'due_date': due_date_iso,
            'status': status,
            'redmine': redmine,
            'cal_type': cal_type
        }).execute()
        if not result.data:
            raise Exception('Failed to create work order')
        return cls._from_row(result.data[0])
@dataclass
class Calibration:
    """Calibration model representing individual channel calibrations.

    Instances are built from rows of the ``calibrations`` table. All three
    query methods parse the date columns and skip rows that cannot be
    converted, printing the error instead of failing the whole query.
    """

    id: str  # uuid
    channel_id: str  # uuid (FK to channels.id)
    work_order_id: str  # uuid (FK to work_orders.id)
    calibrated_by: str  # uuid (FK to users.id)
    std_used: str  # uuid (FK to standards.id)
    std_cal_date: datetime
    std_cal_due: datetime
    date: datetime
    scale: float
    offset: float
    deviation_high: float
    deviation_mid: float
    deviation_low: float
    set_high: float
    set_mid: float
    set_low: float
    passed: bool
    reviewed_by: Optional[str] = None  # uuid (FK to users.id)

    @classmethod
    def _parse_rows(cls, rows) -> list:
        """Convert raw table rows into Calibration objects.

        Date columns given as non-empty strings are parsed with
        ``datetime.fromisoformat``. A row that fails to convert is reported
        with ``print`` and left out of the result.
        """
        calibrations = []
        for row in rows or []:
            try:
                data = dict(row)  # copy so the caller's row is left untouched
                for date_field in ('std_cal_date', 'std_cal_due', 'date'):
                    value = data.get(date_field)
                    if isinstance(value, str) and value:
                        data[date_field] = datetime.fromisoformat(value)
                calibrations.append(cls(**data))
            except Exception as e:
                # Best effort: one malformed row must not hide the others.
                print(f"Error parsing calibration data: {e}")
        return calibrations

    @classmethod
    def get_all(cls, limit: int = 100):
        """List calibrations, requesting at most ``limit`` rows."""
        supabase = get_supabase()
        result = supabase.table('calibrations').select("*").limit(limit).execute()
        return cls._parse_rows(result.data)

    @classmethod
    def get_by_work_order(cls, work_order_id: str):
        """Get all calibrations for a specific work order.

        Date columns are parsed the same way as in ``get_all`` and
        ``get_by_channel``.
        """
        supabase = get_supabase()
        result = supabase.table('calibrations').select("*").eq('work_order_id', work_order_id).execute()
        return cls._parse_rows(result.data)

    @classmethod
    def get_by_channel(cls, channel_id: str):
        """Get all calibrations for a specific channel."""
        supabase = get_supabase()
        result = supabase.table('calibrations').select("*").eq('channel_id', channel_id).execute()
        return cls._parse_rows(result.data)
@dataclass
class Customer:
    """Customer model representing clients who request calibrations.

    The ``customers`` table may expose the contact columns either as
    ``contact_email`` / ``contact_phone`` or under the shorter names
    ``email`` / ``phone``; both spellings are accepted when building an
    instance.
    """

    id: str  # uuid
    name: str
    contact_name: str
    contact_email: str
    contact_phone: str

    @classmethod
    def _from_row(cls, row: dict) -> "Customer":
        """Build a Customer from a raw row, renaming short contact columns."""
        data = dict(row)  # copy so the caller's row is left untouched
        for short_name, field_name in (('email', 'contact_email'),
                                       ('phone', 'contact_phone')):
            if short_name in data and field_name not in data:
                data[field_name] = data.pop(short_name)
        return cls(**data)

    @classmethod
    def get_by_id(cls, customer_id: str):
        """Get a single customer by ID, or ``None`` if no row matches."""
        supabase = get_supabase()
        result = supabase.table('customers').select("*").eq('id', customer_id).execute()
        return cls._from_row(result.data[0]) if result.data else None

    @classmethod
    def get_all(cls, limit: int = 100):
        """List customers, requesting at most ``limit`` rows.

        The query first asks for the ``contact_email`` / ``contact_phone``
        columns. If the database reports that one of them does not exist,
        it is retried with ``email`` / ``phone``. Any other error is re-raised.
        """
        supabase = get_supabase()
        try:
            # First try with all expected fields
            result = supabase.table('customers').select(
                "id,name,contact_name,contact_email,contact_phone"
            ).limit(limit).execute()
        except Exception as e:
            message = str(e)
            if ('column customers.contact_email does not exist' not in message
                    and 'column customers.contact_phone does not exist' not in message):
                raise
            # Fall back to simpler field names if expected ones don't exist
            result = supabase.table('customers').select(
                "id,name,contact_name,email,phone"
            ).limit(limit).execute()
        # _from_row handles the column renaming, and the guard below also
        # covers an empty/None fallback result.
        return [cls._from_row(row) for row in result.data] if result.data else []
@dataclass
class Standard:
    """Standard model representing calibration standards.

    Instances are built from rows of the ``standards`` table; the two date
    columns are converted from ISO-8601 strings on every constructor path.
    """

    id: str  # uuid
    make: str
    model: str
    description: str
    uncertainty: str
    calibrated_on: datetime
    calibration_due: datetime
    support_name: str
    support_email: str
    support_phone: str
    support_address: str

    @classmethod
    def _from_row(cls, row: dict) -> "Standard":
        """Build a Standard from a raw table row, parsing its date strings."""
        data = dict(row)  # copy so the caller's row is left untouched
        for date_field in ('calibrated_on', 'calibration_due'):
            value = data.get(date_field)
            # Only non-empty strings need parsing; None / datetime pass through.
            if isinstance(value, str) and value:
                data[date_field] = datetime.fromisoformat(value)
        return cls(**data)

    @classmethod
    def get_by_id(cls, standard_id: str):
        """Get a single standard by ID, or ``None`` if no row matches.

        Dates are parsed the same way as in ``get_all`` and ``create``.
        """
        supabase = get_supabase()
        result = supabase.table('standards').select("*").eq('id', standard_id).execute()
        return cls._from_row(result.data[0]) if result.data else None

    @classmethod
    def get_all(cls, limit: int = 100):
        """List standards, requesting at most ``limit`` rows."""
        supabase = get_supabase()
        result = supabase.table('standards').select("*").limit(limit).execute()
        return [cls._from_row(row) for row in result.data or []]

    @classmethod
    def create(cls, make: str, model: str, description: str, uncertainty: str,
               calibrated_on: str, calibration_due: str, support_name: str,
               support_email: str, support_phone: str, support_address: str):
        """Insert a new standard and return it.

        Args:
            calibrated_on: ISO-8601 date string.
            calibration_due: ISO-8601 date string.

        Raises:
            ValueError: If either date string is not valid ISO-8601.
            Exception: If the insert returns no data.
        """
        supabase = get_supabase()
        # Round-trip through datetime so invalid dates fail before the insert.
        calibrated_on_iso = datetime.fromisoformat(calibrated_on).isoformat()
        calibration_due_iso = datetime.fromisoformat(calibration_due).isoformat()
        result = supabase.table('standards').insert({
            'make': make,
            'model': model,
            'description': description,
            'uncertainty': uncertainty,
            'calibrated_on': calibrated_on_iso,
            'calibration_due': calibration_due_iso,
            'support_name': support_name,
            'support_email': support_email,
            'support_phone': support_phone,
            'support_address': support_address
        }).execute()
        if not result.data:
            raise Exception('Failed to create standard')
        return cls._from_row(result.data[0])
@dataclass
class ProbeModel:
    """Definition of a probe type/model, as stored in ``probe_models``."""

    id: str  # uuid
    model_name: str
    specifications: dict  # jsonb

    @classmethod
    def get_by_id(cls, model_id: str):
        """Fetch one probe model by id; returns ``None`` when no row matches."""
        client = get_supabase()
        response = client.table('probe_models').select("*").eq('id', model_id).execute()
        if not response.data:
            return None
        return cls(**response.data[0])

    @classmethod
    def get_all(cls, limit: int = 100):
        """Fetch up to ``limit`` probe models; returns ``[]`` when there are none."""
        client = get_supabase()
        response = client.table('probe_models').select("*").limit(limit).execute()
        records = response.data
        if not records:
            return []
        return [cls(**record) for record in records]
@dataclass
class CalibrationType:
    """A kind of calibration, as stored in ``calibration_types``."""

    id: str  # uuid
    type_name: str

    @classmethod
    def get_by_id(cls, type_id: str):
        """Fetch one calibration type by id; returns ``None`` when no row matches."""
        client = get_supabase()
        response = client.table('calibration_types').select("*").eq('id', type_id).execute()
        if not response.data:
            return None
        return cls(**response.data[0])

    @classmethod
    def get_all(cls, limit: int = 100):
        """Fetch up to ``limit`` calibration types; returns ``[]`` when there are none."""
        client = get_supabase()
        response = client.table('calibration_types').select("*").limit(limit).execute()
        records = response.data
        if not records:
            return []
        return [cls(**record) for record in records]
@dataclass
class Location:
    """A physical site where probes are deployed, as stored in ``locations``."""

    id: str  # uuid
    name: str
    address: str
    contact_name: str
    contact_email: str
    contact_phone: str

    @classmethod
    def get_by_id(cls, location_id: str):
        """Fetch one location by id; returns ``None`` when no row matches."""
        client = get_supabase()
        response = client.table('locations').select("*").eq('id', location_id).execute()
        if not response.data:
            return None
        return cls(**response.data[0])

    @classmethod
    def get_all(cls, limit: int = 100):
        """Fetch up to ``limit`` locations; returns ``[]`` when there are none."""
        client = get_supabase()
        response = client.table('locations').select("*").limit(limit).execute()
        records = response.data
        if not records:
            return []
        return [cls(**record) for record in records]

    @classmethod
    def create(cls, name: str, address: str, contact_name: str,
               contact_email: str, contact_phone: str):
        """Insert a location row and return it as a ``Location``.

        Raises:
            Exception: If the insert returns no data.
        """
        payload = {
            'name': name,
            'address': address,
            'contact_name': contact_name,
            'contact_email': contact_email,
            'contact_phone': contact_phone
        }
        client = get_supabase()
        response = client.table('locations').insert(payload).execute()
        if response.data:
            return cls(**response.data[0])
        raise Exception('Failed to create location')
@dataclass
class ProbeLocation:
    """Assignment of a probe to a location, as stored in ``probe_locations``.

    NOTE(review): rows are passed to the constructor unchanged, so
    ``start_date`` / ``end_date`` hold whatever the client returned
    (presumably ISO strings rather than ``datetime``) — confirm with callers.
    """

    id: str  # uuid
    probe_id: str  # uuid (FK to probes.id)
    location_id: str  # uuid (FK to locations.id)
    start_date: datetime
    end_date: Optional[datetime] = None

    @classmethod
    def get_by_id(cls, probe_location_id: str):
        """Fetch one assignment by id; returns ``None`` when no row matches."""
        client = get_supabase()
        response = client.table('probe_locations').select("*").eq('id', probe_location_id).execute()
        if not response.data:
            return None
        return cls(**response.data[0])

    @classmethod
    def get_by_probe(cls, probe_id: str):
        """Fetch every location assignment of the given probe (``[]`` if none)."""
        client = get_supabase()
        response = client.table('probe_locations').select("*").eq('probe_id', probe_id).execute()
        records = response.data
        if not records:
            return []
        return [cls(**record) for record in records]

    @classmethod
    def get_by_location(cls, location_id: str):
        """Fetch every probe assignment at the given location (``[]`` if none)."""
        client = get_supabase()
        response = client.table('probe_locations').select("*").eq('location_id', location_id).execute()
        records = response.data
        if not records:
            return []
        return [cls(**record) for record in records]
@dataclass
class Parameter:
    """A measurement parameter, as stored in ``parameters``."""

    id: str  # uuid
    parameter_name: str

    @classmethod
    def get_by_id(cls, parameter_id: str):
        """Fetch one parameter by id; returns ``None`` when no row matches."""
        client = get_supabase()
        response = client.table('parameters').select("*").eq('id', parameter_id).execute()
        if not response.data:
            return None
        return cls(**response.data[0])

    @classmethod
    def get_all(cls, limit: int = 100):
        """Fetch up to ``limit`` parameters; returns ``[]`` when there are none."""
        client = get_supabase()
        response = client.table('parameters').select("*").limit(limit).execute()
        records = response.data
        if not records:
            return []
        return [cls(**record) for record in records]
def get_supabase():
    """Return the Supabase client from ``app.supabase_client``.

    The import is done inside the function, at call time, to avoid a
    circular import between this module and the client module.
    """
    from app.supabase_client import get_supabase as _client_factory
    client = _client_factory()
    return client