MOLTED EMAIL

Building a Reactive Agent

Use molted listen to build agents that respond to inbound email, bounces, and delivery events in real-time.

Most agent email workflows are one-directional: the agent sends, and that is the end of it. A reactive agent closes the loop - it listens for what happened after the send, then acts on it.

molted listen turns the CLI into a persistent event loop. It connects to the SSE event stream and either prints events to your terminal or pipes them line-by-line to a long-running handler process. Your handler reads JSON from stdin and decides what to do next.

How it works

                                                              ┌─────────────────┐
inbound reply ──► Molted policy engine ──► SSE stream ──────► molted listen ──► │  your handler   │
                                                              │  (node/python/  │
delivery event ──────────────────────────────────────────────►│   bash/etc.)    │
                                                              └─────────────────┘

molted listen --pipe "node agent.js" spawns your handler as a child process and writes one JSON line to its stdin for each event. The handler stays running between events - it can maintain state, hold open connections, and take follow-up actions without being re-invoked each time.

Getting started

Install the CLI and authenticate:

npm install -g molted
molted auth login

Print events to the terminal to see what is flowing through your mailbox:

molted listen

Switch to JSON mode for structured output:

molted listen --json
molted listen --json | jq '.event'

Filter to specific event types using wildcards:

# Only inbound events
molted listen --events "inbound.*"

# Only bounce and complaint events
molted listen --events "delivery.bounced,delivery.complained"

Writing a handler

A handler is any process that reads JSON lines from stdin. Each line is a complete event object:

{
  "event": "inbound.classified",
  "id": "evt_abc123",
  "data": {
    "messageId": "msg_abc123",
    "fromEmail": "customer@example.com",
    "toEmail": "support@yourdomain.com",
    "subject": "Question about my account",
    "intent": "support",
    "confidence": 0.94,
    "suggestedAction": "notify_owner"
  },
  "timestamp": "2026-04-01T12:00:00Z"
}

Node.js handler

A complete reactive agent in Node.js that handles inbound replies, bounces, and complaints:

agent.js
import readline from 'node:readline';
import fetch from 'node-fetch';

const API_KEY = process.env.MOLTED_API_KEY;
const TENANT_ID = process.env.MOLTED_TENANT_ID;
const API_BASE = 'https://api.molted.email';

const rl = readline.createInterface({ input: process.stdin });

rl.on('line', async (line) => {
  let parsed;
  try {
    parsed = JSON.parse(line);
  } catch {
    return;
  }

  const { event, data } = parsed;

  if (event === 'inbound.classified') {
    await handleInbound(data);
  } else if (event === 'delivery.bounced') {
    await handleBounce(data);
  } else if (event === 'delivery.complained') {
    await handleComplaint(data);
  }
});

async function handleInbound(data) {
  const { intent, confidence, fromEmail, messageId } = data;

  if (intent === 'interested' && confidence > 0.8) {
    // High-confidence interest - fetch the full thread and auto-reply
    const thread = await fetchThread(messageId);
    await sendReply(fromEmail, buildReply(thread));
    console.error(`[agent] replied to ${fromEmail} (intent: interested)`);
  } else if (intent === 'support') {
    // Route to support queue
    await notifySupport(data);
    console.error(`[agent] escalated support request from ${fromEmail}`);
  } else if (intent === 'unsubscribe' || intent === 'legal') {
    // Immediately suppress the contact
    await suppressContact(fromEmail);
    console.error(`[agent] suppressed ${fromEmail} (intent: ${intent})`);
  }
}

async function handleBounce(data) {
  const { contactEmail, payload } = data;

  if (payload?.bounceType === 'hard') {
    await suppressContact(contactEmail);
    console.error(`[agent] suppressed ${contactEmail} (hard bounce)`);
  }
}

async function handleComplaint(data) {
  const { contactEmail } = data;
  await suppressContact(contactEmail);
  console.error(`[agent] suppressed ${contactEmail} (spam complaint)`);
}

async function fetchThread(messageId) {
  const res = await fetch(`${API_BASE}/v1/inbound/${messageId}`, {
    headers: { Authorization: `Bearer ${API_KEY}` },
  });
  return res.json();
}

async function suppressContact(email) {
  await fetch(`${API_BASE}/v1/suppressions`, {
    method: 'POST',
    headers: {
      Authorization: `Bearer ${API_KEY}`,
      'Content-Type': 'application/json',
    },
    body: JSON.stringify({ tenantId: TENANT_ID, email, reason: 'agent_decision' }),
  });
}

async function notifySupport(data) {
  // Your notification logic here - Slack, webhook, etc.
}

async function sendReply(to, body) {
  await fetch(`${API_BASE}/v1/send/request`, {
    method: 'POST',
    headers: {
      Authorization: `Bearer ${API_KEY}`,
      'Content-Type': 'application/json',
    },
    body: JSON.stringify({
      tenantId: TENANT_ID,
      recipientEmail: to,
      payload: { subject: 'Re: Your message', html: body },
      dedupeKey: `reply-${to}-${Date.now()}`,
    }),
  });
}

function buildReply(thread) {
  return `<p>Thanks for your interest! Someone from our team will follow up shortly.</p>`;
}

Run it:

molted listen --pipe "node agent.js" --events "inbound.*,delivery.bounced,delivery.complained"

Python handler

agent.py
import sys
import json
import os
import requests

API_KEY = os.environ['MOLTED_API_KEY']
TENANT_ID = os.environ['MOLTED_TENANT_ID']
API_BASE = 'https://api.molted.email'

def suppress_contact(email):
    requests.post(
        f'{API_BASE}/v1/suppressions',
        headers={'Authorization': f'Bearer {API_KEY}'},
        json={'tenantId': TENANT_ID, 'email': email, 'reason': 'agent_decision'},
    )

def handle_inbound(data):
    intent = data.get('intent')
    confidence = data.get('confidence', 0)
    from_email = data.get('fromEmail')

    if intent in ('unsubscribe', 'legal'):
        suppress_contact(from_email)
        print(f'[agent] suppressed {from_email} (intent: {intent})', file=sys.stderr)

def handle_bounce(data):
    payload = data.get('payload', {})
    if payload.get('bounceType') == 'hard':
        suppress_contact(data['contactEmail'])
        print(f'[agent] suppressed {data["contactEmail"]} (hard bounce)', file=sys.stderr)

def handle_complaint(data):
    suppress_contact(data['contactEmail'])
    print(f'[agent] suppressed {data["contactEmail"]} (spam complaint)', file=sys.stderr)

handlers = {
    'inbound.classified': handle_inbound,
    'delivery.bounced': handle_bounce,
    'delivery.complained': handle_complaint,
}

for line in sys.stdin:
    line = line.strip()
    if not line:
        continue
    try:
        parsed = json.loads(line)
    except json.JSONDecodeError:
        continue

    event = parsed.get('event')
    data = parsed.get('data', {})

    if event in handlers:
        handlers[event](data)

Run it:

molted listen --pipe "python agent.py" --events "inbound.*,delivery.bounced,delivery.complained"

Bash handler

For quick prototyping or simple automation, a bash handler works well:

agent.sh
#!/bin/bash

while IFS= read -r line; do
  EVENT=$(echo "$line" | jq -r '.event')
  EMAIL=$(echo "$line" | jq -r '.data.contactEmail // .data.fromEmail // empty')

  case "$EVENT" in
    delivery.bounced)
      BOUNCE_TYPE=$(echo "$line" | jq -r '.data.payload.bounceType // empty')
      if [ "$BOUNCE_TYPE" = "hard" ] && [ -n "$EMAIL" ]; then
        echo "[agent] suppressing $EMAIL (hard bounce)" >&2
        molted suppressions add "$EMAIL" --reason "hard_bounce"
      fi
      ;;
    delivery.complained)
      if [ -n "$EMAIL" ]; then
        echo "[agent] suppressing $EMAIL (complaint)" >&2
        molted suppressions add "$EMAIL" --reason "complaint"
      fi
      ;;
  esac
done
chmod +x agent.sh
molted listen --pipe "./agent.sh" --events "delivery.bounced,delivery.complained"

Running as a persistent daemon

For production use, run the listener under a process manager so it restarts automatically on crash or reboot.

With pm2

pm2 start "molted listen --pipe 'node agent.js' --events 'inbound.*,delivery.*'" --name my-agent
pm2 save

Check status and view logs:

pm2 status
pm2 logs my-agent
pm2 logs my-agent --lines 50 --nostream

With systemd

/etc/systemd/system/molted-agent.service
[Unit]
Description=Molted reactive agent
After=network.target

[Service]
Type=simple
User=myuser
WorkingDirectory=/opt/my-agent
ExecStart=/usr/bin/molted listen --pipe "node agent.js" --events "inbound.*,delivery.*"
Environment=MOLTED_API_KEY=your_api_key
Environment=MOLTED_TENANT_ID=your_tenant_id
Restart=always
RestartSec=5

[Install]
WantedBy=multi-user.target
sudo systemctl enable molted-agent
sudo systemctl start molted-agent
sudo journalctl -u molted-agent -f

Concurrency and error handling

By default, the listener processes one event at a time (--concurrency 1). Raise this to process events in parallel:

# Process up to 5 events simultaneously
molted listen --pipe "node agent.js" --concurrency 5

If your handler's stdin write fails, the listener retries up to 3 times by default. Set --retries to adjust:

molted listen --pipe "node agent.js" --retries 5

When the handler process crashes, the listener automatically restarts it. Your handler does not need to manage its own crash recovery.

Replay and recovery

If your agent goes offline, resume from where it left off using --since:

# Resume from a specific event ID
molted listen --pipe "node agent.js" --since evt_abc123

Save event.id from each processed event to a file or database. On restart, pass the last processed ID to --since. This gives you at-least-once delivery semantics: your handler should be idempotent for the event types it cares about.

Event reference

The full list of event types that can arrive on the stream:

EventFires when
delivery.queuedEmail accepted and queued.
delivery.acceptedEmail accepted by the provider.
delivery.sentEmail sent to the provider.
delivery.deliveredEmail delivered to the recipient's inbox.
delivery.deferredDelivery temporarily deferred.
delivery.bouncedEmail bounced (hard or soft).
delivery.complainedRecipient marked as spam.
delivery.failedDelivery failed permanently.
inbound.classifiedInbound message classified with intent.
inbound.routedInbound message routed to a handler.
policy.blockedSend request blocked by the policy engine.
send.queuedSend request queued for processing.
send.approval_pendingSend requires human approval.
send.approval_decidedApproval decision made.
followup.scheduledA follow-up email scheduled.
followup.executedA scheduled follow-up sent.
journey.step_completedA journey step completed for a contact.
journey.completedA journey run completed.
coordination.lease_acquiredAn agent acquired a contact lease.
coordination.lease_releasedA contact lease was released.
coordination.consensus_requestedA consensus vote was created.

For full payload shapes, see Event Streaming.