通過 AWS SES 異步發送電子郵件
已發表: 2022-03-10大多數應用程序都會發送電子郵件與用戶交流。 交易電子郵件是由用戶與應用程序交互觸發的電子郵件,例如在網站註冊後歡迎新用戶、為用戶提供重置密碼的鏈接或在用戶購買後附上發票。 所有這些以前的情況通常只需要向用戶發送一封電子郵件。 但在其他一些情況下,應用程序需要發送更多電子郵件,例如當用戶在網站上發布新內容時,她的所有關注者(在 Twitter 等平台上可能有數百萬用戶)將收到通知。 在後一種情況下,如果架構不正確,發送電子郵件可能會成為應用程序的瓶頸。
這就是我的情況。 我有一個網站可能需要在一些用戶觸發的操作(例如用戶通知她的所有關注者)之後發送 20 封電子郵件。 最初,它依賴於通過流行的基於雲的 SMTP 提供商(例如 SendGrid、Mandrill、Mailjet 和 Mailgun)發送電子郵件,但是回復用戶需要幾秒鐘的時間。 顯然,連接到 SMTP 服務器以發送這 20 封電子郵件會顯著減慢處理速度。
經過檢查,我發現了問題的根源:
- 同步連接
應用程序連接到 SMTP 服務器並同步等待確認,然後繼續執行該過程。 - 高延遲
雖然我的服務器位於新加坡,但我使用的 SMTP 提供商的服務器位於美國,因此往返連接需要相當長的時間。 - 沒有 SMTP 連接的可重用性當調用函數發送電子郵件時,函數會立即發送電子郵件,並在此時創建一個新的 SMTP 連接(它不提供在請求結束時收集所有電子郵件並在單個 SMTP 下將它們一起發送聯繫)。
由於#1,用戶必須等待響應的時間與發送電子郵件所需的時間相關聯。 由於#2,發送一封電子郵件的時間相對較長。 由於#3,發送 20 封電子郵件的時間是發送一封電子郵件所需時間的 20 倍。 雖然只發送一封電子郵件可能不會使應用程序變得非常慢,但發送 20 封電子郵件肯定會影響用戶體驗。
讓我們看看如何解決這個問題。
注意交易電子郵件的性質
首先,我們必須注意到並非所有電子郵件的重要性都相同。 我們可以將電子郵件大致分為兩組:優先級電子郵件和非優先級電子郵件。 例如,如果用戶忘記了訪問帳戶的密碼,她會立即在收件箱中看到帶有密碼重置鏈接的電子郵件; 這是一個優先電子郵件。 相反,發送一封電子郵件通知我們關注的人發布了新內容不需要立即到達用戶的收件箱; 那是一封非優先郵件。
該解決方案必須優化這兩類電子郵件的發送方式。 假設在此過程中只有少數(可能是 1 或 2)封優先郵件要發送,並且大部分郵件都是非優先郵件,那麼我們設計的解決方案如下:
- 優先級電子郵件可以通過使用位於部署應用程序的同一區域的 SMTP 提供程序來簡單地避免高延遲問題。 除了良好的研究之外,這還涉及將我們的應用程序與提供者的 API 集成。
- 非優先郵件可以異步發送,也可以分批發送,多封郵件一起發送。 在應用程序級別實施,它需要適當的技術堆棧。
接下來讓我們定義技術堆棧以異步發送電子郵件。
定義技術堆棧
注意:我決定將我的堆棧基於 AWS 服務,因為我的網站已經託管在 AWS EC2 上。 否則,我會因在幾家公司的網絡之間移動數據而產生開銷。 但是,我們也可以使用其他雲服務提供商來實施我們的解決方案。
我的第一種方法是設置隊列。 通過隊列,我可以讓應用程序不再發送電子郵件,而是在隊列中發布包含電子郵件內容和元數據的消息,然後讓另一個進程從隊列中提取消息並發送電子郵件。
但是,在檢查 AWS 的隊列服務(稱為 SQS)時,我認為這不是一個合適的解決方案,因為:
- 設置相當複雜;
- 標準隊列消息最多只能存儲 256 kb 的信息,如果電子郵件有附件(例如發票),這可能還不夠。 即使可以將大消息拆分為較小的消息,複雜性也會增加更多。
然後我意識到我可以通過結合其他更容易設置的 AWS 服務 S3 和 Lambda 來完美地模仿隊列的行為。 S3 是一種用於存儲和檢索數據的雲對象存儲解決方案,可以充當上傳消息的存儲庫,而運行代碼以響應事件的計算服務 Lambda 可以選擇消息並使用它執行操作。
換句話說,我們可以像這樣設置我們的電子郵件發送過程:
- 應用程序將包含電子郵件內容 + 元數據的文件上傳到 S3 存儲桶。
- 每當有新文件上傳到 S3 存儲桶時,S3 都會觸發一個包含新文件路徑的事件。
- Lambda 函數選擇事件、讀取文件並發送電子郵件。
最後,我們必須決定如何發送電子郵件。 我們可以繼續使用我們已有的 SMTP 提供商,讓 Lambda 函數與他們的 API 交互,或者使用 AWS 服務發送電子郵件,稱為 SES。 使用 SES 既有優點也有缺點:
好處:
- 在 AWS Lambda 中使用非常簡單(只需 2 行代碼)。
- 它更便宜:Lambda 費用是根據執行函數所需的時間計算的,因此從 AWS 網絡內連接到 SES 將比連接到外部服務器花費更短的時間,從而使函數更早完成並且成本更低. (除非 SES 在託管應用程序的同一區域不可用;就我而言,因為在我的 EC2 服務器所在的亞太地區(新加坡)區域不提供 SES,那麼我最好連接到一些亞洲的外部 SMTP 提供商)。
缺點:
- 提供的用於監控我們已發送電子郵件的統計數據不多,添加更強大的統計數據需要額外的努力(例如:跟踪打開的電子郵件百分比或點擊的鏈接,必須通過 AWS CloudWatch 設置)。
- 如果我們繼續使用 SMTP 提供程序來發送優先級電子郵件,那麼我們將不會將我們的統計信息放在一個地方。
為簡單起見,在下面的代碼中,我們將使用 SES。
然後,我們定義了流程和堆棧的邏輯如下:應用程序像往常一樣發送優先級電子郵件,但對於非優先級電子郵件,它會將包含電子郵件內容和元數據的文件上傳到 S3; 此文件由 Lambda 函數異步處理,該函數連接到 SES 以發送電子郵件。
讓我們開始實施解決方案。
區分優先級和非優先級電子郵件
簡而言之,這一切都取決於應用程序,因此我們需要逐個電子郵件地決定。 我將描述一個我為 WordPress 實現的解決方案,它需要圍繞函數wp_mail
的約束進行一些修改。 對於其他平台,以下策略也可以使用,但很可能會有更好的策略,不需要 hack 即可工作。
在 WordPress 中發送電子郵件的方式是調用函數wp_mail
,我們不想改變它(例如:通過調用函數wp_mail_synchronous
或wp_mail_asynchronous
),所以我們的wp_mail
實現需要同時處理同步和異步情況,並且需要知道電子郵件屬於哪個組。 不幸的是, wp_mail
沒有提供任何額外的參數來評估這些信息,從它的簽名可以看出:
function wp_mail( $to, $subject, $message, $headers = '', $attachments = array() )
然後,為了找出電子郵件的類別,我們添加了一個 hacky 解決方案:默認情況下,我們使電子郵件屬於優先級組,如果$to
包含特定電子郵件(例如:[email protected]),或者如果$subject
以特殊字符串開頭(例如:“[Non-priority!]”),則它屬於非優先組(我們從主題中刪除相應的電子郵件或字符串)。 wp_mail
是一個可插入的函數,所以我們可以簡單地通過在我們的functions.php 文件上實現一個具有相同簽名的新函數來覆蓋它。 最初,它包含與原始wp_mail
函數相同的代碼,位於文件 wp-includes/pluggable.php 中,用於提取所有參數:
if ( !function_exists( 'wp_mail' ) ) : function wp_mail( $to, $subject, $message, $headers = '', $attachments = array() ) { $atts = apply_filters( 'wp_mail', compact( 'to', 'subject', 'message', 'headers', 'attachments' ) ); if ( isset( $atts['to'] ) ) { $to = $atts['to']; } if ( !is_array( $to ) ) { $to = explode( ',', $to ); } if ( isset( $atts['subject'] ) ) { $subject = $atts['subject']; } if ( isset( $atts['message'] ) ) { $message = $atts['message']; } if ( isset( $atts['headers'] ) ) { $headers = $atts['headers']; } if ( isset( $atts['attachments'] ) ) { $attachments = $atts['attachments']; } if ( ! is_array( $attachments ) ) { $attachments = explode( "\n", str_replace( "\r\n", "\n", $attachments ) ); } // Continue below... } endif;
然後我們檢查它是否是非優先級的,在這種情況下,我們在函數send_asynchronous_mail
下分叉到一個單獨的邏輯,如果不是,我們繼續執行與原始wp_mail
函數中相同的代碼:
function wp_mail( $to, $subject, $message, $headers = '', $attachments = array() ) { // Continued from above... $hacky_email = "[email protected]"; if (in_array($hacky_email, $to)) { // Remove the hacky email from $to array_splice($to, array_search($hacky_email, $to), 1); // Fork to asynchronous logic return send_asynchronous_mail($to, $subject, $message, $headers, $attachments); } // Continue all code from original function in wp-includes/pluggable.php // ... }
在我們的函數send_asynchronous_mail
中,我們不是直接將電子郵件上傳到 S3,而是將電子郵件添加到全局變量$emailqueue
中,我們可以在請求結束時通過單個連接將所有電子郵件一起上傳到 S3:
function send_asynchronous_mail($to, $subject, $message, $headers, $attachments) { global $emailqueue; if (!$emailqueue) { $emailqueue = array(); } // Add email to queue. Code continues below... }
我們可以為每封電子郵件上傳一個文件,也可以將它們捆綁在一起,以便在一個文件中包含許多電子郵件。 由於$headers
包含電子郵件元數據(發件人、內容類型和字符集、CC、BCC 和回複字段),我們可以將具有相同$headers
的電子郵件分組在一起。 這樣,這些電子郵件都可以在同一個文件中上傳到 S3,並且$headers
元信息將僅包含在文件中一次,而不是每封電子郵件一次:
function send_asynchronous_mail($to, $subject, $message, $headers, $attachments) { // Continued from above... // Add email to the queue $emailqueue[$headers] = $emailqueue[$headers] ?? array(); $emailqueue[$headers][] = array( 'to' => $to, 'subject' => $subject, 'message' => $message, 'attachments' => $attachments, ); // Code continues below }
最後,函數send_asynchronous_mail
返回true
。 請注意,這段代碼是 hacky: true
通常意味著電子郵件已成功發送,但在這種情況下,它甚至還沒有發送,它可能完全失敗。 因此,調用wp_mail
的函數不能將true
的響應視為“電子郵件已成功發送”,而應將其視為已入隊的確認。 這就是為什麼將這種技術限制在非優先電子郵件中很重要,這樣如果失敗,該過程可以在後台繼續重試,並且用戶不會期望電子郵件已經在她的收件箱中:
function send_asynchronous_mail($to, $subject, $message, $headers, $attachments) { // Continued from above... // That's it! return true; }
將電子郵件上傳到 S3
在我之前的文章“通過 AWS S3 在多台服務器之間共享數據”中,我介紹瞭如何在 S3 中創建存儲桶,以及如何通過 SDK 將文件上傳到存儲桶。 下面的所有代碼都繼續為 WordPress 實施解決方案,因此我們使用 SDK for PHP 連接到 AWS。

我們可以從抽像類AWS_S3
(在我之前的文章中介紹過)擴展以連接到 S3,並在請求結束時將電子郵件上傳到存儲桶“async-emails”(通過wp_footer
掛鉤觸發)。 請注意,我們必須將 ACL 保持為“私有”,因為我們不希望電子郵件暴露在互聯網上:
class AsyncEmails_AWS_S3 extends AWS_S3 { function __construct() { // Send all emails at the end of the execution add_action("wp_footer", array($this, "upload_emails_to_s3"), PHP_INT_MAX); } protected function get_acl() { return "private"; } protected function get_bucket() { return "async-emails"; } function upload_emails_to_s3() { $s3Client = $this->get_s3_client(); // Code continued below... } } new AsyncEmails_AWS_S3();
我們開始遍歷保存在全局變量$emailqueue
中的 headers => emaildata 對,並從函數get_default_email_meta
中獲取默認配置以判斷 headers 是否為空。 在下面的代碼中,我只從 headers 中檢索“from”字段(提取所有 headers 的代碼可以從原始函數wp_mail
複製):
class AsyncEmails_AWS_S3 extends AWS_S3 { public function get_default_email_meta() { // Code continued from above... return array( 'from' => sprintf( '%s <%s>', get_bloginfo('name'), get_bloginfo('admin_email') ), 'contentType' => 'text/html', 'charset' => strtolower(get_option('blog_charset')) ); } public function upload_emails_to_s3() { // Code continued from above... global $emailqueue; foreach ($emailqueue as $headers => $emails) { $meta = $this->get_default_email_meta(); // Retrieve the "from" from the headers $regexp = '/From:\s*(([^\<]*?) <)? ?\s*\n/i'; if(preg_match($regexp, $headers, $matches)) { $meta['from'] = sprintf( '%s <%s>', $matches[2], $matches[3] ); } // Code continued below... } } }
(.+?)>class AsyncEmails_AWS_S3 extends AWS_S3 { public function get_default_email_meta() { // Code continued from above... return array( 'from' => sprintf( '%s <%s>', get_bloginfo('name'), get_bloginfo('admin_email') ), 'contentType' => 'text/html', 'charset' => strtolower(get_option('blog_charset')) ); } public function upload_emails_to_s3() { // Code continued from above... global $emailqueue; foreach ($emailqueue as $headers => $emails) { $meta = $this->get_default_email_meta(); // Retrieve the "from" from the headers $regexp = '/From:\s*(([^\<]*?) <)? ?\s*\n/i'; if(preg_match($regexp, $headers, $matches)) { $meta['from'] = sprintf( '%s <%s>', $matches[2], $matches[3] ); } // Code continued below... } } }
最後,我們將電子郵件上傳到 S3。 我們決定每個文件上傳多少封電子郵件以節省資金。 Lambda 函數根據它們需要執行的時間量收費,按 100 毫秒的跨度計算。 一個函數需要的時間越多,它就變得越昂貴。
因此,通過每封電子郵件上傳 1 個文件來發送所有電子郵件比每封電子郵件上傳 1 個文件更昂貴,因為執行該函數的開銷是每封電子郵件計算一次,而不是為許多電子郵件只計算一次,而且還因為發送許多電子郵件一起更徹底地填充了 100ms 跨度。
所以我們每個文件上傳許多電子郵件。 多少封電子郵件? Lambda 函數有一個最大執行時間(默認為 3 秒),如果操作失敗,它將繼續從頭開始重試,而不是從失敗的地方重試。 因此,如果文件包含 100 封電子郵件,並且 Lambda 設法在最大時間結束之前發送了 50 封電子郵件,那麼它會失敗並再次嘗試執行該操作,再次發送前 50 封電子郵件。 為避免這種情況,我們必須為每個文件選擇一些我們確信足以在最長時間結束之前處理的電子郵件。 在我們的情況下,我們可以選擇每個文件發送 25 封電子郵件。 電子郵件的數量取決於應用程序(較大的電子郵件將需要更長的時間來發送,而發送電子郵件的時間將取決於基礎設施),因此我們應該進行一些測試以得出正確的數量。
該文件的內容只是一個 JSON 對象,包含屬性“meta”下的電子郵件元數據,以及屬性“emails”下的電子郵件塊:
class AsyncEmails_AWS_S3 extends AWS_S3 { public function upload_emails_to_s3() { // Code continued from above... foreach ($emailqueue as $headers => $emails) { // Code continued from above... // Split the emails into chunks of no more than the value of constant EMAILS_PER_FILE: $chunks = array_chunk($emails, EMAILS_PER_FILE); $filename = time().rand(); for ($chunk_count = 0; $chunk_count < count($chunks); $chunk_count++) { $body = array( 'meta' => $meta, 'emails' => $chunks[$chunk_count], ); // Upload to S3 $s3Client->putObject([ 'ACL' => $this->get_acl(), 'Bucket' => $this->get_bucket(), 'Key' => $filename.$chunk_count.'.json', 'Body' => json_encode($body), ]); } } } }
為簡單起見,在上面的代碼中,我沒有將附件上傳到 S3。 如果我們的電子郵件需要包含附件,那麼我們必須使用 SES 函數SendRawEmail
而不是SendEmail
(在下面的 Lambda 腳本中使用)。
添加了將帶有電子郵件的文件上傳到 S3 的邏輯之後,我們可以繼續編寫 Lambda 函數。
編寫 Lambda 腳本
Lambda 函數也稱為無服務器函數,不是因為它們不在服務器上運行,而是因為開發人員無需擔心服務器:開發人員只需提供腳本,雲負責配置服務器、部署和運行腳本。 因此,如前所述,Lambda 函數是根據函數執行時間收費的。
以下 Node.js 腳本執行所需的工作。 由 S3 “Put” 事件調用,表示存儲桶上已創建新對象,函數:
- 獲取新對象的路徑(在變量
srcKey
下)和存儲桶(在變量srcBucket
下)。 - 通過
s3.getObject
下載對象。 - 通過
JSON.parse(response.Body.toString())
解析對象的內容,並提取電子郵件和電子郵件元數據。 - 遍歷所有電子郵件,並通過
ses.sendEmail
發送它們。
var async = require('async'); var aws = require('aws-sdk'); var s3 = new aws.S3(); exports.handler = function(event, context, callback) { var srcBucket = event.Records[0].s3.bucket.name; var srcKey = decodeURIComponent(event.Records[0].s3.object.key.replace(/\+/g, " ")); // Download the file from S3, parse it, and send the emails async.waterfall([ function download(next) { // Download the file from S3 into a buffer. s3.getObject({ Bucket: srcBucket, Key: srcKey }, next); }, function process(response, next) { var file = JSON.parse(response.Body.toString()); var emails = file.emails; var emailsMeta = file.meta; // Check required parameters if (emails === null || emailsMeta === null) { callback('Bad Request: Missing required data: ' + response.Body.toString()); return; } if (emails.length === 0) { callback('Bad Request: No emails provided: ' + response.Body.toString()); return; } var totalEmails = emails.length; var sentEmails = 0; for (var i = 0; i < totalEmails; i++) { var email = emails[i]; var params = { Destination: { ToAddresses: email.to }, Message: { Subject: { Data: email.subject, Charset: emailsMeta.charset } }, Source: emailsMeta.from }; if (emailsMeta.contentType == 'text/html') { params.Message.Body = { Html: { Data: email.message, Charset: emailsMeta.charset } }; } else { params.Message.Body = { Text: { Data: email.message, Charset: emailsMeta.charset } }; } // Send the email var ses = new aws.SES({ "region": "us-east-1" }); ses.sendEmail(params, function(err, data) { if (err) { console.error('Unable to send email due to an error: ' + err); callback(err); } sentEmails++; if (sentEmails == totalEmails) { next(); } }); } } ], function (err) { if (err) { console.error('Unable to send emails due to an error: ' + err); callback(err); } // Success callback(null); }); };
接下來,我們必須將 Lambda 函數上傳並配置到 AWS,這涉及到:
- 創建一個執行角色,授予 Lambda 訪問 S3 的權限。
- 創建一個包含所有代碼的 .zip 包,即我們正在創建的 Lambda 函數 + 所有必需的 Node.js 模塊。
- 使用 CLI 工具將此包上傳到 AWS。
AWS 站點上的“將 AWS Lambda 與 Amazon S3 結合使用的教程”中正確解釋瞭如何執行這些操作。
使用 Lambda 函數連接 S3
最後,創建了存儲桶和 Lambda 函數,我們需要將它們掛鉤在一起,這樣每當在存儲桶上創建新對象時,就會觸發一個事件來執行 Lambda 函數。 為此,我們轉到 S3 儀表板並單擊存儲桶行,這將顯示其屬性:

然後單擊屬性,我們向下滾動到“事件”項,然後單擊添加通知,然後輸入以下字段:
- 名稱:通知的名稱,例如:“EmailSender”;
- 事件: “Put”,即在桶上創建新對象時觸發的事件;
- 發送至: “Lambda 函數”;
- Lambda:我們新創建的 Lambda 的名稱,例如:“LambdaEmailSender”。

最後,我們還可以將 S3 存儲桶設置為在一段時間後自動刪除包含電子郵件數據的文件。 為此,我們轉到存儲桶的管理選項卡,並創建一個新的生命週期規則,定義電子郵件必須在多少天后過期:

而已。 從這一刻起,當在 S3 存儲桶上添加包含電子郵件內容和元數據的新對象時,它將觸發 Lambda 函數,該函數將讀取文件並連接到 SES 以發送電子郵件。
我在我的網站上實施了這個解決方案,它再次變得很快:通過將發送電子郵件卸載到外部進程,無論應用程序發送 20 封還是 5000 封電子郵件都沒有區別,對觸發該操作的用戶的響應將是即時。
結論
在本文中,我們分析了為什麼在單個請求中發送大量事務性電子郵件可能會成為應用程序的瓶頸,並創建了解決該問題的解決方案:代替從應用程序內部(同步)連接到 SMTP 服務器,我們可以基於 AWS S3 + Lambda + SES 堆棧從外部函數異步發送電子郵件。
通過異步發送電子郵件,應用程序可以設法發送數千封電子郵件,但對觸發該操作的用戶的響應不會受到影響。 但是,為了確保用戶不會等待電子郵件到達收件箱,我們還決定將電子郵件分為優先級和非優先級兩組,並僅異步發送非優先級電子郵件。 我們為 WordPress 提供了一個實現,由於函數wp_mail
用於發送電子郵件的限制,該實現相當hacky。
本文的一個教訓是基於服務器的應用程序上的無服務器功能運行良好:在 CMS 上運行的站點(如 WordPress)可以通過僅在雲上實現特定功能來提高其性能,並避免遷移帶來的大量複雜性高度動態的站點到完全無服務器的架構。