mkdir client-go-demo
cd client-go-demo/
go mod init client-go-demo
go get k8s.io/client-go@v0.24.0
go get k8s.io/apimachinery@v0.24.0
go get k8s.io/api@v0.24.0
注意:版本要一致。
将 kubeconfig 文件放在 ~/.kube/config
中,其内容类似:
apiVersion v1
clusters
cluster
certificate-authority-data...
server https //1.2.3.45
name kubernetes
contexts
context
cluster kubernetes
user kubernetes-admin
name kubernetes-admin@kubernetes
current-context kubernetes-admin@kubernetes
kind Config
preferences
users
name kubernetes-admin
user
client-certificate-data...
client-key-data...
client-go 是与 kubernetes 集群进行对话的 Go 客户端。
kubernetes
包包含用于访问 Kubernetes API 的客户端集合(clientset)discovery
包用于发现 Kubernetes API server 支持的 APIdynamic
包包含可以对任意 Kubernetes API 对象执行通用操作的动态客户端(dynamic client)plugin/pkg/client/auth
包包含用于从外部源获取凭证的认证插件transport
包用于设置认证和开启连接tools/cache
包对编写控制器(controller)有用如果应用程序运行在集群的 Pod 中,请参阅 in-cluster example,否则请参阅 out-of-cluster example。
1. 集群内部的认证
当使用 rest.InClusterConfig()
时,client-go 使用被挂载到 Pod 里面的 /var/run/secrets/kubernetes.io/serviceaccount
路径上的 Service Account token。
2. 集群外部的认证
使用包含集群上下文信息的 kubeconfig 文件初始化客户端。kubectl
命令也使用 kubeconfig 文件对集群进行身份验证。
RESTClient 是对 http.Client 的封装,它在指定的路径上执行诸如 Get、Put、Post 和 Delete 之类的通用 REST 函数。其它客户端都是在它的基础上实现的。可用于 Kubernetes 内置资源和 CRD 资源。
rest_client_demo.go:
package main
import (
"context"
"fmt"
"k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
)
func main() {
// 如果从 kubeconfig 文件构建 rest.Config 不方便,
// 那么可以使用 clientcmd.BuildConfigFromKubeconfigGetter() 构建 rest.Config
// (比如在凭证存储在配置中心的情况下)
config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
if err != nil {
panic("fail to build config")
}
config.GroupVersion = &v1.SchemeGroupVersion
config.NegotiatedSerializer = scheme.Codecs
config.APIPath = "/api"
// 创建 RESTClient
restClient, err := rest.RESTClientFor(config)
if err != nil {
panic(err)
}
pod := &v1.Pod{}
if err := restClient.Get().
Namespace("default").
Resource("pods").
// 确保 Pod 存在,否则程序会报错
Name("nginx-65778599-2nwp5").
Do(context.TODO()).
Into(pod); err != nil {
panic(err)
}
fmt.Printf("GVK: %s\n", pod.GroupVersionKind())
}
上面的程序使用 Pod 名称获取 Pod 信息。
Clientset 是用于处理 Kubernetes 内置资源对象的客户端集合。默认情况下,不能操作 CRD 资源。
clientset_demo.go:
x
package main
import (
"context"
"fmt"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
)
func main() {
config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
if err != nil {
panic(err)
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
panic(err)
}
pod, err := clientset.CoreV1().
Pods("default").
Get(context.TODO(), "nginx-65778599-2nwp5", metav1.GetOptions{})
if err != nil {
panic(err)
}
fmt.Printf("Annotations: %s\n", pod.Annotations)
}
上面的程序使用 Pod 名称获取 Pod 信息。
DynamicClient 不仅能对 Kubernetes 内置资源进行处理,还可以对 CRD 资源进行处理。
dynamic_client_demo.go:
x
package main
import (
"context"
"fmt"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/tools/clientcmd"
)
func main() {
config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
if err != nil {
panic(err)
}
client, err := dynamic.NewForConfig(config)
if err != nil {
panic(err)
}
deploymentSpec := &unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": "apps/v1",
"kind": "Deployment",
"metadata": map[string]interface{}{
"name": "nginx-alpine-deployment-test",
},
"spec": map[string]interface{}{
"replicas": 1,
"selector": map[string]interface{}{
"matchLabels": map[string]interface{}{
"app": "nginx-alpine-test",
},
},
"template": map[string]interface{}{
"metadata": map[string]interface{}{
"labels": map[string]interface{}{
"app": "nginx-alpine-test",
},
},
"spec": map[string]interface{}{
"containers": []map[string]interface{}{
{
"name": "nginx-alpine",
"image": "nginx:alpine",
"ports": []map[string]interface{}{
{
"name": "http",
"protocol": "TCP",
"containerPort": 80,
},
},
},
},
},
},
},
},
}
deploymentResource := schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}
fmt.Println("Creating deployment...")
result, err := client.
Resource(deploymentResource).
Namespace("default").
Create(context.Background(), deploymentSpec, metav1.CreateOptions{})
if err != nil {
panic(err)
}
fmt.Printf("Created deployment %q.\n", result.GetName())
}
上面的程序创建名为 nginx-alpine-deployment-test
的 Deployment。
DiscoveryClient 是发现客户端,用于发现 Kubernetes API Server 支持的资源组(Group)、资源版本(Version)和资源信息(Resource)。
discovery_client_demo.go:
package main
import (
"fmt"
"k8s.io/client-go/discovery"
"k8s.io/client-go/tools/clientcmd"
)
func main() {
config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
if err != nil {
panic(err)
}
discoveryClient, err := discovery.NewDiscoveryClientForConfig(config)
if err != nil {
panic(err)
}
_, APIResourceList, err := discoveryClient.ServerGroupsAndResources()
if err != nil {
panic(err)
}
for _, list := range APIResourceList {
fmt.Println(list)
}
}
scaleClient 是用于对 Deployment、ReplicaSet 等资源进行扩缩容的客户端。
scale_client_demo.go:
package main
import (
"context"
"fmt"
autoscalingapi "k8s.io/api/autoscaling/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/scale"
"k8s.io/client-go/tools/clientcmd"
)
// resourceMapper 决定要伸缩的资源的推荐版本
type resourceMapper struct{}
// ResourceFor 带部分资源,返回推荐资源
func (r *resourceMapper) ResourceFor(resource schema.GroupVersionResource) (schema.GroupVersionResource, error) {
fmt.Printf("ResourceFor was called with resource %s\n", resource.String())
if resource.Group == "apps" && resource.Resource == "deployments" {
return schema.GroupVersionResource{
Group: resource.Group,
Version: "v1",
Resource: resource.Resource,
}, nil
}
return schema.GroupVersionResource{}, fmt.Errorf("no preferred version for %s", resource.String())
}
func main() {
config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
if err != nil {
panic(err)
}
discoveryClient, err := discovery.NewDiscoveryClientForConfig(config)
if err != nil {
panic(err)
}
// 创建 scaleClient
scaleClient, err := scale.NewForConfig(
config,
&resourceMapper{},
dynamic.LegacyAPIPathResolverFunc,
scale.NewDiscoveryScaleKindResolver(discoveryClient))
if err != nil {
println(err)
}
scaleSpec := &autoscalingapi.Scale{
Spec: autoscalingapi.ScaleSpec{
// 期望的实例数量
Replicas: 2,
},
Status: autoscalingapi.ScaleStatus{
// 观察的实例的实际数量
Replicas: 0,
// 选择器。请查看 http://kubernetes.io/docs/user-guide/labels#label-selectors
Selector: "app=nginx",
},
}
// 命名空间
scaleSpec.Namespace = "default"
// Deployment 一定要存在
scaleSpec.ObjectMeta.Name = "nginx"
_, err = scaleClient.
Scales("default").
Update(
context.Background(),
// 给定的可伸缩资源
schema.GroupResource{
Group: "apps",
Resource: "deployments",
},
scaleSpec,
metav1.UpdateOptions{},
)
if err != nil {
panic(err)
}
fmt.Printf("%s was scaled", scaleSpec.Name)
}
上面的代码将名为 nginx 的 Deployment 的副本数伸缩为 2。
List-Watch 机制是 Kubernetes 中的异步消息通知机制,通过它能有效地确保消息的实时性、顺序性和可靠性。它分为两部分:
List 基于 HTTP 短链接实现,而 Watch 基于 HTTP 长连接实现。Watch 通过使用长连接的方式极大地减轻 Kubernetes API Server 的压力。
deployment_watch_demo.go:
xxxxxxxxxx
package main
import (
"context"
"fmt"
appsv1 "k8s.io/api/apps/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
)
func main() {
config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
if err != nil {
panic(err)
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
panic(err)
}
watcher, err := clientset.AppsV1().
Deployments("default").
Watch(
context.TODO(),
metav1.ListOptions{
// 标签选择器
LabelSelector: "app=nginx",
},
)
if err != nil {
panic(err)
}
for {
select {
case event, ok := <-watcher.ResultChan():
if !ok {
fmt.Println("channel closed")
break
}
fmt.Println("Event Type:", event.Type)
dp, ok := event.Object.(*appsv1.Deployment)
if !ok {
fmt.Println("not deployment")
continue
}
fmt.Println(dp)
}
}
}
上面的代码 WATCH app 标签等于 nginx 的 Deployment 上发生的事件。