Building a Reactive Agent
Use molted listen to build agents that respond to inbound email, bounces, and delivery events in real-time.
Most agent email workflows are one-directional: the agent sends, and that is the end of it. A reactive agent closes the loop - it listens for what happened after the send, then acts on it.
molted listen turns the CLI into a persistent event loop. It connects to the SSE event stream and either prints events to your terminal or pipes them line-by-line to a long-running handler process. Your handler reads JSON from stdin and decides what to do next.
How it works
┌─────────────────┐
inbound reply ──► Molted policy engine ──► SSE stream ──────► molted listen ──► │ your handler │
│ (node/python/ │
delivery event ──────────────────────────────────────────────►│ bash/etc.) │
└─────────────────┘molted listen --pipe "node agent.js" spawns your handler as a child process and writes one JSON line to its stdin for each event. The handler stays running between events - it can maintain state, hold open connections, and take follow-up actions without being re-invoked each time.
Getting started
Install the CLI and authenticate:
npm install -g molted
molted auth loginPrint events to the terminal to see what is flowing through your mailbox:
molted listenSwitch to JSON mode for structured output:
molted listen --json
molted listen --json | jq '.event'Filter to specific event types using wildcards:
# Only inbound events
molted listen --events "inbound.*"
# Only bounce and complaint events
molted listen --events "delivery.bounced,delivery.complained"Writing a handler
A handler is any process that reads JSON lines from stdin. Each line is a complete event object:
{
"event": "inbound.classified",
"id": "evt_abc123",
"data": {
"messageId": "msg_abc123",
"fromEmail": "customer@example.com",
"toEmail": "support@yourdomain.com",
"subject": "Question about my account",
"intent": "support",
"confidence": 0.94,
"suggestedAction": "notify_owner"
},
"timestamp": "2026-04-01T12:00:00Z"
}Node.js handler
A complete reactive agent in Node.js that handles inbound replies, bounces, and complaints:
import readline from 'node:readline';
import fetch from 'node-fetch';
const API_KEY = process.env.MOLTED_API_KEY;
const TENANT_ID = process.env.MOLTED_TENANT_ID;
const API_BASE = 'https://api.molted.email';
const rl = readline.createInterface({ input: process.stdin });
rl.on('line', async (line) => {
let parsed;
try {
parsed = JSON.parse(line);
} catch {
return;
}
const { event, data } = parsed;
if (event === 'inbound.classified') {
await handleInbound(data);
} else if (event === 'delivery.bounced') {
await handleBounce(data);
} else if (event === 'delivery.complained') {
await handleComplaint(data);
}
});
async function handleInbound(data) {
const { intent, confidence, fromEmail, messageId } = data;
if (intent === 'interested' && confidence > 0.8) {
// High-confidence interest - fetch the full thread and auto-reply
const thread = await fetchThread(messageId);
await sendReply(fromEmail, buildReply(thread));
console.error(`[agent] replied to ${fromEmail} (intent: interested)`);
} else if (intent === 'support') {
// Route to support queue
await notifySupport(data);
console.error(`[agent] escalated support request from ${fromEmail}`);
} else if (intent === 'unsubscribe' || intent === 'legal') {
// Immediately suppress the contact
await suppressContact(fromEmail);
console.error(`[agent] suppressed ${fromEmail} (intent: ${intent})`);
}
}
async function handleBounce(data) {
const { contactEmail, payload } = data;
if (payload?.bounceType === 'hard') {
await suppressContact(contactEmail);
console.error(`[agent] suppressed ${contactEmail} (hard bounce)`);
}
}
async function handleComplaint(data) {
const { contactEmail } = data;
await suppressContact(contactEmail);
console.error(`[agent] suppressed ${contactEmail} (spam complaint)`);
}
async function fetchThread(messageId) {
const res = await fetch(`${API_BASE}/v1/inbound/${messageId}`, {
headers: { Authorization: `Bearer ${API_KEY}` },
});
return res.json();
}
async function suppressContact(email) {
await fetch(`${API_BASE}/v1/suppressions`, {
method: 'POST',
headers: {
Authorization: `Bearer ${API_KEY}`,
'Content-Type': 'application/json',
},
body: JSON.stringify({ tenantId: TENANT_ID, email, reason: 'agent_decision' }),
});
}
async function notifySupport(data) {
// Your notification logic here - Slack, webhook, etc.
}
async function sendReply(to, body) {
await fetch(`${API_BASE}/v1/send/request`, {
method: 'POST',
headers: {
Authorization: `Bearer ${API_KEY}`,
'Content-Type': 'application/json',
},
body: JSON.stringify({
tenantId: TENANT_ID,
recipientEmail: to,
payload: { subject: 'Re: Your message', html: body },
dedupeKey: `reply-${to}-${Date.now()}`,
}),
});
}
function buildReply(thread) {
return `<p>Thanks for your interest! Someone from our team will follow up shortly.</p>`;
}Run it:
molted listen --pipe "node agent.js" --events "inbound.*,delivery.bounced,delivery.complained"Python handler
import sys
import json
import os
import requests
API_KEY = os.environ['MOLTED_API_KEY']
TENANT_ID = os.environ['MOLTED_TENANT_ID']
API_BASE = 'https://api.molted.email'
def suppress_contact(email):
requests.post(
f'{API_BASE}/v1/suppressions',
headers={'Authorization': f'Bearer {API_KEY}'},
json={'tenantId': TENANT_ID, 'email': email, 'reason': 'agent_decision'},
)
def handle_inbound(data):
intent = data.get('intent')
confidence = data.get('confidence', 0)
from_email = data.get('fromEmail')
if intent in ('unsubscribe', 'legal'):
suppress_contact(from_email)
print(f'[agent] suppressed {from_email} (intent: {intent})', file=sys.stderr)
def handle_bounce(data):
payload = data.get('payload', {})
if payload.get('bounceType') == 'hard':
suppress_contact(data['contactEmail'])
print(f'[agent] suppressed {data["contactEmail"]} (hard bounce)', file=sys.stderr)
def handle_complaint(data):
suppress_contact(data['contactEmail'])
print(f'[agent] suppressed {data["contactEmail"]} (spam complaint)', file=sys.stderr)
handlers = {
'inbound.classified': handle_inbound,
'delivery.bounced': handle_bounce,
'delivery.complained': handle_complaint,
}
for line in sys.stdin:
line = line.strip()
if not line:
continue
try:
parsed = json.loads(line)
except json.JSONDecodeError:
continue
event = parsed.get('event')
data = parsed.get('data', {})
if event in handlers:
handlers[event](data)Run it:
molted listen --pipe "python agent.py" --events "inbound.*,delivery.bounced,delivery.complained"Bash handler
For quick prototyping or simple automation, a bash handler works well:
#!/bin/bash
while IFS= read -r line; do
EVENT=$(echo "$line" | jq -r '.event')
EMAIL=$(echo "$line" | jq -r '.data.contactEmail // .data.fromEmail // empty')
case "$EVENT" in
delivery.bounced)
BOUNCE_TYPE=$(echo "$line" | jq -r '.data.payload.bounceType // empty')
if [ "$BOUNCE_TYPE" = "hard" ] && [ -n "$EMAIL" ]; then
echo "[agent] suppressing $EMAIL (hard bounce)" >&2
molted suppressions add "$EMAIL" --reason "hard_bounce"
fi
;;
delivery.complained)
if [ -n "$EMAIL" ]; then
echo "[agent] suppressing $EMAIL (complaint)" >&2
molted suppressions add "$EMAIL" --reason "complaint"
fi
;;
esac
donechmod +x agent.sh
molted listen --pipe "./agent.sh" --events "delivery.bounced,delivery.complained"Running as a persistent daemon
For production use, run the listener under a process manager so it restarts automatically on crash or reboot.
With pm2
pm2 start "molted listen --pipe 'node agent.js' --events 'inbound.*,delivery.*'" --name my-agent
pm2 saveCheck status and view logs:
pm2 status
pm2 logs my-agent
pm2 logs my-agent --lines 50 --nostreamWith systemd
[Unit]
Description=Molted reactive agent
After=network.target
[Service]
Type=simple
User=myuser
WorkingDirectory=/opt/my-agent
ExecStart=/usr/bin/molted listen --pipe "node agent.js" --events "inbound.*,delivery.*"
Environment=MOLTED_API_KEY=your_api_key
Environment=MOLTED_TENANT_ID=your_tenant_id
Restart=always
RestartSec=5
[Install]
WantedBy=multi-user.targetsudo systemctl enable molted-agent
sudo systemctl start molted-agent
sudo journalctl -u molted-agent -fConcurrency and error handling
By default, the listener processes one event at a time (--concurrency 1). Raise this to process events in parallel:
# Process up to 5 events simultaneously
molted listen --pipe "node agent.js" --concurrency 5If your handler's stdin write fails, the listener retries up to 3 times by default. Set --retries to adjust:
molted listen --pipe "node agent.js" --retries 5When the handler process crashes, the listener automatically restarts it. Your handler does not need to manage its own crash recovery.
Replay and recovery
If your agent goes offline, resume from where it left off using --since:
# Resume from a specific event ID
molted listen --pipe "node agent.js" --since evt_abc123Save event.id from each processed event to a file or database. On restart, pass the last processed ID to --since. This gives you at-least-once delivery semantics: your handler should be idempotent for the event types it cares about.
Event reference
The full list of event types that can arrive on the stream:
| Event | Fires when |
|---|---|
delivery.queued | Email accepted and queued. |
delivery.accepted | Email accepted by the provider. |
delivery.sent | Email sent to the provider. |
delivery.delivered | Email delivered to the recipient's inbox. |
delivery.deferred | Delivery temporarily deferred. |
delivery.bounced | Email bounced (hard or soft). |
delivery.complained | Recipient marked as spam. |
delivery.failed | Delivery failed permanently. |
inbound.classified | Inbound message classified with intent. |
inbound.routed | Inbound message routed to a handler. |
policy.blocked | Send request blocked by the policy engine. |
send.queued | Send request queued for processing. |
send.approval_pending | Send requires human approval. |
send.approval_decided | Approval decision made. |
followup.scheduled | A follow-up email scheduled. |
followup.executed | A scheduled follow-up sent. |
journey.step_completed | A journey step completed for a contact. |
journey.completed | A journey run completed. |
coordination.lease_acquired | An agent acquired a contact lease. |
coordination.lease_released | A contact lease was released. |
coordination.consensus_requested | A consensus vote was created. |
For full payload shapes, see Event Streaming.