利用 kubebuilder 优化 Kubernetes Operator 开发体验

栏目: 服务器 · 发布时间: 5年前

内容简介:Kubernetes 现在已经成为了容器集群管理,乃至云计算的事实标准。相比于它曾经的竞争对手如 Mesos,Docker Swarm 等,其最大的优势在于扩展性。其扩展性的一个重要的体现,就是在没有因此接下来,本文主要介绍利用

Kubernetes 现在已经成为了容器集群管理,乃至云计算的事实标准。相比于它曾经的竞争对手如 Mesos,Docker Swarm 等,其最大的优势在于扩展性。其扩展性的一个重要的体现,就是 Custom Resource 这一特性。Kubernetes 本身有很多资源类型,被我们熟知的有 Pod,Job,Deployment 等等。而通过 Custom Resource ,用户可以定义自己的资源,并实现对应的 Operator(控制器)来处理对资源的请求。用户实现的 Operator 通过与 Kubernetes 的 API server 交互,来实现自身的业务逻辑。

在没有 kubebuilder 之前,为了实现一个 Operator,用户需要完全实现从 Kubernetes Client 的创建开始,到监听 Kubernetes API Server 的请求,再到请求的队列化,以及后面的业务逻辑一整套的逻辑。在整个过程中,有一些逻辑是所有的 Operator 在实现的时候都需要的。因此 kubebuilder 将其进行了封装和抽象成了公共的库( controller-runtime )和公共的工具( controller-tools )。随后在开发 Operator 的时候,只需要通过 kubebuilder 生成一些 Scaffolding(脚手架)代码,就可以依据脚手架代码专心于业务逻辑的开发即可。用户不再需要关心 Kubernetes API Server 发来的请求是怎样进入请求队列,然后被依次执行的,只需要关心要如何处理当前的请求即可。其他的事情会由 Scaffolding 代码中用到的 controller-runtime 等库来帮助开发者处理。因此对于用户而言,其只需要关注生成的 Scaffolding 代码中需要用户修改的部分即可。

因此接下来,本文主要介绍利用 kubebuilder v1 scaffolding 简化 Operator 开发的过程。目前 kubebuilder 社区已经推出了正在开发中的 第二版实现 ,这一实现将在后续文章进行介绍。

开发流程

初始化项目

首先,我们需要初始化项目:

kubebuilder init

在当前目录会生成一个最小可编译版本,但其中没有任何的 Operator 实现。目录结构如下所示:

├── cmd
│   └── manager
│       └── main.go
├── config
│   ├── default
│   │   ├── kustomization.yaml
│   │   ├── manager_auth_proxy_patch.yaml
│   │   ├── manager_image_patch.yaml
│   │   └── manager_prometheus_metrics_patch.yaml
│   ├── manager
│   │   └── manager.yaml
│   └── rbac
│       ├── auth_proxy_role_binding.yaml
│       ├── auth_proxy_role.yaml
│       └── auth_proxy_service.yaml
├── Dockerfile
├── Gopkg.toml
├── hack
│   └── boilerplate.go.txt
├── Makefile
├── pkg
│   ├── apis
│   │   └── apis.go
│   ├── controller
│   │   └── controller.go
│   └── webhook
│       └── webhook.go
└── PROJECT

其中 cmd 目录是关于可执行文件的代码。config 是在 Kubernetes 上部署 Operator 的配置文件。Dockerfile 是把 Operator 打包为 Docker 镜像的配置文件。Gopkg.toml 是自动生成的关于依赖的配置文件。hack 目录下是有关 Copyright Header 的文件。而 pkg 下则是随后的 API 和代码实现。

创建 CRD 和 Operator 的 Scaffolding 代码

随后我们运行如下命令来创建 CRD 和对应的 Operator 的实现:

kubebuilder create api --group ship --version v1beta1 --kind Frigate

可以看到在目录内生成了一些新的文件和目录:

├── pkg
│   ├── apis
+│   │   ├── addtoscheme_ship_v1beta1.go
│   │   ├── apis.go
│   │   └── ship
│   │       ├── group.go
+│   │       └── v1beta1
+│   │           ├── doc.go
+│   │           ├── frigate_types.go
+│   │           ├── frigate_types_test.go
+│   │           ├── zz_generated.deepcopy.go
+│   │           ├── register.go
+│   │           └── v1beta1_suite_test.go
│   ├── controller
+│   │   ├── add_frigate.go
│   │   ├── controller.go
+│   │   └── frigate
+│   │       ├── frigate_controller.go
+│   │       ├── frigate_controller_suite_test.go
+│   │       └── frigate_controller_test.go
│   └── webhook
│       └── webhook.go
└── PROJECT

其中,需要用户修改的文件主要有两个,分别是 frigate_types.gofrigate_controller.go 。前者是 CRD 的 API 定义相关内容,后者是 CRD 对应的 Operator 实现。

修改 API

frigate_types.go 中,我们可以看到有提示用户编辑的注释:

// EDIT THIS FILE!  THIS IS SCAFFOLDING FOR YOU TO OWN!
// NOTE: json tags are required.  Any new fields you add must have json tags for the fields to be serialized.

// FrigateSpec defines the desired state of Frigate
type FrigateSpec struct {
	// INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
	// Important: Run "make" to regenerate code after modifying this file
}

// FrigateStatus defines the observed state of Frigate
type FrigateStatus struct {
	// INSERT ADDITIONAL STATUS FIELD - define observed state of cluster
	// Important: Run "make" to regenerate code after modifying this file
}

关于 Kubernetes Custom Resource 相关的 API 定义相关内容,建议参考 Extend the Kubernetes API with CustomResourceDefinitions ,此处不再赘述。用户只需要修改对应的 SpecStatus 的数据结构。值得一提的是,一般而言,如果不依赖 kubebuilder 实现 Operator,通常需要为定义好的 CRD 生成 Kubernetes Client,Informer 等,以便 Operator 可以利用其进行监听 API 请求等操作。而 kubebuilder 使用的 controller-runtime 利用了 dynamic client,unstructured client 和其他相关的方式完成了这些操作,因此在使用时无需生成 Kubernetes Client,Informer 等包。

修改 Controller 实现

在 Controller 中,主要有两处需要修改,一处为:

// add adds a new Controller to mgr with r as the reconcile.Reconciler
func add(mgr manager.Manager, r reconcile.Reconciler) error {
	// Create a new controller
	c, err := controller.New("frigate-controller", mgr, controller.Options{Reconciler: r})
	if err != nil {
		return err
	}

	// Watch for changes to Frigate
	err = c.Watch(&source.Kind{Type: &shipv1beta1.Frigate{}}, &handler.EnqueueRequestForObject{})
	if err != nil {
		return err
	}

	// TODO(user): Modify this to be the types you create
	// Uncomment watch a Deployment created by Frigate - change this for objects you create
	err = c.Watch(&source.Kind{Type: &appsv1.Deployment{}}, &handler.EnqueueRequestForOwner{
		IsController: true,
		OwnerType:    &shipv1beta1.Frigate{},
	})
	if err != nil {
		return err
	}

	return nil
}

在这里用户需要指定 Operator 需要监听哪些资源的变动。在此例中,监听了被定义的新资源拥有(Own) 的 Deployment 的变动。当 Owner 为 Frigate 的 Deployment 的 Spec 或者 Status 一旦发生了变动时,Operator 都会接收到来自 Kubernetes API Server 的请求。

另外一处为 ReconcileFrigate 结构的方法 Reconcile

// Reconcile reads that state of the cluster for a Frigate object and makes changes based on the state read
// and what is in the Frigate.Spec
// TODO(user): Modify this Reconcile function to implement your Controller logic.  The scaffolding writes
// a Deployment as an example
// Automatically generate RBAC rules to allow the Controller to read and write Deployments
// +kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=apps,resources=deployments/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=ship.example.com,resources=frigates,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=ship.example.com,resources=frigates/status,verbs=get;update;patch
func (r *ReconcileFrigate) Reconcile(request reconcile.Request) (reconcile.Result, error) {
	// Fetch the Frigate instance
	instance := &shipv1beta1.Frigate{}
	err := r.Get(context.TODO(), request.NamespacedName, instance)
	if err != nil {
		if errors.IsNotFound(err) {
			// Object not found, return.  Created objects are automatically garbage collected.
			// For additional cleanup logic use finalizers.
			return reconcile.Result{}, nil
		}
		// Error reading the object - requeue the request.
		return reconcile.Result{}, err
	}

	// TODO(user): Change this to be the object type created by your controller
	// Define the desired Deployment object
	deploy := &appsv1.Deployment{
		ObjectMeta: metav1.ObjectMeta{
			Name:      instance.Name + "-deployment",
			Namespace: instance.Namespace,
		},
		Spec: appsv1.DeploymentSpec{
			Selector: &metav1.LabelSelector{
				MatchLabels: map[string]string{"deployment": instance.Name + "-deployment"},
			},
			Template: corev1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"deployment": instance.Name + "-deployment"}},
				Spec: corev1.PodSpec{
					Containers: []corev1.Container{
						{
							Name:  "nginx",
							Image: "nginx",
						},
					},
				},
			},
		},
	}
	if err := controllerutil.SetControllerReference(instance, deploy, r.scheme); err != nil {
		return reconcile.Result{}, err
	}

	// TODO(user): Change this for the object type created by your controller
	// Check if the Deployment already exists
	found := &appsv1.Deployment{}
	err = r.Get(context.TODO(), types.NamespacedName{Name: deploy.Name, Namespace: deploy.Namespace}, found)
	if err != nil && errors.IsNotFound(err) {
		log.Info("Creating Deployment", "namespace", deploy.Namespace, "name", deploy.Name)
		err = r.Create(context.TODO(), deploy)
		return reconcile.Result{}, err
	} else if err != nil {
		return reconcile.Result{}, err
	}

	// TODO(user): Change this for the object type created by your controller
	// Update the found object and write the result back if there are any changes
	if !reflect.DeepEqual(deploy.Spec, found.Spec) {
		found.Spec = deploy.Spec
		log.Info("Updating Deployment", "namespace", deploy.Namespace, "name", deploy.Name)
		err = r.Update(context.TODO(), found)
		if err != nil {
			return reconcile.Result{}, err
		}
	}
	return reconcile.Result{}, nil
}

这里是处理所有来自 API Server 的请求的函数入口。在此例中,每次请求都会根据 Custom Resource 的 name 在内存中维护一个期望的 Deployment 的实例,随后将其与 API Server 中已有的实例进行比对,如果其 Spec 字段有不同之处,则通过 API Server 更新这一实例。Deployment Controller 会进行接下来的处理,根据更新后的 Spec 修改在集群上的 Pod 的相关运行情况。

编译与运行 Operator

如果是在本地运行,我们需要利用 make manager 来编译 Operator 为二进制可执行文件。最后的可执行文件在 bin 目录下。 如果是需要运行在 Kubernetes 集群中,我们可以利用 make docker-push && make docker-push && make deploy 来进行部署。

Under the Hood

看完了使用上的过程,接下来我们来看看, kubebuilder 生成的代码到底是怎么去运行的。首先先看 cmd 中的代码:

// Create a new Cmd to provide shared dependencies and start components
log.Info("setting up manager")
mgr, err := manager.New(cfg, manager.Options{MetricsBindAddress: metricsAddr})
if err != nil {
    log.Error(err, "unable to set up overall controller manager")
    os.Exit(1)
}

log.Info("Registering Components.")

// Setup Scheme for all resources
log.Info("setting up scheme")
if err := apis.AddToScheme(mgr.GetScheme()); err != nil {
    log.Error(err, "unable add APIs to scheme")
    os.Exit(1)
}

// Setup all Controllers
log.Info("Setting up controller")
if err := controller.AddToManager(mgr); err != nil {
    log.Error(err, "unable to register controllers to the manager")
    os.Exit(1)
}

log.Info("setting up webhooks")
if err := webhook.AddToManager(mgr); err != nil {
    log.Error(err, "unable to register webhooks to the manager")
    os.Exit(1)
}

// Start the Cmd
log.Info("Starting the Cmd.")
if err := mgr.Start(signals.SetupSignalHandler()); err != nil {
    log.Error(err, "unable to run the manager")
    os.Exit(1)
}

之前说到 kubebuilder 是开发 Operator 的框架,其实并不十分准确。准确来说, kubebuilder 是开发 Controller Manager 的框架,Controller Manager 会管理一个或者多个 Operator。因此 cmd 中的代码也主要是围绕 Controller Managter 展开的。 manager.New 首先创建了一个 Manager 实例,在这一实例中有 client,cache 等之后需要的对象。

apis.AddToScheme 将 CRD 的结构与 Kubernetes GroupVersionKinds 的映射添加到 Manager 的 Scheme 中。

接下来,就是通过 controller.AddToManager 创建出定义的 Operator,并且添加到 Manager 中。这也就是前文提到的 add 函数做的事情。利用 controller.New 创建出 Operator,然后 Watch 对应的资源,最后返回。下面是 controller.New 的实现:

// New returns a new Controller registered with the Manager.  The Manager will ensure that shared Caches have
// been synced before the Controller is Started.
func New(name string, mgr manager.Manager, options Options) (Controller, error) {
	if options.Reconciler == nil {
		return nil, fmt.Errorf("must specify Reconciler")
	}

	if len(name) == 0 {
		return nil, fmt.Errorf("must specify Name for Controller")
	}

	if options.MaxConcurrentReconciles <= 0 {
		options.MaxConcurrentReconciles = 1
	}

	// Inject dependencies into Reconciler
	if err := mgr.SetFields(options.Reconciler); err != nil {
		return nil, err
	}

	// Create controller with dependencies set
	c := &controller.Controller{
		Do:                      options.Reconciler,
		Cache:                   mgr.GetCache(),
		Config:                  mgr.GetConfig(),
		Scheme:                  mgr.GetScheme(),
		Client:                  mgr.GetClient(),
		Recorder:                mgr.GetRecorder(name),
		Queue:                   workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), name),
		MaxConcurrentReconciles: options.MaxConcurrentReconciles,
		Name:                    name,
	}

	// Add the controller as a Manager components
	return c, mgr.Add(c)
}

其中 options.Reconciler 就是我们定义的实现了 Reconcile 函数的结构的实例。这一结构的 Reconcile 函数的实现也就是前文中提到的 Operator 实现所需的第二处需要修改的地方。 mgr.SetFields(options.Reconciler) 利用依赖注入的方式,将 Manager 的 Client 和 Scheme 注入到 options.Reconciler 中,然后将其赋值给 Controller 中指向 reconcile.Reconciler 接口的字段 Do 中。可以看到除了这一字段,Controller 还有 Queue,Recorder, Client 等其他的字段。因此 kubebuilder 是对 Controller 进行了更高层次的抽象,其有关业务逻辑的实现都通过 reconcile.Reconciler 这一接口进行,而 Queue 等底层的对象,则是由 kubebuilder 来替开发者维护。

最后一步 mgr.Add(c) 将 Operator 加入到 Manager 的一个 Operator 数组中。

type controllerManager struct {
    // ...

	// runnables is the set of Controllers that the controllerManager injects deps into and Starts.
	runnables []Runnable
    // ...
}

// Add sets dependencies on i, and adds it to the list of runnables to start.
func (cm *controllerManager) Add(r Runnable) error {
	cm.mu.Lock()
	defer cm.mu.Unlock()

	// Set dependencies on the object
	if err := cm.SetFields(r); err != nil {
		return err
	}

	// Add the runnable to the list
	cm.runnables = append(cm.runnables, r)
	if cm.started {
		// If already started, start the controller
		go func() {
			cm.errChan <- r.Start(cm.internalStop)
		}()
	}

	return nil
}

接下来,我们回过头来看下 Controller 是如何实现 Watch Kubernetes API Server 的资源的:

// Watch implements controller.Controller
func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prct ...predicate.Predicate) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	// Inject Cache into arguments
	if err := c.SetFields(src); err != nil {
		return err
	}
	if err := c.SetFields(evthdler); err != nil {
		return err
	}
	for _, pr := range prct {
		if err := c.SetFields(pr); err != nil {
			return err
		}
	}

	log.Info("Starting EventSource", "controller", c.Name, "source", src)
	return src.Start(evthdler, c.Queue, prct...)
}

这里的 SetFields 是 Manager 将其自己的 SetFields 函数注入了进来,所以等同于调用了 Manager 的 SetFields 方法,其定义如下:

func (cm *controllerManager) SetFields(i interface{}) error {
	if _, err := inject.ConfigInto(cm.config, i); err != nil {
		return err
	}
	if _, err := inject.ClientInto(cm.client, i); err != nil {
		return err
	}
	if _, err := inject.SchemeInto(cm.scheme, i); err != nil {
		return err
	}
	if _, err := inject.CacheInto(cm.cache, i); err != nil {
		return err
	}
	if _, err := inject.InjectorInto(cm.SetFields, i); err != nil {
		return err
	}
	if _, err := inject.StopChannelInto(cm.internalStop, i); err != nil {
		return err
	}
	if _, err := inject.DecoderInto(cm.admissionDecoder, i); err != nil {
		return err
	}
	return nil
}
// Cache is used by the ControllerManager to inject Cache into Sources, EventHandlers, Predicates, and
// Reconciles
type Cache interface {
	InjectCache(cache cache.Cache) error
}
// CacheInto will set informers on i and return the result if it implements Cache.  Returns
//// false if i does not implement Cache.
func CacheInto(c cache.Cache, i interface{}) (bool, error) {
	if s, ok := i.(Cache); ok {
		return true, s.InjectCache(c)
	}
	return false, nil
}

inject.CacheInto 为例介绍下实现,其检查被注入的对象有没有实现 Cache 接口,即有没有实现 InjectCache(cache cache.Cache) error 方法。如果实现了,则执行注入,否则直接返回。也就是通过这样的方式,Manager 最终把 Cache 注入到了 Source 中,同时如果需要的话,把 Scheme 注入到了 EventHandler 中。这里的 Scheme 在指定 Owner 的 EventHandler 会被用来获取 Owner 的 GroupKind

接下来,后面 src.Start(evthdler, c.Queue, prct...) 也就是顺理成章的实现了:

// Start is internal and should be called only by the Controller to register an EventHandler with the Informer
// to enqueue reconcile.Requests.
func (ks *Kind) Start(handler handler.EventHandler, queue workqueue.RateLimitingInterface,
	prct ...predicate.Predicate) error {

	// Type should have been specified by the user.
	if ks.Type == nil {
		return fmt.Errorf("must specify Kind.Type")
	}

	// cache should have been injected before Start was called
	if ks.cache == nil {
		return fmt.Errorf("must call CacheInto on Kind before calling Start")
	}

	// Lookup the Informer from the Cache and add an EventHandler which populates the Queue
	i, err := ks.cache.GetInformer(ks.Type)
	if err != nil {
		if kindMatchErr, ok := err.(*meta.NoKindMatchError); ok {
			log.Error(err, "if kind is a CRD, it should be installed before calling Start",
				"kind", kindMatchErr.GroupKind)
		}
		return err
	}
	i.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct})
	return nil
}

其利用被注入到 Source 中的 Cache 获取到针对 Source 的资源类型的 Informer,然后将 EventHandler 作为处理 Informer 的事件的处理器。这就是 kubebuilder 的高层 API 背后做的事情。

关于作者

高策才云科技 AI 平台组工程师。如有问题,敬请斧正。

License

  • This article is licensed under CC BY-NC-SA 3.0 .
  • Please contact me for commercial use.

以上所述就是小编给大家介绍的《利用 kubebuilder 优化 Kubernetes Operator 开发体验》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

互联网寡头战争

互联网寡头战争

屈运栩 / 浙江大学出版社 / 2017-5-1 / CNY 49.00

本书意在复盘2015年下半年资本寒冬袭来之后,互联网行业发生的小巨头并购等连锁反应,揭示其背后推手——以BAT(百度、阿里巴巴、腾讯)为首的互联网巨头在零售、出行、本地生活、金融等行业的布局竞争,记录和呈现行业新贵的选择与博弈,深度剖析中国互联网生态的演进过程。一起来看看 《互联网寡头战争》 这本书的介绍吧!

在线进制转换器
在线进制转换器

各进制数互转换器

Markdown 在线编辑器
Markdown 在线编辑器

Markdown 在线编辑器

html转js在线工具
html转js在线工具

html转js在线工具