当您同步调用一个AWS Lambda 函数时,通常希望该函数能尽快返回响应。例如,当客户端通过Amazon API Gateway 或AWS Step Functions调用 Lambda 函数时,这种情况十分常见。然而,在某些情况下,您可能需要执行一些额外的工作,这些工作不影响响应,可以在发送响应后异步进行。例如,您可以将数据存储到数据库中或将信息发送到日志系统。
一旦您发送了函数响应,Lambda 服务会冻结运行环境,无法再运行额外代码。即使您为在后台运行任务创建了一个线程,Lambda 服务一旦返回处理程序的响应,运行环境也会被冻结,导致线程在下一次调用之前无法继续执行。虽然您可以延迟返回响应,直到所有工作完成,但这种做法可能会对用户体验产生负面影响。
本博客将探讨在函数返回响应后仍可运行的任务。
第一种选择是将代码拆分为两个函数。第一个函数处理同步代码;第二个函数处理异步代码。在同步功能返回之前,它会异步调用第二个函数,可以直接使用Invoke API 或间接触发,例如,通过发送消息到Amazon SQS。
以下 Python 代码演示了如何实现这一点:
pythonimport jsonimport timeimport osimport boto3from awslambdapowertools import Logger
logger = Logger()client = boto3client(lambda)
def calcresponse(event) loggerinfo(f[Function] 正在计算响应) timesleep(1) # 模拟同步工作 return { message 来自异步的问候 }
def submitasynctask(response) # 调用异步函数继续 loggerinfo(f[Function] 调用异步任务) clientinvokeasync(FunctionName=osgetenv(ASYNCFUNCTION) InvokeArgs=jsondumps(response))
def handler(event context) loggerinfo(f[Function] 收到事件 {jsondumps(event)})
response = calcresponse(event)# 完成响应计算,提交异步任务submitasynctask(response)# 将响应返回给客户端loggerinfo(f[Function] 返回响应给客户端)return { statusCode 200 body jsondumps(response)}以下是执行异步工作的 Lambda 函数:
pythonimport jsonimport timefrom awslambdapowertools import Logger
logger = Logger()
def handler(event context) loggerinfo(f[异步任务] 开始异步任务 {jsondumps(event)}) timesleep(3) # 模拟异步工作 loggerinfo(f[异步任务] 完成)
响应流 允许开发者在收到响应的第一字节后即开始流式发送响应,而无需等待整个响应的完成。您通常在需要缩短首次字节时间TTFB或在响应大于6 MBLambda 响应有效负载大小限制的情况下使用响应流。
通过这种方法,函数可以使用响应流发送响应,并在发送响应的最后一个字节后继续运行代码。这样,客户端会收到响应,而 Lambda 函数也可以继续运行。
以下 Nodejs 代码示例演示如何实现此功能:
javascriptimport { Logger } from @awslambdapowertools/logger
const logger = new Logger()
export const handler = awslambdastreamifyResponse(async (event responseStream context) =gt { loggerinfo([Function] 收到事件 event)
// 处理事件let response = await calcresponse(event)// 将响应返回给客户端loggerinfo([Function] 返回响应给客户端)responseStreamsetContentType(application/json)responseStreamwrite(response)responseStreamend()await asynctask(response)
})
const calcresponse = async (event) =gt { loggerinfo([Function] 正在计算响应) await sleep(1) // 模拟同步工作
return { message 来自流式的问候}}
const asynctask = async (response) =gt { loggerinfo([异步任务] 开始异步任务) await sleep(3) // 模拟异步工作 loggerinfo([异步任务] 完成)}
const sleep = async (sec) =gt { return new Promise((resolve) =gt { setTimeout(resolve sec 1000) })}
Lambda 扩展 允许增强 Lambda 函数,与您偏好的监控、可观察性、安全和治理工具集成。您还可以使用扩展在后台运行自己的代码,以便在您的函数返回响应给客户端后继续运行。
Lambda 扩展分为两种类型:外部和内部扩展。外部扩展作为与执行环境分开的进程运行。Lambda 函数可以通过/tmp 文件夹中的文件或使用本地网络例如通过 HTTP 请求与扩展进行通信。您必须将外部扩展打包为 Lambda 层。
内部扩展作为运行处理程序的同一进程中的独立线程运行。处理程序可以通过任何进程内机制进行通信,例如内部队列。以下示例显示了一个内部扩展,这是处理程序进程中的专用线程。
当 Lambda 服务调用函数时,它还会通知所有扩展。只有在 Lambda 函数返回响应并且所有扩展向运行时发出信号表示它们完成时,Lambda 服务才冻结执行环境。通过这种方式,函数可以使用扩展独立运行任务,扩展在任务处理完成后通知 Lambda 运行时。这使得执行环境在任务完成之前保持活跃。
以下 Python 代码示例将扩展代码隔离到自己的文件中,处理程序导入并使用它来运行后台任务:
pythonimport jsonimport timeimport asyncprocessor as apfrom awslambdapowertools import Logger
logger = Logger()
def calcresponse(event) loggerinfo(f[Function] 正在计算响应) timesleep(1) # 模拟同步工作 return { message 来自扩展的问候 }
def asynctask(response) loggerinfo(f[异步任务] 开始异步任务 {jsondumps(response)}) timesleep(3) # 模拟异步工作 loggerinfo(f[异步任务] 完成)
def handler(event context) loggerinfo(f[Function] 收到事件 {jsondumps(event)})
# 计算响应response = calcresponse(event)# 完成响应计算# 调用异步处理器继续loggerinfo(f[Function] 调用扩展中的异步任务)apstartasynctask(asynctask response)# 将响应返回给客户端loggerinfo(f[Function] 返回响应给客户端)return { statusCode 200 body jsondumps(response)}以下 Python 代码演示如何实现运行后台任务的扩展:
pythonimport osimport requestsimport threadingimport queuefrom awslambdapowertools import Logger
logger = Logger()LAMBDAEXTENSIONNAME = AsyncProcessor
asynctasksqueue = queueQueue()
def startasyncprocessor() # 注册内部扩展 loggerdebug(f[{LAMBDAEXTENSIONNAME}] 正在与 Lambda 服务注册) response = requestspost( url=fhttp//{osenviron[AWSLAMBDARUNTIMEAPI]}/20200101/extension/register json={events [INVOKE]} headers={LambdaExtensionName LAMBDAEXTENSIONNAME} ) extid = responseheaders[LambdaExtensionIdentifier] loggerdebug(f[{LAMBDAEXTENSIONNAME}] 注册成功,ID {extid})
def processtasks() while True loggerdebug(f[{LAMBDAEXTENSIONNAME}] 等待调用事件) response = requestsget( url=fhttp//{osenviron[AWSLAMBDARUNTIMEAPI]}/20200101/extension/event/next headers={LambdaExtensionIdentifier extid} timeout=None ) # 从内部队列获取下一个任务 loggerdebug(f[{LAMBDAEXTENSIONNAME}] 唤醒,等待处理处理程序的异步任务) asynctask args = asynctasksqueueget() if asynctask is None # 没有任务可运行 loggerdebug(f[{LAMBDAEXTENSIONNAME}] 收到空任务。忽略。) else # 调用任务 loggerdebug(f[{LAMBDAEXTENSIONNAME}] 从处理器收到异步任务,开始任务。) asynctask(args) loggerdebug(f[{LAMBDAEXTENSIONNAME}] 完成任务处理)# 在单独的线程中启动处理扩展事件threadingThread(target=processtasks daemon=True name=AsyncProcessor)start()def startasynctask(asynctask=None args=None) asynctasksqueueput((asynctask args))
startasyncprocessor()
Lambda 支持多种内置运行时:Python、Nodejs、Java、NET 和 Ruby。Lambda 也支持自定义运行时,这使您可以使用所需的任何其他编程语言开发 Lambda 函数。

当您调用使用自定义运行时的 Lambda 函数时,Lambda 服务触发名为“bootstrap”的进程,其中包含您的自定义代码。自定义代码需要与Lambda 运行时 API进行交互。它调用/next 端点以获取下一个调用的信息。此 API 调用是阻塞的,它会一直等待直到请求到达。当函数处理完请求后,它必须调用/response 端点将响应发送回客户端,然后必须再次调用/next 端点以等待下一个调用。Lambda 在您调用 /next 之后冻结执行环境,直到请求到达。
加速器官方网下载使用这种方法,您可以在调用 /response、将响应发送回客户端后,在调用 /next 之前运行异步任务,指示处理完成。
以下 Python 代码示例将自定义运行时代码隔离到自己的文件中,函数导入并使用它与运行时 API 交互:
pythonimport timeimport jsonimport runtimeinterface as rtfrom awslambdapowertools import Logger
logger = Logger()
def calcresponse(event) loggerinfo(f[Function] 正在计算响应) timesleep(1) # 模拟同步工作 return { message 来自自定义的问候 }
def asynctask(response) loggerinfo(f[异步任务] 开始异步任务 {jsondumps(response)}) timesleep(3) # 模拟异步工作 loggerinfo(f[异步任务] 完成)
def main() # 您可以在这里添加初始化代码
# 以下循环无限运行,等待下一个调用并发送响应回客户端while True requestId event = rtgetnext() loggerinfo(f[Function] 收到事件 {jsondumps(event)}) # 计算响应 response = calcresponse(event) # 完成计算响应,发送响应给客户端 loggerinfo(f[Function] 返回响应给客户端) rtsendresponse(requestId { statusCode 200 body jsondumps(response) }) loggerinfo(f[Function] 正在调用异步任务) asynctask(response)main()
以下 Python 代码演示如何与运行时 API 进行交互:
pythonimport requestsimport osfrom awslambdapowertools import Logger
logger = Logger()runtimeendpoint = osenviron[AWSLAMBDARUNTIMEAPI]
def getnext() loggerdebug([自定义运行时] 等待调用) request = requestsget( url=fhttp//{runtimeendpoint}/20180601/runtime/invocation/next timeout=None ) event = requestjson() requestId = requestheaders[LambdaRuntimeAwsRequestId] return requestId event
def sendresponse(requestId response) loggerdebug([自定义运行时] 发送响应) requestspost( url=fhttp//{runtimeendpoint}/20180601/runtime/invocation/{requestId}/response json=response timeout=None )
本博客展示了在 Lambda 函数中结合同步和异步任务的四种方式,允许您在函数向客户端返回响应后继续运行任务。以下表格总结了每种解决方案的优缺点:
异步调用响应流Lambda 扩展自定义运行时复杂性实现较简单最容易实现实现复杂,需与扩展 API 和独立线程交互部署需要两个工件:同步函数和异步函数一个包含所有代码的单一部署工件一个包含所有代码的单一部署工件成本成本最高,因额外调用和两个函数的总时长高于单一函数成本最低成本最低启动异步任务在处理程序返回之前在处理调用中的任何时间在处理调用中的任何时间限制发送给异步函数的有效负载不能超过256 KB仅支持 Nodejs 和自定义运行时。要求Lambda 函数 URLs,不能与 API Gateway 一起使用,始终公开额外好处同步和异步代码更好地解耦能够分阶段发送响应。支持大于6 MB 的有效负载需额外费用异步任务在自己的线程中运行,这可以降低总时长和成本失败时的重试由 Lambda 服务管理由开发者负责由开发者负责选择正确的方案取决于您的用例。如果您使用 Nodejs 编写函数并通过 Lambda 函数 URL 调用它,请使用响应流。这是最简单的实现方式,也是最具成本效益的方式。
如果异步任务可能发生故障例如,数据库不可访问,并且您必须确保任务完成,请使用异步 Lambda 调用方法。Lambda 服务会重试您的异步函数,直到其成功为止。如果所有重试都失败,最后会触发 Lambda 目标,您可以采取相应措施。
如果您需要自定义运行时,因为您需要使用 Lambda 默认不支持的编程语言,请使用自定义运行时选项。否则,请使用 Lambda 扩展选项。虽然实现更复杂,但它成本效益高,可以将代码打包为单个工件,并在向客户端发送响应之前开始处理异步任务。
欲了解更多无服务学习资源,请访问Serverless Land。
标签:贡献者 无服务器
留言框-