Covers: project setup, database models, CSV import pipeline with description normalization, rule-based categorization engine, spending analysis/recurring detection/forecasting services, PySide6 desktop UI with import wizard/transactions/analysis/recurring/settings views, dark/light theming, and integration testing. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
# SpendingAnalysis Implementation Plan

> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.

**Goal:** Build a desktop spending analysis app that imports bank/credit card CSVs, auto-categorizes transactions, and provides visual analysis of spending habits with forecasting.

**Architecture:** Three-layer (data/service/UI) Python app. SQLite via SQLAlchemy for storage, pandas for CSV/analysis, PySide6 for desktop UI, matplotlib for charts. Categorization engine behind a Protocol for future AI extensibility.

**Tech Stack:** Python 3.12+, PySide6, SQLAlchemy, pandas, matplotlib, pytest

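To make the "engine behind a Protocol" idea concrete, here is a minimal sketch using only the standard library. It is not the plan's implementation: `CategoryResult` and `KeywordCategorizer` are hypothetical names, and `categorize` takes a plain description string here, whereas the plan's tests pass a `Transaction`. Only the `Categorizer` name comes from the plan itself.

```python
from dataclasses import dataclass
from typing import Optional, Protocol, runtime_checkable


@dataclass
class CategoryResult:
    """Hypothetical result type; the plan's real return type may differ."""
    category_id: int
    tag: Optional[str] = None


@runtime_checkable
class Categorizer(Protocol):
    """Any object with a matching categorize() method satisfies this protocol."""

    def categorize(self, description: str) -> Optional[CategoryResult]: ...


class KeywordCategorizer:
    """Toy rule-based engine (hypothetical). Note: no inheritance from Categorizer."""

    def __init__(self, rules: dict) -> None:
        self.rules = rules  # keyword -> category id

    def categorize(self, description: str) -> Optional[CategoryResult]:
        for keyword, category_id in self.rules.items():
            if keyword in description.upper():
                return CategoryResult(category_id=category_id)
        return None


engine: Categorizer = KeywordCategorizer({"PUBLIX": 3})
print(isinstance(engine, Categorizer))  # structural check enabled by @runtime_checkable
print(engine.categorize("PUBLIX #1716"))
```

Because the check is structural, a future AI-backed engine would only need to expose the same method to be swapped in.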

---

## Phase 1: Foundation

### Task 1: Project Setup

**Files:**
- Create: `pyproject.toml`
- Create: `src/__init__.py`
- Create: `src/models/__init__.py`
- Create: `src/services/__init__.py`
- Create: `src/ui/__init__.py`
- Create: `tests/__init__.py`
- Create: `tests/services/__init__.py`
- Create: `tests/models/__init__.py`

**Step 1: Create pyproject.toml**

```toml
[project]
name = "spending-analysis"
version = "0.1.0"
requires-python = ">=3.12"
dependencies = [
    "PySide6>=6.6",
    "sqlalchemy>=2.0",
    "pandas>=2.0",
    "matplotlib>=3.8",
]

[project.optional-dependencies]
dev = [
    "pytest>=8.0",
    "pytest-qt>=4.2",
]

[project.scripts]
spending-analysis = "src.main:main"
```

**Step 2: Create directory structure**

Create all `__init__.py` files listed above (empty files).

**Step 3: Create virtual environment and install**

Run (Windows): `python -m venv .venv && .venv\Scripts\activate && pip install -e ".[dev]"`

On macOS/Linux, activate with `source .venv/bin/activate` instead of `.venv\Scripts\activate`.

**Step 4: Verify installation**

Run: `python -c "import PySide6; import sqlalchemy; import pandas; import matplotlib; print('OK')"`
Expected: `OK`

**Step 5: Commit**

```bash
git add pyproject.toml src/ tests/
git commit -m "feat: project setup with dependencies"
```

---

### Task 2: Database Models

**Files:**
- Create: `src/db.py`
- Create: `src/models/household.py`
- Create: `src/models/account.py`
- Create: `src/models/category.py`
- Create: `src/models/transaction.py`
- Create: `src/models/rule.py`
- Create: `src/models/csv_mapping.py`
- Create: `tests/models/test_models.py`

**Step 1: Write failing test for model creation**

```python
# tests/models/test_models.py
import datetime

from sqlalchemy import create_engine
from sqlalchemy.orm import Session

from src.db import Base
from src.models.household import HouseholdMember
from src.models.account import Account
from src.models.category import Category
from src.models.transaction import Transaction
from src.models.rule import CategorizationRule
from src.models.csv_mapping import CsvMapping


def make_session():
    engine = create_engine("sqlite:///:memory:")
    Base.metadata.create_all(engine)
    return Session(engine)


def test_create_household_member():
    session = make_session()
    member = HouseholdMember(name="Andrew", relationship="self")
    session.add(member)
    session.commit()
    assert member.id is not None
    assert member.name == "Andrew"


def test_create_account_with_owner():
    session = make_session()
    member = HouseholdMember(name="Andrew", relationship="self")
    session.add(member)
    session.flush()
    account = Account(
        name="Chase Freedom",
        institution="Chase",
        account_type="credit",
        owner_id=member.id,
    )
    session.add(account)
    session.commit()
    assert account.owner.name == "Andrew"


def test_create_category():
    session = make_session()
    cat = Category(name="Groceries", default_tag="needs")
    session.add(cat)
    session.commit()
    assert cat.id is not None


def test_create_transaction():
    session = make_session()
    member = HouseholdMember(name="Andrew", relationship="self")
    session.add(member)
    session.flush()
    account = Account(name="Checking", institution="Wells Fargo", account_type="checking", owner_id=member.id)
    cat = Category(name="Groceries", default_tag="needs")
    session.add_all([account, cat])
    session.flush()
    txn = Transaction(
        # SQLite's Date type only accepts datetime.date objects, not strings
        date=datetime.date(2026, 1, 15),
        amount=-48.52,
        description="WAL-MART #7181",
        raw_description="PURCHASE AUTHORIZED ON 01/14 WAL-MART #7181 BEAUFORT SC CARD 5360",
        account_id=account.id,
        category_id=cat.id,
        attributed_to_id=member.id,
        tag="needs",
    )
    session.add(txn)
    session.commit()
    assert txn.id is not None
    assert txn.account.name == "Checking"
    assert txn.category.name == "Groceries"


def test_create_categorization_rule():
    session = make_session()
    cat = Category(name="Groceries", default_tag="needs")
    session.add(cat)
    session.flush()
    rule = CategorizationRule(
        pattern="WAL-MART",
        category_id=cat.id,
        priority=10,
    )
    session.add(rule)
    session.commit()
    assert rule.id is not None


def test_create_csv_mapping():
    session = make_session()
    member = HouseholdMember(name="Andrew", relationship="self")
    session.add(member)
    session.flush()
    account = Account(name="Chase", institution="Chase", account_type="credit", owner_id=member.id)
    session.add(account)
    session.flush()
    mapping = CsvMapping(
        name="Chase Credit Card",
        fingerprint="Transaction Date,Post Date,Description,Category,Type,Amount,Memo",
        column_map='{"date": "Transaction Date", "amount": "Amount", "description": "Description"}',
        amount_logic="signed",
        account_id=account.id,
    )
    session.add(mapping)
    session.commit()
    assert mapping.id is not None
```

**Step 2: Run test to verify it fails**

Run: `pytest tests/models/test_models.py -v`
Expected: FAIL (imports not found)

**Step 3: Implement db.py and all models**

```python
# src/db.py
from pathlib import Path
from sqlalchemy import create_engine
from sqlalchemy.orm import DeclarativeBase, Session

class Base(DeclarativeBase):
    pass

def get_engine(db_path: Path | None = None):
    if db_path is None:
        db_path = Path.home() / ".spending_analysis" / "spending.db"
    db_path.parent.mkdir(parents=True, exist_ok=True)
    return create_engine(f"sqlite:///{db_path}")

def get_session(db_path: Path | None = None) -> Session:
    engine = get_engine(db_path)
    Base.metadata.create_all(engine)
    return Session(engine)
```

```python
# src/models/household.py
from sqlalchemy import String
from sqlalchemy.orm import Mapped, mapped_column
# Aliased: the `relationship` column defined below would otherwise shadow the
# ORM function inside the class body and make the `accounts` line fail.
from sqlalchemy.orm import relationship as orm_relationship
from src.db import Base

class HouseholdMember(Base):
    __tablename__ = "household_members"
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(100))
    relationship: Mapped[str] = mapped_column(String(50))
    accounts: Mapped[list["Account"]] = orm_relationship(back_populates="owner")
```

```python
# src/models/account.py
from sqlalchemy import ForeignKey, String
from sqlalchemy.orm import Mapped, mapped_column, relationship
from src.db import Base

class Account(Base):
    __tablename__ = "accounts"
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(100))
    institution: Mapped[str] = mapped_column(String(100))
    account_type: Mapped[str] = mapped_column(String(20))  # checking, credit, savings
    owner_id: Mapped[int | None] = mapped_column(ForeignKey("household_members.id"))
    is_shared: Mapped[bool] = mapped_column(default=False)
    owner: Mapped["HouseholdMember | None"] = relationship(back_populates="accounts")
```

```python
# src/models/category.py
from sqlalchemy import String
from sqlalchemy.orm import Mapped, mapped_column
from src.db import Base

class Category(Base):
    __tablename__ = "categories"
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(100), unique=True)
    default_tag: Mapped[str | None] = mapped_column(String(20))  # needs, wants, savings
    icon: Mapped[str | None] = mapped_column(String(50))
```

```python
# src/models/transaction.py
import datetime
from sqlalchemy import Date, ForeignKey, Numeric, String, Text
from sqlalchemy.orm import Mapped, mapped_column, relationship
from src.db import Base

class Transaction(Base):
    __tablename__ = "transactions"
    id: Mapped[int] = mapped_column(primary_key=True)
    date: Mapped[datetime.date] = mapped_column(Date)
    amount: Mapped[float] = mapped_column(Numeric(10, 2))
    description: Mapped[str] = mapped_column(String(300))
    raw_description: Mapped[str | None] = mapped_column(Text)
    account_id: Mapped[int] = mapped_column(ForeignKey("accounts.id"))
    category_id: Mapped[int | None] = mapped_column(ForeignKey("categories.id"))
    attributed_to_id: Mapped[int | None] = mapped_column(ForeignKey("household_members.id"))
    tag: Mapped[str | None] = mapped_column(String(20))
    source_category: Mapped[str | None] = mapped_column(String(100))
    is_transfer: Mapped[bool] = mapped_column(default=False)
    transfer_pair_id: Mapped[int | None] = mapped_column(ForeignKey("transactions.id"))
    account: Mapped["Account"] = relationship()
    category: Mapped["Category | None"] = relationship()
    attributed_to: Mapped["HouseholdMember | None"] = relationship()
```

```python
# src/models/rule.py
from sqlalchemy import ForeignKey, String
from sqlalchemy.orm import Mapped, mapped_column, relationship
from src.db import Base

class CategorizationRule(Base):
    __tablename__ = "categorization_rules"
    id: Mapped[int] = mapped_column(primary_key=True)
    pattern: Mapped[str] = mapped_column(String(300))
    category_id: Mapped[int] = mapped_column(ForeignKey("categories.id"))
    tag_override: Mapped[str | None] = mapped_column(String(20))
    attributed_to_id: Mapped[int | None] = mapped_column(ForeignKey("household_members.id"))
    priority: Mapped[int] = mapped_column(default=0)
    category: Mapped["Category"] = relationship()
    attributed_to: Mapped["HouseholdMember | None"] = relationship()
```

```python
# src/models/csv_mapping.py
from sqlalchemy import ForeignKey, String, Text
from sqlalchemy.orm import Mapped, mapped_column, relationship
from src.db import Base

class CsvMapping(Base):
    __tablename__ = "csv_mappings"
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(100))
    fingerprint: Mapped[str] = mapped_column(String(500))
    column_map: Mapped[str] = mapped_column(Text)  # JSON
    amount_logic: Mapped[str] = mapped_column(String(50))  # signed, separate_columns, type_column
    account_id: Mapped[int] = mapped_column(ForeignKey("accounts.id"))
    account: Mapped["Account"] = relationship()
```

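The `fingerprint` and JSON `column_map` fields suggest that a saved mapping can be looked up from a file's header row. The sketch below shows one way that could work, assuming the fingerprint is simply the header names joined with commas (which matches the value used in `test_create_csv_mapping`). `header_fingerprint` and `saved_mappings` are hypothetical names, and a plain dict stands in for the `csv_mappings` table.

```python
import json


def header_fingerprint(headers):
    """Join trimmed header names with commas (assumed fingerprint format)."""
    return ",".join(h.strip() for h in headers)


# Hypothetical stand-in for rows of the csv_mappings table, keyed by fingerprint.
# column_map is kept as JSON text, as in the CsvMapping model.
saved_mappings = {
    "Transaction Date,Post Date,Description,Category,Type,Amount,Memo": {
        "name": "Chase Credit Card",
        "column_map": '{"date": "Transaction Date", "amount": "Amount", "description": "Description"}',
        "amount_logic": "signed",
    }
}

headers = ["Transaction Date", "Post Date", "Description", "Category", "Type", "Amount", "Memo"]
match = saved_mappings.get(header_fingerprint(headers))

# Decode the stored JSON only when a mapping was found for this header layout.
column_map = json.loads(match["column_map"]) if match else None
print(column_map)
```

A headerless file (such as the checking export referenced later) has no header row to fingerprint, so it would need a different key, for example the column count.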
Update `src/models/__init__.py` to re-export all models:
```python
from src.models.household import HouseholdMember
from src.models.account import Account
from src.models.category import Category
from src.models.transaction import Transaction
from src.models.rule import CategorizationRule
from src.models.csv_mapping import CsvMapping
```

**Step 4: Run tests**

Run: `pytest tests/models/test_models.py -v`
Expected: All 6 tests PASS

**Step 5: Commit**

```bash
git add src/db.py src/models/ tests/models/
git commit -m "feat: database models and connection management"
```

---

### Task 3: Seed Data

**Files:**
- Create: `src/seed.py`
- Create: `tests/test_seed.py`

**Step 1: Write failing test**

```python
# tests/test_seed.py
from sqlalchemy import create_engine
from sqlalchemy.orm import Session

from src.db import Base
from src.seed import seed_categories, seed_household, DEFAULT_CATEGORIES
from src.models import Category, HouseholdMember


def make_session():
    engine = create_engine("sqlite:///:memory:")
    Base.metadata.create_all(engine)
    return Session(engine)


def test_seed_categories():
    session = make_session()
    seed_categories(session)
    cats = session.query(Category).all()
    assert len(cats) == len(DEFAULT_CATEGORIES)
    names = {c.name for c in cats}
    assert "Groceries" in names
    assert "Transfer" in names
    assert "Income" in names


def test_seed_categories_idempotent():
    session = make_session()
    seed_categories(session)
    seed_categories(session)
    cats = session.query(Category).all()
    assert len(cats) == len(DEFAULT_CATEGORIES)


def test_seed_household():
    session = make_session()
    seed_household(session, "Andrew", "self")
    members = session.query(HouseholdMember).all()
    assert len(members) == 1
    assert members[0].name == "Andrew"
```

**Step 2: Run test to verify it fails**

Run: `pytest tests/test_seed.py -v`
Expected: FAIL

**Step 3: Implement seed.py**

```python
# src/seed.py
from sqlalchemy.orm import Session
from src.models import Category, HouseholdMember

# (name, default_tag, icon)
DEFAULT_CATEGORIES = [
    ("Income", None, None),
    ("Housing", "needs", None),
    ("Groceries", "needs", None),
    ("Dining Out", "wants", None),
    ("Transportation", "needs", None),
    ("Gas", "needs", None),
    ("Utilities", "needs", None),
    ("Insurance", "needs", None),
    ("Healthcare", "needs", None),
    ("Entertainment", "wants", None),
    ("Shopping", "wants", None),
    ("Subscriptions", "wants", None),
    ("Personal Care", "wants", None),
    ("Family", "needs", None),
    ("Gifts & Donations", "wants", None),
    ("Debt Payment", "needs", None),
    ("Savings", "savings", None),
    ("Transfer", None, None),
    ("Travel", "wants", None),
    ("Home", "needs", None),
    ("Professional Services", "needs", None),
    ("Uncategorized", None, None),
]


def seed_categories(session: Session) -> None:
    existing = {c.name for c in session.query(Category).all()}
    for name, tag, icon in DEFAULT_CATEGORIES:
        if name not in existing:
            session.add(Category(name=name, default_tag=tag, icon=icon))
    session.commit()


def seed_household(session: Session, name: str, relationship: str) -> HouseholdMember:
    existing = session.query(HouseholdMember).filter_by(name=name).first()
    if existing:
        return existing
    member = HouseholdMember(name=name, relationship=relationship)
    session.add(member)
    session.commit()
    return member
```

**Step 4: Run tests**

Run: `pytest tests/test_seed.py -v`
Expected: All 3 tests PASS

**Step 5: Commit**

```bash
git add src/seed.py tests/test_seed.py
git commit -m "feat: seed data for default categories and household"
```

---

## Phase 2: CSV Import Pipeline

### Task 4: Description Normalizer

**Files:**
- Create: `src/services/normalizer.py`
- Create: `tests/services/test_normalizer.py`

**Step 1: Write failing tests using real transaction data**

```python
# tests/services/test_normalizer.py
from src.services.normalizer import normalize_description


def test_strip_authorization_prefix():
    raw = "PURCHASE AUTHORIZED ON 02/06 WALMART.COM 8009256278 BENTONVILLE AR P000000089502338 CARD 5360"
    result = normalize_description(raw)
    assert "AUTHORIZED ON" not in result
    assert "CARD 5360" not in result
    assert "WALMART.COM" in result


def test_strip_recurring_prefix():
    raw = "RECURRING PAYMENT AUTHORIZED ON 02/05 HELLOFRESH 646-846-3663 NY S356036316425851 CARD 5360"
    result = normalize_description(raw)
    assert "AUTHORIZED ON" not in result
    assert "HELLOFRESH" in result


def test_strip_reference_ids():
    raw = "RECURRING TRANSFER TO CONLON A WAY2SAVE SAVINGS REF #OP0WS99NKQ XXXXXX6065"
    result = normalize_description(raw)
    assert "REF #" not in result
    assert "XXXXXX" not in result
    assert "WAY2SAVE SAVINGS" in result


def test_strip_card_number():
    raw = "PURCHASE AUTHORIZED ON 01/08 MRS B COMPANY LLC PORT ROYAL SC S386008692282379 CARD 5360"
    result = normalize_description(raw)
    assert "CARD 5360" not in result
    assert "MRS B COMPANY LLC" in result


def test_strip_transaction_codes():
    raw = "OASISBATCH PAYROLL 260109 MP027126352 DONNA CONLON"
    result = normalize_description(raw)
    assert "OASISBATCH PAYROLL" in result
    assert "DONNA CONLON" in result


def test_clean_chase_description():
    """Chase descriptions are already clean, should pass through mostly unchanged."""
    raw = "PUBLIX #1716"
    result = normalize_description(raw)
    assert result == "PUBLIX #1716"


def test_check_number():
    raw = "CHECK # 104"
    result = normalize_description(raw)
    assert "CHECK" in result


def test_strip_html_entities():
    raw = "TST*PONCHOS TACOS &amp; BEE"
    result = normalize_description(raw)
    # The entity is decoded to a plain ampersand
    assert "&amp;" not in result
    assert "&" in result
```

**Step 2: Run to verify failure**

Run: `pytest tests/services/test_normalizer.py -v`

**Step 3: Implement normalizer**

```python
# src/services/normalizer.py
import re
import html


def normalize_description(raw: str) -> str:
    # Decode HTML entities such as "&amp;" before applying the patterns
    desc = html.unescape(raw.strip())

    # Strip "PURCHASE AUTHORIZED ON MM/DD" or "RECURRING PAYMENT AUTHORIZED ON MM/DD"
    desc = re.sub(r"^(PURCHASE|RECURRING PAYMENT|RECURRING TRANSFER)\s+AUTHORIZED ON \d{2}/\d{2}\s+", "", desc)

    # Strip "CARD XXXX" at end
    desc = re.sub(r"\s+CARD\s+\d{4}\s*$", "", desc)

    # Strip long alphanumeric transaction/reference codes (10+ chars).
    # The lookahead requires at least one digit so that long all-letter words
    # (e.g. city names such as BENTONVILLE) are not stripped.
    desc = re.sub(r"\s+(?=[A-Z0-9]*\d)[A-Z0-9]{10,}\b", "", desc)

    # Strip "REF #..." references
    desc = re.sub(r"\s+REF\s+#\S+", "", desc)

    # Strip masked account numbers
    desc = re.sub(r"\s+X{4,}\d+", "", desc)

    # Strip phone numbers
    desc = re.sub(r"\s+\d{3}-\d{3}-\d{4}", "", desc)

    # Strip trailing state abbreviations + zip that follow addresses
    # But keep them if they're part of merchant name (be conservative)
    desc = re.sub(r"\s+[A-Z]{2}\s+\d{5}(?:-\d{4})?\s*$", "", desc)

    # Clean up multiple spaces
    desc = re.sub(r"\s{2,}", " ", desc).strip()

    return desc
```

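The long-code rule is the easiest one to get subtly wrong, so it can help to check it in isolation. The sketch below compares a plain `[A-Z0-9]{10,}` run with a variant that also requires at least one digit, using the sample strings from the tests above. `PLAIN_RE`, `DIGIT_RE`, and `strip_codes` are illustrative names, not part of the plan.

```python
import re

# Plain rule: any run of 10+ uppercase letters/digits after whitespace.
PLAIN_RE = re.compile(r"\s+[A-Z0-9]{10,}\b")

# Stricter variant (assumption: a real reference code always contains a digit).
DIGIT_RE = re.compile(r"\s+(?=[A-Z0-9]*\d)[A-Z0-9]{10,}\b")


def strip_codes(text, pattern=DIGIT_RE):
    """Remove every match of the given code pattern from text."""
    return pattern.sub("", text)


sample = "WALMART.COM 8009256278 BENTONVILLE AR P000000089502338"

# The plain rule also removes the 11-letter city name.
print(strip_codes(sample, PLAIN_RE))  # WALMART.COM AR

# The digit-requiring rule keeps it and removes only the numeric codes.
print(strip_codes(sample))            # WALMART.COM BENTONVILLE AR
```

Neither variant touches the first token of a description, because both patterns require leading whitespace; that is why `OASISBATCH` survives in `test_strip_transaction_codes`.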
**Step 4: Run tests, iterate on regex until all pass**

Run: `pytest tests/services/test_normalizer.py -v`
Expected: All 8 tests PASS

**Step 5: Commit**

```bash
git add src/services/normalizer.py tests/services/test_normalizer.py
git commit -m "feat: transaction description normalizer"
```

---

### Task 5: CSV Reader and Format Detection

**Files:**
- Create: `src/services/csv_reader.py`
- Create: `tests/services/test_csv_reader.py`
- Reference: `rawdata/Chase0372_Activity20260101_20260210_20260210.CSV`
- Reference: `rawdata/Checking1.csv`

**Step 1: Write failing tests**

```python
# tests/services/test_csv_reader.py
from pathlib import Path
from src.services.csv_reader import read_csv, detect_format

RAWDATA = Path(__file__).parent.parent.parent / "rawdata"


def test_detect_chase_format():
    result = detect_format(RAWDATA / "Chase0372_Activity20260101_20260210_20260210.CSV")
    assert result["has_headers"] is True
    assert "Transaction Date" in result["headers"]
    assert result["delimiter"] == ","
    assert result["row_count"] > 0


def test_detect_checking_format():
    result = detect_format(RAWDATA / "Checking1.csv")
    assert result["has_headers"] is False
    assert result["column_count"] == 5
    assert result["row_count"] > 0


def test_read_chase_csv():
    rows = read_csv(RAWDATA / "Chase0372_Activity20260101_20260210_20260210.CSV")
    assert len(rows) > 100
    first = rows[0]
    assert "Transaction Date" in first
    assert "Amount" in first
    assert "Description" in first


def test_read_headerless_csv():
    rows = read_csv(RAWDATA / "Checking1.csv")
    assert len(rows) > 50
    first = rows[0]
    # Headerless: keys should be column indices
    assert 0 in first or "0" in first


def test_preview_rows():
    result = detect_format(RAWDATA / "Chase0372_Activity20260101_20260210_20260210.CSV")
    assert "preview" in result
    assert len(result["preview"]) <= 5
```

**Step 2: Run to verify failure**

Run: `pytest tests/services/test_csv_reader.py -v`

**Step 3: Implement csv_reader.py**

```python
# src/services/csv_reader.py
import csv
from pathlib import Path


def detect_format(file_path: Path) -> dict:
    # utf-8-sig drops a leading byte-order mark if the file has one
    with open(file_path, "r", encoding="utf-8-sig") as f:
        sample = f.read(8192)

    dialect = csv.Sniffer().sniff(sample)
    has_headers = csv.Sniffer().has_header(sample)

    with open(file_path, "r", encoding="utf-8-sig") as f:
        reader = csv.reader(f, dialect)
        all_rows = list(reader)

    # Filter empty rows
    all_rows = [r for r in all_rows if any(cell.strip() for cell in r)]

    result = {
        "delimiter": dialect.delimiter,
        "has_headers": has_headers,
        "file_path": str(file_path),
    }

    if has_headers:
        result["headers"] = all_rows[0]
        data_rows = all_rows[1:]
    else:
        result["column_count"] = len(all_rows[0]) if all_rows else 0
        data_rows = all_rows

    result["row_count"] = len(data_rows)
    result["preview"] = data_rows[:5]

    return result


def read_csv(file_path: Path) -> list[dict]:
    with open(file_path, "r", encoding="utf-8-sig") as f:
        sample = f.read(8192)

    dialect = csv.Sniffer().sniff(sample)
    has_headers = csv.Sniffer().has_header(sample)

    with open(file_path, "r", encoding="utf-8-sig") as f:
        if has_headers:
            reader = csv.DictReader(f, dialect=dialect)
            # DictReader fills missing cells with None and collects surplus
            # cells in a list, so only string values are stripped here.
            return [
                dict(row)
                for row in reader
                if any(isinstance(v, str) and v.strip() for v in row.values())
            ]
        else:
            reader = csv.reader(f, dialect)
            rows = []
            for row in reader:
                if any(cell.strip() for cell in row):
                    # Headerless files are keyed by integer column index
                    rows.append({i: cell for i, cell in enumerate(row)})
            return rows
```

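The reader opens files with `encoding="utf-8-sig"` rather than plain `utf-8`. The short sketch below shows why that matters for header-based column maps: with plain `utf-8`, a byte-order mark at the start of the file ends up glued to the first header name. The file name and contents are made up for the demonstration.

```python
import csv
import tempfile
from pathlib import Path

with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "bom_sample.csv"  # hypothetical file, created just for this demo
    # A UTF-8 file that starts with a byte-order mark (EF BB BF).
    path.write_bytes(b"\xef\xbb\xbfDate,Amount\r\n01/15/2026,-48.52\r\n")

    # Plain utf-8 keeps the BOM as a "\ufeff" character on the first cell.
    with open(path, "r", encoding="utf-8", newline="") as f:
        plain_header = next(csv.reader(f))

    # utf-8-sig strips the BOM when present and is harmless when it is absent.
    with open(path, "r", encoding="utf-8-sig", newline="") as f:
        sig_header = next(csv.reader(f))

print(plain_header)
print(sig_header)
```

With the plain decoding, a lookup such as `row["Date"]` would raise `KeyError` because the actual key is `"\ufeffDate"`.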
**Step 4: Run tests**

Run: `pytest tests/services/test_csv_reader.py -v`
Expected: All 5 tests PASS

**Step 5: Commit**

```bash
git add src/services/csv_reader.py tests/services/test_csv_reader.py
git commit -m "feat: CSV reader with format auto-detection"
```

---

### Task 6: Import Service

**Files:**
- Create: `src/services/importer.py`
- Create: `tests/services/test_importer.py`

**Step 1: Write failing tests**

```python
# tests/services/test_importer.py
import datetime
from pathlib import Path
from sqlalchemy import create_engine
from sqlalchemy.orm import Session

from src.db import Base
from src.models import *
from src.seed import seed_categories
from src.services.importer import ImportService

RAWDATA = Path(__file__).parent.parent.parent / "rawdata"


def make_session():
    engine = create_engine("sqlite:///:memory:")
    Base.metadata.create_all(engine)
    return Session(engine)


def setup_chase_account(session):
    member = HouseholdMember(name="Andrew", relationship="self")
    session.add(member)
    session.flush()
    account = Account(name="Chase Freedom", institution="Chase", account_type="credit", owner_id=member.id)
    session.add(account)
    session.flush()
    seed_categories(session)
    return account


def setup_checking_account(session):
    member = HouseholdMember(name="Andrew", relationship="self")
    session.add(member)
    session.flush()
    account = Account(name="WF Checking", institution="Wells Fargo", account_type="checking", owner_id=member.id, is_shared=True)
    session.add(account)
    session.flush()
    seed_categories(session)
    return account


def test_import_chase_csv():
    session = make_session()
    account = setup_chase_account(session)
    column_map = {
        "date": "Transaction Date",
        "amount": "Amount",
        "description": "Description",
        "source_category": "Category",
    }
    svc = ImportService(session)
    result = svc.import_csv(
        RAWDATA / "Chase0372_Activity20260101_20260210_20260210.CSV",
        account_id=account.id,
        column_map=column_map,
        amount_logic="signed",
    )
    assert result["imported"] > 0
    assert result["duplicates"] == 0
    txns = session.query(Transaction).all()
    assert len(txns) == result["imported"]
    # Check a known transaction
    sephora = [t for t in txns if "SEPHORA" in t.description]
    assert len(sephora) == 1
    assert sephora[0].amount == -75.00


def test_import_checking_csv():
    session = make_session()
    account = setup_checking_account(session)
    column_map = {
        "date": 0,
        "amount": 1,
        "description": 4,
    }
    svc = ImportService(session)
    result = svc.import_csv(
        RAWDATA / "Checking1.csv",
        account_id=account.id,
        column_map=column_map,
        amount_logic="signed",
    )
    assert result["imported"] > 0
    txns = session.query(Transaction).all()
    assert len(txns) > 50


def test_duplicate_detection():
    session = make_session()
    account = setup_chase_account(session)
    column_map = {
        "date": "Transaction Date",
        "amount": "Amount",
        "description": "Description",
    }
    svc = ImportService(session)
    result1 = svc.import_csv(
        RAWDATA / "Chase0372_Activity20260101_20260210_20260210.CSV",
        account_id=account.id,
        column_map=column_map,
        amount_logic="signed",
    )
    result2 = svc.import_csv(
        RAWDATA / "Chase0372_Activity20260101_20260210_20260210.CSV",
        account_id=account.id,
        column_map=column_map,
        amount_logic="signed",
    )
    assert result2["duplicates"] == result1["imported"]
    assert result2["imported"] == 0
```

**Step 2: Run to verify failure**

Run: `pytest tests/services/test_importer.py -v`

**Step 3: Implement import service**

```python
# src/services/importer.py
import datetime
from pathlib import Path
from sqlalchemy.orm import Session

from src.models.transaction import Transaction
from src.services.csv_reader import read_csv
from src.services.normalizer import normalize_description


class ImportService:
    def __init__(self, session: Session):
        self.session = session

    def import_csv(
        self,
        file_path: Path,
        account_id: int,
        column_map: dict,
        amount_logic: str = "signed",
    ) -> dict:
        rows = read_csv(file_path)
        imported = 0
        duplicates = 0

        for row in rows:
            date_val = self._parse_date(row[column_map["date"]])
            amount_val = self._parse_amount(row, column_map, amount_logic)
            raw_desc = row[column_map["description"]].strip()
            description = normalize_description(raw_desc)
            source_cat = row.get(column_map.get("source_category", "__missing__"), "").strip() or None

            if date_val is None or amount_val is None:
                continue

            # Duplicate check
            existing = (
                self.session.query(Transaction)
                .filter_by(date=date_val, amount=amount_val, raw_description=raw_desc, account_id=account_id)
                .first()
            )
            if existing:
                duplicates += 1
                continue

            txn = Transaction(
                date=date_val,
                amount=amount_val,
                description=description,
                raw_description=raw_desc,
                account_id=account_id,
                source_category=source_cat,
            )
            self.session.add(txn)
            imported += 1

        self.session.commit()
        return {"imported": imported, "duplicates": duplicates, "total_rows": len(rows)}

    def _parse_date(self, val: str) -> datetime.date | None:
        val = val.strip().strip('"')
        for fmt in ("%m/%d/%Y", "%Y-%m-%d", "%m-%d-%Y"):
            try:
                return datetime.datetime.strptime(val, fmt).date()
            except ValueError:
                continue
        return None

    def _parse_amount(self, row: dict, column_map: dict, logic: str) -> float | None:
        try:
            if logic == "signed":
                return float(row[column_map["amount"]].strip().strip('"').replace(",", ""))
            # Add other logic types as needed
        except (ValueError, KeyError):
            return None
        # Unsupported logic types fall through to None, so the row is skipped
        return None
```

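`_parse_amount` handles only `"signed"` so far, while the `CsvMapping` model also names `separate_columns` and `type_column`. One possible shape for those branches is sketched below as a standalone function. The `"debit"`, `"credit"`, and `"type"` keys in `column_map`, and the set of words treated as debits, are assumptions for illustration; the plan does not define them.

```python
from typing import Optional


def _to_float(text: str) -> float:
    """Parse a cell such as '1,087.83' or '"-75.00"' into a float."""
    return float(text.strip().strip('"').replace(",", ""))


def parse_amount(row: dict, column_map: dict, logic: str) -> Optional[float]:
    """Return a signed amount (negative = money out), or None if unparseable."""
    try:
        if logic == "signed":
            return _to_float(row[column_map["amount"]])
        if logic == "separate_columns":
            # Assumed keys: one column for debits, one for credits.
            debit = row.get(column_map["debit"], "").strip()
            credit = row.get(column_map["credit"], "").strip()
            if debit:
                return -abs(_to_float(debit))
            if credit:
                return abs(_to_float(credit))
            return None
        if logic == "type_column":
            # Assumed: an unsigned amount plus a column naming the direction.
            amount = abs(_to_float(row[column_map["amount"]]))
            kind = row[column_map["type"]].strip().lower()
            return -amount if kind in {"debit", "dr", "withdrawal"} else amount
    except (ValueError, KeyError):
        return None
    return None  # unknown logic type


print(parse_amount({"Debit": "48.52", "Credit": ""}, {"debit": "Debit", "credit": "Credit"}, "separate_columns"))
print(parse_amount({"Amount": "20.00", "Type": "Debit"}, {"amount": "Amount", "type": "Type"}, "type_column"))
```

Keeping the sign convention identical across all three branches means the duplicate check and the later analysis services would not need to know which export format a transaction came from.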
**Step 4: Run tests**

Run: `pytest tests/services/test_importer.py -v`
Expected: All 3 tests PASS

**Step 5: Commit**

```bash
git add src/services/importer.py tests/services/test_importer.py
git commit -m "feat: CSV import service with duplicate detection"
```

---

## Phase 3: Categorization

### Task 7: Categorization Engine

**Files:**
- Create: `src/services/categorizer.py`
- Create: `tests/services/test_categorizer.py`

**Step 1: Write failing tests**

```python
# tests/services/test_categorizer.py
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
from src.db import Base
from src.models import *
from src.seed import seed_categories
from src.services.categorizer import RuleBasedCategorizer, Categorizer


def make_session():
    engine = create_engine("sqlite:///:memory:")
    Base.metadata.create_all(engine)
    return Session(engine)


def test_categorizer_protocol():
    """RuleBasedCategorizer implements Categorizer protocol."""
    session = make_session()
    cat = RuleBasedCategorizer(session)
    assert isinstance(cat, Categorizer)


def test_match_simple_pattern():
    session = make_session()
    seed_categories(session)
    groceries = session.query(Category).filter_by(name="Groceries").one()
    rule = CategorizationRule(pattern="PUBLIX", category_id=groceries.id, priority=10)
    session.add(rule)
    session.commit()

    cat = RuleBasedCategorizer(session)
    txn = Transaction(date="2026-01-15", amount=-44.90, description="PUBLIX #1716", account_id=1)
    result = cat.categorize(txn)
    assert result is not None
    assert result.category_id == groceries.id


def test_match_pipe_separated_pattern():
    session = make_session()
    seed_categories(session)
    groceries = session.query(Category).filter_by(name="Groceries").one()
    rule = CategorizationRule(pattern="PUBLIX|ALDI|PIGGLY WIGGLY", category_id=groceries.id, priority=10)
    session.add(rule)
    session.commit()

    cat = RuleBasedCategorizer(session)
    for desc in ["PUBLIX #1716", "ALDI 76180 BEAUFORT", "PIGGLY WIGGLY #286"]:
        txn = Transaction(date="2026-01-15", amount=-20.00, description=desc, account_id=1)
        result = cat.categorize(txn)
        assert result is not None, f"Failed to match: {desc}"
        assert result.category_id == groceries.id


def test_no_match_returns_none():
    session = make_session()
    seed_categories(session)
    cat = RuleBasedCategorizer(session)
    txn = Transaction(date="2026-01-15", amount=-10.00, description="UNKNOWN MERCHANT", account_id=1)
    result = cat.categorize(txn)
    assert result is None


def test_priority_ordering():
    session = make_session()
    seed_categories(session)
    groceries = session.query(Category).filter_by(name="Groceries").one()
    shopping = session.query(Category).filter_by(name="Shopping").one()
    # Lower priority number = higher priority
    rule1 = CategorizationRule(pattern="WAL-MART", category_id=groceries.id, priority=1)
    rule2 = CategorizationRule(pattern="WAL", category_id=shopping.id, priority=10)
    session.add_all([rule1, rule2])
    session.commit()

    cat = RuleBasedCategorizer(session)
    txn = Transaction(date="2026-01-15", amount=-48.52, description="WAL-MART #7181", account_id=1)
    result = cat.categorize(txn)
    assert result.category_id == groceries.id


def test_tag_override():
    session = make_session()
    seed_categories(session)
    dining = session.query(Category).filter_by(name="Dining Out").one()
    rule = CategorizationRule(pattern="CHICK-FIL-A", category_id=dining.id, tag_override="needs", priority=10)
    session.add(rule)
    session.commit()

    cat = RuleBasedCategorizer(session)
    txn = Transaction(date="2026-01-15", amount=-5.39, description="CHICK-FIL-A #01476", account_id=1)
    result = cat.categorize(txn)
    assert result.tag == "needs"  # Override from category default "wants"


def test_person_attribution():
    session = make_session()
    seed_categories(session)
    donna = HouseholdMember(name="Donna", relationship="wife")
    session.add(donna)
    session.flush()
    income = session.query(Category).filter_by(name="Income").one()
    rule = CategorizationRule(pattern="OASISBATCH PAYROLL", category_id=income.id, attributed_to_id=donna.id, priority=10)
    session.add(rule)
    session.commit()

    cat = RuleBasedCategorizer(session)
    txn = Transaction(date="2026-01-09", amount=1087.83, description="OASISBATCH PAYROLL 260109 DONNA CONLON", account_id=1)
    result = cat.categorize(txn)
|
|
assert result.category_id == income.id
|
|
assert result.attributed_to_id == donna.id
|
|
```

**Step 2: Run to verify failure**

Run: `pytest tests/services/test_categorizer.py -v`

**Step 3: Implement categorizer**

```python
# src/services/categorizer.py
from dataclasses import dataclass
from typing import Protocol, runtime_checkable

from sqlalchemy.orm import Session

from src.models.rule import CategorizationRule
from src.models.transaction import Transaction


@dataclass
class CategoryResult:
    category_id: int
    tag: str | None = None
    attributed_to_id: int | None = None


@runtime_checkable
class Categorizer(Protocol):
    def categorize(self, transaction: Transaction) -> CategoryResult | None: ...


class RuleBasedCategorizer:
    def __init__(self, session: Session):
        self.session = session
        self._rules: list[CategorizationRule] | None = None

    def _load_rules(self) -> list[CategorizationRule]:
        # Rules are cached after the first query; lower priority numbers sort first.
        if self._rules is None:
            self._rules = (
                self.session.query(CategorizationRule)
                .order_by(CategorizationRule.priority)
                .all()
            )
        return self._rules

    def invalidate_cache(self) -> None:
        """Drop cached rules so the next categorize() call reloads them."""
        self._rules = None

    def categorize(self, transaction: Transaction) -> CategoryResult | None:
        rules = self._load_rules()
        desc = transaction.description.upper()

        for rule in rules:
            # A pattern may hold several pipe-separated alternatives. Each one is
            # matched as a literal, case-insensitive substring.
            patterns = [p.strip().upper() for p in rule.pattern.split("|")]
            # Skip empty alternatives (e.g. a trailing "|"), which would
            # otherwise match every description.
            if any(p and p in desc for p in patterns):
                tag = rule.tag_override
                if tag is None and rule.category:
                    tag = rule.category.default_tag
                return CategoryResult(
                    category_id=rule.category_id,
                    tag=tag,
                    attributed_to_id=rule.attributed_to_id,
                )

        return None
```
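
Because `Categorizer` is a `Protocol`, any object with a matching `categorize` method can stand in for the rule engine. The sketch below illustrates that extension point and is not part of the plan: `KeywordCategorizer` and `ChainCategorizer` are hypothetical names, and a plain `Txn` dataclass replaces the ORM `Transaction` so the example runs on its own.

```python
from dataclasses import dataclass
from typing import Optional, Protocol, runtime_checkable


@dataclass
class Txn:
    """Stand-in for the ORM Transaction (assumption: only description is needed)."""
    description: str


@dataclass
class CategoryResult:
    category_id: int
    tag: Optional[str] = None
    attributed_to_id: Optional[int] = None


@runtime_checkable
class Categorizer(Protocol):
    def categorize(self, transaction: Txn) -> Optional[CategoryResult]: ...


class KeywordCategorizer:
    """Hypothetical categorizer: maps one keyword to one category id."""

    def __init__(self, keyword: str, category_id: int):
        self.keyword = keyword.upper()
        self.category_id = category_id

    def categorize(self, transaction: Txn) -> Optional[CategoryResult]:
        if self.keyword in transaction.description.upper():
            return CategoryResult(category_id=self.category_id)
        return None


class ChainCategorizer:
    """Hypothetical wrapper: returns the first non-None result, in order."""

    def __init__(self, *categorizers: Categorizer):
        self.categorizers = categorizers

    def categorize(self, transaction: Txn) -> Optional[CategoryResult]:
        for categorizer in self.categorizers:
            result = categorizer.categorize(transaction)
            if result is not None:
                return result
        return None


chain = ChainCategorizer(
    KeywordCategorizer("PUBLIX", 1),
    KeywordCategorizer("NETFLIX", 2),
)
```

A future AI-backed categorizer could be placed last in such a chain, so it only sees transactions the rules did not match. Whether chaining is the right composition for this app is a design choice the plan leaves open.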

**Step 4: Run tests**

Run: `pytest tests/services/test_categorizer.py -v`
Expected: All 7 tests PASS

**Step 5: Commit**

```bash
git add src/services/categorizer.py tests/services/test_categorizer.py
git commit -m "feat: rule-based categorization engine with protocol"
```

---

### Task 8: Post-Import Categorization Integration

**Files:**
- Modify: `src/services/importer.py`
- Create: `tests/services/test_import_categorize.py`

**Step 1: Write failing test for categorize-on-import**

```python
# tests/services/test_import_categorize.py
from pathlib import Path
from sqlalchemy import create_engine
from sqlalchemy.orm import Session

from src.db import Base
from src.models import *
from src.seed import seed_categories
from src.services.importer import ImportService

RAWDATA = Path(__file__).parent.parent.parent / "rawdata"


def make_session():
    engine = create_engine("sqlite:///:memory:")
    Base.metadata.create_all(engine)
    return Session(engine)


def test_import_applies_categorization_rules():
    session = make_session()
    seed_categories(session)
    member = HouseholdMember(name="Andrew", relationship="self")
    session.add(member)
    session.flush()
    account = Account(name="Chase", institution="Chase", account_type="credit", owner_id=member.id)
    session.add(account)
    session.flush()

    groceries = session.query(Category).filter_by(name="Groceries").one()
    rule = CategorizationRule(pattern="PUBLIX", category_id=groceries.id, priority=10)
    session.add(rule)
    session.commit()

    svc = ImportService(session)
    svc.import_csv(
        RAWDATA / "Chase0372_Activity20260101_20260210_20260210.CSV",
        account_id=account.id,
        column_map={"date": "Transaction Date", "amount": "Amount", "description": "Description"},
        amount_logic="signed",
    )

    publix_txns = session.query(Transaction).filter(Transaction.description.contains("PUBLIX")).all()
    assert len(publix_txns) > 0
    for txn in publix_txns:
        assert txn.category_id == groceries.id
        assert txn.tag == "needs"


def test_uncategorized_transactions_have_no_category():
    session = make_session()
    seed_categories(session)
    member = HouseholdMember(name="Andrew", relationship="self")
    session.add(member)
    session.flush()
    account = Account(name="Chase", institution="Chase", account_type="credit", owner_id=member.id)
    session.add(account)
    session.flush()

    # No rules defined, so everything should be uncategorized
    svc = ImportService(session)
    svc.import_csv(
        RAWDATA / "Chase0372_Activity20260101_20260210_20260210.CSV",
        account_id=account.id,
        column_map={"date": "Transaction Date", "amount": "Amount", "description": "Description"},
        amount_logic="signed",
    )

    uncategorized = session.query(Transaction).filter(Transaction.category_id.is_(None)).count()
    total = session.query(Transaction).count()
    assert uncategorized == total
```

**Step 2: Run to verify failure**

Run: `pytest tests/services/test_import_categorize.py -v`

**Step 3: Update ImportService to run categorization after import**

Add to `ImportService.import_csv()`, after the main import loop but before `session.commit()`:

```python
from src.services.categorizer import RuleBasedCategorizer

# In import_csv, after building all transactions:
categorizer = RuleBasedCategorizer(self.session)
for txn in self.session.new:
    if isinstance(txn, Transaction) and txn.category_id is None:
        result = categorizer.categorize(txn)
        if result:
            txn.category_id = result.category_id
            txn.tag = result.tag
            txn.attributed_to_id = result.attributed_to_id
```

**Step 4: Run tests**

Run: `pytest tests/services/test_import_categorize.py -v`
Expected: Both tests PASS

**Step 5: Commit**

```bash
git add src/services/importer.py tests/services/test_import_categorize.py
git commit -m "feat: auto-categorize transactions on import"
```

---

## Phase 4: Analysis Services

### Task 9: Spending Analysis Service

**Files:**
- Create: `src/services/analysis.py`
- Create: `tests/services/test_analysis.py`

**Step 1: Write failing tests**

```python
# tests/services/test_analysis.py
import datetime
from sqlalchemy import create_engine
from sqlalchemy.orm import Session

from src.db import Base
from src.models import *
from src.seed import seed_categories
from src.services.analysis import AnalysisService


def make_session():
    engine = create_engine("sqlite:///:memory:")
    Base.metadata.create_all(engine)
    return Session(engine)


def make_test_data(session):
    seed_categories(session)
    member = HouseholdMember(name="Andrew", relationship="self")
    session.add(member)
    session.flush()
    account = Account(name="Chase", institution="Chase", account_type="credit", owner_id=member.id)
    session.add(account)
    session.flush()
    groceries = session.query(Category).filter_by(name="Groceries").one()
    dining = session.query(Category).filter_by(name="Dining Out").one()

    txns = [
        Transaction(date=datetime.date(2026, 1, 5), amount=-50.0, description="PUBLIX", account_id=account.id, category_id=groceries.id, tag="needs"),
        Transaction(date=datetime.date(2026, 1, 12), amount=-30.0, description="ALDI", account_id=account.id, category_id=groceries.id, tag="needs"),
        Transaction(date=datetime.date(2026, 1, 20), amount=-15.0, description="CHICK-FIL-A", account_id=account.id, category_id=dining.id, tag="wants"),
        Transaction(date=datetime.date(2026, 2, 3), amount=-60.0, description="PUBLIX", account_id=account.id, category_id=groceries.id, tag="needs"),
        Transaction(date=datetime.date(2026, 2, 7), amount=-25.0, description="KFC", account_id=account.id, category_id=dining.id, tag="wants"),
    ]
    session.add_all(txns)
    session.commit()
    return account, member


def test_spending_by_month():
    session = make_session()
    make_test_data(session)
    svc = AnalysisService(session)
    result = svc.spending_by_period("month")
    assert len(result) == 2  # Jan and Feb
    jan = [r for r in result if r["period"] == "2026-01"][0]
    assert jan["total"] == -95.0


def test_spending_by_category():
    session = make_session()
    make_test_data(session)
    svc = AnalysisService(session)
    result = svc.spending_by_category(
        start=datetime.date(2026, 1, 1),
        end=datetime.date(2026, 2, 28),
    )
    groceries_row = [r for r in result if r["category"] == "Groceries"][0]
    assert groceries_row["total"] == -140.0


def test_spending_by_tag():
    session = make_session()
    make_test_data(session)
    svc = AnalysisService(session)
    result = svc.spending_by_tag(
        start=datetime.date(2026, 1, 1),
        end=datetime.date(2026, 2, 28),
    )
    needs = [r for r in result if r["tag"] == "needs"][0]
    wants = [r for r in result if r["tag"] == "wants"][0]
    assert needs["total"] == -140.0
    assert wants["total"] == -40.0


def test_spending_filtered_by_person():
    session = make_session()
    account, member = make_test_data(session)
    # Attribute every transaction to Andrew
    txns = session.query(Transaction).all()
    for t in txns:
        t.attributed_to_id = member.id
    session.commit()

    svc = AnalysisService(session)
    result = svc.spending_by_category(
        start=datetime.date(2026, 1, 1),
        end=datetime.date(2026, 2, 28),
        person_id=member.id,
    )
    total = sum(r["total"] for r in result)
    assert total == -180.0
```

**Step 2: Run to verify failure**

Run: `pytest tests/services/test_analysis.py -v`

**Step 3: Implement analysis service**

```python
# src/services/analysis.py
from sqlalchemy import func
from sqlalchemy.orm import Session

from src.models.category import Category
from src.models.transaction import Transaction

# SQLite strftime formats for each supported grouping period.
PERIOD_FORMATS = {
    "day": "%Y-%m-%d",
    "week": "%Y-W%W",
    "month": "%Y-%m",
}


class AnalysisService:
    def __init__(self, session: Session):
        self.session = session

    def _base_query(self, start=None, end=None, person_id=None, account_id=None, category_id=None):
        # Transfers move money between accounts and are not counted as spending.
        q = self.session.query(Transaction).filter(Transaction.is_transfer.is_(False))
        if start is not None:
            q = q.filter(Transaction.date >= start)
        if end is not None:
            q = q.filter(Transaction.date <= end)
        if person_id is not None:
            q = q.filter(Transaction.attributed_to_id == person_id)
        if account_id is not None:
            q = q.filter(Transaction.account_id == account_id)
        if category_id is not None:
            q = q.filter(Transaction.category_id == category_id)
        return q

    def spending_by_period(self, period: str = "month", **filters) -> list[dict]:
        if period not in PERIOD_FORMATS:
            raise ValueError(f"Unsupported period: {period!r}")
        q = (
            self._base_query(**filters)
            .with_entities(
                func.strftime(PERIOD_FORMATS[period], Transaction.date).label("period"),
                func.sum(Transaction.amount).label("total"),
            )
            .group_by("period")
            .order_by("period")
        )
        return [{"period": r.period, "total": float(r.total)} for r in q.all()]

    def spending_by_category(self, start=None, end=None, **filters) -> list[dict]:
        q = self._base_query(start=start, end=end, **filters)
        q = (
            q.join(Category, Transaction.category_id == Category.id, isouter=True)
            .with_entities(
                func.coalesce(Category.name, "Uncategorized").label("category"),
                func.sum(Transaction.amount).label("total"),
                # Labeled txn_count because Row already has a count() method,
                # so an attribute named "count" would return that method.
                func.count(Transaction.id).label("txn_count"),
            )
            .group_by("category")
            .order_by(func.sum(Transaction.amount))
        )
        return [{"category": r.category, "total": float(r.total), "count": r.txn_count} for r in q.all()]

    def spending_by_tag(self, start=None, end=None, **filters) -> list[dict]:
        q = self._base_query(start=start, end=end, **filters)
        q = (
            q.with_entities(
                func.coalesce(Transaction.tag, "untagged").label("tag"),
                func.sum(Transaction.amount).label("total"),
            )
            .group_by("tag")
            .order_by(func.sum(Transaction.amount))
        )
        return [{"tag": r.tag, "total": float(r.total)} for r in q.all()]
```
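
The period grouping relies on SQLite's `strftime`. As a rough illustration of what the month and week formats produce, the standalone sketch below uses the standard-library `sqlite3` module with a throwaway `txn` table (a hypothetical stand-in for the real schema) and the same five amounts as the test data.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Hypothetical minimal table; the real app uses the SQLAlchemy models.
conn.execute("CREATE TABLE txn (date TEXT, amount REAL)")
conn.executemany(
    "INSERT INTO txn VALUES (?, ?)",
    [
        ("2026-01-05", -50.0),
        ("2026-01-12", -30.0),
        ("2026-01-20", -15.0),
        ("2026-02-03", -60.0),
        ("2026-02-07", -25.0),
    ],
)


def totals_by(fmt: str) -> list:
    """Sum amounts per period, where the period is strftime(fmt, date)."""
    rows = conn.execute(
        "SELECT strftime(?, date) AS period, SUM(amount) FROM txn "
        "GROUP BY period ORDER BY period",
        (fmt,),
    )
    return rows.fetchall()


monthly = totals_by("%Y-%m")    # one row per calendar month
weekly = totals_by("%Y-W%W")    # %W: weeks start on Monday, numbered 00-53
```

Note that `%W` places days before the first Monday of a year in week `00`, so a week that straddles New Year is split across two buckets. If that matters for the weekly chart, the grouping key would need a different definition.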

**Step 4: Run tests**

Run: `pytest tests/services/test_analysis.py -v`
Expected: All 4 tests PASS

**Step 5: Commit**

```bash
git add src/services/analysis.py tests/services/test_analysis.py
git commit -m "feat: spending analysis service with period, category, and tag breakdowns"
```

---

### Task 10: Recurring Charge Detection

**Files:**
- Create: `src/services/recurring.py`
- Create: `tests/services/test_recurring.py`

**Step 1: Write failing tests**

```python
# tests/services/test_recurring.py
import datetime
from sqlalchemy import create_engine
from sqlalchemy.orm import Session

from src.db import Base
from src.models import *
from src.services.recurring import RecurringDetector


def make_session():
    engine = create_engine("sqlite:///:memory:")
    Base.metadata.create_all(engine)
    return Session(engine)


def make_recurring_data(session):
    member = HouseholdMember(name="Andrew", relationship="self")
    session.add(member)
    session.flush()
    account = Account(name="Checking", institution="WF", account_type="checking", owner_id=member.id)
    session.add(account)
    session.flush()

    # Netflix monthly: $19.07 on the 4th of each month
    for month in [1, 2]:
        session.add(Transaction(
            date=datetime.date(2026, month, 4),
            amount=-19.07, description="Netflix.com", account_id=account.id,
        ))

    # HelloFresh weekly: $142.87 every 7 days
    for day in [1, 8, 15, 22, 29]:
        session.add(Transaction(
            date=datetime.date(2026, 1, day),
            amount=-142.87, description="HELLOFRESH", account_id=account.id,
        ))

    # Random one-off purchase
    session.add(Transaction(
        date=datetime.date(2026, 1, 10),
        amount=-95.38, description="CARL'S GOLFLAND INC", account_id=account.id,
    ))

    session.commit()
    return account


def test_detect_monthly_recurring():
    session = make_session()
    make_recurring_data(session)
    detector = RecurringDetector(session)
    results = detector.detect()
    netflix = [r for r in results if "Netflix" in r["description"]]
    assert len(netflix) == 1
    assert netflix[0]["frequency"] == "monthly"
    assert abs(netflix[0]["typical_amount"] - 19.07) < 0.01


def test_detect_weekly_recurring():
    session = make_session()
    make_recurring_data(session)
    detector = RecurringDetector(session)
    results = detector.detect()
    hello = [r for r in results if "HELLOFRESH" in r["description"]]
    assert len(hello) == 1
    assert hello[0]["frequency"] == "weekly"


def test_one_off_not_detected():
    session = make_session()
    make_recurring_data(session)
    detector = RecurringDetector(session)
    results = detector.detect()
    golf = [r for r in results if "GOLFLAND" in r["description"]]
    assert len(golf) == 0


def test_annual_cost_calculation():
    session = make_session()
    make_recurring_data(session)
    detector = RecurringDetector(session)
    results = detector.detect()
    netflix = [r for r in results if "Netflix" in r["description"]][0]
    assert abs(netflix["annual_cost"] - 19.07 * 12) < 1.0
```

**Step 2: Run to verify failure**

Run: `pytest tests/services/test_recurring.py -v`

**Step 3: Implement recurring detector**

```python
# src/services/recurring.py
from collections import defaultdict
from sqlalchemy.orm import Session

from src.models.transaction import Transaction

# Number of charges per year for each detected frequency.
ANNUAL_MULTIPLIER = {"weekly": 52, "biweekly": 26, "monthly": 12, "quarterly": 4, "annual": 1}


class RecurringDetector:
    def __init__(self, session: Session):
        self.session = session

    def detect(self, amount_tolerance: float = 0.10) -> list[dict]:
        # Only expenses (negative amounts) that are not transfers.
        txns = (
            self.session.query(Transaction)
            .filter(Transaction.amount < 0)
            .filter(Transaction.is_transfer.is_(False))
            .order_by(Transaction.date)
            .all()
        )

        # Group by exact description as stored on the transaction
        groups: dict[str, list[Transaction]] = defaultdict(list)
        for txn in txns:
            groups[txn.description].append(txn)

        results = []
        for desc, group in groups.items():
            if len(group) < 2:
                continue

            amounts = [abs(t.amount) for t in group]
            avg_amount = sum(amounts) / len(amounts)

            # Check amount consistency: every charge must be within
            # amount_tolerance (as a fraction) of the group average.
            if any(abs(a - avg_amount) / avg_amount > amount_tolerance for a in amounts):
                continue

            # Detect frequency from the average gap between consecutive charges
            dates = sorted(t.date for t in group)
            intervals = [(dates[i + 1] - dates[i]).days for i in range(len(dates) - 1)]
            avg_interval = sum(intervals) / len(intervals)

            frequency = self._classify_frequency(avg_interval)
            if frequency is None:
                continue

            results.append({
                "description": desc,
                "typical_amount": round(avg_amount, 2),
                "frequency": frequency,
                "annual_cost": round(avg_amount * ANNUAL_MULTIPLIER[frequency], 2),
                "occurrences": len(group),
                "last_date": max(dates),
            })

        results.sort(key=lambda r: r["annual_cost"], reverse=True)
        return results

    def _classify_frequency(self, avg_days: float) -> str | None:
        if 5 <= avg_days <= 9:
            return "weekly"
        elif 12 <= avg_days <= 16:
            return "biweekly"
        elif 25 <= avg_days <= 35:
            return "monthly"
        elif 80 <= avg_days <= 100:
            return "quarterly"
        elif 350 <= avg_days <= 380:
            return "annual"
        return None
```
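
To make the interval logic concrete, the standalone sketch below reruns the classification on the dates from the test data, plus one irregular series. It copies the day-count windows from `_classify_frequency`; the `classify` and `average_interval` helpers and the `irregular` series are illustrative only.

```python
import datetime
from typing import Optional

# Same day-count windows as RecurringDetector._classify_frequency.
WINDOWS = [
    (5, 9, "weekly"),
    (12, 16, "biweekly"),
    (25, 35, "monthly"),
    (80, 100, "quarterly"),
    (350, 380, "annual"),
]


def classify(avg_days: float) -> Optional[str]:
    """Return the frequency label whose window contains avg_days, if any."""
    for low, high, label in WINDOWS:
        if low <= avg_days <= high:
            return label
    return None


def average_interval(dates: list) -> float:
    """Mean gap in days between consecutive dates (needs at least two dates)."""
    ordered = sorted(dates)
    gaps = [(b - a).days for a, b in zip(ordered, ordered[1:])]
    return sum(gaps) / len(gaps)


netflix = [datetime.date(2026, 1, 4), datetime.date(2026, 2, 4)]
hellofresh = [datetime.date(2026, 1, d) for d in (1, 8, 15, 22, 29)]
# Hypothetical series with gaps of 7 and 21 days: the average is 14.
irregular = [datetime.date(2026, 1, d) for d in (1, 8, 29)]

netflix_label = classify(average_interval(netflix))
hellofresh_label = classify(average_interval(hellofresh))
irregular_label = classify(average_interval(irregular))
```

The `irregular` case shows a limitation of averaging: gaps of 7 and 21 days average to 14 and are labeled biweekly even though no two charges are 14 days apart. Checking each gap against the window, rather than the mean, would be one way to tighten this if false positives turn up in real data.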

**Step 4: Run tests**

Run: `pytest tests/services/test_recurring.py -v`
Expected: All 4 tests PASS

**Step 5: Commit**

```bash
git add src/services/recurring.py tests/services/test_recurring.py
git commit -m "feat: recurring charge detection service"
```

---

### Task 11: Forecasting Service

**Files:**
- Create: `src/services/forecasting.py`
- Create: `tests/services/test_forecasting.py`

**Step 1: Write failing tests**

```python
# tests/services/test_forecasting.py
import datetime
from sqlalchemy import create_engine
from sqlalchemy.orm import Session

from src.db import Base
from src.models import *
from src.seed import seed_categories
from src.services.forecasting import ForecastingService


def make_session():
    engine = create_engine("sqlite:///:memory:")
    Base.metadata.create_all(engine)
    return Session(engine)


def make_forecast_data(session):
    seed_categories(session)
    member = HouseholdMember(name="Andrew", relationship="self")
    session.add(member)
    session.flush()
    account = Account(name="Chase", institution="Chase", account_type="credit", owner_id=member.id)
    session.add(account)
    session.flush()
    groceries = session.query(Category).filter_by(name="Groceries").one()

    # Three recent grocery charges: $500, $600, $700 (trending up).
    # Dates are relative to today so they always fall inside the service's
    # lookback window (about 93 days), whenever the tests are run.
    today = datetime.date.today()
    for days_ago, total in [(75, 500), (45, 600), (15, 700)]:
        session.add(Transaction(
            date=today - datetime.timedelta(days=days_ago),
            amount=-total,
            description="GROCERIES",
            account_id=account.id,
            category_id=groceries.id,
            tag="needs",
        ))
    session.commit()
    return account


def test_monthly_forecast():
    session = make_session()
    make_forecast_data(session)
    svc = ForecastingService(session)
    forecast = svc.forecast_month()
    groceries = [f for f in forecast if f["category"] == "Groceries"]
    assert len(groceries) == 1
    # Should be weighted toward recent months
    assert groceries[0]["projected"] < 0


def test_annual_forecast():
    session = make_session()
    make_forecast_data(session)
    svc = ForecastingService(session)
    forecast = svc.forecast_year()
    assert len(forecast) > 0
    total = sum(f["projected"] for f in forecast)
    assert total < 0  # We're spending money


def test_what_if_removes_recurring():
    session = make_session()
    make_forecast_data(session)
    svc = ForecastingService(session)
    base = svc.forecast_month()
    adjusted = svc.forecast_month(exclude_descriptions=["GROCERIES"])
    base_total = sum(f["projected"] for f in base)
    adj_total = sum(f["projected"] for f in adjusted)
    assert adj_total > base_total  # Less spending when we exclude groceries
```

**Step 2: Run to verify failure**

Run: `pytest tests/services/test_forecasting.py -v`

**Step 3: Implement forecasting service**

```python
# src/services/forecasting.py
import datetime
from collections import defaultdict

from sqlalchemy import func
from sqlalchemy.orm import Session

from src.models.category import Category
from src.models.transaction import Transaction


class ForecastingService:
    def __init__(self, session: Session):
        self.session = session

    def _get_monthly_by_category(
        self,
        months_back: int = 3,
        exclude_descriptions: list[str] | None = None,
    ) -> dict[str, list[float]]:
        # Approximate lookback window: 31 days per month, counted from today.
        cutoff = datetime.date.today() - datetime.timedelta(days=months_back * 31)
        q = (
            self.session.query(
                func.strftime("%Y-%m", Transaction.date).label("month"),
                func.coalesce(Category.name, "Uncategorized").label("category"),
                func.sum(Transaction.amount).label("total"),
            )
            .join(Category, Transaction.category_id == Category.id, isouter=True)
            .filter(Transaction.date >= cutoff)
            .filter(Transaction.is_transfer.is_(False))
            .filter(Transaction.amount < 0)
        )
        # "What if" support: drop transactions whose description contains
        # any of the excluded strings.
        for desc in exclude_descriptions or []:
            q = q.filter(~Transaction.description.contains(desc))

        # Order oldest month first so _weighted_average gives the most
        # recent month the largest weight.
        rows = q.group_by("month", "category").order_by("month").all()

        by_cat: dict[str, list[float]] = defaultdict(list)
        for row in rows:
            by_cat[row.category].append(float(row.total))
        return by_cat

    def _weighted_average(self, values: list[float]) -> float:
        if not values:
            return 0.0
        # More recent months get higher weight (values are oldest first)
        weights = list(range(1, len(values) + 1))
        total_weight = sum(weights)
        return sum(v * w for v, w in zip(values, weights)) / total_weight

    def forecast_month(self, exclude_descriptions: list[str] | None = None) -> list[dict]:
        by_cat = self._get_monthly_by_category(exclude_descriptions=exclude_descriptions)

        results = []
        for cat, monthly_totals in by_cat.items():
            projected = self._weighted_average(monthly_totals)
            results.append({
                "category": cat,
                "projected": round(projected, 2),
                "confidence": "high" if len(monthly_totals) >= 3 else "low",
            })

        results.sort(key=lambda r: r["projected"])
        return results

    def forecast_year(self) -> list[dict]:
        monthly = self.forecast_month()
        return [
            {**f, "projected": round(f["projected"] * 12, 2)}
            for f in monthly
        ]
```
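
As a worked example of the weighting, the standalone sketch below applies the same linear weights (1, 2, 3, ...) to monthly totals of $500, $600, and $700, listed oldest first. The free function `weighted_average` mirrors `_weighted_average` so it can run without a database.

```python
def weighted_average(values: list) -> float:
    """Linear-weighted mean: the i-th value (1-based, oldest first) gets weight i."""
    if not values:
        return 0.0
    weights = range(1, len(values) + 1)
    return sum(v * w for v, w in zip(values, weights)) / sum(weights)


monthly_totals = [-500.0, -600.0, -700.0]  # oldest first, expenses are negative

# (-500*1 + -600*2 + -700*3) / (1 + 2 + 3) = -3800 / 6
projected = weighted_average(monthly_totals)
plain_mean = sum(monthly_totals) / len(monthly_totals)
```

Here the projection rounds to -633.33, compared with a plain mean of -600.00, so an upward spending trend pulls the forecast toward the most recent month. The result depends on the input order, which is why the monthly totals need to arrive oldest first.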

**Step 4: Run tests**

Run: `pytest tests/services/test_forecasting.py -v`
Expected: All 3 tests PASS

**Step 5: Commit**

```bash
git add src/services/forecasting.py tests/services/test_forecasting.py
git commit -m "feat: forecasting service with weighted averages and what-if"
```

---

## Phase 5: Desktop UI

### Task 12: Main Window and Sidebar Navigation

**Files:**
- Create: `src/main.py`
- Create: `src/ui/main_window.py`
- Create: `src/ui/sidebar.py`

**Step 1: Implement main window with sidebar**

Note: UI tasks are hard to test-drive strictly. Write the UI code first, then verify it manually with `python -m src.main`.

```python
# src/ui/sidebar.py
from PySide6.QtWidgets import QWidget, QVBoxLayout, QPushButton, QFrame
from PySide6.QtCore import Signal


class Sidebar(QFrame):
    view_changed = Signal(str)

    VIEWS = [
        ("Import", "import"),
        ("Transactions", "transactions"),
        ("Analysis", "analysis"),
        ("Recurring", "recurring"),
        ("Settings", "settings"),
    ]

    def __init__(self, parent=None):
        super().__init__(parent)
        self.setObjectName("sidebar")
        self.setFixedWidth(200)
        layout = QVBoxLayout(self)
        layout.setContentsMargins(0, 10, 0, 10)

        self._buttons: dict[str, QPushButton] = {}
        for label, key in self.VIEWS:
            btn = QPushButton(label)
            btn.setObjectName(f"sidebar-{key}")
            btn.setCheckable(True)
            btn.clicked.connect(lambda checked, k=key: self._on_click(k))
            layout.addWidget(btn)
            self._buttons[key] = btn

        layout.addStretch()
        self._buttons["import"].setChecked(True)

    def _on_click(self, key: str):
        for k, btn in self._buttons.items():
            btn.setChecked(k == key)
        self.view_changed.emit(key)
```

```python
# src/ui/main_window.py
from PySide6.QtWidgets import QMainWindow, QHBoxLayout, QWidget, QStackedWidget, QLabel
from src.ui.sidebar import Sidebar


class MainWindow(QMainWindow):
    def __init__(self, session):
        super().__init__()
        self.session = session
        self.setWindowTitle("SpendingAnalysis")
        self.setMinimumSize(1200, 800)

        central = QWidget()
        self.setCentralWidget(central)
        layout = QHBoxLayout(central)
        layout.setContentsMargins(0, 0, 0, 0)
        layout.setSpacing(0)

        self.sidebar = Sidebar()
        layout.addWidget(self.sidebar)

        self.stack = QStackedWidget()
        layout.addWidget(self.stack)

        # Placeholder views, replaced in later tasks
        self._views = {}
        for label, key in Sidebar.VIEWS:
            placeholder = QLabel(f"{label} View")
            placeholder.setStyleSheet("font-size: 24px; color: #888;")
            self._views[key] = self.stack.addWidget(placeholder)

        self.sidebar.view_changed.connect(self._switch_view)

    def _switch_view(self, key: str):
        self.stack.setCurrentIndex(self._views[key])
```

```python
# src/main.py
import sys
from PySide6.QtWidgets import QApplication
from src.db import get_session
from src.seed import seed_categories
from src.ui.main_window import MainWindow


def main():
    session = get_session()
    seed_categories(session)

    app = QApplication(sys.argv)
    window = MainWindow(session)
    window.show()
    sys.exit(app.exec())


if __name__ == "__main__":
    main()
```

**Step 2: Verify it launches**

Run: `python -m src.main`
Expected: Window opens with sidebar and placeholder views

**Step 3: Commit**

```bash
git add src/main.py src/ui/sidebar.py src/ui/main_window.py
git commit -m "feat: main window with sidebar navigation"
```

---

### Task 13: Import Wizard View

**Files:**
- Create: `src/ui/import_view.py`
- Modify: `src/ui/main_window.py` (replace placeholder)

This is the most complex UI component. It has three stages:
1. File selection (with drag-and-drop)
2. Column mapping (for new sources) or auto-match confirmation
3. Import preview and confirmation

**Step 1: Implement the import view**

Build `src/ui/import_view.py` with:
- A `QStackedWidget` for the three stages
- File drop zone that accepts CSV files
- Column mapping form with combo boxes for each target field (date, amount, description, etc.)
- Account selection/creation combo box
- Preview table showing first 5 rows mapped to normalized columns
- Import button with progress feedback
- Results summary showing imported count, duplicates, uncategorized

Wire it into `MainWindow` by replacing the import placeholder.

Key signals: `import_complete` (emitted after successful import so other views can refresh).
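
The preview step can be prototyped without any UI code. The sketch below is one possible shape for it, assuming the same `column_map` layout the import tests use (target field to source column). The `preview_rows` helper and the sample CSV rows are hypothetical and not part of `ImportService`.

```python
import csv
import io
from itertools import islice


def preview_rows(csv_text: str, column_map: dict, limit: int = 5) -> list:
    """Return up to `limit` rows with source columns renamed to target fields."""
    reader = csv.DictReader(io.StringIO(csv_text))
    headers = reader.fieldnames or []
    missing = [src for src in column_map.values() if src not in headers]
    if missing:
        # Surface mapping problems before the user reaches the import step.
        raise ValueError(f"CSV is missing mapped columns: {missing}")
    return [
        {target: row[source] for target, source in column_map.items()}
        for row in islice(reader, limit)
    ]


# Made-up sample data for illustration only.
sample = (
    "Transaction Date,Description,Amount\n"
    "01/15/2026,PUBLIX #1716,-44.90\n"
    "01/16/2026,Netflix.com,-19.07\n"
)
column_map = {"date": "Transaction Date", "amount": "Amount", "description": "Description"}
preview = preview_rows(sample, column_map)
```

The returned list of dicts could feed the preview table directly, one dict per row and one key per normalized column. Values stay as raw strings here; date and amount parsing is left to the importer.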

**Step 2: Test manually with both sample CSVs**

Run: `python -m src.main`
Test: Import Chase CSV first (should auto-detect headers), then Checking CSV (should show mapping wizard).

**Step 3: Commit**

```bash
git add src/ui/import_view.py src/ui/main_window.py
git commit -m "feat: import wizard view with column mapping and file drop"
```

---

### Task 14: Transactions View

**Files:**
- Create: `src/ui/transactions_view.py`
- Modify: `src/ui/main_window.py` (replace placeholder)

**Step 1: Implement transactions view**

Build `src/ui/transactions_view.py` with:
- `QTableView` backed by a custom `QAbstractTableModel` wrapping SQLAlchemy queries
- Columns: Date, Description, Amount, Account, Category, Tag, Person
- Filter bar at top: date range (two `QDateEdit`), account dropdown, person dropdown, category dropdown, "Uncategorized only" checkbox, search text field
- Inline category editing: clicking the category cell opens a dropdown to change it
- After category change, prompt: "Create rule for [merchant pattern]?" with Yes/No
- Amount column formatted with colors (red for expenses, green for income)
- Sortable by clicking column headers
- Refresh on import completion
|
|
|
|
**Step 2: Verify with imported data**

Run: `python -m src.main`, import sample data, switch to Transactions view.

**Step 3: Commit**

```bash
git add src/ui/transactions_view.py src/ui/main_window.py
git commit -m "feat: transactions view with filtering and inline categorization"
```

---

### Task 15: Analysis View (Charts)

**Files:**
- Create: `src/ui/analysis_view.py`
- Modify: `src/ui/main_window.py` (replace placeholder)

**Step 1: Implement analysis view**

Build `src/ui/analysis_view.py` with three tab panels:

**Spending Over Time tab:**
- Matplotlib figure embedded via `FigureCanvasQTAgg`
- Period toggle (day/week/month) using `QButtonGroup`
- Stacked bar chart: x-axis = time periods, y-axis = spending, stacked by category
- Filter controls matching transactions view filters

**Category Breakdown tab:**
- Donut chart showing spending by category for selected date range
- Horizontal bar chart ranking categories by total spend
- Needs/wants/savings summary bar (three-segment horizontal bar)
- Click donut slice to filter transactions view to that category

**Forecasting tab:**
- Month-ahead stacked bar alongside last 3 actual months
- Year-ahead projection with trend line
- "What if" controls: checkboxes for each recurring charge, sliders for category adjustments
- Live-updating projection as toggles change

All charts styled to match app theme (dark background, consistent category colors).

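The "what if" controls on the Forecasting tab amount to recomputing a per-category projection whenever a checkbox or slider changes. A rough sketch of that recomputation follows. `project_month` is a hypothetical helper, not the forecasting service's actual API, and it assumes that sliders scale only the non-recurring baseline while unchecked recurring charges are simply left out.

```python
from decimal import Decimal


def project_month(baseline, recurring, disabled=frozenset(), adjustments=None):
    """Project next month's spend per category.

    baseline:    category -> average monthly spend, excluding recurring charges
    recurring:   charge name -> (category, monthly amount)
    disabled:    names of recurring charges unchecked in the UI
    adjustments: category -> slider multiplier (1.0 means unchanged)
    """
    adjustments = adjustments or {}
    # Scale each category's baseline by its slider value.
    projected = {
        category: amount * Decimal(str(adjustments.get(category, 1.0)))
        for category, amount in baseline.items()
    }
    # Add back every recurring charge that is still checked.
    for name, (category, amount) in recurring.items():
        if name not in disabled:
            projected[category] = projected.get(category, Decimal("0")) + amount
    return projected
```

Because the function is pure, the view could call it from every toggle or slider signal and redraw the stacked bar from the returned mapping.
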
**Step 2: Verify with imported data**

Run: `python -m src.main`, import data, switch to Analysis view.

**Step 3: Commit**

```bash
git add src/ui/analysis_view.py src/ui/main_window.py
git commit -m "feat: analysis view with spending trends, breakdowns, and forecasting"
```

---

### Task 16: Recurring Charges View

**Files:**
- Create: `src/ui/recurring_view.py`
- Modify: `src/ui/main_window.py` (replace placeholder)

**Step 1: Implement recurring charges view**

Build `src/ui/recurring_view.py` with:
- Summary card at top: total monthly subscription burden, total annual cost
- Table of detected recurring charges: description, typical amount, frequency, annual cost, last date, status (confirmed/pending/dismissed)
- Confirm/Dismiss buttons per row
- "Re-scan" button to re-run detection
- Confirmed charges stored in a `recurring_charges` table (add model if needed) or as metadata on categorization rules

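The summary card figures can be derived from each charge's typical amount and frequency. The sketch below shows one way to do that. The frequency labels in `PER_YEAR` and the choice to exclude only dismissed charges from the totals are assumptions, and `annual_cost` and `summarize` are hypothetical helper names.

```python
from decimal import Decimal

# Occurrences per year for each frequency label (labels are assumed, not from the plan).
PER_YEAR = {"weekly": 52, "monthly": 12, "quarterly": 4, "annual": 1}


def annual_cost(amount, frequency):
    """Annualized cost of one recurring charge."""
    return amount * PER_YEAR[frequency]


def summarize(charges):
    """Total monthly burden and annual cost.

    charges: iterable of (typical amount, frequency, status) tuples.
    Dismissed charges are ignored; pending ones still count.
    """
    total_annual = sum(
        (annual_cost(amount, freq) for amount, freq, status in charges if status != "dismissed"),
        Decimal("0"),
    )
    monthly = (total_annual / 12).quantize(Decimal("0.01"))
    return {"monthly": monthly, "annual": total_annual}
```

Spreading annual and quarterly charges evenly across months keeps the "monthly burden" figure comparable between subscriptions billed on different schedules.
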
**Step 2: Verify with imported data**

Run: `python -m src.main`, import data, switch to Recurring view.

**Step 3: Commit**

```bash
git add src/ui/recurring_view.py src/ui/main_window.py
git commit -m "feat: recurring charges view with detection and confirmation"
```

---

### Task 17: Settings View

**Files:**
- Create: `src/ui/settings_view.py`
- Modify: `src/ui/main_window.py` (replace placeholder)

**Step 1: Implement settings view**

Build `src/ui/settings_view.py` with tabs:

**Categories tab:**
- Editable list of categories with name, default tag, icon
- Add/delete/rename buttons

**Rules tab:**
- Table of categorization rules: pattern, category, tag override, person, priority
- Add/edit/delete with a dialog form
- Drag to reorder priority (or up/down arrows)

**Household tab:**
- List of household members with name and relationship
- Add/edit/delete

**Accounts tab:**
- List of accounts with name, institution, type, owner
- Edit owner/shared status

**CSV Mappings tab:**
- List of saved mappings with name and account
- Delete mappings that are no longer needed

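For the Rules tab, the up/down arrows (or a drag) come down to moving one rule within the ordered list and renumbering priorities. A minimal sketch, assuming rules are plain dicts and that a lower `priority` number means the rule is evaluated first; neither detail is fixed by this plan, and `move_rule` is a hypothetical helper.

```python
def move_rule(rules, index, delta):
    """Move rules[index] by delta positions (-1 = up) and renumber priorities.

    rules is a list of dicts already sorted by ascending 'priority'.
    Returns a new list; the input is not modified.
    """
    target = index + delta
    if not (0 <= index < len(rules)) or not (0 <= target < len(rules)):
        return list(rules)  # out-of-range moves are a no-op
    reordered = list(rules)
    reordered.insert(target, reordered.pop(index))
    # Renumber so priorities stay dense and unique after the move.
    return [{**rule, "priority": position} for position, rule in enumerate(reordered, start=1)]
```

Renumbering the whole list on each move avoids gaps or duplicate priorities, at the cost of updating every rule row when saving.
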
**Step 2: Verify settings management**

Run: `python -m src.main`, switch to Settings, add a rule, verify it applies on next import.

**Step 3: Commit**

```bash
git add src/ui/settings_view.py src/ui/main_window.py
git commit -m "feat: settings view for categories, rules, household, and accounts"
```

---

## Phase 6: Polish

### Task 18: Dark/Light Theming

**Files:**
- Create: `src/ui/themes/dark.qss`
- Create: `src/ui/themes/light.qss`
- Modify: `src/ui/main_window.py` (theme toggle)
- Modify: `src/ui/sidebar.py` (add theme toggle button at bottom)

**Step 1: Create dark theme QSS**

Style the sidebar, buttons, tables, and inputs with a modern dark palette (matplotlib chart colors are not affected by QSS and are updated separately in Step 3). Key colors:
- Background: `#1e1e2e`
- Surface: `#2a2a3c`
- Text: `#cdd6f4`
- Accent: `#89b4fa`
- Border: `#45475a`

**Step 2: Create light theme QSS**

Corresponding light palette.

**Step 3: Add toggle to sidebar**

Add a theme toggle button at the bottom of the sidebar. On click, swap the stylesheet and update matplotlib chart colors.

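One way to keep the widget styling and the chart colors in sync is to derive both from a single palette dict. The sketch below takes that approach as an alternative to hand-maintaining the two `.qss` files; it is an illustration, not the plan's prescribed layout. `build_qss`, `chart_rc`, and the selector choices in `QSS_TEMPLATE` are assumptions. Only the dark palette values come from Step 1.

```python
from string import Template

# Dark palette from Step 1.
DARK = {
    "background": "#1e1e2e",
    "surface": "#2a2a3c",
    "text": "#cdd6f4",
    "accent": "#89b4fa",
    "border": "#45475a",
}

# Illustrative selectors only; the real stylesheet would cover more widgets.
QSS_TEMPLATE = Template("""
QWidget { background-color: $background; color: $text; }
QTableView, QLineEdit, QComboBox { background-color: $surface; border: 1px solid $border; }
QPushButton { background-color: $surface; border: 1px solid $border; padding: 4px 10px; }
QPushButton:hover { border-color: $accent; }
""")


def build_qss(palette):
    """Render the stylesheet text for one palette."""
    return QSS_TEMPLATE.substitute(palette)


def chart_rc(palette):
    """Matplotlib rcParams overrides so charts follow the active palette."""
    return {
        "figure.facecolor": palette["background"],
        "axes.facecolor": palette["surface"],
        "axes.edgecolor": palette["border"],
        "text.color": palette["text"],
        "axes.labelcolor": palette["text"],
        "xtick.color": palette["text"],
        "ytick.color": palette["text"],
    }
```

On toggle, the rendered string could be passed to `QApplication.setStyleSheet` and the dict to `matplotlib.rcParams.update`, followed by a redraw of any open canvases.
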
**Step 4: Commit**

```bash
git add src/ui/themes/ src/ui/main_window.py src/ui/sidebar.py
git commit -m "feat: dark and light theme support"
```

---

### Task 19: Cross-Account Transfer Detection

**Files:**
- Create: `src/services/transfer_detector.py`
- Create: `tests/services/test_transfer_detector.py`
- Modify: `src/services/importer.py` (run after import)

**Step 1: Write failing tests**

Test that matching transactions across accounts (dates within ±2 days, same absolute amount, one positive and one negative) are flagged as transfers.

**Step 2: Implement transfer detector**

Scan for pairs where: `abs(txn_a.amount) == abs(txn_b.amount)` with opposite signs, accounts differ, dates are within 2 days, and at least one description matches a known transfer pattern (e.g., "Payment Thank You", "CREDIT CRD EPAY").

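The pairing criteria can be sketched in plain Python before wiring them to the database. This is a sketch under stated assumptions rather than the final detector: `Txn` is a hypothetical minimal record, `find_transfers` is a hypothetical function name, and each transaction is paired at most once with the earliest eligible counterpart.

```python
from dataclasses import dataclass
from datetime import date, timedelta
from decimal import Decimal

TRANSFER_PATTERNS = ("PAYMENT THANK YOU", "CREDIT CRD EPAY")
WINDOW = timedelta(days=2)


@dataclass(frozen=True)
class Txn:
    """Hypothetical minimal transaction record."""
    id: int
    account_id: int
    when: date
    amount: Decimal
    description: str


def looks_like_transfer(txn):
    """True if the description contains a known transfer pattern."""
    upper = txn.description.upper()
    return any(pattern in upper for pattern in TRANSFER_PATTERNS)


def find_transfers(txns):
    """Return (id_a, id_b) pairs that satisfy every transfer criterion."""
    pairs, used = [], set()
    ordered = sorted(txns, key=lambda t: (t.when, t.id))
    for i, a in enumerate(ordered):
        if a.id in used:
            continue
        for b in ordered[i + 1:]:
            if b.when - a.when > WINDOW:
                break  # sorted by date, so later rows are even further away
            if (
                b.id not in used
                and a.account_id != b.account_id
                and a.amount != 0
                and a.amount == -b.amount  # equal magnitude, opposite sign
                and (looks_like_transfer(a) or looks_like_transfer(b))
            ):
                pairs.append((a.id, b.id))
                used.update((a.id, b.id))
                break
    return pairs
```

Requiring a known description pattern on at least one side keeps coincidental same-amount purchases on different accounts from being flagged.
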
**Step 3: Run tests**

Run: `pytest tests/services/test_transfer_detector.py -v`

**Step 4: Commit**

```bash
git add src/services/transfer_detector.py tests/services/test_transfer_detector.py src/services/importer.py
git commit -m "feat: cross-account transfer detection"
```

---

### Task 20: Default Categorization Rules (Seed)

**Files:**
- Modify: `src/seed.py`
- Modify: `tests/test_seed.py`

**Step 1: Add default rules based on real data patterns**

Add a `seed_default_rules` function that creates the initial rule set from the design doc (CIMTECHNIQUES -> Income, HELLOFRESH -> Groceries, etc.). Requires household members to exist first, so only seed person-attributed rules if matching members are found.

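The "only seed person-attributed rules if matching members are found" condition can be isolated in a small pure function that `seed_default_rules` might call before inserting rows. The sketch below is hypothetical: `plan_default_rules` and the dict shape are illustrative, and the third entry in `DEFAULT_RULES` is a made-up example added only to show the skip behavior. Only the first two pattern/category pairs come from this plan.

```python
DEFAULT_RULES = [
    # (pattern, category, person name or None)
    ("CIMTECHNIQUES", "Income", None),
    ("HELLOFRESH", "Groceries", None),
    ("EXAMPLE GYM", "Health", "Pat"),  # hypothetical person-attributed rule
]


def plan_default_rules(member_ids, defaults=DEFAULT_RULES):
    """Return rule dicts to insert, skipping person rules whose member is missing.

    member_ids: household member name -> id
    """
    rules = []
    for pattern, category, person in defaults:
        if person is not None and person not in member_ids:
            continue  # member not seeded yet, so leave this rule out
        rules.append({
            "pattern": pattern,
            "category": category,
            "person_id": member_ids[person] if person is not None else None,
        })
    return rules
```

Keeping this step free of database access makes the skip logic straightforward to cover in `tests/test_seed.py`.
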
**Step 2: Run tests**

Run: `pytest tests/test_seed.py -v`

**Step 3: Commit**

```bash
git add src/seed.py tests/test_seed.py
git commit -m "feat: default categorization rules from known patterns"
```

---

### Task 21: Integration Test with Real Data

**Files:**
- Create: `tests/test_integration.py`

**Step 1: Write end-to-end test**

```python
# tests/test_integration.py
from pathlib import Path

from sqlalchemy import create_engine
from sqlalchemy.orm import Session

from src.db import Base
from src.models import Account  # importing the package also registers the models on Base
from src.seed import seed_categories, seed_household
from src.services.importer import ImportService
from src.services.analysis import AnalysisService
from src.services.recurring import RecurringDetector

RAWDATA = Path(__file__).parent.parent / "rawdata"


def test_full_pipeline():
    # Fresh in-memory database for each run
    engine = create_engine("sqlite:///:memory:")
    Base.metadata.create_all(engine)
    session = Session(engine)

    seed_categories(session)
    andrew = seed_household(session, "Andrew", "self")

    # Set up accounts
    chase = Account(
        name="Chase Freedom", institution="Chase",
        account_type="credit", owner_id=andrew.id,
    )
    checking = Account(
        name="WF Checking", institution="Wells Fargo",
        account_type="checking", owner_id=andrew.id, is_shared=True,
    )
    session.add_all([chase, checking])
    session.flush()

    svc = ImportService(session)

    # Import Chase (columns mapped by header name)
    r1 = svc.import_csv(
        RAWDATA / "Chase0372_Activity20260101_20260210_20260210.CSV",
        account_id=chase.id,
        column_map={
            "date": "Transaction Date",
            "amount": "Amount",
            "description": "Description",
            "source_category": "Category",
        },
        amount_logic="signed",
    )
    assert r1["imported"] > 100

    # Import Checking (columns mapped by position)
    r2 = svc.import_csv(
        RAWDATA / "Checking1.csv",
        account_id=checking.id,
        column_map={"date": 0, "amount": 1, "description": 4},
        amount_logic="signed",
    )
    assert r2["imported"] > 50

    # Analysis works
    analysis = AnalysisService(session)
    monthly = analysis.spending_by_period("month")
    assert len(monthly) >= 1

    by_cat = analysis.spending_by_category()
    assert len(by_cat) >= 1

    # Recurring detection runs without raising
    detector = RecurringDetector(session)
    recurring = detector.detect()
    # Smoke check only: this is always true, because the sample date range
    # may be too short to contain any recurring patterns.
    assert len(recurring) >= 0
```

**Step 2: Run test**

Run: `pytest tests/test_integration.py -v`
Expected: PASS

**Step 3: Commit**

```bash
git add tests/test_integration.py
git commit -m "test: end-to-end integration test with real CSV data"
```

---

## Summary

| Phase | Tasks | Focus |
|-------|-------|-------|
| 1: Foundation | 1-3 | Project setup, models, seed data |
| 2: Import Pipeline | 4-6 | Normalizer, CSV reader, import service |
| 3: Categorization | 7-8 | Rule engine, post-import categorization |
| 4: Analysis | 9-11 | Spending analysis, recurring detection, forecasting |
| 5: UI | 12-17 | Main window, import wizard, transactions, analysis charts, recurring, settings |
| 6: Polish | 18-21 | Theming, transfer detection, default rules, integration test |

Tasks 1-11 are fully TDD. Tasks 12-18 are UI-focused with manual verification. Tasks 19-21 return to TDD for backend features and integration testing.