Email storage and processing
Store and process incoming emails using KV storage and queue systems for support tickets and workflow automation
Store and process incoming emails with comprehensive storage, queue processing, and support ticket automation for streamlined email workflow management.
Store emails in KV namespace for later processing:
interface Env { EMAIL: SendEmail; EMAILS: KVNamespace; SUPPORT_TICKETS: KVNamespace;}
export default { async email(message, env, ctx): Promise<void> { const emailId = `email-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`;
// Read email content const reader = message.raw.getReader(); const chunks = [];
try { while (true) { const { done, value } = await reader.read(); if (done) break; chunks.push(value); }
const decoder = new TextDecoder(); const rawContent = decoder.decode( new Uint8Array(chunks.reduce((acc, chunk) => [...acc, ...chunk], [])), );
// Store email metadata and content const emailData = { id: emailId, from: message.from, to: message.to, subject: message.headers.get("subject"), timestamp: new Date().toISOString(), size: message.rawSize, rawContent: rawContent, processed: false, };
await env.EMAILS.put(emailId, JSON.stringify(emailData));
// Process based on recipient if (message.to.includes("support@")) { await handleSupportEmail(message, env, emailId); } else { await message.forward("general@company.com"); } } finally { reader.releaseLock(); } },};
async function handleSupportEmail(message, env, emailId) { const ticketId = `TICKET-${Date.now()}`;
// Create support ticket const ticketData = { id: ticketId, emailId: emailId, from: message.from, subject: message.headers.get("subject"), status: "open", priority: "normal", createdAt: new Date().toISOString(), updatedAt: new Date().toISOString(), };
await env.SUPPORT_TICKETS.put(ticketId, JSON.stringify(ticketData));
// Send auto-reply with ticket number await env.EMAIL.send({ to: message.from, from: "support@company.com", subject: `Support Ticket Created: ${ticketId}`, html: ` <h1>Support Ticket Created</h1> <p>Your support request has been received and assigned ticket number: <strong>${ticketId}</strong></p> <p>We will respond within 2-4 hours during business hours.</p> <hr> <p><em>Original subject: ${message.headers.get("subject")}</em></p> `, });
// Forward to support team await message.forward("support-team@company.com");}Process emails asynchronously using Cloudflare Queues:
interface Env { EMAIL: SendEmail; EMAIL_QUEUE: Queue; EMAIL_STORAGE: KVNamespace; EMAIL_ANALYTICS: AnalyticsEngine;}
interface EmailQueueMessage { emailId: string; from: string; to: string; subject: string; timestamp: string; priority: "low" | "normal" | "high" | "urgent"; category: string;}
export default { // Handle incoming emails async email(message, env, ctx): Promise<void> { const emailId = `email-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`;
// Store raw email content const reader = message.raw.getReader(); const chunks = [];
try { while (true) { const { done, value } = await reader.read(); if (done) break; chunks.push(value); }
const decoder = new TextDecoder(); const rawContent = decoder.decode( new Uint8Array(chunks.reduce((acc, chunk) => [...acc, ...chunk], [])), );
// Store email with metadata const emailData = { id: emailId, from: message.from, to: message.to, subject: message.headers.get("subject"), timestamp: new Date().toISOString(), size: message.rawSize, rawContent: rawContent, processed: false, status: "queued", };
await env.EMAIL_STORAGE.put(emailId, JSON.stringify(emailData));
// Determine priority and category const priority = determinePriority(message); const category = determineCategory(message);
// Queue email for processing const queueMessage: EmailQueueMessage = { emailId, from: message.from, to: message.to, subject: message.headers.get("subject") || "", timestamp: new Date().toISOString(), priority, category, };
await env.EMAIL_QUEUE.send(queueMessage, { delaySeconds: priority === "urgent" ? 0 : priority === "high" ? 5 : 30, });
// Send immediate auto-reply await env.EMAIL.send({ to: message.from, from: message.to, subject: `Re: ${message.headers.get("subject")}`, text: "Thank you for your message. It has been queued for processing.", }); } finally { reader.releaseLock(); } },
// Process queued emails async queue(batch, env, ctx): Promise<void> { console.log(`📥 Processing ${batch.messages.length} queued emails`);
for (const message of batch.messages) { try { const emailData = message.body as EmailQueueMessage;
console.log( `📧 Processing ${emailData.category} email from ${emailData.from}`, );
// Get stored email content const storedEmailData = await env.EMAIL_STORAGE.get(emailData.emailId); if (!storedEmailData) { console.error(`Email data not found: ${emailData.emailId}`); message.ack(); continue; }
const emailContent = JSON.parse(storedEmailData);
// Process based on category let processResult; switch (emailData.category) { case "support": processResult = await processSupport(emailData, emailContent, env); break; case "sales": processResult = await processSales(emailData, emailContent, env); break; case "billing": processResult = await processBilling(emailData, emailContent, env); break; default: processResult = await processGeneral(emailData, emailContent, env); }
// Update email status emailContent.processed = true; emailContent.status = "completed"; emailContent.processedAt = new Date().toISOString(); emailContent.processingResult = processResult;
await env.EMAIL_STORAGE.put( emailData.emailId, JSON.stringify(emailContent), );
// Track processing metrics env.EMAIL_ANALYTICS?.writeDataPoint({ blobs: [ "email_processed", emailData.from, emailData.to, emailData.category, emailData.priority, ], doubles: [1, emailContent.size], indexes: [ `category:${emailData.category}`, `priority:${emailData.priority}`, ], });
message.ack(); } catch (error) { console.error("Failed to process email:", error); message.retry(); } } },};
function determinePriority(message): "low" | "normal" | "high" | "urgent" { const subject = (message.headers.get("subject") || "").toLowerCase(); const to = message.to.toLowerCase();
if (subject.includes("urgent") || subject.includes("emergency")) { return "urgent"; }
if ( to.includes("support") && (subject.includes("down") || subject.includes("error")) ) { return "high"; }
if (to.includes("sales") || to.includes("billing")) { return "high"; }
return "normal";}
function determineCategory(message): string { const to = message.to.toLowerCase(); const subject = (message.headers.get("subject") || "").toLowerCase();
if ( to.includes("support") || subject.includes("help") || subject.includes("issue") ) { return "support"; }
if ( to.includes("sales") || subject.includes("quote") || subject.includes("pricing") ) { return "sales"; }
if ( to.includes("billing") || subject.includes("invoice") || subject.includes("payment") ) { return "billing"; }
return "general";}
async function processSupport( emailData: EmailQueueMessage, emailContent: any, env: Env,) { const ticketId = `TICKET-${Date.now()}`;
// Create support ticket const ticketData = { id: ticketId, emailId: emailData.emailId, from: emailData.from, subject: emailData.subject, priority: emailData.priority, status: "open", category: "support", createdAt: new Date().toISOString(), updatedAt: new Date().toISOString(), content: emailContent.rawContent.substring(0, 5000), // Limit stored content };
await env.SUPPORT_TICKETS?.put(ticketId, JSON.stringify(ticketData));
// Send confirmation email await env.EMAIL.send({ to: emailData.from, from: "support@company.com", subject: `Support Ticket Created: ${ticketId}`, html: ` <h2>Support Ticket Created</h2> <p>Your support request has been received and assigned ticket number: <strong>${ticketId}</strong></p> <p><strong>Priority:</strong> ${emailData.priority}</p> <p>We will respond based on the priority level:</p> <ul> <li><strong>Urgent:</strong> Within 1 hour</li> <li><strong>High:</strong> Within 4 hours</li> <li><strong>Normal:</strong> Within 24 hours</li> </ul> <hr> <p><em>Original subject: ${emailData.subject}</em></p> `, });
return { ticketId, action: "ticket_created" };}
async function processSales( emailData: EmailQueueMessage, emailContent: any, env: Env,) { // Create sales lead const leadId = `LEAD-${Date.now()}`;
const leadData = { id: leadId, emailId: emailData.emailId, contact: emailData.from, subject: emailData.subject, priority: emailData.priority, status: "new", source: "email", createdAt: new Date().toISOString(), };
await env.SALES_LEADS?.put(leadId, JSON.stringify(leadData));
// Send sales response await env.EMAIL.send({ to: emailData.from, from: "sales@company.com", subject: `Re: ${emailData.subject}`, html: ` <h2>Thank you for your interest!</h2> <p>We've received your sales inquiry and assigned it reference: <strong>${leadId}</strong></p> <p>A member of our sales team will contact you within 24 hours.</p> <p>Best regards,<br>Sales Team</p> `, });
return { leadId, action: "lead_created" };}
async function processBilling( emailData: EmailQueueMessage, emailContent: any, env: Env,) { // Handle billing inquiries await env.EMAIL.send({ to: emailData.from, from: "billing@company.com", subject: `Re: ${emailData.subject}`, html: ` <h2>Billing Inquiry Received</h2> <p>Thank you for contacting our billing department.</p> <p>Your inquiry has been forwarded to our billing specialists who will respond within 2 business hours.</p> <p>For immediate assistance, please call: +1-800-555-0123</p> `, });
return { action: "billing_forwarded" };}
async function processGeneral( emailData: EmailQueueMessage, emailContent: any, env: Env,) { // Handle general inquiries await env.EMAIL.send({ to: emailData.from, from: "info@company.com", subject: `Re: ${emailData.subject}`, text: ` Thank you for contacting us.
We have received your message and will respond within 48 hours.
For urgent matters, please contact our support team at support@company.com.
Best regards, Customer Service Team `, });
return { action: "general_acknowledged" };}This email storage system provides comprehensive email processing with KV storage, queue-based processing, and automated responses for different email categories.