Temporal 官网的说明在:https://docs.temporal.io/server/production-deployment/#faq-autoscaling-workers-based-on-task-queue-load。
翻译如下:
Temporal 尚未支持返回任务队列中的任务数量。主要的技术障碍是每个任务可以有它自己的 ScheduleToStart 超时,因此仅计算添加和消费多少任务是不够的。
这就是我们建议追踪 schedule_to_start_latency 来确定任务队列是否有积压(对于给定的任务队列 Workflow 和 Activity Worker 配置不足)的原因。我们计划未来添加一些特性,提供更多对任务队列状态的可见性。
下面通过示例进行说明。
项目结构如下:
.
├── go.mod
├── go.sum
├── main.go
├── worker
│   └── main.go
└── workflow
    ├── autoscaling_workflow.go
    └── model.go
go.mod:
module autoscaling-demo
go 1.19
require (
	github.com/google/uuid v1.3.0
	go.temporal.io/sdk v1.17.0
)

main.go:
package main
import ( "autoscaling-demo/workflow" "context" "fmt" "github.com/google/uuid" "go.temporal.io/sdk/client" "log" "time")
func main() { c, err := client.Dial(client.Options{ HostPort: "127.0.0.1:7233", Namespace: "default", }) if err != nil { log.Fatalln("unable to create Temporal client", err) } defer c.Close()
options := client.StartWorkflowOptions{ ID: fmt.Sprintf("autoscaling-workflow-%s", uuid.New().String()), TaskQueue: "autoscaling_workflow", } workflowRun, err := c.ExecuteWorkflow( context.Background(), options, workflow.AutoscalingWorkflowDefinition, &workflow.AutoscalingWorkflowParam{ SleepFor: 3, Name: "Tim", }) if err != nil { log.Printf("failed to ExecuteWorkflow err: %s\n", err.Error()) return } log.Printf("Workflow Id: %s\n", workflowRun.GetID()) log.Printf("Run Id: %s\n", workflowRun.GetRunID())
queryWorkflowWithOptionsContext, cancelFunc := context.WithTimeout(context.Background(), time.Minute) defer cancelFunc()
for i := 0; i < 4; i++ { queryWorkflowWithOptionsResponse, err := c.QueryWorkflowWithOptions(queryWorkflowWithOptionsContext, &client.QueryWorkflowWithOptionsRequest{ WorkflowID: workflowRun.GetID(), RunID: workflowRun.GetRunID(), QueryType: "current_state", }) if err != nil { log.Fatal(err) } var currentState string if err := queryWorkflowWithOptionsResponse.QueryResult.Get(¤tState); err != nil { log.Fatal(err) } fmt.Println("current state:", currentState) time.Sleep(time.Second) }
res := make(map[string]interface{}) if err := workflowRun.Get(context.Background(), &res); err != nil { log.Fatalln("failed to execute Workflow", err) } log.Printf("result: %v\n", res)}worker/main.go:
package main
import ( "autoscaling-demo/workflow" "go.temporal.io/sdk/client" "go.temporal.io/sdk/worker" "log")
func main() { c, err := client.Dial(client.Options{ HostPort: "127.0.0.1:7233", Namespace: "default", }) if err != nil { log.Fatalln("unable to create Temporal client", err) } defer c.Close()
w := worker.New(c, "autoscaling_workflow", worker.Options{}) w.RegisterWorkflow(workflow.AutoscalingWorkflowDefinition) w.RegisterActivity(workflow.SayHello) err = w.Run(worker.InterruptCh()) if err != nil { log.Fatalln("unable to start Worker", err) }}workflow/autoscaling_workflow.go:
package workflow
import ( "context" "fmt" "go.temporal.io/sdk/temporal" "go.temporal.io/sdk/workflow" "time")
func AutoscalingWorkflowDefinition(ctx workflow.Context, param *AutoscalingWorkflowParam) (map[string]interface{}, error) { currentState := "started" // This could be any serializable struct
queryType := "current_state" if err := workflow.SetQueryHandler(ctx, queryType, func() (string, error) { return currentState, nil }); err != nil { currentState = "failed to register query handler" return nil, err }
currentState = "waiting timer" if err := workflow.NewTimer(ctx, time.Duration(param.SleepFor)*time.Second).Get(ctx, nil); err != nil { currentState = "timer failed" return nil, err }
retryPolicy := &temporal.RetryPolicy{ InitialInterval: time.Second, BackoffCoefficient: 2.0, MaximumInterval: time.Minute, MaximumAttempts: 500, } sayHelloOptions := workflow.ActivityOptions{ RetryPolicy: retryPolicy, ScheduleToCloseTimeout: 30 * time.Minute, } sayHelloContext := workflow.WithActivityOptions(ctx, sayHelloOptions) currentState = "waiting activity" sayHelloFuture := workflow.ExecuteActivity(sayHelloContext, SayHello, param.Name) sayHelloResult := new(string) err := sayHelloFuture.Get(ctx, &sayHelloResult) if err != nil { currentState = "activity failed" return nil, err }
currentState = "done" return map[string]interface{}{ "SleepFor": param.SleepFor, "SayHello": sayHelloResult, }, nil}
func SayHello(ctx context.Context, name string) (string, error) { msg := fmt.Sprintf("Hello, %s!", name) fmt.Printf("Activity::SayHello - %s\n", msg) return msg, nil}workflow/model.go:
package workflow
type AutoscalingWorkflowParam struct { SleepFor uint `json:"sleep_for"` Name string `json:"name"`}启动 Worker:
go run worker/main.go

引发工作流执行:
go run main.go

得到类似下面的输出:
XXX Workflow Id: autoscaling-workflow-4bd8fd21-3a71-40c8-bd86-bccd25070d3e
XXX Run Id: d7b66047-c8c5-4663-a337-1b43d5ba0df5
current state: waiting timer
current state: waiting timer
current state: waiting timer
current state: waiting activity
XXX result: map[SayHello:Hello, Tim! SleepFor:3]
上面的例子说明:通过 Query,可以获取工作流执行已推进到哪个 Activity。请查看官方文档获取更多关于 Query 的细节。
下面通过示例进行说明。
list_open_workflow_execution.go:
package main
import ( "context" "fmt" "go.temporal.io/api/filter/v1" "go.temporal.io/api/workflowservice/v1" "go.temporal.io/sdk/client" "log" "time")
func main() { c, err := client.Dial(client.Options{ HostPort: "127.0.0.1:7233", Namespace: "default", }) if err != nil { log.Fatalln("unable to create Temporal client", err) } defer c.Close()
listOpenWorkflowExecutionContext, cancelFunc := context.WithTimeout(context.Background(), time.Minute) defer cancelFunc() listOpenWorkflowExecutionRequest := &workflowservice.ListOpenWorkflowExecutionsRequest{ Namespace: "default", MaximumPageSize: 1, Filters: &workflowservice.ListOpenWorkflowExecutionsRequest_TypeFilter{ TypeFilter: &filter.WorkflowTypeFilter{Name: "AutoscalingWorkflowDefinition"}, }, }
for { listOpenWorkflowExecutionResponse, err := c.ListOpenWorkflow( listOpenWorkflowExecutionContext, listOpenWorkflowExecutionRequest, ) if err != nil { panic(err) }
if listOpenWorkflowExecutionResponse != nil && listOpenWorkflowExecutionResponse.Executions != nil { for _, execution := range listOpenWorkflowExecutionResponse.Executions { workflowId := execution.Execution.WorkflowId runId := execution.Execution.RunId status := execution.Status fmt.Printf("Workflow Id: %s, Run Id: %s, status: %d\n", workflowId, runId, status) } listOpenWorkflowExecutionRequest.NextPageToken = listOpenWorkflowExecutionResponse.NextPageToken } else { break } }}然后通过程序或 tctl 引发若干工作流执行。最终得到类似下面的输出:
Workflow Id: autoscaling-workflow-8674ad58-6fe4-4f4e-942f-6b33b5955ae7, Run Id: 7cad7413-6cf2-4445-8446-66bae321dffe, status: 1
Workflow Id: autoscaling-workflow-45d51ed0-00e1-4203-be6b-f504e5cb968e, Run Id: 3c20c391-e85b-4d9a-9cec-7fe4ca7ebae5, status: 1
Workflow Id: autoscaling-workflow-ce865963-b6c5-4ce2-973e-126fdd02ff70, Run Id: 9f37d87f-2419-45b5-bd56-8c34e203c995, status: 1