"""Dataclass models for the calibration application's Supabase tables.

Each model mirrors one table and exposes classmethod loaders that query the
table through the client returned by ``get_supabase()`` and turn the returned
row dictionaries into model instances.
"""
import logging
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, List, Optional, Union

logger = logging.getLogger(__name__)


def _parse_datetime(value: Any) -> Optional[datetime]:
    """Convert a row value into a ``datetime``.

    ``datetime`` instances pass through unchanged, non-empty strings are
    parsed with ``datetime.fromisoformat`` and anything else becomes ``None``.

    Raises:
        ValueError: If a string is not in a format ``fromisoformat`` accepts.
    """
    if isinstance(value, datetime):
        return value
    if isinstance(value, str) and value:
        return datetime.fromisoformat(value)
    return None


class _RowModel:
    """Shared row -> instance plumbing for the dataclass models below."""

    # Names of the row keys that ``from_row`` converts to ``datetime``.
    _DATETIME_FIELDS = ()

    @classmethod
    def from_row(cls, row: Dict[str, Any]):
        """Build an instance from one row dictionary.

        The row is copied first, so the caller's dictionary is not modified.
        Keys listed in ``_DATETIME_FIELDS`` are parsed when present; all keys
        are then passed to the dataclass constructor as keyword arguments.

        Raises:
            TypeError: If the row has keys the model does not declare, or
                lacks a required one.
            ValueError: If a datetime field holds an unparseable string.
        """
        data = dict(row)
        for field_name in cls._DATETIME_FIELDS:
            if field_name in data:
                data[field_name] = _parse_datetime(data[field_name])
        return cls(**data)

    @classmethod
    def _one(cls, result):
        """Return the first row of ``result`` as an instance, or ``None``."""
        return cls.from_row(result.data[0]) if result.data else None

    @classmethod
    def _many(cls, result) -> list:
        """Return every row of ``result`` as an instance (``[]`` if no data)."""
        return [cls.from_row(row) for row in (result.data or [])]

    @classmethod
    def _inserted(cls, result, error_message: str):
        """Return the inserted row as an instance.

        Raises:
            RuntimeError: If the insert returned no data.
        """
        if not result.data:
            raise RuntimeError(error_message)
        return cls.from_row(result.data[0])


@dataclass
class User(_RowModel):
    """User model representing application users"""
    id: str  # uuid
    name: str
    email: str
    can_calibrate: bool
    can_review: bool
    signature_image: Optional[str] = None

    @classmethod
    def get_by_email(cls, email: str) -> Optional["User"]:
        """Get a single user by email, or ``None`` if no row matches."""
        supabase = get_supabase()
        result = supabase.table('users').select("*").eq('email', email).execute()
        return cls._one(result)

    @classmethod
    def get_all(cls) -> List["User"]:
        """List all users."""
        supabase = get_supabase()
        result = supabase.table('users').select("*").execute()
        return cls._many(result)


@dataclass
class Probe(_RowModel):
    """Probe model representing physical measurement probes"""
    _DATETIME_FIELDS = ('created_at', 'retired_at')

    id: str  # uuid
    model_id: str  # uuid
    serial_number: str
    description: str
    created_at: datetime
    retired_at: Optional[datetime] = None

    @classmethod
    def get_by_id(cls, probe_id: str) -> Optional["Probe"]:
        """Get a single probe by ID, or ``None`` if no row matches."""
        supabase = get_supabase()
        result = supabase.table('probes').select("*").eq('id', probe_id).execute()
        return cls._one(result)

    @classmethod
    def get_all(cls, limit: int = 100) -> List["Probe"]:
        """List probes, returning at most ``limit`` rows."""
        supabase = get_supabase()
        result = supabase.table('probes').select("*").limit(limit).execute()
        return cls._many(result)

    @classmethod
    def create(cls, model_id: str, serial_number: str, description: str = '') -> "Probe":
        """Create a new probe and return it.

        The returned instance has its timestamps parsed, matching what
        ``get_by_id`` and ``get_all`` return.

        Raises:
            RuntimeError: If the insert returned no data.
        """
        supabase = get_supabase()
        result = supabase.table('probes').insert({
            'model_id': model_id,
            'serial_number': serial_number,
            'description': description,
        }).execute()
        return cls._inserted(result, 'Failed to create probe')


@dataclass
class Channel(_RowModel):
    """Channel model representing individual measurement channels"""
    _DATETIME_FIELDS = ('created_at',)

    id: str  # uuid
    probe_id: str  # uuid
    serial_number: str  # [0-9A-F]{16}
    parameter_id: str  # uuid
    created_at: datetime

    @property
    def parameter(self):
        """Get the associated Parameter object (queries on every access)."""
        return Parameter.get_by_id(self.parameter_id)

    @classmethod
    def get_by_id(cls, channel_id: str) -> Optional["Channel"]:
        """Get a single channel by ID, or ``None`` if no row matches."""
        supabase = get_supabase()
        result = supabase.table('channels').select("*").eq('id', channel_id).execute()
        return cls._one(result)

    @classmethod
    def get_by_probe(cls, probe_id: str) -> List["Channel"]:
        """Get all channels for a specific probe."""
        supabase = get_supabase()
        result = supabase.table('channels').select("*").eq('probe_id', probe_id).execute()
        return cls._many(result)

    @classmethod
    def create(cls, probe_id: str, serial_number: str, parameter_id: str) -> "Channel":
        """Create a new channel and return it.

        The returned instance has ``created_at`` parsed, matching what
        ``get_by_id`` and ``get_by_probe`` return.

        Raises:
            RuntimeError: If the insert returned no data.
        """
        supabase = get_supabase()
        result = supabase.table('channels').insert({
            'probe_id': probe_id,
            'serial_number': serial_number,
            'parameter_id': parameter_id,
        }).execute()
        return cls._inserted(result, 'Failed to create channel')


@dataclass
class WorkOrder(_RowModel):
    """Work order model representing calibration work orders"""
    _DATETIME_FIELDS = ('due_date',)

    id: str  # uuid
    order_number: str
    customer_id: str  # uuid
    assigned_to: str  # uuid (FK to users.id)
    due_date: datetime
    status: str
    redmine: Optional[int] = None
    cal_type: Optional[str] = None  # uuid (FK to calibration_types.id)

    @classmethod
    def get_by_id(cls, work_order_id: str) -> Optional["WorkOrder"]:
        """Get a single work order by ID, or ``None`` if no row matches."""
        supabase = get_supabase()
        result = supabase.table('work_orders').select("*").eq('id', work_order_id).execute()
        return cls._one(result)

    @classmethod
    def get_all(cls, limit: int = 100) -> List["WorkOrder"]:
        """List work orders, returning at most ``limit`` rows.

        ``due_date`` is parsed here as well, so listed work orders carry the
        same type as those returned by ``get_by_id`` and ``create``.
        """
        supabase = get_supabase()
        result = supabase.table('work_orders').select("*").limit(limit).execute()
        return cls._many(result)

    @classmethod
    def create(cls, order_number: str, customer_id: str, assigned_to: str,
               due_date: Union[str, datetime], status: str,
               redmine: Optional[int] = None,
               cal_type: Optional[str] = None) -> "WorkOrder":
        """Create a new work order and return it.

        Args:
            due_date: ISO-format string or ``datetime``. A string that cannot
                be parsed is sent to the database unchanged.

        Raises:
            RuntimeError: If the insert returned no data.
        """
        supabase = get_supabase()

        # Normalise the due date to ISO format before sending it.
        if isinstance(due_date, datetime):
            due_date_iso = due_date.isoformat()
        else:
            try:
                due_date_iso = datetime.fromisoformat(due_date).isoformat()
            except ValueError:
                due_date_iso = due_date  # Fallback to original if parsing fails

        result = supabase.table('work_orders').insert({
            'order_number': order_number,
            'customer_id': customer_id,
            'assigned_to': assigned_to,
            'due_date': due_date_iso,
            'status': status,
            'redmine': redmine,
            'cal_type': cal_type,
        }).execute()
        return cls._inserted(result, 'Failed to create work order')


@dataclass
class Calibration(_RowModel):
    """Calibration model representing individual channel calibrations"""
    # NOTE(review): std_cal_date / std_cal_due / date are annotated as datetime
    # but are stored as the client returns them; nothing in this class parses
    # them. Check callers before listing them in _DATETIME_FIELDS.
    id: str  # uuid
    channel_id: str  # uuid (FK to channels.id)
    work_order_id: str  # uuid (FK to work_orders.id)
    calibrated_by: str  # uuid (FK to users.id)
    std_used: str  # uuid (FK to standards.id)
    std_cal_date: datetime
    std_cal_due: datetime
    date: datetime
    scale: float
    offset: float
    deviation_high: float
    deviation_mid: float
    deviation_low: float
    set_high: float
    set_mid: float
    set_low: float
    passed: bool
    reviewed_by: Optional[str] = None  # uuid (FK to users.id)

    @classmethod
    def get_all(cls, limit: int = 100) -> List["Calibration"]:
        """List calibrations, returning at most ``limit`` rows.

        Rows that cannot be turned into a ``Calibration`` are logged and
        skipped rather than aborting the whole listing.
        """
        supabase = get_supabase()
        # Query the calibrations table: work_orders rows do not match this
        # model's fields and could never be constructed.
        result = supabase.table('calibrations').select("*").limit(limit).execute()

        calibrations = []
        for row in result.data or []:
            try:
                calibrations.append(cls.from_row(row))
            except (TypeError, ValueError) as exc:
                logger.warning("Skipping malformed calibration row: %s", exc)
        return calibrations

    @classmethod
    def get_by_work_order(cls, work_order_id: str) -> List["Calibration"]:
        """Get all calibrations for a specific work order."""
        supabase = get_supabase()
        result = supabase.table('calibrations').select("*").eq('work_order_id', work_order_id).execute()
        return cls._many(result)


@dataclass
class Customer(_RowModel):
    """Customer model representing clients who request calibrations"""
    id: str  # uuid
    name: str
    contact_name: str
    contact_email: str
    contact_phone: str

    @classmethod
    def from_row(cls, row: Dict[str, Any]) -> "Customer":
        """Build a customer from a row, accepting legacy column names.

        Rows that carry ``email`` / ``phone`` instead of ``contact_email`` /
        ``contact_phone`` are renamed before construction.
        """
        data = dict(row)
        for legacy, current in (('email', 'contact_email'), ('phone', 'contact_phone')):
            if legacy in data and current not in data:
                data[current] = data.pop(legacy)
        return super().from_row(data)

    @classmethod
    def get_by_id(cls, customer_id: str) -> Optional["Customer"]:
        """Get a single customer by ID, or ``None`` if no row matches."""
        supabase = get_supabase()
        result = supabase.table('customers').select("*").eq('id', customer_id).execute()
        return cls._one(result)

    @classmethod
    def get_all(cls, limit: int = 100) -> List["Customer"]:
        """List customers, returning at most ``limit`` rows.

        If the query fails because ``contact_email`` or ``contact_phone`` does
        not exist, it is retried with the ``email`` / ``phone`` column names.
        Any other error is re-raised.
        """
        supabase = get_supabase()
        try:
            # First try with all expected fields
            result = supabase.table('customers').select(
                "id,name,contact_name,contact_email,contact_phone"
            ).limit(limit).execute()
        except Exception as e:
            message = str(e)
            missing_column = (
                'column customers.contact_email does not exist' in message
                or 'column customers.contact_phone does not exist' in message
            )
            if not missing_column:
                raise
            # Fall back to simpler field names; from_row renames them.
            result = supabase.table('customers').select(
                "id,name,contact_name,email,phone"
            ).limit(limit).execute()
        return cls._many(result)


@dataclass
class Standard(_RowModel):
    """Standard model representing calibration standards"""
    # NOTE(review): calibrated_on / calibration_due are annotated as datetime
    # but are stored as the client returns them; nothing in this class parses
    # them. Check callers before listing them in _DATETIME_FIELDS.
    id: str  # uuid
    make: str
    model: str
    description: str
    uncertainty: str
    calibrated_on: datetime
    calibration_due: datetime
    support_name: str
    support_email: str
    support_phone: str
    support_address: str

    @classmethod
    def get_by_id(cls, standard_id: str) -> Optional["Standard"]:
        """Get a single standard by ID, or ``None`` if no row matches."""
        supabase = get_supabase()
        result = supabase.table('standards').select("*").eq('id', standard_id).execute()
        return cls._one(result)

    @classmethod
    def get_all(cls, limit: int = 100) -> List["Standard"]:
        """List standards, returning at most ``limit`` rows."""
        supabase = get_supabase()
        result = supabase.table('standards').select("*").limit(limit).execute()
        return cls._many(result)


@dataclass
class ProbeModel(_RowModel):
    """Probe model definition representing different probe types/models"""
    id: str  # uuid
    model_name: str
    specifications: dict  # jsonb

    @classmethod
    def get_by_id(cls, model_id: str) -> Optional["ProbeModel"]:
        """Get a single probe model by ID, or ``None`` if no row matches."""
        supabase = get_supabase()
        result = supabase.table('probe_models').select("*").eq('id', model_id).execute()
        return cls._one(result)

    @classmethod
    def get_all(cls, limit: int = 100) -> List["ProbeModel"]:
        """List probe models, returning at most ``limit`` rows."""
        supabase = get_supabase()
        result = supabase.table('probe_models').select("*").limit(limit).execute()
        return cls._many(result)


@dataclass
class CalibrationType(_RowModel):
    """CalibrationType model representing different types of calibrations"""
    id: str  # uuid
    type_name: str

    @classmethod
    def get_by_id(cls, type_id: str) -> Optional["CalibrationType"]:
        """Get a single calibration type by ID, or ``None`` if no row matches."""
        supabase = get_supabase()
        result = supabase.table('calibration_types').select("*").eq('id', type_id).execute()
        return cls._one(result)

    @classmethod
    def get_all(cls, limit: int = 100) -> List["CalibrationType"]:
        """List calibration types, returning at most ``limit`` rows."""
        supabase = get_supabase()
        result = supabase.table('calibration_types').select("*").limit(limit).execute()
        return cls._many(result)


@dataclass
class Location(_RowModel):
    """Location model representing physical locations where probes are deployed"""
    id: str  # uuid
    name: str
    address: str
    contact_name: str
    contact_email: str
    contact_phone: str

    @classmethod
    def get_by_id(cls, location_id: str) -> Optional["Location"]:
        """Get a single location by ID, or ``None`` if no row matches."""
        supabase = get_supabase()
        result = supabase.table('locations').select("*").eq('id', location_id).execute()
        return cls._one(result)

    @classmethod
    def get_all(cls, limit: int = 100) -> List["Location"]:
        """List locations, returning at most ``limit`` rows."""
        supabase = get_supabase()
        result = supabase.table('locations').select("*").limit(limit).execute()
        return cls._many(result)


@dataclass
class ProbeLocation(_RowModel):
    """ProbeLocation model representing probe assignments to locations"""
    # NOTE(review): start_date / end_date are annotated as datetime but are
    # stored as the client returns them; nothing in this class parses them.
    # Check callers before listing them in _DATETIME_FIELDS.
    id: str  # uuid
    probe_id: str  # uuid (FK to probes.id)
    location_id: str  # uuid (FK to locations.id)
    start_date: datetime
    end_date: Optional[datetime] = None

    @classmethod
    def get_by_id(cls, probe_location_id: str) -> Optional["ProbeLocation"]:
        """Get a single probe location by ID, or ``None`` if no row matches."""
        supabase = get_supabase()
        result = supabase.table('probe_locations').select("*").eq('id', probe_location_id).execute()
        return cls._one(result)

    @classmethod
    def get_by_probe(cls, probe_id: str) -> List["ProbeLocation"]:
        """Get all location assignments for a specific probe."""
        supabase = get_supabase()
        result = supabase.table('probe_locations').select("*").eq('probe_id', probe_id).execute()
        return cls._many(result)

    @classmethod
    def get_by_location(cls, location_id: str) -> List["ProbeLocation"]:
        """Get all probe assignments for a specific location."""
        supabase = get_supabase()
        result = supabase.table('probe_locations').select("*").eq('location_id', location_id).execute()
        return cls._many(result)


@dataclass
class Parameter(_RowModel):
    """Parameter model representing measurement parameters"""
    id: str  # uuid
    parameter_name: str

    @classmethod
    def get_by_id(cls, parameter_id: str) -> Optional["Parameter"]:
        """Get a single parameter by ID, or ``None`` if no row matches."""
        supabase = get_supabase()
        result = supabase.table('parameters').select("*").eq('id', parameter_id).execute()
        return cls._one(result)

    @classmethod
    def get_all(cls, limit: int = 100) -> List["Parameter"]:
        """List parameters, returning at most ``limit`` rows."""
        supabase = get_supabase()
        result = supabase.table('parameters').select("*").limit(limit).execute()
        return cls._many(result)


def get_supabase():
    """Local import of supabase client to avoid circular imports"""
    from app.supabase_client import get_supabase as supabase_func
    return supabase_func()