mkdir client-go-democd client-go-demo/go mod init client-go-demogo get k8s.io/client-go@v0.24.0go get k8s.io/apimachinery@v0.24.0go get k8s.io/api@v0.24.0注意:版本要一致。
将 kubeconfig 文件放在 ~/.kube/config 中,其内容类似:
apiVersionv1clusterscluster certificate-authority-data... serverhttps//1.2.3.45 namekubernetescontextscontext clusterkubernetes userkubernetes-admin namekubernetes-admin@kubernetescurrent-contextkubernetes-admin@kuberneteskindConfigpreferencesusersnamekubernetes-admin user client-certificate-data... client-key-data...client-go 是与 kubernetes 集群进行对话的 Go 客户端。
kubernetes 包包含用于访问 Kubernetes API 的客户端集合(clientset)discovery 包用于发现 Kubernetes API server 支持的 APIdynamic 包包含可以对任意 Kubernetes API 对象执行通用操作的动态客户端(dynamic client)plugin/pkg/client/auth 包包含用于从外部源获取凭证的认证插件transport 包用于设置认证和开启连接tools/cache 包对编写控制器(controller)有用如果应用程序运行在集群的 Pod 中,请参阅 in-cluster example,否则请参阅 out-of-cluster example。
1. 集群内部的认证
当使用 rest.InClusterConfig() 时,client-go 使用被挂载到 Pod 里面的 /var/run/secrets/kubernetes.io/serviceaccount 路径上的 Service Account token。
2. 集群外部的认证
使用包含集群上下文信息的 kubeconfig 文件初始化客户端。kubectl 命令也使用 kubeconfig 文件对集群进行身份验证。
RESTClient 是对 http.Client 的封装,它在指定的路径上执行诸如 Get、Put、Post 和 Delete 之类的通用 REST 函数。其它客户端都是在它的基础上实现的。可用于 Kubernetes 内置资源和 CRD 资源。
rest_client_demo.go:
package main
import ( "context" "fmt" "k8s.io/api/core/v1" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd")
func main() { // 如果从 kubeconfig 文件构建 rest.Config 不方便, // 那么可以使用 clientcmd.BuildConfigFromKubeconfigGetter() 构建 rest.Config // (比如在凭证存储在配置中心的情况下) config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile) if err != nil { panic("fail to build config") } config.GroupVersion = &v1.SchemeGroupVersion config.NegotiatedSerializer = scheme.Codecs config.APIPath = "/api"
// 创建 RESTClient restClient, err := rest.RESTClientFor(config) if err != nil { panic(err) }
pod := &v1.Pod{} if err := restClient.Get(). Namespace("default"). Resource("pods"). // 确保 Pod 存在,否则程序会报错 Name("nginx-65778599-2nwp5"). Do(context.TODO()). Into(pod); err != nil { panic(err) } fmt.Printf("GVK: %s\n", pod.GroupVersionKind())}上面的程序使用 Pod 名称获取 Pod 信息。
Clientset 是用于处理 Kubernetes 内置资源对象的客户端集合。默认情况下,不能操作 CRD 资源。
clientset_demo.go:
x
package main
import ( "context" "fmt" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/clientcmd")
func main() { config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile) if err != nil { panic(err) }
clientset, err := kubernetes.NewForConfig(config) if err != nil { panic(err) }
pod, err := clientset.CoreV1(). Pods("default"). Get(context.TODO(), "nginx-65778599-2nwp5", metav1.GetOptions{}) if err != nil { panic(err) } fmt.Printf("Annotations: %s\n", pod.Annotations)}上面的程序使用 Pod 名称获取 Pod 信息。
DynamicClient 不仅能对 Kubernetes 内置资源进行处理,还可以对 CRD 资源进行处理。
dynamic_client_demo.go:
x
package main
import ( "context" "fmt" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/dynamic" "k8s.io/client-go/tools/clientcmd")
func main() { config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile) if err != nil { panic(err) } client, err := dynamic.NewForConfig(config) if err != nil { panic(err) }
deploymentSpec := &unstructured.Unstructured{ Object: map[string]interface{}{ "apiVersion": "apps/v1", "kind": "Deployment", "metadata": map[string]interface{}{ "name": "nginx-alpine-deployment-test", }, "spec": map[string]interface{}{ "replicas": 1, "selector": map[string]interface{}{ "matchLabels": map[string]interface{}{ "app": "nginx-alpine-test", }, }, "template": map[string]interface{}{ "metadata": map[string]interface{}{ "labels": map[string]interface{}{ "app": "nginx-alpine-test", }, }, "spec": map[string]interface{}{ "containers": []map[string]interface{}{ { "name": "nginx-alpine", "image": "nginx:alpine", "ports": []map[string]interface{}{ { "name": "http", "protocol": "TCP", "containerPort": 80, }, }, }, }, }, }, }, }, } deploymentResource := schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}
fmt.Println("Creating deployment...") result, err := client. Resource(deploymentResource). Namespace("default"). Create(context.Background(), deploymentSpec, metav1.CreateOptions{}) if err != nil { panic(err) } fmt.Printf("Created deployment %q.\n", result.GetName())}上面的程序创建名为 nginx-alpine-deployment-test 的 Deployment。
DiscoveryClient 是发现客户端,用于发现 Kubernetes API Server 支持的资源组(Group)、资源版本(Version)和资源信息(Resource)。
discovery_client_demo.go:
package main
import ( "fmt" "k8s.io/client-go/discovery" "k8s.io/client-go/tools/clientcmd")
func main() { config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile) if err != nil { panic(err) } discoveryClient, err := discovery.NewDiscoveryClientForConfig(config) if err != nil { panic(err) } _, APIResourceList, err := discoveryClient.ServerGroupsAndResources() if err != nil { panic(err) } for _, list := range APIResourceList { fmt.Println(list) }}scaleClient 是用于对 Deployment、ReplicaSet 等资源进行扩缩容的客户端。
scale_client_demo.go:
package main
import ( "context" "fmt" autoscalingapi "k8s.io/api/autoscaling/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/discovery" "k8s.io/client-go/dynamic" "k8s.io/client-go/scale" "k8s.io/client-go/tools/clientcmd")
// resourceMapper 决定要伸缩的资源的推荐版本type resourceMapper struct{}
// ResourceFor 带部分资源,返回推荐资源func (r *resourceMapper) ResourceFor(resource schema.GroupVersionResource) (schema.GroupVersionResource, error) { fmt.Printf("ResourceFor was called with resource %s\n", resource.String()) if resource.Group == "apps" && resource.Resource == "deployments" { return schema.GroupVersionResource{ Group: resource.Group, Version: "v1", Resource: resource.Resource, }, nil } return schema.GroupVersionResource{}, fmt.Errorf("no preferred version for %s", resource.String())}
func main() { config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile) if err != nil { panic(err) }
discoveryClient, err := discovery.NewDiscoveryClientForConfig(config) if err != nil { panic(err) }
// 创建 scaleClient scaleClient, err := scale.NewForConfig( config, &resourceMapper{}, dynamic.LegacyAPIPathResolverFunc, scale.NewDiscoveryScaleKindResolver(discoveryClient)) if err != nil { println(err) }
scaleSpec := &autoscalingapi.Scale{ Spec: autoscalingapi.ScaleSpec{ // 期望的实例数量 Replicas: 2, }, Status: autoscalingapi.ScaleStatus{ // 观察的实例的实际数量 Replicas: 0, // 选择器。请查看 http://kubernetes.io/docs/user-guide/labels#label-selectors Selector: "app=nginx", }, } // 命名空间 scaleSpec.Namespace = "default" // Deployment 一定要存在 scaleSpec.ObjectMeta.Name = "nginx"
_, err = scaleClient. Scales("default"). Update( context.Background(), // 给定的可伸缩资源 schema.GroupResource{ Group: "apps", Resource: "deployments", }, scaleSpec, metav1.UpdateOptions{}, ) if err != nil { panic(err) }
fmt.Printf("%s was scaled", scaleSpec.Name)}上面的代码将名为 nginx 的 Deployment 的副本数伸缩为 2。
List-Watch 机制是 Kubernetes 中的异步消息通知机制,通过它能有效地确保消息的实时性、顺序性和可靠性。它分为两部分:
List 基于 HTTP 短链接实现,而 Watch 基于 HTTP 长连接实现。Watch 通过使用长连接的方式极大地减轻 Kubernetes API Server 的压力。
deployment_watch_demo.go:
xxxxxxxxxxpackage main
import ( "context" "fmt" appsv1 "k8s.io/api/apps/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/clientcmd")
func main() { config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile) if err != nil { panic(err) }
clientset, err := kubernetes.NewForConfig(config) if err != nil { panic(err) }
watcher, err := clientset.AppsV1(). Deployments("default"). Watch( context.TODO(), metav1.ListOptions{ // 标签选择器 LabelSelector: "app=nginx", }, ) if err != nil { panic(err) }
for { select { case event, ok := <-watcher.ResultChan(): if !ok { fmt.Println("channel closed") break } fmt.Println("Event Type:", event.Type) dp, ok := event.Object.(*appsv1.Deployment) if !ok { fmt.Println("not deployment") continue } fmt.Println(dp) } }}上面的代码 WATCH app 标签等于 nginx 的 Deployment 上发生的事件。