Temporal 官网的说明在:https://docs.temporal.io/server/production-deployment/#faq-autoscaling-workers-based-on-task-queue-load。
翻译如下:
Temporal 尚未支持返回任务队列中的任务数量。主要的技术障碍是每个任务可以有它自己的 ScheduleToStart
超时,因此仅计算添加和消费多少任务是不够的。
这就是我们建议通过追踪 schedule_to_start_latency 指标来判断任务队列是否存在积压(即对于给定的任务队列,Workflow Worker 和/或 Activity Worker 配置不足)的原因。我们计划在未来添加一些特性,以提供更多关于任务队列状态的可见性。
下面通过示例进行说明。
项目结构如下:
.
├── go.mod
├── go.sum
├── main.go
├── worker
│ └── main.go
└── workflow
├── autoscaling_workflow.go
└── model.go
go.mod:
module autoscaling-demo
go 1.19
require (
github.com/google/uuid v1.3.0
go.temporal.io/sdk v1.17.0
)
main.go:
package main
import (
"autoscaling-demo/workflow"
"context"
"fmt"
"github.com/google/uuid"
"go.temporal.io/sdk/client"
"log"
"time"
)
func main() {
c, err := client.Dial(client.Options{
HostPort: "127.0.0.1:7233",
Namespace: "default",
})
if err != nil {
log.Fatalln("unable to create Temporal client", err)
}
defer c.Close()
options := client.StartWorkflowOptions{
ID: fmt.Sprintf("autoscaling-workflow-%s", uuid.New().String()),
TaskQueue: "autoscaling_workflow",
}
workflowRun, err := c.ExecuteWorkflow(
context.Background(),
options,
workflow.AutoscalingWorkflowDefinition,
&workflow.AutoscalingWorkflowParam{
SleepFor: 3,
Name: "Tim",
})
if err != nil {
log.Printf("failed to ExecuteWorkflow err: %s\n", err.Error())
return
}
log.Printf("Workflow Id: %s\n", workflowRun.GetID())
log.Printf("Run Id: %s\n", workflowRun.GetRunID())
queryWorkflowWithOptionsContext, cancelFunc := context.WithTimeout(context.Background(), time.Minute)
defer cancelFunc()
for i := 0; i < 4; i++ {
queryWorkflowWithOptionsResponse, err := c.QueryWorkflowWithOptions(queryWorkflowWithOptionsContext,
&client.QueryWorkflowWithOptionsRequest{
WorkflowID: workflowRun.GetID(),
RunID: workflowRun.GetRunID(),
QueryType: "current_state",
})
if err != nil {
log.Fatal(err)
}
var currentState string
if err := queryWorkflowWithOptionsResponse.QueryResult.Get(¤tState); err != nil {
log.Fatal(err)
}
fmt.Println("current state:", currentState)
time.Sleep(time.Second)
}
res := make(map[string]interface{})
if err := workflowRun.Get(context.Background(), &res); err != nil {
log.Fatalln("failed to execute Workflow", err)
}
log.Printf("result: %v\n", res)
}
worker/main.go:
package main
import (
"autoscaling-demo/workflow"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
"log"
)
// main runs a worker on the "autoscaling_workflow" task queue that hosts
// AutoscalingWorkflowDefinition and its SayHello activity, and keeps it
// running until worker.InterruptCh() signals.
func main() {
	temporalClient, dialErr := client.Dial(client.Options{
		HostPort:  "127.0.0.1:7233",
		Namespace: "default",
	})
	if dialErr != nil {
		log.Fatalln("unable to create Temporal client", dialErr)
	}
	defer temporalClient.Close()

	autoscalingWorker := worker.New(temporalClient, "autoscaling_workflow", worker.Options{})
	autoscalingWorker.RegisterWorkflow(workflow.AutoscalingWorkflowDefinition)
	autoscalingWorker.RegisterActivity(workflow.SayHello)

	// Run only returns once the interrupt channel fires or the worker fails.
	if runErr := autoscalingWorker.Run(worker.InterruptCh()); runErr != nil {
		log.Fatalln("unable to start Worker", runErr)
	}
}
workflow/autoscaling_workflow.go:
package workflow
import (
"context"
"fmt"
"go.temporal.io/sdk/temporal"
"go.temporal.io/sdk/workflow"
"time"
)
// AutoscalingWorkflowDefinition is the demo workflow. It waits on a durable
// timer for param.SleepFor seconds, then runs the SayHello activity with
// param.Name and returns both values in a map keyed "SleepFor" and "SayHello".
//
// Progress is exposed through the "current_state" query, which returns
// "started", "waiting timer", "waiting activity" or "done", or a
// "... failed" state when the corresponding step returned an error.
func AutoscalingWorkflowDefinition(ctx workflow.Context,
	param *AutoscalingWorkflowParam) (map[string]interface{}, error) {
	// Fail the workflow with a clear error instead of panicking the workflow
	// task on the nil dereference further down.
	if param == nil {
		return nil, temporal.NewNonRetryableApplicationError(
			"AutoscalingWorkflowParam must not be nil", "InvalidArgument", nil)
	}

	currentState := "started" // This could be any serializable struct
	queryType := "current_state"
	if err := workflow.SetQueryHandler(ctx, queryType, func() (string, error) {
		// The closure captures currentState, so each query sees its latest value.
		return currentState, nil
	}); err != nil {
		currentState = "failed to register query handler"
		return nil, err
	}

	currentState = "waiting timer"
	if err := workflow.NewTimer(ctx,
		time.Duration(param.SleepFor)*time.Second).Get(ctx, nil); err != nil {
		currentState = "timer failed"
		return nil, err
	}

	retryPolicy := &temporal.RetryPolicy{
		InitialInterval:    time.Second,
		BackoffCoefficient: 2.0,
		MaximumInterval:    time.Minute,
		MaximumAttempts:    500,
	}
	sayHelloOptions := workflow.ActivityOptions{
		RetryPolicy:            retryPolicy,
		ScheduleToCloseTimeout: 30 * time.Minute,
	}
	sayHelloContext := workflow.WithActivityOptions(ctx, sayHelloOptions)

	currentState = "waiting activity"
	// Decode into a plain string (the original passed a **string).
	var sayHelloResult string
	if err := workflow.ExecuteActivity(sayHelloContext, SayHello, param.Name).
		Get(ctx, &sayHelloResult); err != nil {
		currentState = "activity failed"
		return nil, err
	}

	currentState = "done"
	return map[string]interface{}{
		"SleepFor": param.SleepFor,
		"SayHello": sayHelloResult,
	}, nil
}
// SayHello is the activity invoked by AutoscalingWorkflowDefinition. It builds
// a greeting for name, echoes it to stdout with an "Activity::SayHello - "
// prefix, and returns it. It never returns an error.
func SayHello(ctx context.Context, name string) (string, error) {
	greeting := fmt.Sprintf("Hello, %s!", name)
	fmt.Print("Activity::SayHello - " + greeting + "\n")
	return greeting, nil
}
workflow/model.go:
package workflow
// AutoscalingWorkflowParam is the input argument of
// AutoscalingWorkflowDefinition. The json tags set the field names used when
// the value is encoded as JSON.
type AutoscalingWorkflowParam struct {
	// SleepFor is how long the workflow's timer waits, in seconds.
	SleepFor uint `json:"sleep_for"`
	// Name is forwarded to the SayHello activity.
	Name string `json:"name"`
}
启动 Worker:
go run worker/main.go
启动工作流执行:
go run main.go
得到类似下面的输出:
XXX Workflow Id: autoscaling-workflow-4bd8fd21-3a71-40c8-bd86-bccd25070d3e
XXX Run Id: d7b66047-c8c5-4663-a337-1b43d5ba0df5
current state: waiting timer
current state: waiting timer
current state: waiting timer
current state: waiting activity
XXX result: map[SayHello:Hello, Tim! SleepFor:3]
上面的例子说明:通过 Query,可以获取工作流执行当前已推进到哪一步(例如仍在等待计时器,还是已在等待 Activity 完成)。更多关于 Query 的细节请查看官方文档。
此外,还可以通过 ListOpenWorkflow 列出仍处于打开(运行中)状态的工作流执行。下面通过示例进行说明。
list_open_workflow_execution.go:
package main
import (
"context"
"fmt"
"go.temporal.io/api/filter/v1"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/sdk/client"
"log"
"time"
)
// main lists every open (still running) execution of the
// AutoscalingWorkflowDefinition workflow type in the "default" namespace,
// following NextPageToken until the server reports no further pages, and
// prints each execution's workflow ID, run ID and numeric status.
func main() {
	c, err := client.Dial(client.Options{
		HostPort:  "127.0.0.1:7233",
		Namespace: "default",
	})
	if err != nil {
		log.Fatalln("unable to create Temporal client", err)
	}
	defer c.Close()

	// One deadline bounds the whole listing, across all pages.
	listOpenWorkflowExecutionContext, cancelFunc := context.WithTimeout(context.Background(), time.Minute)
	defer cancelFunc()

	listOpenWorkflowExecutionRequest := &workflowservice.ListOpenWorkflowExecutionsRequest{
		Namespace: "default",
		// Deliberately tiny page size so the demo exercises pagination.
		MaximumPageSize: 1,
		Filters: &workflowservice.ListOpenWorkflowExecutionsRequest_TypeFilter{
			TypeFilter: &filter.WorkflowTypeFilter{Name: "AutoscalingWorkflowDefinition"},
		},
	}
	for {
		listOpenWorkflowExecutionResponse, err := c.ListOpenWorkflow(
			listOpenWorkflowExecutionContext,
			listOpenWorkflowExecutionRequest,
		)
		if err != nil {
			log.Fatalln("unable to list open workflow executions", err)
		}
		for _, execution := range listOpenWorkflowExecutionResponse.GetExecutions() {
			fmt.Printf("Workflow Id: %s, Run Id: %s, status: %d\n",
				execution.GetExecution().GetWorkflowId(),
				execution.GetExecution().GetRunId(),
				execution.GetStatus())
		}
		// An empty token marks the last page. Copying it back into the request
		// (as the loop previously did) would restart from the first page forever.
		nextPageToken := listOpenWorkflowExecutionResponse.GetNextPageToken()
		if len(nextPageToken) == 0 {
			break
		}
		listOpenWorkflowExecutionRequest.NextPageToken = nextPageToken
	}
}
然后通过上面的程序或 tctl 启动若干工作流执行,并在它们结束前运行本程序(只有仍在运行的工作流执行才会被列出)。最终得到类似下面的输出:
Workflow Id: autoscaling-workflow-8674ad58-6fe4-4f4e-942f-6b33b5955ae7, Run Id: 7cad7413-6cf2-4445-8446-66bae321dffe, status: 1
Workflow Id: autoscaling-workflow-45d51ed0-00e1-4203-be6b-f504e5cb968e, Run Id: 3c20c391-e85b-4d9a-9cec-7fe4ca7ebae5, status: 1
Workflow Id: autoscaling-workflow-ce865963-b6c5-4ce2-973e-126fdd02ff70, Run Id: 9f37d87f-2419-45b5-bd56-8c34e203c995, status: 1