Go gRPC 客户端常用设置
概述
本文将讲述:
- 健康检查
- 客户端重试
- 客户端负载均衡
- 客户端如何在服务端扩缩容时自动 Rebalance
健康检查
如果启用健康检查,客户端在负载均衡时将只向健康的 SubChannel 发起请求,从而提高请求成功率。
gRPC 提供了一个健康检查库,用于向客户端报告服务的健康状况。其服务定义位于 health/v1 API:https://github.com/grpc/grpc-proto/blob/master/grpc/health/v1/health.proto
客户端
客户端可以通过两种方法监控服务端的运行状况:
- 适用于单次请求的 `Check` 方法
- 适用于流式监听的 `Watch` 方法
在大多数情况下,客户端不需要直接调用上述方法检查服务端。只要在 ServiceConfig 中指定了健康检查配置(healthCheckConfig),客户端就会透明地执行健康检查。该配置中的 serviceName 指定建立连接后需要检查的后端服务名,空字符串表示检查服务端的整体运行状况。更多详情,可查看:
- https://github.com/grpc/proposal/blob/master/A17-client-side-health-checking.md
- https://github.com/grpc/grpc-proto/blob/master/grpc/service_config/service_config.proto
// 导入 grpc/health,启用透明的客户端侧检查
import _ "google.golang.org/grpc/health"
// dftSvcConfig is the default service config attached to the client:
//   - loadBalancingConfig: spread RPCs across all resolved addresses (round_robin).
//   - healthCheckConfig: enable transparent client-side health checking;
//     serviceName "" checks the server's overall status. This only takes
//     effect when google.golang.org/grpc/health is imported (see above).
//   - methodConfig: the entry with an empty name applies to every method.
//     RPCs wait for a ready connection, and UNAVAILABLE failures are retried
//     (at most 5 attempts, backoff starting at 0.2s, x2 per retry, capped at 4s).
//
// Keys use the canonical proto3 JSON names from service_config.proto
// (maxAttempts, initialBackoff, ...). grpc-go's encoding/json based parser
// would also accept PascalCase keys, but other gRPC implementations may not.
var dftSvcConfig = `{
  "loadBalancingConfig": [{"round_robin":{}}],
  "healthCheckConfig": {
    "serviceName": ""
  },
  "methodConfig": [{
    "name": [{}],
    "waitForReady": true,
    "retryPolicy": {
      "maxAttempts": 5,
      "initialBackoff": "0.2s",
      "maxBackoff": "4s",
      "backoffMultiplier": 2.0,
      "retryableStatusCodes": [ "UNAVAILABLE" ]
    }
  }]
}`
// main shows how dftSvcConfig is attached to a client connection.
// builder is assumed to be a registered custom resolver builder (the
// util.DNSWithTTLResolverBuilder set up in client.go); "aaaa" is a placeholder
// host name.
func main() {
	conn, err := grpc.NewClient(
		fmt.Sprintf("%s:///aaaa:50052", builder.Scheme()),
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithDefaultServiceConfig(dftSvcConfig),
	)
	if err != nil {
		// NewClient only validates the target and options; it does not dial.
		panic(err)
	}
	defer func() { _ = conn.Close() }()
	// ... create service stubs from conn and issue RPCs here ...
}
服务端
服务端可以根据其依赖系统(例如数据库)的检查结果来更新自身的健康状态。健康服务可以返回以下状态:
- `UNKNOWN`:当前状态尚不清楚,通常出现在服务端实例启动时
- `SERVING`:已准备好为请求提供服务
- `NOT_SERVING`:当前无法为请求提供服务
- `SERVICE_UNKNOWN`:服务端不认识客户端请求的 serviceName,只有 `Watch()` 调用会报告此状态

服务端可以调用 `healthServer.SetServingStatus("serviceName", servingStatus)` 切换某个服务的运行状况。
import (
"google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"
)
// main registers the standard gRPC health service on a server and serves it.
// The overall status (service name "") is set explicitly here. Call
// SetServingStatus again whenever a dependency such as a database becomes
// available or unavailable.
func main() {
	const port = 50052 // fixed for the snippet; server.go reads it from a -port flag

	healthServer := health.NewServer()
	healthServer.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING)

	server := grpc.NewServer()
	grpc_health_v1.RegisterHealthServer(server, healthServer)

	lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}
	log.Printf("server listening at 0.0.0.0:%d\n", port)
	if err := server.Serve(lis); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}
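客户端重试
重试策略通过 ServiceConfig 中 methodConfig 的 retryPolicy 字段配置(见上文 dftSvcConfig 示例),其定义摘自 service_config.proto: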
// Retry policy for outgoing RPCs (excerpt from service_config.proto, where it
// is nested inside MethodConfig). In ServiceConfig JSON the fields are written
// with their proto3 JSON names: maxAttempts, initialBackoff, maxBackoff,
// backoffMultiplier, retryableStatusCodes.
message RetryPolicy {
  // Maximum number of RPC attempts, including the original attempt.
  // Values greater than 5 are treated as 5.
  uint32 max_attempts = 1;
  // Exponential backoff parameters. The first retry happens at
  // random(0, initial_backoff). In general, attempt n happens at
  // random(0, min(initial_backoff*backoff_multiplier**(n-1), max_backoff)).
  google.protobuf.Duration initial_backoff = 2;
  google.protobuf.Duration max_backoff = 3;
  float backoff_multiplier = 4;
  // Set of status codes that may be retried. Must be non-empty.
  repeated google.rpc.Code retryable_status_codes = 5;
}
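客户端负载均衡
负载均衡策略通过 ServiceConfig 的 loadBalancingConfig 字段指定(上文示例使用 `{"round_robin":{}}`),其定义摘自 service_config.proto: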
// Excerpt from grpc/service_config/service_config.proto. In ServiceConfig
// JSON each policy is selected by its json_name, e.g. {"round_robin":{}}.
message LoadBalancingConfig {
  // Exactly one LB policy may be configured.
  oneof policy {
    // For each new LB policy supported by gRPC, a new field must be added
    // here. The field's name must be the LB policy name and its type is a
    // message that provides whatever configuration parameters are needed
    // by the LB policy. The configuration message will be passed to the
    // LB policy when it is instantiated on the client.
    //
    // If the LB policy does not require any configuration parameters, the
    // message for that LB policy may be empty.
    //
    // Note that if an LB policy contains another nested LB policy
    // (e.g., a gslb policy picks the cluster and then delegates to
    // a round_robin policy to pick the backend within that cluster), its
    // configuration message may include a nested instance of the
    // LoadBalancingConfig message to configure the nested LB policy.
    PickFirstConfig pick_first = 4 [json_name = "pick_first"];
    RoundRobinConfig round_robin = 1 [json_name = "round_robin"];
    WeightedRoundRobinLbConfig weighted_round_robin = 20
        [json_name = "weighted_round_robin"];
    // The remaining policies are omitted from this excerpt; "..." is not
    // valid protobuf syntax on its own.
    ...
  }
自定义带 TTL 的 DNS Resolver
gRPC 在 Kubernetes 上的一种部署架构
流程如下:
- gRPC Server 以 StatefulSet 的方式在 Kubernetes 上部署
- Client 解析 Headless Service 获取 Pod IP 列表
- Client 通过设定的负载均衡策略确定连接哪个 Pod IP
- Client 直接连接 Pod IP

Kubernetes Manifest 示例:
# StatefulSet running the gRPC server pods. Each pod gets a stable DNS
# identity through the headless Service named in spec.serviceName.
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: grpc-client-setup-test
  labels:
    app: grpc-client-setup-test
spec:
  serviceName: grpc-client-setup-test-headless
  replicas: 2
  selector:
    matchLabels:
      app: grpc-client-setup-test
  template:
    metadata:
      labels:
        app: grpc-client-setup-test
    spec:
      # Time between SIGTERM and SIGKILL, available for graceful shutdown.
      terminationGracePeriodSeconds: 10
      imagePullSecrets:
        - name: your-registry-secrets
      containers:
        - name: grpc-client-setup-test
          image: your-image-name:your-image-tag
          ports:
            - containerPort: 50052
              name: grpc
          env:
            # Verbose grpc-go logging, useful to watch resolver/balancer activity.
            - name: GRPC_GO_LOG_VERBOSITY_LEVEL
              value: "99"
            - name: GRPC_GO_LOG_SEVERITY_LEVEL
              value: "info"
          readinessProbe:
            tcpSocket:
              port: 50052
            initialDelaySeconds: 2
            periodSeconds: 4
          livenessProbe:
            tcpSocket:
              port: 50052
            initialDelaySeconds: 2
            periodSeconds: 4
---
# Headless Service (clusterIP: None): a DNS lookup of its name returns the
# individual pod IPs, which the gRPC client balances across itself.
apiVersion: v1
kind: Service
metadata:
  name: grpc-client-setup-test-headless
spec:
  clusterIP: None
  selector:
    app: grpc-client-setup-test
  ports:
    - name: grpc
      port: 50052
      targetPort: 50052
  # Pods that are not ready are also published in DNS. The client relies on
  # connection state and gRPC health checking to avoid sending them traffic.
  publishNotReadyAddresses: true
上述架构的问题
- gRPC 内置的 DNS 解析器不会按固定周期重新解析 Headless Service:它在创建连接时解析一次,之后仅在连接断开或失败等情况下才会触发重新解析。因此当 StatefulSet 扩容时,客户端感知不到新增的 Pod,新 Pod 也分不到流量。
实现带 TTL 的 DNS 解析器,可以解决上述问题。
项目结构:
.
├── client.go
├── go.mod
├── go.sum
├── greeter
│ ├── greeter.pb.go
│ └── greeter_grpc.pb.go
├── greeter.proto
├── server.go
└── util
└── dns_with_ttl_resolver.go
初始化:
# Install the protoc plugins used by `go generate` (protoc itself must be on PATH).
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest
# Generate greeter/*.pb.go first: client.go and server.go import test/greeter,
# which does not exist until protoc has run, so `go mod tidy` must come after.
go generate
# tidy also downloads the modules it needs, so a separate `go mod download` is unnecessary.
go mod tidy
go fmt ./...
# client.go and server.go both declare func main in the same package, so run
# them one file at a time: `go run server.go` / `go run client.go`.
go.mod:
// Module path "test" is the prefix of the local packages imported as
// test/greeter and test/util by client.go and server.go.
module test
go 1.23.2
// Direct dependencies: the gRPC runtime and protobuf (generated code, protojson).
require (
	google.golang.org/grpc v1.70.0
	google.golang.org/protobuf v1.36.5
)
// Indirect dependencies, maintained by `go mod tidy`.
require (
	golang.org/x/net v0.32.0 // indirect
	golang.org/x/sys v0.28.0 // indirect
	golang.org/x/text v0.21.0 // indirect
	google.golang.org/genproto/googleapis/rpc v0.0.0-20241202173237-19429a94021a // indirect
)
greeter.proto:
syntax = "proto3";

package greeter;

// Generated Go code is written to ./greeter with Go package name "greeter".
option go_package = "./greeter;greeter";

// Greeter is the demo service used to observe client-side load balancing.
service Greeter {
  // SayHello returns a greeting for the given name.
  rpc SayHello(HelloRequest) returns (HelloResponse);
}

// HelloRequest carries the name to greet.
message HelloRequest {
  string name = 1;
}

// HelloResponse carries the greeting text.
message HelloResponse {
  string message = 1;
}
client.go:
package main
import (
"context"
"flag"
"fmt"
"log"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
_ "google.golang.org/grpc/health"
"google.golang.org/grpc/resolver"
"google.golang.org/protobuf/encoding/protojson"
"test/greeter"
"test/util"
)
// dftSvcConfig is the default service config attached to the client:
//   - loadBalancingConfig: spread RPCs across all resolved addresses (round_robin).
//   - healthCheckConfig: enable transparent client-side health checking;
//     serviceName "" checks the server's overall status. This relies on the
//     blank import of google.golang.org/grpc/health.
//   - methodConfig: the entry with an empty name applies to every method.
//     RPCs wait for a ready connection, and UNAVAILABLE failures are retried
//     (at most 5 attempts, backoff starting at 0.2s, x2 per retry, capped at 4s).
//
// Keys use the canonical proto3 JSON names from service_config.proto
// (maxAttempts, initialBackoff, ...). grpc-go's encoding/json based parser
// would also accept PascalCase keys, but other gRPC implementations may not.
var dftSvcConfig = `{
  "loadBalancingConfig": [{"round_robin":{}}],
  "healthCheckConfig": {
    "serviceName": ""
  },
  "methodConfig": [{
    "name": [{}],
    "waitForReady": true,
    "retryPolicy": {
      "maxAttempts": 5,
      "initialBackoff": "0.2s",
      "maxBackoff": "4s",
      "backoffMultiplier": 2.0,
      "retryableStatusCodes": [ "UNAVAILABLE" ]
    }
  }]
}`
func main() {
serverName := ""
serverPort := 0
flag.StringVar(&serverName, "serverName", "grpc-client-setup-test-headless", "server name")
flag.IntVar(&serverPort, "serverPort", 50052, "server port")
flag.Parse()
// 注册自定义解析器
builder := &util.DNSWithTTLResolverBuilder{RefreshTTL: 4 * time.Second, ResolveTTL: 4 * time.Second}
resolver.Register(builder)
// 使用自定义的解析器 schema 创建 gRPC 连接
conn, err := grpc.NewClient(
// fmt.Sprintf("dns:///%s:%d", serverName, serverPort),
fmt.Sprintf("%s:///%s:%d", builder.Scheme(), serverName, serverPort),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultServiceConfig(dftSvcConfig),
)
if err != nil {
log.Fatalf("failed to create gRPC client:%s", err)
}
defer func() { _ = conn.Close() }()
client := greeter.NewGreeterClient(conn)
for {
resp, err := client.SayHello(context.Background(), &greeter.HelloRequest{Name: "Tim"})
if err != nil {
log.Fatalf("failed to SayHello: %v", err)
}
log.Println(protojson.Format(resp))
time.Sleep(5 * time.Second)
}
}
server.go:
package main
import (
	"context"
	"flag"
	"fmt"
	"log"
	"net"
	"os"
	"os/signal"
	"syscall"

	"google.golang.org/grpc"
	"google.golang.org/grpc/health"
	"google.golang.org/grpc/health/grpc_health_v1"

	"test/greeter"
)
// serverLoggingInterceptor is a unary server interceptor that logs the full
// method name of every incoming call and then delegates to the handler,
// returning its response and error unchanged.
func serverLoggingInterceptor(ctx context.Context, req any, info *grpc.UnaryServerInfo,
	handler grpc.UnaryHandler) (any, error) {
	log.Printf("invoking %s", info.FullMethod)
	return handler(ctx, req)
}
// GreeterService implements the generated greeter.GreeterServer interface.
// It embeds greeter.UnimplementedGreeterServer, the struct generated
// alongside the service interface.
type GreeterService struct {
	greeter.UnimplementedGreeterServer
}

// SayHello answers with "Hello " followed by the requested name.
func (g *GreeterService) SayHello(_ context.Context, req *greeter.HelloRequest) (*greeter.HelloResponse, error) {
	greeting := "Hello " + req.GetName()
	resp := &greeter.HelloResponse{Message: greeting}
	return resp, nil
}
//go:generate protoc --go_out=. --go-grpc_out=. greeter.proto
// main starts the Greeter service together with the standard gRPC health
// service on the port given by -port.
//
// On SIGINT/SIGTERM (Kubernetes sends SIGTERM when a pod is removed, e.g. on
// scale-down) the server shuts down gracefully:
//  1. healthServer.Shutdown marks every service NOT_SERVING, so clients using
//     client-side health checking stop picking this instance;
//  2. GracefulStop stops accepting new RPCs and waits for in-flight ones.
//
// main returns only after GracefulStop has finished.
func main() {
	port := 0
	flag.IntVar(&port, "port", 50052, "server port")
	flag.Parse()

	healthServer := health.NewServer()
	server := grpc.NewServer(
		grpc.UnaryInterceptor(serverLoggingInterceptor),
	)
	grpc_health_v1.RegisterHealthServer(server, healthServer)
	greeter.RegisterGreeterServer(server, &GreeterService{})

	lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}

	stopped := make(chan struct{})
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		defer close(stopped)
		sig := <-sigCh
		log.Printf("received %v, shutting down gracefully", sig)
		healthServer.Shutdown()
		server.GracefulStop()
	}()

	log.Printf("server listening at 0.0.0.0:%d\n", port)
	if err := server.Serve(lis); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
	// Serve returns nil as soon as GracefulStop closes the listener, while
	// GracefulStop may still be draining RPCs; wait for it before exiting.
	<-stopped
}
util/dns_with_ttl_resolver.go:
package util
import (
"context"
"fmt"
"log"
"net"
"reflect"
"sync"
"time"
"google.golang.org/grpc/resolver"
)
// DNSWithTTLResolverBuilder implements resolver.Builder and builds
// DNSWithTTLResolver instances: DNS resolvers that re-resolve periodically.
// Register it with resolver.Register and dial "dns-with-ttl:///host:port".
type DNSWithTTLResolverBuilder struct {
	// RefreshTTL is the interval between background DNS lookups.
	RefreshTTL time.Duration
	// ResolveTTL is the timeout applied to each individual DNS lookup
	// (despite the name, it is a timeout, not a record TTL).
	ResolveTTL time.Duration
}
// Build implements resolver.Builder. gRPC calls it to create a resolver for a
// target whose scheme matches Scheme().
//
//   - target is the parsed gRPC target; its Endpoint() is resolved as host:port.
//   - cc is the callback interface used to notify the gRPC ClientConn of
//     address updates and errors.
//
// Build performs the first lookup synchronously (via Start) and starts the
// background refresh goroutine. Non-positive RefreshTTL/ResolveTTL values are
// replaced by defaults for two reasons: time.NewTicker panics on a
// non-positive interval, and a non-positive lookup timeout would make every
// DNS lookup fail immediately.
func (b *DNSWithTTLResolverBuilder) Build(
	target resolver.Target,
	cc resolver.ClientConn,
	_ resolver.BuildOptions,
) (resolver.Resolver, error) {
	const (
		defaultRefreshTTL = 30 * time.Second
		defaultResolveTTL = 10 * time.Second
	)
	refreshTTL, resolveTTL := b.RefreshTTL, b.ResolveTTL
	if refreshTTL <= 0 {
		refreshTTL = defaultRefreshTTL
	}
	if resolveTTL <= 0 {
		resolveTTL = defaultResolveTTL
	}

	r := &DNSWithTTLResolver{
		target:     target,
		cc:         cc,
		refreshTTL: refreshTTL,
		resolveTTL: resolveTTL,
		closeCh:    make(chan struct{}),
		mu:         new(sync.Mutex),
		addrSet:    make(map[string]struct{}),
	}
	// Initial lookup plus background refresh loop.
	r.Start()
	return r, nil
}
// Scheme implements resolver.Builder. Targets of the form
// "dns-with-ttl:///host:port" are routed to this builder.
func (b *DNSWithTTLResolverBuilder) Scheme() string {
	const scheme = "dns-with-ttl"
	return scheme
}
// DNSWithTTLResolver is a DNS resolver that re-resolves its target every
// refreshTTL and forwards changed address lists to the gRPC ClientConn.
type DNSWithTTLResolver struct {
	// mu guards addrSet, the set of "ip:port" addresses most recently sent to cc.
	mu      *sync.Mutex
	addrSet map[string]struct{}

	// target is what gets resolved; cc receives the results and errors.
	target resolver.Target
	cc     resolver.ClientConn

	// refreshTTL is the interval between background lookups;
	// resolveTTL bounds each individual lookup.
	refreshTTL time.Duration
	resolveTTL time.Duration

	// closeCh is closed by Close to stop the refresh goroutine.
	closeCh chan struct{}
}
// Start resolves the target once, synchronously, and then launches a
// goroutine that re-resolves every refreshTTL until Close is called. Failures
// from the initial resolution are reported to the ClientConn; failures from
// the periodic refreshes are not.
func (r *DNSWithTTLResolver) Start() {
	const initial = true
	r.resolve(initial)

	refresh := time.NewTicker(r.refreshTTL)
	go func() {
		defer refresh.Stop()
		for {
			select {
			case <-refresh.C:
				r.resolve(!initial)
			case <-r.closeCh:
				return
			}
		}
	}()
}
// resolve looks up the host part of the target, converts the result into
// gRPC addresses and, if the address set differs from the last one accepted
// by the ClientConn, pushes it with cc.UpdateState.
//
// resolveNow is true for resolutions explicitly requested by Start or by gRPC
// through ResolveNow. Only then are lookup and update failures reported via
// cc.ReportError. Periodic refreshes (resolveNow == false) log a failed lookup
// and keep the previously published addresses.
func (r *DNSWithTTLResolver) resolve(resolveNow bool) {
	hostport := r.target.Endpoint()
	log.Printf("resolving %s", hostport)
	host, port, err := net.SplitHostPort(hostport)
	if err != nil {
		// Tell the ClientConn the resolver hit an error; it forwards the error
		// to the load-balancing policy.
		r.cc.ReportError(fmt.Errorf("dns-with-ttl: invalid target %q: %w", hostport, err))
		return
	}

	var netResolver net.Resolver
	ctx, cancel := context.WithTimeout(context.Background(), r.resolveTTL)
	defer cancel()
	ips, err := netResolver.LookupHost(ctx, host)
	if err != nil {
		err = fmt.Errorf("dns-with-ttl: lookup %q: %w", host, err)
		if resolveNow {
			r.cc.ReportError(err)
		} else {
			log.Printf("%v; keeping previous addresses", err)
		}
		return
	}

	addrs := make([]resolver.Address, len(ips))
	addrSet := make(map[string]struct{}, len(ips))
	for i, ip := range ips {
		// JoinHostPort brackets IPv6 literals ("[fd00::1]:50052"). Plain
		// "ip:port" formatting would produce an unparsable address on
		// IPv6/dual-stack clusters.
		addr := net.JoinHostPort(ip, port)
		addrs[i] = resolver.Address{Addr: addr, ServerName: host}
		addrSet[addr] = struct{}{}
	}

	r.mu.Lock()
	defer r.mu.Unlock()
	// Skip the update when nothing changed since the last accepted state.
	if reflect.DeepEqual(r.addrSet, addrSet) {
		log.Println("no change detected")
		return
	}
	if err := r.cc.UpdateState(resolver.State{Addresses: addrs}); err != nil {
		// Do not record a rejected address set. Keeping the old r.addrSet makes
		// the next refresh push this list again, which is how a resolver is
		// expected to react when UpdateState returns an error.
		if resolveNow {
			r.cc.ReportError(err)
		}
		return
	}
	r.addrSet = addrSet
}
// ResolveNow implements resolver.Resolver. gRPC calls it as a hint to
// re-resolve the target. This implementation performs the lookup
// synchronously and, unlike the periodic refresh, reports failures to the
// ClientConn.
func (r *DNSWithTTLResolver) ResolveNow(opts resolver.ResolveNowOptions) {
	_ = opts // no option is consulted
	reportErrors := true
	r.resolve(reportErrors)
}
// Close implements resolver.Resolver. It closes closeCh, which stops the
// refresh goroutine started by Start. A later, sequential call sees the
// channel already closed and does nothing.
func (r *DNSWithTTLResolver) Close() {
	alreadyClosed := false
	select {
	case <-r.closeCh:
		alreadyClosed = true
	default:
	}
	if !alreadyClosed {
		close(r.closeCh)
	}
}