128 lines
4.0 KiB
Python
128 lines
4.0 KiB
Python
from supabase import create_client, Client
|
|
from dotenv import load_dotenv
|
|
import os
|
|
|
|
load_dotenv()
|
|
|
|
url: str = os.getenv('SUPABASE_URL')
|
|
key: str = os.getenv('SUPABASE_KEY')
|
|
|
|
def get_supabase() -> Client:
|
|
"""Initialize and return Supabase client"""
|
|
return create_client(url, key)
|
|
|
|
def test_connection():
|
|
"""Test the Supabase connection"""
|
|
supabase = get_supabase()
|
|
try:
|
|
# Simple query to test connection
|
|
data = supabase.table('probes').select("*").limit(1).execute()
|
|
return True if data.data else False
|
|
except Exception as e:
|
|
print(f"Connection test failed: {e}")
|
|
return False
|
|
|
|
# CRUD Operations
|
|
def create_probe(probe_data: dict):
|
|
"""Create a new probe record"""
|
|
supabase = get_supabase()
|
|
return supabase.table('probes').insert(probe_data).execute()
|
|
|
|
def get_probe(probe_id: str):
|
|
"""Get a single probe by ID"""
|
|
supabase = get_supabase()
|
|
return supabase.table('probes').select("*").eq('id', probe_id).execute()
|
|
|
|
def update_probe(probe_id: str, update_data: dict):
|
|
"""Update a probe record"""
|
|
supabase = get_supabase()
|
|
return supabase.table('probes').update(update_data).eq('id', probe_id).execute()
|
|
|
|
def delete_probe(probe_id: str):
|
|
"""Delete a probe record"""
|
|
supabase = get_supabase()
|
|
return supabase.table('probes').delete().eq('id', probe_id).execute()
|
|
|
|
def list_probes(limit: int = 100):
|
|
"""List all probes with optional limit"""
|
|
supabase = get_supabase()
|
|
return supabase.table('probes').select("*").limit(limit).execute()
|
|
|
|
def create_auth_audit_table():
|
|
"""Create auth_audit table if it doesn't exist"""
|
|
supabase = get_supabase()
|
|
supabase.rpc('create_auth_audit_table').execute()
|
|
|
|
def execute_sql_file(sql_file_path: str):
|
|
"""Execute SQL from a file"""
|
|
supabase = get_supabase()
|
|
|
|
# First ensure the execute_sql function exists
|
|
with open('sql/create_execute_sql_function.sql', 'r') as f:
|
|
create_func_sql = f.read()
|
|
supabase.rpc('execute_sql', {'sql_text': create_func_sql}).execute()
|
|
|
|
# Now execute the target SQL file
|
|
with open(sql_file_path, 'r') as f:
|
|
sql = f.read()
|
|
supabase.rpc('execute_sql', {'sql_text': sql}).execute()
|
|
|
|
return {'data': True} # Return success if no exceptions
|
|
|
|
def create_tables():
|
|
"""Create all required tables"""
|
|
supabase = get_supabase()
|
|
# Execute the core tables SQL
|
|
with open('sql/create_tables.sql', 'r') as f:
|
|
sql = f.read()
|
|
supabase.rpc('create_core_tables').execute()
|
|
# Create audit table
|
|
create_auth_audit_table()
|
|
|
|
def test_user_retrieval():
|
|
"""Test retrieving users from database"""
|
|
supabase = get_supabase()
|
|
try:
|
|
result = supabase.table('users').select("*").execute()
|
|
print(f"Found {len(result.data)} users:")
|
|
for user in result.data:
|
|
print(f"- {user['name']} ({user['email']})")
|
|
return True
|
|
except Exception as e:
|
|
print(f"Error retrieving users: {e}")
|
|
return False
|
|
|
|
if __name__ == '__main__':
|
|
if test_connection():
|
|
# Test user retrieval
|
|
print("\nTesting user retrieval:")
|
|
test_user_retrieval()
|
|
print("Supabase connection successful!")
|
|
|
|
# Test CRUD operations
|
|
test_probe = {
|
|
'model_id': '00000000-0000-0000-0000-000000000000',
|
|
'serial_number': 'TEST123',
|
|
'description': 'Test probe'
|
|
}
|
|
|
|
print("\nTesting CRUD operations:")
|
|
# Create
|
|
create_result = create_probe(test_probe)
|
|
print(f"Create: {create_result.data}")
|
|
|
|
# Read
|
|
probe_id = create_result.data[0]['id']
|
|
read_result = get_probe(probe_id)
|
|
print(f"Read: {read_result.data}")
|
|
|
|
# Update
|
|
update_result = update_probe(probe_id, {'description': 'Updated test probe'})
|
|
print(f"Update: {update_result.data}")
|
|
|
|
# Delete
|
|
delete_result = delete_probe(probe_id)
|
|
print(f"Delete: {delete_result.data}")
|
|
else:
|
|
print("Supabase connection failed")
|