#!/usr/bin/env python3
"""Weekly reading digest: check Calibre-web, detect new reads, recommend, format message."""

import base64
import html
import http.cookiejar
import json
import os
import re
import urllib.parse
import urllib.request
import xml.etree.ElementTree as ET
from collections import defaultdict
from datetime import datetime, timedelta, timezone

# Connection settings. Each value can be overridden through the environment;
# the defaults are the values this script has always used.
# NOTE(review): the password default is a credential committed to source --
# prefer supplying CALIBRE_WEB_PASS and removing the literal.
BASE = os.environ.get("CALIBRE_WEB_URL", "http://192.168.68.190:8083")
USER = os.environ.get("CALIBRE_WEB_USER", "andy")
PASS = os.environ.get("CALIBRE_WEB_PASS", "Nimbly-Rumble-Unlucky9")

STATE_FILE = "/root/.hermes/reading_state.json"
LIBRARY_FILE = "/root/.hermes/reading_library.json"

# Pre-computed HTTP Basic credentials used for the OPDS feeds.
AUTH = base64.b64encode(f"{USER}:{PASS}".encode()).decode()
NS = {"atom": "http://www.w3.org/2005/Atom"}

# Seconds to wait on any single HTTP request.
HTTP_TIMEOUT = 15

_READ_COUNT_RE = re.compile(r"Read Books \((\d+)\)")
# NOTE(review): the original heading marker and title pattern were lost when
# markup was stripped from this file. The two patterns below are a
# reconstruction that assumes both are rendered inside heading tags --
# confirm against the real Calibre-web pages.
_READ_HEADING_RE = re.compile(r"<h\d[^>]*>\s*Read Books \(\d+\)")
_BOOK_TITLE_RE = re.compile(r"<h2[^>]*>\s*([^<]+)")
_BOOK_LINK_RE = re.compile(r'href="/book/(\d+)"')

# Title keyword -> series name, checked in this order (first hit wins).
SERIES_KEYWORDS = {
    "Reacher": "Jack Reacher",
    "Mistborn": "Mistborn",
    "Stormlight": "Stormlight Archive",
    "Skyward": "Skyward",
    "Dungeon Crawler Carl": "Dungeon Crawler Carl",
    "Murderbot": "Murderbot Diaries",
    "Winternight": "Winternight Trilogy",
    "Earthsea": "Earthsea Cycle",
}

# Author -> shelf description, used for the fallback recommendation.
# Order matters: the first author with an unread book in the library wins.
USER_INTERESTS = {
    "Lee Child": "Jack Reacher series",
    "Matt Dinniman": "Dungeon Crawler Carl",
    "Joe Abercrombie": "First Law world (grimdark fantasy)",
    "Brandon Sanderson": "Cosmere (Mistborn / Stormlight)",
    "Martha Wells": "Murderbot Diaries",
    "Christopher Buehlman": "historical/sff",
    "Adrian Tchaikovsky": "sff",
    "Susanna Clarke": "literary fantasy",
    "Scott Hawkins": "The Library at Mount Char",
}

# Reacher titles in series order; the position (1-based) is the book number.
REACHER_SERIES_ORDER = (
    "Killing Floor", "Die Trying", "Tripwire", "Running Blind",
    "Echo Burning", "Without Fail", "Persuader", "The Enemy",
    "One Shot", "The Hard Way", "Bad Luck and Trouble", "Nothing to Lose",
    "Gone Tomorrow", "61 Hours", "Worth Dying For", "The Affair",
    "A Wanted Man", "Never Go Back", "Personal", "Make Me",
    "Night School", "The Midnight Line", "Past Tense", "Blue Moon",
    "The Sentinel", "Better Off Dead", "No Plan B", "The Secret",
    "Safe Enough",
)

SIGN_OFF = "keep reading at your own pace. slow is still forward. 📚"


def log(msg):
    """Print *msg* to stdout prefixed with a local-time timestamp."""
    stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print(f"[{stamp}] {msg}")


def opds_fetch(path):
    """Fetch ``BASE + path`` with HTTP Basic auth and return the decoded body."""
    req = urllib.request.Request(f"{BASE}{path}")
    req.add_header("Authorization", f"Basic {AUTH}")
    with urllib.request.urlopen(req, timeout=HTTP_TIMEOUT) as resp:
        return resp.read().decode()


def session_login():
    """Login and return cookie-authenticated opener.

    Loads the login page to pick up the CSRF token, then posts the login
    form. The response to the POST is not inspected, so a rejected login is
    not detected here.
    """
    jar = http.cookiejar.CookieJar()
    opener = urllib.request.build_opener(urllib.request.HTTPCookieProcessor(jar))
    login_url = f"{BASE}/login"

    # Get CSRF token
    with opener.open(login_url, timeout=HTTP_TIMEOUT) as resp:
        page = resp.read().decode()
    token_match = re.search(r'csrf_token" value="([^"]+)"', page)
    if token_match is None:
        log("WARNING: no CSRF token found on login page; submitting without one")
    csrf = token_match.group(1) if token_match else ""

    # Login
    form = urllib.parse.urlencode({
        "csrf_token": csrf,
        "username": USER,
        "password": PASS,
        "remember_me": "true",
        "submit": "",
    }).encode()
    with opener.open(login_url, data=form, timeout=HTTP_TIMEOUT):
        pass  # only the session cookie stored in the jar is needed
    return opener


def _parse_opds_entry(entry):
    """Convert one Atom ``<entry>`` element into a library book dict."""
    title_el = entry.find("atom:title", NS)
    has_title = title_el is not None and title_el.text
    title = title_el.text.strip() if has_title else "Unknown"

    author = "Unknown"
    author_el = entry.find("atom:author", NS)
    if author_el is not None:
        name_el = author_el.find("atom:name", NS)
        if name_el is not None and name_el.text:
            author = name_el.text.strip()

    pub_el = entry.find("atom:published", NS)
    # Keep only the first four characters of the published value.
    pub_year = pub_el.text[:4] if pub_el is not None and pub_el.text else ""
    categories = [c.get("label", "") for c in entry.findall("atom:category", NS)]
    return {"title": title, "author": author,
            "pub_year": pub_year, "categories": categories}


def get_all_library_books():
    """Fetch full library from OPDS feeds.

    Reads ``/opds/books``, follows the first link of every entry found there,
    and collects the entries of each linked feed. Books are de-duplicated on
    ``title|author``; the first occurrence is kept.

    Returns:
        A list of dicts with keys ``title``, ``author``, ``pub_year`` and
        ``categories``.
    """
    root = ET.fromstring(opds_fetch("/opds/books"))
    feed_paths = []
    for entry in root.findall("atom:entry", NS):
        link = entry.find("atom:link", NS)
        if link is not None and link.get("href", ""):
            feed_paths.append(link.get("href"))

    all_books = []
    seen = set()
    for path in feed_paths:
        feed = ET.fromstring(opds_fetch(path))
        for entry in feed.findall("atom:entry", NS):
            book = _parse_opds_entry(entry)
            key = f"{book['title']}|{book['author']}"
            if key in seen:
                continue
            seen.add(key)
            all_books.append(book)
    return all_books


def get_read_books(opener):
    """Get list of currently read books from Calibre-web.

    Args:
        opener: An authenticated opener as returned by ``session_login``.

    Returns:
        ``(titles, read_count)``. ``read_count`` is the number shown in the
        "Read Books (N)" text of the page. ``([], 0)`` is returned when the
        section heading cannot be located.
    """
    with opener.open(f"{BASE}/read/stored", timeout=HTTP_TIMEOUT) as resp:
        page = resp.read().decode()

    # Parse read count from heading
    count_match = _READ_COUNT_RE.search(page)
    read_count = int(count_match.group(1)) if count_match else 0

    # Only book links AFTER the "Read Books (N)" heading are considered, so
    # links that appear earlier on the page are not counted as read.
    heading = _READ_HEADING_RE.search(page)
    if heading is None:
        log("WARNING: Could not find Read Books section heading")
        return [], 0
    read_section = page[heading.start():]

    # Unique book IDs, first-seen order preserved.
    book_ids = list(dict.fromkeys(_BOOK_LINK_RE.findall(read_section)))

    # Fetch titles for each book
    read_titles = []
    for bid in book_ids:
        with opener.open(f"{BASE}/book/{bid}", timeout=HTTP_TIMEOUT) as resp:
            book_page = resp.read().decode()
        title_match = _BOOK_TITLE_RE.search(book_page)
        if title_match:
            # Decode HTML entities (&amp;, &#39;, &quot;, ...).
            title = html.unescape(title_match.group(1)).strip()
        else:
            title = f"Book {bid}"
        read_titles.append(title)
    return read_titles, read_count


def get_series_from_title(title, library):
    """Guess series from title patterns.

    Args:
        title: Book title to inspect (matched case-insensitively).
        library: Unused; kept so existing callers keep working.

    Returns:
        The series name for the first keyword found in *title*, else ``None``.
    """
    lowered = title.lower()
    for keyword, series_name in SERIES_KEYWORDS.items():
        if keyword.lower() in lowered:
            return series_name
    return None


def _titles_overlap(a, b):
    """Return True when one non-empty title contains the other, ignoring case."""
    a, b = a.lower(), b.lower()
    # An empty string is a substring of everything; never treat it as a match.
    if not a or not b:
        return False
    return a in b or b in a


def _parse_history_dates(history):
    """Return the parseable ``detected_on`` dates of *history*, in order."""
    dates = []
    for item in history:
        try:
            dates.append(datetime.strptime(item["detected_on"], "%Y-%m-%d"))
        except (KeyError, TypeError, ValueError):
            continue  # entry has no usable date; leave it out of the pace maths
    return dates


def _pace_lines(sorted_history, total_read):
    """Build the "pace" section; empty unless two or more dated finishes exist."""
    if len(sorted_history) < 2:
        return []
    dates = _parse_history_dates(sorted_history)
    if len(dates) < 2:
        return []

    gaps = [(later - earlier).days for earlier, later in zip(dates, dates[1:])]
    avg_gap = sum(gaps) / len(gaps)
    last_gap = gaps[-1]

    out = [
        f"**pace:** you've read {total_read} book{'s' if total_read != 1 else ''} total",
        f" avg {avg_gap:.0f} days between books",
    ]
    if last_gap > 14:
        if last_gap > avg_gap:
            out.append(f" ⏳ last one took {last_gap} days — slower than usual but still going")
        else:
            out.append(f" it's been {last_gap} days since your last finish — no rush, just checking in")
    out.append("")
    return out


def _recency_lines(sorted_history):
    """Build the "days since last finish" section from the newest history entry."""
    if not sorted_history:
        return []
    last_date = sorted_history[-1].get("detected_on", "")
    if not last_date:
        return []
    try:
        last_dt = datetime.strptime(last_date, "%Y-%m-%d")
    except (TypeError, ValueError):
        return []

    # Naive local time, matching how the main flow stamps ``detected_on``.
    days_since = (datetime.now() - last_dt).days
    if days_since == 0:
        line = "you just finished a book — nice!"
    elif days_since <= 3:
        plural = "s" if days_since != 1 else ""
        line = f"last finish was {days_since} day{plural} ago — fresh!"
    elif days_since <= 14:
        line = f"it's been {days_since} days since you finished your last book"
    else:
        line = f"⏰ it's been {days_since} days — want me to suggest something?"
    return [line, ""]


def _next_reacher(library, history):
    """Return the next-Reacher recommendation line, or None if there is none."""
    reacher_books = [b for b in library if "Lee Child" in b.get("author", "")]
    read_titles = {h.get("title", "") for h in history
                   if h.get("author") == "Lee Child"}

    # Find the highest series position that matches something already read.
    last_read_idx = -1
    for idx, series_title in enumerate(REACHER_SERIES_ORDER):
        if any(_titles_overlap(series_title, rt) for rt in read_titles):
            last_read_idx = idx

    # Walk forward from there to the first later entry that is in the library
    # and has not itself been read.
    for idx in range(last_read_idx + 1, len(REACHER_SERIES_ORDER)):
        series_title = REACHER_SERIES_ORDER[idx]
        for book in reacher_books:
            book_title = book.get("title", "")
            if not _titles_overlap(series_title, book_title):
                continue
            if any(_titles_overlap(rt, book_title) for rt in read_titles):
                continue
            return f"next in the Reacher series: **{book_title}** (book #{idx + 1})"
    return None


def _unread_by_author(library, history, author):
    """Library books whose author contains *author* and whose title is unread.

    "Unread" means the exact title is absent from history entries whose
    author equals *author*.
    """
    read = {h.get("title") for h in history if h.get("author") == author}
    return [b for b in library
            if author in b.get("author", "") and b.get("title") not in read]


def _recommendations(library, sorted_history):
    """Return recommendation lines in priority order (possibly empty)."""
    recs = []

    reacher = _next_reacher(library, sorted_history)
    if reacher:
        recs.append(reacher)

    unread_dcc = _unread_by_author(library, sorted_history, "Matt Dinniman")
    if unread_dcc:
        recs.append(f"next Dungeon Crawler Carl: **{unread_dcc[0]['title']}**")

    # Sanderson is only offered when nothing more specific was found.
    if not recs:
        unread_sanderson = _unread_by_author(library, sorted_history, "Brandon Sanderson")
        if unread_sanderson:
            recs.append(f"from your Sanderson shelf: **{unread_sanderson[0]['title']}**")

    # Last resort: first unread book by any author the user is known to like.
    if not recs:
        read_titles = {h.get("title") for h in sorted_history}
        for author, desc in USER_INTERESTS.items():
            candidates = [b for b in library
                          if author in b.get("author", "")
                          and b.get("title") not in read_titles]
            if candidates:
                recs.append(f"from your {desc} shelf: **{candidates[0]['title']}** ({author})")
                break
    return recs


def generate_digest(library, state, new_reads):
    """Generate a human-friendly digest message.

    Args:
        library: List of book dicts (``title``, ``author``, ...).
        state: Persisted state; ``known_read_books`` and ``reading_history``
            are read, both optional.
        new_reads: Book dicts detected as newly finished in this run.

    Returns:
        The digest as a single newline-joined string.
    """
    # Header date uses a fixed UTC-4 offset (EDT); it does not follow DST.
    now = datetime.now(timezone.utc) - timedelta(hours=4)
    lines = [f"📚 **weekly reading digest** — {now.strftime('%A, %B %d')}", ""]

    total_read = len(state.get("known_read_books", []))
    history = state.get("reading_history", [])

    if new_reads:
        lines.append("**newly finished:**")
        lines.extend(f" ✅ {book['title']} — {book['author']}" for book in new_reads)
        lines.append("")

    # ISO dates sort chronologically as plain strings.
    sorted_history = sorted(history, key=lambda item: item.get("detected_on", ""))

    lines.extend(_pace_lines(sorted_history, total_read))
    lines.extend(_recency_lines(sorted_history))

    recommendations = _recommendations(library, sorted_history)
    if recommendations:
        lines.append("**suggested next reads:**")
        lines.extend(f" 📖 {rec}" for rec in recommendations[:3])
        lines.append("")

    lines.append(SIGN_OFF)
    return "\n".join(lines)


def _load_state():
    """Load persisted state from STATE_FILE, or return a fresh default state."""
    if os.path.exists(STATE_FILE):
        with open(STATE_FILE, encoding="utf-8") as f:
            return json.load(f)
    return {"known_read_books": [], "reading_history": [],
            "weekly_digest_sent": 0, "last_check": ""}


def _load_library():
    """Return the cached library, fetching and caching it when no cache exists."""
    if os.path.exists(LIBRARY_FILE):
        with open(LIBRARY_FILE, encoding="utf-8") as f:
            return json.load(f).get("library", [])

    log("No library cache, fetching...")
    library = get_all_library_books()
    with open(LIBRARY_FILE, "w", encoding="utf-8") as f:
        json.dump({"library": library,
                   "total_books_in_library": len(library),
                   "read_count": 0}, f, indent=2)
    return library


def _detect_new_reads(current_titles, known_titles, library):
    """Return history entries for titles in *current_titles* not yet known.

    The author is taken from the first library book with exactly the same
    title, else ``"Unknown"``. A title repeated in *current_titles* is
    reported once.
    """
    author_by_title = {}
    for book in library:
        author_by_title.setdefault(book.get("title"), book.get("author", "Unknown"))

    today = datetime.now().strftime("%Y-%m-%d")
    already = set(known_titles)
    new_reads = []
    for title in current_titles:
        if title in already:
            continue
        already.add(title)
        new_reads.append({"title": title,
                          "author": author_by_title.get(title, "Unknown"),
                          "detected_on": today})
    return new_reads


def main():
    """Run one digest cycle: load state, detect new reads, save, print digest."""
    log("Starting weekly reading digest check")

    state = _load_state()
    known_titles = {b.get("title") for b in state.get("known_read_books", [])}

    library = _load_library()
    log(f"Library: {len(library)} books")

    # Current read list
    opener = session_login()
    current_titles, read_count = get_read_books(opener)
    log(f"Currently read: {read_count} books marked as read")
    log(f"Read titles: {current_titles}")

    new_reads = _detect_new_reads(current_titles, known_titles, library)

    # Update state
    if new_reads:
        log(f"New books detected: {[n['title'] for n in new_reads]}")
        state.setdefault("known_read_books", []).extend(new_reads)
        state.setdefault("reading_history", []).extend(new_reads)
        state["_new_this_week"] = len(new_reads)
    # Number of books known before this run. Recorded on every run; the
    # collapsed original did not show whether this sat inside the branch above.
    state["_prev_read_count"] = len(known_titles)
    state["last_check"] = datetime.now().strftime("%Y-%m-%d")
    state["weekly_digest_sent"] = state.get("weekly_digest_sent", 0) + 1

    with open(STATE_FILE, "w", encoding="utf-8") as f:
        json.dump(state, f, indent=2)
    log("State updated")

    # Generate digest
    digest = generate_digest(library, state, new_reads)
    print("\n" + "=" * 50)
    print(digest)
    print("=" * 50)


if __name__ == "__main__":
    main()