Files
SpendingAnalysis/tests/test_integration.py
andy db06108d2b feat: complete v1 implementation - all services, UI views, and tests
Adds remaining files from parallel development:
- Services: analysis, csv_reader, forecasting, normalizer, recurring
- UI: recurring_view, settings_view, sidebar, themes (dark/light)
- Tests: analysis, csv_reader, forecasting, import_categorize,
  normalizer, recurring, integration
- App entry point (main.py) and CLAUDE.md

52 tests passing across all modules.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-10 14:57:46 -05:00

100 lines
3.7 KiB
Python

# tests/test_integration.py
from pathlib import Path
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
from src.db import Base
from src.models import *
from src.seed import seed_categories, seed_household, seed_default_rules
from src.services.importer import ImportService
from src.services.analysis import AnalysisService
from src.services.recurring import RecurringDetector
# Directory of raw CSV exports read by the test below: the "rawdata" folder
# located two directory levels above this file.
RAWDATA = Path(__file__).parent.parent / "rawdata"
def test_full_pipeline():
    """Exercise seeding, CSV import, analysis, recurring detection and dedup.

    Runs against a fresh in-memory SQLite database and reads two CSV files
    from ``RAWDATA``. The steps are:

    1. Seed categories, two household members and the default rules.
    2. Create a credit and a checking account and import one CSV into each.
    3. Check that at least some transactions were categorized on import.
    4. Check that the analysis service returns data by period, category, tag.
    5. Run the recurring detector and print its findings.
    6. Re-import the first CSV and check every row is reported as a duplicate.

    The session and engine are always released, even when an assertion fails.
    """
    engine = create_engine("sqlite:///:memory:")
    Base.metadata.create_all(engine)
    session = Session(engine)
    try:
        # Setup
        seed_categories(session)
        andrew = seed_household(session, "Andrew", "self")
        donna = seed_household(session, "Donna", "wife")
        seed_default_rules(session)

        # Create accounts
        chase = Account(
            name="Chase Freedom",
            institution="Chase",
            account_type="credit",
            owner_id=andrew.id,
        )
        checking = Account(
            name="WF Checking",
            institution="Wells Fargo",
            account_type="checking",
            owner_id=andrew.id,
            is_shared=True,
        )
        session.add_all([chase, checking])
        # Flush so the accounts get primary keys before they are referenced.
        session.flush()

        svc = ImportService(session)
        # The same file is imported twice (first import, then dedup check).
        chase_csv = RAWDATA / "Chase0372_Activity20260101_20260210_20260210.CSV"

        # Import Chase CSV (columns addressed by header name)
        r1 = svc.import_csv(
            chase_csv,
            account_id=chase.id,
            column_map={
                "date": "Transaction Date",
                "amount": "Amount",
                "description": "Description",
                "source_category": "Category",
            },
            amount_logic="signed",
        )
        assert r1["imported"] > 100

        # Import Checking CSV (columns addressed by position)
        r2 = svc.import_csv(
            RAWDATA / "Checking1.csv",
            account_id=checking.id,
            column_map={"date": 0, "amount": 1, "description": 4},
            amount_logic="signed",
        )
        assert r2["imported"] > 50

        # Verify some transactions got auto-categorized
        categorized = (
            session.query(Transaction)
            .filter(Transaction.category_id.isnot(None))
            .count()
        )
        total = session.query(Transaction).count()
        assert categorized > 0, "Expected some transactions to be auto-categorized"
        print(f"Categorized {categorized}/{total} transactions")

        # Report income attribution per person (informational only; nothing
        # is asserted here, and it is skipped if no "Income" category exists).
        income_cat = session.query(Category).filter_by(name="Income").first()
        if income_cat:
            andrew_income = session.query(Transaction).filter(
                Transaction.category_id == income_cat.id,
                Transaction.attributed_to_id == andrew.id,
            ).count()
            donna_income = session.query(Transaction).filter(
                Transaction.category_id == income_cat.id,
                Transaction.attributed_to_id == donna.id,
            ).count()
            print(f"Andrew income transactions: {andrew_income}")
            print(f"Donna income transactions: {donna_income}")

        # Analysis works
        analysis = AnalysisService(session)
        monthly = analysis.spending_by_period("month")
        assert len(monthly) >= 1, "Expected at least one month of data"
        by_cat = analysis.spending_by_category()
        assert len(by_cat) >= 1, "Expected at least one category"
        by_tag = analysis.spending_by_tag()
        assert len(by_tag) >= 1, "Expected at least one tag"

        # Recurring detection works
        detector = RecurringDetector(session)
        recurring = detector.detect()
        # Print what was found for debugging
        for r in recurring:
            print(
                f"Recurring: {r['description']} - ${r['typical_amount']:.2f} "
                f"{r['frequency']} (${r['annual_cost']:.2f}/yr)"
            )

        # Verify no duplicate imports
        # NOTE(review): this column map omits "source_category", unlike the
        # first import of the same file - confirm that is intentional.
        r3 = svc.import_csv(
            chase_csv,
            account_id=chase.id,
            column_map={
                "date": "Transaction Date",
                "amount": "Amount",
                "description": "Description",
            },
            amount_logic="signed",
        )
        assert r3["imported"] == 0, "Re-import should detect all duplicates"
        assert r3["duplicates"] == r1["imported"]
    finally:
        # Release the session and the engine's connections even when an
        # assertion above fails, so nothing leaks into later tests.
        session.close()
        engine.dispose()