feat: add ImportService for CSV import with duplicate detection

Orchestrates CSV reading, description normalization, and transaction
storage. Uses count-based duplicate detection so legitimate repeated
transactions (same date/amount/description) in a single file are all
imported, while re-importing the same file correctly identifies every
row as a duplicate.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-10 14:44:58 -05:00
parent b7746ece4f
commit 1e12e563a7
2 changed files with 197 additions and 0 deletions

View File

@@ -0,0 +1,110 @@
# tests/services/test_importer.py
import datetime
from pathlib import Path
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
from src.db import Base
from src.models import *
from src.seed import seed_categories
from src.services.importer import ImportService
# Fixture directory holding the sample CSV exports read by the tests below;
# located by walking three `.parent` hops up from this file and into "rawdata".
RAWDATA = Path(__file__).parent.parent.parent.joinpath("rawdata")
def make_session():
    """Return a new Session bound to a fresh in-memory SQLite engine.

    The tables registered on ``Base.metadata`` are created on that engine
    before the session is handed back.
    """
    memory_engine = create_engine("sqlite:///:memory:")
    Base.metadata.create_all(bind=memory_engine)
    return Session(bind=memory_engine)
def setup_chase_account(session):
    """Add a household member and their Chase credit account, then seed categories.

    Args:
        session: The session the fixture rows are added to.

    Returns:
        The flushed "Chase Freedom" ``Account`` instance.
    """
    owner = HouseholdMember(name="Andrew", relationship="self")
    session.add(owner)
    # Flush before reading owner.id for the account below.
    session.flush()

    card_fields = {
        "name": "Chase Freedom",
        "institution": "Chase",
        "account_type": "credit",
    }
    card = Account(owner_id=owner.id, **card_fields)
    session.add(card)
    session.flush()

    seed_categories(session)
    return card
def setup_checking_account(session):
    """Add a household member and a shared Wells Fargo checking account, then seed categories.

    Args:
        session: The session the fixture rows are added to.

    Returns:
        The flushed "WF Checking" ``Account`` instance.
    """
    owner = HouseholdMember(name="Andrew", relationship="self")
    session.add(owner)
    # Flush before reading owner.id for the account below.
    session.flush()

    checking_fields = {
        "name": "WF Checking",
        "institution": "Wells Fargo",
        "account_type": "checking",
        "is_shared": True,
    }
    checking = Account(owner_id=owner.id, **checking_fields)
    session.add(checking)
    session.flush()

    seed_categories(session)
    return checking
def test_import_chase_csv():
    """Import the Chase export once and check counts plus one known transaction."""
    db = make_session()
    card = setup_chase_account(db)

    csv_path = RAWDATA / "Chase0372_Activity20260101_20260210_20260210.CSV"
    # Logical field -> CSV header name.
    headers = dict(
        date="Transaction Date",
        amount="Amount",
        description="Description",
        source_category="Category",
    )

    importer = ImportService(db)
    outcome = importer.import_csv(
        csv_path,
        account_id=card.id,
        column_map=headers,
        amount_logic="signed",
    )

    # First import into an empty database: rows imported, none flagged.
    assert outcome["imported"] > 0
    assert outcome["duplicates"] == 0

    stored = db.query(Transaction).all()
    assert len(stored) == outcome["imported"]

    # Spot-check a single known row from the fixture file.
    sephora_rows = [row for row in stored if "SEPHORA" in row.description]
    assert len(sephora_rows) == 1
    assert float(sephora_rows[0].amount) == -75.00
def test_import_checking_csv():
    """Import the checking export using integer column keys and check row counts."""
    db = make_session()
    checking = setup_checking_account(db)

    # Integer values here instead of header names -- presumably column
    # positions for a headerless export; confirm against ImportService.
    positions = dict(date=0, amount=1, description=4)

    importer = ImportService(db)
    outcome = importer.import_csv(
        RAWDATA / "Checking1.csv",
        account_id=checking.id,
        column_map=positions,
        amount_logic="signed",
    )
    assert outcome["imported"] > 0

    stored = db.query(Transaction).all()
    assert len(stored) > 50
def test_duplicate_detection():
    """Re-importing the same Chase file must flag every row as a duplicate.

    The file is imported twice into the same account. The second pass must
    import nothing, report as many duplicates as the first pass imported,
    and leave the number of stored transactions unchanged.
    """
    session = make_session()
    account = setup_chase_account(session)

    csv_path = RAWDATA / "Chase0372_Activity20260101_20260210_20260210.CSV"
    import_kwargs = {
        "account_id": account.id,
        "column_map": {
            "date": "Transaction Date",
            "amount": "Amount",
            "description": "Description",
        },
        "amount_logic": "signed",
    }

    svc = ImportService(session)
    result1 = svc.import_csv(csv_path, **import_kwargs)

    # Guard against a vacuous pass: if the first import stored nothing, the
    # equality checks below would succeed with 0 == 0 and prove nothing.
    assert result1["imported"] > 0

    result2 = svc.import_csv(csv_path, **import_kwargs)

    assert result2["duplicates"] == result1["imported"]
    assert result2["imported"] == 0

    # The second pass must not have added rows to the database.
    assert session.query(Transaction).count() == result1["imported"]