Guide
Call a local Python function to load your inbox, inject the results into a prompt, and get a categorized summary — in a single @effect run.
from src.applemail import load_emails
@effect func load_emails(limit=20) => result
<<
Summarize the last 20 emails I've gotten.
- Categorize them.
- Give me an action item list.
- Ignore spam type emails.
Here are the emails:
${result}
>>
@effect run
Reads the Apple Mail Envelope Index via SQLite and yields Email objects
one at a time (load_emails is a generator). Called directly by @effect func —
no API wrapper needed.
import sqlite3
import os
import glob
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Iterator
import emlx
# Per-user directory under which the versioned Mail data folders (V*/) are searched.
MAIL_BASE = Path.home().joinpath("Library", "Mail")

# Seconds between the Unix epoch (1970-01-01) and the Mac epoch (2001-01-01),
# added to a Mac-epoch timestamp to obtain Unix time.
MAC_EPOCH_OFFSET = 978_307_200
@dataclass
class Email:
message_id: str
subject: str
sender: str
recipients: list[str]
date: datetime | None
mailbox: str
flags: int
emlx_path: str | None = None
body_plain: str | None = None
body_html: str | None = None
def _find_envelope_index() -> Path:
    """Locate the Envelope Index database of the newest Mail data version.

    Searches ``MAIL_BASE`` for ``V*/MailData/Envelope Index`` and picks the
    match whose ``V<n>`` directory has the highest numeric version.

    Returns:
        Path to the selected ``Envelope Index`` file.

    Raises:
        FileNotFoundError: If no Envelope Index exists under ``MAIL_BASE``.
    """
    matches = list(MAIL_BASE.glob("V*/MailData/Envelope Index"))
    if not matches:
        raise FileNotFoundError(
            f"No Apple Mail Envelope Index found under {MAIL_BASE}. "
            "Ensure Full Disk Access is granted to this terminal."
        )

    def version_key(path: Path) -> tuple[int, str]:
        # path is <MAIL_BASE>/V<n>/MailData/Envelope Index, so the version
        # directory is two levels up.
        dir_name = path.parent.parent.name
        digits = dir_name[1:]
        # Compare numerically: a plain string sort would rank "V10" below "V9".
        # Directories without a numeric suffix sort before all numbered ones.
        number = int(digits) if digits.isascii() and digits.isdecimal() else -1
        return (number, dir_name)

    return max(matches, key=version_key)  # use highest version
def _mac_epoch_to_datetime(ts: float | None) -> datetime | None:
if ts is None:
return None
return datetime.fromtimestamp(ts + MAC_EPOCH_OFFSET, tz=timezone.utc)
def _find_emlx(mail_base: Path, message_id: int) -> str | None:
pattern = str(mail_base / "**" / f"{message_id}.emlx")
results = glob.glob(pattern, recursive=True)
return results[0] if results else None
def _table_columns(conn: sqlite3.Connection, table: str) -> list[str]:
    """Return the column names of *table*, in declaration order.

    Args:
        conn: Open SQLite connection to inspect.
        table: Name of the table whose columns are wanted.

    Returns:
        List of column names; empty if the table does not exist.
    """
    # Use the table-valued form of PRAGMA table_info so the table name is a
    # bound parameter rather than text spliced into the SQL statement.
    rows = conn.execute(
        "SELECT name FROM pragma_table_info(?) ORDER BY cid", (table,)
    ).fetchall()
    return [row[0] for row in rows]
def _recipients_fk_col(conn: sqlite3.Connection) -> str:
    """Pick the ``recipients`` column that links a recipient row to its message.

    The known spellings are tried in a fixed priority order against the
    columns actually present in the ``recipients`` table. There is no
    heuristic fallback: if none of them is present the function fails.

    Raises:
        RuntimeError: If none of the known column names exists.
    """
    available = _table_columns(conn, "recipients")
    preferred = ("message_id", "message", "msg_id", "mid")
    chosen = next((name for name in preferred if name in available), None)
    if chosen is None:
        raise RuntimeError(f"Cannot determine recipients FK column. Columns: {available}")
    return chosen
def load_emails(
    limit: int = 100,
    mailbox_filter: str | None = None,
    load_body: bool = False,
) -> Iterator[Email]:
    """
    Yield Email objects from the Apple Mail Envelope Index.

    This is a generator: the database is opened read-only when iteration
    starts and closed when iteration ends. Rows are ordered by ``date_sent``
    descending.

    Args:
        limit: Maximum number of emails to return.
        mailbox_filter: Optional mailbox name substring to filter by.
        load_body: If True, read the .emlx file for body content (slower).

    Yields:
        One ``Email`` per message row.

    Raises:
        FileNotFoundError: If no Envelope Index can be located.
        RuntimeError: If the recipients foreign-key column cannot be determined.
    """
    import logging  # local import: the module's import block does not provide it

    log = logging.getLogger(__name__)

    index_path = _find_envelope_index()
    mail_base = index_path.parent.parent  # V*/
    # Build the URI with as_uri() so spaces, '#', '?' and '%' in the path are
    # percent-encoded instead of being parsed as URI syntax by SQLite.
    conn = sqlite3.connect(f"{index_path.absolute().as_uri()}?mode=ro", uri=True)
    conn.row_factory = sqlite3.Row
    try:
        query = """
            SELECT
                m.ROWID AS row_id,
                m.message_id AS message_id,
                COALESCE(su.subject, CAST(m.subject AS TEXT)) AS subject,
                m.date_sent AS date_sent,
                m.flags AS flags,
                s.address AS sender_address,
                s.comment AS sender_name,
                mb.url AS mailbox_url
            FROM messages m
            LEFT JOIN subjects su ON m.subject = su.ROWID
            LEFT JOIN addresses s ON m.sender = s.ROWID
            LEFT JOIN mailboxes mb ON m.mailbox = mb.ROWID
        """
        params: list = []
        if mailbox_filter:
            query += " WHERE mb.url LIKE ?"
            params.append(f"%{mailbox_filter}%")
        query += " ORDER BY m.date_sent DESC LIMIT ?"
        params.append(limit)
        cursor = conn.execute(query, params)

        # fk_col comes from a fixed whitelist of names, so interpolating it
        # is safe. The statement is loop-invariant and is built once.
        fk_col = _recipients_fk_col(conn)
        recipients_sql = f"""
            SELECT a.address FROM recipients r
            JOIN addresses a ON r.address = a.ROWID
            WHERE r.{fk_col} = ?
        """

        for row in cursor:
            row_id = row["row_id"]
            # Fetch recipient addresses for this message.
            recipients = [
                r["address"] for r in conn.execute(recipients_sql, (row_id,))
            ]

            sender = row["sender_address"] or ""
            if row["sender_name"]:
                sender = f"{row['sender_name']} <{sender}>"

            emlx_path = _find_emlx(mail_base, row_id) if load_body else None
            body_plain = body_html = None
            if emlx_path:
                try:
                    msg = emlx.read(emlx_path)
                    # NOTE(review): confirm these attribute names against the
                    # installed emlx API; a mismatch used to be swallowed
                    # silently and left both bodies as None.
                    body_plain = msg.plain_text
                    body_html = msg.html
                except Exception as exc:
                    # Body loading stays best-effort, but the failure is
                    # reported instead of being discarded.
                    log.warning("Could not read body from %s: %s", emlx_path, exc)

            yield Email(
                message_id=row["message_id"] or "",
                subject=row["subject"] or "(no subject)",
                sender=sender,
                recipients=recipients,
                # NOTE(review): assumes date_sent is stored relative to the
                # Mac epoch; confirm it is not already a Unix timestamp.
                date=_mac_epoch_to_datetime(row["date_sent"]),
                mailbox=row["mailbox_url"] or "",
                flags=row["flags"] or 0,
                emlx_path=emlx_path,
                body_plain=body_plain,
                body_html=body_html,
            )
    finally:
        conn.close()
def list_mailboxes() -> list[str]:
    """Return all mailbox URLs in the Envelope Index.

    The database is opened read-only and closed before returning.

    Returns:
        The non-empty ``url`` values of the ``mailboxes`` table, sorted by
        the database's ``ORDER BY url``.

    Raises:
        FileNotFoundError: If no Envelope Index can be located.
    """
    index_path = _find_envelope_index()
    # Build the URI with as_uri() so spaces, '#', '?' and '%' in the path are
    # percent-encoded instead of being parsed as URI syntax by SQLite.
    conn = sqlite3.connect(f"{index_path.absolute().as_uri()}?mode=ro", uri=True)
    try:
        rows = conn.execute("SELECT url FROM mailboxes ORDER BY url").fetchall()
        # Skip NULL and empty URLs.
        return [url for (url,) in rows if url]
    finally:
        conn.close()
The only dependency is emlx — a small library for reading
Apple Mail .emlx files. Install with uv sync.
[project]
name = "summarize-emails"
version = "0.1.0"
requires-python = ">=3.12"
dependencies = [
"emlx>=1.0.4",
]