使用 Temporal SDK 可以很方便地开发 Worker 程序。下面是一个 Go 示例程序。
package main
import (
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
"log"
)
// main is the minimal SDK-based Worker: it dials the Temporal frontend at
// 127.0.0.1:7233 (namespace "default"), registers one workflow and one
// activity on the "your_task_queue_name" task queue, and runs the worker with
// the channel returned by worker.InterruptCh().
//
// YourWorkflowDefinition and YourActivityDefinition are placeholders that the
// reader is expected to supply.
func main() {
	opts := client.Options{
		HostPort:  "127.0.0.1:7233",
		Namespace: "default",
	}
	temporalClient, dialErr := client.Dial(opts)
	if dialErr != nil {
		log.Fatalln("unable to create Temporal client", dialErr)
	}
	defer temporalClient.Close()

	wk := worker.New(temporalClient, "your_task_queue_name", worker.Options{})
	wk.RegisterWorkflow(YourWorkflowDefinition)
	wk.RegisterActivity(YourActivityDefinition)

	if runErr := wk.Run(worker.InterruptCh()); runErr != nil {
		log.Fatalln("unable to start Worker", runErr)
	}
}
其中,Worker Entity 的 Run() 方法封装了 Poll Task、Call Function、Respond Completed 等一系列操作。接下来,我们使用 Temporal Client API 模拟 Activity Worker Entity 的 Run() 方法。
下面通过一个示例进行说明。
项目结构:
.
├── external_activity_worker.go
├── go.mod
├── go.sum
├── worker
│ └── workflow_worker.go
└── workflow
└── handle_name.go
go.mod:
module external-activity-worker-demo
go 1.19
require (
go.temporal.io/api v1.11.1-0.20220907050538-6de5285cf463
go.temporal.io/sdk v1.17.0
)
external_activity_worker.go:
package main
import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"go.temporal.io/api/common/v1"
"go.temporal.io/api/enums/v1"
"go.temporal.io/api/taskqueue/v1"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/converter"
"log"
"time"
)
// main emulates, with the raw Temporal gRPC client only, what an Activity
// Worker's Run() does for the "say_hello" task queue:
//
//  1. long-poll the activity task queue,
//  2. send 20 heartbeats one second apart (standing in for real work),
//  3. report the activity as completed with a JSON result.
//
// Every RPC gets its own timeout context whose cancel func is called as soon
// as the RPC returns, so the endless loop does not accumulate pending timers.
// Unexpected RPC failures still panic, as this is a demo program.
func main() {
	c, err := client.Dial(client.Options{
		HostPort:  "127.0.0.1:7233",
		Namespace: "default",
		Identity:  "external-activity-worker",
	})
	if err != nil {
		log.Fatalln("unable to create Temporal client", err)
	}
	defer c.Close()

	request := workflowservice.PollActivityTaskQueueRequest{
		Namespace: "default",
		TaskQueue: &taskqueue.TaskQueue{
			Name: "say_hello",
			Kind: enums.TASK_QUEUE_KIND_NORMAL,
		},
		Identity: "external-activity-worker",
	}

	for {
		ctx, cancelPoll := context.WithTimeout(context.Background(), 20*time.Second)
		start := time.Now()
		response, err := c.WorkflowService().PollActivityTaskQueue(ctx, &request)
		// Capture whether our own deadline elapsed before releasing the context.
		pollTimedOut := ctx.Err() == context.DeadlineExceeded
		cancelPoll()
		if err != nil {
			if pollTimedOut {
				// The long poll simply ran out of time; poll again instead of crashing.
				continue
			}
			panic(err)
		}
		fmt.Printf("time elapsed %s\n", time.Since(start))

		// The poll returned without handing out a task.
		if response.WorkflowExecution == nil {
			continue
		}

		// Print the identifying information of the task.
		fmt.Printf("Workflow Id = %s\n", response.WorkflowExecution.WorkflowId)
		fmt.Printf("Run Id = %s\n", response.WorkflowExecution.RunId)
		fmt.Printf("Activity Id = %s\n", response.ActivityId)
		fmt.Printf("Base64(Task Token) = %s\n", base64.StdEncoding.EncodeToString(response.TaskToken))

		// The heartbeat request is identical on every iteration, so build it once.
		heartbeatRequest := &workflowservice.RecordActivityTaskHeartbeatByIdRequest{
			Namespace:  response.WorkflowNamespace,
			WorkflowId: response.WorkflowExecution.WorkflowId,
			RunId:      response.WorkflowExecution.RunId,
			ActivityId: response.ActivityId,
			Details:    nil,
		}
		for i := 0; i < 20; i++ {
			heartbeatContext, cancelHeartbeat := context.WithTimeout(context.Background(), 10*time.Second)
			_, err := c.WorkflowService().RecordActivityTaskHeartbeatById(
				heartbeatContext,
				heartbeatRequest,
			)
			cancelHeartbeat()
			if err != nil {
				panic(err)
			}
			fmt.Printf("hearbeat sent %d times\n", i)
			time.Sleep(time.Second)
		}

		// An activity may be scheduled without arguments; do not index into an
		// empty payload list.
		var data []byte
		payloads := response.GetInput().GetPayloads()
		if len(payloads) > 0 {
			data = payloads[0].GetData()
		}
		fmt.Printf("arguments: %s\n", string(data))

		// The payload bytes are the encoded form of the argument (JSON for a Go
		// string, i.e. including the surrounding quotes). Decode it so the
		// greeting contains the plain name; if the argument is not a string,
		// fall back to the raw text rather than failing the demo.
		name := string(data)
		if len(payloads) > 0 {
			var decoded string
			if decodeErr := converter.GetDefaultDataConverter().FromPayload(payloads[0], &decoded); decodeErr == nil {
				name = decoded
			}
		}

		b, err := json.Marshal(map[string]interface{}{
			"message": fmt.Sprintf("Hello, %s!", name),
		})
		if err != nil {
			panic(err)
		}

		responseRequest := workflowservice.RespondActivityTaskCompletedByIdRequest{
			Namespace:  response.WorkflowNamespace,
			WorkflowId: response.WorkflowExecution.WorkflowId,
			RunId:      response.WorkflowExecution.RunId,
			ActivityId: response.ActivityId,
			Result: &common.Payloads{Payloads: []*common.Payload{
				{
					Data: b,
					Metadata: map[string][]byte{
						"encoding": []byte(converter.MetadataEncodingJSON),
					},
				},
			}},
		}
		recordCompleteContext, cancelComplete := context.WithTimeout(context.Background(), 10*time.Second)
		_, err = c.WorkflowService().RespondActivityTaskCompletedById(
			recordCompleteContext,
			&responseRequest,
		)
		cancelComplete()
		if err != nil {
			panic(err)
		}
	}
}
worker/workflow_worker.go:
package main
import (
"external-activity-worker-demo/workflow"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
"log"
)
// main runs the Workflow Worker for the external-activity demo. It serves the
// "handle_name" task queue and registers only the HandleName workflow; no
// activity is registered here because the workflow schedules its activity on
// the separate "say_hello" task queue.
func main() {
	temporal, err := client.Dial(client.Options{
		Namespace: "default",
		HostPort:  "127.0.0.1:7233",
	})
	if err != nil {
		log.Fatalln("unable to create Temporal client", err)
	}
	defer temporal.Close()

	handleNameWorker := worker.New(temporal, "handle_name", worker.Options{})
	handleNameWorker.RegisterWorkflow(workflow.HandleName)

	if err := handleNameWorker.Run(worker.InterruptCh()); err != nil {
		log.Fatalln("unable to start Worker", err)
	}
}
workflow/handle_name.go:
package workflow
import (
"fmt"
"go.temporal.io/sdk/temporal"
"go.temporal.io/sdk/workflow"
"time"
)
// HandleName is a workflow that schedules the activity named "say_hello" on
// the "say_hello" task queue, passing name as its only argument, and returns
// the activity's result decoded into a map.
//
// The activity is given a 30 minute start-to-close timeout, a 20 second
// heartbeat timeout, and a retry policy of up to 500 attempts with exponential
// backoff from one second up to one minute. If the activity ultimately fails,
// the error is printed and returned with a nil result.
func HandleName(ctx workflow.Context, name string) (map[string]interface{}, error) {
	fmt.Println("HandleName")

	activityCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
		TaskQueue:           "say_hello",
		StartToCloseTimeout: 30 * time.Minute,
		HeartbeatTimeout:    20 * time.Second,
		RetryPolicy: &temporal.RetryPolicy{
			InitialInterval:    time.Second,
			BackoffCoefficient: 2.0,
			MaximumInterval:    time.Minute,
			MaximumAttempts:    500,
		},
	})

	greeting := make(map[string]interface{})
	future := workflow.ExecuteActivity(activityCtx, "say_hello", name)
	if err := future.Get(activityCtx, &greeting); err != nil {
		fmt.Printf("failed to say hello to %s, because %s\n", name, err)
		return nil, err
	}
	return greeting, nil
}
可以看到:
通过 Client.WorkflowService().PollActivityTaskQueue() 可以从 Activity Task Queue 中拾取 Activity Task(包括 Activity Type、Input 等);
通过 Client.WorkflowService().RecordActivityTaskHeartbeatById() 可以向 Temporal 集群 Report Heartbeat;
通过 Client.WorkflowService().RespondActivityTaskCompletedById() 可以向 Temporal 集群响应 Activity Execution 的结果。
我们将生成工作流执行、获取打开的工作流执行列表、获取工作流执行的进展、从 Activity 任务队列中拾取任务、向 Temporal 集群发送心跳、向 Temporal 集群响应执行结果等 Client API 封装到 Rest Service 里,通过 Rest JSON API 将它们暴露给外部用户。因为该服务是进入 Temporal 集群的门面,所以称之为 Facade Service。
引入 Facade Service 后,系统架构如下:
引入 Facade 带来的好处包括但不限于:
计算机科学领域的任何问题都可以通过增加一个间接的中间层来解决。
Temporal 为方便开发,给我们提供各种语言的 SDK。在当前的设计中,Temporal 已作为基础设施,被我们隐藏起来,用户无法感知其存在。因此我们提供自己的 SDK,方便用户使用 Facade Service。
下面是一个简单的 Facade Service 示例。
项目结构:
.
├── go.mod
└── main.go
go.mod:
module facade-service
go 1.19
require (
github.com/gin-gonic/gin v1.8.1
go.temporal.io/api v1.11.1-0.20220907050538-6de5285cf463
go.temporal.io/sdk v1.17.0
)
main.go:
package main
import (
"context"
"encoding/base64"
"github.com/gin-gonic/gin"
"go.temporal.io/api/common/v1"
"go.temporal.io/api/enums/v1"
"go.temporal.io/api/taskqueue/v1"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/converter"
"net/http"
"time"
)
// TemporalClient is the package-level Temporal client. It is assigned once in
// init and used by every HTTP handler registered in setUpRouter.
var TemporalClient client.Client
// init connects to the Temporal frontend at 127.0.0.1:7233 (namespace
// "default") and stores the resulting client in TemporalClient. It panics if
// the connection cannot be established.
func init() {
	opts := client.Options{
		HostPort:  "127.0.0.1:7233",
		Namespace: "default",
	}

	var dialErr error
	if TemporalClient, dialErr = client.Dial(opts); dialErr != nil {
		panic(dialErr)
	}
}
// activityIDsFromQuery reads the workflow_id, run_id and activity_id query
// parameters that identify an activity. workflow_id and activity_id are
// required: when either is missing it writes a 400 response and returns
// ok == false, in which case the caller must stop handling the request.
// run_id is passed through unchanged, even when empty.
func activityIDsFromQuery(c *gin.Context) (workflowID, runID, activityID string, ok bool) {
	workflowID = c.Query("workflow_id")
	runID = c.Query("run_id")
	activityID = c.Query("activity_id")
	if workflowID == "" || activityID == "" {
		c.JSON(http.StatusBadRequest, gin.H{"error": "workflow_id and activity_id are required"})
		return "", "", "", false
	}
	return workflowID, runID, activityID, true
}

// setUpRouter builds the Facade Service HTTP API on top of TemporalClient.
//
// Routes:
//
//	GET /poll-activity-task  long-polls the "say_hello" activity task queue
//	                         (20s timeout) and returns workflow_id, run_id,
//	                         activity_id and the base64 task_token; all four
//	                         are empty strings when no task was handed out.
//	GET /heartbeat           records a heartbeat for the activity identified
//	                         by the workflow_id, run_id and activity_id query
//	                         parameters (10s timeout).
//	GET /complete            completes that activity with the fixed JSON
//	                         result {"a": 1} (10s timeout).
//
// Temporal errors are reported as 500 with an "error" field. Every Temporal
// call is bounded by its timeout and also derives from the incoming request's
// context, so the call is abandoned if the HTTP client goes away.
func setUpRouter() *gin.Engine {
	r := gin.Default()

	r.GET("/poll-activity-task", func(c *gin.Context) {
		ctx, cancelFunc := context.WithTimeout(c.Request.Context(), 20*time.Second)
		defer cancelFunc()

		request := &workflowservice.PollActivityTaskQueueRequest{
			Namespace: "default",
			TaskQueue: &taskqueue.TaskQueue{
				Name: "say_hello",
				Kind: enums.TASK_QUEUE_KIND_NORMAL,
			},
			Identity: "external-activity-worker",
		}
		response, err := TemporalClient.WorkflowService().PollActivityTaskQueue(ctx, request)
		if err != nil {
			c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
			return
		}

		// Default to empty fields; they are filled in only when a task was returned.
		m := gin.H{
			"workflow_id": "",
			"run_id":      "",
			"activity_id": "",
			"task_token":  "",
		}
		if response.WorkflowExecution != nil {
			m["workflow_id"] = response.WorkflowExecution.WorkflowId
			m["run_id"] = response.WorkflowExecution.RunId
			m["activity_id"] = response.ActivityId
			m["task_token"] = base64.StdEncoding.EncodeToString(response.TaskToken)
		}
		c.JSON(http.StatusOK, m)
	})

	r.GET("/heartbeat", func(c *gin.Context) {
		workflowID, runID, activityID, ok := activityIDsFromQuery(c)
		if !ok {
			return
		}
		request := &workflowservice.RecordActivityTaskHeartbeatByIdRequest{
			Namespace:  "default",
			WorkflowId: workflowID,
			RunId:      runID,
			ActivityId: activityID,
			Details:    nil,
		}

		ctx, cancelFunc := context.WithTimeout(c.Request.Context(), 10*time.Second)
		defer cancelFunc()
		if _, err := TemporalClient.WorkflowService().RecordActivityTaskHeartbeatById(ctx, request); err != nil {
			c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
			return
		}
		c.JSON(http.StatusOK, gin.H{})
	})

	r.GET("/complete", func(c *gin.Context) {
		workflowID, runID, activityID, ok := activityIDsFromQuery(c)
		if !ok {
			return
		}
		request := &workflowservice.RespondActivityTaskCompletedByIdRequest{
			Namespace:  "default",
			WorkflowId: workflowID,
			RunId:      runID,
			ActivityId: activityID,
			Result: &common.Payloads{Payloads: []*common.Payload{
				{
					Data: []byte("{\"a\": 1}"),
					Metadata: map[string][]byte{
						"encoding": []byte(converter.MetadataEncodingJSON),
					},
				},
			}},
		}

		ctx, cancelFunc := context.WithTimeout(c.Request.Context(), 10*time.Second)
		defer cancelFunc()
		if _, err := TemporalClient.WorkflowService().RespondActivityTaskCompletedById(ctx, request); err != nil {
			c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
			return
		}
		c.JSON(http.StatusOK, gin.H{})
	})

	return r
}
// main builds the Facade Service router and serves it on port 9090, panicking
// if the server returns an error.
func main() {
	router := setUpRouter()
	err := router.Run(":9090")
	if err != nil {
		panic(err)
	}
}
在 Temporal 中,工作流定义(Workflow Definition)就是代码。因此,每次新增或修改工作流定义,都必须新增或修改 Workflow Worker。为此,我们引入 Workflow DSL(Domain Specific Language),使整个系统中只有一个工作流定义:该工作流定义根据输入参数,决定生成哪些 Activity,以及如何生成 Activity。
引入 Workflow DSL 后,系统架构如下:
想要新增或修改工作流定义时,只需要新增或修改 Workflow DSL 即可,无需上线或新增 Workflow Worker。
下面是一个简单的 DSL Workflow 示例。
项目结构:
.
├── activity
│ ├── activity_one.go
│ ├── activity_three.go
│ └── activity_two.go
├── go.mod
├── worker
│ ├── activity_one_worker.go
│ ├── activity_three_worker.go
│ ├── activity_two_worker.go
│ └── dsl_workflow_worker.go
└── workflow
└── dsl_workflow.go
go.mod:
module hello-temporal
go 1.19
require go.temporal.io/sdk v1.17.0
activity/activity_one.go:
package activity
import "context"
// ActivityOne is a trivial activity: it does not use ctx and always returns
// the fixed string "activity one" with a nil error.
func ActivityOne(ctx context.Context) (string, error) {
	const label = "activity one"
	return label, nil
}
activity/activity_two.go 和 activity/activity_three.go 与 activity/activity_one.go 类似,分别定义 ActivityTwo、ActivityThree 函数。
worker/activity_one_worker.go:
package main
import (
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
"hello-temporal/activity"
"log"
)
// main runs the Activity Worker for ActivityOne. The worker serves a task
// queue that carries the same name as the activity ("ActivityOne"), which is
// the queue DSLWorkflow schedules that activity on.
func main() {
	conn, connErr := client.Dial(client.Options{
		HostPort:  "127.0.0.1:7233",
		Namespace: "default",
	})
	if connErr != nil {
		log.Fatalln("unable to create Temporal client", connErr)
	}
	defer conn.Close()

	const taskQueue = "ActivityOne"
	activityWorker := worker.New(conn, taskQueue, worker.Options{})
	activityWorker.RegisterActivity(activity.ActivityOne)

	runErr := activityWorker.Run(worker.InterruptCh())
	if runErr != nil {
		log.Fatalln("unable to start Worker", runErr)
	}
}
worker/activity_two_worker.go 和 worker/activity_three_worker.go 与 worker/activity_one_worker.go 类似,分别注册 ActivityTwo 和 ActivityThree。
workflow/dsl_workflow.go:
package workflow
import (
"fmt"
"go.temporal.io/sdk/temporal"
"go.temporal.io/sdk/workflow"
"time"
)
// Workflows is the in-code "DSL" table: it maps a workflow name to the ordered
// list of activity names that DSLWorkflow executes for that name. DSLWorkflow
// also uses each activity name as the task queue the activity is scheduled on.
var Workflows = map[string][]string{
"workflow-1-2": {"ActivityOne", "ActivityTwo"},
"workflow-all": {"ActivityOne", "ActivityTwo", "ActivityThree"},
}
// DSLWorkflow is the single generic workflow of the DSL demo. It looks up the
// activity list registered under name in Workflows and executes those
// activities one after another, each on the task queue named after the
// activity, collecting their string results in order.
//
// The first activity error aborts the workflow and is returned with a nil
// result. When name is not present in Workflows the side effect returns nil;
// presumably that decodes to an empty list so no activity runs and the result
// is nil — TODO confirm against the data converter's nil handling.
func DSLWorkflow(ctx workflow.Context, name string) ([]string, error) {
	fmt.Println("DSLWorkflow")

	// Resolve the activity list through SideEffect so that later edits to the
	// DSL table do not cause non-deterministic errors for this execution.
	lookup := func(ctx workflow.Context) interface{} {
		if steps, known := Workflows[name]; known {
			return steps
		}
		return nil
	}
	steps := make([]string, 0)
	if err := workflow.SideEffect(ctx, lookup).Get(&steps); err != nil {
		return nil, err
	}

	options := workflow.ActivityOptions{
		StartToCloseTimeout: 30 * time.Minute,
		HeartbeatTimeout:    20 * time.Second,
		RetryPolicy: &temporal.RetryPolicy{
			InitialInterval:    time.Second,
			BackoffCoefficient: 2.0,
			MaximumInterval:    time.Minute,
			MaximumAttempts:    500,
		},
	}

	var results []string
	// Run the activities strictly one at a time, in list order.
	for _, step := range steps {
		// Each activity is served by a worker on the queue of the same name.
		options.TaskQueue = step
		ctx = workflow.WithActivityOptions(ctx, options)

		var output string
		if err := workflow.ExecuteActivity(ctx, step).Get(ctx, &output); err != nil {
			return nil, err
		}
		results = append(results, output)
	}
	return results, nil
}
worker/dsl_workflow_worker.go:
package main
import (
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
"hello-temporal/workflow"
"log"
)
// main runs the Workflow Worker of the DSL demo: it serves the "dsl_workflow"
// task queue and registers DSLWorkflow as its only workflow. Activities are
// served by the separate per-activity workers.
func main() {
	dialOptions := client.Options{
		HostPort:  "127.0.0.1:7233",
		Namespace: "default",
	}
	cli, err := client.Dial(dialOptions)
	if err != nil {
		log.Fatalln("unable to create Temporal client", err)
	}
	defer cli.Close()

	dslWorker := worker.New(cli, "dsl_workflow", worker.Options{})
	dslWorker.RegisterWorkflow(workflow.DSLWorkflow)

	if err = dslWorker.Run(worker.InterruptCh()); err != nil {
		log.Fatalln("unable to start Worker", err)
	}
}