通过 AWS SES 异步发送电子邮件

已发表: 2022-03-10
快速总结 ↬一次发送许多事务性电子邮件,如果架构不当,可能会成为应用程序的瓶颈并降低用户体验。 部分问题是从应用程序内同步连接到 SMTP 服务器。 在本文中,我们将探讨如何使用 AWS S3、Lambda 和 SES 的组合从应用程序外部异步发送电子邮件。

大多数应用程序都会发送电子邮件与用户交流。 交易电子邮件是由用户与应用程序交互触发的电子邮件,例如在网站注册后欢迎新用户、为用户提供重置密码的链接或在用户购买后附上发票。 所有这些以前的情况通常只需要向用户发送一封电子邮件。 但在其他一些情况下,应用程序需要发送更多电子邮件,例如当用户在网站上发布新内容时,她的所有关注者(在 Twitter 等平台上可能有数百万用户)将收到通知。 在后一种情况下,如果架构不正确,发送电子邮件可能会成为应用程序的瓶颈。

这就是我的情况。 我有一个网站可能需要在一些用户触发的操作(例如用户通知她的所有关注者)之后发送 20 封电子邮件。 最初,它依赖于通过流行的基于云的 SMTP 提供商(例如 SendGrid、Mandrill、Mailjet 和 Mailgun)发送电子邮件,但回复用户需要几秒钟的时间。 显然,连接到 SMTP 服务器以发送这 20 封电子邮件会显着减慢处理速度。

经过检查,我发现了问题的根源:

  1. 同步连接
    应用程序连接到 SMTP 服务器并同步等待确认,然后继续执行该过程。
  2. 高延迟
    虽然我的服务器位于新加坡,但我使用的 SMTP 提供商的服务器位于美国,因此往返连接需要相当长的时间。
  3. 没有 SMTP 连接的可重用性
    当调用函数发送电子邮件时,函数会立即发送电子邮件,并在此时创建一​​个新的 SMTP 连接(它不提供在请求结束时收集所有电子邮件并在单个 SMTP 下将它们一起发送联系)。

由于#1,用户必须等待响应的时间与发送电子邮件所需的时间相关联。 由于#2,发送一封电子邮件的时间相对较长。 由于#3,发送 20 封电子邮件的时间是发送一封电子邮件所需时间的 20 倍。 虽然只发送一封电子邮件可能不会使应用程序变得非常慢,但发送 20 封电子邮件肯定会影响用户体验。

让我们看看如何解决这个问题。

跳跃后更多! 继续往下看↓

注意交易电子邮件的性质

首先,我们必须注意到并非所有电子邮件的重要性都相同。 我们可以将电子邮件大致分为两组:优先级电子邮件和非优先级电子邮件。 例如,如果用户忘记了访问帐户的密码,她会立即在收件箱中看到带有密码重置链接的电子邮件; 这是一个优先电子邮件。 相反,发送一封电子邮件通知我们关注的人发布了新内容不需要立即到达用户的收件箱; 那是一封非优先邮件。

该解决方案必须优化这两类电子邮件的发送方式。 假设在此过程中只有少数(可能是 1 或 2)封优先邮件要发送,并且大部分邮件都是非优先邮件,那么我们设计的解决方案如下:

  • 优先级电子邮件可以通过使用位于部署应用程序的同一区域的 SMTP 提供程序来简单地避免高延迟问题。 除了良好的研究之外,这还涉及将我们的应用程序与提供者的 API 集成。
  • 非优先邮件可以异步发送,也可以分批发送,多封邮件一起发送。 在应用程序级别实施,它需要适当的技术堆栈。

接下来让我们定义技术堆栈以异步发送电子邮件。

定义技术堆栈

注意:我决定将我的堆栈基于 AWS 服务,因为我的网站已经托管在 AWS EC2 上。 否则,我会因在几家公司的网络之间移动数据而产生开销。 但是,我们也可以使用其他云服务提供商来实施我们的解决方案。

我的第一种方法是设置队列。 通过队列,我可以让应用程序不再发送电子邮件,而是在队列中发布包含电子邮件内容和元数据的消息,然后让另一个进程从队列中提取消息并发送电子邮件。

但是,在检查 AWS 的队列服务(称为 SQS)时,我认为这不是一个合适的解决方案,因为:

  • 设置相当复杂;
  • 标准队列消息最多只能存储 256 kb 的信息,如果电子邮件有附件(例如发票),这可能还不够。 即使可以将大消息拆分为较小的消息,复杂性也会增加更多。

然后我意识到我可以通过结合其他更容易设置的 AWS 服务 S3 和 Lambda 来完美地模仿队列的行为。 S3 是一种用于存储和检索数据的云对象存储解决方案,可以充当上传消息的存储库,而运行代码以响应事件的计算服务 Lambda 可以选择消息并使用它执行操作。

换句话说,我们可以像这样设置我们的电子邮件发送过程:

  1. 应用程序将包含电子邮件内容 + 元数据的文件上传到 S3 存储桶。
  2. 每当有新文件上传到 S3 存储桶时,S3 都会触发一个包含新文件路径的事件。
  3. Lambda 函数选择事件、读取文件并发送电子邮件。

最后,我们必须决定如何发送电子邮件。 我们可以继续使用我们已有的 SMTP 提供商,让 Lambda 函数与他们的 API 交互,或者使用 AWS 服务发送电子邮件,称为 SES。 使用 SES 既有优点也有缺点:

好处:

  • 在 AWS Lambda 中使用非常简单(只需 2 行代码)。
  • 更便宜:Lambda 费用是根据执行函数所需的时间计算的,因此从 AWS 网络内连接到 SES 将比连接到外部服务器花费更短的时间,从而使函数更早完成并且成本更低. (除非 SES 在托管应用程序的同一区域不可用;就我而言,因为在我的 EC2 服务器所在的亚太地区(新加坡)区域不提供 SES,那么我最好连接到一些亚洲的外部 SMTP 提供商)。

缺点:

  • 提供的用于监控我们已发送电子邮件的统计数据不多,添加更强大的统计数据需要额外的努力(例如:跟踪打开的电子邮件百分比或点击的链接,必须通过 AWS CloudWatch 设置)。
  • 如果我们继续使用 SMTP 提供程序来发送优先级电子邮件,那么我们将不会将我们的统计信息放在一个地方。

为简单起见,在下面的代码中,我们将使用 SES。

然后,我们定义了流程和堆栈的逻辑如下:应用程序像往常一样发送优先级电子邮件,但对于非优先级电子邮件,它会将包含电子邮件内容和元数据的文件上传到 S3; 此文件由 Lambda 函数异步处理,该函数连接到 SES 以发送电子邮件。

让我们开始实施解决方案。

区分优先级和非优先级电子邮件

简而言之,这一切都取决于应用程序,因此我们需要逐个电子邮件地决定。 我将描述一个我为 WordPress 实现的解决方案,它需要围绕函数wp_mail的约束进行一些修改。 对于其他平台,以下策略也可以使用,但很可能会有更好的策略,不需要 hack 即可工作。

在 WordPress 中发送电子邮件的方法是调用函数wp_mail ,我们不想改变它(例如:通过调用函数wp_mail_synchronouswp_mail_asynchronous ),所以我们的wp_mail实现需要同时处理同步和异步情况,并且需要知道电子邮件属于哪个组。 不幸的是, wp_mail没有提供任何额外的参数来评估这些信息,从它的签名可以看出:

 function wp_mail( $to, $subject, $message, $headers = '', $attachments = array() )

然后,为了找出电子邮件的类别,我们添加了一个 hacky 解决方案:默认情况下,我们使电子邮件属于优先级组,如果$to包含特定电子邮件(例如:[email protected]),或者如果$subject以特殊字符串开头(例如:“[Non-priority!]”),则它属于非优先组(我们从主题中删除相应的电子邮件或字符串)。 wp_mail是一个可插入的函数,所以我们可以简单地通过在我们的functions.php 文件上实现一个具有相同签名的新函数来覆盖它。 最初,它包含与原始wp_mail函数相同的代码,位于文件 wp-includes/pluggable.php 中,用于提取所有参数:

 if ( !function_exists( 'wp_mail' ) ) : function wp_mail( $to, $subject, $message, $headers = '', $attachments = array() ) { $atts = apply_filters( 'wp_mail', compact( 'to', 'subject', 'message', 'headers', 'attachments' ) ); if ( isset( $atts['to'] ) ) { $to = $atts['to']; } if ( !is_array( $to ) ) { $to = explode( ',', $to ); } if ( isset( $atts['subject'] ) ) { $subject = $atts['subject']; } if ( isset( $atts['message'] ) ) { $message = $atts['message']; } if ( isset( $atts['headers'] ) ) { $headers = $atts['headers']; } if ( isset( $atts['attachments'] ) ) { $attachments = $atts['attachments']; } if ( ! is_array( $attachments ) ) { $attachments = explode( "\n", str_replace( "\r\n", "\n", $attachments ) ); } // Continue below... } endif;

然后我们检查它是否是非优先级的,在这种情况下,我们在函数send_asynchronous_mail下分叉到一个单独的逻辑,如果不是,我们继续执行与原始wp_mail函数中相同的代码:

 function wp_mail( $to, $subject, $message, $headers = '', $attachments = array() ) { // Continued from above... $hacky_email = "[email protected]"; if (in_array($hacky_email, $to)) { // Remove the hacky email from $to array_splice($to, array_search($hacky_email, $to), 1); // Fork to asynchronous logic return send_asynchronous_mail($to, $subject, $message, $headers, $attachments); } // Continue all code from original function in wp-includes/pluggable.php // ... }

在我们的函数send_asynchronous_mail中,我们不是直接将电子邮件上传到 S3,而是将电子邮件添加到全局变量$emailqueue中,我们可以在请求结束时通过单个连接将所有电子邮件一起上传到 S3:

 function send_asynchronous_mail($to, $subject, $message, $headers, $attachments) { global $emailqueue; if (!$emailqueue) { $emailqueue = array(); } // Add email to queue. Code continues below... }

我们可以为每封电子邮件上传一个文件,也可以将它们捆绑在一起,以便在一个文件中包含许多电子邮件。 由于$headers包含电子邮件元数据(发件人、内容类型和字符集、CC、BCC 和回复字段),我们可以将具有相同$headers的电子邮件分组在一起。 这样,这些电子邮件都可以在同一个文件中上传到 S3,并且$headers元信息将仅包含在文件中一次,而不是每封电子邮件一次:

 function send_asynchronous_mail($to, $subject, $message, $headers, $attachments) { // Continued from above... // Add email to the queue $emailqueue[$headers] = $emailqueue[$headers] ?? array(); $emailqueue[$headers][] = array( 'to' => $to, 'subject' => $subject, 'message' => $message, 'attachments' => $attachments, ); // Code continues below }

最后,函数send_asynchronous_mail返回true 。 请注意,这段代码是 hacky: true通常意味着电子邮件已成功发送,但在这种情况下,它甚至还没有发送,它可能完全失败。 因此,调用wp_mail的函数不能将true的响应视为“电子邮件已成功发送”,而应将其视为已入队的确认。 这就是为什么将这种技术限制在非优先电子邮件中很重要,这样如果失败,该过程可以在后台继续重试,并且用户不会期望电子邮件已经在她的收件箱中:

 function send_asynchronous_mail($to, $subject, $message, $headers, $attachments) { // Continued from above... // That's it! return true; }

将电子邮件上传到 S3

在我之前的文章“通过 AWS S3 在多台服务器之间共享数据”中,我介绍了如何在 S3 中创建存储桶,以及如何通过 SDK 将文件上传到存储桶。 下面的所有代码都继续为 WordPress 实施解决方案,因此我们使用 SDK for PHP 连接到 AWS。

我们可以从抽象类AWS_S3 (在我之前的文章中介绍过)扩展以连接到 S3,并在请求结束时将电子邮件上传到存储桶“async-emails”(通过wp_footer挂钩触发)。 请注意,我们必须将 ACL 保持为“私有”,因为我们不希望电子邮件暴露在互联网上:

 class AsyncEmails_AWS_S3 extends AWS_S3 { function __construct() { // Send all emails at the end of the execution add_action("wp_footer", array($this, "upload_emails_to_s3"), PHP_INT_MAX); } protected function get_acl() { return "private"; } protected function get_bucket() { return "async-emails"; } function upload_emails_to_s3() { $s3Client = $this->get_s3_client(); // Code continued below... } } new AsyncEmails_AWS_S3();

我们开始遍历保存在全局变量$emailqueue中的 headers => emaildata 对,并从函数get_default_email_meta中获取默认配置以判断 headers 是否为空。 在下面的代码中,我只从 headers 中检索“from”字段(提取所有 headers 的代码可以从原始函数wp_mail复制):

 class AsyncEmails_AWS_S3 extends AWS_S3 { public function get_default_email_meta() { // Code continued from above... return array( 'from' => sprintf( '%s <%s>', get_bloginfo('name'), get_bloginfo('admin_email') ), 'contentType' => 'text/html', 'charset' => strtolower(get_option('blog_charset')) ); } public function upload_emails_to_s3() { // Code continued from above... global $emailqueue; foreach ($emailqueue as $headers => $emails) { $meta = $this->get_default_email_meta(); // Retrieve the "from" from the headers $regexp = '/From:\s*(([^\<]*?) <)? ?\s*\n/i'; if(preg_match($regexp, $headers, $matches)) { $meta['from'] = sprintf( '%s <%s>', $matches[2], $matches[3] ); } // Code continued below... } } } class AsyncEmails_AWS_S3 extends AWS_S3 { public function get_default_email_meta() { // Code continued from above... return array( 'from' => sprintf( '%s <%s>', get_bloginfo('name'), get_bloginfo('admin_email') ), 'contentType' => 'text/html', 'charset' => strtolower(get_option('blog_charset')) ); } public function upload_emails_to_s3() { // Code continued from above... global $emailqueue; foreach ($emailqueue as $headers => $emails) { $meta = $this->get_default_email_meta(); // Retrieve the "from" from the headers $regexp = '/From:\s*(([^\<]*?) <)? ?\s*\n/i'; if(preg_match($regexp, $headers, $matches)) { $meta['from'] = sprintf( '%s <%s>', $matches[2], $matches[3] ); } // Code continued below... } } }

最后,我们将电子邮件上传到 S3。 我们决定每个文件上传多少封电子邮件以节省资金。 Lambda 函数根据它们需要执行的时间量收费,按 100 毫秒的跨度计算。 一个函数需要的时间越多,它就变得越昂贵。

因此,通过每封电子邮件上传 1 个文件来发送所有电子邮件比每封电子邮件上传 1 个文件更昂贵,因为执行该函数的开销是每封电子邮件计算一次,而不是为许多电子邮件只计算一次,而且还因为发送许多电子邮件一起更彻底地填充了 100ms 跨度。

所以我们每个文件上传许多电子邮件。 多少封电子邮件? Lambda 函数有一个最大执行时间(默认为 3 秒),如果操作失败,它将继续从头开始重试,而不是从失败的地方重试。 因此,如果文件包含 100 封电子邮件,并且 Lambda 设法在最大时间结束之前发送了 50 封电子邮件,那么它会失败并再次尝试执行该操作,再次发送前 50 封电子邮件。 为避免这种情况,我们必须为每个文件选择一些我们确信足以在最长时间结束之前处理的电子邮件。 在我们的情况下,我们可以选择每个文件发送 25 封电子邮件。 电子邮件的数量取决于应用程序(较大的电子邮件将需要更长的时间来发送,而发送电子邮件的时间将取决于基础设施),因此我们应该进行一些测试以得出正确的数量。

该文件的内容只是一个 JSON 对象,包含属性“meta”下的电子邮件元数据,以及属性“emails”下的电子邮件块:

 class AsyncEmails_AWS_S3 extends AWS_S3 { public function upload_emails_to_s3() { // Code continued from above... foreach ($emailqueue as $headers => $emails) { // Code continued from above... // Split the emails into chunks of no more than the value of constant EMAILS_PER_FILE: $chunks = array_chunk($emails, EMAILS_PER_FILE); $filename = time().rand(); for ($chunk_count = 0; $chunk_count < count($chunks); $chunk_count++) { $body = array( 'meta' => $meta, 'emails' => $chunks[$chunk_count], ); // Upload to S3 $s3Client->putObject([ 'ACL' => $this->get_acl(), 'Bucket' => $this->get_bucket(), 'Key' => $filename.$chunk_count.'.json', 'Body' => json_encode($body), ]); } } } }

为简单起见,在上面的代码中,我没有将附件上传到 S3。 如果我们的电子邮件需要包含附件,那么我们必须使用 SES 函数SendRawEmail而不是SendEmail (在下面的 Lambda 脚本中使用)。

添加了将带有电子邮件的文件上传到 S3 的逻辑之后,我们可以继续编写 Lambda 函数。

编写 Lambda 脚本

Lambda 函数也称为无服务器函数,不是因为它们不在服务器上运行,而是因为开发人员无需担心服务器:开发人员只需提供脚本,云负责配置服务器、部署和运行脚本。 因此,如前所述,Lambda 函数是根据函数执行时间收费的。

以下 Node.js 脚本执行所需的工作。 由 S3 “Put” 事件调用,表示存储桶上已创建新对象,函数:

  1. 获取新对象的路径(在变量srcKey下)和存储桶(在变量srcBucket下)。
  2. 通过s3.getObject下载对象。
  3. 通过JSON.parse(response.Body.toString())解析对象的内容,并提取电子邮件和电子邮件元数据。
  4. 遍历所有电子邮件,并通过ses.sendEmail发送它们。
 var async = require('async'); var aws = require('aws-sdk'); var s3 = new aws.S3(); exports.handler = function(event, context, callback) { var srcBucket = event.Records[0].s3.bucket.name; var srcKey = decodeURIComponent(event.Records[0].s3.object.key.replace(/\+/g, " ")); // Download the file from S3, parse it, and send the emails async.waterfall([ function download(next) { // Download the file from S3 into a buffer. s3.getObject({ Bucket: srcBucket, Key: srcKey }, next); }, function process(response, next) { var file = JSON.parse(response.Body.toString()); var emails = file.emails; var emailsMeta = file.meta; // Check required parameters if (emails === null || emailsMeta === null) { callback('Bad Request: Missing required data: ' + response.Body.toString()); return; } if (emails.length === 0) { callback('Bad Request: No emails provided: ' + response.Body.toString()); return; } var totalEmails = emails.length; var sentEmails = 0; for (var i = 0; i < totalEmails; i++) { var email = emails[i]; var params = { Destination: { ToAddresses: email.to }, Message: { Subject: { Data: email.subject, Charset: emailsMeta.charset } }, Source: emailsMeta.from }; if (emailsMeta.contentType == 'text/html') { params.Message.Body = { Html: { Data: email.message, Charset: emailsMeta.charset } }; } else { params.Message.Body = { Text: { Data: email.message, Charset: emailsMeta.charset } }; } // Send the email var ses = new aws.SES({ "region": "us-east-1" }); ses.sendEmail(params, function(err, data) { if (err) { console.error('Unable to send email due to an error: ' + err); callback(err); } sentEmails++; if (sentEmails == totalEmails) { next(); } }); } } ], function (err) { if (err) { console.error('Unable to send emails due to an error: ' + err); callback(err); } // Success callback(null); }); };

接下来,我们必须将 Lambda 函数上传并配置到 AWS,这涉及到:

  1. 创建一个执行角色,授予 Lambda 访问 S3 的权限。
  2. 创建一个包含所有代码的 .zip 包,即我们正在创建的 Lambda 函数 + 所有必需的 Node.js 模块。
  3. 使用 CLI 工具将此包上传到 AWS。

AWS 站点上的“将 AWS Lambda 与 Amazon S3 结合使用的教程”中正确解释了如何执行这些操作。

使用 Lambda 函数连接 S3

最后,创建了存储桶和 Lambda 函数,我们需要将它们挂钩在一起,这样每当在存储桶上创建新对象时,就会触发一个事件来执行 Lambda 函数。 为此,我们转到 S3 仪表板并单击存储桶行,这将显示其属性:

在 S3 仪表板中显示存储桶属性
单击存储桶的行会显示存储桶的属性。 (大预览)

然后单击属性,我们向下滚动到“事件”项,然后单击添加通知,然后输入以下字段:

  • 名称:通知的名称,例如:“EmailSender”;
  • 事件: “Put”,即在桶上创建新对象时触发的事件;
  • 发送至: “Lambda 函数”;
  • Lambda:我们新创建的 Lambda 的名称,例如:“LambdaEmailSender”。
使用 Lambda 设置 S3
在 S3 中添加通知以触发 Lambda 事件。 (大预览)

最后,我们还可以将 S3 存储桶设置为在一段时间后自动删除包含电子邮件数据的文件。 为此,我们转到存储桶的管理选项卡,并创建一个新的生命周期规则,定义电子邮件必须在多少天后过期:

生命周期规则
设置生命周期规则以自动从存储桶中删除文件。 (大预览)

而已。 从这一刻起,当在 S3 存储桶上添加包含电子邮件内容和元数据的新对象时,它将触发 Lambda 函数,该函数将读取文件并连接到 SES 以发送电子邮件。

我在我的网站上实施了这个解决方案,它再次变得很快:通过将发送电子邮件卸载到外部进程,无论应用程序发送 20 封还是 5000 封电子邮件都没有区别,对触发操作的用户的响应将是即时。

结论

在本文中,我们分析了为什么在单个请求中发送大量事务性电子邮件可能会成为应用程序的瓶颈,并创建了解决该问题的解决方案:代替从应用程序内部(同步)连接到 SMTP 服务器,我们可以基于 AWS S3 + Lambda + SES 堆栈从外部函数异步发送电子邮件。

通过异步发送电子邮件,应用程序可以设法发送数千封电子邮件,但对触发该操作的用户的响应不会受到影响。 但是,为了确保用户不会等待电子邮件到达收件箱,我们还决定将电子邮件分为优先级和非优先级两组,并仅异步发送非优先级电子邮件。 我们为 WordPress 提供了一个实现,由于函数wp_mail用于发送电子邮件的限制,该实现相当hacky。

本文的一个教训是基于服务器的应用程序上的无服务器功能运行良好:在 CMS 上运行的站点(如 WordPress)可以通过仅在云上实现特定功能来提高其性能,并避免迁移带来的大量复杂性高度动态的站点到完全无服务器的架构。