K8s Secondary Development Series (4): Writing a Custom Controller


In Kubernetes, the Controller is a critical component: it reconciles the desired state we declare for a resource with the resource's actual state, so that our applications always stay in the state we want. This article explains how Controllers work and shows how to write one.

How Controllers Work

In K8s, users define the "desired state" of resources through the declarative API, while a Controller watches the actual state of those resources. Whenever the actual state diverges from the desired state, the Controller makes the changes needed to bring the two back in line. This process is called reconciliation (Reconcile).
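
To make the idea concrete, here is a conceptual sketch of a reconcile loop in Go. Everything in it (State, desiredState, actualState, makeChanges) is a hypothetical placeholder used only for illustration, not a real Kubernetes API; real controllers are event-driven rather than polling on a timer.

package main

import "time"

// State is a toy stand-in for a resource's state, e.g. a replica count.
type State struct{ Replicas int }

func desiredState() State { return State{Replicas: 3} } // what the user declared
func actualState() State  { return State{Replicas: 2} } // what is actually running

// makeChanges drives the actual state toward the desired state,
// e.g. by creating or deleting Pods.
func makeChanges(from, to State) {}

func main() {
	for {
		want, got := desiredState(), actualState()
		if got != want {
			makeChanges(got, want) // reconcile only when the two states diverge
		}
		time.Sleep(time.Second)
	}
}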

K8s ships with many kinds of Controllers, such as the Deployment Controller, ReplicaSet Controller, and StatefulSet Controller. Each has its own mechanics and use cases, but they all follow the same basic principle. We can also write our own Controllers to implement custom business logic.

The List/Watch Mechanism of the K8s HTTP API

A Controller needs to watch the state of resources in K8s. How is that implemented?

The K8s apiserver exposes the usual resource List endpoints, for example:

curl --location 'https://192.168.126.100:6443/api/v1/namespaces/default/pods' \
--header 'Authorization: Bearer Token'
{
    "kind": "PodList",
    "apiVersion": "v1",
    "metadata": {
        "resourceVersion": "18128858"
    },
    "items": [
        {
            "metadata": {
                "name": "nginx-68544fd76d-jjlsj",
                "generateName": "nginx-68544fd76d-",
                "namespace": "default",
                "uid": "6d209662-dc6b-48b6-8768-57e34a8ebe62",
                "resourceVersion": "17940143",
                "creationTimestamp": "2023-12-14T07:54:50Z",
                "labels": {
                    "app": "nginx",
                    "app_type": "frontend",
                    "pod-template-hash": "68544fd76d"
                },
                "annotations": {
                    "cni.projectcalico.org/containerID": "160ffb3e8948966b33945a10e50299edc933937fcacf111d14fbcdddd131d09b",
                    "cni.projectcalico.org/podIP": "10.244.123.131/32",
                    "cni.projectcalico.org/podIPs": "10.244.123.131/32"
                },
                "ownerReferences": [
                    {
                        "apiVersion": "apps/v1",
                        "kind": "ReplicaSet",
                        "name": "nginx-68544fd76d",
                        "uid": "ac770bcb-6369-44ed-8111-a746e1ce3955",
                        "controller": true,
                        "blockOwnerDeletion": true
                    }
                ],
                "managedFields": [……]
            },
            "spec": {……},
            "status": {……}
        }
    ]
}

The call above lists all Pods in the default namespace. The metadata field of the response contains a resourceVersion, which is the version of the resources returned by this List operation. We can pass that version as a parameter in a watch request, and the apiserver will watch for resource changes after that version and notify the client.

Now append ?watch=true to the URL: the apiserver will keep monitoring the Pods in the default namespace and notify the client whenever a Pod's state changes, via a chunked response (HTTP/1.1) or a streamed response (HTTP/2). K8s calls this mechanism watch.

curl --location 'https://192.168.126.100:6443/api/v1/namespaces/default/pods?watch=true&resourceVersion=18128858' \
--header 'Authorization: Bearer Token'

Create a Pod named nginx in another terminal, then delete it, and you will see output like the following:

{"type":"ADDED","object":{"kind":"Pod","apiVersion":"v1","metadata":{"name":"nginx-68544fd76d-6fggw","generateName":"nginx-68544fd76d-","namespace":"default","uid":"4f6c6528-1fdd-4f9d-a66d-74a5ac7d4142","resourceVersion":"18130095","creationTimestamp":"2023-12-15T07:35:49Z","labels":{"app":"nginx","app_type":"frontend","pod-template-hash":"68544fd76d"},"ownerReferences":[{"apiVersion":"apps/v1","kind":"ReplicaSet","name":"nginx-68544fd76d","uid":"8f979c58-c687-43d1-badb-83a258a467ea","controller":true,"blockOwnerDeletion":true}],"managedFields":[{"manager":"kube-controller-manager","operation":"Update","apiVersion":"v1","time":"2023-12-15T07:35:49Z","fieldsType":"FieldsV1","fieldsV1":{"f:metadata":{"f:generateName":{},"f:labels":{".":{},"f:app":{},"f:app_type":{},"f:pod-template-hash":{}},"f:ownerReferences":{".":{},"k:{\"uid\":\"8f979c58-c687-43d1-badb-83a258a467ea\"}":{}}},"f:spec":{"f:containers":{"k:{\"name\":\"nginx-container\"}":{".":{},"f:image":{},"f:imagePullPolicy":{},"f:name":{},"f:resources":{".":{},"f:limits":{".":{},"f:cpu":{},"f:memory":{}},"f:requests":{".":{},"f:cpu":{},"f:memory":{}}},"f:terminationMessagePath":{},"f:terminationMessagePolicy":{}}},"f:dnsPolicy":{},"f:enableServiceLinks":{},"f:restartPolicy":{},"f:schedulerName":{},"f:securityContext":{},"f:serviceAccount":{},"f:serviceAccountName":{},"f:terminationGracePeriodSeconds":{}}}}]},"spec":{"volumes":[{"name":"kube-api-access-7897x","projected":{"sources":[{"serviceAccountToken":{"expirationSeconds":3607,"path":"token"}},{"configMap":{"name":"kube-root-ca.crt","items":[{"key":"ca.crt","path":"ca.crt"}]}},{"downwardAPI":{"items":[{"path":"namespace","fieldRef":{"apiVersion":"v1","fieldPath":"metadata.namespace"}}]}}],"defaultMode":420}}],"containers":[{"name":"nginx-container","image":"nginx:1.21.4","resources":{"limits":{"cpu":"200m","memory":"200Mi"},"requests":{"cpu":"100m","memory":"100Mi"}},"volumeMounts":[{"name":"kube-api-access-7897x","readOnly":true,"mountPath":"/var/run/secrets/kubernetes.io/serviceaccount"}],"terminationMessagePath":"/dev/termination-log","terminationMessagePolicy":"File","imagePullPolicy":"IfNotPresent"}],"restartPolicy":"Always","terminationGracePeriodSeconds":30,"dnsPolicy":"ClusterFirst","serviceAccountName":"default","serviceAccount":"default","securityContext":{},"affinity":{"podAntiAffinity":{"requiredDuringSchedulingIgnoredDuringExecution":[{"labelSelector":{"matchExpressions":[{"key":"app_type","operator":"In","values":["frontend"]}]},"topologyKey":"kubernetes.io/hostname"}]}},"schedulerName":"default-scheduler","tolerations":[{"key":"node.kubernetes.io/not-ready","operator":"Exists","effect":"NoExecute","tolerationSeconds":300},{"key":"node.kubernetes.io/unreachable","operator":"Exists","effect":"NoExecute","tolerationSeconds":300}],"priority":0,"enableServiceLinks":true,"preemptionPolicy":"PreemptLowerPriority"},"status":{"phase":"Pending","qosClass":"Burstable"}}}
{"type":"MODIFIED","object":{"kind":"Pod","apiVersion":"v1","metadata":{"name":"nginx-68544fd76d-6fggw","generateName":"nginx-68544fd76d-","namespace":"default","uid":"4f6c6528-1fdd-4f9d-a66d-74a5ac7d4142","resourceVersion":"18130100","creationTimestamp":"2023-12-15T07:35:49Z","labels":{"app":"nginx","app_type":"frontend","pod-template-hash":"68544fd76d"},"ownerReferences":[{"apiVersion":"apps/v1","kind":"ReplicaSet","name":"nginx-68544fd76d","uid":"8f979c58-c687-43d1-badb-83a258a467ea","controller":true,"blockOwnerDeletion":true}],"managedFields":[{"manager":"kube-controller-manager","operation":"Update","apiVersion":"v1","time":"2023-12-15T07:35:49Z","fieldsType":"FieldsV1","fieldsV1":{"f:metadata":{"f:generateName":{},"f:labels":{".":{},"f:app":{},"f:app_type":{},"f:pod-template-hash":{}},"f:ownerReferences":{".":{},"k:{\"uid\":\"8f979c58-c687-43d1-badb-83a258a467ea\"}":{}}},"f:spec":{"f:containers":{"k:{\"name\":\"nginx-container\"}":{".":{},"f:image":{},"f:imagePullPolicy":{},"f:name":{},"f:resources":{".":{},"f:limits":{".":{},"f:cpu":{},"f:memory":{}},"f:requests":{".":{},"f:cpu":{},"f:memory":{}}},"f:terminationMessagePath":{},"f:terminationMessagePolicy":{}}},"f:dnsPolicy":{},"f:enableServiceLinks":{},"f:restartPolicy":{},"f:schedulerName":{},"f:securityContext":{},"f:serviceAccount":{},"f:serviceAccountName":{},"f:terminationGracePeriodSeconds":{}}}}]},"spec":{"volumes":[{"name":"kube-api-access-7897x","projected":{"sources":[{"serviceAccountToken":{"expirationSeconds":3607,"path":"token"}},{"configMap":{"name":"kube-root-ca.crt","items":[{"key":"ca.crt","path":"ca.crt"}]}},{"downwardAPI":{"items":[{"path":"namespace","fieldRef":{"apiVersion":"v1","fieldPath":"metadata.namespace"}}]}}],"defaultMode":420}}],"containers":[{"name":"nginx-container","image":"nginx:1.21.4","resources":{"limits":{"cpu":"200m","memory":"200Mi"},"requests":{"cpu":"100m","memory":"100Mi"}},"volumeMounts":[{"name":"kube-api-access-7897x","readOnly":true,"mountPath":"/var/run/secrets/kubernetes.io/serviceaccount"}],"terminationMessagePath":"/dev/termination-log","terminationMessagePolicy":"File","imagePullPolicy":"IfNotPresent"}],"restartPolicy":"Always","terminationGracePeriodSeconds":30,"dnsPolicy":"ClusterFirst","serviceAccountName":"default","serviceAccount":"default","nodeName":"k8s-node2.lab.example.com","securityContext":{},"affinity":{"podAntiAffinity":{"requiredDuringSchedulingIgnoredDuringExecution":[{"labelSelector":{"matchExpressions":[{"key":"app_type","operator":"In","values":["frontend"]}]},"topologyKey":"kubernetes.io/hostname"}]}},"schedulerName":"default-scheduler","tolerations":[{"key":"node.kubernetes.io/not-ready","operator":"Exists","effect":"NoExecute","tolerationSeconds":300},{"key":"node.kubernetes.io/unreachable","operator":"Exists","effect":"NoExecute","tolerationSeconds":300}],"priority":0,"enableServiceLinks":true,"preemptionPolicy":"PreemptLowerPriority"},"status":{"phase":"Pending","conditions":[{"type":"PodScheduled","status":"True","lastProbeTime":null,"lastTransitionTime":"2023-12-15T07:35:49Z"}],"qosClass":"Burstable"}}}
{"type":"MODIFIED","object":{"kind":"Pod","apiVersion":"v1","metadata":{"name":"nginx-68544fd76d-6fggw","generateName":"nginx-68544fd76d-","namespace":"default","uid":"4f6c6528-1fdd-4f9d-a66d-74a5ac7d4142","resourceVersion":"18130102","creationTimestamp":"2023-12-15T07:35:49Z","labels":{"app":"nginx","app_type":"frontend","pod-template-hash":"68544fd76d"},"ownerReferences":[{"apiVersion":"apps/v1","kind":"ReplicaSet","name":"nginx-68544fd76d","uid":"8f979c58-c687-43d1-badb-83a258a467ea","controller":true,"blockOwnerDeletion":true}],"managedFields":[{"manager":"Go-http-client","operation":"Update","apiVersion":"v1","time":"2023-12-15T07:35:49Z","fieldsType":"FieldsV1","fieldsV1":{"f:status":{"f:conditions":{"k:{\"type\":\"ContainersReady\"}":{".":{},"f:lastProbeTime":{},"f:lastTransitionTime":{},"f:message":{},"f:reason":{},"f:status":{},"f:type":{}},"k:{\"type\":\"Initialized\"}":{".":{},"f:lastProbeTime":{},"f:lastTransitionTime":{},"f:status":{},"f:type":{}},"k:{\"type\":\"Ready\"}":{".":{},"f:lastProbeTime":{},"f:lastTransitionTime":{},"f:message":{},"f:reason":{},"f:status":{},"f:type":{}}},"f:containerStatuses":{},"f:hostIP":{},"f:startTime":{}}},"subresource":"status"},{"manager":"kube-controller-manager","operation":"Update","apiVersion":"v1","time":"2023-12-15T07:35:49Z","fieldsType":"FieldsV1","fieldsV1":{"f:metadata":{"f:generateName":{},"f:labels":{".":{},"f:app":{},"f:app_type":{},"f:pod-template-hash":{}},"f:ownerReferences":{".":{},"k:{\"uid\":\"8f979c58-c687-43d1-badb-83a258a467ea\"}":{}}},"f:spec":{"f:containers":{"k:{\"name\":\"nginx-container\"}":{".":{},"f:image":{},"f:imagePullPolicy":{},"f:name":{},"f:resources":{".":{},"f:limits":{".":{},"f:cpu":{},"f:memory":{}},"f:requests":{".":{},"f:cpu":{},"f:memory":{}}},"f:terminationMessagePath":{},"f:terminationMessagePolicy":{}}},"f:dnsPolicy":{},"f:enableServiceLinks":{},"f:restartPolicy":{},"f:schedulerName":{},"f:securityContext":{},"f:serviceAccount":{},"f:serviceAccountName":{},"f:terminationGracePeriodSeconds":{}}}}]},"spec":{"volumes":[{"name":"kube-api-access-7897x","projected":{"sources":[{"serviceAccountToken":{"expirationSeconds":3607,"path":"token"}},{"configMap":{"name":"kube-root-ca.crt","items":[{"key":"ca.crt","path":"ca.crt"}]}},{"downwardAPI":{"items":[{"path":"namespace","fieldRef":{"apiVersion":"v1","fieldPath":"metadata.namespace"}}]}}],"defaultMode":420}}],"containers":[{"name":"nginx-container","image":"nginx:1.21.4","resources":{"limits":{"cpu":"200m","memory":"200Mi"},"requests":{"cpu":"100m","memory":"100Mi"}},"volumeMounts":[{"name":"kube-api-access-7897x","readOnly":true,"mountPath":"/var/run/secrets/kubernetes.io/serviceaccount"}],"terminationMessagePath":"/dev/termination-log","terminationMessagePolicy":"File","imagePullPolicy":"IfNotPresent"}],"restartPolicy":"Always","terminationGracePeriodSeconds":30,"dnsPolicy":"ClusterFirst","serviceAccountName":"default","serviceAccount":"default","nodeName":"k8s-node2.lab.example.com","securityContext":{},"affinity":{"podAntiAffinity":{"requiredDuringSchedulingIgnoredDuringExecution":[{"labelSelector":{"matchExpressions":[{"key":"app_type","operator":"In","values":["frontend"]}]},"topologyKey":"kubernetes.io/hostname"}]}},"schedulerName":"default-scheduler","tolerations":[{"key":"node.kubernetes.io/not-ready","operator":"Exists","effect":"NoExecute","tolerationSeconds":300},{"key":"node.kubernetes.io/unreachable","operator":"Exists","effect":"NoExecute","tolerationSeconds":300}],"priority":0,"enableServiceLinks":true,"preemptionPolicy":"PreemptLowerPriorit
y"},"status":{"phase":"Pending","conditions":[{"type":"Initialized","status":"True","lastProbeTime":null,"lastTransitionTime":"2023-12-15T07:35:49Z"},{"type":"Ready","status":"False","lastProbeTime":null,"lastTransitionTime":"2023-12-15T07:35:49Z","reason":"ContainersNotReady","message":"containers with unready status: [nginx-container]"},{"type":"ContainersReady","status":"False","lastProbeTime":null,"lastTransitionTime":"2023-12-15T07:35:49Z","reason":"ContainersNotReady","message":"containers with unready status: [nginx-container]"},{"type":"PodScheduled","status":"True","lastProbeTime":null,"lastTransitionTime":"2023-12-15T07:35:49Z"}],"hostIP":"192.168.126.102","startTime":"2023-12-15T07:35:49Z","containerStatuses":[{"name":"nginx-container","state":{"waiting":{"reason":"ContainerCreating"}},"lastState":{},"ready":false,"restartCount":0,"image":"nginx:1.21.4","imageID":"","started":false}],"qosClass":"Burstable"}}}
{"type":"MODIFIED","object":{"kind":"Pod","apiVersion":"v1","metadata":{"name":"nginx-68544fd76d-6fggw","generateName":"nginx-68544fd76d-","namespace":"default","uid":"4f6c6528-1fdd-4f9d-a66d-74a5ac7d4142","resourceVersion":"18130114","creationTimestamp":"2023-12-15T07:35:49Z","labels":{"app":"nginx","app_type":"frontend","pod-template-hash":"68544fd76d"},"annotations":{"cni.projectcalico.org/containerID":"9e115a5d84568ea17e39a8c7cc7ca1fe8ec7e1be88882633a423f89cb175e338","cni.projectcalico.org/podIP":"10.244.123.176/32","cni.projectcalico.org/podIPs":"10.244.123.176/32"},"ownerReferences":[{"apiVersion":"apps/v1","kind":"ReplicaSet","name":"nginx-68544fd76d","uid":"8f979c58-c687-43d1-badb-83a258a467ea","controller":true,"blockOwnerDeletion":true}],"managedFields":[{"manager":"Go-http-client","operation":"Update","apiVersion":"v1","time":"2023-12-15T07:35:49Z","fieldsType":"FieldsV1","fieldsV1":{"f:status":{"f:conditions":{"k:{\"type\":\"ContainersReady\"}":{".":{},"f:lastProbeTime":{},"f:lastTransitionTime":{},"f:message":{},"f:reason":{},"f:status":{},"f:type":{}},"k:{\"type\":\"Initialized\"}":{".":{},"f:lastProbeTime":{},"f:lastTransitionTime":{},"f:status":{},"f:type":{}},"k:{\"type\":\"Ready\"}":{".":{},"f:lastProbeTime":{},"f:lastTransitionTime":{},"f:message":{},"f:reason":{},"f:status":{},"f:type":{}}},"f:containerStatuses":{},"f:hostIP":{},"f:startTime":{}}},"subresource":"status"},{"manager":"kube-controller-manager","operation":"Update","apiVersion":"v1","time":"2023-12-15T07:35:49Z","fieldsType":"FieldsV1","fieldsV1":{"f:metadata":{"f:generateName":{},"f:labels":{".":{},"f:app":{},"f:app_type":{},"f:pod-template-hash":{}},"f:ownerReferences":{".":{},"k:{\"uid\":\"8f979c58-c687-43d1-badb-83a258a467ea\"}":{}}},"f:spec":{"f:containers":{"k:{\"name\":\"nginx-container\"}":{".":{},"f:image":{},"f:imagePullPolicy":{},"f:name":{},"f:resources":{".":{},"f:limits":{".":{},"f:cpu":{},"f:memory":{}},"f:requests":{".":{},"f:cpu":{},"f:memory":{}}},"f:terminationMessagePath":{},"f:terminationMessagePolicy":{}}},"f:dnsPolicy":{},"f:enableServiceLinks":{},"f:restartPolicy":{},"f:schedulerName":{},"f:securityContext":{},"f:serviceAccount":{},"f:serviceAccountName":{},"f:terminationGracePeriodSeconds":{}}}},{"manager":"calico","operation":"Update","apiVersion":"v1","time":"2023-12-15T07:35:54Z","fieldsType":"FieldsV1","fieldsV1":{"f:metadata":{"f:annotations":{".":{},"f:cni.projectcalico.org/containerID":{},"f:cni.projectcalico.org/podIP":{},"f:cni.projectcalico.org/podIPs":{}}}},"subresource":"status"}]},"spec":{"volumes":[{"name":"kube-api-access-7897x","projected":{"sources":[{"serviceAccountToken":{"expirationSeconds":3607,"path":"token"}},{"configMap":{"name":"kube-root-ca.crt","items":[{"key":"ca.crt","path":"ca.crt"}]}},{"downwardAPI":{"items":[{"path":"namespace","fieldRef":{"apiVersion":"v1","fieldPath":"metadata.namespace"}}]}}],"defaultMode":420}}],"containers":[{"name":"nginx-container","image":"nginx:1.21.4","resources":{"limits":{"cpu":"200m","memory":"200Mi"},"requests":{"cpu":"100m","memory":"100Mi"}},"volumeMounts":[{"name":"kube-api-access-7897x","readOnly":true,"mountPath":"/var/run/secrets/kubernetes.io/serviceaccount"}],"terminationMessagePath":"/dev/termination-log","terminationMessagePolicy":"File","imagePullPolicy":"IfNotPresent"}],"restartPolicy":"Always","terminationGracePeriodSeconds":30,"dnsPolicy":"ClusterFirst","serviceAccountName":"default","serviceAccount":"default","nodeName":"k8s-node2.lab.example.com","securityContext":{},"affinity":{"podAntiAffinity":{"require
dDuringSchedulingIgnoredDuringExecution":[{"labelSelector":{"matchExpressions":[{"key":"app_type","operator":"In","values":["frontend"]}]},"topologyKey":"kubernetes.io/hostname"}]}},"schedulerName":"default-scheduler","tolerations":[{"key":"node.kubernetes.io/not-ready","operator":"Exists","effect":"NoExecute","tolerationSeconds":300},{"key":"node.kubernetes.io/unreachable","operator":"Exists","effect":"NoExecute","tolerationSeconds":300}],"priority":0,"enableServiceLinks":true,"preemptionPolicy":"PreemptLowerPriority"},"status":{"phase":"Pending","conditions":[{"type":"Initialized","status":"True","lastProbeTime":null,"lastTransitionTime":"2023-12-15T07:35:49Z"},{"type":"Ready","status":"False","lastProbeTime":null,"lastTransitionTime":"2023-12-15T07:35:49Z","reason":"ContainersNotReady","message":"containers with unready status: [nginx-container]"},{"type":"ContainersReady","status":"False","lastProbeTime":null,"lastTransitionTime":"2023-12-15T07:35:49Z","reason":"ContainersNotReady","message":"containers with unready status: [nginx-container]"},{"type":"PodScheduled","status":"True","lastProbeTime":null,"lastTransitionTime":"2023-12-15T07:35:49Z"}],"hostIP":"192.168.126.102","startTime":"2023-12-15T07:35:49Z","containerStatuses":[{"name":"nginx-container","state":{"waiting":{"reason":"ContainerCreating"}},"lastState":{},"ready":false,"restartCount":0,"image":"nginx:1.21.4","imageID":"","started":false}],"qosClass":"Burstable"}}}
{"type":"DELETED","object":{"kind":"Pod","apiVersion":"v1","metadata":{"name":"nginx-68544fd76d-6fggw","generateName":"nginx-68544fd76d-","namespace":"default","uid":"4f6c6528-1fdd-4f9d-a66d-74a5ac7d4142","resourceVersion":"18130238","creationTimestamp":"2023-12-15T07:35:49Z","deletionTimestamp":"2023-12-15T07:36:41Z","deletionGracePeriodSeconds":0,"labels":{"app":"nginx","app_type":"frontend","pod-template-hash":"68544fd76d"},"annotations":{"cni.projectcalico.org/containerID":"9e115a5d84568ea17e39a8c7cc7ca1fe8ec7e1be88882633a423f89cb175e338","cni.projectcalico.org/podIP":"","cni.projectcalico.org/podIPs":""},"ownerReferences":[{"apiVersion":"apps/v1","kind":"ReplicaSet","name":"nginx-68544fd76d","uid":"8f979c58-c687-43d1-badb-83a258a467ea","controller":true,"blockOwnerDeletion":true}],"managedFields":[{"manager":"kube-controller-manager","operation":"Update","apiVersion":"v1","time":"2023-12-15T07:35:49Z","fieldsType":"FieldsV1","fieldsV1":{"f:metadata":{"f:generateName":{},"f:labels":{".":{},"f:app":{},"f:app_type":{},"f:pod-template-hash":{}},"f:ownerReferences":{".":{},"k:{\"uid\":\"8f979c58-c687-43d1-badb-83a258a467ea\"}":{}}},"f:spec":{"f:containers":{"k:{\"name\":\"nginx-container\"}":{".":{},"f:image":{},"f:imagePullPolicy":{},"f:name":{},"f:resources":{".":{},"f:limits":{".":{},"f:cpu":{},"f:memory":{}},"f:requests":{".":{},"f:cpu":{},"f:memory":{}}},"f:terminationMessagePath":{},"f:terminationMessagePolicy":{}}},"f:dnsPolicy":{},"f:enableServiceLinks":{},"f:restartPolicy":{},"f:schedulerName":{},"f:securityContext":{},"f:serviceAccount":{},"f:serviceAccountName":{},"f:terminationGracePeriodSeconds":{}}}},{"manager":"calico","operation":"Update","apiVersion":"v1","time":"2023-12-15T07:35:54Z","fieldsType":"FieldsV1","fieldsV1":{"f:metadata":{"f:annotations":{".":{},"f:cni.projectcalico.org/containerID":{},"f:cni.projectcalico.org/podIP":{},"f:cni.projectcalico.org/podIPs":{}}}},"subresource":"status"},{"manager":"Go-http-client","operation":"Update","apiVersion":"v1","time":"2023-12-15T07:36:43Z","fieldsType":"FieldsV1","fieldsV1":{"f:status":{"f:conditions":{"k:{\"type\":\"ContainersReady\"}":{".":{},"f:lastProbeTime":{},"f:lastTransitionTime":{},"f:message":{},"f:reason":{},"f:status":{},"f:type":{}},"k:{\"type\":\"Initialized\"}":{".":{},"f:lastProbeTime":{},"f:lastTransitionTime":{},"f:status":{},"f:type":{}},"k:{\"type\":\"Ready\"}":{".":{},"f:lastProbeTime":{},"f:lastTransitionTime":{},"f:message":{},"f:reason":{},"f:status":{},"f:type":{}}},"f:containerStatuses":{},"f:hostIP":{},"f:phase":{},"f:startTime":{}}},"subresource":"status"}]},"spec":{"volumes":[{"name":"kube-api-access-7897x","projected":{"sources":[{"serviceAccountToken":{"expirationSeconds":3607,"path":"token"}},{"configMap":{"name":"kube-root-ca.crt","items":[{"key":"ca.crt","path":"ca.crt"}]}},{"downwardAPI":{"items":[{"path":"namespace","fieldRef":{"apiVersion":"v1","fieldPath":"metadata.namespace"}}]}}],"defaultMode":420}}],"containers":[{"name":"nginx-container","image":"nginx:1.21.4","resources":{"limits":{"cpu":"200m","memory":"200Mi"},"requests":{"cpu":"100m","memory":"100Mi"}},"volumeMounts":[{"name":"kube-api-access-7897x","readOnly":true,"mountPath":"/var/run/secrets/kubernetes.io/serviceaccount"}],"terminationMessagePath":"/dev/termination-log","terminationMessagePolicy":"File","imagePullPolicy":"IfNotPresent"}],"restartPolicy":"Always","terminationGracePeriodSeconds":30,"dnsPolicy":"ClusterFirst","serviceAccountName":"default","serviceAccount":"default","nodeName":"k8s-node2.lab.example.com","securit
yContext":{},"affinity":{"podAntiAffinity":{"requiredDuringSchedulingIgnoredDuringExecution":[{"labelSelector":{"matchExpressions":[{"key":"app_type","operator":"In","values":["frontend"]}]},"topologyKey":"kubernetes.io/hostname"}]}},"schedulerName":"default-scheduler","tolerations":[{"key":"node.kubernetes.io/not-ready","operator":"Exists","effect":"NoExecute","tolerationSeconds":300},{"key":"node.kubernetes.io/unreachable","operator":"Exists","effect":"NoExecute","tolerationSeconds":300}],"priority":0,"enableServiceLinks":true,"preemptionPolicy":"PreemptLowerPriority"},"status":{"phase":"Running","conditions":[{"type":"Initialized","status":"True","lastProbeTime":null,"lastTransitionTime":"2023-12-15T07:35:49Z"},{"type":"Ready","status":"False","lastProbeTime":null,"lastTransitionTime":"2023-12-15T07:36:43Z","reason":"ContainersNotReady","message":"containers with unready status: [nginx-container]"},{"type":"ContainersReady","status":"False","lastProbeTime":null,"lastTransitionTime":"2023-12-15T07:36:43Z","reason":"ContainersNotReady","message":"containers with unready status: [nginx-container]"},{"type":"PodScheduled","status":"True","lastProbeTime":null,"lastTransitionTime":"2023-12-15T07:35:49Z"}],"hostIP":"192.168.126.102","startTime":"2023-12-15T07:35:49Z","containerStatuses":[{"name":"nginx-container","state":{"terminated":{"exitCode":0,"reason":"Completed","startedAt":"2023-12-15T07:35:55Z","finishedAt":"2023-12-15T07:36:41Z","containerID":"docker://08b4792466758680744919ec235c8c5934b92182c548e431c1e465df0c85428d"}},"lastState":{},"ready":false,"restartCount":0,"image":"nginx:1.21.4","imageID":"docker-pullable://nginx@sha256:366e9f1ddebdb844044c2fafd13b75271a9f620819370f8971220c2b330a9254","containerID":"docker://08b4792466758680744919ec235c8c5934b92182c548e431c1e465df0c85428d","started":false}],"qosClass":"Burstable"}}}

The HTTP watch response above shows three event types: ADDED, MODIFIED, and DELETED. ADDED means a new Pod was created, state changes of a Pod produce MODIFIED events, and DELETED means the Pod was deleted.
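
In client-go, each line of this stream is decoded into a watch.Event whose Type field carries exactly these strings. A simplified excerpt of k8s.io/apimachinery/pkg/watch, shown here only for reference, looks roughly like this:

// simplified excerpt of k8s.io/apimachinery/pkg/watch, for reference only
package watch

import "k8s.io/apimachinery/pkg/runtime"

type EventType string

const (
	Added    EventType = "ADDED"
	Modified EventType = "MODIFIED"
	Deleted  EventType = "DELETED"
	Bookmark EventType = "BOOKMARK"
	Error    EventType = "ERROR"
)

// Event is one change notification from the watch stream; the JSON "type" and
// "object" fields above are decoded into Type and Object.
type Event struct {
	Type   EventType
	Object runtime.Object
}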

We can use client-go to implement a simple watch.

Implementing a Simple Watch

package main

import (
	"context"
	"fmt"
	"os"
	"time"

	"github.com/zeromicro/go-zero/core/logx"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	ktypes "k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/util/flowcontrol"
)

var environment = "local"
var kubeconfig = "./kubeconfig"

func createKubernetes() *kubernetes.Clientset {
	logx.Info("Using namespace=default")
	var conf *rest.Config
	var err error
	if environment == "local" {
		logx.Infof("Using kubeconfig=%s", kubeconfig)
		conf, err = clientcmd.BuildConfigFromFlags("", kubeconfig)
	} else {
		logx.Info("Using serviceaccount=default")
		conf, err = rest.InClusterConfig()
		if err == nil {
			// raise the client-side rate limit (default QPS is 5) to avoid client-side throttling
			conf.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(1000, 1000)
		}
	}
	if err != nil {
		logx.Error(err)
		os.Exit(0)
	}
	clientset, err := kubernetes.NewForConfig(conf)
	if err != nil {
		logx.Error(err)
		os.Exit(0)
	}
	return clientset
}

func main() {
	clientset := createKubernetes()

	// watch for changes to pods
	watcher, err := clientset.CoreV1().Pods("default").Watch(context.Background(), metav1.ListOptions{
		LabelSelector: "app=nginx",
	})
	if err != nil {
		panic(err.Error())
	}

	// loop through events from the watcher
	for event := range watcher.ResultChan() {
		pod := event.Object.(*corev1.Pod)
		switch event.Type {
		case watch.Added:
			logx.Infof("Pod %s added, current status is %s \n", pod.Name, pod.Status.Phase)
			// todo: reconcile logic goes here
		case watch.Modified:
			logx.Infof("Pod %s modified, current status is %s \n", pod.Name, pod.Status.Phase)
			// todo: reconcile logic goes here
			if pod.Status.Phase == "Running" {
				data := fmt.Sprintf(`{"spec":{"template":{"metadata":{"annotations":{"kubectl.kubernetes.io/restartedAt":"%s"}}}}}`, time.Now().Format("2006-01-02T15:04:05Z"))
				_, err = clientset.AppsV1().Deployments("default").Patch(context.TODO(), "tools", ktypes.StrategicMergePatchType, []byte(data), metav1.PatchOptions{FieldManager: "kubectl-rollout"})
				if err != nil {
					logx.Error(err)
				} else {
					logx.Info("Restart tools success")
				}
			}
		case watch.Deleted:
			logx.Infof("Pod %s deleted \n", pod.Name)
			// todo: reconcile logic goes here
		}
	}
}

This example watches Pods labeled app=nginx; the main watch logic is the loop over watcher.ResultChan(). When a watched Pod is modified and is in the Running phase, the example restarts the tools Deployment by patching the kubectl.kubernetes.io/restartedAt annotation on its Pod template, which is the same mechanism kubectl rollout restart uses.

How the K8s Informer Works

The K8s HTTP API lets us query API resource objects and watch their changes, but a large number of HTTP calls puts significant load on the apiserver, and network calls can add considerable latency. On top of that, developers have to handle resource caching, reconnecting after broken HTTP connections, and so on in their own code. To solve these problems and simplify Controller development, K8s provides the Informer client library in client-go.

In Kubernetes, an Informer is a client-side library that watches resources on the API server and caches their current state locally. It gives client applications an efficient way to observe resource changes without constantly sending requests to the API server.

Compared with using HTTP watch directly, the Kubernetes Informer has the following advantages:

  • Less load on the API server: by caching resource data locally, an Informer reduces the number of requests that must be sent to the API server, preventing an overloaded apiserver from degrading the performance of the whole cluster.
  • Better application performance: with cached data, the client application can read resource information immediately instead of waiting for API server responses, which improves performance and reduces latency.
  • Simpler code: Informers offer a simpler, more fluent way to watch resource changes in Kubernetes. Client applications can rely on the existing Informer library instead of writing complex code to manage API server connections and process updates (see the sketch after this list).
  • Higher reliability: because Informers cache data locally, they can keep working even when the API server is unavailable or misbehaving, so the client application stays functional when the underlying Kubernetes infrastructure has problems.
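
As a sketch of the "simpler code" point above, the following minimal program builds a Pod Informer with client-go's standard SharedInformerFactory. The kubeconfig path and the empty handler bodies are placeholders, and error handling is trimmed; it is an illustration of how little code a basic Informer needs, not the full controller developed later in this article.

package main

import (
	"time"

	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// assumption: a local kubeconfig, as in the earlier watch example
	conf, err := clientcmd.BuildConfigFromFlags("", "./kubeconfig")
	if err != nil {
		panic(err)
	}
	clientset, err := kubernetes.NewForConfig(conf)
	if err != nil {
		panic(err)
	}

	// the factory wires up the Reflector, FIFO queue and Indexer internally
	factory := informers.NewSharedInformerFactoryWithOptions(clientset, 30*time.Second,
		informers.WithNamespace("default"))
	podInformer := factory.Core().V1().Pods().Informer()
	podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    func(obj interface{}) { /* enqueue the object's key here */ },
		UpdateFunc: func(oldObj, newObj interface{}) { /* enqueue the object's key here */ },
		DeleteFunc: func(obj interface{}) { /* enqueue the object's key here */ },
	})

	stop := make(chan struct{})
	defer close(stop)
	factory.Start(stop)                                 // starts List/Watch in the background
	cache.WaitForCacheSync(stop, podInformer.HasSynced) // block until the initial List is cached
	<-stop
}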

The architecture of a Controller built on the Informer library is shown in the figure below:

The dashed line in the middle splits the diagram into two parts: the upper half contains the components provided by the Informer library, and the lower half contains the components of a custom Controller written with it. Together they form a complete Controller.

The main workflow of a Controller written with the Informer mechanism is as follows:

  1. The Reflector uses the K8s HTTP API to List/Watch the specified resources on the API server.
    The Reflector first Lists the resources, then uses the resourceVersion returned by the List call to watch subsequent changes (a hand-written equivalent of this List-then-Watch sequence is sketched after this list). Corresponding source: Reflector ListAndWatch

  2. The Reflector puts the resource list returned by the List call, and the subsequent changes, into a FIFO (first-in, first-out) queue. Corresponding source:

    • refresh the FIFO queue with the result of the List
    • add the events received by the Watch to the FIFO queue
  3. In a loop, the Informer pops resource objects off the FIFO queue and processes them. Corresponding source: processLoop

  4. The Informer puts the resource objects popped from the FIFO queue into the Indexer. Corresponding source: processDeltas
    The Indexer is a local cache inside the Informer. It provides indexing (which is why the component is called Indexer), allowing resources to be looked up quickly and efficiently by criteria such as labels, annotations, or field selectors. In the code, clientState is the Indexer: it is built in the NewIndexerInformer method and passed to newInformer as the clientState parameter.

  5. The Indexer stores the received resource objects in its internal cache, ThreadSafeStore.

  6. The Controller's ResourceEventHandler callbacks are invoked to notify the application logic of the resource change. Corresponding source: processDeltas

  7. The ResourceEventHandler handles the resource changes.
    The ResourceEventHandler lives in the user's Controller code. The programming pattern recommended by K8s is to put the received messages into a queue and process that queue in a loop, where the reconcile logic runs. The reason for recommending this pattern is that the queue decouples the message producer (the Informer) from the consumer (the Controller's reconcile logic), so a slow consumer cannot block the producer. A few things to keep in mind in user code:

    • As noted above, the Reflector refreshes the FIFO queue with the result of the List, so the change messages received by the ResourceEventHandler actually include the complete resource list fetched when the Informer started; the Informer delivers those resources to the user Controller as ADDED events. This mechanism hides the List/Watch details and guarantees that the ResourceEventHandler sees the complete data for the watched resources: both the resources that existed before the Controller started and every change afterwards.
    • The messages received by the ResourceEventHandler contain only the resource object's key; in the Controller, that key can be used to look up the complete object in the local cache through the Indexer.
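
To make step 1 concrete, the sketch referenced above reproduces the Reflector's List-then-Watch sequence by hand, using the same clientset setup as the earlier examples (the kubeconfig path is a placeholder, and error handling is kept minimal):

package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// assumption: a local kubeconfig, as in the earlier examples
	conf, err := clientcmd.BuildConfigFromFlags("", "./kubeconfig")
	if err != nil {
		panic(err)
	}
	clientset, err := kubernetes.NewForConfig(conf)
	if err != nil {
		panic(err)
	}

	// 1. List first and remember the resourceVersion of the result
	list, err := clientset.CoreV1().Pods("default").List(context.Background(),
		metav1.ListOptions{LabelSelector: "app=nginx"})
	if err != nil {
		panic(err)
	}
	fmt.Printf("listed %d pods at resourceVersion=%s\n", len(list.Items), list.ResourceVersion)

	// 2. Watch from that resourceVersion so nothing is missed between the List and the Watch
	watcher, err := clientset.CoreV1().Pods("default").Watch(context.Background(),
		metav1.ListOptions{LabelSelector: "app=nginx", ResourceVersion: list.ResourceVersion})
	if err != nil {
		panic(err)
	}
	for event := range watcher.ResultChan() {
		fmt.Printf("event: %s\n", event.Type)
	}
}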

Implementing an Informer

package main

import (
	"flag"
	"fmt"
	"os"
	"resourceInformer/types"

	. "resourceInformer/utils"
	"time"

	"github.com/spf13/viper"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/util/flowcontrol"
	"k8s.io/client-go/util/workqueue"
)

var conf = flag.String("c", "./conf/app.yaml", "config file")

// Controller demonstrates how to implement a controller with client-go
type Controller struct {
	indexer  cache.Indexer
	queue    workqueue.RateLimitingInterface
	informer cache.Controller
	newObj   string
}

// NewController creates a new Controller
func NewController(queue workqueue.RateLimitingInterface, indexer cache.Indexer, informer cache.Controller) *Controller {
	return &Controller{
		informer: informer,
		indexer:  indexer,
		queue:    queue,
	}
}

func (c *Controller) processNextItem() bool {
	// Wait until there is a new item in the working queue
	queuekey, quit := c.queue.Get()
	if quit {
		return false
	}
	// Tell the queue that we are done with processing this key
	defer c.queue.Done(queuekey)
	// Invoke the method containing the business logic
	err := c.syncToStdout(queuekey.(*types.QueueKey))
	// Handle the error if something went wrong during the execution of the business logic
	c.handleErr(err, queuekey)
	return true
}

// syncToStdout is the business logic of the controller. In this controller it simply prints
// information about the pod to stdout. In case an error happened, it has to simply return the error.
// The retry logic should not be part of the business logic
func (c *Controller) syncToStdout(queuekey *types.QueueKey) error {
	obj, exists, err := c.indexer.GetByKey(queuekey.Key)
	if err != nil {
		Log.Errorf("Fetching object[%s] from store failed: %v", queuekey.Key, err)
		return err
	}
	if !exists {
		// Below we will warm up our cache with a Pod, so that we will see a delete for one pod
		Log.Infof("Object[%s] does not exists, maybe it has been deleted", queuekey.Key)
	} else {
		phase := obj.(*corev1.Pod).Status.Phase
		if queuekey.Type == types.EVENT_ADDED && phase == corev1.PodPending {
			c.newObj = queuekey.Key
		}
		// Note that you also have to check the uid if you have a local controlled resource, which
		// is dependent on the actual instance, to detect that a Pod was recreated with the same name
		Log.Infof("Pod[%s] is %s, status = %s", obj.(*corev1.Pod).GetName(), queuekey.Type, phase)
		if queuekey.Type == types.EVENT_MODIFIED && phase == corev1.PodRunning && queuekey.Key == c.newObj {
			Log.Infof("A new pod is created, need to restart tools")
			// restart the tools deployment here
		}
	}
	return nil
}

func (c *Controller) handleErr(err error, key interface{}) {
	if err == nil {
		// Forget about the #AddRateLimited history of the key on every successful synchronization.
		// This ensures that future processing of updates for this key is not delayed because of
		// an outdated error history.
		c.queue.Forget(key)
		return
	}

	// This controller retries 5 times if something goes wrong. After that, it stops trying
	if c.queue.NumRequeues(key) < 5 {
		Log.Infof("Error synching pod[%v]: %v", key, err)
		// Re-enqueue the key rate limited. Based on the rate limiter on the
		// queue and the re-enqueue history, the key will be processed later again
		c.queue.AddRateLimited(key)
		return
	}

	c.queue.Forget(key)
	// Report to an external entity that, even after several retries, we could not successfully process this key
	runtime.HandleError(err)
	Log.Infof("Dropping pod[%q] out of the queue: %v", key, err)
}

func (c *Controller) Run(workers int, stopch chan struct{}) {
	defer runtime.HandleCrash()

	// Let the workers stop when we are done
	defer c.queue.ShutDown()

	Log.Infof("Starting Pod controller")

	go c.informer.Run(stopch)

	if !cache.WaitForCacheSync(stopch, c.informer.HasSynced) {
		runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
		return
	}

	for i := 0; i < workers; i++ {
		go wait.Until(c.runWorker, time.Second, stopch)
	}

	<-stopch
	Log.Infof("Stopping Pod controller")
}

func (c *Controller) runWorker() {
	for c.processNextItem() {
	}
}

var environment = "local"
var kubeconfig = "./kubeconfig"

func createKubernetes() *kubernetes.Clientset {
	Log.Info("Using namespace=default")
	var conf *rest.Config
	var err error
	if environment == "local" {
		Log.Infof("Using kubeconfig=%s", kubeconfig)
		conf, err = clientcmd.BuildConfigFromFlags("", kubeconfig)
	} else {
		Log.Info("Using serviceaccount=default")
		conf, err = rest.InClusterConfig()
		if err == nil {
			// raise the client-side rate limit (default QPS is 5) to avoid client-side throttling
			conf.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(1000, 1000)
		}
	}
	if err != nil {
		Log.Error(err)
		os.Exit(0)
	}
	clientset, err := kubernetes.NewForConfig(conf)
	if err != nil {
		Log.Error(err)
		os.Exit(0)
	}
	return clientset
}

func main() {
	clientset := createKubernetes()

	// create the pod watcher
	optionsModifier := func(options *metav1.ListOptions) {
		options.LabelSelector = "app=nginx"
	}
	podListWatcher := cache.NewFilteredListWatchFromClient(clientset.CoreV1().RESTClient(), "pods", "default", optionsModifier)

	// create the workqueue
	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

	// Bind the workqueue to a cache with the help of an informer. This way we make sure that
	// whenever the cache is updated, the pod key is added to the workqueue.
	// Note that when we finally process the item from the workqueue, we might see a newer version
	// of the Pod than the version which was responsible for triggering the update.
	indexer, informer := cache.NewIndexerInformer(podListWatcher, &corev1.Pod{}, 0, cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			key, err := cache.MetaNamespaceKeyFunc(obj)
			if err == nil {
				queue.Add(&types.QueueKey{
					Type: types.EVENT_ADDED,
					Key:  key,
				})
			}
		},
		UpdateFunc: func(old interface{}, new interface{}) {
			key, err := cache.MetaNamespaceKeyFunc(new)
			if err == nil {
				queue.Add(&types.QueueKey{
					Type: types.EVENT_MODIFIED,
					Key:  key,
				})
			}
		},
		DeleteFunc: func(obj interface{}) {
			// IndexerInformer uses a delta queue, therefore for deletes we have to use this
			// key function.
			key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
			if err == nil {
				queue.Add(&types.QueueKey{
					Type: types.EVENT_DELETED,
					Key:  key,
				})
				Log.Infof("Pod[%s] is DELETED", key)
			}
		},
	}, cache.Indexers{})

	controller := NewController(queue, indexer, informer)

	// Now let's start the controller
	stop := make(chan struct{})
	defer close(stop)
	go controller.Run(1, stop)

	// Wait forever
	select {}
}

func init() {
	flag.Parse()
	if *conf == "" {
		Log.Fatal("No config file specified")
	}
	viper.SetConfigFile(*conf)
	if err := viper.ReadInConfig(); err != nil {
		Log.Fatalf("Reading config file failed: %v\n", err)
	}
	viper.WatchConfig()
	InitLogger(viper.Get("Log.Level").(string))
	if viper.GetString("Kubeconfig") != "" {
		kubeconfig = viper.GetString("Kubeconfig")
	}
}

Compile and run the program, pointing it at a kubeconfig file so that it connects to a K8s cluster:

student@k8s-master:~$ ./informer -c app.yaml
2023/12/22 11:42:24.446 [INFO] Using namespace=default
2023/12/22 11:42:24.446 [INFO] Using kubeconfig=/home/student/.kube/config
2023/12/22 11:42:24.449 [INFO] Starting Pod controller

This program watches the state of Pods labeled app=nginx in the default namespace. The condition it checks is:
if queuekey.Type == types.EVENT_MODIFIED && phase == corev1.PodRunning && queuekey.Key == c.newObj
that is, the Pod event is MODIFIED, the Pod's phase is PodRunning, and the Pod name is one not seen before (meaning the Pod is newly created, which corresponds to the scenario where its owning Deployment has been restarted).

Now submit an nginx Deployment to the default namespace and look at the events the informer receives:

2023/12/22 11:45:13.456 [INFO] Pod[nginx-5776d4fd9d-pplcs] is ADDED, status = Pending
2023/12/22 11:45:13.474 [INFO] Pod[nginx-5776d4fd9d-pplcs] is MODIFIED, status = Pending
2023/12/22 11:45:13.518 [INFO] Pod[nginx-5776d4fd9d-pplcs] is MODIFIED, status = Pending
2023/12/22 11:45:14.638 [INFO] Pod[nginx-5776d4fd9d-pplcs] is MODIFIED, status = Pending
2023/12/22 11:45:15.538 [INFO] Pod[nginx-5776d4fd9d-pplcs] is MODIFIED, status = Running
2023/12/22 11:45:15.541 [INFO] A new pod is created, need to restart tools

The complete code for both examples in this article is available at: https://gitee.com/hongyuxuan2138/resource-informer

Afterword

Writing a custom Informer essentially means implementing your own Controller for a resource. It is typically used when you need to listen for certain events and, based on them, change other resources. Without an Informer, the same scenario could also be handled by periodically polling the K8s resource state, but that approach is clearly less elegant than using an Informer.

How We Use Informers at My Company

We run a Redis Cluster inside our company's K8s cluster. Because it is deployed inside containers, any client that wants to connect to it must also run in the same container cluster; clients outside the cluster cannot connect. The reason is that the nodes of a clustered Redis communicate with each other using private addresses (i.e. Pod IPs). When I set a key, Redis assigns it to a slot according to its rules, and that slot may live on a different replica (Pod) than the one I am currently connected to. The current connection then returns the IP of the replica that owns the slot (a Pod IP) to the client, and since that is a private Pod address, a client outside the cluster simply cannot reach it. Our workaround was to add predixy, a Redis Cluster proxy, but the proxy has a bug: when one of the Redis Cluster replicas it is connected to goes down, it does not clean up the stale connection. If a client request then happens to land on that dead replica, it fails with a connection error, and predixy has to be restarted before it will establish a new connection to that replica.

The question then becomes: how do I know that a Redis Cluster Pod has restarted? That is exactly what an Informer is for. We wrote a custom Informer that watches for restart events of the Redis Cluster Pods and restarts predixy whenever one occurs, which works around the bug for now.


Author: 洪宇轩
Copyright notice: Unless otherwise stated, all articles on this blog are licensed under CC BY 4.0. Please credit 洪宇轩 when reposting.