本篇文章给大家分享的是有关如何通过源码分析Informer机制,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。
目前累计服务客户千余家,积累了丰富的产品开发及服务经验。以网站设计水平和技术实力,树立企业形象,为客户提供网站设计制作、网站制作、网站策划、网页设计、网络营销、VI设计、网站改版、漏洞修补等服务。创新互联公司始终以务实、诚信为根本,不断创新和提高建站品质,通过对领先技术的掌握、对创意设计的研究、对客户形象的视觉传递、对应用系统的结合,为客户提供更好的一站式互联网解决方案,携手广大客户,共同发展进步。
在之前的文章kubernetes之面向终态设计与控制器中了解主要了解了控制器通过ListAndWatch方式实现终态架构。而并未分析内部实现的细节,故本章主要分析内部实现的细节。
参考项目为官方的sample-controller
其实,每个Kubernetes扩展组件,都可以看做是一个控制器,用于处理自己关心的资源类型(API对象)。而要把自己关系的资源和自己的控制器关联起来,则是通过Informer机制,可称为“通知器”,而Informer与API对象都是一一对应的,因此可实现我们创建一个自定义资源,我们的控制器就可以通过该Informer机制


通过上图,可知一个Kubernetes的扩展组件主要分为两大部分:client-go组件和自定义的Controller组件。
client-go:
Reflector:主要对接Kubernetes的APIServer,依托ListWatch实现数据从ETCD中同步到本地缓存(Delta Fifo queue)中。
Informer:用于把本地缓存的数据构建索引及调用事先注册好的ResourceEventHandler。
Indexer:用于构建索引,底层采用一个线程安全的Map存储。每个资源默认的Key为
Custom Controller
Workqueue是一个去重队列,内部除了items列表外还带有processing和dirty set记录.
同一个资源对象的多次事件触发,入队列后会去重;
同一个资源对象不会被多个worker同时处理。详细可见Learning Concurrent Reconciling controller对资源对象的查询都应该从Informer中查cache,而不是直接调用kube-apiserver查询。
Informer reference : 编写自定义Controller时,需要创建一个关注自已资源的Informer对象。
ResourceEventHandler: 用于注册相关的事件,待有数据时,Informer会进行相关的回调。
ProcessItem: 通过Workqueue去数据,并通过下发给Handler进行处理。
Workqueue:工作队列工具类,每个controller都需要有一个工作队列。从event handler触发的事件会先放入工作队列,然后由controller的ProcessItem取出来处理。
Client-go
HandleDeltas中会进行调用Indexer进行索引构建,并最终存储在本地的一个线程安全的Map中
之后,会进行该事件的分发,通知所有的listener进行调用用户注册好的ResourceEventHandler进行处理。
Reflector首先通过List进行全量数据同步,由ETCD到本地的Delta Fifo queue中。Reflector是最终和Kubernetes APIServer建立连接的。
Reflector其次再通过最新的ResourceVersion进行Watch数据,此时若有未同步到的数据,将进行补齐(因List完成之后,可能存在新数据的增加,因此可能存在遗漏)。
启动自定义控制器时,通过Informer调用Reflector执行List&Watch进行数据同步及注册观察事件。
当用户创建了一个自定义资源时,会被Reflector的Watch观察到,并放入本地的Delta缓存中。
Informer通过chche中的Controller定时(1s)调用processLoop方法,并Pop出队列(Delta)中的数据,交给Informer的HandleDeltas处理;
Custom Controller
自定的ResourceEventHandler中会进行相关过滤处理,并最终加入到Workqueue中,该工作队列只存储KEY,不会存储具体对象
一旦加入后,ProcessItem方案就会Pop出数据,并交给Handle Object方法进行处理
Handle Object会根据Key调用Indexer reference获取到具体对象,并开始处理业务。注意:这里一定要通过缓存去数据,不要去直接调用Kubernetes的API,否则会影响性能。
main方法中初始化相关结构及启动Informer
	//https://github.com/kubernetes/sample-controller/blob/master/main.go#L62
	
	kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30)
	exampleInformerFactory := informers.NewSharedInformerFactory(exampleClient, time.Second*30)
	controller := NewController(kubeClient, exampleClient,
		kubeInformerFactory.Apps().V1().Deployments(),
		exampleInformerFactory.Samplecontroller().V1alpha1().Foos())
	// notice that there is no need to run Start methods in a separate goroutine. (i.e. go kubeInformerFactory.Start(stopCh)
	// Start method is non-blocking and runs all registered informers in a dedicated goroutine.
	kubeInformerFactory.Start(stopCh)
	exampleInformerFactory.Start(stopCh)
	if err = controller.Run(2, stopCh); err != nil {
		klog.Fatalf("Error running controller: %s", err.Error())
	}以上的代码涵盖两部分:
Client-go部分,构建SharedInformerFactory并启动,实现ListAndWatch,第一部分解析
自定义Controller部分,内部主要业务为等待事件,并做响应,第二部分解析
用户通过构建Informer并启动后,就会进入到client-go内部的Informer内,主要逻辑如下
调用Start方法初始化请求的Informer
// Start initializes all requested informers.
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
	f.lock.Lock()
	defer f.lock.Unlock()
	for informerType, informer := range f.informers {
		if !f.startedInformers[informerType] {
			go informer.Run(stopCh)
			f.startedInformers[informerType] = true
		}
	}
}调用Run正式开始启动相关业务
//client-go@v2.0.0-alpha.0.0.20180910083459-2cefa64ff137+incompatible/tools/cache/shared_informer.go:189
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)
	cfg := &Config{
		//注册DeltaFIFO队列
		Queue:            fifo,
		//注册listerWatcher,后续会和APIServer建立连接
		ListerWatcher:    s.listerWatcher,
		ObjectType:       s.objectType,
		FullResyncPeriod: s.resyncCheckPeriod,
		RetryOnError:     false,
		//检查是否需要进行Resync,该方法会把需要Resync的listener加入到需要同步的队列中
		ShouldResync:     s.processor.shouldResync,
		//这里先注册用于构建索引和分发事件的方法
		Process: s.HandleDeltas,
	}
	......
	//启动用于缓存比较的方法
	wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
	//启动用于接收事件消息并回调用户注册的ResourceEventHandler
	wg.StartWithChannel(processorStopCh, s.processor.run)
	......
	//运行内部的Controller
	s.controller.Run(stopCh)
}启动内置的Controller
// Run begins processing items, and will continue until a value is sent down stopCh.
// It's an error to call Run more than once.
// Run blocks; call via go.
func (c *controller) Run(stopCh <-chan struct{}) {
	......
   //调用reflector中的Run,进行启动ListAndWatch,同APIServer建立连接
   //client-go@v2.0.0-alpha.0.0.20180910083459-2cefa64ff137+incompatible/tools/cache/shared_informer.go:219
   wg.StartWithChannel(stopCh, r.Run)
	//启动定时器,每秒运行一次,用于调用processLoop进行读取数据
	wait.Until(c.processLoop, time.Second, stopCh)
}ListAndWatch,实现同APIServer建立连接
	//k8s.io/client-go@v0.0.0-20190918200256-06eb1244587a/tools/cache/reflector.go:121
	func (r *Reflector) Run(stopCh <-chan struct{}) {
		wait.Until(func() {
			if err := r.ListAndWatch(stopCh); err != nil {
				utilruntime.HandleError(err)
			}
		}, r.period, stopCh)
	}
	//k8s.io/client-go@v0.0.0-20190918200256-06eb1244587a/tools/cache/reflector.go:159
	// ListAndWatch first lists all items and get the resource version at the moment of call,
	// and then use the resource version to watch.
	// It returns error if ListAndWatch didn't even try to initialize watch.
	func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
		......
		// Explicitly set "0" as resource version - it's fine for the List()
		// to be served from cache and potentially be delayed relative to
		// etcd contents. Reflector framework will catch up via Watch() eventually.
		options := metav1.ListOptions{ResourceVersion: "0"}
		if err := func() error {
			......
			go func() {
				.....
				// Attempt to gather list in chunks, if supported by listerWatcher, if not, the first
				// list request will return the full response.
				pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
					//这里是真正同Kubernetes连接并获取数据的地方
					return r.listerWatcher.List(opts)
				}))
				......
				//这里会执行list,获取数据
				list, err = pager.List(context.Background(), options)
				close(listCh)
			}()
			......
			//解析数据类型
			listMetaInterface, err := meta.ListAccessor(list)
			......
			resourceVersion = listMetaInterface.GetResourceVersion()
			//抽取list中的数据
			items, err := meta.ExtractList(list)
			......
			//通过list到的数据进行数据全量本地队列(Delta FIFO Queue)数据替换
			if err := r.syncWith(items, resourceVersion); err != nil {
				return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
			}
			initTrace.Step("SyncWith done")
			r.setLastSyncResourceVersion(resourceVersion)
			initTrace.Step("Resource version updated")
			return nil
		}(); err != nil {
			return err
		}
		......
		for {
			......
			//获取Watch对象
			w, err := r.listerWatcher.Watch(options)
			......
			//开始处理Watch到的数据
			if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
				......
			}
		}
	}
	//k8s.io/client-go@v0.0.0-20190918200256-06eb1244587a/tools/cache/reflector.go:319
	// watchHandler watches w and keeps *resourceVersion up to date.
	func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
	......
	loop:
		for {
			select {
			case <-stopCh:
				return errorStopRequested
			case err := <-errc:
				return err
			case event, ok := <-w.ResultChan(): //这里当Watch到数据后,会触发该CHANEL
				......
				// 通过得到的事件对象,访问到具体的数据
				meta, err := meta.Accessor(event.Object)
				......
				newResourceVersion := meta.GetResourceVersion()
				//根据获取到的事件类型,触发相应动作
				switch event.Type {
				case watch.Added:
					err := r.store.Add(event.Object)
					......
				case watch.Modified:
					err := r.store.Update(event.Object)
					......
				case watch.Deleted:
					// TODO: Will any consumers need access to the "last known
					// state", which is passed in event.Object? If so, may need
					// to change this.
					err := r.store.Delete(event.Object)
					......
				case watch.Bookmark:
					// A `Bookmark` means watch has synced here, just update the resourceVersion
				default:
					utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
				}
				*resourceVersion = newResourceVersion
				//设置最新需要Watch的版本
				r.setLastSyncResourceVersion(newResourceVersion)
				eventCount++
			}
		}
		......
	}通过以上步骤,实现了Kubernetes存储在ETCD中的数据到Controller本地缓存中的过程。接下来就需要对存储在Delta FIFO Queue中的数据进行处理的过程。
处理Watch到的数据
执行List和Watch逻辑
定时调用ListAndWatch
处理Delta中的数据,建立索引及分发事件
	func (c *controller) processLoop() {
		for {
			//读取Delta中的数据并调用之前设置好的方法HandleDelta,进行业务处理
			//vendor/k8s.io/client-go/tools/cache/controller.go:150
			obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
			......
		}
	}
	//client-go@v2.0.0-alpha.0.0.20180910083459-2cefa64ff137+incompatible/tools/cache/shared_informer.go:344
	func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
		s.blockDeltas.Lock()
		defer s.blockDeltas.Unlock()
		// from oldest to newest
		for _, d := range obj.(Deltas) {
			switch d.Type {
			//根据事件的类型进行相关的事件分类下发
			case Sync, Added, Updated:
				isSync := d.Type == Sync
				s.cacheMutationDetector.AddObject(d.Object)
				//到索引中先查询是否有数据
				if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
					//若存在数据,则更新索引数据
					if err := s.indexer.Update(d.Object); err != nil {
						return err
					}
					//给listener分发更新事件
					s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
				} else {
					//若没有数据,则直接添加新数据到索引中去
					if err := s.indexer.Add(d.Object); err != nil {
						return err
					}
					//给listener分发添加事件
					s.processor.distribute(addNotification{newObj: d.Object}, isSync)
				}
			case Deleted:
				//若是删除类型,则先删除索引
				if err := s.indexer.Delete(d.Object); err != nil {
					return err
				}
				//给listener分发删除事件
				s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
			}
		}
		return nil
	}以上需要注意一点,关于最后的删除事件:若是一个删除事件,在之前已经删除了索引中的数据了,因此无法再在自定义的Controller中,获取到该数据的内容了。因此虽然得到了删除事件通知,但是却无法通过该Key,查询到事件内容。因此当我们需要在删除时,需要处理该数据的话,应该添加finalizer阻止提前删除,待处理完毕后,在删除该标记即可。
	//k8s.io/client-go@v0.0.0-20190918200256-06eb1244587a/tools/cache/shared_informer.go:453
	func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
		......
	   //这里通过分发到相应的listener中
		if sync {
			for _, listener := range p.syncingListeners {
				listener.add(obj)
			}
		} else {
			for _, listener := range p.listeners {
				listener.add(obj)
			}
		}
	}
	//触发add的CHANEL,实现对接到用户定义的ResourceEventHandler中
	func (p *processorListener) add(notification interface{}) {
		p.addCh <- notification
	}这里需要注意,若要触发用户定义的ResourceEventHandler,则需要先让用户注册才行。故以下代码是用户注册ResourceEventHandler的部分。
	//https://github.com/kubernetes/sample-controller/blob/master/controller.go#L116
	//这里调用Informer的AddEventHandler方法进行注册ResourceEventHandler,并添加入队方式
	fooInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: controller.enqueueFoo,
		UpdateFunc: func(old, new interface{}) {
			//这里向工作队列中加入数据,同时这里可以做一些过滤操作
			controller.enqueueFoo(new)
		},
	})
	//加入到工作队列中
	func (c *Controller) enqueueFoo(obj interface{}) {
		......
		//把收到的Key加入到工作队列中
		c.workqueue.Add(key)
	}
	//k8s.io/client-go@v0.0.0-20190918200256-06eb1244587a/tools/cache/shared_informer.go:326
	func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) {
		s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod)
	}
	//k8s.io/client-go@v0.0.0-20190918200256-06eb1244587a/tools/cache/shared_informer.go:347
	func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) {
		......
		//构建ProcessListener对象
		listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize)
		if !s.started {
			//注册listener
			s.processor.addListener(listener)
			return
		}
		......
		//若已经处于启动状态下,则还需要添加事件消息给该listener,用于及时处理消息
		s.processor.addListener(listener)
		for _, item := range s.indexer.List() {
			listener.add(addNotification{newObj: item})
		}
	}
	//k8s.io/client-go@v0.0.0-20190918200256-06eb1244587a/tools/cache/shared_informer.go:437
	func (p *sharedProcessor) addListener(listener *processorListener) {
		......
		if p.listenersStarted {
			//运行listener开始处理收到的数据,比如回调用户定义的EventHandler
			// 定时调用用户的Handler进行处理
			p.wg.Start(listener.run)
			p.wg.Start(listener.pop)
		}
	}添加用户的事件,并开始处理收到的数据
自定义控制器中注册ResourceEventHandler
事件分发
HandleDelta对读取到的事件进行处理
通过processLoop读取Delta中的数据
以上client-go处理完毕后,会把数据通过用户注册的ResourceEventHandler调用相应的方法。通过自定义的ResourceEventHandler进行预处理,并加入到工作队列(这里不建议处理复杂逻辑,因为一旦该方法阻塞,会导致相应的链路阻塞,而应该把需要处理的事件放入到工作列表中,通过用户侧的协程进行处理)。
自定义Controller定义定时器执行业务
//https://github.com/kubernetes/sample-controller/blob/7e92736cc38f37632d2b53e31b9a966e7a91c24a/controller.go#L150
func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
	
	......
	// Wait for the caches to be synced before starting workers
	if ok := cache.WaitForCacheSync(stopCh, c.deploymentsSynced, c.foosSynced); !ok {
		return fmt.Errorf("failed to wait for caches to sync")
	}
	klog.Info("Starting workers")
	// Launch two workers to process Foo resources
	for i := 0; i < threadiness; i++ {
		go wait.Until(c.runWorker, time.Second, stopCh)
	}
}
	
// runWorker is a long-running function that will continually call the
// processNextWorkItem function in order to read and process a message on the
// workqueue.
func (c *Controller) runWorker() {
	//死循环,执行业务逻辑
	for c.processNextWorkItem() {
	}
}	从队列中取出数据,并进行调用syncHandler方法进行处理,处理完毕后从工作队列中删除
//https://github.com/kubernetes/sample-controller/blob/7e92736cc38f37632d2b53e31b9a966e7a91c24a/controller.go#L186
func (c *Controller) processNextWorkItem() bool {
	//读取工作队列中的数据,在之前通过用户定义的ResourceEventHandler已经加入到了工作队列中,这里区出做处理
	obj, shutdown := c.workqueue.Get()
	if shutdown {
		return false
	}
	// We wrap this block in a func so we can defer c.workqueue.Done.
	err := func(obj interface{}) error {
		// We call Done here so the workqueue knows we have finished
		// processing this item. We also must remember to call Forget if we
		// do not want this work item being re-queued. For example, we do
		// not call Forget if a transient error occurs, instead the item is
		// put back on the workqueue and attempted again after a back-off
		// period.
		defer c.workqueue.Done(obj)
		var key string
		var ok bool
		// We expect strings to come off the workqueue. These are of the
		// form namespace/name. We do this as the delayed nature of the
		// workqueue means the items in the informer cache may actually be
		// more up to date that when the item was initially put onto the
		// workqueue.
		if key, ok = obj.(string); !ok {
			// As the item in the workqueue is actually invalid, we call
			// Forget here else we'd go into a loop of attempting to
			// process a work item that is invalid.
			c.workqueue.Forget(obj)
			utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
			return nil
		}
		// Run the syncHandler, passing it the namespace/name string of the
		// Foo resource to be synced.
		if err := c.syncHandler(key); err != nil {
			// Put the item back on the workqueue to handle any transient errors.
			c.workqueue.AddRateLimited(key)
			return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
		}
		// Finally, if no error occurs we Forget this item so it does not
		// get queued again until another change happens.
		c.workqueue.Forget(obj)
		klog.Infof("Successfully synced '%s'", key)
		return nil
	}(obj)
	if err != nil {
		utilruntime.HandleError(err)
		return true
	}
	return true
}// syncHandler compares the actual state with the desired, and attempts to
// converge the two. It then updates the Status block of the Foo resource
// with the current status of the resource.
func (c *Controller) syncHandler(key string) error {
	.....
}至此,已经全部分析完毕。
对Kubernetes做扩展开发,一般都是采用自定义的Controller方式。借助官方提供的client-go组件,已经实现了Informer机制。而我们只需要注册ResourceEventHandler事件,并实现自定义的Controller即可完成扩展。
步骤回顾:
main方法中构建Informer对象并启动,同时启动自己的Controller,主要逻辑为轮询去工作队列中取数据,并做处理,若无数据,则会阻塞在取数据的地方。
Informer构建,主要步骤如下
调用reflector进行ListAndWatch,主要是首次获取全量的数据(List)及监听所有需要关注资源的最新版本(Watch)存储到Delta FIFO Queue中。
调用内置controller从Delta中取出数据并构建数据索引及分发消息给用户注册的ResourceEventHandler中;
自定义ResourceEventHandler中根据事件类型进行处理(如过滤)后,再加入到自定义Controller的工作队列中;
当加入到工作队列中后,自定义Controller中的轮询取数据的地方就会继续,取出数据,处理,成功后删除该数据。
以上就是如何通过源码分析Informer机制,小编相信有部分知识点可能是我们日常工作会见到或用到的。希望你能通过这篇文章学到更多知识。更多详情敬请关注创新互联行业资讯频道。