Go gRPC 客户端常用设置

概述

本文将讲述以下内容:健康检查、客户端重试、客户端负载均衡,以及在 Kubernetes 上自定义带 TTL 的 DNS Resolver。


健康检查

如果启用健康检查,那么客户端在负载均衡时,将只对健康的 SubChannel 发起请求,进而保证成功率。

gRPC 提供了一个健康检查库,用于向客户端报告系统的健康状况。该库的服务定义位于 health/v1 API:https://github.com/grpc/grpc-proto/blob/master/grpc/health/v1/health.proto

客户端

客户端可以通过两种方法监控服务端的运行状况:一是直接调用 Health 服务的 Check(单次查询)或 Watch(流式订阅)RPC;二是在 ServiceConfig 中配置健康检查,由客户端透明地执行。

在大多数情况下,客户端不需要直接检查服务端。当在 ServiceConfig 中指定健康检查配置时,客户端将透明地执行健康检查操作。该配置指定建立连接时应检查的后端 serviceName,空字符串表示检查服务端的整体运行状况。更多详情,可查看 gRPC 官方文档:https://github.com/grpc/grpc/blob/master/doc/health-checking.md

// Import grpc/health for its side effects: this enables transparent
// client-side health checking, configured by healthCheckConfig below.
import _ "google.golang.org/grpc/health"

// dftSvcConfig is the default service config passed to
// grpc.WithDefaultServiceConfig:
//   - loadBalancingConfig: use the round_robin LB policy.
//   - healthCheckConfig: enable client-side health checking; the empty
//     serviceName checks the server's overall status.
//   - methodConfig: "name": [{}] presumably matches every method (confirm);
//     waitForReady is enabled, and UNAVAILABLE errors are retried up to 5
//     attempts (including the original) with exponential backoff starting at
//     0.2s, multiplied by 2 per attempt and capped at 4s.
//
// NOTE(review): the JSON form of RetryPolicy normally uses lowerCamelCase keys
// (maxAttempts, initialBackoff, ...). These PascalCase keys presumably rely on
// grpc-go's case-insensitive JSON decoding; confirm before reusing this config
// with other gRPC implementations.
var dftSvcConfig = `{
    "loadBalancingConfig": [{"round_robin":{}}],
    "healthCheckConfig": {
       "serviceName": ""
    },
    "methodConfig": [{
       "name": [{}],
       "waitForReady": true,
       "retryPolicy": {
          "MaxAttempts": 5,
          "InitialBackoff": "0.2s",
          "MaxBackoff": "4s",
          "BackoffMultiplier": 2.0,
          "RetryableStatusCodes": [ "UNAVAILABLE" ]
       }
    }]
}`

func main() {
	// Dial through gRPC's built-in "dns" resolver. The health-check, retry and
	// load-balancing behaviour comes entirely from dftSvcConfig. (The custom
	// "dns-with-ttl" resolver built later in this article can be registered
	// and used in the target instead.)
	conn, err := grpc.NewClient(
		"dns:///aaaa:50052",
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithDefaultServiceConfig(dftSvcConfig),
	)
	if err != nil {
		log.Fatalf("failed to create gRPC client: %v", err)
	}
	defer func() { _ = conn.Close() }()

	// ... create service stubs from conn and issue RPCs ...
}

服务端

服务端通过检查其依赖的系统(例如数据库)来确定并更新自身的运行状态。健康服务器可以返回以下状态:UNKNOWN、SERVING、NOT_SERVING,以及仅用于 Watch 方法的 SERVICE_UNKNOWN。

服务端可以使用 healthServer.SetServingStatus("serviceName", servingStatus) 切换其运行状况。

import (
    "google.golang.org/grpc/health"
    "google.golang.org/grpc/health/grpc_health_v1"
)

func main() {
	const port = 50052

	healthServer := health.NewServer()
	server := grpc.NewServer()
	grpc_health_v1.RegisterHealthServer(server, healthServer)

	// The empty service name stands for the overall server status. Switch it
	// to NOT_SERVING (e.g. when the database is unreachable) so that
	// health-checking clients stop sending requests to this server.
	healthServer.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING)

	lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}
	log.Printf("server listening at 0.0.0.0:%d\n", port)
	if err := server.Serve(lis); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}

客户端重试

  // Retry policy for outgoing RPCs.
  message RetryPolicy {
    // Maximum number of RPC attempts, including the original attempt.
    // Values greater than 5 are treated as 5.
    uint32 max_attempts = 1;

    // Exponential backoff parameters. The first retry happens after
    // random(0, initial_backoff). In general, the n-th attempt happens after
    // random(0, min(initial_backoff*backoff_multiplier**(n-1), max_backoff)).
    google.protobuf.Duration initial_backoff = 2;
    google.protobuf.Duration max_backoff = 3;
    float backoff_multiplier = 4;

    // Set of status codes that may be retried. Must not be empty.
    repeated google.rpc.Code retryable_status_codes = 5;
  }

客户端负载均衡

// Excerpt (presumably from gRPC's service_config.proto) showing how an LB
// policy is selected; "..." marks fields omitted here. The json_name of each
// field is the key used under "loadBalancingConfig" in a JSON service config,
// e.g. [{"round_robin":{}}].
message LoadBalancingConfig {
  // Exactly one LB policy may be configured.
  oneof policy {
    // For each new LB policy supported by gRPC, a new field must be added
    // here.  The field's name must be the LB policy name and its type is a
    // message that provides whatever configuration parameters are needed
    // by the LB policy.  The configuration message will be passed to the
    // LB policy when it is instantiated on the client.
    //
    // If the LB policy does not require any configuration parameters, the
    // message for that LB policy may be empty.
    //
    // Note that if an LB policy contains another nested LB policy
    // (e.g., a gslb policy picks the cluster and then delegates to
    // a round_robin policy to pick the backend within that cluster), its
    // configuration message may include a nested instance of the
    // LoadBalancingConfig message to configure the nested LB policy.

    PickFirstConfig pick_first = 4 [json_name = "pick_first"];

    RoundRobinConfig round_robin = 1 [json_name = "round_robin"];

    WeightedRoundRobinLbConfig weighted_round_robin = 20
        [json_name = "weighted_round_robin"];

    // Remaining policies elided in this excerpt.
    ...
}

自定义带 TTL 的 DNS Resolver

gRPC 在 Kubernetes 上的一种部署架构

流程如下:

Kubernetes Manifest 示例:

# StatefulSet running two replicas of the demo gRPC server; serviceName ties
# the pods to the headless Service defined below.
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: grpc-client-setup-test
  labels:
    app: grpc-client-setup-test
spec:
  serviceName: grpc-client-setup-test-headless
  replicas: 2
  selector:
    matchLabels:
      app: grpc-client-setup-test
  template:
    metadata:
      labels:
        app: grpc-client-setup-test
    spec:
      terminationGracePeriodSeconds: 10
      imagePullSecrets:
      - name: your-registry-secrets
      containers:
      - name: grpc-client-setup-test
        image: your-image-name:your-image-tag
        ports:
        - containerPort: 50052
          name: grpc
        env:
        # Maximum grpc-go log verbosity, so resolver and load-balancer
        # activity shows up in the container logs.
        - name: GRPC_GO_LOG_VERBOSITY_LEVEL
          value: "99"
        - name: GRPC_GO_LOG_SEVERITY_LEVEL
          value: "info"
        # NOTE(review): tcpSocket probes only check that port 50052 accepts
        # connections; they ignore the status reported by the gRPC health
        # service the server registers. A `grpc:` probe on the same port may
        # be preferable if the cluster supports gRPC probes — confirm.
        readinessProbe:
          tcpSocket:
            port: 50052
          initialDelaySeconds: 2
          periodSeconds: 4
        livenessProbe:
          tcpSocket:
            port: 50052
          initialDelaySeconds: 2
          periodSeconds: 4

---

# Headless Service (clusterIP: None): resolving its name returns the
# individual pod IPs, which the gRPC client resolves and balances across.
apiVersion: v1
kind: Service
metadata:
  name: grpc-client-setup-test-headless
spec:
  clusterIP: None
  selector:
    app: grpc-client-setup-test
  ports:
  - name: grpc
    port: 50052
    targetPort: 50052
  # Publish pod IPs in DNS even while pods are not ready.
  # NOTE(review): presumably intentional, so that gRPC client-side health
  # checking rather than DNS decides which backends get traffic — confirm.
  publishNotReadyAddresses: true

上述架构的问题

实现带 TTL 的 DNS 解析器解决上述问题

项目结构:

.
├── client.go
├── go.mod
├── go.sum
├── greeter
│   ├── greeter.pb.go
│   └── greeter_grpc.pb.go
├── greeter.proto
├── server.go
└── util
    └── dns_with_ttl_resolver.go

初始化:

# Sync go.mod/go.sum with the imports and download the modules.
go mod tidy
go mod download
# Run the //go:generate directive in server.go (protoc with the Go and
# Go-gRPC plugins) to produce greeter/*.pb.go.
# NOTE(review): on a fresh checkout greeter/ does not exist yet, so
# `go mod tidy` may fail on the test/greeter import; consider running
# `go generate` first.
go generate
go fmt ./...
# client.go and server.go both declare package main with their own main(),
# so run them separately, e.g. `go run server.go` and `go run client.go`.

go.mod:

module test

go 1.23.2

require (
    google.golang.org/grpc v1.70.0
    google.golang.org/protobuf v1.36.5
)

require (
    golang.org/x/net v0.32.0 // indirect
    golang.org/x/sys v0.28.0 // indirect
    golang.org/x/text v0.21.0 // indirect
    google.golang.org/genproto/googleapis/rpc v0.0.0-20241202173237-19429a94021a // indirect
)

greeter.proto:

syntax = "proto3";

package greeter;

// Generated Go code is written to ./greeter, in Go package "greeter".
option go_package = "./greeter;greeter";

// Greeter is the demo service implemented by server.go and called by client.go.
service Greeter {
  // SayHello returns a greeting for the given name.
  rpc SayHello(HelloRequest) returns (HelloResponse);
}

// HelloRequest carries the name to greet.
message HelloRequest {
  string name = 1;
}

// HelloResponse carries the greeting text.
message HelloResponse {
  string message = 1;
}

client.go:

package main

import (
    "context"
    "flag"
    "fmt"
    "log"
    "time"

    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
    _ "google.golang.org/grpc/health"
    "google.golang.org/grpc/resolver"
    "google.golang.org/protobuf/encoding/protojson"

    "test/greeter"
    "test/util"
)

// dftSvcConfig is the default service config passed to
// grpc.WithDefaultServiceConfig:
//   - loadBalancingConfig: use the round_robin LB policy.
//   - healthCheckConfig: enable client-side health checking (requires the
//     blank import of google.golang.org/grpc/health); the empty serviceName
//     checks the server's overall status.
//   - methodConfig: "name": [{}] presumably matches every method (confirm);
//     waitForReady is enabled, and UNAVAILABLE errors are retried up to 5
//     attempts (including the original) with exponential backoff starting at
//     0.2s, multiplied by 2 per attempt and capped at 4s.
//
// NOTE(review): the JSON form of RetryPolicy normally uses lowerCamelCase keys
// (maxAttempts, initialBackoff, ...). These PascalCase keys presumably rely on
// grpc-go's case-insensitive JSON decoding; confirm before reusing this config
// with other gRPC implementations.
var dftSvcConfig = `{
    "loadBalancingConfig": [{"round_robin":{}}],
    "healthCheckConfig": {
       "serviceName": ""
    },
    "methodConfig": [{
       "name": [{}],
       "waitForReady": true,
       "retryPolicy": {
          "MaxAttempts": 5,
          "InitialBackoff": "0.2s",
          "MaxBackoff": "4s",
          "BackoffMultiplier": 2.0,
          "RetryableStatusCodes": [ "UNAVAILABLE" ]
       }
    }]
}`

// main dials the Greeter service through the custom dns-with-ttl resolver and
// calls SayHello every 5 seconds, exiting on the first failed RPC.
func main() {
	serverName := flag.String("serverName", "grpc-client-setup-test-headless", "server name")
	serverPort := flag.Int("serverPort", 50052, "server port")
	// dftSvcConfig enables waitForReady, so an RPC blocks until a healthy
	// backend is available. Without a deadline a call could hang forever
	// (e.g. while every backend is down) and failures would never surface.
	rpcTimeout := flag.Duration("rpcTimeout", 10*time.Second, "deadline for each SayHello RPC")
	flag.Parse()

	// Register the custom resolver so its scheme can be used in the target.
	builder := &util.DNSWithTTLResolverBuilder{RefreshTTL: 4 * time.Second, ResolveTTL: 4 * time.Second}
	resolver.Register(builder)

	// Dial through the custom resolver scheme. Use
	// fmt.Sprintf("dns:///%s:%d", ...) to compare with gRPC's built-in resolver.
	conn, err := grpc.NewClient(
		fmt.Sprintf("%s:///%s:%d", builder.Scheme(), *serverName, *serverPort),
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithDefaultServiceConfig(dftSvcConfig),
	)
	if err != nil {
		log.Fatalf("failed to create gRPC client: %v", err)
	}
	defer func() { _ = conn.Close() }()

	client := greeter.NewGreeterClient(conn)

	for {
		ctx, cancel := context.WithTimeout(context.Background(), *rpcTimeout)
		resp, err := client.SayHello(ctx, &greeter.HelloRequest{Name: "Tim"})
		// Cancel right away rather than defer: defers in an endless loop never run.
		cancel()
		if err != nil {
			log.Fatalf("failed to SayHello: %v", err)
		}
		log.Println(protojson.Format(resp))
		time.Sleep(5 * time.Second)
	}
}

server.go:

package main

import (
	"context"
	"flag"
	"fmt"
	"log"
	"net"
	"os"
	"os/signal"
	"syscall"

	"google.golang.org/grpc"
	"google.golang.org/grpc/health"
	"google.golang.org/grpc/health/grpc_health_v1"

	"test/greeter"
)

// serverLoggingInterceptor is a unary server interceptor that logs the full
// method name of every incoming RPC and then delegates to the handler,
// returning its result unchanged.
func serverLoggingInterceptor(ctx context.Context, req any, info *grpc.UnaryServerInfo,
	handler grpc.UnaryHandler) (resp any, err error) {
	log.Printf("invoking %s", info.FullMethod)
	return handler(ctx, req)
}

// GreeterService implements the Greeter gRPC service. Embedding
// UnimplementedGreeterServer provides defaults for methods not overridden.
type GreeterService struct {
	greeter.UnimplementedGreeterServer
}

// SayHello replies with "Hello " followed by the name from the request.
func (g *GreeterService) SayHello(_ context.Context, req *greeter.HelloRequest) (*greeter.HelloResponse, error) {
	greeting := "Hello " + req.GetName()
	return &greeter.HelloResponse{Message: greeting}, nil
}

//go:generate protoc --go_out=. --go-grpc_out=. greeter.proto

// main serves the Greeter service together with the standard gRPC health
// service on -port, and shuts down gracefully on SIGINT/SIGTERM.
func main() {
	port := flag.Int("port", 50052, "server port")
	flag.Parse()

	healthServer := health.NewServer()
	server := grpc.NewServer(
		grpc.UnaryInterceptor(serverLoggingInterceptor),
	)
	grpc_health_v1.RegisterHealthServer(server, healthServer)
	greeter.RegisterGreeterServer(server, &GreeterService{})

	lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}
	log.Printf("server listening at 0.0.0.0:%d\n", *port)

	// Serve in the background so main can wait for either a serve failure or a
	// termination signal (e.g. the SIGTERM sent when a pod is deleted).
	serveErr := make(chan error, 1)
	go func() { serveErr <- server.Serve(lis) }()

	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)

	select {
	case err := <-serveErr:
		log.Fatalf("failed to serve: %v", err)
	case sig := <-sigCh:
		log.Printf("received %v, shutting down", sig)
		// Mark every service NOT_SERVING first so health-checking clients stop
		// picking this backend. Then stop accepting new RPCs and wait for
		// in-flight ones to finish instead of dropping them.
		healthServer.Shutdown()
		server.GracefulStop()
	}
}

util/dns_with_ttl_resolver.go:

package util

import (
    "context"
    "fmt"
    "log"
    "net"
    "reflect"
    "sync"
    "time"

    "google.golang.org/grpc/resolver"
)

// Defaults applied by Build when the corresponding builder field is not
// positive. A zero RefreshTTL would make time.NewTicker panic, and a zero
// ResolveTTL would give every lookup an already-expired context.
const (
	defaultRefreshTTL = 30 * time.Second
	defaultResolveTTL = 5 * time.Second
)

// DNSWithTTLResolverBuilder implements resolver.Builder. It builds DNS
// resolvers that re-resolve the target periodically instead of only on demand.
type DNSWithTTLResolverBuilder struct {
	// RefreshTTL is the interval between periodic DNS lookups.
	// Values <= 0 fall back to defaultRefreshTTL.
	RefreshTTL time.Duration
	// ResolveTTL is the timeout for a single DNS lookup.
	// Values <= 0 fall back to defaultResolveTTL.
	ResolveTTL time.Duration
}

var _ resolver.Builder = (*DNSWithTTLResolverBuilder)(nil)

// Build creates a DNSWithTTLResolver for target. It resolves once
// synchronously and then starts a background goroutine that keeps refreshing
// until the resolver is closed.
//
// target is the gRPC dial target whose endpoint ("host:port") is resolved; cc
// receives address updates and errors. Build currently never returns an error.
func (b *DNSWithTTLResolverBuilder) Build(
	target resolver.Target,
	cc resolver.ClientConn,
	_ resolver.BuildOptions) (resolver.Resolver, error) {
	refreshTTL := b.RefreshTTL
	if refreshTTL <= 0 {
		refreshTTL = defaultRefreshTTL
	}
	resolveTTL := b.ResolveTTL
	if resolveTTL <= 0 {
		resolveTTL = defaultResolveTTL
	}

	r := &DNSWithTTLResolver{
		target:     target,
		cc:         cc,
		refreshTTL: refreshTTL,
		resolveTTL: resolveTTL,

		closeCh: make(chan struct{}),
		mu:      new(sync.Mutex),
		addrSet: make(map[string]struct{}),
	}
	// Initial resolution plus the periodic refresh goroutine.
	r.Start()
	return r, nil
}

// Scheme returns the URI scheme handled by this builder. Once registered with
// resolver.Register, targets like "dns-with-ttl:///host:port" use it.
func (b *DNSWithTTLResolverBuilder) Scheme() string { return "dns-with-ttl" }

// DNSWithTTLResolver implements resolver.Resolver. It periodically looks up
// the target host and pushes the resulting address list to the ClientConn.
type DNSWithTTLResolver struct {
	target resolver.Target     // dial target whose endpoint is resolved
	cc     resolver.ClientConn // receives address updates and errors

	refreshTTL time.Duration // interval between periodic lookups
	resolveTTL time.Duration // timeout for a single lookup

	closeCh chan struct{}       // closed by Close to stop the refresh goroutine
	mu      *sync.Mutex         // guards addrSet and serializes UpdateState calls
	addrSet map[string]struct{} // last address set pushed to cc, for change detection
}

// Start resolves once immediately, reporting failures to the ClientConn. It
// then launches a goroutine that re-resolves every refreshTTL until Close is
// called; periodic failures are not reported to the ClientConn.
func (r *DNSWithTTLResolver) Start() {
	r.resolve(true)

	ticker := time.NewTicker(r.refreshTTL)
	go func() {
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				r.resolve(false)
			case <-r.closeCh:
				return
			}
		}
	}()
}

// resolve looks up the host part of the target endpoint and, if the resulting
// address set differs from the last one pushed, sends it to the ClientConn.
//
// resolveNow selects error handling. When true (initial resolve and
// ResolveNow), lookup and update failures are reported via cc.ReportError.
// When false (periodic refresh), they are only logged and the last known
// addresses stay in effect. A malformed endpoint is always reported.
func (r *DNSWithTTLResolver) resolve(resolveNow bool) {
	hostport := r.target.Endpoint()
	log.Printf("resolving %s", hostport)
	host, port, err := net.SplitHostPort(hostport)
	if err != nil {
		// Tell the ClientConn the resolver hit an error; it forwards the error
		// to the load-balancing policy.
		r.cc.ReportError(err)
		return
	}

	var netResolver net.Resolver
	ctx, cancel := context.WithTimeout(context.Background(), r.resolveTTL)
	defer cancel()
	ips, err := netResolver.LookupHost(ctx, host)
	if err != nil {
		if resolveNow {
			r.cc.ReportError(err)
		} else {
			log.Printf("periodic resolve of %s failed, keeping previous addresses: %v", hostport, err)
		}
		return
	}

	// Convert the lookup result into gRPC addresses plus a set for change detection.
	addrs := make([]resolver.Address, 0, len(ips))
	addrSet := make(map[string]struct{}, len(ips))
	for _, ip := range ips {
		// JoinHostPort brackets IPv6 literals ("[::1]:50052"); plain
		// concatenation would yield the unparsable "::1:50052".
		addr := net.JoinHostPort(ip, port)
		addrs = append(addrs, resolver.Address{Addr: addr, ServerName: host})
		addrSet[addr] = struct{}{}
	}

	// Hold the lock across the comparison and UpdateState so concurrent
	// resolves (ticker vs. ResolveNow) do not interleave their updates.
	r.mu.Lock()
	defer r.mu.Unlock()
	if reflect.DeepEqual(r.addrSet, addrSet) {
		log.Println("no change detected")
		return
	}

	if err := r.cc.UpdateState(resolver.State{Addresses: addrs}); err != nil {
		// Leave r.addrSet untouched so the next refresh retries this update
		// instead of treating the rejected state as already applied.
		if resolveNow {
			r.cc.ReportError(err)
		} else {
			log.Printf("updating addresses for %s failed: %v", hostport, err)
		}
		return
	}
	r.addrSet = addrSet
}

// ResolveNow implements resolver.Resolver. It resolves the target
// synchronously and reports any failure to the ClientConn.
func (r *DNSWithTTLResolver) ResolveNow(resolver.ResolveNowOptions) {
	const reportErrors = true
	r.resolve(reportErrors)
}

// Close implements resolver.Resolver. It closes closeCh, which stops the
// periodic refresh goroutine; calling it again afterwards is a no-op.
// NOTE(review): the check-then-close is not atomic, so truly concurrent Close
// calls could race — confirm callers invoke it only once at a time.
func (r *DNSWithTTLResolver) Close() {
	select {
	case <-r.closeCh:
		// Already closed; nothing to do.
	default:
		close(r.closeCh)
	}
}