Files
hermes-config/scripts/weekly_reading_digest.py
T

358 lines
14 KiB
Python

#!/usr/bin/env python3
"""Weekly reading digest: check Calibre-web, detect new reads, recommend, format message."""
import json, os, re, urllib.request, urllib.parse, http.cookiejar, base64
from datetime import datetime, timezone, timedelta
from collections import defaultdict
# Connection settings for the Calibre-web server.
#
# Each value may be overridden through an environment variable so the script
# can be pointed at another server or account without editing the source.
# When a variable is unset the original hard-coded value is used, so existing
# deployments behave exactly as before.
#
# NOTE(review): the fallback password is still committed in source. Prefer
# setting CALIBRE_WEB_PASS in the environment and rotating this credential.
BASE = os.environ.get("CALIBRE_WEB_URL", "http://192.168.68.190:8083").rstrip("/")
USER = os.environ.get("CALIBRE_WEB_USER", "andy")
PASS = os.environ.get("CALIBRE_WEB_PASS", "Nimbly-Rumble-Unlucky9")

# JSON files holding the detected-read history and the cached library listing.
STATE_FILE = "/root/.hermes/reading_state.json"
LIBRARY_FILE = "/root/.hermes/reading_library.json"

# Pre-computed HTTP Basic credential ("user:pass", base64-encoded) for OPDS requests.
AUTH = base64.b64encode(f"{USER}:{PASS}".encode()).decode()

# XML namespace prefix map used with ElementTree find/findall on Atom feeds.
NS = {"atom": "http://www.w3.org/2005/Atom"}
def log(msg):
    """Print ``msg`` to stdout prefixed with the local time in square brackets."""
    stamp = f"{datetime.now():%Y-%m-%d %H:%M:%S}"
    print(f"[{stamp}] {msg}")
def opds_fetch(path):
    """Fetch ``BASE + path`` with HTTP Basic auth and return the body as text.

    The request uses a 15 second timeout and the response bytes are decoded
    with the default codec (UTF-8).
    """
    request = urllib.request.Request(
        f"{BASE}{path}",
        headers={"Authorization": f"Basic {AUTH}"},
    )
    with urllib.request.urlopen(request, timeout=15) as response:
        payload = response.read()
    return payload.decode()
def session_login():
    """Log in to Calibre-web and return a cookie-authenticated opener.

    Performs a GET on ``/login`` to obtain the session cookie and the CSRF
    token embedded in the form, then POSTs the credentials. The cookie jar is
    attached to the returned opener, so later ``opener.open(...)`` calls reuse
    the session.

    Returns:
        urllib.request.OpenerDirector: opener carrying the session cookies.

    Raises:
        urllib.error.URLError: if either request fails or times out.
    """
    jar = http.cookiejar.CookieJar()
    opener = urllib.request.build_opener(urllib.request.HTTPCookieProcessor(jar))
    login_url = f"{BASE}/login"

    # Fetch the login form; same 15 s timeout as opds_fetch so a dead server
    # cannot hang the job, and the response is closed once it has been read.
    with opener.open(login_url, timeout=15) as resp:
        page = resp.read().decode()

    token = re.search(r'csrf_token" value="([^"]+)"', page)
    if token:
        csrf = token.group(1)
    else:
        # Keep the original best-effort behaviour (submit an empty token) but
        # make the situation visible instead of failing silently.
        log("WARNING: CSRF token not found on login page; login may fail")
        csrf = ""

    form = urllib.parse.urlencode({
        "csrf_token": csrf, "username": USER, "password": PASS,
        "remember_me": "true", "submit": ""
    }).encode()
    # The response body is not needed; only the cookies set on the jar matter.
    with opener.open(login_url, data=form, timeout=15):
        pass
    return opener
def get_all_library_books():
    """Fetch the full library by walking the OPDS ``/opds/books`` index.

    The index feed is read first; the first ``<link>`` of every entry that
    has a non-empty ``href`` is followed as a sub-feed. Every entry of every
    sub-feed becomes one book dict with the keys ``title``, ``author``,
    ``pub_year`` and ``categories``. Books are de-duplicated on the
    ``title|author`` pair, keeping the first occurrence.
    """
    index = ET.fromstring(opds_fetch("/opds/books"))

    # Collect the sub-feed paths advertised by the index entries.
    sub_feed_paths = []
    for index_entry in index.findall("atom:entry", NS):
        first_link = index_entry.find("atom:link", NS)
        href = first_link.get("href", "") if first_link is not None else ""
        if href:
            sub_feed_paths.append(href)

    books = []
    seen_keys = set()
    for sub_path in sub_feed_paths:
        feed = ET.fromstring(opds_fetch(sub_path))
        for item in feed.findall("atom:entry", NS):
            title_node = item.find("atom:title", NS)
            if title_node is not None and title_node.text:
                title = title_node.text.strip()
            else:
                title = "Unknown"

            author = "Unknown"
            author_node = item.find("atom:author", NS)
            name_node = author_node.find("atom:name", NS) if author_node is not None else None
            if name_node is not None and name_node.text:
                author = name_node.text.strip()

            dedupe_key = f"{title}|{author}"
            if dedupe_key in seen_keys:
                continue
            seen_keys.add(dedupe_key)

            published = item.find("atom:published", NS)
            if published is not None and published.text:
                year = published.text[:4]
            else:
                year = ""
            labels = [cat.get("label", "") for cat in item.findall("atom:category", NS)]
            books.append({"title": title, "author": author, "pub_year": year, "categories": labels})
    return books
def get_read_books(opener):
    """Return the titles of books marked as read in Calibre-web.

    Loads ``/read/stored``, reads the count from the ``Read Books (N)``
    heading, then collects the unique ``/book/<id>`` links that appear after
    the ``<h2 class="Read Books (N)">`` marker and fetches each book page to
    extract its title from the first ``<h2>`` element.

    Args:
        opener: cookie-authenticated opener as returned by ``session_login``;
            its ``open`` must accept a ``timeout`` keyword.

    Returns:
        tuple[list[str], int]: ``(titles, read_count)``. ``([], 0)`` is
        returned, after logging a warning, when the heading marker is absent.
    """
    # Local import under an alias: the stdlib module is needed for entity
    # decoding and the file's top-level import block does not provide it.
    import html as html_lib

    with opener.open(f"{BASE}/read/stored", timeout=15) as resp:
        page = resp.read().decode()

    # Parse the read count from the heading text.
    count_match = re.search(r'Read Books \((\d+)\)', page)
    read_count = int(count_match.group(1)) if count_match else 0

    # Only links AFTER the "Read Books (N)" heading belong to the read list.
    heading_marker = f'<h2 class="Read Books ({read_count})">'
    section_start = page.find(heading_marker)
    if section_start == -1:
        log("WARNING: Could not find Read Books section heading")
        return [], 0
    read_section = page[section_start:]

    # dict.fromkeys de-duplicates while preserving first-seen order.
    book_ids = list(dict.fromkeys(re.findall(r'href="/book/(\d+)"', read_section)))

    read_titles = []
    for bid in book_ids:
        with opener.open(f"{BASE}/book/{bid}", timeout=15) as resp:
            book_page = resp.read().decode()
        title_match = re.search(r'<h2[^>]*>\s*([^<]+)', book_page)
        if title_match:
            # html.unescape decodes every named and numeric entity in one
            # pass; the previous replace chain covered only three entities and
            # could double-decode text such as "&amp;quot;".
            title = html_lib.unescape(title_match.group(1).strip())
        else:
            title = f"Book {bid}"
        read_titles.append(title)
    return read_titles, read_count
def get_series_from_title(title, library):
    """Guess the series a book belongs to from keywords in its title.

    The keywords are checked in the order listed and matched
    case-insensitively as substrings; the first hit wins. ``library`` is
    accepted but not consulted. Returns the series name, or ``None`` when no
    keyword matches.
    """
    keyword_series_pairs = (
        ("Reacher", "Jack Reacher"),
        ("Mistborn", "Mistborn"),
        ("Stormlight", "Stormlight Archive"),
        ("Skyward", "Skyward"),
        ("Dungeon Crawler Carl", "Dungeon Crawler Carl"),
        ("Murderbot", "Murderbot Diaries"),
        ("Winternight", "Winternight Trilogy"),
        ("Earthsea", "Earthsea Cycle"),
    )
    lowered_title = title.lower()
    return next(
        (series for keyword, series in keyword_series_pairs if keyword.lower() in lowered_title),
        None,
    )
# Publication order of the Jack Reacher books; index + 1 is the book number.
_REACHER_SERIES_ORDER = (
    "Killing Floor", "Die Trying", "Tripwire", "Running Blind", "Echo Burning",
    "Without Fail", "Persuader", "The Enemy", "One Shot", "The Hard Way",
    "Bad Luck and Trouble", "Nothing to Lose", "Gone Tomorrow", "61 Hours",
    "Worth Dying For", "The Affair", "A Wanted Man", "Never Go Back",
    "Personal", "Make Me", "Night School", "The Midnight Line", "Past Tense",
    "Blue Moon", "The Sentinel", "Better Off Dead", "No Plan B", "The Secret",
    "Safe Enough",
)

# Fallback authors (tried in this order) and the shelf label used in the message.
_USER_INTERESTS = {
    "Lee Child": "Jack Reacher series",
    "Matt Dinniman": "Dungeon Crawler Carl",
    "Joe Abercrombie": "First Law world (grimdark fantasy)",
    "Brandon Sanderson": "Cosmere (Mistborn / Stormlight)",
    "Martha Wells": "Murderbot Diaries",
    "Christopher Buehlman": "historical/sff",
    "Adrian Tchaikovsky": "sff",
    "Susanna Clarke": "literary fantasy",
    "Scott Hawkins": "The Library at Mount Char",
}


def _titles_overlap(first, second):
    """Return True when either title contains the other, ignoring case."""
    first, second = first.lower(), second.lower()
    return first in second or second in first


def _read_titles_by(history, author):
    """Return the non-empty titles in ``history`` whose author equals ``author``."""
    # Empty titles are dropped: "" is a substring of everything and would make
    # the fuzzy matching treat every book as already read.
    return {h.get("title", "") for h in history if h.get("author") == author} - {""}


def _pace_lines(history, total_read):
    """Return the '**pace:**' section lines, or [] with fewer than two dated entries."""
    if len(history) < 2:
        return []
    dates = []
    for entry in history:
        try:
            dates.append(datetime.strptime(entry.get("detected_on", ""), "%Y-%m-%d"))
        except (TypeError, ValueError):
            # Missing or malformed date: leave this entry out of the pace maths.
            continue
    if len(dates) < 2:
        return []

    gaps = [(later - earlier).days for earlier, later in zip(dates, dates[1:])]
    avg_gap = sum(gaps) / len(gaps)
    last_gap = gaps[-1]

    out = [
        f"**pace:** you've read {total_read} book{'s' if total_read != 1 else ''} total",
        f" avg {avg_gap:.0f} days between books",
    ]
    if last_gap > 14:
        if last_gap > avg_gap:
            out.append(f" ⏳ last one took {last_gap} days — slower than usual but still going")
        else:
            # NOTE(review): last_gap is the interval between the two most
            # recent finishes, not the time elapsed since the last one.
            out.append(f" it's been {last_gap} days since your last finish — no rush, just checking in")
    out.append("")
    return out


def _recency_lines(history):
    """Return the 'days since last finish' lines, or [] when no usable date exists."""
    if not history:
        return []
    last_date = history[-1].get("detected_on", "")
    if not last_date:
        return []
    try:
        last_dt = datetime.strptime(last_date, "%Y-%m-%d")
    except (TypeError, ValueError):
        return []

    days_since = (datetime.now() - last_dt).days
    if days_since == 0:
        message = "you just finished a book — nice!"
    elif days_since <= 3:
        message = f"last finish was {days_since} day{'s' if days_since != 1 else ''} ago — fresh!"
    elif days_since <= 14:
        message = f"it's been {days_since} days since you finished your last book"
    else:
        message = f"⏰ it's been {days_since} days — want me to suggest something?"
    return [message, ""]


def _reacher_recommendation(library, read_titles):
    """Return the 'next in the Reacher series' message, or None if nothing fits."""
    reacher_books = [b for b in library if "Lee Child" in b["author"]]

    # Highest position in the series that matches something already read.
    last_read_idx = -1
    for idx, series_title in enumerate(_REACHER_SERIES_ORDER):
        if any(_titles_overlap(series_title, rt) for rt in read_titles):
            last_read_idx = idx

    # First later series entry that is in the library and not yet read.
    for idx in range(last_read_idx + 1, len(_REACHER_SERIES_ORDER)):
        series_title = _REACHER_SERIES_ORDER[idx]
        for book in reacher_books:
            if not _titles_overlap(series_title, book["title"]):
                continue
            if any(_titles_overlap(rt, book["title"]) for rt in read_titles):
                continue
            return f"next in the Reacher series: **{book['title']}** (book #{idx + 1})"
    return None


def _recommendations(library, history):
    """Return recommendation messages in priority order (callers show at most three)."""
    recs = []

    reacher = _reacher_recommendation(library, _read_titles_by(history, "Lee Child"))
    if reacher:
        recs.append(reacher)

    # Any unread Matt Dinniman book in the library, in library order.
    read_dcc = _read_titles_by(history, "Matt Dinniman")
    next_dcc = next(
        (b for b in library if "Matt Dinniman" in b["author"] and b["title"] not in read_dcc),
        None,
    )
    if next_dcc:
        recs.append(f"next Dungeon Crawler Carl: **{next_dcc['title']}**")

    # Sanderson is only offered when nothing more specific was found.
    if not recs:
        read_sanderson = _read_titles_by(history, "Brandon Sanderson")
        next_sanderson = next(
            (b for b in library
             if "Brandon Sanderson" in b["author"] and b["title"] not in read_sanderson),
            None,
        )
        if next_sanderson:
            recs.append(f"from your Sanderson shelf: **{next_sanderson['title']}**")

    # Last resort: first unread book by any author of interest.
    if not recs:
        already_read = {h.get("title") for h in history}
        for author, desc in _USER_INTERESTS.items():
            pick = next(
                (b for b in library if author in b["author"] and b["title"] not in already_read),
                None,
            )
            if pick:
                recs.append(f"from your {desc} shelf: **{pick['title']}** ({author})")
                break
    return recs


def generate_digest(library, state, new_reads):
    """Generate a human-friendly weekly digest message.

    Args:
        library: list of book dicts with at least ``title`` and ``author``.
        state: persisted state dict; ``known_read_books`` and
            ``reading_history`` are read, both defaulting to empty lists.
        new_reads: list of dicts with ``title`` and ``author`` for the books
            newly detected on this run.

    Returns:
        str: the digest, with lines joined by newlines. Sections are, in
        order: header, newly finished books, reading pace, time since the
        last finish, up to three suggestions, and a closing line.
    """
    # Fixed UTC-4 offset (EDT); daylight-saving changes are not tracked.
    now = datetime.now(timezone.utc) - timedelta(hours=4)
    lines = [f"📚 **weekly reading digest** — {now.strftime('%A, %B %d')}", ""]

    total_read = len(state.get("known_read_books", []))
    history = sorted(state.get("reading_history", []), key=lambda e: e.get("detected_on", ""))

    if new_reads:
        lines.append("**newly finished:**")
        # Title and author were previously run together with no separator.
        lines.extend(f" ✅ {book['title']} — {book['author']}" for book in new_reads)
        lines.append("")

    lines.extend(_pace_lines(history, total_read))
    lines.extend(_recency_lines(history))

    recommendations = _recommendations(library, history)
    if recommendations:
        lines.append("**suggested next reads:**")
        lines.extend(f" 📖 {r}" for r in recommendations[:3])
        lines.append("")

    lines.append("keep reading at your own pace. slow is still forward. 📚")
    return "\n".join(lines)
# Main
# ElementTree is imported here, before any function that uses ``ET`` is called.
import xml.etree.ElementTree as ET

log("Starting weekly reading digest check")

# Load state. The saved file is merged over the defaults so that a state file
# missing a key (older or hand-edited) cannot raise KeyError further down.
state = {"known_read_books": [], "reading_history": [], "weekly_digest_sent": 0, "last_check": ""}
if os.path.exists(STATE_FILE):
    with open(STATE_FILE, encoding="utf-8") as f:
        state.update(json.load(f))
known_titles = {b["title"] for b in state["known_read_books"]}

# Load library (or fetch and cache it if there is no cache file yet).
library = []
if os.path.exists(LIBRARY_FILE):
    with open(LIBRARY_FILE, encoding="utf-8") as f:
        data = json.load(f)
    library = data.get("library", [])
else:
    log("No library cache, fetching...")
    library = get_all_library_books()
    with open(LIBRARY_FILE, "w", encoding="utf-8") as f:
        json.dump({"library": library, "total_books_in_library": len(library), "read_count": 0}, f, indent=2)
log(f"Library: {len(library)} books")

# Current read list
opener = session_login()
current_titles, read_count = get_read_books(opener)
log(f"Currently read: {read_count} books marked as read")
log(f"Read titles: {current_titles}")

# Detect new reads. The author comes from the first library entry with an
# identical title; the table is built once instead of scanning per title.
author_by_title = {}
for b in library:
    author_by_title.setdefault(b["title"], b["author"])
today = datetime.now().strftime("%Y-%m-%d")
new_reads = [
    {"title": title, "author": author_by_title.get(title, "Unknown"), "detected_on": today}
    # dict.fromkeys drops repeated titles so one book is not recorded twice.
    for title in dict.fromkeys(current_titles)
    if title not in known_titles
]

# Update state
if new_reads:
    log(f"New books detected: {[n['title'] for n in new_reads]}")
    state["known_read_books"].extend(new_reads)
    state["reading_history"].extend(new_reads)
    state["_new_this_week"] = len(new_reads)
    # Preserve prev count for comparison
    state["_prev_read_count"] = len(known_titles)
state["last_check"] = today
state["weekly_digest_sent"] = state.get("weekly_digest_sent", 0) + 1
with open(STATE_FILE, "w", encoding="utf-8") as f:
    json.dump(state, f, indent=2)
log("State updated")

# Generate digest
digest = generate_digest(library, state, new_reads)
print("\n" + "="*50)
print(digest)
print("="*50)