Pydantic AI Integration Guide
Add a governed email mailbox to your Pydantic AI agents. Send, receive, and track email with built-in policy enforcement and type-safe tool definitions.
Pydantic AI is a type-safe Python agent framework from the creators of Pydantic. Its dependency injection system and typed tool definitions make it a natural fit for wrapping infrastructure APIs like Molted - tool inputs are validated at runtime and checkable by static type checkers, and agent logic stays cleanly separated from email infrastructure.
This guide shows how to wire a Molted mailbox into a Pydantic AI agent as a typed tool. Every email the agent proposes runs through the Molted policy engine before delivery - 20+ rules evaluated in under a second. Policy runs at the infrastructure layer and cannot be overridden by the model.
Prerequisites
- A Molted account with an API key (sign up at molted.email/signup)
- A verified sending domain (see Domains)
- Pydantic AI installed:
```bash
pip install pydantic-ai
```
1. Create a mailbox for your agent
Each agent (or agent team) should have its own mailbox. Create one in the portal under Mailboxes, or via the API:
```bash
curl -X POST https://api.molted.email/v1/me/mailboxes \
  -H "Authorization: Bearer YOUR_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "Outbound Agent",
    "emailAddress": "agent@yourdomain.com"
  }'
```

Note the mailboxId from the response - you will pass it in each send request.
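If you would rather create the mailbox from Python, the same request can be sketched with the standard library. This is a minimal illustration, not an official client: `build_create_mailbox_request` is a hypothetical helper name, and the example only builds the request so you can inspect it before sending.

```python
import json
import urllib.request


def build_create_mailbox_request(
    api_key: str, name: str, email_address: str
) -> urllib.request.Request:
    """Build (but do not send) the POST request that creates a mailbox."""
    body = json.dumps({"name": name, "emailAddress": email_address}).encode("utf-8")
    return urllib.request.Request(
        "https://api.molted.email/v1/me/mailboxes",
        data=body,
        method="POST",
        headers={
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        },
    )


request = build_create_mailbox_request(
    "YOUR_API_KEY", "Outbound Agent", "agent@yourdomain.com"
)
# Sending is left to the caller, for example with urllib.request.urlopen(request),
# after which the mailboxId can be read from the JSON response body.
```

Building the request separately from sending it keeps the payload easy to log or unit-test without touching the network.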
2. Define your dependencies
Pydantic AI uses dependency injection to pass external services into tools. Define a dataclass to hold your Molted configuration:
```python
# deps.py
from dataclasses import dataclass

import httpx


@dataclass
class MailboxDeps:
    api_key: str
    tenant_id: str
    mailbox_id: str
    http_client: httpx.AsyncClient
```

Using an httpx.AsyncClient in deps lets you control connection pooling and reuse the same client across all tool calls in a run.
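The three string fields are read from environment variables in the run step later in this guide (MOLTED_API_KEY, MOLTED_TENANT_ID, MOLTED_MAILBOX_ID). A small check like the one below can fail fast with a readable message when one is missing. It is only a sketch: `missing_env_vars` and `load_mailbox_settings` are hypothetical helper names, not part of Molted or Pydantic AI.

```python
import os
from typing import Mapping

REQUIRED_ENV_VARS = ("MOLTED_API_KEY", "MOLTED_TENANT_ID", "MOLTED_MAILBOX_ID")


def missing_env_vars(env: Mapping[str, str] = os.environ) -> list[str]:
    """Return the names of required variables that are unset or blank."""
    return [name for name in REQUIRED_ENV_VARS if not env.get(name, "").strip()]


def load_mailbox_settings(env: Mapping[str, str] = os.environ) -> dict[str, str]:
    """Return keyword arguments for MailboxDeps (everything except http_client)."""
    missing = missing_env_vars(env)
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return {
        "api_key": env["MOLTED_API_KEY"],
        "tenant_id": env["MOLTED_TENANT_ID"],
        "mailbox_id": env["MOLTED_MAILBOX_ID"],
    }
```

With this in place, the deps object could be built as `MailboxDeps(**load_mailbox_settings(), http_client=client)`.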
3. Define the send tool
Use the @agent.tool decorator with typed arguments. Pydantic validates the inputs before the tool body runs:
```python
# agent.py
from pydantic_ai import Agent, RunContext

from deps import MailboxDeps

agent = Agent(
    "openai:gpt-4o",
    deps_type=MailboxDeps,
    system_prompt="""
You are a customer outreach agent. When sending emails:
- Always supply a dedupe_key to prevent accidental duplicate sends.
- If a send is blocked by policy, report the reason to the user - do not retry.
- 'duplicate_send' means the email was already sent recently - inform the user and stop.
- 'suppressed_recipient' means this contact cannot be emailed - inform the user and stop.
- 'rate_limit_exceeded' means the send rate has been reached - wait and inform the user.
""",
)


@agent.tool
async def send_email(
    ctx: RunContext[MailboxDeps],
    to: str,
    subject: str,
    body: str,
    dedupe_key: str | None = None,
) -> str:
    """Send an email to a contact through the governed mailbox.

    The email is evaluated against policy rules before delivery.
    Blocked sends return a reason and are not retried.

    Args:
        to: Recipient email address.
        subject: Email subject line.
        body: Email body in plain text or HTML.
        dedupe_key: Optional unique key to prevent duplicate sends.
            Defaults to recipient+subject if omitted.
    """
    deps = ctx.deps
    payload = {
        "tenantId": deps.tenant_id,
        "mailboxId": deps.mailbox_id,
        "recipientEmail": to,
        "templateId": "_default",
        # Fall back to recipient+subject when the model supplies no key
        "dedupeKey": dedupe_key or f"{to}-{subject}",
        "agentId": "pydantic-ai-agent",
        "payload": {
            "subject": subject,
            "html": body,
            "text": body,
        },
    }
    resp = await deps.http_client.post(
        "https://api.molted.email/v1/agent/send",
        json=payload,
        headers={"Authorization": f"Bearer {deps.api_key}"},
        timeout=10,
    )
    resp.raise_for_status()
    result = resp.json()

    # Return policy blocks as text so the model can see the reason
    if result["status"] == "blocked":
        return (
            f"Email blocked by policy: {result['blockReason']}. "
            f"Decision trace: {result['requestId']}"
        )
    return f"Email queued. requestId={result['requestId']}, status={result['status']}"
```

The tool surfaces policy decisions directly to the model. If a send is blocked, the agent sees the reason and can take the appropriate action instead of failing silently or retrying a terminal block.
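The fallback key in the tool is the raw recipient and subject joined by a hyphen, so differences in case or spacing would produce different keys. One way to make the key more stable is to normalise the inputs and hash them, as in the sketch below. `make_dedupe_key` and its `campaign` parameter are hypothetical, and the sketch assumes the API treats dedupeKey as an opaque string and that lowercasing the recipient address is acceptable for your contacts.

```python
import hashlib


def make_dedupe_key(to: str, subject: str, campaign: str = "default") -> str:
    """Derive a stable key from the normalised recipient, subject, and campaign."""
    campaign_slug = campaign.strip().lower()
    normalised = "\n".join(
        [
            to.strip().lower(),
            " ".join(subject.split()).lower(),  # collapse runs of whitespace
            campaign_slug,
        ]
    )
    digest = hashlib.sha256(normalised.encode("utf-8")).hexdigest()
    # A short prefix of the digest keeps the key readable in logs
    return f"{campaign_slug}-{digest[:16]}"


key = make_dedupe_key("Alice@Example.com ", "Welcome  to the trial")
```

Inside the tool, this could replace the fallback expression, for example `dedupe_key or make_dedupe_key(to, subject)`.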
4. Add a delivery status tool
Your agent can check the delivery status of a previously queued email:
```python
@agent.tool
async def check_email_status(
    ctx: RunContext[MailboxDeps],
    request_id: str,
) -> str:
    """Check the delivery status of a previously queued email.

    Args:
        request_id: The requestId returned when the email was sent.
    """
    deps = ctx.deps
    resp = await deps.http_client.get(
        f"https://api.molted.email/v1/agent/send/{request_id}/status",
        headers={"Authorization": f"Bearer {deps.api_key}"},
        timeout=10,
    )
    resp.raise_for_status()
    data = resp.json()
    # provider and lastEvent may be absent, so fall back to placeholders
    provider = data.get("provider", "unknown")
    last_event = data.get("lastEvent", "none")
    return f"Status: {data['status']}. Provider: {provider}. Last event: {last_event}."
```

5. Run the agent
```python
import asyncio
import os

import httpx

from agent import agent
from deps import MailboxDeps


async def main():
    # One client for the whole run, shared by every tool call
    async with httpx.AsyncClient() as client:
        deps = MailboxDeps(
            api_key=os.environ["MOLTED_API_KEY"],
            tenant_id=os.environ["MOLTED_TENANT_ID"],
            mailbox_id=os.environ["MOLTED_MAILBOX_ID"],
            http_client=client,
        )
        result = await agent.run(
            "Send a trial welcome email to alice@example.com. "
            "Subject: 'Welcome to the trial' "
            "Body: 'Thanks for signing up - your 14-day trial starts now.'",
            deps=deps,
        )
        print(result.output)


if __name__ == "__main__":
    asyncio.run(main())
```

6. Handle policy blocks
The policy engine may block a send for several reasons. The tool surfaces these directly to the agent.
| Reason | What it means |
|---|---|
| duplicate_send | The same dedupe_key was used within the cooldown window |
| rate_limit_exceeded | The mailbox or tenant has hit a send rate limit |
| suppressed_recipient | The recipient has unsubscribed or bounced previously |
| cooldown_active | A cooldown window is in effect for this recipient |
| risk_budget_exceeded | The agent's risk budget for this period is exhausted |
Add explicit handling in your system prompt so the model knows the right action for each block - distinguishing retryable (rate_limit_exceeded) from terminal (suppressed_recipient, duplicate_send) cases.
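If you also want this distinction in code rather than only in the prompt, a small lookup can map each reason to a next step. The sketch below classifies only the three reasons this guide labels as retryable or terminal; sending anything else (including cooldown_active and risk_budget_exceeded) to a human is an assumption of this example, not documented Molted behaviour. `next_action` and its return values are hypothetical names.

```python
RETRYABLE = frozenset({"rate_limit_exceeded"})
TERMINAL = frozenset({"suppressed_recipient", "duplicate_send"})


def next_action(block_reason: str) -> str:
    """Map a block reason to 'retry_later', 'stop', or 'escalate'."""
    if block_reason in RETRYABLE:
        return "retry_later"
    if block_reason in TERMINAL:
        return "stop"
    # Assumption: reasons not classified above are handed to a human
    return "escalate"


def describe_block(block_reason: str) -> str:
    """Build a short instruction the tool could return to the model."""
    return f"Blocked ({block_reason}). Recommended action: {next_action(block_reason)}."
```

Returning the recommended action alongside the reason gives the model less room to guess, while the policy engine remains the actual enforcement point.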
7. Use structured outputs for email drafts
Pydantic AI's structured output support is useful for generating email drafts before sending. Define a model for the draft, let the agent fill it in, then submit the result to the send endpoint yourself:
```python
import asyncio
import os

import httpx
from pydantic import BaseModel
from pydantic_ai import Agent

from deps import MailboxDeps


class EmailDraft(BaseModel):
    to: str
    subject: str
    body: str
    dedupe_key: str


draft_agent = Agent(
    "openai:gpt-4o",
    output_type=EmailDraft,
    deps_type=MailboxDeps,
    system_prompt="Draft a professional outreach email based on the context provided.",
)


async def draft_and_send(context: str) -> None:
    async with httpx.AsyncClient() as client:
        deps = MailboxDeps(
            api_key=os.environ["MOLTED_API_KEY"],
            tenant_id=os.environ["MOLTED_TENANT_ID"],
            mailbox_id=os.environ["MOLTED_MAILBOX_ID"],
            http_client=client,
        )
        # First, draft the email as a structured object
        draft_result = await draft_agent.run(context, deps=deps)
        draft = draft_result.output

        # Then send via the policy engine
        payload = {
            "tenantId": deps.tenant_id,
            "mailboxId": deps.mailbox_id,
            "recipientEmail": draft.to,
            "templateId": "_default",
            "dedupeKey": draft.dedupe_key,
            "agentId": "pydantic-ai-agent",
            "payload": {
                "subject": draft.subject,
                "html": draft.body,
                "text": draft.body,
            },
        }
        resp = await client.post(
            "https://api.molted.email/v1/agent/send",
            json=payload,
            headers={"Authorization": f"Bearer {deps.api_key}"},
            timeout=10,
        )
        # Fail loudly on HTTP errors, as the send tool does
        resp.raise_for_status()
        result = resp.json()
        print(f"Send result: {result['status']}")


asyncio.run(draft_and_send(
    "Send a welcome email to new user alice@example.com who signed up for a 14-day trial."
))
```

This two-step pattern - draft first, send second - is useful when you want to log or inspect the email before it reaches the policy engine.
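As an illustration of what inspecting a draft might look like, the sketch below runs a few local checks and produces a JSON line for logging. It uses a plain dataclass, `DraftRecord`, as a stand-in with the same four fields as EmailDraft so that it runs without Pydantic installed; `review_draft` and `audit_line` are hypothetical helpers, and the checks are examples rather than anything the policy engine requires.

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class DraftRecord:
    """Stand-in with the same fields as EmailDraft."""

    to: str
    subject: str
    body: str
    dedupe_key: str


def review_draft(draft: DraftRecord) -> list[str]:
    """Return a list of problems; an empty list means the draft looks sendable."""
    problems = []
    if "@" not in draft.to:
        problems.append("recipient does not look like an email address")
    if not draft.subject.strip():
        problems.append("subject is empty")
    if not draft.body.strip():
        problems.append("body is empty")
    if not draft.dedupe_key.strip():
        problems.append("dedupe_key is empty")
    return problems


def audit_line(draft: DraftRecord) -> str:
    """Serialise the draft as one JSON line for an audit log."""
    return json.dumps(asdict(draft), sort_keys=True)


good = DraftRecord("alice@example.com", "Welcome to the trial", "Thanks for signing up.", "welcome-alice")
bad = DraftRecord("alice", "", "Thanks for signing up.", "welcome-alice")
```

These checks run before the send request and complement the policy engine; they do not replace it.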
8. Build a multi-agent setup
Pydantic AI supports agent composition. Scope mailbox access to a dedicated email agent and call it from a coordinator:
```python
import asyncio
import os

import httpx
from pydantic_ai import Agent, RunContext

from deps import MailboxDeps

# Email specialist - the only agent with mailbox deps
email_agent = Agent(
    "openai:gpt-4o",
    deps_type=MailboxDeps,
    system_prompt="""
You send and track emails for the team.
Always use a dedupe_key. Report policy blocks clearly - do not retry terminal blocks.
""",
)


@email_agent.tool
async def send_email(
    ctx: RunContext[MailboxDeps],
    to: str,
    subject: str,
    body: str,
    dedupe_key: str | None = None,
) -> str:
    """Send an email through the governed mailbox."""
    deps = ctx.deps
    payload = {
        "tenantId": deps.tenant_id,
        "mailboxId": deps.mailbox_id,
        "recipientEmail": to,
        "templateId": "_default",
        "dedupeKey": dedupe_key or f"{to}-{subject}",
        "agentId": "email-specialist",
        "payload": {"subject": subject, "html": body, "text": body},
    }
    resp = await deps.http_client.post(
        "https://api.molted.email/v1/agent/send",
        json=payload,
        headers={"Authorization": f"Bearer {deps.api_key}"},
        timeout=10,
    )
    resp.raise_for_status()
    result = resp.json()
    if result["status"] == "blocked":
        return f"Blocked: {result['blockReason']} (requestId={result['requestId']})"
    return f"Sent: requestId={result['requestId']}"


# Coordinator - delegates email tasks to the specialist
async def run_coordinator(task: str) -> str:
    async with httpx.AsyncClient() as client:
        deps = MailboxDeps(
            api_key=os.environ["MOLTED_API_KEY"],
            tenant_id=os.environ["MOLTED_TENANT_ID"],
            mailbox_id=os.environ["MOLTED_MAILBOX_ID"],
            http_client=client,
        )
        # Delegate email task to the specialist agent
        result = await email_agent.run(task, deps=deps)
        return result.output


asyncio.run(run_coordinator(
    "Send a renewal reminder to bob@example.com - subject 'Your trial ends tomorrow', "
    "body 'Your 14-day trial ends tomorrow. Upgrade to keep access.'"
))
```

This keeps mailbox credentials scoped to the email agent. Other agents in your system do not have MailboxDeps and cannot call send_email directly.
9. Listen for inbound replies
Use the Molted CLI to stream inbound events to your agent so it can react to replies:
```bash
molted listen --pipe "python reply_handler.py" --events "inbound.*"
```

In reply_handler.py:

```python
import asyncio
import json
import os
import sys

import httpx
from pydantic_ai import Agent, RunContext

from deps import MailboxDeps

reply_agent = Agent(
    "openai:gpt-4o",
    deps_type=MailboxDeps,
    system_prompt="You handle inbound replies. Respond helpfully and concisely.",
)


@reply_agent.tool
async def send_email(ctx: RunContext[MailboxDeps], to: str, subject: str, body: str) -> str:
    """Send a reply email."""
    deps = ctx.deps
    payload = {
        "tenantId": deps.tenant_id,
        "mailboxId": deps.mailbox_id,
        "recipientEmail": to,
        "templateId": "_default",
        "dedupeKey": f"reply-{to}-{subject}",
        "agentId": "reply-agent",
        "payload": {"subject": subject, "html": body, "text": body},
    }
    resp = await deps.http_client.post(
        "https://api.molted.email/v1/agent/send",
        json=payload,
        headers={"Authorization": f"Bearer {deps.api_key}"},
        timeout=10,
    )
    # Fail loudly on HTTP errors, as the other send tools do
    resp.raise_for_status()
    result = resp.json()
    # The status may be "blocked", so report it rather than claiming success
    return f"Send result: {result['status']}"


async def handle_event(event: dict) -> None:
    # Only act on classified inbound mail that asks for a support reply
    if event.get("event") != "inbound.classified":
        return
    data = event["data"]
    if data.get("intent") == "support" and data.get("suggestedAction") == "reply":
        async with httpx.AsyncClient() as client:
            deps = MailboxDeps(
                api_key=os.environ["MOLTED_API_KEY"],
                tenant_id=os.environ["MOLTED_TENANT_ID"],
                mailbox_id=os.environ["MOLTED_MAILBOX_ID"],
                http_client=client,
            )
            await reply_agent.run(
                f"Reply to {data['fromEmail']} who wrote: '{data['subject']}'.",
                deps=deps,
            )


# Each line on stdin is one JSON event piped in by the CLI
for line in sys.stdin:
    line = line.strip()
    if not line:
        continue  # skip blank lines instead of failing in json.loads
    event = json.loads(line)
    asyncio.run(handle_event(event))
```

See Reactive Agent Guide for the full event reference and daemon setup.
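The filtering logic in the handler is easier to test when it is pulled out into pure functions. The sketch below does that with the standard library only. `parse_event` and `should_reply` are hypothetical helpers; the field names (event, data, intent, suggestedAction) are the ones used in reply_handler.py, and the one-JSON-object-per-line input format is assumed from how that handler reads stdin.

```python
import json
from typing import Optional


def parse_event(line: str) -> Optional[dict]:
    """Parse one piped line; return None for blank, malformed, or non-object input."""
    line = line.strip()
    if not line:
        return None
    try:
        event = json.loads(line)
    except json.JSONDecodeError:
        return None
    return event if isinstance(event, dict) else None


def should_reply(event: dict) -> bool:
    """True only for classified inbound mail with a suggested support reply."""
    if event.get("event") != "inbound.classified":
        return False
    data = event.get("data")
    if not isinstance(data, dict):
        return False
    return data.get("intent") == "support" and data.get("suggestedAction") == "reply"


sample = '{"event": "inbound.classified", "data": {"intent": "support", "suggestedAction": "reply"}}'
```

Whether to skip malformed lines silently or log them is a design choice; a long-running listener would probably want them logged.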
What happens under the hood
When your agent calls send_email, the mailbox evaluates 20+ policy rules in sequence before anything leaves:
- Suppression check - has this recipient unsubscribed or bounced?
- Deduplication - has this dedupe_key been used within the cooldown window?
- Cooldown - is there an active cooldown for this recipient?
- Rate limits - has the mailbox or tenant exceeded its send rate (per-minute, per-hour, per-day)?
- Risk budget - has the agent exhausted its send budget for this period?
- Consent - does the contact have a valid consent record?
If all rules pass, the email is delivered. If any rule fires, the send is blocked and an immutable decision trace records which rule triggered and why. The agent sees the blockReason immediately and can respond accordingly.
The model cannot override these checks - they run at the infrastructure layer, outside the tool's return path.
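To make the "rules in sequence, first failure blocks" idea concrete, here is a toy model of such a pipeline. It is purely illustrative and is not Molted's implementation: the `Decision` shape, the rule functions, and the in-memory sets are all assumptions made for this sketch, and only two of the rules listed above are modelled.

```python
from dataclasses import dataclass
from typing import Callable, Optional

# A rule inspects a send request and returns a block reason, or None to pass.
Rule = Callable[[dict], Optional[str]]


@dataclass(frozen=True)
class Decision:
    status: str                  # "queued" or "blocked"
    block_reason: Optional[str]  # set only when blocked
    trace: tuple                 # names of the rules evaluated, in order


def evaluate(request: dict, rules: list[tuple[str, Rule]]) -> Decision:
    """Run rules in order and stop at the first one that returns a reason."""
    evaluated = []
    for name, rule in rules:
        evaluated.append(name)
        reason = rule(request)
        if reason is not None:
            return Decision("blocked", reason, tuple(evaluated))
    return Decision("queued", None, tuple(evaluated))


def suppression_rule(suppressed: set) -> Rule:
    def rule(request: dict) -> Optional[str]:
        return "suppressed_recipient" if request["recipientEmail"] in suppressed else None
    return rule


def dedupe_rule(seen_keys: set) -> Rule:
    def rule(request: dict) -> Optional[str]:
        return "duplicate_send" if request["dedupeKey"] in seen_keys else None
    return rule


RULES = [
    ("suppression", suppression_rule({"bob@example.com"})),
    ("deduplication", dedupe_rule({"welcome-alice"})),
]
```

Because evaluation happens on the server side of the API call, nothing the model writes into the tool arguments can skip a rule; the tool only ever sees the resulting decision.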
Related
- Quickstart - send your first policy-checked email
- Policy Simulation - test rules against draft sends before deploying
- Reactive Agent Guide - build agents that respond to inbound email
- Human-in-the-Loop Guide - require human approval for high-risk sends
- Agent Coordination - multi-agent leases and consensus
- Autonomy Levels - configure trust levels per mailbox
- LangChain Integration Guide
- CrewAI Integration Guide
- AutoGen Integration Guide
- OpenAI Agents SDK Integration Guide