"""Dataclass models backed by Supabase tables.

Each model mirrors one table and offers classmethods that query Supabase and
turn the returned rows (plain dicts) into typed instances.  Timestamp columns
arrive as ISO-8601 strings and are converted to ``datetime`` objects by
``from_row``, which every fetch/create method goes through.
"""

import logging
import re
from dataclasses import dataclass
from datetime import datetime
from typing import Any, ClassVar, Iterable, List, Mapping, Optional, Tuple

logger = logging.getLogger(__name__)

# First fractional-seconds group in an ISO timestamp, e.g. ".12345".
_FRACTION_RE = re.compile(r'\.(\d+)')


def get_supabase():
    """Return the Supabase client.

    The import is done locally to avoid circular imports with
    ``app.supabase_client``.
    """
    from app.supabase_client import get_supabase as supabase_func
    return supabase_func()


def _parse_datetime(value: Any) -> Optional[datetime]:
    """Convert an ISO-8601 string to ``datetime``.

    Args:
        value: ``None``/empty value, an ISO-8601 string, or a ``datetime``.

    Returns:
        ``None`` for falsy input, the value itself if it already is a
        ``datetime``, otherwise the parsed ``datetime``.

    Raises:
        TypeError: if ``value`` is neither a string nor a ``datetime``.
        ValueError: if the string is not a valid ISO-8601 timestamp.
    """
    if not value:
        return None
    if isinstance(value, datetime):
        return value
    if not isinstance(value, str):
        raise TypeError(
            f'Expected ISO-8601 string or datetime, got {type(value).__name__}'
        )

    text = value
    # datetime.fromisoformat() before Python 3.11 rejects a trailing "Z" and
    # fractional seconds that are not exactly 3 or 6 digits; normalise both
    # so parsing behaves the same on every supported interpreter.
    if text.endswith(('Z', 'z')):
        text = text[:-1] + '+00:00'
    text = _FRACTION_RE.sub(
        lambda match: '.' + match.group(1)[:6].ljust(6, '0'), text, count=1
    )
    return datetime.fromisoformat(text)


class _RowModel:
    """Shared row-to-instance conversion for the dataclass models."""

    # Names of the fields whose row values are ISO strings to be parsed.
    _DATE_FIELDS: ClassVar[Tuple[str, ...]] = ()

    @classmethod
    def from_row(cls, row: Mapping[str, Any]):
        """Build an instance from one database row.

        The row is copied first, so the caller's dict is never mutated.
        Fields listed in ``_DATE_FIELDS`` are parsed into ``datetime``.
        """
        data = dict(row)
        for field_name in cls._DATE_FIELDS:
            if field_name in data:
                data[field_name] = _parse_datetime(data[field_name])
        return cls(**data)

    @classmethod
    def from_rows(cls, rows: Optional[Iterable[Mapping[str, Any]]]) -> list:
        """Build a list of instances; ``None`` or empty input yields ``[]``."""
        return [cls.from_row(row) for row in (rows or [])]

    @classmethod
    def _first(cls, result):
        """Return an instance for the first row of ``result`` or ``None``."""
        rows = result.data
        return cls.from_row(rows[0]) if rows else None


@dataclass
class User(_RowModel):
    """User model representing application users"""
    id: str  # uuid
    name: str
    email: str
    can_calibrate: bool
    can_review: bool
    signature_image: Optional[str] = None

    @classmethod
    def get_by_email(cls, email: str) -> Optional['User']:
        """Get a single user by email, or ``None`` if no row matches."""
        result = (
            get_supabase().table('users').select("*").eq('email', email).execute()
        )
        return cls._first(result)

    @classmethod
    def get_all(cls) -> List['User']:
        """List all users (empty list when the query returns no rows)."""
        result = get_supabase().table('users').select("*").execute()
        return cls.from_rows(result.data)


@dataclass
class Probe(_RowModel):
    """Probe model representing physical measurement probes"""
    _DATE_FIELDS = ('created_at', 'retired_at')

    id: str  # uuid
    model_id: str  # uuid
    serial_number: str
    description: str
    created_at: datetime
    retired_at: Optional[datetime] = None
    model_name: str = ''  # Filled from the joined probe_models row, if any

    @classmethod
    def from_row(cls, row: Mapping[str, Any]) -> 'Probe':
        """Build a probe from a row, flattening a joined ``probe_models``.

        When the row carries a ``probe_models`` entry (as produced by
        ``get_all``), it is removed and its ``model_name`` is stored on the
        probe; a missing/empty join yields ``'Unknown'``.
        """
        data = dict(row)
        if 'probe_models' in data:
            joined = data.pop('probe_models')
            data['model_name'] = joined['model_name'] if joined else 'Unknown'
        return super().from_row(data)

    @classmethod
    def get_by_id(cls, probe_id: str) -> Optional['Probe']:
        """Get a single probe by ID, or ``None`` if no row matches."""
        result = (
            get_supabase().table('probes').select("*").eq('id', probe_id).execute()
        )
        return cls._first(result)

    @classmethod
    def get_all(cls, limit: int = 100, active_only: bool = True) -> List['Probe']:
        """List probes together with their model name.

        Args:
            limit: maximum number of rows requested.
            active_only: when true, only probes whose ``retired_at`` is null.
        """
        query = (
            get_supabase()
            .table('probes')
            .select('*, probe_models(model_name)')
            .limit(limit)
        )
        if active_only:
            query = query.is_('retired_at', 'null')
        return cls.from_rows(query.execute().data)

    @classmethod
    def create(cls, model_id: str, serial_number: str, description: str = '') -> 'Probe':
        """Create a new probe.

        Raises:
            RuntimeError: if the insert returns no row.
        """
        result = get_supabase().table('probes').insert({
            'model_id': model_id,
            'serial_number': serial_number,
            'description': description,
        }).execute()
        if not result.data:
            raise RuntimeError('Failed to create probe')
        # from_row parses created_at/retired_at, matching the getters.
        return cls.from_row(result.data[0])

    def retire(self) -> None:
        """Mark probe as retired with current timestamp.

        Raises:
            RuntimeError: if the update returns no row.
        """
        # Take the timestamp once so the stored value and the in-memory
        # attribute are identical.
        retired_at = datetime.utcnow()
        result = get_supabase().table('probes').update({
            'retired_at': retired_at.isoformat(),
        }).eq('id', self.id).execute()
        if not result.data:
            raise RuntimeError('Failed to retire probe')
        self.retired_at = retired_at

    def delete(self) -> None:
        """Delete probe from database.

        Raises:
            RuntimeError: if the delete returns no row.
        """
        result = (
            get_supabase().table('probes').delete().eq('id', self.id).execute()
        )
        if not result.data:
            raise RuntimeError('Failed to delete probe')


@dataclass
class Channel(_RowModel):
    """Channel model representing individual measurement channels"""
    _DATE_FIELDS = ('created_at',)

    id: str  # uuid
    probe_id: str  # uuid
    serial_number: str  # [0-9A-F]{16}
    parameter_id: str  # uuid
    created_at: datetime

    @property
    def parameter(self):
        """Get the associated Parameter object (queries on every access)."""
        return Parameter.get_by_id(self.parameter_id)

    @classmethod
    def get_by_id(cls, channel_id: str) -> Optional['Channel']:
        """Get a single channel by ID, or ``None`` if no row matches."""
        result = (
            get_supabase().table('channels').select("*").eq('id', channel_id).execute()
        )
        return cls._first(result)

    @classmethod
    def get_by_probe(cls, probe_id: str) -> List['Channel']:
        """Get all channels for a specific probe."""
        result = (
            get_supabase()
            .table('channels')
            .select("*")
            .eq('probe_id', probe_id)
            .execute()
        )
        return cls.from_rows(result.data)

    @classmethod
    def get_by_serial(cls, serial_number: str) -> Optional['Channel']:
        """Get channel by serial number if it exists.

        The argument is upper-cased before the exact-match query, so the
        lookup ignores the case of ``serial_number`` as passed in.
        """
        serial_upper = serial_number.upper()
        result = (
            get_supabase()
            .table('channels')
            .select("*")
            .eq('serial_number', serial_upper)
            .execute()
        )
        return cls._first(result)

    @classmethod
    def create(cls, probe_id: str, serial_number: str, parameter_id: str) -> 'Channel':
        """Create a new channel.

        Raises:
            RuntimeError: if the insert returns no row.
        """
        result = get_supabase().table('channels').insert({
            'probe_id': probe_id,
            'serial_number': serial_number,
            'parameter_id': parameter_id,
        }).execute()
        if not result.data:
            raise RuntimeError('Failed to create channel')
        return cls.from_row(result.data[0])


@dataclass
class WorkOrder(_RowModel):
    """Work order model representing calibration work orders"""
    _DATE_FIELDS = ('due_date',)

    id: str  # uuid
    order_number: str
    customer_id: str  # uuid
    assigned_to: str  # uuid (FK to users.id)
    due_date: datetime
    status: str
    redmine: Optional[int] = None
    cal_type: Optional[str] = None  # uuid (FK to calibration_types.id)

    @classmethod
    def from_row(cls, row: Mapping[str, Any]) -> 'WorkOrder':
        """Build a work order from a row.

        A ``due_date`` that is neither a string nor a ``datetime`` is
        replaced by ``None`` instead of raising.
        """
        data = dict(row)
        due_date = data.get('due_date')
        if due_date and not isinstance(due_date, (str, datetime)):
            data['due_date'] = None
        return super().from_row(data)

    @classmethod
    def get_by_id(cls, work_order_id: str) -> Optional['WorkOrder']:
        """Get a single work order by ID, or ``None`` if no row matches."""
        result = (
            get_supabase()
            .table('work_orders')
            .select("*")
            .eq('id', work_order_id)
            .execute()
        )
        return cls._first(result)

    @classmethod
    def get_all(cls, limit: int = 100) -> List['WorkOrder']:
        """List all work orders with optional limit."""
        result = (
            get_supabase().table('work_orders').select("*").limit(limit).execute()
        )
        return cls.from_rows(result.data)

    @classmethod
    def create(cls, order_number: str, customer_id: str, assigned_to: str,
               due_date: str, status: str, redmine: Optional[int] = None,
               cal_type: Optional[str] = None) -> 'WorkOrder':
        """Create a new work order.

        ``due_date`` is normalised to ISO format when it parses; otherwise
        the original string is sent unchanged.

        Raises:
            RuntimeError: if the insert returns no row.
        """
        try:
            due_date_iso = datetime.fromisoformat(due_date).isoformat()
        except ValueError:
            due_date_iso = due_date  # Fallback to original if parsing fails

        result = get_supabase().table('work_orders').insert({
            'order_number': order_number,
            'customer_id': customer_id,
            'assigned_to': assigned_to,
            'due_date': due_date_iso,
            'status': status,
            'redmine': redmine,
            'cal_type': cal_type,
        }).execute()
        if not result.data:
            raise RuntimeError('Failed to create work order')
        return cls.from_row(result.data[0])


@dataclass
class Calibration(_RowModel):
    """Calibration model representing individual channel calibrations"""
    _DATE_FIELDS = ('std_cal_date', 'std_cal_due', 'date')

    id: str  # uuid
    channel_id: str  # uuid (FK to channels.id)
    work_order_id: str  # uuid (FK to work_orders.id)
    calibrated_by: str  # uuid (FK to users.id)
    std_used: str  # uuid (FK to standards.id)
    std_cal_date: datetime
    std_cal_due: datetime
    date: datetime
    scale: float
    offset: float
    deviation_high: float
    deviation_mid: float
    deviation_low: float
    set_high: float
    set_mid: float
    set_low: float
    passed: bool
    reviewed_by: Optional[str] = None  # uuid (FK to users.id)

    @classmethod
    def from_rows(cls, rows: Optional[Iterable[Mapping[str, Any]]]) -> List['Calibration']:
        """Build calibrations from rows, skipping rows that fail to parse.

        A row with an invalid date or unexpected/missing columns is logged
        and left out rather than aborting the whole listing.
        """
        calibrations = []
        for row in rows or []:
            try:
                calibrations.append(cls.from_row(row))
            except (TypeError, ValueError) as exc:
                logger.warning("Error parsing calibration data: %s", exc)
        return calibrations

    @classmethod
    def get_all(cls, limit: int = 100) -> List['Calibration']:
        """List all calibrations with optional limit."""
        result = (
            get_supabase().table('calibrations').select("*").limit(limit).execute()
        )
        return cls.from_rows(result.data)

    @classmethod
    def get_by_work_order(cls, work_order_id: str) -> List['Calibration']:
        """Get all calibrations for a specific work order."""
        result = (
            get_supabase()
            .table('calibrations')
            .select("*")
            .eq('work_order_id', work_order_id)
            .execute()
        )
        return cls.from_rows(result.data)

    @classmethod
    def get_by_channel(cls, channel_id: str) -> List['Calibration']:
        """Get all calibrations for a specific channel."""
        result = (
            get_supabase()
            .table('calibrations')
            .select("*")
            .eq('channel_id', channel_id)
            .execute()
        )
        return cls.from_rows(result.data)


@dataclass
class Customer(_RowModel):
    """Customer model representing clients who request calibrations"""
    id: str  # uuid
    name: str
    contact_name: str
    contact_email: str
    contact_phone: str

    @classmethod
    def from_row(cls, row: Mapping[str, Any]) -> 'Customer':
        """Build a customer, accepting the ``email``/``phone`` column names.

        ``email`` and ``phone`` are renamed to ``contact_email`` and
        ``contact_phone`` when the ``contact_*`` key is not already present.
        """
        data = dict(row)
        for legacy, current in (('email', 'contact_email'),
                                ('phone', 'contact_phone')):
            if legacy in data and current not in data:
                data[current] = data.pop(legacy)
        return super().from_row(data)

    @classmethod
    def get_by_id(cls, customer_id: str) -> Optional['Customer']:
        """Get a single customer by ID, or ``None`` if no row matches."""
        result = (
            get_supabase()
            .table('customers')
            .select("*")
            .eq('id', customer_id)
            .execute()
        )
        return cls._first(result)

    @classmethod
    def get_all(cls, limit: int = 100) -> List['Customer']:
        """List all customers with optional limit.

        Queries the ``contact_email``/``contact_phone`` columns first and
        retries with ``email``/``phone`` when the error message says one of
        those columns does not exist.  Any other error is re-raised.
        """
        supabase = get_supabase()
        try:
            # First try with all expected fields
            result = supabase.table('customers').select(
                "id,name,contact_name,contact_email,contact_phone"
            ).limit(limit).execute()
        except Exception as exc:
            message = str(exc)
            missing_column = (
                'column customers.contact_email does not exist' in message
                or 'column customers.contact_phone does not exist' in message
            )
            if not missing_column:
                raise
            # Fall back to simpler field names if expected ones don't exist;
            # from_row maps them back onto the contact_* fields.
            result = supabase.table('customers').select(
                "id,name,contact_name,email,phone"
            ).limit(limit).execute()
        return cls.from_rows(result.data)


@dataclass
class Standard(_RowModel):
    """Standard model representing calibration standards"""
    _DATE_FIELDS = ('calibrated_on', 'calibration_due')

    id: str  # uuid
    make: str
    model: str
    description: str
    uncertainty: str
    calibrated_on: datetime
    calibration_due: datetime
    support_name: str
    support_email: str
    support_phone: str
    support_address: str

    @classmethod
    def get_by_id(cls, standard_id: str) -> Optional['Standard']:
        """Get a single standard by ID, or ``None`` if no row matches."""
        result = (
            get_supabase()
            .table('standards')
            .select("*")
            .eq('id', standard_id)
            .execute()
        )
        return cls._first(result)

    @classmethod
    def get_all(cls, limit: int = 100) -> List['Standard']:
        """List all standards with optional limit."""
        result = (
            get_supabase().table('standards').select("*").limit(limit).execute()
        )
        return cls.from_rows(result.data)

    @classmethod
    def create(cls, make: str, model: str, description: str, uncertainty: str,
               calibrated_on: str, calibration_due: str, support_name: str,
               support_email: str, support_phone: str,
               support_address: str) -> 'Standard':
        """Create a new standard.

        Raises:
            ValueError: if either date string is not valid ISO format.
            RuntimeError: if the insert returns no row.
        """
        # Convert date strings to ISO format (also validates them).
        calibrated_on_iso = datetime.fromisoformat(calibrated_on).isoformat()
        calibration_due_iso = datetime.fromisoformat(calibration_due).isoformat()

        result = get_supabase().table('standards').insert({
            'make': make,
            'model': model,
            'description': description,
            'uncertainty': uncertainty,
            'calibrated_on': calibrated_on_iso,
            'calibration_due': calibration_due_iso,
            'support_name': support_name,
            'support_email': support_email,
            'support_phone': support_phone,
            'support_address': support_address,
        }).execute()
        if not result.data:
            raise RuntimeError('Failed to create standard')
        return cls.from_row(result.data[0])


@dataclass
class ProbeModel(_RowModel):
    """Probe model definition representing different probe types/models"""
    id: str  # uuid
    model_name: str
    specifications: dict  # jsonb

    @classmethod
    def get_by_id(cls, model_id: str) -> Optional['ProbeModel']:
        """Get a single probe model by ID, or ``None`` if no row matches."""
        result = (
            get_supabase()
            .table('probe_models')
            .select("*")
            .eq('id', model_id)
            .execute()
        )
        return cls._first(result)

    @classmethod
    def get_all(cls, limit: int = 100) -> List['ProbeModel']:
        """List all probe models with optional limit."""
        result = (
            get_supabase().table('probe_models').select("*").limit(limit).execute()
        )
        return cls.from_rows(result.data)


@dataclass
class CalibrationType(_RowModel):
    """CalibrationType model representing different types of calibrations"""
    id: str  # uuid
    type_name: str

    @classmethod
    def get_by_id(cls, type_id: str) -> Optional['CalibrationType']:
        """Get a single calibration type by ID, or ``None`` if none matches."""
        result = (
            get_supabase()
            .table('calibration_types')
            .select("*")
            .eq('id', type_id)
            .execute()
        )
        return cls._first(result)

    @classmethod
    def get_all(cls, limit: int = 100) -> List['CalibrationType']:
        """List all calibration types with optional limit."""
        result = (
            get_supabase()
            .table('calibration_types')
            .select("*")
            .limit(limit)
            .execute()
        )
        return cls.from_rows(result.data)


@dataclass
class Location(_RowModel):
    """Location model representing physical locations where probes are deployed"""
    id: str  # uuid
    name: str
    address: str
    contact_name: str
    contact_email: str
    contact_phone: str

    @classmethod
    def get_by_id(cls, location_id: str) -> Optional['Location']:
        """Get a single location by ID, or ``None`` if no row matches."""
        result = (
            get_supabase()
            .table('locations')
            .select("*")
            .eq('id', location_id)
            .execute()
        )
        return cls._first(result)

    @classmethod
    def get_all(cls, limit: int = 100) -> List['Location']:
        """List all locations with optional limit."""
        result = (
            get_supabase().table('locations').select("*").limit(limit).execute()
        )
        return cls.from_rows(result.data)

    @classmethod
    def create(cls, name: str, address: str, contact_name: str,
               contact_email: str, contact_phone: str) -> 'Location':
        """Create a new location.

        Raises:
            RuntimeError: if the insert returns no row.
        """
        result = get_supabase().table('locations').insert({
            'name': name,
            'address': address,
            'contact_name': contact_name,
            'contact_email': contact_email,
            'contact_phone': contact_phone,
        }).execute()
        if not result.data:
            raise RuntimeError('Failed to create location')
        return cls.from_row(result.data[0])


@dataclass
class ProbeLocation(_RowModel):
    """ProbeLocation model representing probe assignments to locations"""
    _DATE_FIELDS = ('start_date', 'end_date')

    id: str  # uuid
    probe_id: str  # uuid (FK to probes.id)
    location_id: str  # uuid (FK to locations.id)
    start_date: datetime
    end_date: Optional[datetime] = None

    @classmethod
    def get_by_id(cls, probe_location_id: str) -> Optional['ProbeLocation']:
        """Get a single probe location by ID, or ``None`` if none matches."""
        result = (
            get_supabase()
            .table('probe_locations')
            .select("*")
            .eq('id', probe_location_id)
            .execute()
        )
        return cls._first(result)

    @classmethod
    def get_by_probe(cls, probe_id: str) -> List['ProbeLocation']:
        """Get all location assignments for a specific probe."""
        result = (
            get_supabase()
            .table('probe_locations')
            .select("*")
            .eq('probe_id', probe_id)
            .execute()
        )
        return cls.from_rows(result.data)

    @classmethod
    def get_by_location(cls, location_id: str) -> List['ProbeLocation']:
        """Get all probe assignments for a specific location."""
        result = (
            get_supabase()
            .table('probe_locations')
            .select("*")
            .eq('location_id', location_id)
            .execute()
        )
        return cls.from_rows(result.data)


@dataclass
class Parameter(_RowModel):
    """Parameter model representing measurement parameters"""
    id: str  # uuid
    parameter_name: str

    @classmethod
    def get_by_id(cls, parameter_id: str) -> Optional['Parameter']:
        """Get a single parameter by ID, or ``None`` if no row matches."""
        result = (
            get_supabase()
            .table('parameters')
            .select("*")
            .eq('id', parameter_id)
            .execute()
        )
        return cls._first(result)

    @classmethod
    def get_all(cls, limit: int = 100) -> List['Parameter']:
        """List all parameters with optional limit."""
        result = (
            get_supabase().table('parameters').select("*").limit(limit).execute()
        )
        return cls.from_rows(result.data)