Temporal 应用程序开发指南的基础部分覆盖概念的最小集,以及构建和运行 Temporal 应用程序所需的实现细节 - 即用于启动执行 Activity 的工作流执行(Workflow Execution)的所有相关步骤。


运行开发集群

下面介绍如何通过 Docker Compose 在本地部署 Temporal 集群,以便能够使用 Temporal 客户端 API 和 tcl 命令,来测试和开发应用程序。

Docker Compose

在开发 Temporal 应用程序期间,使用 Docker Compose 和 Temporal 集群 Docker 镜像在本地快速安装和运行 Temporal 集群。

必要条件

安装 DockerDocker Compose

克隆仓库,及运行 Docker Compose

下面的步骤使用默认配置启动和运行 Temporal 集群。

  1. 克隆 temporalio/docker-compose 仓库。

  2. 切换到项目目录。

  3. 从项目目录,启动应用程序。

结果:Temporal 集群运行在 http://127.0.0.1:7233,Temporal Web UI 运行在 http://127.0.0.1:8080

如果想尝试其它配置(不同的依赖和数据库),或尝试自定义的 Docker 镜像,请仿效 temporalio/docker-compose README


添加 SDK

Temporal SDK 提供用于 Temporal 应用程序开发的框架。SDK 提供如下功能:

下面以 Go 为例进行说明。

Temporal Go SDK 添加到项目:

或将 Go SDK 仓库克隆到你喜欢的位置:

API 引用

每种 SDK 都有它自己的 API 引用。

下面以 Go 为例进行说明。

Temporal Go SDK API 引用被发布在 pkg.go.dev

代码示例

你可以在 Temporal's GitHub repository 找到完整的可执行代码示例列表。


连接到集群

Temporal Client 使你可以与 Temporal Cluster 通信。与 Temporal 集群的通信包括,但不限于:

注意:

不能在 Workflow 内部初始化和使用 Temporal 客户端。但可以在 Activity 内部使用 Temporal 客户端与 Temporal 集群进行通信。

当你在本地运行集群时,你必须提供的连接选项的数量是最少的。

当你正在连接到生产集群时,你可能需要提供额外的连接和客户端选项,包括但不限于:

关于管理和生成 Temporal Cloud 客户端证书的更多信息,请查看 How to manage certificates in Temporal Cloud

关于为 Temporal 集群配置 TLS 的更多信息,请查看 Temporal Customization Samples

下面以 Go 为例进行说明。

使用 go.temporal.io/sdk/client 包中的 Dial() API 创建新 Client

如果你未提供 HostPort,客户端默认地址和端口号为 127.0.0.1:7233

通过 Client Options 实例的 Namespace 字段指定自定义的 Namespace 名称。

使用 ConnectionOptions API 连接带 mTLS 的客户端。


开发工作流

工作流(Workflow)是 Temporal 应用程序的基础单元,它开始于工作流定义(Workflow Definition)的开发。

下面以 Go 为例进行说明。

在 Temporal Go SDK 编程模型中,工作流定义是可导出(exportable)函数。

在 Go 中,工作流类型(Workflow Type)名称默认与函数名相同。

工作流参数

Temporal 工作流可以拥有任意数量的自定义参数。但是,强烈建议使用对象作为参数,以便修改对象的单独的字段,而不破坏工作流的签名。所有工作流定义参数必须可序列化。

下面以 Go 为例进行说明。

基于 Go 的工作流定义的第一个参数必须具有 workflow.Context 类型,Temporal Go SDK 使用它来传递工作流执行(Workflow Execution)上下文,并且几乎所有来自于工作流的可调用 Go SDK API 都需要它。它获取自 go.temporal.io/sdk/workflow 包。

workflow.Context 实体的操作与 Go 提供的标准 context.Context 相似。两者之间仅有的区别是 workflow.Context 提供的 Done() 函数返回 workflow.Channel,而非标准的 Go chan

第二个参数,string,是当工作流定义被调用时,传递给工作流的自定义参数。工作流定义支持 0 或多个自定义参数。这些参数可以是常规类型的变量或安全指针。但最佳实践是传递结构体类型的单个参数,这样在添加新参数时,可以向后兼容(backward compatibility)。

不管使用的是指针,还是常规类型值,所有工作流参数必须是可序列化的。参数不能是管道(channel)、函数(function)、可变参数(variadic)或不安全指针(unsafe pointer)。

工作流返回值

工作流返回值也必须可序列化。在受支持的每种语言中返回结果、返回错误或抛出异常非常惯用。但用于获取工作流执行的结果的 Temporal API 仅能接收结果或错误中的一个。

下面以 Go 为例进行说明。

基于 Go 的工作流定义可以返回 errorcustomValue, error 组合。此外,最佳实践是使用结构体类型持有所有自定义值。

用 Go 写的工作流定义既能返回自定义值,也能返回错误。但是,在调用进程中不能既接收自定义值,又接收错误。调用者仅能接收其中一个。从工作流中返回非 nil error 表明在它的执行期间遇到错误,工作流执行应该被终止,任何自定义返回值将被系统忽略。

工作流类型

工作流拥有被称为工作流名称的类型。

下面的例子展示如何为工作流类型(Workflow Type)设置自定义名称。

下面以 Go 为例进行说明。

为定制化工作流类型,当使用 Worker 注册工作流时,使用 RegisterOptions 设置 Name 参数。

工作流逻辑要求

工作流逻辑被 deterministic execution requirements 约束。因此,每种语言被限制为使用某些惯用技术。不过,每种 Temporal SDK 提供一组能够在工作流内部使用的 API,与外部的应用程序代码进行交互。

下面以 Go 为例进行说明。

在 Go 中,工作流定义代码不能直接做如下事情:

Temporal Go SDK 拥有处理等价的 Go 结构的 API:


开发 Activity

工作流做的主要事情之一是编排(orchestrate)Activity 的执行。Activity 是能与世界进行交互的普通函数/方法执行。对于能执行 Activity 的工作流而言,我们必须定义 Activity 定义(Activity Definition)。

下面以 Go 为例进行说明。

在 Temporal Go SDK 编程模型中,Activity 是可导出的(exportable)函数或 struct 方法。

函数

结构体方法

Activity 结构体可以拥有一个以上的方法,每个方法充当单独的 Activity 类型(Activity Type)。写成结构体方法的 Activity 可以使用共享的结构体变量,比如:

因为这是共同需要,本指南的剩余部分展示写成结构体方法的 Activity。

Activity 参数

对 Activity 定义(Activity Definition)支持的参数总数没有显式的限制。但是,被编码进 gRPC 消息负载的最终数据的总大小有限制。

单个参数被限制到最大 2MB。包括所有参数的 gRPC 消息的总大小被限制到最大 4MB。

也请牢记:所有负载数据被记录在工作流执行事件历史(Workflow Execution Event History) 中,庞大的事件历史会影响 Worker 性能。这是因为全部事件历史可能被传输到具有工作流任务(Workflow Task)的 Worker 进程。

有些 SDK 要求你传递上下文对象,有些不需要。当谈及你的应用程序数据(即被序列化及编码进负载的数据)时,我们建议你使用包装了传递给 Activity 应用程序的数据的单个对象作为参数。以便你能在不破坏函数和方法签名的前提下,改变将哪些数据传递给 Activity。

下面以 Go 为例进行说明。

Activity 定义的第一个参数是 context.Context。对于 Activity 定义而言,该参数是可选的。不过建议使用该参数,尤其是当期望 Activity 使用其它 Go SDK API 时。

Activity 定义可以支持多个其它自定义参数。但所有参数都必须可被序列化(参数不能是管道、函数、可变参数或非安全指针),建议传递后期可以更新的单个结构体。

Activity 返回值

从 Activity 返回的所有数据都必须可序列化。

Activity 返回的数据量没有显式的限制。但是牢记所有返回值被记录到工作流执行事件历史(Workflow Execution Event History)中。

下面以 Go 为例进行说明。

基于 Go 的 Activity 定义既能只返回 error,也能返回 customValue, error 组合(与工作流定义相同)。你可能希望使用结构体类型来持有所有自定义值,只需要牢记它们都必须可序列化。


Activity 执行

引发 Activity 执行(Activity Executions)的调用被写在工作流定义(Workflow Definition)中。引发 Activity 执行的调用生成 ScheduleActivityTask 命令。这导致在你的工作流执行事件历史(Workflow Execution Event History)中,生成三个与 Activity Task 相关的事件(ActivityTaskScheduledActivityTaskStarted、ActivityTask[Closed])集合。

Activity 实现的单例被多个并发的 Activity 调用共享。Activity 实现代码必须无状态(stateless)

通过调用参数传递给 Activity 或 Activity 通过结果值返回的值被记录在执行(Execution)历史中。当 Workflow 状态需要恢复(recover)时,全部执行历史被从 Temporal 服务传输到 Workflow Worker。庞大的执行历史可能冲击 Workflow 的性能。

因此,请留意你通过 Activity 调用参数或返回值传输的数据量。除此之外,在 Activity 实现上没有额外的限制存在。

下面以 Go 为例进行说明。

在工作流定义内部使用 ExecuteActivity() API 调用引发 Activity 执行(Activity Execution)。该 API 来自于 go.temporal.io/sdk/workflow 包。

ExecuteActivity() API 调用需要 workflow.Context 实例、Activity 函数名和传递给 Activity 执行的任意变量。

以变量对象(不用加引号)或字符串的形式,提供 Activity 函数名称。

传递实际函数对象的好处是框架可以在 Activity 定义之前验证参数。

ExecuteActivity 调用返回 Future 对象,可以使用 Future 对象获取 Activity 执行的结果。

超时设置

Activity 执行语义依赖多个参数。唯一需要设置的必需值是 Schedule-To-Close TimeoutStart-To-Close Timeout。通过 Activity Options 设置这些值。

获取 Activity 结果

引发 Activity 执行(Activity Execution)的调用生成 ScheduleActivityTask 命令,并给工作流提供一个可等待对象(Awaitable)。工作流执行(Workflow Execution)既可以阻塞执行直到可等待对象的结果可用;也可以继续执行,在结果可用时,再使用它。

下面以 Go 为例进行说明。

ExecuteActivity API 调用返回 workflow.Future 实例,该实例拥有如下两个方法:

调用 workflow.Future 实例的 Get() 方法获取 Activity 执行的结果。结果参数的类型必须匹配 Activity 函数声明的返回值的类型。

先使用 IsReady() 方法可以确保 Get() 调用不会导致工作流执行在结果上等待。

从 Workflow 里面调用多个 Activity 执行是惯例。因此,在 Activity 执行的结果上阻塞或继续执行额外的逻辑,之后再检查 Activity 执行的结果也是惯例。


运行 Worker 进程

Worker Process 是执行 Workflow 函数和 Activity 函数的地方。

Worker 实体是 Worker 进程内部监听特定队列的组件。

尽管多个 Worker 实体可以存在于单个 Worker 进程内,但是单个 Worker 实体可能完全足够。如果想获取更多信息,请查看 Worker tuning guide

Worker 实体既可以包含 Workflow Worker,也可以包含 Activity Worker,以便它既能处理工作流执行,也能处理 Activity 执行。

接下来以 Go 为例进行说明。

通过调用 go.temporal.io/sdk/worker 包中的 worker.New() 创建 Worker 实例,该函数的参数如下:

然后,注册 Worker 能够执行的 Workflow 类型和 Activity 类型。

最后,调用 Worker 实例的 Start()Run() 方法。Run 接受一个中断管道作为参数,以便在终端中停止 Worker。否则,必须调用 Stop() 方法来停止 Worker。

提示:

如果你已经安装 gow,那么当你更新 Worker 文件时,Worker 进程会自动地“重新加载”。

注册类型

监听相同任务队列(Task Queue)名称的全部 Worker 必须被注册为处理相同的 Workflow 类型(Workflow Type)和 Activity 类型(Activity Type)。

如果 Worker 轮询到一个任务,但是它不知道该任务的 Workflow 类型或 Activity 类型,那么它使该任务失败。但是,任务的失败不会导致相关的工作流执行(Workflow Execution)失败。

下面以 Go 为例进行说明。

在工作进程内部,RegisterWorkflow()RegisterActivity() 调用实际上会在内存中创建 Workflow 类型和它们的实现之间的映射。

注册 Activity 结构体

按照 Activity 定义(Activity Definition)的最佳实践,你可能有一个拥有多个方法和字段的 Activity 结构体。当你为 Activity 结构体使用 RegisterActivity() 时,Worker 有权访问所有可导出(exportable)方法。

注册多个类型

为给 Worker 实体注册多个 Activity 类型和/或 Workflow 类型,只需要执行多次 Activity 注册调用,但是需要确保每个类型名唯一:


启动工作流执行

工作流执行(Workflow Execution)语义依赖多个参数 - 也就是说,为启动工作流执行,你必须提供(Worker 轮询的)任务队列、工作流类型、语言特定的上下文数据和工作流函数(Workflow Function)参数。

在下面的例子中,使用 Temporal 客户端启动所有工作流执行。为从其它工作流执行内部引发工作流执行,使用 Child Workflow 或外部工作流 API。

查看自定义工作流类型部分,以查看如何自定义工作流类型的名称。

引发工作流执行的请求导致 Temporal 集群在工作流执行事件历史(Workflow Execution Event History)中创建第一个事件(WorkflowExecutionStarted)。Temporal 集群接着创建导致第一个 WorkflowTaskScheduled 事件的第一个工作流任务。

下面以 Go 为例进行说明。

使用 Go SDK ClientExecuteWorkflow() 方法触发工作流执行(Workflow Execution)。

ExecuteWorkflow() API 调用需要 context.Context 实例、StartWorkflowOptions 实例、工作流类型名和传递给工作流执行的所有变量。ExecuteWorkflow() 调用返回 Future 对象,使用该对象可以获取工作流执行的结果。

如果调用进程有权直接访问函数,那么像函数名称是变量一样传递工作流类型名称参数,不需要使用引号引用。

如果调用进程无权直接访问静态定义的工作流定义(Workflow Definition),比如,工作流定义在未导入的包中,或者它完全是用不同语言写的,那么可以通过 string 的形式,提供工作流类型。

设置任务队列

在大多数 SDK 中,唯一必须设置的 Workflow Option 是任务队列(Task Queue)的名称。

对于要执行的任何代码,必须运行包含轮询相同任务队列名称的 Worker 实体的 Worker 进程。

下面以 Go 为例进行说明。

go.temporal.io/sdk/client 包创建 StartWorkflowOptions 实例,设置 TaskQueue 字段,并把该实例传递给 ExecuteWorkflow 调用。

Workflow Id

虽然 Workflow Id 不是必需的,但是我们建议提供你自己的、映射到业务流程或业务实体标识的 Workflow Id,比如订单标识或客户标识。

下面以 Go 为例进行说明。

go.temporal.io/sdk/client 包创建 StartWorkflowOptions 实例,设置 ID 字段,然后将该实例传递给 ExecuteWorkflow 调用。

获取工作流结果

如果启动工作流执行的调用成功,那么你将获得访问工作流执行的 Run Id 的权限。

Workflow Id、Run Id 和 Namespace 用于唯一地标识系统中的工作流执行,以及获取它的结果。

既可以在结果上阻塞执行(同步的执行),也可以在某些其它时间点获取结果(异步的执行)。

在 Temporal 平台,也可以使用 Queries 访问工作流执行的状态和结果。

下面以 Go 为例进行说明。

ExecuteWorkflow 调用返回 WorkflowRun 实例。

WorkflowRun 实例拥有如下三个方法:

为在调用工作流执行的相同进程中等待它的结果,调用 ExecuteWorkflow() 调用返回的 WorkflowRun 实例的 Get() 方法。

可以从完全不同的进程中获取工作流执行的结果。仅需 Workflow Id。(如果超过一个关闭的工作流执行拥有相同的 Workflow Id,那么 Run Id 是可选的。)只要工作流执行事件历史(Workflow Execution Event History)保留在系统中,那么工作流执行的结果就可用。

调用 Go SDK Client 实例的 GetWorkflow() 方法,并且给它传递用于引发工作流执行的 Workflow Id。然后调用 WorkflowRun 实例的 Get() 方法,给 Get() 方法传递一个用于填充结果的指针。

获取最近的完成结果

Temporal Cron Job 场景下,你可能需要获取前一次工作流运行(Workflow Run,代表被启动的非子工作流)的结果,并在当前的工作流运行中使用它。

为完成该操作,直接在你的 Workflow 代码中,使用来自于 go.temporal.io/sdk/workflow 包的 HasLastCompletionResultGetLastCompletionResult API。

即便某个 cron 工作流运行失败,这段代码仍能生效。下一个工作流运行获取最近的成功地完成的工作流运行的结果。