185 lines
5.6 KiB
Python
185 lines
5.6 KiB
Python
"""Scan an export directory, pair EML + metadata, check dedup, return new emails.
|
|
|
|
Proton exports produce two files per message:
|
|
{messageID}.eml
|
|
{messageID}.metadata.json
|
|
|
|
Metadata fields of interest (Proton-specific):
|
|
Subject, SenderAddress, SenderName, ToList, CCList, BCCList,
|
|
Time (Unix timestamp), Unread, LabelIDs, ExternalID, NumAttachments
|
|
|
|
These are mapped to standard RFC 5322 headers when they are missing from the
|
|
raw EML (Proton sometimes omits headers in the raw export).
|
|
"""
|
|
|
|
import email
|
|
import email.policy
|
|
import json
|
|
from dataclasses import dataclass, field
|
|
from email.message import EmailMessage
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
from .database import filter_new
|
|
from .logger import get_logger
|
|
|
|
log = get_logger(__name__)
|
|
|
|
|
|
@dataclass
class RichEmail:
    """Container for one exported message after enrichment.

    Attributes:
        message_id: Identifier of the message.
        message: The parsed email object, possibly augmented with headers.
        raw_bytes: The final serialized RFC 2822 bytes of the message.
        metadata: Metadata dictionary that accompanies the message; each
            instance gets its own empty dict when none is supplied.
    """

    message_id: str
    message: EmailMessage
    raw_bytes: bytes
    metadata: dict = field(default_factory=dict)
|
|
|
|
|
|
class ProcessorError(Exception):
    """Exception type declared by this module for processing failures."""
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Public API
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def scan_and_filter(export_dir: Path) -> list[RichEmail]:
    """Collect the not-yet-seen messages found in *export_dir*.

    Every ``.eml`` file is paired with its optional metadata file, the
    resulting message IDs are passed through ``filter_new`` for
    deduplication, and each remaining ID is turned into a RichEmail.

    A message whose processing raises is logged as a warning and left out of
    the result, so a single bad file does not abort the whole scan.
    """
    found = _find_pairs(export_dir)
    log.info("Found %d exported message(s) in %s", len(found), export_dir)

    fresh_ids = filter_new(list(found))
    log.info("%d new message(s) after deduplication.", len(fresh_ids))

    new_emails: list[RichEmail] = []
    for msg_id in fresh_ids:
        eml_file, meta_file = found[msg_id]
        try:
            new_emails.append(_build_rich_email(msg_id, eml_file, meta_file))
        except Exception as err:
            log.warning("Skipping %s — could not process: %s", msg_id, err)
    return new_emails
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Internals
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _find_pairs(export_dir: Path) -> dict[str, tuple[Path, Optional[Path]]]:
    """Map each message ID in *export_dir* to its EML and metadata paths.

    The message ID is the ``.eml`` file name without its final extension.
    The metadata path is the sibling ``<id>.metadata.json`` file, or ``None``
    when no such path exists. Only the top level of *export_dir* is searched.
    """
    found: dict[str, tuple[Path, Optional[Path]]] = {}
    for eml_file in export_dir.glob("*.eml"):
        msg_id = eml_file.stem
        candidate = eml_file.with_suffix(".metadata.json")
        if candidate.exists():
            found[msg_id] = (eml_file, candidate)
            continue
        log.debug("No metadata file for %s", msg_id)
        found[msg_id] = (eml_file, None)
    return found
|
|
|
|
|
|
def _build_rich_email(
    message_id: str,
    eml_path: Path,
    meta_path: Optional[Path],
) -> RichEmail:
    """Parse one exported message and back-fill headers from its metadata.

    Args:
        message_id: Identifier stored on the returned RichEmail and used in
            log messages.
        eml_path: Path of the raw ``.eml`` file; it is read as bytes.
        meta_path: Path of the JSON metadata file, or ``None`` if there is
            none.

    Returns:
        A RichEmail holding the parsed (and possibly augmented) message, its
        serialization under ``email.policy.SMTP``, and the metadata dict.

    Raises:
        OSError: If the EML file cannot be read.

    Metadata is best-effort: a file that cannot be read or parsed, or whose
    top-level JSON value is not an object, is logged and treated as empty
    metadata instead of failing the whole message.
    """
    raw = eml_path.read_bytes()
    msg: EmailMessage = email.message_from_bytes(
        raw, policy=email.policy.default
    )  # type: ignore[assignment]

    metadata: dict = {}
    if meta_path:
        try:
            loaded = json.loads(meta_path.read_text(encoding="utf-8"))
        except Exception as exc:
            # Deliberately broad: metadata is optional, so any read/parse
            # failure only costs the back-filled headers, not the message.
            log.warning("Could not parse metadata for %s: %s", message_id, exc)
        else:
            if isinstance(loaded, dict):
                metadata = loaded
            else:
                # Valid JSON but not an object (e.g. a list or string); the
                # header merge needs a dict, so ignore it.
                log.warning(
                    "Could not parse metadata for %s: expected a JSON object, got %s",
                    message_id,
                    type(loaded).__name__,
                )

    # Augment missing headers from metadata
    _merge_metadata(msg, metadata)

    final_bytes = msg.as_bytes(policy=email.policy.SMTP)
    return RichEmail(
        message_id=message_id,
        message=msg,
        raw_bytes=final_bytes,
        metadata=metadata,
    )
|
|
|
|
|
|
def _put_header(msg: EmailMessage, name: str, value: str) -> None:
    """Set header *name* to *value*, replacing any existing occurrence."""
    # Under email.policy.default, assigning a single-instance header
    # (Subject, From, To, Cc, Bcc, Date, Message-ID) that is already present
    # -- even with an empty value -- raises ValueError. Deleting first is
    # safe: deleting a header that is absent does nothing.
    del msg[name]
    msg[name] = value


def _merge_metadata(msg: EmailMessage, meta: dict) -> None:
    """Back-fill standard headers from Proton metadata where missing.

    A header counts as missing when it is absent or has an empty value. The
    message is modified in place.

    Args:
        msg: The parsed message to augment.
        meta: Metadata dictionary; the keys read are ``Subject``,
            ``SenderAddress``, ``SenderName``, ``ToList``, ``CCList``,
            ``BCCList``, ``Time``, ``ExternalID``, ``LabelIDs`` and
            ``Unread``. An empty dict leaves the message untouched.

    The ``X-Proton-LabelIDs`` and ``X-Proton-Unread`` headers are always
    written from the metadata, replacing any value already on the message so
    that repeated merges do not pile up duplicates.
    """
    if not meta:
        return

    # Imported by name so that no local ``email`` binding shadows the module.
    from email.utils import formatdate, quote

    # Subject
    if not msg.get("Subject") and meta.get("Subject"):
        _put_header(msg, "Subject", meta["Subject"])

    # From
    if not msg.get("From"):
        sender_addr = meta.get("SenderAddress", "")
        sender_name = meta.get("SenderName", "")
        if sender_addr:
            if sender_name:
                # Escape backslashes and double quotes so the display name
                # stays a well-formed quoted-string.
                from_value = f'"{quote(str(sender_name))}" <{sender_addr}>'
            else:
                from_value = sender_addr
            _put_header(msg, "From", from_value)

    # To / Cc / Bcc
    for header, key in (("To", "ToList"), ("Cc", "CCList"), ("Bcc", "BCCList")):
        if msg.get(header):
            continue
        entries = meta.get(key, [])
        if not entries:
            continue
        formatted = _format_address_list(entries)
        if formatted:
            _put_header(msg, header, formatted)

    # Date — Proton uses Unix timestamp in "Time"
    timestamp = meta.get("Time")
    if not msg.get("Date") and timestamp:
        try:
            date_value = formatdate(float(timestamp), localtime=False)
        except (TypeError, ValueError, OverflowError, OSError):
            log.warning("Ignoring invalid Time value %r in metadata", timestamp)
        else:
            _put_header(msg, "Date", date_value)

    # Message-ID — prefer ExternalID if the EML header is missing
    if not msg.get("Message-ID") and meta.get("ExternalID"):
        # Tolerate an ExternalID that already carries angle brackets.
        external_id = str(meta["ExternalID"]).strip().strip("<>")
        if external_id:
            _put_header(msg, "Message-ID", f"<{external_id}>")

    # X-Proton-* passthrough headers for labels and read status
    label_ids = meta.get("LabelIDs")
    if label_ids:
        _put_header(
            msg,
            "X-Proton-LabelIDs",
            ",".join(str(label) for label in label_ids),
        )
    if "Unread" in meta:
        _put_header(msg, "X-Proton-Unread", str(meta["Unread"]))
|
|
|
|
|
|
def _format_address_list(entries: list) -> str:
    """Convert Proton address list entries to an RFC 5322 address string.

    Args:
        entries: Items that are either dicts with an ``Address`` key and an
            optional ``Name`` key, or plain address strings. Dicts without an
            address, blank strings and items of any other type are skipped.
            ``None`` is treated as an empty list.

    Returns:
        The formatted addresses joined by ``", "``, or an empty string when
        no usable entry was found.
    """
    from email.utils import quote

    parts = []
    for entry in entries or []:
        if isinstance(entry, dict):
            name = entry.get("Name", "")
            addr = entry.get("Address", "")
            if not addr:
                continue
            if name:
                # Escape backslashes and double quotes; an unescaped quote in
                # the display name would end the quoted-string early and
                # corrupt the address list.
                parts.append(f'"{quote(str(name))}" <{addr}>')
            else:
                parts.append(addr)
        elif isinstance(entry, str) and entry.strip():
            parts.append(entry.strip())
    return ", ".join(parts)
|