# SpendingAnalysis Implementation Plan

> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.

**Goal:** Build a desktop spending analysis app that imports bank/credit card CSVs, auto-categorizes transactions, and provides visual analysis of spending habits with forecasting.

**Architecture:** Three-layer (data/service/UI) Python app. SQLite via SQLAlchemy for storage, pandas for CSV/analysis, PySide6 for desktop UI, matplotlib for charts. Categorization engine behind a Protocol for future AI extensibility.

**Tech Stack:** Python 3.12+, PySide6, SQLAlchemy, pandas, matplotlib, pytest

---

## Phase 1: Foundation

### Task 1: Project Setup

**Files:**
- Create: `pyproject.toml`
- Create: `src/__init__.py`
- Create: `src/models/__init__.py`
- Create: `src/services/__init__.py`
- Create: `src/ui/__init__.py`
- Create: `tests/__init__.py`
- Create: `tests/services/__init__.py`
- Create: `tests/models/__init__.py`

**Step 1: Create pyproject.toml**

```toml
[project]
name = "spending-analysis"
version = "0.1.0"
requires-python = ">=3.12"
dependencies = [
    "PySide6>=6.6",
    "sqlalchemy>=2.0",
    "pandas>=2.0",
    "matplotlib>=3.8",
]

[project.optional-dependencies]
dev = [
    "pytest>=8.0",
    "pytest-qt>=4.2",
]

[project.scripts]
spending-analysis = "src.main:main"
```

**Step 2: Create directory structure**

Create all `__init__.py` files listed above (empty files).

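If you prefer to script Step 2 rather than create the files by hand, a minimal sketch along these lines should work. It is not part of the original plan: the helper name `create_package_skeleton` is hypothetical, and the file list simply mirrors the `__init__.py` entries in the **Files** section of Task 1.

```python
from pathlib import Path

# The empty package markers listed under "Files" for Task 1.
INIT_FILES = (
    "src/__init__.py",
    "src/models/__init__.py",
    "src/services/__init__.py",
    "src/ui/__init__.py",
    "tests/__init__.py",
    "tests/services/__init__.py",
    "tests/models/__init__.py",
)


def create_package_skeleton(root: Path) -> list[Path]:
    """Create the empty __init__.py files under root and return their paths.

    Hypothetical helper; existing files are left untouched, so it is safe to re-run.
    """
    created = []
    for rel in INIT_FILES:
        path = root / rel
        path.parent.mkdir(parents=True, exist_ok=True)
        path.touch(exist_ok=True)
        created.append(path)
    return created
```

Calling `create_package_skeleton(Path.cwd())` from the project root would produce the layout that the later tasks import from.
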
**Step 3: Create virtual environment and install**

Run (Windows; on macOS/Linux activate with `source .venv/bin/activate` instead): `python -m venv .venv && .venv\Scripts\activate && pip install -e ".[dev]"`

**Step 4: Verify installation**

Run: `python -c "import PySide6; import sqlalchemy; import pandas; import matplotlib; print('OK')"`
Expected: `OK`

**Step 5: Commit**

```bash
git add pyproject.toml src/ tests/
git commit -m "feat: project setup with dependencies"
```

---

### Task 2: Database Models

**Files:**
- Create: `src/db.py`
- Create: `src/models/household.py`
- Create: `src/models/account.py`
- Create: `src/models/category.py`
- Create: `src/models/transaction.py`
- Create: `src/models/rule.py`
- Create: `src/models/csv_mapping.py`
- Create: `tests/models/test_models.py`

**Step 1: Write failing test for model creation**

```python
# tests/models/test_models.py
import datetime

from sqlalchemy import create_engine
from sqlalchemy.orm import Session

from src.db import Base
from src.models.household import HouseholdMember
from src.models.account import Account
from src.models.category import Category
from src.models.transaction import Transaction
from src.models.rule import CategorizationRule
from src.models.csv_mapping import CsvMapping


def make_session():
    engine = create_engine("sqlite:///:memory:")
    Base.metadata.create_all(engine)
    return Session(engine)


def test_create_household_member():
    session = make_session()
    member = HouseholdMember(name="Andrew", relationship="self")
    session.add(member)
    session.commit()
    assert member.id is not None
    assert member.name == "Andrew"


def test_create_account_with_owner():
    session = make_session()
    member = HouseholdMember(name="Andrew", relationship="self")
    session.add(member)
    session.flush()
    account = Account(
        name="Chase Freedom",
        institution="Chase",
        account_type="credit",
        owner_id=member.id,
    )
    session.add(account)
    session.commit()
    assert account.owner.name == "Andrew"


def test_create_category():
    session = make_session()
    cat = Category(name="Groceries", default_tag="needs")
    session.add(cat)
    session.commit()
    assert cat.id is not None


def test_create_transaction():
    session = make_session()
    member = HouseholdMember(name="Andrew", relationship="self")
    session.add(member)
    session.flush()
    account = Account(name="Checking", institution="Wells Fargo", account_type="checking", owner_id=member.id)
    cat = Category(name="Groceries", default_tag="needs")
    session.add_all([account, cat])
    session.flush()
    txn = Transaction(
        # SQLite's Date type accepts only date objects, not ISO strings.
        date=datetime.date(2026, 1, 15),
        amount=-48.52,
        description="WAL-MART #7181",
        raw_description="PURCHASE AUTHORIZED ON 01/14 WAL-MART #7181 BEAUFORT SC CARD 5360",
        account_id=account.id,
        category_id=cat.id,
        attributed_to_id=member.id,
        tag="needs",
    )
    session.add(txn)
    session.commit()
    assert txn.id is not None
    assert txn.account.name == "Checking"
    assert txn.category.name == "Groceries"


def test_create_categorization_rule():
    session = make_session()
    cat = Category(name="Groceries", default_tag="needs")
    session.add(cat)
    session.flush()
    rule = CategorizationRule(
        pattern="WAL-MART",
        category_id=cat.id,
        priority=10,
    )
    session.add(rule)
    session.commit()
    assert rule.id is not None


def test_create_csv_mapping():
    session = make_session()
    member = HouseholdMember(name="Andrew", relationship="self")
    session.add(member)
    session.flush()
    account = Account(name="Chase", institution="Chase", account_type="credit", owner_id=member.id)
    session.add(account)
    session.flush()
    mapping = CsvMapping(
        name="Chase Credit Card",
        fingerprint="Transaction Date,Post Date,Description,Category,Type,Amount,Memo",
        column_map='{"date": "Transaction Date", "amount": "Amount", "description": "Description"}',
        amount_logic="signed",
        account_id=account.id,
    )
    session.add(mapping)
    session.commit()
    assert mapping.id is not None
```

**Step 2: Run test to verify it fails**

Run: `pytest tests/models/test_models.py -v`
Expected: FAIL (imports not found)

**Step 3: Implement db.py and all models**

```python
# src/db.py
from pathlib import Path

from sqlalchemy import create_engine
from sqlalchemy.orm import DeclarativeBase, Session


class Base(DeclarativeBase):
    pass


def get_engine(db_path: Path | None = None):
    if db_path is None:
        db_path = Path.home() / ".spending_analysis" / "spending.db"
    db_path.parent.mkdir(parents=True, exist_ok=True)
    return create_engine(f"sqlite:///{db_path}")


def get_session(db_path: Path | None = None) -> Session:
    engine = get_engine(db_path)
    Base.metadata.create_all(engine)
    return Session(engine)
```

```python
# src/models/household.py
from sqlalchemy import String
from sqlalchemy.orm import Mapped, mapped_column
from sqlalchemy.orm import relationship as orm_relationship

from src.db import Base


class HouseholdMember(Base):
    __tablename__ = "household_members"

    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(100))
    # This column is named "relationship", which shadows the ORM helper of the
    # same name inside the class body, so the helper is imported under an alias.
    relationship: Mapped[str] = mapped_column(String(50))

    accounts: Mapped[list["Account"]] = orm_relationship(back_populates="owner")
```

```python
# src/models/account.py
from sqlalchemy import ForeignKey, String
from sqlalchemy.orm import Mapped, mapped_column, relationship

from src.db import Base


class Account(Base):
    __tablename__ = "accounts"

    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(100))
    institution: Mapped[str] = mapped_column(String(100))
    account_type: Mapped[str] = mapped_column(String(20))  # checking, credit, savings
    owner_id: Mapped[int | None] = mapped_column(ForeignKey("household_members.id"))
    is_shared: Mapped[bool] = mapped_column(default=False)

    owner: Mapped["HouseholdMember | None"] = relationship(back_populates="accounts")
```

```python
# src/models/category.py
from sqlalchemy import String
from sqlalchemy.orm import Mapped, mapped_column

from src.db import Base


class Category(Base):
    __tablename__ = "categories"

    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(100), unique=True)
    default_tag: Mapped[str | None] = mapped_column(String(20))  # needs, wants, savings
    icon: Mapped[str | None] = mapped_column(String(50))
```

```python
# src/models/transaction.py
import datetime

from sqlalchemy import Date, ForeignKey, Numeric, String, Text
from sqlalchemy.orm import Mapped, mapped_column, relationship

from src.db import Base


class Transaction(Base):
    __tablename__ = "transactions"

    id: Mapped[int] = mapped_column(primary_key=True)
    date: Mapped[datetime.date] = mapped_column(Date)
    # asdecimal=False makes the column return floats, matching the annotation.
    amount: Mapped[float] = mapped_column(Numeric(10, 2, asdecimal=False))
    description: Mapped[str] = mapped_column(String(300))
    raw_description: Mapped[str | None] = mapped_column(Text)
    account_id: Mapped[int] = mapped_column(ForeignKey("accounts.id"))
    category_id: Mapped[int | None] = mapped_column(ForeignKey("categories.id"))
    attributed_to_id: Mapped[int | None] = mapped_column(ForeignKey("household_members.id"))
    tag: Mapped[str | None] = mapped_column(String(20))
    source_category: Mapped[str | None] = mapped_column(String(100))
    is_transfer: Mapped[bool] = mapped_column(default=False)
    transfer_pair_id: Mapped[int | None] = mapped_column(ForeignKey("transactions.id"))

    account: Mapped["Account"] = relationship()
    category: Mapped["Category | None"] = relationship()
    attributed_to: Mapped["HouseholdMember | None"] = relationship()
```

```python
# src/models/rule.py
from sqlalchemy import ForeignKey, String
from sqlalchemy.orm import Mapped, mapped_column, relationship

from src.db import Base


class CategorizationRule(Base):
    __tablename__ = "categorization_rules"

    id: Mapped[int] = mapped_column(primary_key=True)
    pattern: Mapped[str] = mapped_column(String(300))
    category_id: Mapped[int] = mapped_column(ForeignKey("categories.id"))
    tag_override: Mapped[str | None] = mapped_column(String(20))
    attributed_to_id: Mapped[int | None] = mapped_column(ForeignKey("household_members.id"))
    priority: Mapped[int] = mapped_column(default=0)

    category: Mapped["Category"] = relationship()
    attributed_to: Mapped["HouseholdMember | None"] = relationship()
```

```python
# src/models/csv_mapping.py
from sqlalchemy import ForeignKey, String, Text
from sqlalchemy.orm import Mapped, mapped_column, relationship

from src.db import Base


class CsvMapping(Base):
    __tablename__ = "csv_mappings"

    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(100))
    fingerprint: Mapped[str] = mapped_column(String(500))
    column_map: Mapped[str] = mapped_column(Text)  # JSON
    amount_logic: Mapped[str] = mapped_column(String(50))  # signed, separate_columns, type_column
    account_id: Mapped[int] = mapped_column(ForeignKey("accounts.id"))

    account: Mapped["Account"] = relationship()
```

Update `src/models/__init__.py` to re-export all models:

```python
from src.models.household import HouseholdMember
from src.models.account import Account
from src.models.category import Category
from src.models.transaction import Transaction
from src.models.rule import CategorizationRule
from src.models.csv_mapping import CsvMapping
```

**Step 4: Run tests**

Run: `pytest tests/models/test_models.py -v`
Expected: All 6 tests PASS

**Step 5: Commit**

```bash
git add src/db.py src/models/ tests/models/
git commit -m "feat: database models and connection management"
```

---

### Task 3: Seed Data

**Files:**
- Create: `src/seed.py`
- Create: `tests/test_seed.py`

**Step 1: Write failing test**

```python
# tests/test_seed.py
from sqlalchemy import create_engine
from sqlalchemy.orm import Session

from src.db import Base
from src.seed import seed_categories, seed_household, DEFAULT_CATEGORIES
from src.models import Category, HouseholdMember


def make_session():
    engine = create_engine("sqlite:///:memory:")
    Base.metadata.create_all(engine)
    return Session(engine)


def test_seed_categories():
    session = make_session()
    seed_categories(session)
    cats = session.query(Category).all()
    assert len(cats) == len(DEFAULT_CATEGORIES)
    names = {c.name for c in cats}
    assert "Groceries" in names
    assert "Transfer" in names
    assert "Income" in names


def test_seed_categories_idempotent():
    session = make_session()
    seed_categories(session)
    seed_categories(session)
    cats = session.query(Category).all()
    assert len(cats) == len(DEFAULT_CATEGORIES)


def test_seed_household():
    session = make_session()
    seed_household(session, "Andrew", "self")
    members = session.query(HouseholdMember).all()
    assert len(members) == 1
    assert members[0].name == "Andrew"
```

**Step 2: Run test to verify it fails**

Run: `pytest tests/test_seed.py -v`
Expected: FAIL

**Step 3: Implement seed.py**

```python
# src/seed.py
from sqlalchemy.orm import Session

from src.models import Category, HouseholdMember

# (name, default_tag, icon)
DEFAULT_CATEGORIES = [
    ("Income", None, None),
    ("Housing", "needs", None),
    ("Groceries", "needs", None),
    ("Dining Out", "wants", None),
    ("Transportation", "needs", None),
    ("Gas", "needs", None),
    ("Utilities", "needs", None),
    ("Insurance", "needs", None),
    ("Healthcare", "needs", None),
    ("Entertainment", "wants", None),
    ("Shopping", "wants", None),
    ("Subscriptions", "wants", None),
    ("Personal Care", "wants", None),
    ("Family", "needs", None),
    ("Gifts & Donations", "wants", None),
    ("Debt Payment", "needs", None),
    ("Savings", "savings", None),
    ("Transfer", None, None),
    ("Travel", "wants", None),
    ("Home", "needs", None),
    ("Professional Services", "needs", None),
    ("Uncategorized", None, None),
]


def seed_categories(session: Session) -> None:
    existing = {c.name for c in session.query(Category).all()}
    for name, tag, icon in DEFAULT_CATEGORIES:
        if name not in existing:
            session.add(Category(name=name, default_tag=tag, icon=icon))
    session.commit()


def seed_household(session: Session, name: str, relationship: str) -> HouseholdMember:
    existing = session.query(HouseholdMember).filter_by(name=name).first()
    if existing:
        return existing
    member = HouseholdMember(name=name, relationship=relationship)
    session.add(member)
    session.commit()
    return member
```

**Step 4: Run tests**

Run: `pytest tests/test_seed.py -v`
Expected: All 3 tests PASS

**Step 5: Commit**

```bash
git add src/seed.py tests/test_seed.py
git commit -m "feat: seed data for default categories and household"
```

---

## Phase 2: CSV Import Pipeline

### Task 4: Description Normalizer

**Files:**
- Create: `src/services/normalizer.py`
- Create: `tests/services/test_normalizer.py`

**Step 1: Write failing tests using real transaction data**

```python
# tests/services/test_normalizer.py
from src.services.normalizer import normalize_description


def test_strip_authorization_prefix():
    raw = "PURCHASE AUTHORIZED ON 02/06 WALMART.COM 8009256278 BENTONVILLE AR P000000089502338 CARD 5360"
    result = normalize_description(raw)
    assert "AUTHORIZED ON" not in result
    assert "CARD 5360" not in result
    assert "WALMART.COM" in result


def test_strip_recurring_prefix():
    raw = "RECURRING PAYMENT AUTHORIZED ON 02/05 HELLOFRESH 646-846-3663 NY S356036316425851 CARD 5360"
    result = normalize_description(raw)
    assert "AUTHORIZED ON" not in result
    assert "HELLOFRESH" in result


def test_strip_reference_ids():
    raw = "RECURRING TRANSFER TO CONLON A WAY2SAVE SAVINGS REF #OP0WS99NKQ XXXXXX6065"
    result = normalize_description(raw)
    assert "REF #" not in result
    assert "XXXXXX" not in result
    assert "WAY2SAVE SAVINGS" in result


def test_strip_card_number():
    raw = "PURCHASE AUTHORIZED ON 01/08 MRS B COMPANY LLC PORT ROYAL SC S386008692282379 CARD 5360"
    result = normalize_description(raw)
    assert "CARD 5360" not in result
    assert "MRS B COMPANY LLC" in result


def test_strip_transaction_codes():
    raw = "OASISBATCH PAYROLL 260109 MP027126352 DONNA CONLON"
    result = normalize_description(raw)
    assert "OASISBATCH PAYROLL" in result
    assert "DONNA CONLON" in result


def test_clean_chase_description():
    """Chase descriptions are already clean, should pass through mostly unchanged."""
    raw = "PUBLIX #1716"
    result = normalize_description(raw)
    assert result == "PUBLIX #1716"


def test_check_number():
    raw = "CHECK # 104"
    result = normalize_description(raw)
    assert "CHECK" in result


def test_strip_html_entities():
    raw = "TST*PONCHOS TACOS &amp; BEE"
    result = normalize_description(raw)
    assert "&amp;" not in result
    assert "&" in result
```

**Step 2: Run to verify failure**

Run: `pytest tests/services/test_normalizer.py -v`

**Step 3: Implement normalizer**

```python
# src/services/normalizer.py
import html
import re


def normalize_description(raw: str) -> str:
    desc = html.unescape(raw.strip())

    # Strip "PURCHASE AUTHORIZED ON MM/DD" or "RECURRING PAYMENT AUTHORIZED ON MM/DD"
    desc = re.sub(r"^(PURCHASE|RECURRING PAYMENT|RECURRING TRANSFER)\s+AUTHORIZED ON \d{2}/\d{2}\s+", "", desc)

    # Strip "CARD XXXX" at end
    desc = re.sub(r"\s+CARD\s+\d{4}\s*$", "", desc)

    # Strip long transaction/reference codes: 10+ alphanumeric chars containing
    # at least one digit, so long all-letter words (e.g. city names) are kept
    desc = re.sub(r"\s+(?=[A-Z0-9]*\d)[A-Z0-9]{10,}\b", "", desc)

    # Strip "REF #..." references
    desc = re.sub(r"\s+REF\s+#\S+", "", desc)

    # Strip masked account numbers
    desc = re.sub(r"\s+X{4,}\d+", "", desc)

    # Strip phone numbers
    desc = re.sub(r"\s+\d{3}-\d{3}-\d{4}", "", desc)

    # Strip a trailing state abbreviation + ZIP code that follows an address.
    # A bare state abbreviation is kept, since it may be part of the merchant name.
    desc = re.sub(r"\s+[A-Z]{2}\s+\d{5}(?:-\d{4})?\s*$", "", desc)

    # Clean up multiple spaces
    desc = re.sub(r"\s{2,}", " ", desc).strip()

    return desc
```

**Step 4: Run tests, iterate on regex until all pass**

Run: `pytest tests/services/test_normalizer.py -v`
Expected: All 8 tests PASS

**Step 5: Commit**

```bash
git add src/services/normalizer.py tests/services/test_normalizer.py
git commit -m "feat: transaction description normalizer"
```

---

### Task 5: CSV Reader and Format Detection

**Files:**
- Create: `src/services/csv_reader.py`
- Create: `tests/services/test_csv_reader.py`
- Reference: `rawdata/Chase0372_Activity20260101_20260210_20260210.CSV`
- Reference: `rawdata/Checking1.csv`

**Step 1: Write failing tests**

```python
# tests/services/test_csv_reader.py
from pathlib import Path

from src.services.csv_reader import read_csv, detect_format

RAWDATA = Path(__file__).parent.parent.parent / "rawdata"


def test_detect_chase_format():
    result = detect_format(RAWDATA / "Chase0372_Activity20260101_20260210_20260210.CSV")
    assert result["has_headers"] is True
    assert "Transaction Date" in result["headers"]
    assert result["delimiter"] == ","
    assert result["row_count"] > 0


def test_detect_checking_format():
    result = detect_format(RAWDATA / "Checking1.csv")
    assert result["has_headers"] is False
    assert result["column_count"] == 5
    assert result["row_count"] > 0


def test_read_chase_csv():
    rows = read_csv(RAWDATA / "Chase0372_Activity20260101_20260210_20260210.CSV")
    assert len(rows) > 100
    first = rows[0]
    assert "Transaction Date" in first
    assert "Amount" in first
    assert "Description" in first


def test_read_headerless_csv():
    rows = read_csv(RAWDATA / "Checking1.csv")
    assert len(rows) > 50
    first = rows[0]
    # Headerless: keys should be column indices
    assert 0 in first or "0" in first


def test_preview_rows():
    result = detect_format(RAWDATA / "Chase0372_Activity20260101_20260210_20260210.CSV")
    assert "preview" in result
    assert len(result["preview"]) <= 5
```

**Step 2: Run to verify failure**

Run: `pytest tests/services/test_csv_reader.py -v`

**Step 3: Implement csv_reader.py**

```python
# src/services/csv_reader.py
import csv
from pathlib import Path


def detect_format(file_path: Path) -> dict:
    with open(file_path, "r", encoding="utf-8-sig") as f:
        sample = f.read(8192)

    dialect = csv.Sniffer().sniff(sample)
    has_headers = csv.Sniffer().has_header(sample)

    # newline="" lets the csv module handle line endings itself
    with open(file_path, "r", encoding="utf-8-sig", newline="") as f:
        reader = csv.reader(f, dialect)
        all_rows = list(reader)

    # Filter empty rows
    all_rows = [r for r in all_rows if any(cell.strip() for cell in r)]

    result = {
        "delimiter": dialect.delimiter,
        "has_headers": has_headers,
        "file_path": str(file_path),
    }

    if has_headers:
        result["headers"] = all_rows[0]
        data_rows = all_rows[1:]
    else:
        result["column_count"] = len(all_rows[0]) if all_rows else 0
        data_rows = all_rows

    result["row_count"] = len(data_rows)
    result["preview"] = data_rows[:5]
    return result


def read_csv(file_path: Path) -> list[dict]:
    with open(file_path, "r", encoding="utf-8-sig") as f:
        sample = f.read(8192)

    dialect = csv.Sniffer().sniff(sample)
    has_headers = csv.Sniffer().has_header(sample)

    with open(file_path, "r", encoding="utf-8-sig", newline="") as f:
        if has_headers:
            reader = csv.DictReader(f, dialect=dialect)
            # Short rows yield None values and long rows a list under the None key,
            # so only string cells are checked when filtering empty rows.
            return [
                dict(row)
                for row in reader
                if any(isinstance(v, str) and v.strip() for v in row.values())
            ]
        else:
            reader = csv.reader(f, dialect)
            rows = []
            for row in reader:
                if any(cell.strip() for cell in row):
                    rows.append({i: cell for i, cell in enumerate(row)})
            return rows
```

**Step 4: Run tests**

Run: `pytest tests/services/test_csv_reader.py -v`
Expected: All 5 tests PASS

**Step 5: Commit**

```bash
git add src/services/csv_reader.py tests/services/test_csv_reader.py
git commit -m "feat: CSV reader with format auto-detection"
```

---

### Task 6: Import Service

**Files:**
- Create: `src/services/importer.py`
- Create: `tests/services/test_importer.py`

**Step 1: Write failing tests**

```python
# tests/services/test_importer.py
import datetime
from pathlib import Path

from sqlalchemy import create_engine
from sqlalchemy.orm import Session

from src.db import Base
from src.models import *
from src.seed import seed_categories
from src.services.importer import ImportService

RAWDATA = Path(__file__).parent.parent.parent / "rawdata"


def make_session():
    engine = create_engine("sqlite:///:memory:")
    Base.metadata.create_all(engine)
    return Session(engine)


def setup_chase_account(session):
    member = HouseholdMember(name="Andrew", relationship="self")
    session.add(member)
    session.flush()
    account = Account(name="Chase Freedom", institution="Chase", account_type="credit", owner_id=member.id)
    session.add(account)
    session.flush()
    seed_categories(session)
    return account


def setup_checking_account(session):
    member = HouseholdMember(name="Andrew", relationship="self")
    session.add(member)
    session.flush()
    account = Account(name="WF Checking", institution="Wells Fargo", account_type="checking", owner_id=member.id, is_shared=True)
    session.add(account)
    session.flush()
    seed_categories(session)
    return account


def test_import_chase_csv():
    session = make_session()
    account = setup_chase_account(session)
    column_map = {
        "date": "Transaction Date",
        "amount": "Amount",
        "description": "Description",
        "source_category": "Category",
    }
    svc = ImportService(session)
    result = svc.import_csv(
        RAWDATA / "Chase0372_Activity20260101_20260210_20260210.CSV",
        account_id=account.id,
        column_map=column_map,
        amount_logic="signed",
    )
    assert result["imported"] > 0
    assert result["duplicates"] == 0
    txns = session.query(Transaction).all()
    assert len(txns) == result["imported"]
    # Check a known transaction
    sephora = [t for t in txns if "SEPHORA" in t.description]
    assert len(sephora) == 1
    assert sephora[0].amount == -75.00


def test_import_checking_csv():
    session = make_session()
    account = setup_checking_account(session)
    column_map = {
        "date": 0,
        "amount": 1,
        "description": 4,
    }
    svc = ImportService(session)
    result = svc.import_csv(
        RAWDATA / "Checking1.csv",
        account_id=account.id,
        column_map=column_map,
        amount_logic="signed",
    )
    assert result["imported"] > 0
    txns = session.query(Transaction).all()
    assert len(txns) > 50


def test_duplicate_detection():
    session = make_session()
    account = setup_chase_account(session)
    column_map = {
        "date": "Transaction Date",
        "amount": "Amount",
        "description": "Description",
    }
    svc = ImportService(session)
    result1 = svc.import_csv(
        RAWDATA / "Chase0372_Activity20260101_20260210_20260210.CSV",
        account_id=account.id,
        column_map=column_map,
        amount_logic="signed",
    )
    result2 = svc.import_csv(
        RAWDATA / "Chase0372_Activity20260101_20260210_20260210.CSV",
        account_id=account.id,
        column_map=column_map,
        amount_logic="signed",
    )
    assert result2["duplicates"] == result1["imported"]
    assert result2["imported"] == 0
```

**Step 2: Run to verify failure**

Run: `pytest tests/services/test_importer.py -v`

**Step 3: Implement import service**

```python
# src/services/importer.py
import datetime
from pathlib import Path

from sqlalchemy.orm import Session

from src.models.transaction import Transaction
from src.services.csv_reader import read_csv
from src.services.normalizer import normalize_description


class ImportService:
    def __init__(self, session: Session):
        self.session = session

    def import_csv(
        self,
        file_path: Path,
        account_id: int,
        column_map: dict,
        amount_logic: str = "signed",
    ) -> dict:
        rows = read_csv(file_path)
        imported = 0
        duplicates = 0

        for row in rows:
            date_val = self._parse_date(row[column_map["date"]])
            amount_val = self._parse_amount(row, column_map, amount_logic)
            raw_desc = row[column_map["description"]].strip()
            description = normalize_description(raw_desc)
            source_cat = row.get(column_map.get("source_category", "__missing__"), "").strip() or None

            if date_val is None or amount_val is None:
                continue

            # Duplicate check
            existing = (
                self.session.query(Transaction)
                .filter_by(date=date_val, amount=amount_val, raw_description=raw_desc, account_id=account_id)
                .first()
            )
            if existing:
                duplicates += 1
                continue

            txn = Transaction(
                date=date_val,
                amount=amount_val,
                description=description,
                raw_description=raw_desc,
                account_id=account_id,
                source_category=source_cat,
            )
            self.session.add(txn)
            imported += 1

        self.session.commit()
        return {"imported": imported, "duplicates": duplicates, "total_rows": len(rows)}

    def _parse_date(self, val: str) -> datetime.date | None:
        val = val.strip().strip('"')
        for fmt in ("%m/%d/%Y", "%Y-%m-%d", "%m-%d-%Y"):
            try:
                return datetime.datetime.strptime(val, fmt).date()
            except ValueError:
                continue
        return None

    def _parse_amount(self, row: dict, column_map: dict, logic: str) -> float | None:
        try:
            if logic == "signed":
                return float(row[column_map["amount"]].strip().strip('"').replace(",", ""))
            # Add other logic types as needed
        except (ValueError, KeyError):
            return None
        # Unsupported amount logic: treat the row as unparseable
        return None
```

**Step 4: Run tests**

Run: `pytest tests/services/test_importer.py -v`
Expected: All 3 tests PASS

**Step 5: Commit**

```bash
git add src/services/importer.py tests/services/test_importer.py
git commit -m "feat: CSV import service with duplicate detection"
```

---

## Phase 3: Categorization

### Task 7: Categorization Engine

**Files:**
- Create: `src/services/categorizer.py`
- Create: `tests/services/test_categorizer.py`

**Step 1: Write failing tests**

```python
# tests/services/test_categorizer.py
from sqlalchemy import create_engine
from sqlalchemy.orm import Session

from src.db import Base
from src.models import *
from src.seed import seed_categories
from src.services.categorizer import RuleBasedCategorizer, Categorizer


def make_session():
    engine = create_engine("sqlite:///:memory:")
    Base.metadata.create_all(engine)
    return Session(engine)


def test_categorizer_protocol():
    """RuleBasedCategorizer implements Categorizer protocol."""
    session = make_session()
    cat = RuleBasedCategorizer(session)
    assert isinstance(cat, Categorizer)


def test_match_simple_pattern():
    session = make_session()
    seed_categories(session)
    groceries = session.query(Category).filter_by(name="Groceries").one()
    rule = CategorizationRule(pattern="PUBLIX", category_id=groceries.id, priority=10)
    session.add(rule)
    session.commit()

    cat = RuleBasedCategorizer(session)
    txn = Transaction(date="2026-01-15", amount=-44.90, description="PUBLIX #1716", account_id=1)
    result = cat.categorize(txn)
    assert result is not None
    assert result.category_id == groceries.id


def test_match_pipe_separated_pattern():
    session = make_session()
    seed_categories(session)
    groceries = session.query(Category).filter_by(name="Groceries").one()
    rule = CategorizationRule(pattern="PUBLIX|ALDI|PIGGLY WIGGLY", category_id=groceries.id, priority=10)
    session.add(rule)
    session.commit()

    cat = RuleBasedCategorizer(session)
    for desc in ["PUBLIX #1716", "ALDI 76180 BEAUFORT", "PIGGLY WIGGLY #286"]:
        txn = Transaction(date="2026-01-15", amount=-20.00, description=desc, account_id=1)
        result = cat.categorize(txn)
        assert result is not None, f"Failed to match: {desc}"
        assert result.category_id == groceries.id


def test_no_match_returns_none():
    session = make_session()
    seed_categories(session)
    cat = RuleBasedCategorizer(session)
    txn = Transaction(date="2026-01-15", amount=-10.00, description="UNKNOWN MERCHANT", account_id=1)
    result = cat.categorize(txn)
    assert result is None


def test_priority_ordering():
    session = make_session()
    seed_categories(session)
    groceries = session.query(Category).filter_by(name="Groceries").one()
    shopping = session.query(Category).filter_by(name="Shopping").one()
    # Lower priority number = higher priority
    rule1 = CategorizationRule(pattern="WAL-MART", category_id=groceries.id, priority=1)
    rule2 = CategorizationRule(pattern="WAL", category_id=shopping.id, priority=10)
    session.add_all([rule1, rule2])
    session.commit()

    cat = RuleBasedCategorizer(session)
    txn = Transaction(date="2026-01-15", amount=-48.52, description="WAL-MART #7181", account_id=1)
    result = cat.categorize(txn)
    assert result.category_id == groceries.id


def test_tag_override():
    session = make_session()
    seed_categories(session)
    dining = session.query(Category).filter_by(name="Dining Out").one()
    rule = CategorizationRule(pattern="CHICK-FIL-A", category_id=dining.id, tag_override="needs", priority=10)
    session.add(rule)
    session.commit()

    cat = RuleBasedCategorizer(session)
    txn = Transaction(date="2026-01-15", amount=-5.39, description="CHICK-FIL-A #01476", account_id=1)
    result = cat.categorize(txn)
    assert result.tag == "needs"  # Override from category default "wants"


def test_person_attribution():
    session = make_session()
    seed_categories(session)
    donna = HouseholdMember(name="Donna", relationship="wife")
    session.add(donna)
    session.flush()
    income = session.query(Category).filter_by(name="Income").one()
    rule = CategorizationRule(pattern="OASISBATCH PAYROLL", category_id=income.id, attributed_to_id=donna.id, priority=10)
    session.add(rule)
    session.commit()

    cat = RuleBasedCategorizer(session)
    txn = Transaction(date="2026-01-09", amount=1087.83, description="OASISBATCH PAYROLL 260109 DONNA CONLON", account_id=1)
    result = cat.categorize(txn)
    assert result.category_id == income.id
    assert result.attributed_to_id == donna.id
```

**Step 2: Run to verify failure**

Run: `pytest tests/services/test_categorizer.py -v`

**Step 3: Implement categorizer**

```python
# src/services/categorizer.py
import re
from dataclasses import dataclass
from typing import Protocol, runtime_checkable

from sqlalchemy.orm import Session

from src.models.rule import CategorizationRule
from src.models.transaction import Transaction


@dataclass
class CategoryResult:
    category_id: int
    tag: str | None = None
    attributed_to_id: int | None = None


@runtime_checkable
class Categorizer(Protocol):
    def categorize(self, transaction: Transaction) -> CategoryResult | None: ...


class RuleBasedCategorizer:
    def __init__(self, session: Session):
        self.session = session
        self._rules: list[CategorizationRule] | None = None

    def _load_rules(self) -> list[CategorizationRule]:
        if self._rules is None:
            # Ascending order: a lower priority number is checked first
            self._rules = (
                self.session.query(CategorizationRule)
                .order_by(CategorizationRule.priority)
                .all()
            )
        return self._rules

    def invalidate_cache(self) -> None:
        self._rules = None

    def categorize(self, transaction: Transaction) -> CategoryResult | None:
        rules = self._load_rules()
        desc = transaction.description.upper()

        for rule in rules:
            # Support pipe-separated patterns; empty alternatives are skipped
            # because an empty pattern would match every description
            patterns = [p.strip() for p in rule.pattern.split("|") if p.strip()]
            for pattern in patterns:
                # Patterns are literal substrings, matched case-insensitively
                if re.search(re.escape(pattern.upper()), desc):
                    tag = rule.tag_override
                    if tag is None and rule.category:
                        tag = rule.category.default_tag
                    return CategoryResult(
                        category_id=rule.category_id,
                        tag=tag,
                        attributed_to_id=rule.attributed_to_id,
                    )
        return None
```

**Step 4: Run tests**

Run: `pytest tests/services/test_categorizer.py -v`
Expected: All 7 tests PASS

**Step 5: Commit**

```bash
git add src/services/categorizer.py tests/services/test_categorizer.py
git commit -m "feat: rule-based categorization engine with protocol"
```

---

### Task 8: Post-Import Categorization Integration

**Files:**
- Modify: `src/services/importer.py`
- Create: `tests/services/test_import_categorize.py`

**Step 1: Write failing test for categorize-on-import**

```python
# tests/services/test_import_categorize.py
from pathlib import Path

from sqlalchemy import create_engine
from sqlalchemy.orm import Session

from src.db import Base
from src.models import *
from src.seed import seed_categories
from src.services.importer import ImportService

RAWDATA = Path(__file__).parent.parent.parent / "rawdata"


def make_session():
    engine = create_engine("sqlite:///:memory:")
    Base.metadata.create_all(engine)
    return Session(engine)


def test_import_applies_categorization_rules():
    session = make_session()
    seed_categories(session)
    member = HouseholdMember(name="Andrew", relationship="self")
    session.add(member)
    session.flush()
    account = Account(name="Chase", institution="Chase", account_type="credit", owner_id=member.id)
    session.add(account)
    session.flush()

    groceries = session.query(Category).filter_by(name="Groceries").one()
    rule = CategorizationRule(pattern="PUBLIX", category_id=groceries.id, priority=10)
    session.add(rule)
    session.commit()

    svc = ImportService(session)
    svc.import_csv(
        RAWDATA / "Chase0372_Activity20260101_20260210_20260210.CSV",
        account_id=account.id,
        column_map={"date": "Transaction Date", "amount": "Amount", "description": "Description"},
        amount_logic="signed",
    )

    publix_txns = session.query(Transaction).filter(Transaction.description.contains("PUBLIX")).all()
    assert len(publix_txns) > 0
    for txn in publix_txns:
        assert txn.category_id == groceries.id
        assert txn.tag == "needs"


def test_uncategorized_transactions_have_no_category():
    session = make_session()
    seed_categories(session)
    member = HouseholdMember(name="Andrew", relationship="self")
    session.add(member)
    session.flush()
    account = Account(name="Chase", institution="Chase", account_type="credit", owner_id=member.id)
    session.add(account)
    session.flush()

    # No rules defined, so everything should be uncategorized
    svc = ImportService(session)
    svc.import_csv(
        RAWDATA / "Chase0372_Activity20260101_20260210_20260210.CSV",
        account_id=account.id,
        column_map={"date": "Transaction Date", "amount": "Amount", "description": "Description"},
        amount_logic="signed",
    )

    uncategorized = session.query(Transaction).filter(Transaction.category_id.is_(None)).count()
    total = session.query(Transaction).count()
    assert uncategorized == total
```

**Step 2: Run to verify failure**

Run: `pytest tests/services/test_import_categorize.py -v`

**Step 3: Update ImportService to run categorization after import**

In `ImportService.import_csv()`, collect each newly created `Transaction` in a list during the import loop, then categorize that list after the loop but before `session.commit()`. Iterating `self.session.new` is not reliable here: the duplicate-check query autoflushes pending rows, and flushed objects no longer appear in `session.new`.

```python
from src.services.categorizer import RuleBasedCategorizer

# In import_csv, before the loop:
new_txns: list[Transaction] = []

# Inside the loop, right after self.session.add(txn):
new_txns.append(txn)

# After the loop, before self.session.commit():
categorizer = RuleBasedCategorizer(self.session)
for txn in new_txns:
    if txn.category_id is None:
        result = categorizer.categorize(txn)
        if result:
            txn.category_id = result.category_id
            txn.tag = result.tag
            txn.attributed_to_id = result.attributed_to_id
```

**Step 4: Run tests**

Run: `pytest tests/services/test_import_categorize.py -v`
Expected: All 2 tests PASS

**Step 5: Commit**

```bash
git add src/services/importer.py tests/services/test_import_categorize.py
git commit -m "feat: auto-categorize transactions on import"
```

---

## Phase 4: Analysis Services

### Task 9: Spending Analysis Service

**Files:**
- Create: `src/services/analysis.py`
- Create: `tests/services/test_analysis.py`

**Step 1: Write failing tests**

```python
# tests/services/test_analysis.py
import datetime

from sqlalchemy import create_engine
from sqlalchemy.orm import Session

from src.db import Base
from src.models import *
from src.seed import seed_categories
from src.services.analysis import AnalysisService


def make_session():
    engine = create_engine("sqlite:///:memory:")
    Base.metadata.create_all(engine)
    return Session(engine)


def make_test_data(session):
    seed_categories(session)
    member = HouseholdMember(name="Andrew",
relationship="self") session.add(member) session.flush() account = Account(name="Chase", institution="Chase", account_type="credit", owner_id=member.id) session.add(account) session.flush() groceries = session.query(Category).filter_by(name="Groceries").one() dining = session.query(Category).filter_by(name="Dining Out").one() txns = [ Transaction(date=datetime.date(2026, 1, 5), amount=-50.0, description="PUBLIX", account_id=account.id, category_id=groceries.id, tag="needs"), Transaction(date=datetime.date(2026, 1, 12), amount=-30.0, description="ALDI", account_id=account.id, category_id=groceries.id, tag="needs"), Transaction(date=datetime.date(2026, 1, 20), amount=-15.0, description="CHICK-FIL-A", account_id=account.id, category_id=dining.id, tag="wants"), Transaction(date=datetime.date(2026, 2, 3), amount=-60.0, description="PUBLIX", account_id=account.id, category_id=groceries.id, tag="needs"), Transaction(date=datetime.date(2026, 2, 7), amount=-25.0, description="KFC", account_id=account.id, category_id=dining.id, tag="wants"), ] session.add_all(txns) session.commit() return account, member def test_spending_by_month(): session = make_session() make_test_data(session) svc = AnalysisService(session) result = svc.spending_by_period("month") assert len(result) == 2 # Jan and Feb jan = [r for r in result if r["period"] == "2026-01"][0] assert jan["total"] == -95.0 def test_spending_by_category(): session = make_session() make_test_data(session) svc = AnalysisService(session) result = svc.spending_by_category( start=datetime.date(2026, 1, 1), end=datetime.date(2026, 2, 28), ) groceries_row = [r for r in result if r["category"] == "Groceries"][0] assert groceries_row["total"] == -140.0 def test_spending_by_tag(): session = make_session() make_test_data(session) svc = AnalysisService(session) result = svc.spending_by_tag( start=datetime.date(2026, 1, 1), end=datetime.date(2026, 2, 28), ) needs = [r for r in result if r["tag"] == "needs"][0] wants = [r for r in result 
if r["tag"] == "wants"][0] assert needs["total"] == -140.0 assert wants["total"] == -40.0 def test_spending_filtered_by_person(): session = make_session() account, member = make_test_data(session) # Attribute some transactions to Andrew txns = session.query(Transaction).all() for t in txns: t.attributed_to_id = member.id session.commit() svc = AnalysisService(session) result = svc.spending_by_category( start=datetime.date(2026, 1, 1), end=datetime.date(2026, 2, 28), person_id=member.id, ) total = sum(r["total"] for r in result) assert total == -180.0 ``` **Step 2: Run to verify failure** Run: `pytest tests/services/test_analysis.py -v` **Step 3: Implement analysis service** ```python # src/services/analysis.py import datetime from sqlalchemy import func, extract from sqlalchemy.orm import Session from src.models.transaction import Transaction from src.models.category import Category class AnalysisService: def __init__(self, session: Session): self.session = session def _base_query(self, start=None, end=None, person_id=None, account_id=None, category_id=None): q = self.session.query(Transaction).filter(Transaction.is_transfer == False) if start: q = q.filter(Transaction.date >= start) if end: q = q.filter(Transaction.date <= end) if person_id: q = q.filter(Transaction.attributed_to_id == person_id) if account_id: q = q.filter(Transaction.account_id == account_id) if category_id: q = q.filter(Transaction.category_id == category_id) return q def spending_by_period(self, period: str = "month", **filters) -> list[dict]: q = self._base_query(**filters) if period == "month": q = ( q.with_entities( func.strftime("%Y-%m", Transaction.date).label("period"), func.sum(Transaction.amount).label("total"), ) .group_by("period") .order_by("period") ) elif period == "week": q = ( q.with_entities( func.strftime("%Y-W%W", Transaction.date).label("period"), func.sum(Transaction.amount).label("total"), ) .group_by("period") .order_by("period") ) elif period == "day": q = ( 
q.with_entities( func.strftime("%Y-%m-%d", Transaction.date).label("period"), func.sum(Transaction.amount).label("total"), ) .group_by("period") .order_by("period") ) else: raise ValueError(f"Unsupported period: {period!r}") return [{"period": r.period, "total": float(r.total)} for r in q.all()] def spending_by_category(self, start=None, end=None, **filters) -> list[dict]: q = self._base_query(start=start, end=end, **filters) q = ( q.join(Category, Transaction.category_id == Category.id, isouter=True) .with_entities( func.coalesce(Category.name, "Uncategorized").label("category"), func.sum(Transaction.amount).label("total"), func.count(Transaction.id).label("count"), ) .group_by("category") .order_by(func.sum(Transaction.amount)) ) return [{"category": r.category, "total": float(r.total), "count": r.count} for r in q.all()] def spending_by_tag(self, start=None, end=None, **filters) -> list[dict]: q = self._base_query(start=start, end=end, **filters) q = ( q.with_entities( func.coalesce(Transaction.tag, "untagged").label("tag"), func.sum(Transaction.amount).label("total"), ) .group_by("tag") .order_by(func.sum(Transaction.amount)) ) return [{"tag": r.tag, "total": float(r.total)} for r in q.all()] ``` **Step 4: Run tests** Run: `pytest tests/services/test_analysis.py -v` Expected: All 4 tests PASS **Step 5: Commit** ```bash git add src/services/analysis.py tests/services/test_analysis.py git commit -m "feat: spending analysis service with period, category, and tag breakdowns" ``` --- ### Task 10: Recurring Charge Detection **Files:** - Create: `src/services/recurring.py` - Create: `tests/services/test_recurring.py` **Step 1: Write failing tests** ```python # tests/services/test_recurring.py import datetime from sqlalchemy import create_engine from sqlalchemy.orm import Session from src.db import Base from src.models import * from src.services.recurring import RecurringDetector def make_session(): engine = create_engine("sqlite:///:memory:") Base.metadata.create_all(engine) return Session(engine) def 
make_recurring_data(session): member = HouseholdMember(name="Andrew", relationship="self") session.add(member) session.flush() account = Account(name="Checking", institution="WF", account_type="checking", owner_id=member.id) session.add(account) session.flush() # Netflix monthly: ~$19.07 on 3rd/4th of month for month in [1, 2]: session.add(Transaction( date=datetime.date(2026, month, 4), amount=-19.07, description="Netflix.com", account_id=account.id, )) # HelloFresh weekly: ~$142.87 for day in [1, 8, 15, 22, 29]: session.add(Transaction( date=datetime.date(2026, 1, day), amount=-142.87, description="HELLOFRESH", account_id=account.id, )) # Random one-off purchase session.add(Transaction( date=datetime.date(2026, 1, 10), amount=-95.38, description="CARL'S GOLFLAND INC", account_id=account.id, )) session.commit() return account def test_detect_monthly_recurring(): session = make_session() make_recurring_data(session) detector = RecurringDetector(session) results = detector.detect() netflix = [r for r in results if "Netflix" in r["description"]] assert len(netflix) == 1 assert netflix[0]["frequency"] == "monthly" assert abs(netflix[0]["typical_amount"] - 19.07) < 0.01 def test_detect_weekly_recurring(): session = make_session() make_recurring_data(session) detector = RecurringDetector(session) results = detector.detect() hello = [r for r in results if "HELLOFRESH" in r["description"]] assert len(hello) == 1 assert hello[0]["frequency"] == "weekly" def test_one_off_not_detected(): session = make_session() make_recurring_data(session) detector = RecurringDetector(session) results = detector.detect() golf = [r for r in results if "GOLFLAND" in r["description"]] assert len(golf) == 0 def test_annual_cost_calculation(): session = make_session() make_recurring_data(session) detector = RecurringDetector(session) results = detector.detect() netflix = [r for r in results if "Netflix" in r["description"]][0] assert abs(netflix["annual_cost"] - 19.07 * 12) < 1.0 ``` **Step 2: 
Run to verify failure** Run: `pytest tests/services/test_recurring.py -v` **Step 3: Implement recurring detector** ```python # src/services/recurring.py from collections import defaultdict from sqlalchemy.orm import Session from src.models.transaction import Transaction class RecurringDetector: def __init__(self, session: Session): self.session = session def detect(self, amount_tolerance: float = 0.10) -> list[dict]: txns = ( self.session.query(Transaction) .filter(Transaction.amount < 0) .filter(Transaction.is_transfer == False) .order_by(Transaction.date) .all() ) # Group by description (normalized) groups: dict[str, list[Transaction]] = defaultdict(list) for txn in txns: groups[txn.description].append(txn) results = [] for desc, group in groups.items(): if len(group) < 2: continue amounts = [abs(t.amount) for t in group] avg_amount = sum(amounts) / len(amounts) # Check amount consistency if any(abs(a - avg_amount) / avg_amount > amount_tolerance for a in amounts): continue # Detect frequency from intervals dates = sorted(t.date for t in group) intervals = [(dates[i + 1] - dates[i]).days for i in range(len(dates) - 1)] avg_interval = sum(intervals) / len(intervals) frequency = self._classify_frequency(avg_interval) if frequency is None: continue annual_multiplier = {"weekly": 52, "biweekly": 26, "monthly": 12, "quarterly": 4, "annual": 1} results.append({ "description": desc, "typical_amount": round(avg_amount, 2), "frequency": frequency, "annual_cost": round(avg_amount * annual_multiplier[frequency], 2), "occurrences": len(group), "last_date": max(dates), }) results.sort(key=lambda r: r["annual_cost"], reverse=True) return results def _classify_frequency(self, avg_days: float) -> str | None: if 5 <= avg_days <= 9: return "weekly" elif 12 <= avg_days <= 16: return "biweekly" elif 25 <= avg_days <= 35: return "monthly" elif 80 <= avg_days <= 100: return "quarterly" elif 350 <= avg_days <= 380: return "annual" return None ``` **Step 4: Run tests** Run: `pytest 
tests/services/test_recurring.py -v` Expected: All 4 tests PASS **Step 5: Commit** ```bash git add src/services/recurring.py tests/services/test_recurring.py git commit -m "feat: recurring charge detection service" ``` --- ### Task 11: Forecasting Service **Files:** - Create: `src/services/forecasting.py` - Create: `tests/services/test_forecasting.py` **Step 1: Write failing tests** ```python # tests/services/test_forecasting.py import datetime from sqlalchemy import create_engine from sqlalchemy.orm import Session from src.db import Base from src.models import * from src.seed import seed_categories from src.services.forecasting import ForecastingService def make_session(): engine = create_engine("sqlite:///:memory:") Base.metadata.create_all(engine) return Session(engine) def make_forecast_data(session): seed_categories(session) member = HouseholdMember(name="Andrew", relationship="self") session.add(member) session.flush() account = Account(name="Chase", institution="Chase", account_type="credit", owner_id=member.id) session.add(account) session.flush() groceries = session.query(Category).filter_by(name="Groceries").one() # 3 months of grocery spending: $500, $600, $700 (trending up), dated relative to today so the rows stay inside the service's 93-day lookback today = datetime.date.today() for months_ago, total in [(2, 500), (1, 600), (0, 700)]: month_index = today.year * 12 + (today.month - 1) - months_ago session.add(Transaction( date=datetime.date(month_index // 12, month_index % 12 + 1, 1), amount=-total, description="GROCERIES", account_id=account.id, category_id=groceries.id, tag="needs", )) session.commit() return account def test_monthly_forecast(): session = make_session() make_forecast_data(session) svc = ForecastingService(session) forecast = svc.forecast_month() groceries = [f for f in forecast if f["category"] == "Groceries"] assert len(groceries) == 1 # Should be weighted toward recent months assert groceries[0]["projected"] < 0 def test_annual_forecast(): session = make_session() make_forecast_data(session) svc = ForecastingService(session) forecast = svc.forecast_year() assert len(forecast) > 0 total = sum(f["projected"] for f in forecast) 
assert total < 0 # We're spending money def test_what_if_removes_recurring(): session = make_session() make_forecast_data(session) svc = ForecastingService(session) base = svc.forecast_month() adjusted = svc.forecast_month(exclude_descriptions=["GROCERIES"]) base_total = sum(f["projected"] for f in base) adj_total = sum(f["projected"] for f in adjusted) assert adj_total > base_total # Less spending when we exclude groceries ``` **Step 2: Run to verify failure** Run: `pytest tests/services/test_forecasting.py -v` **Step 3: Implement forecasting service** ```python # src/services/forecasting.py import datetime from collections import defaultdict from sqlalchemy import func from sqlalchemy.orm import Session from src.models.transaction import Transaction from src.models.category import Category class ForecastingService: def __init__(self, session: Session): self.session = session def _get_monthly_by_category(self, months_back: int = 3) -> dict[str, list[float]]: cutoff = datetime.date.today() - datetime.timedelta(days=months_back * 31) rows = ( self.session.query( func.strftime("%Y-%m", Transaction.date).label("month"), func.coalesce(Category.name, "Uncategorized").label("category"), func.sum(Transaction.amount).label("total"), ) .join(Category, Transaction.category_id == Category.id, isouter=True) .filter(Transaction.date >= cutoff) .filter(Transaction.is_transfer == False) .filter(Transaction.amount < 0) .group_by("month", "category") .order_by("month") .all() ) by_cat: dict[str, list[float]] = defaultdict(list) for row in rows: by_cat[row.category].append(float(row.total)) return by_cat def _weighted_average(self, values: list[float]) -> float: if not values: return 0.0 # More recent months get higher weight (values must be ordered oldest first) weights = list(range(1, len(values) + 1)) total_weight = sum(weights) return sum(v * w for v, w in zip(values, weights)) / total_weight def forecast_month(self, exclude_descriptions: list[str] | None = None) -> list[dict]: by_cat = self._get_monthly_by_category() if 
exclude_descriptions: # Re-query excluding those descriptions cutoff = datetime.date.today() - datetime.timedelta(days=3 * 31) q = ( self.session.query( func.strftime("%Y-%m", Transaction.date).label("month"), func.coalesce(Category.name, "Uncategorized").label("category"), func.sum(Transaction.amount).label("total"), ) .join(Category, Transaction.category_id == Category.id, isouter=True) .filter(Transaction.date >= cutoff) .filter(Transaction.is_transfer == False) .filter(Transaction.amount < 0) ) for desc in exclude_descriptions: q = q.filter(~Transaction.description.contains(desc)) rows = q.group_by("month", "category").order_by("month").all() by_cat = defaultdict(list) for row in rows: by_cat[row.category].append(float(row.total)) results = [] for cat, monthly_totals in by_cat.items(): projected = self._weighted_average(monthly_totals) results.append({ "category": cat, "projected": round(projected, 2), "confidence": "high" if len(monthly_totals) >= 3 else "low", }) results.sort(key=lambda r: r["projected"]) return results def forecast_year(self) -> list[dict]: monthly = self.forecast_month() return [ {**f, "projected": round(f["projected"] * 12, 2)} for f in monthly ] ``` **Step 4: Run tests** Run: `pytest tests/services/test_forecasting.py -v` Expected: All 3 tests PASS **Step 5: Commit** ```bash git add src/services/forecasting.py tests/services/test_forecasting.py git commit -m "feat: forecasting service with weighted averages and what-if" ``` --- ## Phase 5: Desktop UI ### Task 12: Main Window and Sidebar Navigation **Files:** - Create: `src/main.py` - Create: `src/ui/main_window.py` - Create: `src/ui/sidebar.py` **Step 1: Implement main window with sidebar** Note: UI tasks are hard to drive strictly test-first. Write the UI code, then verify manually with `python -m src.main`. 
```python # src/ui/sidebar.py from PySide6.QtWidgets import QWidget, QVBoxLayout, QPushButton, QFrame from PySide6.QtCore import Signal class Sidebar(QFrame): view_changed = Signal(str) VIEWS = [ ("Import", "import"), ("Transactions", "transactions"), ("Analysis", "analysis"), ("Recurring", "recurring"), ("Settings", "settings"), ] def __init__(self, parent=None): super().__init__(parent) self.setObjectName("sidebar") self.setFixedWidth(200) layout = QVBoxLayout(self) layout.setContentsMargins(0, 10, 0, 10) self._buttons: dict[str, QPushButton] = {} for label, key in self.VIEWS: btn = QPushButton(label) btn.setObjectName(f"sidebar-{key}") btn.setCheckable(True) btn.clicked.connect(lambda checked, k=key: self._on_click(k)) layout.addWidget(btn) self._buttons[key] = btn layout.addStretch() self._buttons["import"].setChecked(True) def _on_click(self, key: str): for k, btn in self._buttons.items(): btn.setChecked(k == key) self.view_changed.emit(key) ``` ```python # src/ui/main_window.py from PySide6.QtWidgets import QMainWindow, QHBoxLayout, QWidget, QStackedWidget, QLabel from src.ui.sidebar import Sidebar class MainWindow(QMainWindow): def __init__(self, session): super().__init__() self.session = session self.setWindowTitle("SpendingAnalysis") self.setMinimumSize(1200, 800) central = QWidget() self.setCentralWidget(central) layout = QHBoxLayout(central) layout.setContentsMargins(0, 0, 0, 0) layout.setSpacing(0) self.sidebar = Sidebar() layout.addWidget(self.sidebar) self.stack = QStackedWidget() layout.addWidget(self.stack) # Placeholder views — replaced in later tasks self._views = {} for label, key in Sidebar.VIEWS: placeholder = QLabel(f"{label} View") placeholder.setStyleSheet("font-size: 24px; color: #888;") self._views[key] = self.stack.addWidget(placeholder) self.sidebar.view_changed.connect(self._switch_view) def _switch_view(self, key: str): self.stack.setCurrentIndex(self._views[key]) ``` ```python # src/main.py import sys from PySide6.QtWidgets import 
QApplication from src.db import get_session from src.seed import seed_categories from src.ui.main_window import MainWindow def main(): session = get_session() seed_categories(session) app = QApplication(sys.argv) window = MainWindow(session) window.show() sys.exit(app.exec()) if __name__ == "__main__": main() ``` **Step 2: Verify it launches** Run: `python -m src.main` Expected: Window opens with sidebar and placeholder views **Step 3: Commit** ```bash git add src/main.py src/ui/sidebar.py src/ui/main_window.py git commit -m "feat: main window with sidebar navigation" ``` --- ### Task 13: Import Wizard View **Files:** - Create: `src/ui/import_view.py` - Modify: `src/ui/main_window.py` (replace placeholder) This is the most complex UI component. It has three stages: 1. File selection (with drag-and-drop) 2. Column mapping (for new sources) or auto-match confirmation 3. Import preview and confirmation **Step 1: Implement the import view** Build `src/ui/import_view.py` with: - A `QStackedWidget` for the three stages - File drop zone that accepts CSV files - Column mapping form with combo boxes for each target field (date, amount, description, etc.) - Account selection/creation combo box - Preview table showing first 5 rows mapped to normalized columns - Import button with progress feedback - Results summary showing imported count, duplicates, uncategorized Wire it into `MainWindow` by replacing the import placeholder. Key signals: `import_complete` (emitted after successful import so other views can refresh). **Step 2: Test manually with both sample CSVs** Run: `python -m src.main` Test: Import Chase CSV first (should auto-detect headers), then Checking CSV (should show mapping wizard). 
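The preview stage described in Step 1 can be prototyped without any Qt code. The sketch below is an assumption about how the mapping might be applied, not part of the planned `ImportService` API: `preview_rows` is a hypothetical helper name, and it accepts the same two `column_map` shapes used elsewhere in this plan (header names for CSVs with a header row, integer indexes for headerless ones).

```python
import csv
import io
from itertools import islice


def preview_rows(csv_text, column_map, limit=5):
    """Return up to `limit` rows keyed by normalized field names.

    Hypothetical helper: column_map maps a target field ("date", "amount",
    "description", ...) to either a header name (str) or a column index (int).
    """
    uses_headers = all(isinstance(src, str) for src in column_map.values())
    stream = io.StringIO(csv_text)
    # DictReader rows are indexed by header name, plain reader rows by position.
    rows = csv.DictReader(stream) if uses_headers else csv.reader(stream)
    return [
        {target: row[src] for target, src in column_map.items()}
        for row in islice(rows, limit)
    ]
```

The returned list of dicts maps directly onto a preview table: one dict per table row, one key per normalized column. Values are left as raw strings here, on the assumption that date and amount parsing stays in the normalizer from Phase 2.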
**Step 3: Commit** ```bash git add src/ui/import_view.py src/ui/main_window.py git commit -m "feat: import wizard view with column mapping and file drop" ``` --- ### Task 14: Transactions View **Files:** - Create: `src/ui/transactions_view.py` - Modify: `src/ui/main_window.py` (replace placeholder) **Step 1: Implement transactions view** Build `src/ui/transactions_view.py` with: - `QTableView` backed by a custom `QAbstractTableModel` wrapping SQLAlchemy queries - Columns: Date, Description, Amount, Account, Category, Tag, Person - Filter bar at top: date range (two `QDateEdit`), account dropdown, person dropdown, category dropdown, "Uncategorized only" checkbox, search text field - Inline category editing: clicking the category cell opens a dropdown to change it - After category change, prompt: "Create rule for [merchant pattern]?" with Yes/No - Amount column formatted with colors (red for expenses, green for income) - Sortable by clicking column headers - Refresh on import completion **Step 2: Verify with imported data** Run: `python -m src.main`, import sample data, switch to Transactions view. 
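The "Create rule for [merchant pattern]?" prompt needs a candidate pattern to show. The plan does not say how that pattern is derived, so the following is only one possible heuristic, with a hypothetical function name: keep the leading words of the description and cut at the first token that carries a store number or other digits. Because the result is always a slice of the uppercased description, it stays compatible with the case-insensitive substring matching in `RuleBasedCategorizer`.

```python
import re

# First whitespace-separated token (after the leading one) containing '#' or a digit.
_NOISE_TOKEN = re.compile(r"\s+\S*[#\d]")


def suggest_rule_pattern(description: str) -> str:
    """Suggest a rule pattern for a transaction description (heuristic sketch)."""
    text = description.upper().strip()
    match = _NOISE_TOKEN.search(text)
    # Keep everything before the first noisy token; otherwise use the full text.
    return text[: match.start()] if match else text
```

For example, a description such as `Publix #1234 Atlanta GA` would yield `PUBLIX`, while `CARL'S GOLFLAND INC` is returned unchanged. The user should still be able to edit the suggestion before the rule is saved, since a heuristic like this will misjudge some merchants.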
**Step 3: Commit** ```bash git add src/ui/transactions_view.py src/ui/main_window.py git commit -m "feat: transactions view with filtering and inline categorization" ``` --- ### Task 15: Analysis View (Charts) **Files:** - Create: `src/ui/analysis_view.py` - Modify: `src/ui/main_window.py` (replace placeholder) **Step 1: Implement analysis view** Build `src/ui/analysis_view.py` with three tab panels: **Spending Over Time tab:** - Matplotlib figure embedded via `FigureCanvasQTAgg` - Period toggle (day/week/month) using `QButtonGroup` - Stacked bar chart: x-axis = time periods, y-axis = spending, stacked by category - Filter controls matching transactions view filters **Category Breakdown tab:** - Donut chart showing spending by category for selected date range - Horizontal bar chart ranking categories by total spend - Needs/wants/savings summary bar (three-segment horizontal bar) - Click donut slice to filter transactions view to that category **Forecasting tab:** - Month-ahead stacked bar alongside last 3 actual months - Year-ahead projection with trend line - "What if" controls: checkboxes for each recurring charge, sliders for category adjustments - Live-updating projection as toggles change All charts styled to match app theme (dark background, consistent category colors). **Step 2: Verify with imported data** Run: `python -m src.main`, import data, switch to Analysis view. 
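A stacked bar chart needs one series per category, aligned on a shared list of periods, plus the running "bottom" of each stack. A minimal sketch of that data preparation follows. The input row shape (`period`, `category`, `total`) is an assumption: `AnalysisService` as planned has no combined period-by-category query, so such a method would need to be added. Both function names are hypothetical.

```python
from collections import defaultdict


def pivot_for_stacked_bars(rows):
    """Pivot period/category totals into aligned per-category series.

    Expenses are stored as negative amounts, so absolute values are used
    to make the bars grow upward.
    """
    periods = sorted({r["period"] for r in rows})
    index = {p: i for i, p in enumerate(periods)}
    series = defaultdict(lambda: [0.0] * len(periods))
    for r in rows:
        series[r["category"]][index[r["period"]]] += abs(r["total"])
    return periods, dict(series)


def stacked_bottoms(series, order, n_periods):
    """Compute where each category's bar segment starts, in stacking order."""
    bottoms = {}
    running = [0.0] * n_periods
    for category in order:
        bottoms[category] = list(running)
        running = [b + v for b, v in zip(running, series[category])]
    return bottoms
```

Each category can then be drawn with one `ax.bar(periods, series[cat], bottom=bottoms[cat], label=cat)` call on the embedded matplotlib axes. Keeping the pivot separate from the widget makes it testable with plain `pytest`, even though the view itself is verified manually.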
**Step 3: Commit** ```bash git add src/ui/analysis_view.py src/ui/main_window.py git commit -m "feat: analysis view with spending trends, breakdowns, and forecasting" ``` --- ### Task 16: Recurring Charges View **Files:** - Create: `src/ui/recurring_view.py` - Modify: `src/ui/main_window.py` (replace placeholder) **Step 1: Implement recurring charges view** Build `src/ui/recurring_view.py` with: - Summary card at top: total monthly subscription burden, total annual cost - Table of detected recurring charges: description, typical amount, frequency, annual cost, last date, status (confirmed/pending/dismissed) - Confirm/Dismiss buttons per row - "Re-scan" button to re-run detection - Confirmed charges stored in a `recurring_charges` table (add model if needed) or as metadata on categorization rules **Step 2: Verify with imported data** Run: `python -m src.main`, import data, switch to Recurring view. **Step 3: Commit** ```bash git add src/ui/recurring_view.py src/ui/main_window.py git commit -m "feat: recurring charges view with detection and confirmation" ``` --- ### Task 17: Settings View **Files:** - Create: `src/ui/settings_view.py` - Modify: `src/ui/main_window.py` (replace placeholder) **Step 1: Implement settings view** Build `src/ui/settings_view.py` with tabs: **Categories tab:** - Editable list of categories with name, default tag, icon - Add/delete/rename buttons **Rules tab:** - Table of categorization rules: pattern, category, tag override, person, priority - Add/edit/delete with a dialog form - Drag to reorder priority (or up/down arrows) **Household tab:** - List of household members with name and relationship - Add/edit/delete **Accounts tab:** - List of accounts with name, institution, type, owner - Edit owner/shared status **CSV Mappings tab:** - List of saved mappings with name and account - Delete mappings that are no longer needed **Step 2: Verify settings management** Run: `python -m src.main`, switch to Settings, add a rule, verify it applies on 
next import. **Step 3: Commit** ```bash git add src/ui/settings_view.py src/ui/main_window.py git commit -m "feat: settings view for categories, rules, household, and accounts" ``` --- ## Phase 6: Polish ### Task 18: Dark/Light Theming **Files:** - Create: `src/ui/themes/dark.qss` - Create: `src/ui/themes/light.qss` - Modify: `src/ui/main_window.py` (theme toggle) - Modify: `src/ui/sidebar.py` (add theme toggle button at bottom) **Step 1: Create dark theme QSS** Style the sidebar, buttons, tables, charts, and inputs with a modern dark palette. Key colors: - Background: `#1e1e2e` - Surface: `#2a2a3c` - Text: `#cdd6f4` - Accent: `#89b4fa` - Border: `#45475a` **Step 2: Create light theme QSS** Define the corresponding light palette for the same roles (background, surface, text, accent, border). **Step 3: Add toggle to sidebar** Add a theme toggle button at the bottom of the sidebar. On click, swap the stylesheet and update matplotlib chart colors. **Step 4: Commit** ```bash git add src/ui/themes/ src/ui/main_window.py src/ui/sidebar.py git commit -m "feat: dark and light theme support" ``` --- ### Task 19: Cross-Account Transfer Detection **Files:** - Create: `src/services/transfer_detector.py` - Create: `tests/services/test_transfer_detector.py` - Modify: `src/services/importer.py` (run after import) **Step 1: Write failing tests** Test that matching transactions across accounts (dates within ±2 days, same absolute amount, one positive and one negative) are flagged as transfers. **Step 2: Implement transfer detector** Scan for pairs where `abs(txn_a.amount) == abs(txn_b.amount)`, the amounts have opposite signs, the accounts differ, the dates are within 2 days of each other, and at least one description matches a known transfer pattern (e.g., "Payment Thank You", "CREDIT CRD EPAY"). 
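The pairing rule in Step 2 can be sketched independently of the database. This is a minimal illustration under stated assumptions, not the planned `transfer_detector.py`: `Txn` is a hypothetical stand-in for the `Transaction` model, `find_transfer_pairs` is an invented name, amounts are compared after rounding to cents to avoid float noise, and each transaction is matched at most once.

```python
import datetime
from dataclasses import dataclass

# Patterns taken from the examples in Step 2; the real list is an open question.
TRANSFER_PATTERNS = ("PAYMENT THANK YOU", "CREDIT CRD EPAY")


@dataclass
class Txn:  # hypothetical stand-in for src.models.transaction.Transaction
    id: int
    account_id: int
    date: datetime.date
    amount: float
    description: str


def looks_like_transfer(description: str) -> bool:
    upper = description.upper()
    return any(pattern in upper for pattern in TRANSFER_PATTERNS)


def find_transfer_pairs(txns, max_days=2):
    """Return (id_a, id_b) pairs that look like two sides of one transfer."""
    pairs, matched = [], set()
    ordered = sorted(txns, key=lambda t: t.date)
    for i, a in enumerate(ordered):
        if a.id in matched:
            continue
        for b in ordered[i + 1:]:
            if (b.date - a.date).days > max_days:
                break  # sorted by date, so later candidates are even further away
            if b.id in matched or a.account_id == b.account_id:
                continue
            # Opposite signs with equal magnitude sum to zero (to the cent).
            if a.amount == 0 or round(a.amount + b.amount, 2) != 0:
                continue
            if not (looks_like_transfer(a.description) or looks_like_transfer(b.description)):
                continue
            pairs.append((a.id, b.id))
            matched.update((a.id, b.id))
            break
    return pairs
```

In the real service, both transactions in each returned pair would presumably have `is_transfer` set to `True`, so that `AnalysisService._base_query` excludes them from spending totals.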
**Step 3: Run tests** Run: `pytest tests/services/test_transfer_detector.py -v` **Step 4: Commit** ```bash git add src/services/transfer_detector.py tests/services/test_transfer_detector.py src/services/importer.py git commit -m "feat: cross-account transfer detection" ``` --- ### Task 20: Default Categorization Rules (Seed) **Files:** - Modify: `src/seed.py` - Modify: `tests/test_seed.py` **Step 1: Add default rules based on real data patterns** Add a `seed_default_rules` function that creates the initial rule set from the design doc (CIMTECHNIQUES -> Income, HELLOFRESH -> Groceries, etc.). Requires household members to exist first, so only seed person-attributed rules if matching members are found. **Step 2: Run tests** Run: `pytest tests/test_seed.py -v` **Step 3: Commit** ```bash git add src/seed.py tests/test_seed.py git commit -m "feat: default categorization rules from known patterns" ``` --- ### Task 21: Integration Test with Real Data **Files:** - Create: `tests/test_integration.py` **Step 1: Write end-to-end test** ```python # tests/test_integration.py from pathlib import Path from sqlalchemy import create_engine from sqlalchemy.orm import Session from src.db import Base from src.models import * from src.seed import seed_categories, seed_household from src.services.importer import ImportService from src.services.analysis import AnalysisService from src.services.recurring import RecurringDetector RAWDATA = Path(__file__).parent.parent / "rawdata" def test_full_pipeline(): engine = create_engine("sqlite:///:memory:") Base.metadata.create_all(engine) session = Session(engine) seed_categories(session) andrew = seed_household(session, "Andrew", "self") # Set up accounts chase = Account(name="Chase Freedom", institution="Chase", account_type="credit", owner_id=andrew.id) checking = Account(name="WF Checking", institution="Wells Fargo", account_type="checking", owner_id=andrew.id, is_shared=True) session.add_all([chase, checking]) session.flush() svc = 
ImportService(session) # Import Chase r1 = svc.import_csv( RAWDATA / "Chase0372_Activity20260101_20260210_20260210.CSV", account_id=chase.id, column_map={"date": "Transaction Date", "amount": "Amount", "description": "Description", "source_category": "Category"}, amount_logic="signed", ) assert r1["imported"] > 100 # Import Checking r2 = svc.import_csv( RAWDATA / "Checking1.csv", account_id=checking.id, column_map={"date": 0, "amount": 1, "description": 4}, amount_logic="signed", ) assert r2["imported"] > 50 # Analysis works analysis = AnalysisService(session) monthly = analysis.spending_by_period("month") assert len(monthly) >= 1 by_cat = analysis.spending_by_category() assert len(by_cat) >= 1 # Recurring detection runs without error detector = RecurringDetector(session) recurring = detector.detect() assert isinstance(recurring, list) # May be empty: patterns depend on the date range ``` **Step 2: Run test** Run: `pytest tests/test_integration.py -v` Expected: PASS **Step 3: Commit** ```bash git add tests/test_integration.py git commit -m "test: end-to-end integration test with real CSV data" ``` --- ## Summary | Phase | Tasks | Focus | |-------|-------|-------| | 1: Foundation | 1-3 | Project setup, models, seed data | | 2: Import Pipeline | 4-6 | Normalizer, CSV reader, import service | | 3: Categorization | 7-8 | Rule engine, post-import categorization | | 4: Analysis | 9-11 | Spending analysis, recurring detection, forecasting | | 5: UI | 12-17 | Main window, import wizard, transactions, analysis charts, recurring, settings | | 6: Polish | 18-21 | Theming, transfer detection, default rules, integration test | Tasks 1-11 are fully TDD. Tasks 12-18 are UI-focused with manual verification. Tasks 19-21 return to TDD for backend features and integration testing.