在 http://timd.cn/temporal/demo/ 的基础上,进行修改。
将 activity/say_hello.go 的内容改为:
package activity
import ( "context" "fmt")
func SayHello(ctx context.Context, name string, startTime int64) (string, error) { msg := fmt.Sprintf("Hello, %s! start time is %d.", name, startTime) return msg, nil}将 workflow/handle_name.go 的内容改为:
package workflow
import ( "encoding/json" "fmt" "go.temporal.io/sdk/temporal" "go.temporal.io/sdk/workflow" "hello-temporal/activity" "time")
func HandleName(ctx workflow.Context, name string) (string, error) { startTime := time.Now().UnixMilli() searchAttributes := workflow.GetInfo(ctx).SearchAttributes if searchAttributes != nil { scheduledStartTime, found := searchAttributes.IndexedFields["TemporalScheduledStartTime"] if found { var s string if err := json.Unmarshal(scheduledStartTime.GetData(), &s); err != nil { return "", err } loc, _ := time.LoadLocation("Asia/Shanghai") //设置时区 tt, err := time.ParseInLocation("2006-01-02T15:04:05Z", s, loc) if err != nil { return "", err } startTime = tt.UnixMilli() fmt.Println("retrieve scheduled start time from search attribute") } } fmt.Printf("scheduled start time: %d\n", startTime)
retryPolicy := &temporal.RetryPolicy{ InitialInterval: time.Second, BackoffCoefficient: 2.0, MaximumInterval: time.Minute, MaximumAttempts: 500, } options := workflow.ActivityOptions{ StartToCloseTimeout: time.Minute, RetryPolicy: retryPolicy, }
ctx = workflow.WithActivityOptions(ctx, options) f := workflow.ExecuteActivity(ctx, activity.SayHello, name, startTime)
res := new(string) err := f.Get(ctx, res) if err != nil { fmt.Printf("failed to say hello to %s, because %s\n", name, err) return "", err } return *res, nil}将 main.go 的内容改为:
package main
import ( "context" "fmt" "github.com/google/uuid" "go.temporal.io/api/common/v1" "go.temporal.io/api/schedule/v1" "go.temporal.io/api/taskqueue/v1" "go.temporal.io/api/workflow/v1" "go.temporal.io/api/workflowservice/v1" "go.temporal.io/sdk/client" "go.temporal.io/sdk/converter" "log" "time")
func main() { c, err := client.Dial(client.Options{ HostPort: "127.0.0.1:7233", Namespace: "default", }) if err != nil { log.Fatalln("unable to create Temporal client", err) } defer c.Close()
interval := 15 * time.Second request := &workflowservice.CreateScheduleRequest{ RequestId: uuid.New().String(), Namespace: "default", ScheduleId: "go-sdk-schedule-id", Schedule: &schedule.Schedule{ Spec: &schedule.ScheduleSpec{ Interval: []*schedule.IntervalSpec{ {Interval: &interval}, }, TimezoneName: "Asia/Shanghai", }, Action: &schedule.ScheduleAction{ Action: &schedule.ScheduleAction_StartWorkflow{ StartWorkflow: &workflow.NewWorkflowExecutionInfo{ WorkflowId: "go-sdk-schedule-workflow-id", WorkflowType: &common.WorkflowType{Name: "HandleName"}, TaskQueue: &taskqueue.TaskQueue{Name: "test_task_queue_name"}, Input: &common.Payloads{ Payloads: []*common.Payload{ { Metadata: map[string][]byte{ "encoding": []byte(converter.MetadataEncodingJSON), }, Data: []byte("\"Tim\""), }, }, }, }, }, }, }, } if response, err := c.WorkflowService().CreateSchedule(context.Background(), request); err != nil { panic(err) } else { fmt.Println(response.String()) }}执行测试项目中的 main.go,在 Temporal Web UI 上可以看到:

假设使用 docker compose 启动开发环境,那么可以使用类似下面的命令创建 Schedule:
docker exec temporal-admin-tools tctl schedule create --schedule-id tctl-workflow-id --workflow-id tctl-workflow-id --type HandleName --task-queue test_task_queue_name --input '"Tim"' --cron '* * * * *'# 输出:Schedule created在 Temporal Web UI 上可以看到:

点击左侧导航栏的 Schedules,然后点击页面右上角的 Create Schedule 按钮。
Web UI 支持的参数较少,许多特性都不支持,比如给 Workflow 指定 input 参数、设置允许的操作总数等。