554 lines
20 KiB
Python
554 lines
20 KiB
Python
from dataclasses import dataclass
|
|
from typing import Optional
|
|
from datetime import datetime
|
|
|
|
@dataclass
class User:
    """Application user record, backed by the ``users`` table."""
    id: str  # uuid
    name: str
    email: str
    can_calibrate: bool
    can_review: bool
    signature_image: Optional[str] = None

    @classmethod
    def get_by_email(cls, email: str):
        """Return the first user whose ``email`` column equals *email*.

        Returns None when the query yields no rows.
        """
        query = get_supabase().table('users').select("*").eq('email', email)
        rows = query.execute().data
        if not rows:
            return None
        return cls(**rows[0])

    @classmethod
    def get_all(cls):
        """Return every row of the ``users`` table as a list of User objects."""
        response = get_supabase().table('users').select("*").execute()
        users = []
        for record in response.data:
            users.append(cls(**record))
        return users
|
|
|
|
@dataclass
class Probe:
    """Physical measurement probe, backed by the ``probes`` table.

    Every constructor path (``get_by_id``, ``get_all``, ``create``) converts
    the ISO-8601 strings returned for ``created_at`` / ``retired_at`` into
    ``datetime`` objects, so the attributes match their annotations.
    """
    id: str  # uuid
    model_id: str  # uuid
    serial_number: str
    description: str
    created_at: datetime
    retired_at: Optional[datetime] = None
    # Filled by get_all() from the joined probe_models row; '' otherwise.
    model_name: str = ''

    @staticmethod
    def _parse_timestamp(value):
        """Turn an ISO-8601 string into a datetime.

        Falsy values become None and datetime instances pass through.
        """
        if not value:
            return None
        if isinstance(value, datetime):
            return value
        return datetime.fromisoformat(value)

    @classmethod
    def _from_row(cls, row, joined_model=False):
        """Build a Probe from one result row without mutating *row*.

        Args:
            row: Mapping of column name to value as returned by the query.
            joined_model: When True the row comes from a query embedding
                ``probe_models(model_name)``; ``model_name`` is taken from
                that nested object, or ``'Unknown'`` when it is missing.
        """
        data = dict(row)
        # The embedded relation is not a dataclass field; always drop it.
        joined = data.pop('probe_models', None)
        if joined_model:
            data['model_name'] = joined['model_name'] if joined else 'Unknown'
        for column in ('created_at', 'retired_at'):
            if column in data:
                data[column] = cls._parse_timestamp(data[column])
        return cls(**data)

    @classmethod
    def get_by_id(cls, probe_id: str):
        """Return the probe whose ``id`` equals *probe_id*, or None."""
        supabase = get_supabase()
        result = supabase.table('probes').select("*").eq('id', probe_id).execute()
        if not result.data:
            return None
        return cls._from_row(result.data[0])

    @classmethod
    def get_all(cls, limit: int = 100, active_only: bool = True):
        """List probes together with their model name.

        Args:
            limit: Maximum number of rows requested.
            active_only: When True, only rows whose ``retired_at`` is null
                are requested.

        Returns:
            A list of Probe objects with ``model_name`` populated from the
            embedded ``probe_models`` row (``'Unknown'`` when absent).
        """
        supabase = get_supabase()
        query = (
            supabase.table('probes')
            .select('*, probe_models(model_name)')
            .limit(limit)
        )
        if active_only:
            query = query.is_('retired_at', 'null')
        result = query.execute()
        return [cls._from_row(row, joined_model=True) for row in result.data or []]

    @classmethod
    def create(cls, model_id: str, serial_number: str, description: str = ''):
        """Insert a new probe and return it.

        Raises:
            Exception: If the insert returns no data.
        """
        supabase = get_supabase()
        result = supabase.table('probes').insert({
            'model_id': model_id,
            'serial_number': serial_number,
            'description': description
        }).execute()
        if not result.data:
            raise Exception('Failed to create probe')
        # Parse timestamps so the returned object matches get_by_id()/get_all().
        return cls._from_row(result.data[0])

    def retire(self):
        """Mark the probe as retired with the current UTC timestamp.

        Raises:
            Exception: If the update returns no data.
        """
        supabase = get_supabase()
        # Capture the timestamp once so the stored value and the in-memory
        # value are identical (naive UTC, as before).
        retired_at = datetime.utcnow()
        result = supabase.table('probes').update({
            'retired_at': retired_at.isoformat()
        }).eq('id', self.id).execute()
        if not result.data:
            raise Exception('Failed to retire probe')
        self.retired_at = retired_at

    def delete(self):
        """Delete this probe's row from the ``probes`` table.

        Raises:
            Exception: If the delete returns no data.
        """
        supabase = get_supabase()
        result = supabase.table('probes').delete().eq('id', self.id).execute()
        if not result.data:
            raise Exception('Failed to delete probe')
|
|
|
|
@dataclass
class Channel:
    """Individual measurement channel, backed by the ``channels`` table.

    All constructor paths convert the ``created_at`` ISO-8601 string returned
    by the database into a ``datetime``.
    """
    id: str  # uuid
    probe_id: str  # uuid
    serial_number: str  # [0-9A-F]{16}
    parameter_id: str  # uuid
    created_at: datetime

    @classmethod
    def _from_row(cls, row):
        """Build a Channel from one result row without mutating *row*."""
        data = dict(row)
        if 'created_at' in data:
            raw = data['created_at']
            if not raw:
                data['created_at'] = None
            elif not isinstance(raw, datetime):
                data['created_at'] = datetime.fromisoformat(raw)
        return cls(**data)

    @property
    def parameter(self):
        """Return the associated Parameter object.

        Note: each access calls ``Parameter.get_by_id``; keep the result in
        a local variable when it is needed more than once.
        """
        return Parameter.get_by_id(self.parameter_id)

    @classmethod
    def get_by_id(cls, channel_id: str):
        """Return the channel whose ``id`` equals *channel_id*, or None."""
        supabase = get_supabase()
        result = supabase.table('channels').select("*").eq('id', channel_id).execute()
        if not result.data:
            return None
        return cls._from_row(result.data[0])

    @classmethod
    def get_by_probe(cls, probe_id: str):
        """Return all channels whose ``probe_id`` equals *probe_id*."""
        supabase = get_supabase()
        result = supabase.table('channels').select("*").eq('probe_id', probe_id).execute()
        return [cls._from_row(row) for row in result.data or []]

    @classmethod
    def get_by_serial(cls, serial_number: str):
        """Return the channel with the given serial number, or None.

        *serial_number* is upper-cased before the exact-match query, so the
        lookup is case-insensitive with respect to the argument.
        """
        serial_upper = serial_number.upper()
        supabase = get_supabase()
        result = supabase.table('channels').select("*").eq('serial_number', serial_upper).execute()
        if not result.data:
            return None
        return cls._from_row(result.data[0])

    @classmethod
    def create(cls, probe_id: str, serial_number: str, parameter_id: str):
        """Insert a new channel and return it.

        The serial number is stored upper-cased so that ``get_by_serial``
        (which upper-cases its argument) can find it again.

        Raises:
            Exception: If the insert returns no data.
        """
        supabase = get_supabase()
        result = supabase.table('channels').insert({
            'probe_id': probe_id,
            'serial_number': serial_number.upper(),
            'parameter_id': parameter_id
        }).execute()
        if not result.data:
            raise Exception('Failed to create channel')
        # Parse created_at so the result matches the get_* methods.
        return cls._from_row(result.data[0])
|
|
|
|
@dataclass
class WorkOrder:
    """Calibration work order, backed by the ``work_orders`` table.

    ``due_date`` is converted to a ``datetime`` by every constructor path
    (``get_by_id``, ``get_all``, ``create``).
    """
    id: str  # uuid
    order_number: str
    customer_id: str  # uuid
    assigned_to: str  # uuid (FK to users.id)
    due_date: datetime
    status: str
    redmine: Optional[int] = None
    cal_type: Optional[str] = None  # uuid (FK to calibration_types.id)

    @staticmethod
    def _parse_due_date(value):
        """Normalise a ``due_date`` column value.

        ISO-8601 strings are parsed, datetime instances pass through, and
        anything else (including falsy values) becomes None.
        """
        if not value:
            return None
        if isinstance(value, datetime):
            return value
        if isinstance(value, str):
            return datetime.fromisoformat(value)
        return None

    @classmethod
    def _from_row(cls, row):
        """Build a WorkOrder from one result row without mutating *row*."""
        data = dict(row)
        if 'due_date' in data:
            data['due_date'] = cls._parse_due_date(data['due_date'])
        return cls(**data)

    @classmethod
    def get_by_id(cls, work_order_id: str):
        """Return the work order whose ``id`` equals *work_order_id*, or None."""
        supabase = get_supabase()
        result = supabase.table('work_orders').select("*").eq('id', work_order_id).execute()
        if not result.data:
            return None
        return cls._from_row(result.data[0])

    @classmethod
    def get_all(cls, limit: int = 100):
        """Return up to *limit* work orders with ``due_date`` parsed."""
        supabase = get_supabase()
        result = supabase.table('work_orders').select("*").limit(limit).execute()
        return [cls._from_row(row) for row in result.data or []]

    @classmethod
    def create(cls, order_number: str, customer_id: str, assigned_to: str,
               due_date: str, status: str, redmine: Optional[int] = None,
               cal_type: Optional[str] = None):
        """Insert a new work order and return it.

        *due_date* is round-tripped through ``datetime.fromisoformat`` to
        normalise it; if it cannot be parsed the original string is sent
        unchanged.

        Raises:
            Exception: If the insert returns no data.
        """
        supabase = get_supabase()
        try:
            due_date_iso = datetime.fromisoformat(due_date).isoformat()
        except ValueError:
            due_date_iso = due_date  # Fallback to original if parsing fails

        result = supabase.table('work_orders').insert({
            'order_number': order_number,
            'customer_id': customer_id,
            'assigned_to': assigned_to,
            'due_date': due_date_iso,
            'status': status,
            'redmine': redmine,
            'cal_type': cal_type
        }).execute()
        if not result.data:
            raise Exception('Failed to create work order')
        return cls._from_row(result.data[0])
|
|
|
|
@dataclass
class Calibration:
    """Individual channel calibration, backed by the ``calibrations`` table.

    All list methods share one row-conversion path: date columns are parsed
    into ``datetime`` objects, and rows that cannot be converted are
    reported with ``print`` and skipped.
    """
    # Columns holding ISO-8601 strings that are parsed into datetime objects.
    _DATE_FIELDS = ('std_cal_date', 'std_cal_due', 'date')

    id: str  # uuid
    channel_id: str  # uuid (FK to channels.id)
    work_order_id: str  # uuid (FK to work_orders.id)
    calibrated_by: str  # uuid (FK to users.id)
    std_used: str  # uuid (FK to standards.id)
    std_cal_date: datetime
    std_cal_due: datetime
    date: datetime
    scale: float
    offset: float
    deviation_high: float
    deviation_mid: float
    deviation_low: float
    set_high: float
    set_mid: float
    set_low: float
    passed: bool
    reviewed_by: Optional[str] = None  # uuid (FK to users.id)

    @classmethod
    def _from_row(cls, row):
        """Build a Calibration from one result row without mutating *row*.

        Non-empty date columns are parsed with ``datetime.fromisoformat``;
        empty ones are left as returned.
        """
        data = dict(row)
        for column in cls._DATE_FIELDS:
            raw = data.get(column)
            if raw and not isinstance(raw, datetime):
                data[column] = datetime.fromisoformat(raw)
        return cls(**data)

    @classmethod
    def _collect(cls, rows):
        """Convert *rows* to Calibration objects, skipping rows that fail.

        A failing row is reported with ``print`` and omitted from the
        result. ``None`` or an empty sequence yields an empty list.
        """
        calibrations = []
        for row in rows or []:
            try:
                calibrations.append(cls._from_row(row))
            except Exception as e:
                # Best-effort listing: one malformed row must not hide the rest.
                print(f"Error parsing calibration data: {e}")
        return calibrations

    @classmethod
    def get_all(cls, limit: int = 100):
        """Return up to *limit* calibrations."""
        supabase = get_supabase()
        result = supabase.table('calibrations').select("*").limit(limit).execute()
        return cls._collect(result.data)

    @classmethod
    def get_by_work_order(cls, work_order_id: str):
        """Return all calibrations whose ``work_order_id`` matches."""
        supabase = get_supabase()
        result = supabase.table('calibrations').select("*").eq('work_order_id', work_order_id).execute()
        # Same conversion as the other list methods, so date fields are
        # datetime objects here as well.
        return cls._collect(result.data)

    @classmethod
    def get_by_channel(cls, channel_id: str):
        """Return all calibrations whose ``channel_id`` matches."""
        supabase = get_supabase()
        result = supabase.table('calibrations').select("*").eq('channel_id', channel_id).execute()
        return cls._collect(result.data)
|
|
|
|
@dataclass
class Customer:
    """Client who requests calibrations, backed by the ``customers`` table.

    Rows that carry the older ``email`` / ``phone`` column names are mapped
    onto ``contact_email`` / ``contact_phone`` by every constructor path.
    """
    # (legacy column, current field) pairs handled by _from_row().
    _LEGACY_COLUMNS = (('email', 'contact_email'), ('phone', 'contact_phone'))

    id: str  # uuid
    name: str
    contact_name: str
    contact_email: str
    contact_phone: str

    @classmethod
    def _from_row(cls, row):
        """Build a Customer from one result row without mutating *row*.

        Legacy ``email`` / ``phone`` keys are renamed to the current field
        names before construction.
        """
        data = dict(row)
        for legacy, current in cls._LEGACY_COLUMNS:
            if legacy in data:
                data[current] = data.pop(legacy)
        return cls(**data)

    @classmethod
    def get_by_id(cls, customer_id: str):
        """Return the customer whose ``id`` equals *customer_id*, or None."""
        supabase = get_supabase()
        result = supabase.table('customers').select("*").eq('id', customer_id).execute()
        return cls._from_row(result.data[0]) if result.data else None

    @classmethod
    def get_all(cls, limit: int = 100):
        """Return up to *limit* customers.

        The query first requests the ``contact_email`` / ``contact_phone``
        columns. If the backend reports that either column does not exist,
        it is retried with ``email`` / ``phone``; any other error is
        re-raised.
        """
        supabase = get_supabase()
        try:
            # First try with all expected fields
            result = supabase.table('customers').select(
                "id,name,contact_name,contact_email,contact_phone"
            ).limit(limit).execute()
        except Exception as e:
            message = str(e)
            missing_column = (
                'column customers.contact_email does not exist' in message
                or 'column customers.contact_phone does not exist' in message
            )
            if not missing_column:
                raise
            # Fall back to simpler field names if expected ones don't exist
            result = supabase.table('customers').select(
                "id,name,contact_name,email,phone"
            ).limit(limit).execute()

        return [cls._from_row(row) for row in result.data or []]
|
|
|
|
@dataclass
class Standard:
    """Calibration standard, backed by the ``standards`` table.

    ``calibrated_on`` and ``calibration_due`` are converted to ``datetime``
    by every constructor path (``get_by_id``, ``get_all``, ``create``).
    """
    id: str  # uuid
    make: str
    model: str
    description: str
    uncertainty: str
    calibrated_on: datetime
    calibration_due: datetime
    support_name: str
    support_email: str
    support_phone: str
    support_address: str

    @classmethod
    def _from_row(cls, row):
        """Build a Standard from one result row without mutating *row*.

        Date columns: ISO-8601 strings are parsed, datetime instances pass
        through, and falsy values become None.
        """
        data = dict(row)
        for column in ('calibrated_on', 'calibration_due'):
            if column not in data:
                continue
            raw = data[column]
            if not raw:
                data[column] = None
            elif not isinstance(raw, datetime):
                data[column] = datetime.fromisoformat(raw)
        return cls(**data)

    @classmethod
    def get_by_id(cls, standard_id: str):
        """Return the standard whose ``id`` equals *standard_id*, or None."""
        supabase = get_supabase()
        result = supabase.table('standards').select("*").eq('id', standard_id).execute()
        # Parse dates here too, so the result matches get_all()/create().
        return cls._from_row(result.data[0]) if result.data else None

    @classmethod
    def get_all(cls, limit: int = 100):
        """Return up to *limit* standards."""
        supabase = get_supabase()
        result = supabase.table('standards').select("*").limit(limit).execute()
        return [cls._from_row(row) for row in result.data or []]

    @classmethod
    def create(cls, make: str, model: str, description: str, uncertainty: str,
               calibrated_on: str, calibration_due: str, support_name: str,
               support_email: str, support_phone: str, support_address: str):
        """Insert a new standard and return it.

        Raises:
            ValueError: If *calibrated_on* or *calibration_due* is not a
                string ``datetime.fromisoformat`` can parse.
            Exception: If the insert returns no data.
        """
        supabase = get_supabase()
        # Round-trip through datetime to validate and normalise the input.
        calibrated_on_iso = datetime.fromisoformat(calibrated_on).isoformat()
        calibration_due_iso = datetime.fromisoformat(calibration_due).isoformat()

        result = supabase.table('standards').insert({
            'make': make,
            'model': model,
            'description': description,
            'uncertainty': uncertainty,
            'calibrated_on': calibrated_on_iso,
            'calibration_due': calibration_due_iso,
            'support_name': support_name,
            'support_email': support_email,
            'support_phone': support_phone,
            'support_address': support_address
        }).execute()
        if not result.data:
            raise Exception('Failed to create standard')
        return cls._from_row(result.data[0])
|
|
|
|
@dataclass
class ProbeModel:
    """Definition of a probe type/model, backed by the ``probe_models`` table."""
    id: str  # uuid
    model_name: str
    specifications: dict  # jsonb

    @classmethod
    def get_by_id(cls, model_id: str):
        """Look up one probe model by ``id``; None when no row matches."""
        client = get_supabase()
        response = client.table('probe_models').select("*").eq('id', model_id).execute()
        rows = response.data
        if rows:
            return cls(**rows[0])
        return None

    @classmethod
    def get_all(cls, limit: int = 100):
        """Return up to *limit* probe models (empty list when none come back)."""
        client = get_supabase()
        response = client.table('probe_models').select("*").limit(limit).execute()
        if not response.data:
            return []
        models = []
        for record in response.data:
            models.append(cls(**record))
        return models
|
|
|
|
@dataclass
class CalibrationType:
    """Kind of calibration, backed by the ``calibration_types`` table."""
    id: str  # uuid
    type_name: str

    @classmethod
    def get_by_id(cls, type_id: str):
        """Look up one calibration type by ``id``; None when no row matches."""
        client = get_supabase()
        response = client.table('calibration_types').select("*").eq('id', type_id).execute()
        rows = response.data
        if rows:
            return cls(**rows[0])
        return None

    @classmethod
    def get_all(cls, limit: int = 100):
        """Return up to *limit* calibration types (empty list when none come back)."""
        client = get_supabase()
        response = client.table('calibration_types').select("*").limit(limit).execute()
        if not response.data:
            return []
        kinds = []
        for record in response.data:
            kinds.append(cls(**record))
        return kinds
|
|
|
|
@dataclass
class Location:
    """Physical site where probes are deployed, backed by the ``locations`` table."""
    id: str  # uuid
    name: str
    address: str
    contact_name: str
    contact_email: str
    contact_phone: str

    @classmethod
    def get_by_id(cls, location_id: str):
        """Look up one location by ``id``; None when no row matches."""
        client = get_supabase()
        response = client.table('locations').select("*").eq('id', location_id).execute()
        rows = response.data
        if rows:
            return cls(**rows[0])
        return None

    @classmethod
    def get_all(cls, limit: int = 100):
        """Return up to *limit* locations (empty list when none come back)."""
        client = get_supabase()
        response = client.table('locations').select("*").limit(limit).execute()
        if not response.data:
            return []
        places = []
        for record in response.data:
            places.append(cls(**record))
        return places

    @classmethod
    def create(cls, name: str, address: str, contact_name: str,
               contact_email: str, contact_phone: str):
        """Insert a new location row and return it as a Location.

        Raises:
            Exception: If the insert returns no data.
        """
        client = get_supabase()
        payload = {
            'name': name,
            'address': address,
            'contact_name': contact_name,
            'contact_email': contact_email,
            'contact_phone': contact_phone,
        }
        response = client.table('locations').insert(payload).execute()
        created = response.data
        if created:
            return cls(**created[0])
        raise Exception('Failed to create location')
|
|
|
|
@dataclass
class ProbeLocation:
    """Assignment of a probe to a location, backed by ``probe_locations``.

    ``start_date`` and ``end_date`` are converted from the ISO-8601 strings
    returned by the database into ``datetime`` objects, matching their
    annotations.
    """
    id: str  # uuid
    probe_id: str  # uuid (FK to probes.id)
    location_id: str  # uuid (FK to locations.id)
    start_date: datetime
    end_date: Optional[datetime] = None

    @classmethod
    def _from_row(cls, row):
        """Build a ProbeLocation from one result row without mutating *row*.

        Date columns: ISO-8601 strings are parsed, datetime instances pass
        through, and falsy values become None.
        """
        data = dict(row)
        for column in ('start_date', 'end_date'):
            if column not in data:
                continue
            raw = data[column]
            if not raw:
                data[column] = None
            elif not isinstance(raw, datetime):
                data[column] = datetime.fromisoformat(raw)
        return cls(**data)

    @classmethod
    def get_by_id(cls, probe_location_id: str):
        """Return the assignment whose ``id`` matches, or None."""
        supabase = get_supabase()
        result = supabase.table('probe_locations').select("*").eq('id', probe_location_id).execute()
        return cls._from_row(result.data[0]) if result.data else None

    @classmethod
    def get_by_probe(cls, probe_id: str):
        """Return all location assignments whose ``probe_id`` matches."""
        supabase = get_supabase()
        result = supabase.table('probe_locations').select("*").eq('probe_id', probe_id).execute()
        return [cls._from_row(row) for row in result.data or []]

    @classmethod
    def get_by_location(cls, location_id: str):
        """Return all probe assignments whose ``location_id`` matches."""
        supabase = get_supabase()
        result = supabase.table('probe_locations').select("*").eq('location_id', location_id).execute()
        return [cls._from_row(row) for row in result.data or []]
|
|
|
|
@dataclass
class Parameter:
    """Measurement parameter, backed by the ``parameters`` table."""
    id: str  # uuid
    parameter_name: str

    @classmethod
    def get_by_id(cls, parameter_id: str):
        """Look up one parameter by ``id``; None when no row matches."""
        client = get_supabase()
        response = client.table('parameters').select("*").eq('id', parameter_id).execute()
        rows = response.data
        if rows:
            return cls(**rows[0])
        return None

    @classmethod
    def get_all(cls, limit: int = 100):
        """Return up to *limit* parameters (empty list when none come back)."""
        client = get_supabase()
        response = client.table('parameters').select("*").limit(limit).execute()
        if not response.data:
            return []
        parameters = []
        for record in response.data:
            parameters.append(cls(**record))
        return parameters
|
|
|
|
def get_supabase():
    """Return the Supabase client provided by ``app.supabase_client``.

    The import happens inside the function rather than at module load, to
    avoid a circular import between this module and the client module.
    """
    # Deferred import: resolved at call time, not when this module is loaded.
    from app.supabase_client import get_supabase as _client_factory

    client = _client_factory()
    return client
|