# tests/test_integration.py
from pathlib import Path
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
from src.db import Base
from src.models import *
from src.seed import seed_categories, seed_household, seed_default_rules
from src.services.importer import ImportService
from src.services.analysis import AnalysisService
from src.services.recurring import RecurringDetector

# Directory holding the raw CSV exports used as fixtures (repo root / "rawdata").
RAWDATA = Path(__file__).parent.parent / "rawdata"


def test_full_pipeline():
    """Run seed -> import -> analysis -> recurring detection -> re-import end to end.

    The test builds a fresh in-memory SQLite database, seeds categories,
    two household members and the default rules, creates two accounts and
    imports one CSV into each.  It then checks that:

    * each import produced more rows than a minimum threshold,
    * at least one transaction received a category,
    * the analysis service returns at least one period, category and tag,
    * recurring detection runs and its results can be formatted,
    * importing the same Chase file a second time imports nothing and
      reports every previously imported row as a duplicate.

    The session and engine are always released, even when an assertion fails.
    """
    engine = create_engine("sqlite:///:memory:")
    Base.metadata.create_all(engine)
    session = Session(engine)
    try:
        # Setup
        seed_categories(session)
        andrew = seed_household(session, "Andrew", "self")
        donna = seed_household(session, "Donna", "wife")
        seed_default_rules(session)

        # Create accounts
        chase = Account(
            name="Chase Freedom",
            institution="Chase",
            account_type="credit",
            owner_id=andrew.id,
        )
        checking = Account(
            name="WF Checking",
            institution="Wells Fargo",
            account_type="checking",
            owner_id=andrew.id,
            is_shared=True,
        )
        session.add_all([chase, checking])
        # Flush so the accounts get primary keys before they are referenced below.
        session.flush()

        svc = ImportService(session)

        # The same file is imported twice (see the duplicate check at the end),
        # so keep a single definition of its path.
        chase_csv = RAWDATA / "Chase0372_Activity20260101_20260210_20260210.CSV"

        # Import Chase CSV (columns addressed by header name)
        r1 = svc.import_csv(
            chase_csv,
            account_id=chase.id,
            column_map={
                "date": "Transaction Date",
                "amount": "Amount",
                "description": "Description",
                "source_category": "Category",
            },
            amount_logic="signed",
        )
        # NOTE(review): threshold presumably reflects the fixture's row count - confirm.
        assert r1["imported"] > 100

        # Import Checking CSV (columns addressed by position)
        r2 = svc.import_csv(
            RAWDATA / "Checking1.csv",
            account_id=checking.id,
            column_map={"date": 0, "amount": 1, "description": 4},
            amount_logic="signed",
        )
        assert r2["imported"] > 50

        # Verify some transactions got auto-categorized
        categorized = (
            session.query(Transaction)
            .filter(Transaction.category_id.isnot(None))
            .count()
        )
        total = session.query(Transaction).count()
        assert categorized > 0, "Expected some transactions to be auto-categorized"
        print(f"Categorized {categorized}/{total} transactions")

        # Report income attribution per person (informational only, no assertion)
        income_cat = session.query(Category).filter_by(name="Income").first()
        if income_cat is not None:
            andrew_income = session.query(Transaction).filter(
                Transaction.category_id == income_cat.id,
                Transaction.attributed_to_id == andrew.id,
            ).count()
            donna_income = session.query(Transaction).filter(
                Transaction.category_id == income_cat.id,
                Transaction.attributed_to_id == donna.id,
            ).count()
            print(f"Andrew income transactions: {andrew_income}")
            print(f"Donna income transactions: {donna_income}")

        # Analysis works
        analysis = AnalysisService(session)
        monthly = analysis.spending_by_period("month")
        assert len(monthly) >= 1, "Expected at least one month of data"

        by_cat = analysis.spending_by_category()
        assert len(by_cat) >= 1, "Expected at least one category"

        by_tag = analysis.spending_by_tag()
        assert len(by_tag) >= 1, "Expected at least one tag"

        # Recurring detection works
        detector = RecurringDetector(session)
        recurring = detector.detect()

        # Print what was found for debugging
        for r in recurring:
            print(
                f"Recurring: {r['description']} - ${r['typical_amount']:.2f} "
                f"{r['frequency']} (${r['annual_cost']:.2f}/yr)"
            )

        # Verify no duplicate imports.
        # NOTE(review): this re-import omits the "source_category" mapping used
        # the first time; presumably duplicate detection does not depend on it -
        # confirm against ImportService.
        r3 = svc.import_csv(
            chase_csv,
            account_id=chase.id,
            column_map={
                "date": "Transaction Date",
                "amount": "Amount",
                "description": "Description",
            },
            amount_logic="signed",
        )
        assert r3["imported"] == 0, "Re-import should detect all duplicates"
        assert r3["duplicates"] == r1["imported"]
    finally:
        # Release the connection and pool even if an assertion above failed.
        session.close()
        engine.dispose()