本指南提供详尽的 Temporal Activity 概览。
在日常交流中,术语 Activity 经常代表 Activity 定义(Activity Definition)、Activity 类型(Activity Type)或 Activity 执行(Activity Execution),Temporal 文档旨在明确及说明它们之间的区别。
Activity 是普通的函数或对象方法,它执行单个的、定义明确的操作(短期运行或长期运行),比如调用其它服务、转码多媒体文件或发送邮件消息。
工作流代码编排 Activity 的执行,持久化结果。如果 Activity 函数执行(Activity Function Execution)失败,任何未来的执行从初始状态开始(除心跳外),因此 Activity 函数可以包含任意代码,没有限制。
Activity 函数(Activity Function)由 Worker 进程执行。当 Activity 函数返回时,Worker 将结果作为 ActivityTaskCompleted
事件的一部分发送回 Temporal 集群。该事件被添加到工作流执行的事件历史。
Activity 定义(Activity Definition)是定义 Activity 任务执行(Activity Task Execution)的约束的代码。
术语 “Activity 定义”用于指代任意给定语言的 SDK 中的完整的原语集,它为 Activity 函数定义(Activity Function Definition)提供访问点 - 为 Activity 任务执行(Activity Task Execution)调用的方法或函数。因此,术语 Activity 函数和 Activity 方法指代执行的实例的源。
在代码中,通过 Activity 类型(Activity Type)命名和引用 Activity 定义。
Activity 定义像普通函数一样执行。
如果发生故障,函数在重试时从其初始状态开始(除非已建立 Activity 心跳)。
因此,Activity 定义没有关于它包含的代码的限制。
Activity 定义可以根据需要支持多个参数。
通过参数传递的所有值被记录到工作流执行(Workflow Execution)的事件历史(Event History)中。返回值也会被捕获到调用工作流执行的事件历史中。
Activity 定义必须包含如下参数:
可以在实现指南中看到其它参数,比如 Retry Policies 和返回值。
Activity 类型(Activity Type)是从名称到 Activity 定义的映射。
Activity 类型的作用范围由任务队列(Task Queue)界定。
Activity 执行是 Activity 任务执行(Activity Task Executions)的完整的链。
Activity 执行没有时间限制。在 Temporal 应用程序里,可以针对每种情况优化 Activity 执行时间限制和重试。
如果因为某些原因,Activity 执行没有完成(用完所有重试),错误被返回给决定如何处理它的 Workflow。
工作流可以请求取消 Activity 执行。当 Activity 执行被取消,或它的工作流执行已完成或失败时,传递给它的函数的 context 是取消,也会将它的管道的关闭状态设置为 Done
。Activity 可以使用它执行任意必要的清理,以及终止它的执行。
仅能向允许心跳的 Activity 执行投递取消请求:
Activity 执行(Activity Execution)的标识符。该标识符可以由系统生成,或者由生成 Activity 执行的工作流代码提供。在工作流运行(Workflow Run)的处于打开(open)状态的 Activity 执行中,该标识符是唯一的。(如果拥有相同 Activity Id 的更早的 Activity 执行已经关闭,那么单个工作流运行可以重用该 Id。)
Activity Id 可用于异步地完成 Activity。
Schedule-To-Start 超时是从 Activity 任务(Activity Task)被调度(即被放进任务队列)到 Worker 启动(即从任务队列中取出) Activity 任务所允许的最长时间。换言之,它是 Activity 任务可以排队多久的限制。
就 Schedule-To-Start 超时和相关的指标而言,Worker 从任务队列中取出任务的时刻被认为是 Activity 任务的开始。“开始”的定义避免 Temporal 集群和 Worker 之间的时钟差异可能造成的问题。
Schedule-To-Start 和 Schedule-To-Close 中的“调度”有不同的频率保证。
Schedule-To-Start 超时对每个 Activity 任务强制执行,而 Schedule-To-Close 超时对每个 Activity 执行强制执行一次。因此,Schedule-To-Start 中的“调度”指的是组成 Activity 执行的 Activity 任务序列中的每个 Activity 任务的调度时刻,而 Schedule-To-Close 中的“调度”指的是那个序列中的第一个 Activity 任务。
附加到 Activity 执行的重试策略(Retry Policy)重试 Activity 任务。
该超时有两个主要用例:
默认的 Schedule-To-Start 超时是 ∞(无穷大)。
如果使用该超时,我们建议将该超时设置为在出现所有可能的 Worker 都中断的情况下,工作流执行愿意等待 Activity 执行的最大时间,并制定一个具体计划,将 Activity 任务重新路由到不同的任务队列。不管重试策略如何,该超时不会触发任何重试,因为重试会将 Activity 任务放回到相同的任务队列。我们建议不使用该超时,除非你知道你正在做什么。
在大多数情况下,我们建议通过监控 temporal_activity_schedule_to_start_latency
指标,来了解 Worker 何时放慢获取 Activity 任务的速度,而不是设置该超时。
Start-To-Close 超时是单个 Activity 任务执行(Activity Task Execution)所允许的最长时间。
默认的 Start-To-Close 超时与默认的 Schedule-To-Close Timeout 相同。
Activity 执行必须设置该超时(Start-To-Close)或 Schedule-To-Close Timeout。我们建议始终设置该超时;但确保它始终被设置为比 Activity 执行发生的最大可能时间长。对于长期运行的 Activity 执行,我们建议也使用 Activity Heartbeats 和 Heartbeat Timeouts。
Start-To-Close 超时的主要用例是检测在 Worker 开始执行 Activity 任务后,它在什么时候崩溃。
附加到 Activity 执行的重试策略(Retry Policy)重试每个 Activity 任务执行(Activity Task Execution)。因此,Start-To-Close 超时被应用到 Activity 执行里的每个 Activity 任务执行(Activity Task Execution)。
如果第一个 Activity 任务执行返回错误,那么完整的 Activity 执行可能看起来像这样:
如果达到该超时,那么发生下面的操作:
ActivityTaskTimedOut 事件被写到工作流执行的可变状态。
如果重试策略决定重试,那么 Temporal 集群调度另一个 Activity 任务。
Schedule-To-Close 超时是整个 Activity 执行所允许的最长时间,从组成该 Activity 执行的 Activity 任务链中的第一个 Activity 任务(Activity Task)被调度到最后一个 Activity 任务到达关闭(Closed)状态。
拥有 Activity 任务执行链的 Activity 执行的 Schedule-To-Close 超时周期的示例如下:
默认的 Schedule-To-Close 超时是 ∞(无穷大)。
Activity 执行必须设置该超时( Schedule-To-Close)或 Start-To-Close。默认情况下,Activity 执行重试策略规定重试最多持续 10 年。该超时可用于控制 Activity 执行的总体持续时间(在遇到故障时,会重复执行 Activity 任务),无需更改重试策略的最大尝试次数字段。
Activity 心跳是从正在执行 Activity 的 Worker 发送到 Temporal 集群的 ping 信号。ping 告诉 Temporal 集群 Activity 执行正在不断推进,Worker 没有崩溃。
Activity 心跳与 Heartbeat Timeout 同时使用。
Activity 心跳在 Activity 定义内部实现。心跳中可以包含自定义的进度信息,Activity 执行在发生重试时可使用该信息。
可以根据需要经常记录 Activity 心跳(比如,每分钟一次或每个循环迭代一次)。对除最短的 Activity 函数执行之外的任何事情进行心跳通常是很好的实践。Temporal SDK 控制向集群发送心跳的速率。
本地 Activity(Local Activities)不需要心跳,并且心跳不做任何事情。
对于长期运行的 Acticity,我们建议使用相对短的心跳超时和频繁的心跳。通过这种方式,如果 Worker 出现故障,可以及时地处理它。
心跳可以包含应用程序层级的负载,该负载可用于保存 Activity 执行的进展。如果 Activity 任务执行(Activity Task Execution)因丢失心跳而超时,那么下一个 Activity 任务可以访问,并继续使用该负载。
集群通过心跳将 Activity 取消(Cancellation)投递给 Activity。不发送心跳的 Activity 不能接收取消。心跳限流可能导致取消的投递晚于预期的时间。
心跳不是总能被发送到集群 - 它们可能被 Worker 限流。限流间隔是下面的较小值:
heartbeatTimeout
,那么是 heartbeatTimeout * 0.8
;否则,是 defaultHeartbeatThrottleInterval
maxHeartbeatThrottleInterval
defaultHeartbeatThrottleInterval
默认为 30 秒,maxHeartbeatThrottleInterval
默认是 60 秒。可以在 Worker 选项中设置它们。
限流的实现如下:
Worker 在发送心跳后,为限流间隔设置一个计时器。
Worker 停止发送心跳,但是从 Activity 持续接收心跳,并记住最近的一个。
当计时器触发时,Worker:
就“你如何知道你正在取得进展?”而言,心跳是最好的主意。对于短时的操作,进度更新不是必须的。但是,检查长期运行的 Activity 执行的进展和状态几乎总是很有用。
设置 Activity 心跳时应该考虑以下事项:
比如,下面的场景适合使用心跳:
而下面的场景不适合使用心跳:
心跳超时是两次 Activity 心跳(Activity Heartbeats)之间的最大时间。
如果达到该超时,那么 Activity 任务失败,重试策略(Retry Policy)决定是否重试。
异步的 Activity 完成(Asynchronous Activity Completion)是一个特性,它在不造成 Activity 执行完成的情况下,使 Activity 函数可以返回。然后可以使用 Temporal 客户端通过心跳汇报进展以及最终提供结果。
该特性的预期用例是外部系统拥有由 Activity 启动的计算的最终结果。
如果外部进程不可靠,通过信号发送关键状态更新可能失败,那么考虑使用异步的 Activity 代替信号。
如果流程循环中有人类,那么考虑使用信号(Signals)作为异步的 Activity 的替代,以向工作流执行返回数据。因为循环中有人类意味着流程中有多个步骤。第一个是在外部系统中存储状态的 Activity 函数,以及至少一个人类“完成”该活动的其它步骤。如果第一步失败,你需要快速地检测并重试,而不是等待整个流程,因为当人类参与其中时,整个流程显著地变长。
Task Token 是 Activity 任务执行(Activity Task Execution)的唯一标识符。
异步的 Activity 完成(Asynchronous Activity Completion)调用带下列任一项作为参数:
本地 Activity(Local Activity)是一个 Activity 执行(Activity Execution),它与生成它的工作流执行(Workflow Execution)在相同的进程中执行。
一些 Activity 执行的执行时间非常短,不需要队列语义、流控制、速率限制和路由功能。对于这种情况,Temporal 支持本地 Activity 特性。
本地 Activity 的主要收益是与正常的 Activity 执行相比,它们使用更少的 Temporal 服务资源(比如,更少的状态转换),有更低的延迟开销(因为无需往返于集群)。但是,本地 Activity 可能受较短的持续时间影响,而且缺乏速率限制。
考虑使用本地 Activity 来实现以下功能:
在不了解本地 Activity 的局限的情况下,使用它可能导致多种生产问题。我们建议使用常规 Activity,除非你的使用场景需要非常高的吞吐量和扇出非常短暂的 Activity 的大型 Activity。在我们的论坛中可以找到关于在本地 Activity 和 Activity 之间进行选择的更多指导。