"""SQLite-backed deduplication and delivery-state tracking.""" import sqlite3 from contextlib import contextmanager from pathlib import Path from typing import Generator, Iterable DB_PATH = Path(__file__).parent.parent / "data" / "mailrelay.db" # Delivery states STATE_PENDING = "pending" # MBOX generated, not yet downloaded STATE_DELIVERED = "delivered" # IMAP pushed or MBOX confirmed downloaded def _connect() -> sqlite3.Connection: DB_PATH.parent.mkdir(parents=True, exist_ok=True) conn = sqlite3.connect(DB_PATH) conn.row_factory = sqlite3.Row conn.execute("PRAGMA journal_mode=WAL") return conn @contextmanager def _db() -> Generator[sqlite3.Connection, None, None]: conn = _connect() try: yield conn conn.commit() except Exception: conn.rollback() raise finally: conn.close() def init_db() -> None: """Create tables if they don't exist.""" with _db() as conn: conn.execute(""" CREATE TABLE IF NOT EXISTS messages ( message_id TEXT PRIMARY KEY, state TEXT NOT NULL DEFAULT 'delivered', mbox_path TEXT, created_at DATETIME DEFAULT (datetime('now')), updated_at DATETIME DEFAULT (datetime('now')) ) """) conn.execute(""" CREATE INDEX IF NOT EXISTS idx_state ON messages(state) """) def is_known(message_id: str) -> bool: """Return True if a message ID is already tracked (any state).""" with _db() as conn: row = conn.execute( "SELECT 1 FROM messages WHERE message_id = ?", (message_id,) ).fetchone() return row is not None def filter_new(message_ids: Iterable[str]) -> list[str]: """Return only the IDs not yet in the database.""" ids = list(message_ids) if not ids: return [] with _db() as conn: placeholders = ",".join("?" * len(ids)) known = { row[0] for row in conn.execute( f"SELECT message_id FROM messages WHERE message_id IN ({placeholders})", ids, ) } return [mid for mid in ids if mid not in known] def mark_pending(message_ids: Iterable[str], mbox_path: str) -> None: """Record message IDs as pending (MBOX created, not yet downloaded).""" with _db() as conn: conn.executemany( """ INSERT INTO messages (message_id, state, mbox_path) VALUES (?, ?, ?) ON CONFLICT(message_id) DO UPDATE SET state = excluded.state, mbox_path = excluded.mbox_path, updated_at = datetime('now') """, [(mid, STATE_PENDING, mbox_path) for mid in message_ids], ) def mark_delivered(message_ids: Iterable[str]) -> None: """Mark message IDs as fully delivered.""" ids = list(message_ids) if not ids: return with _db() as conn: conn.executemany( """ INSERT INTO messages (message_id, state) VALUES (?, ?) ON CONFLICT(message_id) DO UPDATE SET state = excluded.state, mbox_path = NULL, updated_at = datetime('now') """, [(mid, STATE_DELIVERED) for mid in ids], ) def get_pending_mboxes() -> list[dict]: """Return all distinct pending MBOX paths with their message IDs.""" with _db() as conn: rows = conn.execute( "SELECT message_id, mbox_path FROM messages WHERE state = ?", (STATE_PENDING,), ).fetchall() by_path: dict[str, list[str]] = {} for row in rows: path = row["mbox_path"] by_path.setdefault(path, []).append(row["message_id"]) return [{"mbox_path": path, "message_ids": ids} for path, ids in by_path.items()] def clear_pending_for_mbox(mbox_path: str) -> list[str]: """Remove pending state for a given MBOX (used on cleanup/re-process). Returns the list of message IDs that were pending for that MBOX. """ with _db() as conn: rows = conn.execute( "SELECT message_id FROM messages WHERE state = ? AND mbox_path = ?", (STATE_PENDING, mbox_path), ).fetchall() message_ids = [row["message_id"] for row in rows] conn.execute( "DELETE FROM messages WHERE state = ? AND mbox_path = ?", (STATE_PENDING, mbox_path), ) return message_ids