在 http://timd.cn/temporal/demo/ 的基础上进行修改。
将 activity/say_hello.go 的内容改为:
package activity
import (
"context"
"fmt"
)
// SayHello is the greeting activity. It builds a message that contains the
// given name and the start time (an integer timestamp supplied by the caller)
// and always returns a nil error.
func SayHello(ctx context.Context, name string, startTime int64) (string, error) {
	return fmt.Sprintf("Hello, %s! start time is %d.", name, startTime), nil
}
将 workflow/handle_name.go 的内容改为:
package workflow
import (
"encoding/json"
"fmt"
"go.temporal.io/sdk/temporal"
"go.temporal.io/sdk/workflow"
"hello-temporal/activity"
"time"
)
// HandleName is a workflow that greets name through the SayHello activity and
// returns the activity's message.
//
// The activity also receives a start time in Unix milliseconds. When the
// execution carries a TemporalScheduledStartTime search attribute, that value
// is used; otherwise the workflow's own clock is used.
func HandleName(ctx workflow.Context, name string) (string, error) {
	// Use the SDK's replay-safe clock: time.Now() would yield a different
	// value each time the workflow code is re-executed.
	startTime := workflow.Now(ctx).UnixMilli()

	if searchAttributes := workflow.GetInfo(ctx).SearchAttributes; searchAttributes != nil {
		if scheduledStartTime, found := searchAttributes.IndexedFields["TemporalScheduledStartTime"]; found {
			var s string
			if err := json.Unmarshal(scheduledStartTime.GetData(), &s); err != nil {
				return "", fmt.Errorf("decoding TemporalScheduledStartTime: %w", err)
			}
			// The value is a "Z"-suffixed timestamp, i.e. UTC. Parsing it as
			// RFC 3339 honours that designator; treating "Z" as a literal and
			// parsing in a local zone would shift the instant by the zone
			// offset.
			tt, err := time.Parse(time.RFC3339, s)
			if err != nil {
				return "", fmt.Errorf("parsing TemporalScheduledStartTime %q: %w", s, err)
			}
			startTime = tt.UnixMilli()
			fmt.Println("retrieve scheduled start time from search attribute")
		}
	}
	fmt.Printf("scheduled start time: %d\n", startTime)

	// Retry with exponential backoff: 1s initial delay, doubling up to 1m,
	// at most 500 attempts. Each attempt may run for up to one minute.
	options := workflow.ActivityOptions{
		StartToCloseTimeout: time.Minute,
		RetryPolicy: &temporal.RetryPolicy{
			InitialInterval:    time.Second,
			BackoffCoefficient: 2.0,
			MaximumInterval:    time.Minute,
			MaximumAttempts:    500,
		},
	}
	ctx = workflow.WithActivityOptions(ctx, options)

	var res string
	if err := workflow.ExecuteActivity(ctx, activity.SayHello, name, startTime).Get(ctx, &res); err != nil {
		fmt.Printf("failed to say hello to %s, because %s\n", name, err)
		return "", err
	}
	return res, nil
}
将 main.go 的内容改为:
package main
import (
"context"
"fmt"
"github.com/google/uuid"
"go.temporal.io/api/common/v1"
"go.temporal.io/api/schedule/v1"
"go.temporal.io/api/taskqueue/v1"
"go.temporal.io/api/workflow/v1"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/converter"
"log"
"time"
)
// main connects to the Temporal frontend at 127.0.0.1:7233 and creates a
// Schedule that starts the HandleName workflow every 15 seconds with the
// JSON string "Tim" as its single input. The server's response is printed on
// success; a failure to create the schedule panics.
func main() {
	temporalClient, err := client.Dial(client.Options{
		HostPort:  "127.0.0.1:7233",
		Namespace: "default",
	})
	if err != nil {
		log.Fatalln("unable to create Temporal client", err)
	}
	defer temporalClient.Close()

	// Workflow input: one payload tagged with the JSON encoding.
	input := &common.Payloads{
		Payloads: []*common.Payload{
			{
				Metadata: map[string][]byte{
					"encoding": []byte(converter.MetadataEncodingJSON),
				},
				Data: []byte("\"Tim\""),
			},
		},
	}

	// What the schedule does on every tick: start the HandleName workflow.
	action := &schedule.ScheduleAction{
		Action: &schedule.ScheduleAction_StartWorkflow{
			StartWorkflow: &workflow.NewWorkflowExecutionInfo{
				WorkflowId:   "go-sdk-schedule-workflow-id",
				WorkflowType: &common.WorkflowType{Name: "HandleName"},
				TaskQueue:    &taskqueue.TaskQueue{Name: "test_task_queue_name"},
				Input:        input,
			},
		},
	}

	// When the schedule fires: a fixed 15-second interval.
	interval := 15 * time.Second
	spec := &schedule.ScheduleSpec{
		Interval: []*schedule.IntervalSpec{
			{Interval: &interval},
		},
		TimezoneName: "Asia/Shanghai",
	}

	req := &workflowservice.CreateScheduleRequest{
		RequestId:  uuid.New().String(),
		Namespace:  "default",
		ScheduleId: "go-sdk-schedule-id",
		Schedule: &schedule.Schedule{
			Spec:   spec,
			Action: action,
		},
	}

	response, err := temporalClient.WorkflowService().CreateSchedule(context.Background(), req)
	if err != nil {
		panic(err)
	}
	fmt.Println(response.String())
}
执行测试项目中的 main.go 后,在 Temporal Web UI 上可以看到新创建的 Schedule(go-sdk-schedule-id):
假设使用 docker compose 启动开发环境,那么可以使用类似下面的命令创建 Schedule:
docker exec temporal-admin-tools tctl schedule create --schedule-id tctl-workflow-id --workflow-id tctl-workflow-id --type HandleName --task-queue test_task_queue_name --input '"Tim"' --cron '* * * * *'
# 输出:Schedule created
在 Temporal Web UI 上可以看到:
也可以通过 Temporal Web UI 创建 Schedule:点击左侧导航栏的 Schedules,然后点击页面右上角的 Create Schedule 按钮。
Web UI 支持的参数较少,许多特性都不支持,比如给 Workflow 指定 input 参数、设置允许的操作总数等。