SpendingAnalysis/docs/plans/2026-02-10-spending-analysis-impl-plan.md
andy 3f8b1d9056 Add implementation plan with 21 tasks across 6 phases
Covers: project setup, database models, CSV import pipeline with
description normalization, rule-based categorization engine, spending
analysis/recurring detection/forecasting services, PySide6 desktop UI
with import wizard/transactions/analysis/recurring/settings views,
dark/light theming, and integration testing.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-10 14:17:03 -05:00

SpendingAnalysis Implementation Plan

For Claude: REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.

Goal: Build a desktop spending analysis app that imports bank/credit card CSVs, auto-categorizes transactions, and provides visual analysis of spending habits with forecasting.

Architecture: Three-layer (data/service/UI) Python app. SQLite via SQLAlchemy for storage, pandas for CSV/analysis, PySide6 for desktop UI, matplotlib for charts. Categorization engine behind a Protocol for future AI extensibility.

Tech Stack: Python 3.12+, PySide6, SQLAlchemy, pandas, matplotlib, pytest


Phase 1: Foundation

Task 1: Project Setup

Files:

  • Create: pyproject.toml
  • Create: src/__init__.py
  • Create: src/models/__init__.py
  • Create: src/services/__init__.py
  • Create: src/ui/__init__.py
  • Create: tests/__init__.py
  • Create: tests/services/__init__.py
  • Create: tests/models/__init__.py

Step 1: Create pyproject.toml

[project]
name = "spending-analysis"
version = "0.1.0"
requires-python = ">=3.12"
dependencies = [
    "PySide6>=6.6",
    "sqlalchemy>=2.0",
    "pandas>=2.0",
    "matplotlib>=3.8",
]

[project.optional-dependencies]
dev = [
    "pytest>=8.0",
    "pytest-qt>=4.2",
]

[project.scripts]
spending-analysis = "src.main:main"

Step 2: Create directory structure

Create all __init__.py files listed above (empty files).

Step 3: Create virtual environment and install

Run: python -m venv .venv && .venv\Scripts\activate && pip install -e ".[dev]" (Windows syntax; on macOS/Linux, activate with source .venv/bin/activate instead)

Step 4: Verify installation

Run: python -c "import PySide6; import sqlalchemy; import pandas; import matplotlib; print('OK')" Expected: OK

Step 5: Commit

git add pyproject.toml src/ tests/
git commit -m "feat: project setup with dependencies"

Task 2: Database Models

Files:

  • Create: src/db.py
  • Create: src/models/household.py
  • Create: src/models/account.py
  • Create: src/models/category.py
  • Create: src/models/transaction.py
  • Create: src/models/rule.py
  • Create: src/models/csv_mapping.py
  • Create: tests/models/test_models.py

Step 1: Write failing test for model creation

# tests/models/test_models.py
import datetime

from sqlalchemy import create_engine
from sqlalchemy.orm import Session

from src.db import Base
from src.models.household import HouseholdMember
from src.models.account import Account
from src.models.category import Category
from src.models.transaction import Transaction
from src.models.rule import CategorizationRule
from src.models.csv_mapping import CsvMapping


def make_session():
    engine = create_engine("sqlite:///:memory:")
    Base.metadata.create_all(engine)
    return Session(engine)


def test_create_household_member():
    session = make_session()
    member = HouseholdMember(name="Andrew", relationship="self")
    session.add(member)
    session.commit()
    assert member.id is not None
    assert member.name == "Andrew"


def test_create_account_with_owner():
    session = make_session()
    member = HouseholdMember(name="Andrew", relationship="self")
    session.add(member)
    session.flush()
    account = Account(
        name="Chase Freedom",
        institution="Chase",
        account_type="credit",
        owner_id=member.id,
    )
    session.add(account)
    session.commit()
    assert account.owner.name == "Andrew"


def test_create_category():
    session = make_session()
    cat = Category(name="Groceries", default_tag="needs")
    session.add(cat)
    session.commit()
    assert cat.id is not None


def test_create_transaction():
    session = make_session()
    member = HouseholdMember(name="Andrew", relationship="self")
    session.add(member)
    session.flush()
    account = Account(name="Checking", institution="Wells Fargo", account_type="checking", owner_id=member.id)
    cat = Category(name="Groceries", default_tag="needs")
    session.add_all([account, cat])
    session.flush()
    txn = Transaction(
        date=datetime.date(2026, 1, 15),  # SQLite's Date type accepts only date objects, not strings
        amount=-48.52,
        description="WAL-MART #7181",
        raw_description="PURCHASE AUTHORIZED ON 01/14 WAL-MART #7181 BEAUFORT SC CARD 5360",
        account_id=account.id,
        category_id=cat.id,
        attributed_to_id=member.id,
        tag="needs",
    )
    session.add(txn)
    session.commit()
    assert txn.id is not None
    assert txn.account.name == "Checking"
    assert txn.category.name == "Groceries"


def test_create_categorization_rule():
    session = make_session()
    cat = Category(name="Groceries", default_tag="needs")
    session.add(cat)
    session.flush()
    rule = CategorizationRule(
        pattern="WAL-MART",
        category_id=cat.id,
        priority=10,
    )
    session.add(rule)
    session.commit()
    assert rule.id is not None


def test_create_csv_mapping():
    session = make_session()
    member = HouseholdMember(name="Andrew", relationship="self")
    session.add(member)
    session.flush()
    account = Account(name="Chase", institution="Chase", account_type="credit", owner_id=member.id)
    session.add(account)
    session.flush()
    mapping = CsvMapping(
        name="Chase Credit Card",
        fingerprint="Transaction Date,Post Date,Description,Category,Type,Amount,Memo",
        column_map='{"date": "Transaction Date", "amount": "Amount", "description": "Description"}',
        amount_logic="signed",
        account_id=account.id,
    )
    session.add(mapping)
    session.commit()
    assert mapping.id is not None

Step 2: Run test to verify it fails

Run: pytest tests/models/test_models.py -v Expected: FAIL (imports not found)

Step 3: Implement db.py and all models

# src/db.py
from pathlib import Path
from sqlalchemy import create_engine
from sqlalchemy.orm import DeclarativeBase, Session

class Base(DeclarativeBase):
    pass

def get_engine(db_path: Path | None = None):
    if db_path is None:
        db_path = Path.home() / ".spending_analysis" / "spending.db"
        db_path.parent.mkdir(parents=True, exist_ok=True)
    return create_engine(f"sqlite:///{db_path}")

def get_session(db_path: Path | None = None) -> Session:
    engine = get_engine(db_path)
    Base.metadata.create_all(engine)
    return Session(engine)

# src/models/household.py
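from sqlalchemy import String
from sqlalchemy.orm import Mapped, mapped_column
from sqlalchemy.orm import relationship as orm_relationship
from src.db import Base

class HouseholdMember(Base):
    __tablename__ = "household_members"
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(100))
    # This column is itself named "relationship", which would shadow
    # sqlalchemy.orm.relationship for the rest of the class body, so the
    # ORM function is imported under an alias.
    relationship: Mapped[str] = mapped_column(String(50))
    accounts: Mapped[list["Account"]] = orm_relationship(back_populates="owner")

# src/models/account.py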
from sqlalchemy import ForeignKey, String
from sqlalchemy.orm import Mapped, mapped_column, relationship
from src.db import Base

class Account(Base):
    __tablename__ = "accounts"
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(100))
    institution: Mapped[str] = mapped_column(String(100))
    account_type: Mapped[str] = mapped_column(String(20))  # checking, credit, savings
    owner_id: Mapped[int | None] = mapped_column(ForeignKey("household_members.id"))
    is_shared: Mapped[bool] = mapped_column(default=False)
    owner: Mapped["HouseholdMember | None"] = relationship(back_populates="accounts")

# src/models/category.py
from sqlalchemy import String
from sqlalchemy.orm import Mapped, mapped_column
from src.db import Base

class Category(Base):
    __tablename__ = "categories"
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(100), unique=True)
    default_tag: Mapped[str | None] = mapped_column(String(20))  # needs, wants, savings
    icon: Mapped[str | None] = mapped_column(String(50))

# src/models/transaction.py
import datetime
from sqlalchemy import Date, ForeignKey, Numeric, String, Text
from sqlalchemy.orm import Mapped, mapped_column, relationship
from src.db import Base

class Transaction(Base):
    __tablename__ = "transactions"
    id: Mapped[int] = mapped_column(primary_key=True)
    date: Mapped[datetime.date] = mapped_column(Date)
    amount: Mapped[float] = mapped_column(Numeric(10, 2, asdecimal=False))  # return float, matching the annotation
    description: Mapped[str] = mapped_column(String(300))
    raw_description: Mapped[str | None] = mapped_column(Text)
    account_id: Mapped[int] = mapped_column(ForeignKey("accounts.id"))
    category_id: Mapped[int | None] = mapped_column(ForeignKey("categories.id"))
    attributed_to_id: Mapped[int | None] = mapped_column(ForeignKey("household_members.id"))
    tag: Mapped[str | None] = mapped_column(String(20))
    source_category: Mapped[str | None] = mapped_column(String(100))
    is_transfer: Mapped[bool] = mapped_column(default=False)
    transfer_pair_id: Mapped[int | None] = mapped_column(ForeignKey("transactions.id"))
    account: Mapped["Account"] = relationship()
    category: Mapped["Category | None"] = relationship()
    attributed_to: Mapped["HouseholdMember | None"] = relationship()

# src/models/rule.py
from sqlalchemy import ForeignKey, String
from sqlalchemy.orm import Mapped, mapped_column, relationship
from src.db import Base

class CategorizationRule(Base):
    __tablename__ = "categorization_rules"
    id: Mapped[int] = mapped_column(primary_key=True)
    pattern: Mapped[str] = mapped_column(String(300))
    category_id: Mapped[int] = mapped_column(ForeignKey("categories.id"))
    tag_override: Mapped[str | None] = mapped_column(String(20))
    attributed_to_id: Mapped[int | None] = mapped_column(ForeignKey("household_members.id"))
    priority: Mapped[int] = mapped_column(default=0)
    category: Mapped["Category"] = relationship()
    attributed_to: Mapped["HouseholdMember | None"] = relationship()

# src/models/csv_mapping.py
from sqlalchemy import ForeignKey, String, Text
from sqlalchemy.orm import Mapped, mapped_column, relationship
from src.db import Base

class CsvMapping(Base):
    __tablename__ = "csv_mappings"
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(100))
    fingerprint: Mapped[str] = mapped_column(String(500))
    column_map: Mapped[str] = mapped_column(Text)  # JSON
    amount_logic: Mapped[str] = mapped_column(String(50))  # signed, separate_columns, type_column
    account_id: Mapped[int] = mapped_column(ForeignKey("accounts.id"))
    account: Mapped["Account"] = relationship()

Update src/models/__init__.py to re-export all models:

from src.models.household import HouseholdMember
from src.models.account import Account
from src.models.category import Category
from src.models.transaction import Transaction
from src.models.rule import CategorizationRule
from src.models.csv_mapping import CsvMapping

Step 4: Run tests

Run: pytest tests/models/test_models.py -v Expected: All 6 tests PASS

Step 5: Commit

git add src/db.py src/models/ tests/models/
git commit -m "feat: database models and connection management"

Task 3: Seed Data

Files:

  • Create: src/seed.py
  • Create: tests/test_seed.py

Step 1: Write failing test

# tests/test_seed.py
from sqlalchemy import create_engine
from sqlalchemy.orm import Session

from src.db import Base
from src.seed import seed_categories, seed_household, DEFAULT_CATEGORIES
from src.models import Category, HouseholdMember


def make_session():
    engine = create_engine("sqlite:///:memory:")
    Base.metadata.create_all(engine)
    return Session(engine)


def test_seed_categories():
    session = make_session()
    seed_categories(session)
    cats = session.query(Category).all()
    assert len(cats) == len(DEFAULT_CATEGORIES)
    names = {c.name for c in cats}
    assert "Groceries" in names
    assert "Transfer" in names
    assert "Income" in names


def test_seed_categories_idempotent():
    session = make_session()
    seed_categories(session)
    seed_categories(session)
    cats = session.query(Category).all()
    assert len(cats) == len(DEFAULT_CATEGORIES)


def test_seed_household():
    session = make_session()
    seed_household(session, "Andrew", "self")
    members = session.query(HouseholdMember).all()
    assert len(members) == 1
    assert members[0].name == "Andrew"

Step 2: Run test to verify it fails

Run: pytest tests/test_seed.py -v Expected: FAIL

Step 3: Implement seed.py

# src/seed.py
from sqlalchemy.orm import Session
from src.models import Category, HouseholdMember

DEFAULT_CATEGORIES = [
    ("Income", None, None),
    ("Housing", "needs", None),
    ("Groceries", "needs", None),
    ("Dining Out", "wants", None),
    ("Transportation", "needs", None),
    ("Gas", "needs", None),
    ("Utilities", "needs", None),
    ("Insurance", "needs", None),
    ("Healthcare", "needs", None),
    ("Entertainment", "wants", None),
    ("Shopping", "wants", None),
    ("Subscriptions", "wants", None),
    ("Personal Care", "wants", None),
    ("Family", "needs", None),
    ("Gifts & Donations", "wants", None),
    ("Debt Payment", "needs", None),
    ("Savings", "savings", None),
    ("Transfer", None, None),
    ("Travel", "wants", None),
    ("Home", "needs", None),
    ("Professional Services", "needs", None),
    ("Uncategorized", None, None),
]


def seed_categories(session: Session) -> None:
    existing = {c.name for c in session.query(Category).all()}
    for name, tag, icon in DEFAULT_CATEGORIES:
        if name not in existing:
            session.add(Category(name=name, default_tag=tag, icon=icon))
    session.commit()


def seed_household(session: Session, name: str, relationship: str) -> HouseholdMember:
    existing = session.query(HouseholdMember).filter_by(name=name).first()
    if existing:
        return existing
    member = HouseholdMember(name=name, relationship=relationship)
    session.add(member)
    session.commit()
    return member

Step 4: Run tests

Run: pytest tests/test_seed.py -v Expected: All 3 tests PASS

Step 5: Commit

git add src/seed.py tests/test_seed.py
git commit -m "feat: seed data for default categories and household"

Phase 2: CSV Import Pipeline

Task 4: Description Normalizer

Files:

  • Create: src/services/normalizer.py
  • Create: tests/services/test_normalizer.py

Step 1: Write failing tests using real transaction data

# tests/services/test_normalizer.py
from src.services.normalizer import normalize_description


def test_strip_authorization_prefix():
    raw = "PURCHASE AUTHORIZED ON 02/06 WALMART.COM 8009256278 BENTONVILLE AR P000000089502338 CARD 5360"
    result = normalize_description(raw)
    assert "AUTHORIZED ON" not in result
    assert "CARD 5360" not in result
    assert "WALMART.COM" in result


def test_strip_recurring_prefix():
    raw = "RECURRING PAYMENT AUTHORIZED ON 02/05 HELLOFRESH 646-846-3663 NY S356036316425851 CARD 5360"
    result = normalize_description(raw)
    assert "AUTHORIZED ON" not in result
    assert "HELLOFRESH" in result


def test_strip_reference_ids():
    raw = "RECURRING TRANSFER TO CONLON A WAY2SAVE SAVINGS REF #OP0WS99NKQ XXXXXX6065"
    result = normalize_description(raw)
    assert "REF #" not in result
    assert "XXXXXX" not in result
    assert "WAY2SAVE SAVINGS" in result


def test_strip_card_number():
    raw = "PURCHASE AUTHORIZED ON 01/08 MRS B COMPANY LLC PORT ROYAL SC S386008692282379 CARD 5360"
    result = normalize_description(raw)
    assert "CARD 5360" not in result
    assert "MRS B COMPANY LLC" in result


def test_strip_transaction_codes():
    raw = "OASISBATCH PAYROLL 260109 MP027126352 DONNA CONLON"
    result = normalize_description(raw)
    assert "OASISBATCH PAYROLL" in result
    assert "DONNA CONLON" in result


def test_clean_chase_description():
    """Chase descriptions are already clean, should pass through mostly unchanged."""
    raw = "PUBLIX #1716"
    result = normalize_description(raw)
    assert result == "PUBLIX #1716"


def test_check_number():
    raw = "CHECK # 104"
    result = normalize_description(raw)
    assert "CHECK" in result


def test_strip_html_entities():
    raw = "TST*PONCHOS TACOS &amp; BEE"
    result = normalize_description(raw)
    assert "&amp;" not in result
    assert "&" in result

Step 2: Run to verify failure

Run: pytest tests/services/test_normalizer.py -v

Step 3: Implement normalizer

# src/services/normalizer.py
import re
import html


def normalize_description(raw: str) -> str:
    desc = html.unescape(raw.strip())

    # Strip "PURCHASE AUTHORIZED ON MM/DD" or "RECURRING PAYMENT AUTHORIZED ON MM/DD"
    desc = re.sub(r"^(PURCHASE|RECURRING PAYMENT|RECURRING TRANSFER)\s+AUTHORIZED ON \d{2}/\d{2}\s+", "", desc)

    # Strip "CARD XXXX" at end
    desc = re.sub(r"\s+CARD\s+\d{4}\s*$", "", desc)

    # Strip long transaction/reference codes: 10+ characters of A-Z/0-9 containing
    # at least one digit, so long all-letter words (city names, merchants) are kept
    desc = re.sub(r"\s+(?=[A-Z]*\d)[A-Z0-9]{10,}\b", "", desc)

    # Strip "REF #..." references
    desc = re.sub(r"\s+REF\s+#\S+", "", desc)

    # Strip masked account numbers
    desc = re.sub(r"\s+X{4,}\d+", "", desc)

    # Strip phone numbers
    desc = re.sub(r"\s+\d{3}-\d{3}-\d{4}", "", desc)

    # Strip trailing state abbreviations + zip that follow addresses
    # But keep them if they're part of merchant name (be conservative)
    desc = re.sub(r"\s+[A-Z]{2}\s+\d{5}(?:-\d{4})?\s*$", "", desc)

    # Clean up multiple spaces
    desc = re.sub(r"\s{2,}", " ", desc).strip()

    return desc
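
The same idea can be written as a table of precompiled rules applied in order, which makes it easier to add or reorder a pattern while iterating on the tests. This is a sketch only: `RULES` and `normalize` are hypothetical names, the trailing state/zip rule is omitted for brevity, and the long-code rule assumes a code always contains at least one digit.

```python
import html
import re

# (compiled pattern, replacement) pairs, applied top to bottom.
RULES: list[tuple[re.Pattern[str], str]] = [
    (re.compile(r"^(?:PURCHASE|RECURRING PAYMENT|RECURRING TRANSFER)\s+AUTHORIZED ON \d{2}/\d{2}\s+"), ""),
    (re.compile(r"\s+CARD\s+\d{4}\s*$"), ""),
    # Assumption of this sketch: a "code" is 10+ characters and contains a digit.
    (re.compile(r"\s+(?=[A-Z]*\d)[A-Z0-9]{10,}\b"), ""),
    (re.compile(r"\s+REF\s+#\S+"), ""),
    (re.compile(r"\s+X{4,}\d+"), ""),
    (re.compile(r"\s+\d{3}-\d{3}-\d{4}"), ""),
    (re.compile(r"\s{2,}"), " "),
]


def normalize(raw: str) -> str:
    """Decode HTML entities, then apply every rule in order."""
    desc = html.unescape(raw.strip())
    for pattern, replacement in RULES:
        desc = pattern.sub(replacement, desc)
    return desc.strip()


samples = [
    "RECURRING PAYMENT AUTHORIZED ON 02/05 HELLOFRESH 646-846-3663 NY S356036316425851 CARD 5360",
    "RECURRING TRANSFER TO CONLON A WAY2SAVE SAVINGS REF #OP0WS99NKQ XXXXXX6065",
    "TST*PONCHOS TACOS &amp; BEE",
]
for sample in samples:
    print(normalize(sample))
```

Rule order matters here: the card suffix is removed before the long-code rule runs, so a reference code that sits just before `CARD 5360` ends up at the end of the string where the word-boundary check still applies.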

Step 4: Run tests, iterate on regex until all pass

Run: pytest tests/services/test_normalizer.py -v Expected: All 8 tests PASS

Step 5: Commit

git add src/services/normalizer.py tests/services/test_normalizer.py
git commit -m "feat: transaction description normalizer"

Task 5: CSV Reader and Format Detection

Files:

  • Create: src/services/csv_reader.py
  • Create: tests/services/test_csv_reader.py
  • Reference: rawdata/Chase0372_Activity20260101_20260210_20260210.CSV
  • Reference: rawdata/Checking1.csv

Step 1: Write failing tests

# tests/services/test_csv_reader.py
from pathlib import Path
from src.services.csv_reader import read_csv, detect_format

RAWDATA = Path(__file__).parent.parent.parent / "rawdata"


def test_detect_chase_format():
    result = detect_format(RAWDATA / "Chase0372_Activity20260101_20260210_20260210.CSV")
    assert result["has_headers"] is True
    assert "Transaction Date" in result["headers"]
    assert result["delimiter"] == ","
    assert result["row_count"] > 0


def test_detect_checking_format():
    result = detect_format(RAWDATA / "Checking1.csv")
    assert result["has_headers"] is False
    assert result["column_count"] == 5
    assert result["row_count"] > 0


def test_read_chase_csv():
    rows = read_csv(RAWDATA / "Chase0372_Activity20260101_20260210_20260210.CSV")
    assert len(rows) > 100
    first = rows[0]
    assert "Transaction Date" in first
    assert "Amount" in first
    assert "Description" in first


def test_read_headerless_csv():
    rows = read_csv(RAWDATA / "Checking1.csv")
    assert len(rows) > 50
    first = rows[0]
    # Headerless: keys should be column indices
    assert 0 in first or "0" in first


def test_preview_rows():
    result = detect_format(RAWDATA / "Chase0372_Activity20260101_20260210_20260210.CSV")
    assert "preview" in result
    assert len(result["preview"]) <= 5

Step 2: Run to verify failure

Run: pytest tests/services/test_csv_reader.py -v

Step 3: Implement csv_reader.py

# src/services/csv_reader.py
import csv
from pathlib import Path


def detect_format(file_path: Path) -> dict:
    with open(file_path, "r", encoding="utf-8-sig") as f:
        sample = f.read(8192)

    dialect = csv.Sniffer().sniff(sample)
    has_headers = csv.Sniffer().has_header(sample)

    # The csv module expects files opened with newline="" so quoted line breaks survive
    with open(file_path, "r", encoding="utf-8-sig", newline="") as f:
        reader = csv.reader(f, dialect)
        all_rows = list(reader)

    # Filter empty rows
    all_rows = [r for r in all_rows if any(cell.strip() for cell in r)]

    result = {
        "delimiter": dialect.delimiter,
        "has_headers": has_headers,
        "file_path": str(file_path),
    }

    if has_headers:
        result["headers"] = all_rows[0]
        data_rows = all_rows[1:]
    else:
        result["column_count"] = len(all_rows[0]) if all_rows else 0
        data_rows = all_rows

    result["row_count"] = len(data_rows)
    result["preview"] = data_rows[:5]

    return result


def read_csv(file_path: Path) -> list[dict]:
    with open(file_path, "r", encoding="utf-8-sig") as f:
        sample = f.read(8192)

    dialect = csv.Sniffer().sniff(sample)
    has_headers = csv.Sniffer().has_header(sample)

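    # The csv module expects files opened with newline="" so quoted line breaks survive
    with open(file_path, "r", encoding="utf-8-sig", newline="") as f:
        if has_headers:
            reader = csv.DictReader(f, dialect=dialect)
            # Short rows yield None values and long rows yield a list, so only test strings
            return [dict(row) for row in reader if any(isinstance(v, str) and v.strip() for v in row.values())]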
        else:
            reader = csv.reader(f, dialect)
            rows = []
            for row in reader:
                if any(cell.strip() for cell in row):
                    rows.append({i: cell for i, cell in enumerate(row)})
            return rows
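
The `CsvMapping` model stores a `fingerprint`, and the model test uses the comma-joined Chase header row as its value. A sketch of deriving such a fingerprint so a saved mapping could be looked up on the next import. `header_fingerprint` is a hypothetical helper, and the `headerless:N` convention for files without a header row is an assumption of this sketch, not something the plan specifies.

```python
import csv
import io


def header_fingerprint(text: str, has_headers: bool) -> str:
    """Build a lookup key for a saved mapping from the first non-empty row."""
    for row in csv.reader(io.StringIO(text)):
        if any(cell.strip() for cell in row):
            if has_headers:
                return ",".join(cell.strip() for cell in row)
            # Hypothetical convention for headerless files: key on column count.
            return f"headerless:{len(row)}"
    return ""


chase_header = "Transaction Date,Post Date,Description,Category,Type,Amount,Memo\n"
print(header_fingerprint(chase_header, has_headers=True))
```

Keying headerless files on column count alone is coarse; two banks with five-column exports would collide, so a real lookup would likely need a second signal.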

Step 4: Run tests

Run: pytest tests/services/test_csv_reader.py -v Expected: All 5 tests PASS

Step 5: Commit

git add src/services/csv_reader.py tests/services/test_csv_reader.py
git commit -m "feat: CSV reader with format auto-detection"

Task 6: Import Service

Files:

  • Create: src/services/importer.py
  • Create: tests/services/test_importer.py

Step 1: Write failing tests

# tests/services/test_importer.py
import datetime
from pathlib import Path
from sqlalchemy import create_engine
from sqlalchemy.orm import Session

from src.db import Base
from src.models import *
from src.seed import seed_categories
from src.services.importer import ImportService

RAWDATA = Path(__file__).parent.parent.parent / "rawdata"


def make_session():
    engine = create_engine("sqlite:///:memory:")
    Base.metadata.create_all(engine)
    return Session(engine)


def setup_chase_account(session):
    member = HouseholdMember(name="Andrew", relationship="self")
    session.add(member)
    session.flush()
    account = Account(name="Chase Freedom", institution="Chase", account_type="credit", owner_id=member.id)
    session.add(account)
    session.flush()
    seed_categories(session)
    return account


def setup_checking_account(session):
    member = HouseholdMember(name="Andrew", relationship="self")
    session.add(member)
    session.flush()
    account = Account(name="WF Checking", institution="Wells Fargo", account_type="checking", owner_id=member.id, is_shared=True)
    session.add(account)
    session.flush()
    seed_categories(session)
    return account


def test_import_chase_csv():
    session = make_session()
    account = setup_chase_account(session)
    column_map = {
        "date": "Transaction Date",
        "amount": "Amount",
        "description": "Description",
        "source_category": "Category",
    }
    svc = ImportService(session)
    result = svc.import_csv(
        RAWDATA / "Chase0372_Activity20260101_20260210_20260210.CSV",
        account_id=account.id,
        column_map=column_map,
        amount_logic="signed",
    )
    assert result["imported"] > 0
    assert result["duplicates"] == 0
    txns = session.query(Transaction).all()
    assert len(txns) == result["imported"]
    # Check a known transaction
    sephora = [t for t in txns if "SEPHORA" in t.description]
    assert len(sephora) == 1
    assert sephora[0].amount == -75.00


def test_import_checking_csv():
    session = make_session()
    account = setup_checking_account(session)
    column_map = {
        "date": 0,
        "amount": 1,
        "description": 4,
    }
    svc = ImportService(session)
    result = svc.import_csv(
        RAWDATA / "Checking1.csv",
        account_id=account.id,
        column_map=column_map,
        amount_logic="signed",
    )
    assert result["imported"] > 0
    txns = session.query(Transaction).all()
    assert len(txns) > 50


def test_duplicate_detection():
    session = make_session()
    account = setup_chase_account(session)
    column_map = {
        "date": "Transaction Date",
        "amount": "Amount",
        "description": "Description",
    }
    svc = ImportService(session)
    result1 = svc.import_csv(
        RAWDATA / "Chase0372_Activity20260101_20260210_20260210.CSV",
        account_id=account.id,
        column_map=column_map,
        amount_logic="signed",
    )
    result2 = svc.import_csv(
        RAWDATA / "Chase0372_Activity20260101_20260210_20260210.CSV",
        account_id=account.id,
        column_map=column_map,
        amount_logic="signed",
    )
    assert result2["duplicates"] == result1["imported"]
    assert result2["imported"] == 0
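
A sketch of one way to decide what counts as a duplicate, assuming a transaction is identified by date, amount, raw description, and account. `plan_import` and `Key` are hypothetical names. Counting occurrences rather than testing membership is a design choice of this sketch: two genuinely identical rows in one statement both import the first time, while re-importing the same file is still skipped in full.

```python
import datetime
from collections import Counter

# (date, amount, raw_description, account_id)
Key = tuple[datetime.date, float, str, int]


def plan_import(incoming: list[Key], existing: list[Key]) -> tuple[list[Key], int]:
    """Split incoming keys into (to_insert, duplicate_count).

    A key is skipped only while stored copies remain unmatched.
    """
    remaining = Counter(existing)
    to_insert: list[Key] = []
    duplicates = 0
    for key in incoming:
        if remaining[key] > 0:
            remaining[key] -= 1
            duplicates += 1
        else:
            to_insert.append(key)
    return to_insert, duplicates


lunch = (datetime.date(2026, 1, 15), -5.39, "CHICK-FIL-A #01476", 1)
first_run, first_dupes = plan_import([lunch, lunch], existing=[])
second_run, second_dupes = plan_import([lunch, lunch], existing=first_run)
print(len(first_run), first_dupes)
print(len(second_run), second_dupes)
```

Loading the existing keys for an account once also avoids issuing one query per CSV row.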

Step 2: Run to verify failure

Run: pytest tests/services/test_importer.py -v

Step 3: Implement import service

# src/services/importer.py
import datetime
from pathlib import Path
from sqlalchemy.orm import Session

from src.models.transaction import Transaction
from src.services.csv_reader import read_csv
from src.services.normalizer import normalize_description


class ImportService:
    def __init__(self, session: Session):
        self.session = session

    def import_csv(
        self,
        file_path: Path,
        account_id: int,
        column_map: dict,
        amount_logic: str = "signed",
    ) -> dict:
        rows = read_csv(file_path)
        imported = 0
        duplicates = 0

        for row in rows:
            date_val = self._parse_date(row[column_map["date"]])
            amount_val = self._parse_amount(row, column_map, amount_logic)
            raw_desc = row[column_map["description"]].strip()
            description = normalize_description(raw_desc)
            # "or ''" guards against None values that DictReader produces for short rows
            source_cat = (row.get(column_map.get("source_category", "__missing__")) or "").strip() or None

            if date_val is None or amount_val is None:
                continue

            # Duplicate check
            existing = (
                self.session.query(Transaction)
                .filter_by(date=date_val, amount=amount_val, raw_description=raw_desc, account_id=account_id)
                .first()
            )
            if existing:
                duplicates += 1
                continue

            txn = Transaction(
                date=date_val,
                amount=amount_val,
                description=description,
                raw_description=raw_desc,
                account_id=account_id,
                source_category=source_cat,
            )
            self.session.add(txn)
            imported += 1

        self.session.commit()
        return {"imported": imported, "duplicates": duplicates, "total_rows": len(rows)}

    def _parse_date(self, val: str) -> datetime.date | None:
        val = val.strip().strip('"')
        for fmt in ("%m/%d/%Y", "%Y-%m-%d", "%m-%d-%Y"):
            try:
                return datetime.datetime.strptime(val, fmt).date()
            except ValueError:
                continue
        return None

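    def _parse_amount(self, row: dict, column_map: dict, logic: str) -> float | None:
        if logic != "signed":
            # Only signed amounts are handled so far; add other logic types as needed.
            return None
        try:
            return float(row[column_map["amount"]].strip().strip('"').replace(",", ""))
        except (AttributeError, KeyError, ValueError):
            # Short row (None value), missing column, or non-numeric text
            return None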

Step 4: Run tests

Run: pytest tests/services/test_importer.py -v Expected: All 3 tests PASS

Step 5: Commit

git add src/services/importer.py tests/services/test_importer.py
git commit -m "feat: CSV import service with duplicate detection"

Phase 3: Categorization

Task 7: Categorization Engine

Files:

  • Create: src/services/categorizer.py
  • Create: tests/services/test_categorizer.py

Step 1: Write failing tests

# tests/services/test_categorizer.py
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
from src.db import Base
from src.models import *
from src.seed import seed_categories
from src.services.categorizer import RuleBasedCategorizer, Categorizer


def make_session():
    engine = create_engine("sqlite:///:memory:")
    Base.metadata.create_all(engine)
    return Session(engine)


def test_categorizer_protocol():
    """RuleBasedCategorizer implements Categorizer protocol."""
    session = make_session()
    cat = RuleBasedCategorizer(session)
    assert isinstance(cat, Categorizer)


def test_match_simple_pattern():
    session = make_session()
    seed_categories(session)
    groceries = session.query(Category).filter_by(name="Groceries").one()
    rule = CategorizationRule(pattern="PUBLIX", category_id=groceries.id, priority=10)
    session.add(rule)
    session.commit()

    cat = RuleBasedCategorizer(session)
    txn = Transaction(date="2026-01-15", amount=-44.90, description="PUBLIX #1716", account_id=1)
    result = cat.categorize(txn)
    assert result is not None
    assert result.category_id == groceries.id


def test_match_pipe_separated_pattern():
    session = make_session()
    seed_categories(session)
    groceries = session.query(Category).filter_by(name="Groceries").one()
    rule = CategorizationRule(pattern="PUBLIX|ALDI|PIGGLY WIGGLY", category_id=groceries.id, priority=10)
    session.add(rule)
    session.commit()

    cat = RuleBasedCategorizer(session)
    for desc in ["PUBLIX #1716", "ALDI 76180  BEAUFORT", "PIGGLY WIGGLY #286"]:
        txn = Transaction(date="2026-01-15", amount=-20.00, description=desc, account_id=1)
        result = cat.categorize(txn)
        assert result is not None, f"Failed to match: {desc}"
        assert result.category_id == groceries.id


def test_no_match_returns_none():
    session = make_session()
    seed_categories(session)
    cat = RuleBasedCategorizer(session)
    txn = Transaction(date="2026-01-15", amount=-10.00, description="UNKNOWN MERCHANT", account_id=1)
    result = cat.categorize(txn)
    assert result is None


def test_priority_ordering():
    session = make_session()
    seed_categories(session)
    groceries = session.query(Category).filter_by(name="Groceries").one()
    shopping = session.query(Category).filter_by(name="Shopping").one()
    # Lower priority number = higher priority
    rule1 = CategorizationRule(pattern="WAL-MART", category_id=groceries.id, priority=1)
    rule2 = CategorizationRule(pattern="WAL", category_id=shopping.id, priority=10)
    session.add_all([rule1, rule2])
    session.commit()

    cat = RuleBasedCategorizer(session)
    txn = Transaction(date="2026-01-15", amount=-48.52, description="WAL-MART #7181", account_id=1)
    result = cat.categorize(txn)
    assert result.category_id == groceries.id


def test_tag_override():
    session = make_session()
    seed_categories(session)
    dining = session.query(Category).filter_by(name="Dining Out").one()
    rule = CategorizationRule(pattern="CHICK-FIL-A", category_id=dining.id, tag_override="needs", priority=10)
    session.add(rule)
    session.commit()

    cat = RuleBasedCategorizer(session)
    txn = Transaction(date="2026-01-15", amount=-5.39, description="CHICK-FIL-A #01476", account_id=1)
    result = cat.categorize(txn)
    assert result.tag == "needs"  # Override from category default "wants"


def test_person_attribution():
    session = make_session()
    seed_categories(session)
    donna = HouseholdMember(name="Donna", relationship="wife")
    session.add(donna)
    session.flush()
    income = session.query(Category).filter_by(name="Income").one()
    rule = CategorizationRule(pattern="OASISBATCH PAYROLL", category_id=income.id, attributed_to_id=donna.id, priority=10)
    session.add(rule)
    session.commit()

    cat = RuleBasedCategorizer(session)
    txn = Transaction(date="2026-01-09", amount=1087.83, description="OASISBATCH PAYROLL 260109 DONNA CONLON", account_id=1)
    result = cat.categorize(txn)
    assert result.category_id == income.id
    assert result.attributed_to_id == donna.id

Step 2: Run to verify failure

Run: pytest tests/services/test_categorizer.py -v

Step 3: Implement categorizer

# src/services/categorizer.py
import re
from dataclasses import dataclass
from typing import Protocol, runtime_checkable

from sqlalchemy.orm import Session

from src.models.rule import CategorizationRule
from src.models.transaction import Transaction


@dataclass
class CategoryResult:
    category_id: int
    tag: str | None = None
    attributed_to_id: int | None = None


@runtime_checkable
class Categorizer(Protocol):
    def categorize(self, transaction: Transaction) -> CategoryResult | None: ...


class RuleBasedCategorizer:
    def __init__(self, session: Session):
        self.session = session
        self._rules: list[CategorizationRule] | None = None

    def _load_rules(self) -> list[CategorizationRule]:
        if self._rules is None:
            self._rules = (
                self.session.query(CategorizationRule)
                .order_by(CategorizationRule.priority)
                .all()
            )
        return self._rules

    def invalidate_cache(self) -> None:
        self._rules = None

    def categorize(self, transaction: Transaction) -> CategoryResult | None:
        rules = self._load_rules()
        desc = transaction.description.upper()

        for rule in rules:
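            # Support pipe-separated patterns; skip empty alternatives so a
            # stray "|" cannot yield an empty pattern that matches everything
            patterns = [p.strip() for p in rule.pattern.split("|") if p.strip()]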
            for pattern in patterns:
                if re.search(re.escape(pattern.upper()), desc):
                    tag = rule.tag_override
                    if tag is None and rule.category:
                        tag = rule.category.default_tag
                    return CategoryResult(
                        category_id=rule.category_id,
                        tag=tag,
                        attributed_to_id=rule.attributed_to_id,
                    )

        return None
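
The first-match-wins behavior of the categorizer can be sketched without a database. This is a minimal illustration only: `first_match` and the tuple-based rules are hypothetical stand-ins for `RuleBasedCategorizer` and `CategorizationRule`, assuming a lower priority number is checked first and `|` separates alternative substrings.

```python
from __future__ import annotations


def first_match(description: str, rules: list[tuple[int, str, str]]) -> str | None:
    """Return the category of the first rule whose pattern occurs in description.

    Each rule is a hypothetical (priority, pattern, category) tuple.
    """
    desc = description.upper()
    # sorted() is stable, so rules with equal priority keep their given order
    for _priority, pattern, category in sorted(rules, key=lambda r: r[0]):
        alternatives = [p.strip().upper() for p in pattern.split("|") if p.strip()]
        if any(alt in desc for alt in alternatives):
            return category
    return None


rules = [
    (10, "WAL", "Shopping"),
    (1, "WAL-MART", "Groceries"),
    (10, "PUBLIX|ALDI", "Groceries"),
]

print(first_match("WAL-MART #7181", rules))    # Groceries (priority 1 is checked first)
print(first_match("WALGREENS #0042", rules))   # Shopping
print(first_match("UNKNOWN MERCHANT", rules))  # None
```

Because matching is by substring, a broad pattern such as `WAL` needs a higher priority number than the specific patterns it overlaps with, otherwise it shadows them.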

Step 4: Run tests

Run: pytest tests/services/test_categorizer.py -v

Expected: All 7 tests PASS

Step 5: Commit

git add src/services/categorizer.py tests/services/test_categorizer.py
git commit -m "feat: rule-based categorization engine with protocol"

Task 8: Post-Import Categorization Integration

Files:

  • Modify: src/services/importer.py
  • Create: tests/services/test_import_categorize.py

Step 1: Write failing test for categorize-on-import

# tests/services/test_import_categorize.py
from pathlib import Path
from sqlalchemy import create_engine
from sqlalchemy.orm import Session

from src.db import Base
from src.models import *
from src.seed import seed_categories
from src.services.importer import ImportService

RAWDATA = Path(__file__).parent.parent.parent / "rawdata"


def make_session():
    engine = create_engine("sqlite:///:memory:")
    Base.metadata.create_all(engine)
    return Session(engine)


def test_import_applies_categorization_rules():
    session = make_session()
    seed_categories(session)
    member = HouseholdMember(name="Andrew", relationship="self")
    session.add(member)
    session.flush()
    account = Account(name="Chase", institution="Chase", account_type="credit", owner_id=member.id)
    session.add(account)
    session.flush()

    groceries = session.query(Category).filter_by(name="Groceries").one()
    rule = CategorizationRule(pattern="PUBLIX", category_id=groceries.id, priority=10)
    session.add(rule)
    session.commit()

    svc = ImportService(session)
    svc.import_csv(
        RAWDATA / "Chase0372_Activity20260101_20260210_20260210.CSV",
        account_id=account.id,
        column_map={"date": "Transaction Date", "amount": "Amount", "description": "Description"},
        amount_logic="signed",
    )

    publix_txns = session.query(Transaction).filter(Transaction.description.contains("PUBLIX")).all()
    assert len(publix_txns) > 0
    for txn in publix_txns:
        assert txn.category_id == groceries.id
        assert txn.tag == "needs"


def test_uncategorized_transactions_have_no_category():
    session = make_session()
    seed_categories(session)
    member = HouseholdMember(name="Andrew", relationship="self")
    session.add(member)
    session.flush()
    account = Account(name="Chase", institution="Chase", account_type="credit", owner_id=member.id)
    session.add(account)
    session.flush()

    # No rules defined — everything should be uncategorized
    svc = ImportService(session)
    svc.import_csv(
        RAWDATA / "Chase0372_Activity20260101_20260210_20260210.CSV",
        account_id=account.id,
        column_map={"date": "Transaction Date", "amount": "Amount", "description": "Description"},
        amount_logic="signed",
    )

    uncategorized = session.query(Transaction).filter(Transaction.category_id.is_(None)).count()
    total = session.query(Transaction).count()
    assert uncategorized == total

Step 2: Run to verify failure

Run: pytest tests/services/test_import_categorize.py -v

Step 3: Update ImportService to run categorization after import

Add to ImportService.import_csv(), after the main import loop but before session.commit():

from src.services.categorizer import RuleBasedCategorizer

# In import_csv, after building all transactions:
categorizer = RuleBasedCategorizer(self.session)
for txn in self.session.new:
    if isinstance(txn, Transaction) and txn.category_id is None:
        result = categorizer.categorize(txn)
        if result:
            txn.category_id = result.category_id
            txn.tag = result.tag
            txn.attributed_to_id = result.attributed_to_id

Step 4: Run tests

Run: pytest tests/services/test_import_categorize.py -v

Expected: All 2 tests PASS

Step 5: Commit

git add src/services/importer.py tests/services/test_import_categorize.py
git commit -m "feat: auto-categorize transactions on import"

Phase 4: Analysis Services

Task 9: Spending Analysis Service

Files:

  • Create: src/services/analysis.py
  • Create: tests/services/test_analysis.py

Step 1: Write failing tests

# tests/services/test_analysis.py
import datetime
from sqlalchemy import create_engine
from sqlalchemy.orm import Session

from src.db import Base
from src.models import *
from src.seed import seed_categories
from src.services.analysis import AnalysisService


def make_session():
    engine = create_engine("sqlite:///:memory:")
    Base.metadata.create_all(engine)
    return Session(engine)


def make_test_data(session):
    seed_categories(session)
    member = HouseholdMember(name="Andrew", relationship="self")
    session.add(member)
    session.flush()
    account = Account(name="Chase", institution="Chase", account_type="credit", owner_id=member.id)
    session.add(account)
    session.flush()
    groceries = session.query(Category).filter_by(name="Groceries").one()
    dining = session.query(Category).filter_by(name="Dining Out").one()

    txns = [
        Transaction(date=datetime.date(2026, 1, 5), amount=-50.0, description="PUBLIX", account_id=account.id, category_id=groceries.id, tag="needs"),
        Transaction(date=datetime.date(2026, 1, 12), amount=-30.0, description="ALDI", account_id=account.id, category_id=groceries.id, tag="needs"),
        Transaction(date=datetime.date(2026, 1, 20), amount=-15.0, description="CHICK-FIL-A", account_id=account.id, category_id=dining.id, tag="wants"),
        Transaction(date=datetime.date(2026, 2, 3), amount=-60.0, description="PUBLIX", account_id=account.id, category_id=groceries.id, tag="needs"),
        Transaction(date=datetime.date(2026, 2, 7), amount=-25.0, description="KFC", account_id=account.id, category_id=dining.id, tag="wants"),
    ]
    session.add_all(txns)
    session.commit()
    return account, member


def test_spending_by_month():
    session = make_session()
    make_test_data(session)
    svc = AnalysisService(session)
    result = svc.spending_by_period("month")
    assert len(result) == 2  # Jan and Feb
    jan = [r for r in result if r["period"] == "2026-01"][0]
    assert jan["total"] == -95.0


def test_spending_by_category():
    session = make_session()
    make_test_data(session)
    svc = AnalysisService(session)
    result = svc.spending_by_category(
        start=datetime.date(2026, 1, 1),
        end=datetime.date(2026, 2, 28),
    )
    groceries_row = [r for r in result if r["category"] == "Groceries"][0]
    assert groceries_row["total"] == -140.0


def test_spending_by_tag():
    session = make_session()
    make_test_data(session)
    svc = AnalysisService(session)
    result = svc.spending_by_tag(
        start=datetime.date(2026, 1, 1),
        end=datetime.date(2026, 2, 28),
    )
    needs = [r for r in result if r["tag"] == "needs"][0]
    wants = [r for r in result if r["tag"] == "wants"][0]
    assert needs["total"] == -140.0
    assert wants["total"] == -40.0


def test_spending_filtered_by_person():
    session = make_session()
    account, member = make_test_data(session)
    # Attribute some transactions to Andrew
    txns = session.query(Transaction).all()
    for t in txns:
        t.attributed_to_id = member.id
    session.commit()

    svc = AnalysisService(session)
    result = svc.spending_by_category(
        start=datetime.date(2026, 1, 1),
        end=datetime.date(2026, 2, 28),
        person_id=member.id,
    )
    total = sum(r["total"] for r in result)
    assert total == -180.0

Step 2: Run to verify failure

Run: pytest tests/services/test_analysis.py -v

Step 3: Implement analysis service

# src/services/analysis.py
from sqlalchemy import func
from sqlalchemy.orm import Session

from src.models.transaction import Transaction
from src.models.category import Category


class AnalysisService:
    def __init__(self, session: Session):
        self.session = session

    def _base_query(self, start=None, end=None, person_id=None, account_id=None, category_id=None):
        q = self.session.query(Transaction).filter(Transaction.is_transfer == False)
        if start:
            q = q.filter(Transaction.date >= start)
        if end:
            q = q.filter(Transaction.date <= end)
        if person_id:
            q = q.filter(Transaction.attributed_to_id == person_id)
        if account_id:
            q = q.filter(Transaction.account_id == account_id)
        if category_id:
            q = q.filter(Transaction.category_id == category_id)
        return q

    # SQLite strftime formats for each supported grouping period
    _PERIOD_FORMATS = {"day": "%Y-%m-%d", "week": "%Y-W%W", "month": "%Y-%m"}

    def spending_by_period(self, period: str = "month", **filters) -> list[dict]:
        fmt = self._PERIOD_FORMATS.get(period)
        if fmt is None:
            # Without this guard an unknown period would fall through and
            # fail later with an AttributeError on r.period
            raise ValueError(f"Unsupported period: {period!r}")
        q = (
            self._base_query(**filters)
            .with_entities(
                func.strftime(fmt, Transaction.date).label("period"),
                func.sum(Transaction.amount).label("total"),
            )
            .group_by("period")
            .order_by("period")
        )
        return [{"period": r.period, "total": float(r.total)} for r in q.all()]

    def spending_by_category(self, start=None, end=None, **filters) -> list[dict]:
        q = self._base_query(start=start, end=end, **filters)
        q = (
            q.join(Category, Transaction.category_id == Category.id, isouter=True)
            .with_entities(
                func.coalesce(Category.name, "Uncategorized").label("category"),
                func.sum(Transaction.amount).label("total"),
                func.count(Transaction.id).label("count"),
            )
            .group_by("category")
            .order_by(func.sum(Transaction.amount))
        )
        return [{"category": r.category, "total": float(r.total), "count": r.count} for r in q.all()]

    def spending_by_tag(self, start=None, end=None, **filters) -> list[dict]:
        q = self._base_query(start=start, end=end, **filters)
        q = (
            q.with_entities(
                func.coalesce(Transaction.tag, "untagged").label("tag"),
                func.sum(Transaction.amount).label("total"),
            )
            .group_by("tag")
            .order_by(func.sum(Transaction.amount))
        )
        return [{"tag": r.tag, "total": float(r.total)} for r in q.all()]
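
The period grouping relies on SQLite's `strftime`, which ties the service to SQLite. The sketch below shows the same grouping with the standard-library `sqlite3` module, assuming dates are stored as ISO `YYYY-MM-DD` text. The `txn` table and `totals_by` helper are hypothetical and only mirror the test data above.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE txn (date TEXT, amount REAL)")
conn.executemany(
    "INSERT INTO txn VALUES (?, ?)",
    [
        ("2026-01-05", -50.0),
        ("2026-01-12", -30.0),
        ("2026-01-20", -15.0),
        ("2026-02-03", -60.0),
        ("2026-02-07", -25.0),
    ],
)


def totals_by(fmt: str) -> list[tuple[str, float]]:
    """Group amounts by a strftime-formatted period label, oldest first."""
    return conn.execute(
        "SELECT strftime(?, date) AS period, SUM(amount) FROM txn "
        "GROUP BY period ORDER BY period",
        (fmt,),
    ).fetchall()


monthly = totals_by("%Y-%m")
weekly = totals_by("%Y-W%W")  # %W numbers weeks starting on Monday
print(monthly)  # [('2026-01', -95.0), ('2026-02', -85.0)]
print(weekly)
```

If the project ever moves to another database, the `strftime` calls are the part that would need a dialect-specific replacement.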

Step 4: Run tests

Run: pytest tests/services/test_analysis.py -v

Expected: All 4 tests PASS

Step 5: Commit

git add src/services/analysis.py tests/services/test_analysis.py
git commit -m "feat: spending analysis service with period, category, and tag breakdowns"

Task 10: Recurring Charge Detection

Files:

  • Create: src/services/recurring.py
  • Create: tests/services/test_recurring.py

Step 1: Write failing tests

# tests/services/test_recurring.py
import datetime
from sqlalchemy import create_engine
from sqlalchemy.orm import Session

from src.db import Base
from src.models import *
from src.services.recurring import RecurringDetector


def make_session():
    engine = create_engine("sqlite:///:memory:")
    Base.metadata.create_all(engine)
    return Session(engine)


def make_recurring_data(session):
    member = HouseholdMember(name="Andrew", relationship="self")
    session.add(member)
    session.flush()
    account = Account(name="Checking", institution="WF", account_type="checking", owner_id=member.id)
    session.add(account)
    session.flush()

    # Netflix monthly: $19.07 on the 4th of each month
    for month in [1, 2]:
        session.add(Transaction(
            date=datetime.date(2026, month, 4),
            amount=-19.07, description="Netflix.com", account_id=account.id,
        ))

    # HelloFresh weekly: ~$142.87
    for day in [1, 8, 15, 22, 29]:
        session.add(Transaction(
            date=datetime.date(2026, 1, day),
            amount=-142.87, description="HELLOFRESH", account_id=account.id,
        ))

    # Random one-off purchase
    session.add(Transaction(
        date=datetime.date(2026, 1, 10),
        amount=-95.38, description="CARL'S GOLFLAND INC", account_id=account.id,
    ))

    session.commit()
    return account


def test_detect_monthly_recurring():
    session = make_session()
    make_recurring_data(session)
    detector = RecurringDetector(session)
    results = detector.detect()
    netflix = [r for r in results if "Netflix" in r["description"]]
    assert len(netflix) == 1
    assert netflix[0]["frequency"] == "monthly"
    assert abs(netflix[0]["typical_amount"] - 19.07) < 0.01


def test_detect_weekly_recurring():
    session = make_session()
    make_recurring_data(session)
    detector = RecurringDetector(session)
    results = detector.detect()
    hello = [r for r in results if "HELLOFRESH" in r["description"]]
    assert len(hello) == 1
    assert hello[0]["frequency"] == "weekly"


def test_one_off_not_detected():
    session = make_session()
    make_recurring_data(session)
    detector = RecurringDetector(session)
    results = detector.detect()
    golf = [r for r in results if "GOLFLAND" in r["description"]]
    assert len(golf) == 0


def test_annual_cost_calculation():
    session = make_session()
    make_recurring_data(session)
    detector = RecurringDetector(session)
    results = detector.detect()
    netflix = [r for r in results if "Netflix" in r["description"]][0]
    assert abs(netflix["annual_cost"] - 19.07 * 12) < 1.0

Step 2: Run to verify failure

Run: pytest tests/services/test_recurring.py -v

Step 3: Implement recurring detector

# src/services/recurring.py
from collections import defaultdict
from sqlalchemy.orm import Session

from src.models.transaction import Transaction


class RecurringDetector:
    def __init__(self, session: Session):
        self.session = session

    def detect(self, amount_tolerance: float = 0.10) -> list[dict]:
        txns = (
            self.session.query(Transaction)
            .filter(Transaction.amount < 0)
            .filter(Transaction.is_transfer == False)
            .order_by(Transaction.date)
            .all()
        )

        # Group by description (expected to be normalized already at import time)
        groups: dict[str, list[Transaction]] = defaultdict(list)
        for txn in txns:
            groups[txn.description].append(txn)

        results = []
        for desc, group in groups.items():
            if len(group) < 2:
                continue

            amounts = [abs(t.amount) for t in group]
            avg_amount = sum(amounts) / len(amounts)

            # Check amount consistency
            if any(abs(a - avg_amount) / avg_amount > amount_tolerance for a in amounts):
                continue

            # Detect frequency from intervals
            dates = sorted(t.date for t in group)
            intervals = [(dates[i + 1] - dates[i]).days for i in range(len(dates) - 1)]
            avg_interval = sum(intervals) / len(intervals)

            frequency = self._classify_frequency(avg_interval)
            if frequency is None:
                continue

            annual_multiplier = {"weekly": 52, "biweekly": 26, "monthly": 12, "quarterly": 4, "annual": 1}

            results.append({
                "description": desc,
                "typical_amount": round(avg_amount, 2),
                "frequency": frequency,
                "annual_cost": round(avg_amount * annual_multiplier[frequency], 2),
                "occurrences": len(group),
                "last_date": max(dates),
            })

        results.sort(key=lambda r: r["annual_cost"], reverse=True)
        return results

    def _classify_frequency(self, avg_days: float) -> str | None:
        if 5 <= avg_days <= 9:
            return "weekly"
        elif 12 <= avg_days <= 16:
            return "biweekly"
        elif 25 <= avg_days <= 35:
            return "monthly"
        elif 80 <= avg_days <= 100:
            return "quarterly"
        elif 350 <= avg_days <= 380:
            return "annual"
        return None
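
The frequency bands can be checked by hand against the test data. The sketch below recomputes the average gap for the two recurring series. `average_interval_days` is a hypothetical helper that repeats the interval arithmetic from `detect()`.

```python
import datetime


def average_interval_days(dates: list[datetime.date]) -> float:
    """Mean gap in days between consecutive dates (needs at least two dates)."""
    ordered = sorted(dates)
    gaps = [(b - a).days for a, b in zip(ordered, ordered[1:])]
    return sum(gaps) / len(gaps)


netflix = [datetime.date(2026, m, 4) for m in (1, 2)]
hellofresh = [datetime.date(2026, 1, d) for d in (1, 8, 15, 22, 29)]

print(average_interval_days(netflix))     # 31.0, inside the 25-35 "monthly" band
print(average_interval_days(hellofresh))  # 7.0, inside the 5-9 "weekly" band
```

One caveat of classifying on the average alone: irregular gaps such as 3 and 11 days also average 7, so a stricter version might additionally check how far individual gaps stray from the mean.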

Step 4: Run tests

Run: pytest tests/services/test_recurring.py -v

Expected: All 4 tests PASS

Step 5: Commit

git add src/services/recurring.py tests/services/test_recurring.py
git commit -m "feat: recurring charge detection service"

Task 11: Forecasting Service

Files:

  • Create: src/services/forecasting.py
  • Create: tests/services/test_forecasting.py

Step 1: Write failing tests

# tests/services/test_forecasting.py
import datetime
from sqlalchemy import create_engine
from sqlalchemy.orm import Session

from src.db import Base
from src.models import *
from src.seed import seed_categories
from src.services.forecasting import ForecastingService


def make_session():
    engine = create_engine("sqlite:///:memory:")
    Base.metadata.create_all(engine)
    return Session(engine)


def make_forecast_data(session):
    seed_categories(session)
    member = HouseholdMember(name="Andrew", relationship="self")
    session.add(member)
    session.flush()
    account = Account(name="Chase", institution="Chase", account_type="credit", owner_id=member.id)
    session.add(account)
    session.flush()
    groceries = session.query(Category).filter_by(name="Groceries").one()

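    # 3 months of grocery spending: $500, $600, $700 (trending up).
    # Dates are relative to today because ForecastingService only looks back
    # about three months from today; fixed calendar dates would age out.
    today = datetime.date.today()
    for days_ago, total in [(80, 500), (45, 600), (10, 700)]:
        session.add(Transaction(
            date=today - datetime.timedelta(days=days_ago),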
            amount=-total,
            description="GROCERIES",
            account_id=account.id,
            category_id=groceries.id,
            tag="needs",
        ))
    session.commit()
    return account


def test_monthly_forecast():
    session = make_session()
    make_forecast_data(session)
    svc = ForecastingService(session)
    forecast = svc.forecast_month()
    groceries = [f for f in forecast if f["category"] == "Groceries"]
    assert len(groceries) == 1
    # Should be weighted toward recent months
    assert groceries[0]["projected"] < 0


def test_annual_forecast():
    session = make_session()
    make_forecast_data(session)
    svc = ForecastingService(session)
    forecast = svc.forecast_year()
    assert len(forecast) > 0
    total = sum(f["projected"] for f in forecast)
    assert total < 0  # We're spending money


def test_what_if_removes_recurring():
    session = make_session()
    make_forecast_data(session)
    svc = ForecastingService(session)
    base = svc.forecast_month()
    adjusted = svc.forecast_month(exclude_descriptions=["GROCERIES"])
    base_total = sum(f["projected"] for f in base)
    adj_total = sum(f["projected"] for f in adjusted)
    assert adj_total > base_total  # Less spending when we exclude groceries

Step 2: Run to verify failure

Run: pytest tests/services/test_forecasting.py -v

Step 3: Implement forecasting service

# src/services/forecasting.py
import datetime
from collections import defaultdict
from sqlalchemy import func
from sqlalchemy.orm import Session

from src.models.transaction import Transaction
from src.models.category import Category


class ForecastingService:
    def __init__(self, session: Session):
        self.session = session

    def _get_monthly_by_category(
        self,
        months_back: int = 3,
        exclude_descriptions: list[str] | None = None,
    ) -> dict[str, list[float]]:
        # Approximate look-back window: 31 days per month
        cutoff = datetime.date.today() - datetime.timedelta(days=months_back * 31)
        q = (
            self.session.query(
                func.strftime("%Y-%m", Transaction.date).label("month"),
                func.coalesce(Category.name, "Uncategorized").label("category"),
                func.sum(Transaction.amount).label("total"),
            )
            .join(Category, Transaction.category_id == Category.id, isouter=True)
            .filter(Transaction.date >= cutoff)
            .filter(Transaction.is_transfer == False)
            .filter(Transaction.amount < 0)
        )
        # "What if" support: drop transactions matching any excluded description
        for desc in exclude_descriptions or []:
            q = q.filter(~Transaction.description.contains(desc))
        # Oldest month first, so _weighted_average weights recent months most
        rows = q.group_by("month", "category").order_by("month").all()

        by_cat: dict[str, list[float]] = defaultdict(list)
        for row in rows:
            by_cat[row.category].append(float(row.total))
        return by_cat

    def _weighted_average(self, values: list[float]) -> float:
        if not values:
            return 0.0
        # Values run oldest to newest; more recent months get higher weight
        weights = list(range(1, len(values) + 1))
        total_weight = sum(weights)
        return sum(v * w for v, w in zip(values, weights)) / total_weight

    def forecast_month(self, exclude_descriptions: list[str] | None = None) -> list[dict]:
        by_cat = self._get_monthly_by_category(exclude_descriptions=exclude_descriptions)

        results = []
        for cat, monthly_totals in by_cat.items():
            projected = self._weighted_average(monthly_totals)
            results.append({
                "category": cat,
                "projected": round(projected, 2),
                "confidence": "high" if len(monthly_totals) >= 3 else "low",
            })

        results.sort(key=lambda r: r["projected"])
        return results

    def forecast_year(self) -> list[dict]:
        monthly = self.forecast_month()
        return [
            {**f, "projected": round(f["projected"] * 12, 2)}
            for f in monthly
        ]
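
A worked example of the weighting, using the grocery totals from the test data. This is a standalone sketch of the same linear weighting (weights 1..n, newest last), assuming the monthly totals arrive in chronological order. `weighted_average` here is a free function written for illustration.

```python
def weighted_average(values: list[float]) -> float:
    """Linear weights 1..n, with the last (most recent) value weighted most."""
    if not values:
        return 0.0
    weights = range(1, len(values) + 1)
    return sum(v * w for v, w in zip(values, weights)) / sum(weights)


monthly_totals = [-500.0, -600.0, -700.0]  # oldest to newest

# (-500*1 + -600*2 + -700*3) / (1 + 2 + 3) = -3800 / 6
projected_month = round(weighted_average(monthly_totals), 2)
projected_year = round(projected_month * 12, 2)

print(projected_month)  # -633.33
print(projected_year)   # -7599.96
```

The plain average of these months is -600, so the weighting pulls the projection toward the more expensive recent months. The annual figure simply multiplies the rounded monthly projection by 12, as `forecast_year` does.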

Step 4: Run tests

Run: pytest tests/services/test_forecasting.py -v

Expected: All 3 tests PASS

Step 5: Commit

git add src/services/forecasting.py tests/services/test_forecasting.py
git commit -m "feat: forecasting service with weighted averages and what-if"

Phase 5: Desktop UI

Task 12: Main Window and Sidebar Navigation

Files:

  • Create: src/main.py
  • Create: src/ui/main_window.py
  • Create: src/ui/sidebar.py

Step 1: Implement main window with sidebar

Note: UI tasks are harder to TDD strictly. Write the UI code, then verify manually with python -m src.main.

# src/ui/sidebar.py
from PySide6.QtWidgets import QVBoxLayout, QPushButton, QFrame
from PySide6.QtCore import Signal


class Sidebar(QFrame):
    view_changed = Signal(str)

    VIEWS = [
        ("Import", "import"),
        ("Transactions", "transactions"),
        ("Analysis", "analysis"),
        ("Recurring", "recurring"),
        ("Settings", "settings"),
    ]

    def __init__(self, parent=None):
        super().__init__(parent)
        self.setObjectName("sidebar")
        self.setFixedWidth(200)
        layout = QVBoxLayout(self)
        layout.setContentsMargins(0, 10, 0, 10)

        self._buttons: dict[str, QPushButton] = {}
        for label, key in self.VIEWS:
            btn = QPushButton(label)
            btn.setObjectName(f"sidebar-{key}")
            btn.setCheckable(True)
            btn.clicked.connect(lambda checked, k=key: self._on_click(k))
            layout.addWidget(btn)
            self._buttons[key] = btn

        layout.addStretch()
        self._buttons["import"].setChecked(True)

    def _on_click(self, key: str):
        for k, btn in self._buttons.items():
            btn.setChecked(k == key)
        self.view_changed.emit(key)

# src/ui/main_window.py
from PySide6.QtWidgets import QMainWindow, QHBoxLayout, QWidget, QStackedWidget, QLabel
from src.ui.sidebar import Sidebar


class MainWindow(QMainWindow):
    def __init__(self, session):
        super().__init__()
        self.session = session
        self.setWindowTitle("SpendingAnalysis")
        self.setMinimumSize(1200, 800)

        central = QWidget()
        self.setCentralWidget(central)
        layout = QHBoxLayout(central)
        layout.setContentsMargins(0, 0, 0, 0)
        layout.setSpacing(0)

        self.sidebar = Sidebar()
        layout.addWidget(self.sidebar)

        self.stack = QStackedWidget()
        layout.addWidget(self.stack)

        # Placeholder views — replaced in later tasks
        self._views = {}
        for label, key in Sidebar.VIEWS:
            placeholder = QLabel(f"{label} View")
            placeholder.setStyleSheet("font-size: 24px; color: #888;")
            self._views[key] = self.stack.addWidget(placeholder)

        self.sidebar.view_changed.connect(self._switch_view)

    def _switch_view(self, key: str):
        self.stack.setCurrentIndex(self._views[key])

# src/main.py
import sys
from PySide6.QtWidgets import QApplication
from src.db import get_session
from src.seed import seed_categories
from src.ui.main_window import MainWindow


def main():
    session = get_session()
    seed_categories(session)

    app = QApplication(sys.argv)
    window = MainWindow(session)
    window.show()
    sys.exit(app.exec())


if __name__ == "__main__":
    main()

Step 2: Verify it launches

Run: python -m src.main

Expected: Window opens with sidebar and placeholder views

Step 3: Commit

git add src/main.py src/ui/sidebar.py src/ui/main_window.py
git commit -m "feat: main window with sidebar navigation"

Task 13: Import Wizard View

Files:

  • Create: src/ui/import_view.py
  • Modify: src/ui/main_window.py (replace placeholder)

This is the most complex UI component. It has three stages:

  1. File selection (with drag-and-drop)
  2. Column mapping (for new sources) or auto-match confirmation
  3. Import preview and confirmation

Step 1: Implement the import view

Build src/ui/import_view.py with:

  • A QStackedWidget for the three stages
  • File drop zone that accepts CSV files
  • Column mapping form with combo boxes for each target field (date, amount, description, etc.)
  • Account selection/creation combo box
  • Preview table showing first 5 rows mapped to normalized columns
  • Import button with progress feedback
  • Results summary showing imported count, duplicates, uncategorized
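
The preview step can be sketched independently of the UI. The example below applies a `column_map` of the same shape used in the import tests to the first rows of a CSV, using only the standard library. `preview_rows` and the sample CSV text are hypothetical and not taken from the real bank exports.

```python
import csv
import io


def preview_rows(csv_text: str, column_map: dict[str, str], limit: int = 5) -> list[dict[str, str]]:
    """Map the first `limit` CSV rows onto normalized field names."""
    reader = csv.DictReader(io.StringIO(csv_text))
    rows = []
    for raw in reader:
        if len(rows) >= limit:
            break
        # column_map is {normalized field: source column header}
        rows.append({field: raw[source] for field, source in column_map.items()})
    return rows


# Made-up sample data for illustration only
sample = (
    "Transaction Date,Description,Amount\n"
    "01/05/2026,PUBLIX #123,-50.00\n"
    "01/12/2026,ALDI 71042,-30.00\n"
)
column_map = {"date": "Transaction Date", "amount": "Amount", "description": "Description"}
preview = preview_rows(sample, column_map)
print(preview[0])
```

Values stay as raw strings here. Parsing dates and amounts is left to the import service, so the preview shows exactly what the mapping selected.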

Wire it into MainWindow by replacing the import placeholder.

Key signals: import_complete (emitted after successful import so other views can refresh).

Step 2: Test manually with both sample CSVs

Run: python -m src.main

Test: Import Chase CSV first (should auto-detect headers), then Checking CSV (should show mapping wizard).

Step 3: Commit

git add src/ui/import_view.py src/ui/main_window.py
git commit -m "feat: import wizard view with column mapping and file drop"

Task 14: Transactions View

Files:

  • Create: src/ui/transactions_view.py
  • Modify: src/ui/main_window.py (replace placeholder)

Step 1: Implement transactions view

Build src/ui/transactions_view.py with:

  • QTableView backed by a custom QAbstractTableModel wrapping SQLAlchemy queries
  • Columns: Date, Description, Amount, Account, Category, Tag, Person
  • Filter bar at top: date range (two QDateEdit), account dropdown, person dropdown, category dropdown, "Uncategorized only" checkbox, search text field
  • Inline category editing: clicking the category cell opens a dropdown to change it
  • After category change, prompt: "Create rule for [merchant pattern]?" with Yes/No
  • Amount column formatted with colors (red for expenses, green for income)
  • Sortable by clicking column headers
  • Refresh on import completion
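
The "Create rule for [merchant pattern]?" prompt needs a suggested pattern. The plan does not specify how it is derived, so the sketch below is one possible heuristic, offered as an assumption: uppercase the description and strip a trailing store or reference number. `suggest_pattern` is a hypothetical helper.

```python
import re


def suggest_pattern(description: str) -> str:
    """Suggest a rule pattern by removing a trailing store/reference number."""
    text = description.upper().strip()
    # Drop a trailing "#1234" or bare digit run, e.g. "WAL-MART #7181" -> "WAL-MART"
    return re.sub(r"\s+#?\d+\s*$", "", text)


print(suggest_pattern("WAL-MART #7181"))      # WAL-MART
print(suggest_pattern("CHICK-FIL-A #01476"))  # CHICK-FIL-A
print(suggest_pattern("Netflix.com"))         # NETFLIX.COM
```

The user should still be able to edit the suggestion before the rule is saved, since a heuristic like this will not fit every bank's description format.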

Step 2: Verify with imported data

Run: python -m src.main, import sample data, switch to Transactions view.

Step 3: Commit

git add src/ui/transactions_view.py src/ui/main_window.py
git commit -m "feat: transactions view with filtering and inline categorization"

Task 15: Analysis View (Charts)

Files:

  • Create: src/ui/analysis_view.py
  • Modify: src/ui/main_window.py (replace placeholder)

Step 1: Implement analysis view

Build src/ui/analysis_view.py with three tab panels:

Spending Over Time tab:

  • Matplotlib figure embedded via FigureCanvasQTAgg
  • Period toggle (day/week/month) using QButtonGroup
  • Stacked bar chart: x-axis = time periods, y-axis = spending, stacked by category
  • Filter controls matching transactions view filters
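
A stacked bar chart needs one series per category, aligned to the same list of periods. The sketch below pivots flat rows into that shape. It assumes the input is `(period, category, total)` tuples, which `AnalysisService` as planned does not yet return; `stack_series` and the sample rows are hypothetical.

```python
from collections import defaultdict


def stack_series(rows: list[tuple[str, str, float]]) -> tuple[list[str], dict[str, list[float]]]:
    """Pivot (period, category, total) rows into per-category series."""
    periods = sorted({period for period, _, _ in rows})
    index = {period: i for i, period in enumerate(periods)}
    series: dict[str, list[float]] = defaultdict(lambda: [0.0] * len(periods))
    for period, category, total in rows:
        # Expenses are stored as negatives; plot them as positive bar heights
        series[category][index[period]] += abs(total)
    return periods, dict(series)


rows = [
    ("2026-01", "Groceries", -80.0),
    ("2026-01", "Dining Out", -15.0),
    ("2026-02", "Groceries", -60.0),
    ("2026-02", "Dining Out", -25.0),
]
periods, series = stack_series(rows)
print(periods)  # ['2026-01', '2026-02']
print(series)   # {'Groceries': [80.0, 60.0], 'Dining Out': [15.0, 25.0]}
```

Each series can then be drawn with matplotlib's `Axes.bar`, passing the running sum of the previous series as `bottom` to stack them.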

Category Breakdown tab:

  • Donut chart showing spending by category for selected date range
  • Horizontal bar chart ranking categories by total spend
  • Needs/wants/savings summary bar (three-segment horizontal bar)
  • Click donut slice to filter transactions view to that category

Forecasting tab:

  • Month-ahead stacked bar alongside last 3 actual months
  • Year-ahead projection with trend line
  • "What if" controls: checkboxes for each recurring charge, sliders for category adjustments
  • Live-updating projection as toggles change

All charts styled to match app theme (dark background, consistent category colors).

Step 2: Verify with imported data

Run: python -m src.main, import data, switch to Analysis view.

Step 3: Commit

git add src/ui/analysis_view.py src/ui/main_window.py
git commit -m "feat: analysis view with spending trends, breakdowns, and forecasting"

Task 16: Recurring Charges View

Files:

  • Create: src/ui/recurring_view.py
  • Modify: src/ui/main_window.py (replace placeholder)

Step 1: Implement recurring charges view

Build src/ui/recurring_view.py with:

  • Summary card at top: total monthly subscription burden, total annual cost
  • Table of detected recurring charges: description, typical amount, frequency, annual cost, last date, status (confirmed/pending/dismissed)
  • Confirm/Dismiss buttons per row
  • "Re-scan" button to re-run detection
  • Store confirmed charges either in a dedicated recurring_charges table (adding a model if one does not exist yet) or as metadata on categorization rules
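
The summary card totals can be derived from the table rows. A minimal sketch, assuming frequency labels of `weekly`, `monthly`, `quarterly`, and `annual` (the plan does not list them) and assuming dismissed charges are excluded while pending ones still count:

```python
# Assumed frequency labels and how often each occurs per year.
PERIODS_PER_YEAR = {"weekly": 52, "monthly": 12, "quarterly": 4, "annual": 1}


def subscription_summary(charges):
    """charges: iterable of (typical_amount, frequency, status) tuples."""
    annual = sum(
        amount * PERIODS_PER_YEAR[frequency]
        for amount, frequency, status in charges
        if status != "dismissed"
    )
    return {"monthly": round(annual / 12, 2), "annual": round(annual, 2)}
```

Whether pending charges belong in the totals is a design choice; counting only confirmed rows would be a one-line change to the filter.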

Step 2: Verify with imported data

Run: python -m src.main, import data, switch to Recurring view.

Step 3: Commit

git add src/ui/recurring_view.py src/ui/main_window.py
git commit -m "feat: recurring charges view with detection and confirmation"

Task 17: Settings View

Files:

  • Create: src/ui/settings_view.py
  • Modify: src/ui/main_window.py (replace placeholder)

Step 1: Implement settings view

Build src/ui/settings_view.py with tabs:

Categories tab:

  • Editable list of categories with name, default tag, icon
  • Add/delete/rename buttons

Rules tab:

  • Table of categorization rules: pattern, category, tag override, person, priority
  • Add/edit/delete with a dialog form
  • Drag to reorder priority (or up/down arrows)
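
Whichever control is used (drag or arrows), reordering reduces to moving one item and renumbering. A sketch under the assumption that rules are held as dicts ordered from highest to lowest priority and that a lower number means higher priority; `move_rule` is a hypothetical helper:

```python
def move_rule(rules, index, delta):
    """Move rules[index] by delta positions (-1 = up, +1 = down).

    Returns a new list with priorities renumbered from 0; moves that
    would fall outside the list are ignored.
    """
    target = index + delta
    if not (0 <= index < len(rules)) or not (0 <= target < len(rules)):
        return list(rules)
    reordered = list(rules)
    reordered.insert(target, reordered.pop(index))
    return [{**rule, "priority": position} for position, rule in enumerate(reordered)]
```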

Household tab:

  • List of household members with name and relationship
  • Add/edit/delete

Accounts tab:

  • List of accounts with name, institution, type, owner
  • Edit owner/shared status

CSV Mappings tab:

  • List of saved mappings with name and account
  • Delete mappings that are no longer needed

Step 2: Verify settings management

Run: python -m src.main, switch to Settings, add a rule, verify it applies on next import.

Step 3: Commit

git add src/ui/settings_view.py src/ui/main_window.py
git commit -m "feat: settings view for categories, rules, household, accounts, and CSV mappings"

Phase 6: Polish

Task 18: Dark/Light Theming

Files:

  • Create: src/ui/themes/dark.qss
  • Create: src/ui/themes/light.qss
  • Modify: src/ui/main_window.py (theme toggle)
  • Modify: src/ui/sidebar.py (add theme toggle button at bottom)

Step 1: Create dark theme QSS

Style the sidebar, buttons, tables, charts, inputs with a modern dark palette. Key colors:

  • Background: #1e1e2e
  • Surface: #2a2a3c
  • Text: #cdd6f4
  • Accent: #89b4fa
  • Border: #45475a

Step 2: Create light theme QSS

Corresponding light palette.

Step 3: Add toggle to sidebar

Add a theme toggle button at the bottom of the sidebar. On click, swap the stylesheet and update matplotlib chart colors.
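
Keeping both palettes in one mapping lets the stylesheet and the charts draw from the same colors. In this sketch the dark values come from Step 1, while the light values are placeholders that the plan does not specify; `next_theme` and `chart_rc` are hypothetical helpers.

```python
PALETTES = {
    "dark": {
        "background": "#1e1e2e",
        "surface": "#2a2a3c",
        "text": "#cdd6f4",
        "accent": "#89b4fa",
        "border": "#45475a",
    },
    # Placeholder light values, NOT taken from the plan.
    "light": {
        "background": "#eff1f5",
        "surface": "#e6e9ef",
        "text": "#4c4f69",
        "accent": "#1e66f5",
        "border": "#bcc0cc",
    },
}


def next_theme(current: str) -> str:
    """Return the theme to switch to when the toggle is clicked."""
    return "light" if current == "dark" else "dark"


def chart_rc(theme: str) -> dict:
    """Build matplotlib rcParams overrides from the theme palette."""
    palette = PALETTES[theme]
    return {
        "figure.facecolor": palette["background"],
        "axes.facecolor": palette["surface"],
        "axes.edgecolor": palette["border"],
        "axes.labelcolor": palette["text"],
        "text.color": palette["text"],
        "xtick.color": palette["text"],
        "ytick.color": palette["text"],
    }
```

The returned dict can be passed to `matplotlib.rcParams.update(...)`. Since rcParams mainly affect artists created afterwards, existing figures will likely need to be redrawn from scratch after a toggle.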

Step 4: Commit

git add src/ui/themes/ src/ui/main_window.py src/ui/sidebar.py
git commit -m "feat: dark and light theme support"

Task 19: Cross-Account Transfer Detection

Files:

  • Create: src/services/transfer_detector.py
  • Create: tests/services/test_transfer_detector.py
  • Modify: src/services/importer.py (run after import)

Step 1: Write failing tests

Test that matching transactions across accounts (dates within ±2 days of each other, same absolute amount, one positive and one negative) are flagged as transfers.

Step 2: Implement transfer detector

Scan for pairs where: abs(txn_a.amount) == abs(txn_b.amount) with opposite signs, the accounts differ, the dates are within 2 days of each other, and at least one description matches a known transfer pattern (e.g., "Payment Thank You", "CREDIT CRD EPAY").
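
The pairing scan can be sketched independently of the database layer. The `Txn` dataclass and function names below are hypothetical, amounts are assumed to be `Decimal` so equality is exact, and each transaction is assumed to pair at most once:

```python
from dataclasses import dataclass
from datetime import date
from decimal import Decimal

TRANSFER_PATTERNS = ("PAYMENT THANK YOU", "CREDIT CRD EPAY")
MAX_DAY_GAP = 2


@dataclass(frozen=True)
class Txn:
    """Hypothetical stand-in for the transaction model."""
    id: int
    account_id: int
    when: date
    amount: Decimal
    description: str


def looks_like_transfer(description: str) -> bool:
    upper = description.upper()
    return any(pattern in upper for pattern in TRANSFER_PATTERNS)


def find_transfer_pairs(txns):
    """Return (id_a, id_b) pairs that look like cross-account transfers."""
    pairs = []
    used = set()
    ordered = sorted(txns, key=lambda t: (t.when, t.id))
    for i, a in enumerate(ordered):
        if a.id in used:
            continue
        for b in ordered[i + 1:]:
            # The list is date-sorted, so later candidates are even further away.
            if (b.when - a.when).days > MAX_DAY_GAP:
                break
            if b.id in used or a.account_id == b.account_id:
                continue
            # Equal magnitude and opposite sign in one comparison.
            opposite = a.amount != 0 and a.amount == -b.amount
            if opposite and (looks_like_transfer(a.description)
                             or looks_like_transfer(b.description)):
                pairs.append((a.id, b.id))
                used.update((a.id, b.id))
                break
    return pairs
```

Sorting by date lets the inner loop stop early, which keeps the scan close to linear for typical statement sizes.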

Step 3: Run tests

Run: pytest tests/services/test_transfer_detector.py -v

Step 4: Commit

git add src/services/transfer_detector.py tests/services/test_transfer_detector.py src/services/importer.py
git commit -m "feat: cross-account transfer detection"

Task 20: Default Categorization Rules (Seed)

Files:

  • Modify: src/seed.py
  • Modify: tests/test_seed.py

Step 1: Add default rules based on real data patterns

Add a seed_default_rules function that creates the initial rule set from the design doc (CIMTECHNIQUES -> Income, HELLOFRESH -> Groceries, etc.). Person-attributed rules depend on household members already existing, so seed those rules only when a matching member is found.
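
The member check can be sketched as a pure selection step that runs before any rows are written. The two patterns come from the text above; the tuple layout, the helper name `select_seed_rules`, and the attribution of CIMTECHNIQUES to "Andrew" are assumptions made purely for illustration.

```python
# (pattern, category, person or None); the person attribution is hypothetical.
DEFAULT_RULES = [
    ("CIMTECHNIQUES", "Income", "Andrew"),
    ("HELLOFRESH", "Groceries", None),
]


def select_seed_rules(defaults, member_names):
    """Keep rules with no person, or whose person is an existing member."""
    known = {name.lower() for name in member_names}
    selected = []
    for pattern, category, person in defaults:
        if person is not None and person.lower() not in known:
            continue  # Skip person-attributed rules without a matching member.
        selected.append((pattern, category, person))
    return selected
```

seed_default_rules could then create one rule row per selected tuple, which also makes the skip behavior easy to assert in tests/test_seed.py.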

Step 2: Run tests

Run: pytest tests/test_seed.py -v

Step 3: Commit

git add src/seed.py tests/test_seed.py
git commit -m "feat: default categorization rules from known patterns"

Task 21: Integration Test with Real Data

Files:

  • Create: tests/test_integration.py

Step 1: Write end-to-end test

# tests/test_integration.py
from pathlib import Path
from sqlalchemy import create_engine
from sqlalchemy.orm import Session

from src.db import Base
from src.models import Account
from src.seed import seed_categories, seed_household
from src.services.importer import ImportService
from src.services.analysis import AnalysisService
from src.services.recurring import RecurringDetector

RAWDATA = Path(__file__).parent.parent / "rawdata"


def test_full_pipeline():
    engine = create_engine("sqlite:///:memory:")
    Base.metadata.create_all(engine)
    session = Session(engine)

    seed_categories(session)
    andrew = seed_household(session, "Andrew", "self")

    # Set up accounts
    chase = Account(name="Chase Freedom", institution="Chase", account_type="credit", owner_id=andrew.id)
    checking = Account(name="WF Checking", institution="Wells Fargo", account_type="checking", owner_id=andrew.id, is_shared=True)
    session.add_all([chase, checking])
    session.flush()

    svc = ImportService(session)

    # Import Chase
    r1 = svc.import_csv(
        RAWDATA / "Chase0372_Activity20260101_20260210_20260210.CSV",
        account_id=chase.id,
        column_map={"date": "Transaction Date", "amount": "Amount", "description": "Description", "source_category": "Category"},
        amount_logic="signed",
    )
    assert r1["imported"] > 100

    # Import Checking
    r2 = svc.import_csv(
        RAWDATA / "Checking1.csv",
        account_id=checking.id,
        column_map={"date": 0, "amount": 1, "description": 4},
        amount_logic="signed",
    )
    assert r2["imported"] > 50

    # Analysis works
    analysis = AnalysisService(session)
    monthly = analysis.spending_by_period("month")
    assert len(monthly) >= 1

    by_cat = analysis.spending_by_category()
    assert len(by_cat) >= 1

    # Recurring detection works
    detector = RecurringDetector(session)
    recurring = detector.detect()
    # Smoke check only: the number of patterns found depends on the date range,
    # so this just confirms detect() returns a sized collection without raising.
    assert len(recurring) >= 0

Step 2: Run test

Run: pytest tests/test_integration.py -v
Expected: PASS

Step 3: Commit

git add tests/test_integration.py
git commit -m "test: end-to-end integration test with real CSV data"

Summary

| Phase | Tasks | Focus |
|-------|-------|-------|
| 1: Foundation | 1-3 | Project setup, models, seed data |
| 2: Import Pipeline | 4-6 | Normalizer, CSV reader, import service |
| 3: Categorization | 7-8 | Rule engine, post-import categorization |
| 4: Analysis | 9-11 | Spending analysis, recurring detection, forecasting |
| 5: UI | 12-17 | Main window, import wizard, transactions, analysis charts, recurring, settings |
| 6: Polish | 18-21 | Theming, transfer detection, default rules, integration test |

Tasks 1-11 are fully TDD. Tasks 12-18 are UI-focused with manual verification. Tasks 19-21 return to TDD for backend features and integration testing.