MOLTED EMAIL

Pydantic AI Integration Guide

Add a governed email mailbox to your Pydantic AI agents. Send, receive, and track email with built-in policy enforcement and type-safe tool definitions.

Pydantic AI is a type-safe Python agent framework from the creators of Pydantic. Its dependency injection system and typed tool definitions make it a natural fit for wrapping infrastructure APIs like Molted - tool inputs are validated against their type hints before your code runs, and agent logic stays cleanly separated from email infrastructure.

This guide shows how to wire a Molted mailbox into a Pydantic AI agent as a typed tool. Every email the agent proposes runs through the Molted policy engine before delivery - 20+ rules evaluated in under a second. Policy runs at the infrastructure layer and cannot be overridden by the model.

Prerequisites

  • A Molted account with an API key (sign up at molted.email/signup)
  • A verified sending domain (see Domains)
  • Pydantic AI installed: pip install pydantic-ai

1. Create a mailbox for your agent

Each agent (or agent team) should have its own mailbox. Create one in the portal under Mailboxes, or via the API:

curl
curl -X POST https://api.molted.email/v1/me/mailboxes \
  -H "Authorization: Bearer YOUR_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "Outbound Agent",
    "emailAddress": "agent@yourdomain.com"
  }'

Note the mailboxId from the response - you will pass it in each send request.
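
If you provision mailboxes from a script, the same request can be built in Python. The sketch below uses only the standard library and mirrors the curl call above. The helper names are illustrative, and reading mailboxId from the top level of the JSON response is an assumption based on the note above, so check it against the actual response.

```python
import json
import urllib.request

# Endpoint taken from the curl example above.
API_URL = "https://api.molted.email/v1/me/mailboxes"


def build_create_mailbox_request(
    api_key: str, name: str, email_address: str
) -> urllib.request.Request:
    """Build (but do not send) the POST request that creates a mailbox."""
    body = json.dumps({"name": name, "emailAddress": email_address}).encode("utf-8")
    return urllib.request.Request(
        API_URL,
        data=body,
        method="POST",
        headers={
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        },
    )


def extract_mailbox_id(response_body: bytes) -> str:
    """Read mailboxId from the response (ASSUMED to be a top-level field)."""
    return json.loads(response_body)["mailboxId"]
```

To send it, pass the request to urllib.request.urlopen and hand the response bytes to extract_mailbox_id.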

2. Define your dependencies

Pydantic AI uses dependency injection to pass external services into tools. Define a dataclass to hold your Molted configuration:

deps.py
from dataclasses import dataclass
import httpx


@dataclass
class MailboxDeps:
    api_key: str
    tenant_id: str
    mailbox_id: str
    http_client: httpx.AsyncClient

Using an httpx.AsyncClient in deps lets you control connection pooling and reuse the same client across all tool calls in a run.
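
The examples in this guide read the three Molted values from the environment variables MOLTED_API_KEY, MOLTED_TENANT_ID, and MOLTED_MAILBOX_ID. One way to fail early with a clear message when one is missing is sketched below. MoltedSettings and load_settings are hypothetical helpers, not part of Molted or Pydantic AI.

```python
import os
from dataclasses import dataclass
from typing import Mapping

REQUIRED_VARS = ("MOLTED_API_KEY", "MOLTED_TENANT_ID", "MOLTED_MAILBOX_ID")


@dataclass(frozen=True)
class MoltedSettings:
    """Hypothetical container for the values MailboxDeps needs (minus the client)."""
    api_key: str
    tenant_id: str
    mailbox_id: str


def load_settings(env: Mapping[str, str] = os.environ) -> MoltedSettings:
    """Read the three values, raising one error that names every missing variable."""
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return MoltedSettings(
        api_key=env["MOLTED_API_KEY"],
        tenant_id=env["MOLTED_TENANT_ID"],
        mailbox_id=env["MOLTED_MAILBOX_ID"],
    )
```

The resulting fields can then be passed to MailboxDeps together with the shared http_client.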

3. Define the send tool

Use the @agent.tool decorator with typed arguments. Pydantic validates the inputs before the tool body runs:

agent.py
from pydantic_ai import Agent, RunContext
from deps import MailboxDeps

agent = Agent(
    "openai:gpt-4o",
    deps_type=MailboxDeps,
    system_prompt="""
    You are a customer outreach agent. When sending emails:
    - Always supply a dedupe_key to prevent accidental duplicate sends.
    - If a send is blocked by policy, report the reason to the user - do not retry.
    - 'duplicate_send' means the email was already sent recently - inform the user and stop.
    - 'suppressed_recipient' means this contact cannot be emailed - inform the user and stop.
    - 'rate_limit_exceeded' means the send rate has been reached - wait and inform the user.
    """,
)


@agent.tool
async def send_email(
    ctx: RunContext[MailboxDeps],
    to: str,
    subject: str,
    body: str,
    dedupe_key: str | None = None,
) -> str:
    """Send an email to a contact through the governed mailbox.

    The email is evaluated against policy rules before delivery.
    Blocked sends return a reason and are not retried.

    Args:
        to: Recipient email address.
        subject: Email subject line.
        body: Email body in plain text or HTML.
        dedupe_key: Optional unique key to prevent duplicate sends.
            Defaults to recipient+subject if omitted.
    """
    deps = ctx.deps
    payload = {
        "tenantId": deps.tenant_id,
        "mailboxId": deps.mailbox_id,
        "recipientEmail": to,
        "templateId": "_default",
        "dedupeKey": dedupe_key or f"{to}-{subject}",
        "agentId": "pydantic-ai-agent",
        "payload": {
            "subject": subject,
            "html": body,
            "text": body,
        },
    }

    resp = await deps.http_client.post(
        "https://api.molted.email/v1/agent/send",
        json=payload,
        headers={"Authorization": f"Bearer {deps.api_key}"},
        timeout=10,
    )
    resp.raise_for_status()
    result = resp.json()

    if result["status"] == "blocked":
        return (
            f"Email blocked by policy: {result['blockReason']}. "
            f"Decision trace: {result['requestId']}"
        )

    return f"Email queued. requestId={result['requestId']}, status={result['status']}"

The tool surfaces policy decisions directly to the model. If a send is blocked, the agent sees the reason and can take the appropriate action instead of failing silently or retrying a terminal block.

4. Add a delivery status tool

Your agent can check the delivery status of a previously queued email:

agent.py (continued)
@agent.tool
async def check_email_status(
    ctx: RunContext[MailboxDeps],
    request_id: str,
) -> str:
    """Check the delivery status of a previously queued email.

    Args:
        request_id: The requestId returned when the email was sent.
    """
    deps = ctx.deps
    resp = await deps.http_client.get(
        f"https://api.molted.email/v1/agent/send/{request_id}/status",
        headers={"Authorization": f"Bearer {deps.api_key}"},
        timeout=10,
    )
    resp.raise_for_status()
    data = resp.json()

    provider = data.get("provider", "unknown")
    last_event = data.get("lastEvent", "none")
    return f"Status: {data['status']}. Provider: {provider}. Last event: {last_event}."

5. Run the agent

main.py
import asyncio
import os
import httpx
from agent import agent
from deps import MailboxDeps


async def main():
    async with httpx.AsyncClient() as client:
        deps = MailboxDeps(
            api_key=os.environ["MOLTED_API_KEY"],
            tenant_id=os.environ["MOLTED_TENANT_ID"],
            mailbox_id=os.environ["MOLTED_MAILBOX_ID"],
            http_client=client,
        )

        result = await agent.run(
            "Send a trial welcome email to alice@example.com. "
            "Subject: 'Welcome to the trial' "
            "Body: 'Thanks for signing up - your 14-day trial starts now.'",
            deps=deps,
        )
        print(result.output)


if __name__ == "__main__":
    asyncio.run(main())

6. Handle policy blocks

The policy engine may block a send for several reasons. The tool surfaces these directly to the agent.

Reason                  What it means
duplicate_send          The same dedupe_key was used within the cooldown window
rate_limit_exceeded     The mailbox or tenant has hit a send rate limit
suppressed_recipient    The recipient has unsubscribed or bounced previously
cooldown_active         A cooldown window is in effect for this recipient
risk_budget_exceeded    The agent's risk budget for this period is exhausted

Add explicit handling in your system prompt so the model knows the right action for each block - distinguishing retryable (rate_limit_exceeded) from terminal (suppressed_recipient, duplicate_send) cases.
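
If you also want the retryable/terminal distinction enforced in code rather than only in the prompt, a small lookup is enough. This is a sketch: the action labels are illustrative, and because this guide does not classify cooldown_active or risk_budget_exceeded, those (and any unknown reason) fall through to a conservative "report" action.

```python
# Classification stated in this guide.
RETRYABLE = frozenset({"rate_limit_exceeded"})
TERMINAL = frozenset({"suppressed_recipient", "duplicate_send"})


def next_action(block_reason: str) -> str:
    """Map a blockReason to a coarse action label (labels are illustrative)."""
    if block_reason in RETRYABLE:
        return "retry_later"  # wait, then the send may be attempted again
    if block_reason in TERMINAL:
        return "stop"  # retrying cannot succeed
    # Not classified by the guide (or unknown): surface it to the user.
    return "report"
```

A tool can append the label to its return string so the model receives both the reason and the expected behaviour.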

7. Use structured outputs for email drafts

Pydantic AI's structured output support is useful for generating email drafts before sending. Define a model for the draft, let the agent fill it in, and then submit the draft to the send endpoint from your own code:

structured_send.py
import asyncio
import os
import httpx
from pydantic import BaseModel
from pydantic_ai import Agent
from deps import MailboxDeps


class EmailDraft(BaseModel):
    to: str
    subject: str
    body: str
    dedupe_key: str


draft_agent = Agent(
    "openai:gpt-4o",
    output_type=EmailDraft,
    deps_type=MailboxDeps,
    system_prompt="Draft a professional outreach email based on the context provided.",
)


async def draft_and_send(context: str) -> None:
    async with httpx.AsyncClient() as client:
        deps = MailboxDeps(
            api_key=os.environ["MOLTED_API_KEY"],
            tenant_id=os.environ["MOLTED_TENANT_ID"],
            mailbox_id=os.environ["MOLTED_MAILBOX_ID"],
            http_client=client,
        )

        # First, draft the email as a structured object
        draft_result = await draft_agent.run(context, deps=deps)
        draft = draft_result.output

        # Then send via the policy engine
        payload = {
            "tenantId": deps.tenant_id,
            "mailboxId": deps.mailbox_id,
            "recipientEmail": draft.to,
            "templateId": "_default",
            "dedupeKey": draft.dedupe_key,
            "agentId": "pydantic-ai-agent",
            "payload": {
                "subject": draft.subject,
                "html": draft.body,
                "text": draft.body,
            },
        }
        resp = await client.post(
            "https://api.molted.email/v1/agent/send",
            json=payload,
            headers={"Authorization": f"Bearer {deps.api_key}"},
            timeout=10,
        )
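        resp.raise_for_status()  # surface HTTP errors instead of parsing an error body
        result = resp.json()
        if result["status"] == "blocked":
            print(f"Blocked by policy: {result['blockReason']}")
        else:
            print(f"Send result: {result['status']}")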


if __name__ == "__main__":
    asyncio.run(draft_and_send(
        "Send a welcome email to new user alice@example.com who signed up for a 14-day trial."
    ))

This two-step pattern - draft first, send second - is useful when you want to log or inspect the email before it reaches the policy engine.
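
As an illustration of the inspection step, the sketch below checks a draft for obvious problems before it is submitted. The checks and the review_draft name are assumptions, not Molted or Pydantic AI features. DraftStandIn only mirrors the fields of EmailDraft so the example runs without Pydantic; an EmailDraft instance can be passed in the same way because only attributes are read.

```python
from dataclasses import dataclass


@dataclass
class DraftStandIn:
    """Stand-in with the same fields as EmailDraft, so the sketch runs on its own."""
    to: str
    subject: str
    body: str
    dedupe_key: str


def review_draft(draft) -> list[str]:
    """Return a list of problems; an empty list means the draft looks sendable."""
    problems = []
    if "@" not in draft.to:
        problems.append("recipient is not an email address")
    if not draft.subject.strip():
        problems.append("subject is empty")
    if not draft.body.strip():
        problems.append("body is empty")
    if not draft.dedupe_key.strip():
        problems.append("dedupe_key is empty")
    return problems
```

Calling review_draft between the draft step and the POST lets you log or reject a draft without spending a send attempt.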

8. Build a multi-agent setup

Pydantic AI supports agent composition. Scope mailbox access to a dedicated email agent and have a coordinator delegate email tasks to it. In this example the coordinator is a plain async function:

multi_agent.py
import asyncio
import os
import httpx
from pydantic_ai import Agent, RunContext
from deps import MailboxDeps

# Email specialist - the only agent with mailbox deps
email_agent = Agent(
    "openai:gpt-4o",
    deps_type=MailboxDeps,
    system_prompt="""
    You send and track emails for the team.
    Always use a dedupe_key. Report policy blocks clearly - do not retry terminal blocks.
    """,
)


@email_agent.tool
async def send_email(ctx: RunContext[MailboxDeps], to: str, subject: str, body: str, dedupe_key: str | None = None) -> str:
    """Send an email through the governed mailbox."""
    deps = ctx.deps
    payload = {
        "tenantId": deps.tenant_id,
        "mailboxId": deps.mailbox_id,
        "recipientEmail": to,
        "templateId": "_default",
        "dedupeKey": dedupe_key or f"{to}-{subject}",
        "agentId": "email-specialist",
        "payload": {"subject": subject, "html": body, "text": body},
    }
    resp = await deps.http_client.post(
        "https://api.molted.email/v1/agent/send",
        json=payload,
        headers={"Authorization": f"Bearer {deps.api_key}"},
        timeout=10,
    )
    resp.raise_for_status()
    result = resp.json()
    if result["status"] == "blocked":
        return f"Blocked: {result['blockReason']} (requestId={result['requestId']})"
    return f"Sent: requestId={result['requestId']}"


# Coordinator - delegates email tasks to the specialist
async def run_coordinator(task: str) -> str:
    async with httpx.AsyncClient() as client:
        deps = MailboxDeps(
            api_key=os.environ["MOLTED_API_KEY"],
            tenant_id=os.environ["MOLTED_TENANT_ID"],
            mailbox_id=os.environ["MOLTED_MAILBOX_ID"],
            http_client=client,
        )
        # Delegate email task to the specialist agent
        result = await email_agent.run(task, deps=deps)
        return result.output


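if __name__ == "__main__":
    # Print the specialist's reply so the outcome of the send is visible
    print(asyncio.run(run_coordinator(
        "Send a renewal reminder to bob@example.com - subject 'Your trial ends tomorrow', "
        "body 'Your 14-day trial ends tomorrow. Upgrade to keep access.'"
    )))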

This keeps mailbox credentials scoped to the email agent. Other agents in your system do not have MailboxDeps and cannot call send_email directly.

9. Listen for inbound replies

Use the Molted CLI to stream inbound events to your agent so it can react to replies:

molted listen --pipe "python reply_handler.py" --events "inbound.*"

In reply_handler.py:

reply_handler.py
import sys
import json
import asyncio
import os
import httpx
from pydantic_ai import Agent, RunContext
from deps import MailboxDeps


reply_agent = Agent(
    "openai:gpt-4o",
    deps_type=MailboxDeps,
    system_prompt="You handle inbound replies. Respond helpfully and concisely.",
)


@reply_agent.tool
async def send_email(ctx: RunContext[MailboxDeps], to: str, subject: str, body: str) -> str:
    """Send a reply email."""
    deps = ctx.deps
    payload = {
        "tenantId": deps.tenant_id,
        "mailboxId": deps.mailbox_id,
        "recipientEmail": to,
        "templateId": "_default",
        "dedupeKey": f"reply-{to}-{subject}",
        "agentId": "reply-agent",
        "payload": {"subject": subject, "html": body, "text": body},
    }
    resp = await deps.http_client.post(
        "https://api.molted.email/v1/agent/send",
        json=payload,
        headers={"Authorization": f"Bearer {deps.api_key}"},
        timeout=10,
    )
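    resp.raise_for_status()
    result = resp.json()
    if result["status"] == "blocked":
        return f"Blocked: {result['blockReason']}"
    return f"Sent: {result['status']}"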


async def handle_event(event: dict) -> None:
    if event.get("event") != "inbound.classified":
        return

    data = event["data"]
    if data.get("intent") == "support" and data.get("suggestedAction") == "reply":
        async with httpx.AsyncClient() as client:
            deps = MailboxDeps(
                api_key=os.environ["MOLTED_API_KEY"],
                tenant_id=os.environ["MOLTED_TENANT_ID"],
                mailbox_id=os.environ["MOLTED_MAILBOX_ID"],
                http_client=client,
            )
            await reply_agent.run(
                f"Reply to {data['fromEmail']} who wrote: '{data['subject']}'.",
                deps=deps,
            )


for line in sys.stdin:
    line = line.strip()
    if not line:
        continue  # skip blank lines so json.loads does not raise
    asyncio.run(handle_event(json.loads(line)))

See the Reactive Agent Guide for the full event reference and daemon setup.
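
The parsing and filtering in the handler can be tested without the CLI or a model. The sketch below assumes, as the handler above does, that each line on stdin is one JSON event with event and data fields. parse_events and should_reply are illustrative names.

```python
import json
from typing import Iterable, Iterator


def parse_events(lines: Iterable[str]) -> Iterator[dict]:
    """Yield one dict per line, skipping blank lines and anything that is not a JSON object."""
    for line in lines:
        line = line.strip()
        if not line:
            continue
        try:
            event = json.loads(line)
        except json.JSONDecodeError:
            continue  # ignore malformed input rather than crash the handler
        if isinstance(event, dict):
            yield event


def should_reply(event: dict) -> bool:
    """Same filter as handle_event: classified support mail whose suggested action is a reply."""
    data = event.get("data")
    if not isinstance(data, dict):
        return False
    return (
        event.get("event") == "inbound.classified"
        and data.get("intent") == "support"
        and data.get("suggestedAction") == "reply"
    )
```

In reply_handler.py, looping over parse_events(sys.stdin) and calling the agent only when should_reply(event) is true keeps a single bad line from stopping the stream.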

What happens under the hood

When your agent calls send_email, the mailbox evaluates 20+ policy rules in sequence before anything leaves:

  1. Suppression check - has this recipient unsubscribed or bounced?
  2. Deduplication - has this dedupe_key been used within the cooldown window?
  3. Cooldown - is there an active cooldown for this recipient?
  4. Rate limits - has the mailbox or tenant exceeded its send rate (per-minute, per-hour, per-day)?
  5. Risk budget - has the agent exhausted its send budget for this period?
  6. Consent - does the contact have a valid consent record?

If all rules pass, the email is queued for delivery. If any rule fires, the send is blocked and an immutable decision trace records which rule triggered and why. The agent sees the blockReason immediately and can respond accordingly.

The model cannot override these checks - they run at the infrastructure layer, outside the tool's return path.
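
To make the "first rule that fires blocks the send" behaviour concrete, here is a toy model of sequential evaluation. It is not Molted's implementation: the rule functions, the fields on the send dict, and the "queued" status value are all hypothetical, and only three of the checks listed above are modelled. The block reasons are the ones named in this guide.

```python
from typing import Callable, Optional

# A rule inspects a send request and returns a block reason, or None to pass.
Rule = Callable[[dict], Optional[str]]


def check_suppression(send: dict) -> Optional[str]:
    return "suppressed_recipient" if send["to"] in send["suppressed"] else None


def check_dedupe(send: dict) -> Optional[str]:
    return "duplicate_send" if send["dedupe_key"] in send["recent_keys"] else None


def check_rate_limit(send: dict) -> Optional[str]:
    over = send["sent_this_minute"] >= send["per_minute_limit"]
    return "rate_limit_exceeded" if over else None


# Order matters: earlier rules win when several would fire.
RULES: list[Rule] = [check_suppression, check_dedupe, check_rate_limit]


def evaluate(send: dict, rules: list[Rule] = RULES) -> dict:
    """Run rules in order and stop at the first one that returns a reason."""
    for rule in rules:
        reason = rule(send)
        if reason is not None:
            return {"status": "blocked", "blockReason": reason, "rule": rule.__name__}
    return {"status": "queued"}  # placeholder status for "all rules passed"
```

Because evaluation stops at the first match, a suppressed recipient is reported as suppressed_recipient even if the same send would also have tripped the dedupe check.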