K8s Secondary Development Series, Part 2: Developing a Custom CRD + Operator


This is a long article that walks you step by step through developing your own Operator. It is aimed at readers who are already very familiar with installing cloud-native software via Operators (prometheus-operator and the like) and feel the itch to hand-roll one themselves, or who need to build a custom Operator for business requirements at work. It is not for beginners; if you do not yet know what an Operator does, do some homework first and come back.

Preface

Many cloud-native projects now offer an installation method called an Operator, for example prometheus-operator, istio-operator, awx-operator, rook-operator and so on. Why did Operators appear, and what exactly is an Operator? As we use Kubernetes more deeply, we find that the built-in objects such as Deployment and StatefulSet no longer cover all of our needs. We want to define objects of our own in K8s: first, so they get a unified access entry point through kube-apiserver; second, so they can be managed with kubectl just like the other built-in objects.

Kubernetes provides ways to define such custom objects, and one of them is the CRD.

CRD

A CRD (CustomResourceDefinition) is the foundation of an Operator, so we have to cover CRDs before we talk about Operators.

The CustomResourceDefinition API resource lets us define custom resources. Creating a CRD object registers a new custom resource with the name and schema you specify, and the Kubernetes API then provides storage and access for it. The Kubernetes documentation gives the following example:

crontabs.crd.yaml
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  # name must match the spec fields below, and be in the form: <plural>.<group>
  name: crontabs.stable.example.com
spec:
  # group name to use for REST API: /apis/<group>/<version>
  group: stable.example.com
  # list of versions supported by this CustomResourceDefinition
  versions:
    - name: v1
      # each version can be enabled/disabled independently by the served flag
      served: true
      # one and only one version must be marked as the storage version
      storage: true
      schema:
        openAPIV3Schema:
          type: object
          properties:
            spec:
              type: object
              properties:
                cronSpec:
                  type: string
                image:
                  type: string
                replicas:
                  type: integer
  # either Namespaced or Cluster
  scope: Namespaced
  names:
    # plural name to be used in the URL: /apis/<group>/<version>/<plural>
    plural: crontabs
    # singular name to be used as an alias on the CLI and for display
    singular: crontab
    # kind is normally the CamelCased singular form. Your resource manifests use this.
    kind: CronTab
    # shortNames allow a shorter string to match your resource on the CLI
    shortNames:
    - ct

Now we submit the CRD to K8s with kubectl apply -f crontabs.crd.yaml. An object type named CronTab is registered in kube-apiserver, and we can reach it through a REST endpoint such as /apis/stable.example.com/v1/namespaces/ns1/crontabs/, exactly the same interface style as the other built-in Kubernetes objects.
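For example, once the CRD above has been applied, the new API group shows up in discovery and the endpoint can be queried directly (a quick illustration; ns1 is just a placeholder namespace):

# list the new resource type registered by the CRD
kubectl api-resources --api-group=stable.example.com
# query the raw REST endpoint
kubectl get --raw /apis/stable.example.com/v1/namespaces/ns1/crontabs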

After the CustomResourceDefinition object is created, we can go on to create CronTab object instances:

apiVersion: stable.example.com/v1
kind: CronTab
metadata:
  name: my-new-cron-object
spec:
  cronSpec: "* * * * */5"
  image: my-awesome-cron-image

Now kubectl can be used to manage CronTab objects. For example:

kubectl get crontab

The resource name here is case-insensitive; we can also use the plural form kubectl get crontabs, or the short name kubectl get ct. Just like native built-in API objects, these custom resources can not only be created, viewed, modified and deleted with kubectl, they can also be covered by RBAC rules.
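As a minimal illustration of that last point (my addition, not from the original), an RBAC Role granting read-only access to the crontabs resource could look like this:

apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: crontab-reader      # hypothetical name
  namespace: ns1
rules:
- apiGroups: ["stable.example.com"]
  resources: ["crontabs"]
  verbs: ["get", "list", "watch"]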

What Is a Kubernetes Operator

With CRDs covered, let's look at what a Kubernetes Operator is. Kubernetes defines objects through a declarative API that describes the desired state of an object (for example replicas: 2 means two Pod replicas are desired), while a Kubernetes controller continuously watches the actual state of the object and drives it toward that desired state. This is known as the controller pattern.

kube-controller-manager is made up of exactly such a set of controllers.

After we create a CRD, it only expresses the desired state of our custom objects; no controller yet reacts to their creation, deletion, modification and so on. We therefore still need to write a controller, the Operator, to implement that control logic.

How a Kubernetes Operator Works

An Operator follows the controller pattern described above: it continuously observes the custom objects in Kubernetes, i.e. the CRs (Custom Resources).

graph LR
  A[[user]] --modifications--> B{Custom Resource}
  B --tracking--> C(Operator)
  C --change events--> B
  B --in case of error--> B
  C ---> D{Current State}

The Operator keeps tracking change events on these CRs, such as ADD, UPDATE and DELETE, and then performs a series of actions to bring them to the desired state.

Building Your Own Kubernetes Operator

The community currently offers several mature scaffolding tools for building your own Operator.

This article uses Operator-SDK to build ours.

Installing Operator-SDK

Download a suitable binary release from:
https://github.com/operator-framework/operator-sdk/releases

Put the operator-sdk binary on your PATH so it is available globally.

Creating an Operator with Go

The Operator SDK provides the following workflow for developing a new Operator:

graph TD
  A(Create a new Operator project with the operator-sdk tool) --> B(Create a new API with the operator-sdk tool)
  B --> C(Edit the auto-generated CRD types and add the fields you need)
  C --> D(Implement the Reconcile logic in the controller)
  D --> E(Build with the operator-sdk tool and generate the deployment manifests)
  E --> F{Choose a deployment mode}
  F --> G(Run outside the cluster)
  F --> H(Run inside the cluster)
  H --> I(Package as a Docker image, push it to a registry, and deploy in-cluster as a Deployment)

Step by Step

Next we will develop a custom Operator with the following behavior:

  • Define a CRD called DemoApplication
  • When I submit a DemoApplication CR (metadata.name=demo):
    • If a deployment named demo does not exist, create it
    • If the CR contains a Service field, check whether a service named demo exists; if not, create it and wire it to the deployment from the previous step
    • If the CR contains PersistentVolumeClaims, check whether each PVC specified in the CR exists; if not, create it
    • If the CR contains an Ingress field, check whether the Ingress specified in the CR exists
      • If the Ingress does not exist, create it
      • If the Ingress already exists, append the rules specified in the CR to the existing Ingress rules
  • When I delete a DemoApplication CR:
    • If a deployment named demo exists, delete it
    • If a service named demo exists, delete it

Now let's get our hands dirty.

Creating the Project

Use the operator-sdk CLI to create a project called demo-operator:

mkdir demo-operator && cd demo-operator
operator-sdk init --domain example.com --repo demo

--domain sets the domain suffix that appears in the api-versions of your custom CRD. For example, in Istio's networking.istio.io/v1alpha3, istio.io is the domain, networking is the group of the API we will create below, and v1alpha3 is the API version. --repo sets the Go module name of the project, the same as go mod init xxx.

After it finishes, inspect the project layout:

tree
.
├── Dockerfile
├── Makefile
├── PROJECT
├── README.md
├── config
│   ├── default
│   │   ├── kustomization.yaml
│   │   ├── manager_auth_proxy_patch.yaml
│   │   └── manager_config_patch.yaml
│   ├── manager
│   │   ├── kustomization.yaml
│   │   └── manager.yaml
│   ├── manifests
│   │   └── kustomization.yaml
│   ├── prometheus
│   │   ├── kustomization.yaml
│   │   └── monitor.yaml
│   ├── rbac
│   │   ├── auth_proxy_client_clusterrole.yaml
│   │   ├── auth_proxy_role.yaml
│   │   ├── auth_proxy_role_binding.yaml
│   │   ├── auth_proxy_service.yaml
│   │   ├── kustomization.yaml
│   │   ├── leader_election_role.yaml
│   │   ├── leader_election_role_binding.yaml
│   │   ├── role_binding.yaml
│   │   └── service_account.yaml
│   └── scorecard
│       ├── bases
│       │   └── config.yaml
│       ├── kustomization.yaml
│       └── patches
│           ├── basic.config.yaml
│           └── olm.config.yaml
├── go.mod
├── go.sum
├── hack
│   └── boilerplate.go.txt
└── main.go

10 directories, 29 files

Take a look at the generated main.go:

func main() {
	var metricsAddr string
	var enableLeaderElection bool
	var probeAddr string
	flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.")
	flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
	flag.BoolVar(&enableLeaderElection, "leader-elect", false,
		"Enable leader election for controller manager. "+
			"Enabling this will ensure there is only one active controller manager.")

The main function defines the metrics endpoint address (default :8080) and the health probe address (default :8081), and then creates a Manager:

mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
  Scheme:                 scheme,
  MetricsBindAddress:     metricsAddr,
  Port:                   9443,
  HealthProbeBindAddress: probeAddr,
  LeaderElection:         enableLeaderElection,
  LeaderElectionID:       "aefd3536.example.com",
})

No API has been created yet, but we can already try running the program:

$ go run main.go
2023-12-07T17:12:55+08:00       INFO    controller-runtime.metrics      Metrics server is starting to listen    {"addr": ":8080"}
2023-12-07T17:12:55+08:00       INFO    setup   starting manager
2023-12-07T17:12:55+08:00       INFO    Starting server {"path": "/metrics", "kind": "metrics", "addr": "[::]:8080"}
2023-12-07T17:12:55+08:00       INFO    Starting server {"kind": "health probe", "addr": "[::]:8081"}

Hit the healthz endpoint:

curl localhost:8081/healthz;echo
ok

Hit the readyz endpoint:

curl localhost:8081/readyz;echo
ok

Creating the API and Controller

$ operator-sdk create api --group application --version v1 --kind DemoApplication --resource --controller
Writing kustomize manifests for you to edit...
Writing scaffold for you to edit...
api/v1/demoapplication_types.go
controllers/demoapplication_controller.go
Update dependencies:
$ go mod tidy
Running make:
$ make generate
mkdir -p /home/student/operator/demo-operator/bin
test -s /home/student/operator/demo-operator/bin/controller-gen && /home/student/operator/demo-operator/bin/controller-gen --version | grep -q v0.11.1 || \
GOBIN=/home/student/operator/demo-operator/bin go install sigs.k8s.io/controller-tools/cmd/controller-gen@v0.11.1
/home/student/operator/demo-operator/bin/controller-gen object:headerFile="hack/boilerplate.go.txt" paths="./..."
Next: implement your new API and generate the manifests (e.g. CRDs,CRs) with:
$ make manifests

--group sets the group to application and --version sets the version to v1; combined with the --domain chosen in the previous step, the custom CRD we will see in api-versions is application.example.com/v1.

The command above scaffolds the DemoApplication resource API at api/v1/demoapplication_types.go and the controller at controllers/demoapplication_controller.go.

Looking at the project tree again, a new api/v1 directory has appeared, along with a crd directory under config:

$ tree -L 2
.
├── Dockerfile
├── Makefile
├── PROJECT
├── README.md
├── api
│   └── v1
├── bin
│   └── controller-gen
├── config
│   ├── crd
│   ├── default
│   ├── manager
│   ├── manifests
│   ├── prometheus
│   ├── rbac
│   ├── samples
│   └── scorecard
├── controllers
│   ├── demoapplication_controller.go
│   └── suite_test.go
├── go.mod
├── go.sum
├── hack
│   └── boilerplate.go.txt
└── main.go

14 directories, 11 files

Note that only the single-group layout is covered here; if you want to support multiple API groups, refer to the Single Group to Multi-Group documentation.

For a deeper look at the Kubernetes API and the group-version-kind model, see the kubebuilder docs.
In general, it is recommended to have one controller responsible for each API in the project, in line with the design goals set by controller-runtime.

Editing the API Type Definitions

Next, modify the Go type definitions in api/v1/demoapplication_types.go to define the API for the DemoApplication custom resource (CR) with the following spec fields and status:

// DemoApplicationSpec defines the desired state of DemoApplication
type DemoApplicationSpec struct {
	// INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
	// Important: Run "make" to regenerate code after modifying this file

	// replicas of the application deployment
	//+kubebuilder:validation:Minimum=0
	Replica int32 `json:"replica"`
	// containers of the application deployment
	InitContainers []corev1.Container `json:"initContainers,omitempty"`
	Containers     []corev1.Container `json:"containers"`
	// volumes of the application deployment
	Volumes []Volume `json:"volumes,omitempty"`
	// service of the application
	Service *Service `json:"service,omitempty"`
	// PersistentVolumeClaim of the application
	PersistentVolumeClaims []PersistentVolumeClaim `json:"persistentVolumeClaims,omitempty"`
	// Ingress of the application; rules will be appended to an existing ingress
	Ingress *Ingress `json:"ingress,omitempty"`
	// ServiceAccount of the application
	ServiceAccountName string                     `json:"serviceAccountName,omitempty"`
	SecurityContext    *corev1.PodSecurityContext `json:"securityContext,omitempty"`
}

// DemoApplicationStatus defines the observed state of DemoApplication
type DemoApplicationStatus struct {
	// INSERT ADDITIONAL STATUS FIELD - define observed state of cluster
	// Important: Run "make" to regenerate code after modifying this file
	Nodes []string `json:"nodes"`
}

DemoApplicationStatus describes the status of the custom resource; it defines a single field, Nodes, used to store the IPs of the Pods belonging to the DemoApplication.
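The article itself does not show how Nodes gets populated. As a minimal sketch (my own addition, assuming the Pods carry the app=<CR name> label that the Deployment created later applies), the reconciler could fill it in with a helper like this and call it near the end of Reconcile:

package controllers

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"

	demov1 "demo/api/v1"
)

// updateNodeStatus is a hypothetical helper (not part of the generated scaffold):
// it lists the Pods labeled app=<CR name> and writes their IPs into Status.Nodes
// via the status subresource.
func (r *DemoApplicationReconciler) updateNodeStatus(ctx context.Context, demo *demov1.DemoApplication) error {
	podList := &corev1.PodList{}
	if err := r.List(ctx, podList,
		client.InNamespace(demo.Namespace),
		client.MatchingLabels{"app": demo.Name}); err != nil {
		return err
	}
	ips := make([]string, 0, len(podList.Items))
	for _, p := range podList.Items {
		if p.Status.PodIP != "" {
			ips = append(ips, p.Status.PodIP)
		}
	}
	demo.Status.Nodes = ips
	return r.Status().Update(ctx, demo)
}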

The Volume, Service, PersistentVolumeClaim and Ingress types used in DemoApplicationSpec above already exist in client-go; I redefined them here to strip out the many fields we never use. See below:

type Volume struct {
	// Volume name
	Name string `json:"name"`
	// PersistentVolumeClaim type of volume
	PersistentVolumeClaim *corev1.PersistentVolumeClaimVolumeSource `json:"persistentVolumeClaim,omitempty"`
	// HostPath type of volume
	HostPath *corev1.HostPathVolumeSource `json:"hostPath,omitempty"`
	// ConfigMap type of volume
	ConfigMap *corev1.ConfigMapVolumeSource `json:"configMap,omitempty"`
}

type Service struct {
	Type *corev1.ServiceType `json:"type,omitempty"`
}

type PersistentVolumeClaim struct {
	AccessModes      []corev1.PersistentVolumeAccessMode `json:"accessModes"`
	StorageClassName *string                             `json:"storageClassName"`
	Resources        corev1.ResourceRequirements         `json:"resources"`
	Name             string                              `json:"name"`
}

type Ingress struct {
	IngressName string              `json:"ingressName"`
	Rules       []netv1.IngressRule `json:"rules"`
}
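For reference (my own illustration, not generated by the tooling), a CR that exercises these optional fields might look roughly like the following, with the keys following the json tags defined above; the storage class and host names are placeholders:

apiVersion: application.example.com/v1
kind: DemoApplication
metadata:
  name: demo
  namespace: default
spec:
  replica: 2
  containers:
  - name: demo-container
    image: nginx:1.21.4
    ports:
    - containerPort: 80
  volumes:
  - name: data
    persistentVolumeClaim:
      claimName: demo-data
  persistentVolumeClaims:
  - name: demo-data
    accessModes: ["ReadWriteOnce"]
    storageClassName: standard        # assumed StorageClass name
    resources:
      requests:
        storage: 1Gi
  service:
    type: ClusterIP
  ingress:
    ingressName: demo-ingress
    rules:
    - host: demo.example.com          # placeholder host
      http:
        paths:
        - path: /
          pathType: Prefix
          backend:
            service:
              name: demo
              port:
                number: 80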

So that the controller can update the CR status without touching the rest of the CR object, add the marker +kubebuilder:subresource:status to the CR type:

// DemoApplication is the Schema for the demoapplications API
// +kubebuilder:subresource:status  // add this line
type DemoApplication struct {
	metav1.TypeMeta   `json:",inline"`
	metav1.ObjectMeta `json:"metadata,omitempty"`

	Spec   DemoApplicationSpec   `json:"spec,omitempty"`
	Status DemoApplicationStatus `json:"status,omitempty"`
}

After modifying the *_types.go file, run the following command to generate code for the resource type:

make generate

The generate target of the Makefile invokes controller-gen to update zz_generated.deepcopy.go, ensuring that our API's Go types implement the runtime.Object interface that every Kind must implement.

Generating the CRD Manifests

Once the API is defined and modified, generate and update the CRD manifests with:

make manifests

The manifests target of the Makefile invokes controller-gen to generate the CRD manifest at config/crd/bases/application.example.com_demoapplications.yaml.

Modifying the Controller

Now comes the core of the Operator: the Reconcile logic. Edit controllers/demoapplication_controller.go and modify the Reconcile function:

func (r *DemoApplicationReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	log := ctrllog.FromContext(ctx)
	log.Info("Entering operator reconciling...")
  // ……
}

The main logic to be implemented:

  1. If the CR no longer exists (for example it was deleted with kubectl delete -f), delete the corresponding deployment and service.
// Check if CR exist
demo := &demov1.DemoApplication{}
err := r.Get(ctx, req.NamespacedName, demo)
if err != nil {
  if errors.IsNotFound(err) {
    log.Info("DemoApplication resource not found. will delete related resources if they exit")
    // if deployment exists then delete it
    deploy := &appsv1.Deployment{}
    r.Get(ctx, req.NamespacedName, deploy)
    if deploy.Name != "" {
      err = r.Delete(ctx, deploy)
      if err != nil {
        log.Error(err, "Delete deployment", deploy.Name, "failed")
      } else {
        log.Info("Delete deployment", deploy.Name, "success")
      }
    }
    // if service exists then delete it
    service := &corev1.Service{}
    r.Get(ctx, req.NamespacedName, service)
    if service.Name != "" {
      err = r.Delete(ctx, service)
      if err != nil {
        log.Error(err, "Delete service", deploy.Name, "failed")
      } else {
        log.Info("Delete service", service.Name, "success")
      }
    }
    // Return and don't requeue
    return ctrl.Result{}, nil
  }
  // Error reading the object - requeue the request.
  log.Error(err, "Failed to get DemoApplication")
  return ctrl.Result{}, err
}
  2. If the CR specifies PersistentVolumeClaims and a PVC named in the CR does not exist, create it. (Note that PVCs must be created before the Deployment, otherwise the Deployment will not come up.)
// Check if the pvc already exists, if not create a new one
if demo.Spec.PersistentVolumeClaims != nil {
  for _, x := range demo.Spec.PersistentVolumeClaims {
    pvc := &corev1.PersistentVolumeClaim{}
    err = r.Get(ctx, types.NamespacedName{Name: x.Name, Namespace: demo.Namespace}, pvc)
    if err != nil && errors.IsNotFound(err) {
      newPvc := r.pvcForDemoApplication(demo, x)
      log.Info("Creating a new PersistentVolumeClaim", "PVC.Namespace", newPvc.Namespace, "PVC.Name", newPvc.Name)
      err = r.Create(ctx, newPvc)
      if err != nil {
        log.Error(err, "Failed to create new PersistentVolumeClaim", "PersistentVolumeClaim.Namespace", newPvc.Namespace, "PersistentVolumeClaim.Name", newPvc.Name)
        return ctrl.Result{}, err
      }
      // PVC created successfully - return and requeue
      // return ctrl.Result{RequeueAfter: time.Minute}, nil
    } else if err != nil {
      log.Error(err, "Failed to get PersistentVolumeClaim")
      return ctrl.Result{}, err
    } else {
      log.Info("PersistentVolumeClaim already exist, ignore creating", "PVC.Namespace", pvc.Namespace, "PVC.Name", pvc.Name)
    }
  }
}

The pvcForDemoApplication function returns the PVC object to be created:

func (r *DemoApplicationReconciler) pvcForDemoApplication(f *demov1.DemoApplication, x demov1.PersistentVolumeClaim) *corev1.PersistentVolumeClaim {
	pvc := &corev1.PersistentVolumeClaim{
		ObjectMeta: metav1.ObjectMeta{
			Name:      x.Name,
			Namespace: f.Namespace,
		},
		Spec: corev1.PersistentVolumeClaimSpec{
			AccessModes:      x.AccessModes,
			Resources:        x.Resources,
			StorageClassName: x.StorageClassName,
		},
	}
	return pvc
}
  3. If the Deployment does not exist, create it:
// Check if the deployment already exists, if not create a new one
deploy := &appsv1.Deployment{}
err = r.Get(ctx, types.NamespacedName{Name: demo.Name, Namespace: demo.Namespace}, deploy)
if err != nil && errors.IsNotFound(err) {
  // Define a new deployment
  newDeploy := r.deploymentForDemoApplication(demo)
  log.Info("Creating a new Deployment", "Deployment.Namespace", newDeploy.Namespace, "Deployment.Name", newDeploy.Name)
  err = r.Create(ctx, newDeploy)
  if err != nil {
    log.Error(err, "Failed to create new Deployment", "Deployment.Namespace", newDeploy.Namespace, "Deployment.Name", newDeploy.Name)
    return ctrl.Result{}, err
  }
  // Deployment created successfully - return and requeue
  // return ctrl.Result{Requeue: true}, nil
} else if err != nil {
  log.Error(err, "Failed to get Deployment")
  return ctrl.Result{}, err
} else {
  // update deployment
  deploy = r.deploymentForDemoApplication(demo)
  if err = r.Update(ctx, deploy); err != nil {
    log.Error(err, "Failed to update Deployment", "Deployment.Namespace", deploy.Namespace, "Deployment.Name", deploy.Name)
  } else {
    log.Info("Update Deployment success", "Deployment.Namespace", deploy.Namespace, "Deployment.Name", deploy.Name)
  }
}

The deploymentForDemoApplication function returns the Deployment object to be created:

func (r *DemoApplicationReconciler) deploymentForDemoApplication(f *demov1.DemoApplication) *appsv1.Deployment {
	labels := map[string]string{
		"app": f.Name,
	}
	// if resources are not set, give default values
	var containers []corev1.Container
	for _, c := range f.Spec.Containers {
		limitsCpu, _ := resource.ParseQuantity("500m")
		limitsMem, _ := resource.ParseQuantity("500Mi")
		requestsCpu, _ := resource.ParseQuantity("250m")
		requestsMem, _ := resource.ParseQuantity("250Mi")
		resources := corev1.ResourceRequirements{
			Limits: corev1.ResourceList{
				corev1.ResourceCPU:    limitsCpu,
				corev1.ResourceMemory: limitsMem,
			},
			Requests: corev1.ResourceList{
				corev1.ResourceCPU:    requestsCpu,
				corev1.ResourceMemory: requestsMem,
			},
		}
		if c.Resources.Size() == 0 {
			c.Resources = resources
		}
		containers = append(containers, c)
	}
	var initContainers []corev1.Container
	for _, c := range f.Spec.InitContainers {
		limitsCpu, _ := resource.ParseQuantity("100m")
		limitsMem, _ := resource.ParseQuantity("100Mi")
		requestsCpu, _ := resource.ParseQuantity("50m")
		requestsMem, _ := resource.ParseQuantity("50Mi")
		resources := corev1.ResourceRequirements{
			Limits: corev1.ResourceList{
				corev1.ResourceCPU:    limitsCpu,
				corev1.ResourceMemory: limitsMem,
			},
			Requests: corev1.ResourceList{
				corev1.ResourceCPU:    requestsCpu,
				corev1.ResourceMemory: requestsMem,
			},
		}
		if c.Resources.Size() == 0 {
			c.Resources = resources
		}
		initContainers = append(initContainers, c)
	}
	deploy := &appsv1.Deployment{
		ObjectMeta: metav1.ObjectMeta{
			Name:      f.Name,
			Namespace: f.Namespace,
			Labels:    labels,
		},
		Spec: appsv1.DeploymentSpec{
			Replicas: &f.Spec.Replica,
			Selector: &metav1.LabelSelector{
				MatchLabels: labels,
			},
			Template: corev1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{
					Labels: labels,
				},
				Spec: corev1.PodSpec{
					Volumes: lo.Map[demov1.Volume, corev1.Volume](f.Spec.Volumes, func(x demov1.Volume, _ int) corev1.Volume {
						var volumeSource corev1.VolumeSource
						if x.HostPath != nil {
							volumeSource = corev1.VolumeSource{
								HostPath: x.HostPath,
							}
						} else if x.PersistentVolumeClaim != nil {
							volumeSource = corev1.VolumeSource{
								PersistentVolumeClaim: x.PersistentVolumeClaim,
							}
						} else if x.ConfigMap != nil {
							volumeSource = corev1.VolumeSource{
								ConfigMap: x.ConfigMap,
							}
						}
						return corev1.Volume{
							Name:         x.Name,
							VolumeSource: volumeSource,
						}
					}),
					InitContainers:           initContainers,
					Containers:               containers,
					ServiceAccountName:       f.Spec.ServiceAccountName,
					DeprecatedServiceAccount: f.Spec.ServiceAccountName,
					SecurityContext:          f.Spec.SecurityContext,
				},
			},
		},
	}
	return deploy
}

It also assigns default resources to any container or initContainer that does not specify them.

  4. If a Service with the same name as the Deployment does not exist, create it:
// If DemoApplication Service is not nil, check if the service exists, if not create a new one
if demo.Spec.Service != nil {
  service := &corev1.Service{}
  err = r.Get(ctx, types.NamespacedName{Name: demo.Name, Namespace: demo.Namespace}, service)
  if err != nil && errors.IsNotFound(err) {
    // Define a new service
    newService := r.serviceForDemoApplication(demo)
    log.Info("Creating a new Service", "Serivce.Namespace", newService.Namespace, "Serivce.Name", newService.Name)
    err = r.Create(ctx, newService)
    if err != nil {
      log.Error(err, "Failed to create new Serivce", "Serivce.Namespace", newService.Namespace, "Serivce.Name", newService.Name)
      return ctrl.Result{}, err
    }
    // Service created successfully - return and requeue
    // return ctrl.Result{Requeue: true}, nil
  } else if err != nil {
    log.Error(err, "Failed to get Serivce")
    return ctrl.Result{}, err
  } else {
    // update service
    service = r.serviceForDemoApplication(demo)
    if err = r.Update(ctx, service); err != nil {
      log.Error(err, "Failed to update Service", "Service.Namespace", service.Namespace, "Service.Name", service.Name)
    } else {
      log.Info("Update Service success", "Service.Namespace", service.Namespace, "Service.Name", service.Name)
    }
  }
}

The serviceForDemoApplication function returns the Service object to be created:

func (r *DemoApplicationReconciler) serviceForDemoApplication(f *demov1.DemoApplication) *corev1.Service {
	var serviceType corev1.ServiceType = "ClusterIP"
	if f.Spec.Service.Type != nil {
		serviceType = *f.Spec.Service.Type
	}
	service := &corev1.Service{
		ObjectMeta: metav1.ObjectMeta{
			Name:      f.Name,
			Namespace: f.Namespace,
		},
		Spec: corev1.ServiceSpec{
			Ports: lo.Map[corev1.ContainerPort, corev1.ServicePort](f.Spec.Containers[0].Ports, func(x corev1.ContainerPort, _ int) corev1.ServicePort {
				return corev1.ServicePort{
					Name:       fmt.Sprintf("port-%d", x.ContainerPort),
					Protocol:   x.Protocol,
					Port:       x.ContainerPort,
					TargetPort: intstr.FromInt(int(x.ContainerPort)),
				}
			}),
			Selector: map[string]string{
				"app": f.Name,
			},
			Type: serviceType,
		},
	}
	return service
}
  5. If the Ingress does not exist, create it; if it already exists, append the rules specified in the CR to the existing Ingress rules:
// If DemoApplication Ingress is not nil, add rules to Ingress
if demo.Spec.Ingress != nil {
  ingress := &netv1.Ingress{}
  err = r.Get(ctx, types.NamespacedName{Name: demo.Spec.Ingress.IngressName, Namespace: demo.Namespace}, ingress)
  if err != nil && errors.IsNotFound(err) {
    // Define a new ingress
    newIngress := r.ingressForDemoApplication(demo)
    log.Info("Creating a new Ingress", "Ingress.Namespace", newIngress.Namespace, "Ingress.Name", newIngress.Name)
    err = r.Create(ctx, newIngress)
    if err != nil {
      log.Error(err, "Failed to create new Ingress", "Ingress.Namespace", newIngress.Namespace, "Ingress.Name", newIngress.Name)
      return ctrl.Result{}, err
    }
    // Ingress created successfully - return and requeue
    // return ctrl.Result{Requeue: true}, nil
  } else if err != nil {
    log.Error(err, "Failed to get Ingress")
    return ctrl.Result{}, err
  } else {
    ingress.Spec.Rules = lo.UniqBy[netv1.IngressRule, string](append(ingress.Spec.Rules, demo.Spec.Ingress.Rules...), func(x netv1.IngressRule) string {
      return x.Host + x.HTTP.Paths[0].Path
    })
    if err = r.Update(ctx, ingress); err != nil {
      log.Error(err, "Cannot update Ingress", "Ingress.Namespace", demo.Namespace, "Ingress.Name", demo.Spec.Ingress.IngressName)
      return ctrl.Result{}, err
    } else {
      log.Info("Update Ingress success", "Ingress.Namespace", demo.Namespace, "Ingress.Name", demo.Spec.Ingress.IngressName)
    }
  }
}

The ingressForDemoApplication function returns the Ingress object to be created:

func (r *DemoApplicationReconciler) ingressForDemoApplication(f *demov1.DemoApplication) *netv1.Ingress {
	ingressClassName := "nginx" // default ingressClassName
	ing := &netv1.Ingress{
		ObjectMeta: metav1.ObjectMeta{
			Name:      f.Spec.Ingress.IngressName,
			Namespace: f.Namespace,
		},
		Spec: netv1.IngressSpec{
			IngressClassName: &ingressClassName,
			Rules:            f.Spec.Ingress.Rules,
		},
	}
	return ing
}

Resources Watched by the Controller

With the main logic in place, we now need to specify which resources the controller should watch. The SetupWithManager() function specifies how the controller is built to watch the CR as well as other resources owned and managed by it.

// SetupWithManager sets up the controller with the Manager.
func (r *DemoApplicationReconciler) SetupWithManager(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
		For(&demov1.DemoApplication{}).
		WithEventFilter(filterPredicate()).
		Complete(r)
}

NewControllerManagedBy() returns a controller builder that allows various controller configurations.

For(&demov1.DemoApplication{})将DemoApplication类型指定为要监视的主要资源。对于每个DemoApplication类型的Add/Update/Delete事件,Reconcile Loop将为该DemoApplication对象发送一个Reconcile Request(参数为命名空间和key名称)。

WithEventFilter(filterPredicate()) installs an event filter. Without it, status transitions of the CR (for example Creating to Pending, Pending to Running) would trigger two extra, unnecessary reconciles. The filterPredicate function looks like this:

func filterPredicate() predicate.Predicate {
	return predicate.Funcs{
		UpdateFunc: func(e event.UpdateEvent) bool {
			// Ignore updates to CR status in which case metadata.Generation does not change
			return e.ObjectOld.GetGeneration() != e.ObjectNew.GetGeneration()
		},
		DeleteFunc: func(e event.DeleteEvent) bool {
			// Evaluates to false if the object has been confirmed deleted.
			return !e.DeleteStateUnknown
		},
		CreateFunc: func(e event.CreateEvent) bool {
			return true
		},
	}
}
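As a side note (my addition, not what the article uses), controller-runtime also ships a ready-made predicate with roughly the same update-filtering behavior, which could stand in for the custom UpdateFunc:

package controllers

import (
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/predicate"

	demov1 "demo/api/v1"
)

// Alternative setup sketch: GenerationChangedPredicate drops update events whose
// metadata.generation is unchanged (i.e. pure status updates). Unlike the
// hand-written filterPredicate above, it does not special-case DeleteStateUnknown.
func (r *DemoApplicationReconciler) SetupWithManager(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
		For(&demov1.DemoApplication{}).
		WithEventFilter(predicate.GenerationChangedPredicate{}).
		Complete(r)
}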

More Controller Configuration

Many other useful configurations can be made when initializing the controller. For more detail, see the upstream builder and controller documentation.

  • Set the controller's maximum number of concurrent reconciles via the MaxConcurrentReconciles option (default 1); see the sketch after this list.
  • Filter watch events using predicates.
  • Use an EventHandler to control how watch events are enqueued as Reconcile requests for the reconcile loop.
  • For operator relationships more complex than primary and secondary resources, use the EnqueueRequestsFromMapFunc handler to transform a watch event into an arbitrary set of Reconcile requests.
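For example (my own sketch, not from the article), raising the concurrency for this controller would look roughly like this:

package controllers

import (
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/controller"

	demov1 "demo/api/v1"
)

// Sketch: allow up to two DemoApplication objects to be reconciled in parallel.
func (r *DemoApplicationReconciler) SetupWithManager(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
		For(&demov1.DemoApplication{}).
		WithOptions(controller.Options{MaxConcurrentReconciles: 2}).
		WithEventFilter(filterPredicate()).
		Complete(r)
}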

Reconcile Loop

Every controller has a Reconciler object with a Reconcile() method that implements the reconcile loop. The loop runs each time an event occurs on a watched CR or resource, and returns values depending on whether the observed state matches the desired state.

The reconcile loop is passed a Request argument, a namespace/name key used to look up the DemoApplication object in the cache.

For guidance on Reconcilers, clients and interacting with resource events, see the client API documentation.

Here are some possible return options for a Reconciler:

  • When an error occurs:
return ctrl.Result{}, err
  • Without an error, when the request should be requeued:
return ctrl.Result{Requeue: true}, nil
  • Otherwise, when the reconcile should stop:
return ctrl.Result{}, nil
  • To reconcile again after X amount of time:
return ctrl.Result{RequeueAfter: nextRun.Sub(r.Now())}, nil

Specifying Permissions and Generating the RBAC Manifests

The controller needs certain RBAC permissions to interact with the resources it manages. These are specified with RBAC markers, as shown below:

//+kubebuilder:rbac:groups=application.example.com,resources=demoapplications,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=application.example.com,resources=demoapplications/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=application.example.com,resources=demoapplications/finalizers,verbs=update

// The controller needs CRUD access to deployments/services/persistentvolumeclaims/pods and so on
//+kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=core,resources=services,verbs=get;list;create;update;patch;delete
//+kubebuilder:rbac:groups=core,resources=persistentvolumeclaims,verbs=get;list;create;update;patch
//+kubebuilder:rbac:groups=core,resources=pods,verbs=get;list
//+kubebuilder:rbac:groups=networking.k8s.io,resources=ingresses,verbs=get;list;create;update;patch
func (r *DemoApplicationReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
}

From these markers, the corresponding RBAC files are generated under config/rbac/. Run:

make manifests

Running the Operator

Running Locally (Outside the Cluster)

$ make install
test -s /home/student/operator/demo-operator/bin/controller-gen && /home/student/operator/demo-operator/bin/controller-gen --version | grep -q v0.11.1 || \
GOBIN=/home/student/operator/demo-operator/bin go install sigs.k8s.io/controller-tools/cmd/controller-gen@v0.11.1
/home/student/operator/demo-operator/bin/controller-gen rbac:roleName=manager-role crd webhook paths="./..." output:crd:artifacts:config=config/crd/bases
/home/student/operator/demo-operator/bin/kustomize build config/crd | kubectl apply -f -
The CustomResourceDefinition "demoapplications.application.example.com" is invalid: 
* spec.validation.openAPIV3Schema.properties[spec].properties[initContainers].items.properties[resources].properties[claims].items.x-kubernetes-map-type: Invalid value: "null": must be atomic as item of a list with x-kubernetes-list-type=set
* spec.validation.openAPIV3Schema.properties[spec].properties[persistentVolumeClaims].items.properties[resources].properties[claims].items.x-kubernetes-map-type: Invalid value: "null": must be atomic as item of a list with x-kubernetes-list-type=set
* spec.validation.openAPIV3Schema.properties[spec].properties[containers].items.properties[resources].properties[claims].items.x-kubernetes-map-type: Invalid value: "null": must be atomic as item of a list with x-kubernetes-list-type=set
make: *** [Makefile:154: install] Error 1

make install downloads kustomize into bin/ at the project root; if you have no network access, download it manually and drop it into bin.

It then throws the rather baffling error must be atomic as item of a list with x-kubernetes-list-type=set, which may be a bug or a version mismatch. Following the hint, edit config/crd/bases/application.example.com_demoapplications.yaml, search for the keyword claims, and delete the x-kubernetes-list-type: set line under every claims block:

properties:
  claims:
    summary: "Claims lists the names of resources, defined
      in spec.resourceClaims, that are used by this container.
      \n This is an alpha field and requires enabling the DynamicResourceAllocation
      feature gate. \n This field is immutable."
    items:
      summary: ResourceClaim references one entry in PodSpec.ResourceClaims.
      properties:
        name:
          summary: Name must match the name of one entry
            in pod.spec.resourceClaims of the Pod where this
            field is used. It makes that resource available
            inside a container.
          type: string
      required:
      - name
      type: object
    type: array
    x-kubernetes-list-type: set  # <-- delete this line

After deleting those lines, do not run make install again, or controller-gen will regenerate the claims section. Apply the YAML manually instead:

kubectl apply -f config/crd/bases/application.example.com_demoapplications.yaml
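To double-check that the CRD was registered, something like the following can be used:

kubectl get crd demoapplications.application.example.com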

Once the CRD is created, run the Operator directly:

$ go run main.go
2023-12-13T10:27:50+08:00       INFO    controller-runtime.metrics      Metrics server is starting to listen    {"addr": ":8080"}
2023-12-13T10:27:50+08:00       INFO    setup   starting manager
2023-12-13T10:27:50+08:00       INFO    Starting server {"path": "/metrics", "kind": "metrics", "addr": "[::]:8080"}
2023-12-13T10:27:50+08:00       INFO    Starting server {"kind": "health probe", "addr": "[::]:8081"}
2023-12-13T10:27:50+08:00       INFO    Starting EventSource    {"controller": "demoapplication", "controllerGroup": "application.example.com", "controllerKind": "DemoApplication", "source": "kind source: *v1.DemoApplication"}
2023-12-13T10:27:50+08:00       INFO    Starting Controller     {"controller": "demoapplication", "controllerGroup": "application.example.com", "controllerKind": "DemoApplication"}
2023-12-13T10:27:50+08:00       INFO    Starting workers        {"controller": "demoapplication", "controllerGroup": "application.example.com", "controllerKind": "DemoApplication", "worker count": 1}

Now submit a CR to the cluster:

apiVersion: application.example.com/v1
kind: DemoApplication
metadata:
  name: demo
  namespace: default
spec:
  containers:
  - image: nginx:1.21.4
    imagePullPolicy: Always
    name: demo-container
    ports:
    - containerPort: 80
      protocol: TCP
  replica: 1
  service: {}

The controller prints the following logs:

2023-12-13T10:32:07+08:00       INFO    Entering operator reconciling...        {"controller": "demoapplication", "controllerGroup": "application.example.com", "controllerKind": "DemoApplication", "DemoApplication": {"name":"demo","namespace":"default"}, "namespace": "default", "name": "demo", "reconcileID": "a94b5cc2-c9d6-467a-83cc-fb8be5ade3a8"}
2023-12-13T10:32:07+08:00       INFO    Creating a new Deployment       {"controller": "demoapplication", "controllerGroup": "application.example.com", "controllerKind": "DemoApplication", "DemoApplication": {"name":"demo","namespace":"default"}, "namespace": "default", "name": "demo", "reconcileID": "a94b5cc2-c9d6-467a-83cc-fb8be5ade3a8", "Deployment.Namespace": "default", "Deployment.Name": "demo"}
2023-12-13T10:32:07+08:00       INFO    Creating a new Service  {"controller": "demoapplication", "controllerGroup": "application.example.com", "controllerKind": "DemoApplication", "DemoApplication": {"name":"demo","namespace":"default"}, "namespace": "default", "name": "demo", "reconcileID": "a94b5cc2-c9d6-467a-83cc-fb8be5ade3a8", "Serivce.Namespace": "default", "Serivce.Name": "demo"}

If you modify the various fields in the CR, you will see the controller update the deployment, service, ingress and other resources accordingly. Finally, deleting the CR also deletes the deployment and the service.

Running In-Cluster as a Deployment

First compile the binary:

go build -o bin/manager main.go

Then modify the Dockerfile as follows:

FROM alpine:3.15.4
WORKDIR /
COPY bin/manager /manager
## USER 65532:65532
ENTRYPOINT ["/manager"]

Then build the image and push it to an image registry. Assuming the image is demo-operator:1.0.0, update the Makefile:

VERSION ?= 1.0.0
……
IMAGE_TAG_BASE ?= demo-operator
……
IMG ?= $(IMAGE_TAG_BASE):$(VERSION)
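With the Makefile updated, the image can be built and pushed using the targets the operator-sdk scaffold normally provides (a sketch; exact targets may vary by SDK version):

make docker-build docker-push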

Deploy with:

$ make deploy
namespace/demo-operator-system created
serviceaccount/demo-operator-controller-manager created
role.rbac.authorization.k8s.io/demo-operator-leader-election-role created
clusterrole.rbac.authorization.k8s.io/demo-operator-manager-role created
clusterrole.rbac.authorization.k8s.io/demo-operator-metrics-reader created
clusterrole.rbac.authorization.k8s.io/demo-operator-proxy-role created
rolebinding.rbac.authorization.k8s.io/demo-operator-leader-election-rolebinding created
clusterrolebinding.rbac.authorization.k8s.io/demo-operator-manager-rolebinding created
clusterrolebinding.rbac.authorization.k8s.io/demo-operator-proxy-rolebinding created
service/demo-operator-controller-manager-metrics-service created
deployment.apps/demo-operator-controller-manager created
The CustomResourceDefinition "demoapplications.application.example.com" is invalid: 
* spec.validation.openAPIV3Schema.properties[spec].properties[persistentVolumeClaims].items.properties[resources].properties[claims].items.x-kubernetes-map-type: Invalid value: "null": must be atomic as item of a list with x-kubernetes-list-type=set
* spec.validation.openAPIV3Schema.properties[spec].properties[initContainers].items.properties[resources].properties[claims].items.x-kubernetes-map-type: Invalid value: "null": must be atomic as item of a list with x-kubernetes-list-type=set
* spec.validation.openAPIV3Schema.properties[spec].properties[containers].items.properties[resources].properties[claims].items.x-kubernetes-map-type: Invalid value: "null": must be atomic as item of a list with x-kubernetes-list-type=set
make: *** [Makefile:163: deploy] Error 1

It again ends with the must be atomic as item of a list with x-kubernetes-list-type=set error; since we already applied the CRD manually earlier, this error can be ignored.

You can see that a namespace and a pile of RBAC objects were created, and finally the deployment itself. If the deployment does not come up, investigate manually.

Finally, clean up the environment with:

make undeploy

Afterword

Just a few years ago, before Operators existed, most software was installed from a single YAML file, or from Helm charts for more complex cases. Today almost any cloud-native software you install ships as an Operator. Why use an Operator, and what is actually better about it?

In practice, if the architecture is simple and the installation needs no logic, a single YAML file is perfectly adequate. If the architecture is complex and the installation involves conditional logic and ordering dependencies, you need Helm charts. But if you want to automate control of the software, or build higher-level abstractions over certain resource types, you need a CRD plus an Operator.

For example, Prometheus-Operator has a CRD called ServiceMonitor. It selects Services via a LabelSelector and describes how to scrape their /metrics endpoints, while another CRD called Prometheus in turn selects ServiceMonitors via a LabelSelector. So when I need Prometheus to scrape a service, I only create a ServiceMonitor CR for that service, and Prometheus automatically scrapes the /metrics of the corresponding Service; that automation is carried out by Prometheus-Operator. Things with this level of logic and abstraction can only be done with a CRD plus an Operator.
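As a rough illustration (my addition, names are placeholders), a ServiceMonitor selecting Services labeled app: demo might look like this:

apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  name: demo
  namespace: default
  labels:
    release: prometheus        # label matched by the Prometheus CR's serviceMonitorSelector (assumed)
spec:
  selector:
    matchLabels:
      app: demo                # selects Services carrying this label
  endpoints:
  - port: http                 # name of the Service port to scrape (assumed)
    path: /metrics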

Imagine Prometheus deployed on VMs instead of cloud-natively: to scrape an endpoint's /metrics, I would either write a static_config, or write a file_sd_config and put the endpoint into a file and restart Prometheus, or, more advanced, use a consul_sd_config, which means installing Consul and registering the endpoint there. The complexity multiplies.

Another example: anyone who has used Prometheus rules knows that adding a rule means editing rules.yml, and that kind of manual file editing cannot be integrated into a platform. If everyone needs their own custom rules, rules.yml can grow to thousands of lines. To change an existing rule, I have to open rules.yml, hunt for the right rule among those thousands of lines, modify it and save the file; that simply does not scale. When I was responsible for Prometheus at a large state-owned bank, this rules problem was never really solved.

How does a cloud-native Prometheus deployment solve the rules problem? Simple: Prometheus-Operator provides a CRD called PrometheusRule. Each PrometheusRule object holds rule definitions and has its own name. To add a rule, I just submit a PrometheusRule CR to K8s; to modify one, I find the PrometheusRule with that name, edit it and save. The old rules problem is solved neatly.
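A minimal PrometheusRule might look like this (my own sketch; the alert name and expression are placeholders):

apiVersion: monitoring.coreos.com/v1
kind: PrometheusRule
metadata:
  name: demo-rules
  namespace: default
  labels:
    release: prometheus        # label matched by the Prometheus CR's ruleSelector (assumed)
spec:
  groups:
  - name: demo.rules
    rules:
    - alert: DemoHighErrorRate                  # hypothetical alert
      expr: rate(http_requests_total{code=~"5.."}[5m]) > 1
      for: 5m
      labels:
        severity: warning
      annotations:
        summary: "demo service is returning 5xx errors"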

So, after all this, why did I write my own Operator? I currently handle DevOps and cloud-native work in a department of a major state-affiliated brokerage. When I deploy an application to K8s for a project, it is never just a Deployment; it also needs a matching Service, PVC, ConfigMap and Ingress, the full set of resources for one application, and the next application needs the same set again. So I abstracted this whole set into a CRD called Application and let an Operator carry out the deployment. Submitting one Application is equivalent to submitting a complete application stack, which greatly speeds up deployment (and uninstalling removes the whole set too).

Once you have used an Operator, every other approach instantly feels underpowered. Operators really are that good.


Author: 洪宇轩
Copyright: Unless otherwise stated, all posts on this blog are licensed under CC BY 4.0. Please credit 洪宇轩 when republishing.