KubernetesControllerManager工作原理

NO IMAGE

《Kubernetes Controller Manager 工作原理》 最早發佈在:blog.ihypo.net/15763910382…

本文基於對 Kubernetes v1.16 的源碼閱讀,文章有一定的源碼,但我會通過配圖儘量描述清晰

在 Kubernetes Master 節點中,有三個重要組件:ApiServer、ControllerManager、Scheduler,它們一起承擔了整個集群的管理工作。本文嘗試梳理清楚 ControllerManager 的工作流程和原理。

KubernetesControllerManager工作原理

什麼是 Controller Manager

根據官方文檔的說法:kube-controller-manager 運行控制器,它們是處理集群中常規任務的後臺線程。

說白了,Controller Manager 就是集群內部的管理控制中心,由負責不同資源的多個 Controller 構成,共同負責集群內的 Node、Pod 等所有資源的管理,比如當通過 Deployment 創建的某個 Pod 發生異常退出時,RS Controller 便會接受並處理該退出事件,並創建新的 Pod 來維持預期副本數。

幾乎每種特定資源都有特定的 Controller 維護管理以保持預期狀態,而 Controller Manager 的職責便是把所有的 Controller 聚合起來:

  1. 提供基礎設施降低 Controller 的實現複雜度
  2. 啟動和維持 Controller 的正常運行

可以這麼說,Controller 保證集群內的資源保持預期狀態,而 Controller Manager 保證了 Controller 保持在預期狀態。

Controller 工作流程

在講解 Controller Manager 怎麼為 Controller 提供基礎設施和運行環境之前,我們先了解一下 Controller 的工作流程是什麼樣子的。

從比較高維度的視角看,Controller Manager 主要提供了一個分發事件的能力,而不同的 Controller 只需要註冊對應的 Handler 來等待接收和處理事件。

KubernetesControllerManager工作原理

以 Deployment Controller 舉例,在 pkg/controller/deployment/deployment_controller.goNewDeploymentController 方法中,便包括了 Event Handler 的註冊,對於 Deployment Controller 來說,只需要根據不同的事件實現不同的處理邏輯,便可以實現對相應資源的管理。

dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc:    dc.addDeployment,
UpdateFunc: dc.updateDeployment,
// This will enter the sync loop and no-op, because the deployment has been deleted from the store.
DeleteFunc: dc.deleteDeployment,
})
rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc:    dc.addReplicaSet,
UpdateFunc: dc.updateReplicaSet,
DeleteFunc: dc.deleteReplicaSet,
})
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
DeleteFunc: dc.deletePod,
})

可以看到,在 Controller Manager 的幫助下,Controller 的邏輯可以做的非常純粹,只需要實現相應的 EventHandler 即可,那麼 Controller Manager 都做了哪些具體的工作呢?

Controller Manager 架構

輔助 Controller Manager 完成事件分發的是 client-go,而其中比較關鍵的模塊便是 informer。

kubernetes 在 github 上提供了一張 client-go 的架構圖,從中可以看出,Controller 正是下半部分(CustomController)描述的內容,而 Controller Manager 主要完成的是上半部分。

KubernetesControllerManager工作原理

Informer 工廠

從上圖可以看到 Informer 是一個非常關鍵的 “橋樑” 作用,因此對 Informer 的管理便是 Controller Manager 要做的第一件事。

在 Controller Manager 啟動時,便會創建一個名為 SharedInformerFactory 的單例工廠,因為每個 Informer 都會與 Api Server 維持一個 watch 長連接,所以這個單例工廠通過為所有 Controller 提供了唯一獲取 Informer 的入口,來保證每種類型的 Informer 只被實例化一次。

該單例工廠的初始化邏輯:

// NewSharedInformerFactoryWithOptions constructs a new instance of a SharedInformerFactory with additional options.
func NewSharedInformerFactoryWithOptions(client kubernetes.Interface, defaultResync time.Duration, options ...SharedInformerOption) SharedInformerFactory {
factory := &sharedInformerFactory{
client:           client,
namespace:        v1.NamespaceAll,
defaultResync:    defaultResync,
informers:        make(map[reflect.Type]cache.SharedIndexInformer),
startedInformers: make(map[reflect.Type]bool),
customResync:     make(map[reflect.Type]time.Duration),
}
// Apply all options
for _, opt := range options {
factory = opt(factory)
}
return factory
}

從上面的初始化邏輯中可以看到,sharedInformerFactory 中最重要的是名為 informers 的 map,其中 key 為資源類型,而 value 便是關注該資源類型的 Informer。每種類型的 Informer 只會被實例化一次,並存儲在 map 中,不同 Controller 需要相同資源的 Informer 時只會拿到同一個 Informer 實例。

對於 Controller Manager 來說,維護所有的 Informer 使其正常工作,是保證所有 Controller 正常工作的基礎條件。sharedInformerFactory 通過該 map 維護了所有的 informer 實例,因此,sharedInformerFactory 也承擔了提供統一啟動入口的職責:

// Start initializes all requested informers.
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
f.lock.Lock()
defer f.lock.Unlock()
for informerType, informer := range f.informers {
if !f.startedInformers[informerType] {
go informer.Run(stopCh)
f.startedInformers[informerType] = true
}
}
}

當 Controller Manager 啟動時,最重要的就是通過該工廠的 Start 方法,將所有的 Informer 運行起來。

Informer 的創建

下面看下這些 Informer 是怎麼被創建的。Controller Manager 在 cmd/kube-controller-manager/app/controllermanager.goNewControllerInitializers 函數中初識化了所有的 Controller,因為代碼冗長,這裡僅拿 Deployment Controller 舉例子。

初始化 Deployment Controller 的邏輯在 cmd/kube-controller-manager/app/apps.gostartDeploymentController 的函數中:

func startDeploymentController(ctx ControllerContext) (http.Handler, bool, error) {
if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}] {
return nil, false, nil
}
dc, err := deployment.NewDeploymentController(
ctx.InformerFactory.Apps().V1().Deployments(),
ctx.InformerFactory.Apps().V1().ReplicaSets(),
ctx.InformerFactory.Core().V1().Pods(),
ctx.ClientBuilder.ClientOrDie("deployment-controller"),
)
if err != nil {
return nil, true, fmt.Errorf("error creating Deployment controller: %v", err)
}
go dc.Run(int(ctx.ComponentConfig.DeploymentController.ConcurrentDeploymentSyncs), ctx.Stop)
return nil, true, nil
}

最關鍵的邏輯在 deployment.NewDeploymentController 上,該函數真正創建了 Deployment Controller,而該創建函數的前三個參數分別為 Deployment、ReplicaSet、Pod 的 Informer。可以看到,Informer 的單例工廠以 ApiGroup 為路徑提供了不同資源的 Informer 創建入口。

不過要注意的是,.Apps().V1().Deployments() 雖然返回的是 deploymentInformer 類型的實例,但是,deploymentInformer 其實並不是一個真正的 Informer(儘管他以 Informer 命名),它只是一個模板類,主要功能是提供關注 Deployment 這一特定資源 Informer 的創建模板:

// Deployments returns a DeploymentInformer.
func (v *version) Deployments() DeploymentInformer {
return &deploymentInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions}
}

真正創建 Informer 的邏輯是在 deploymentInformer.Informer() 中(client-go/informers/apps/v1/deployment.go),f.defaultInformer 是默認的 Deployment Informer 創建模板方法,通過將資源實例和該模板方法傳入 Informer 工廠的 InformerFor 方法,來創建僅關注 Deployment 資源的 Informer:

func (f *deploymentInformer) Informer() cache.SharedIndexInformer {
return f.factory.InformerFor(&appsv1.Deployment{}, f.defaultInformer)
}

簡單梳理一下:

  1. 可以通過 Informer 工廠獲得特定類型的 Informer 模板類(即這裡的 deploymentInformer
  2. 真正創建該特定資源 Informer 的是 Informer 模板類的 Informer() 方法
  3. Informer() 方法只不過是通過 Informer 工廠的 InformerFor 來創建真正的 Informer

這裡用到了模板方法(設計模式),雖然有一點繞口,但可以參考下圖梳理一下,理解關鍵在於 Informer 的 差異化的創建邏輯下放給了模板類

KubernetesControllerManager工作原理

最後,名為 sharedIndexInformer 的結構體將被實例化,並真正的承擔 Informer 的職責。被註冊到 Informer 工廠 map 中的也是該實例。

Informer 的運行

因為真正的 Informer 實例是一個 sharedIndexInformer 類型的對象,當 Informer 工廠啟動時(執行 Start 方法),被真正運行起來的是 sharedIndexInformer

sharedIndexInformer 是 client-go 裡的組件,它的 Run 方法雖然短短几十行,但是卻承擔了很多工作。到這裡,才到了 Controller Manager 最有趣的部分。

func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)
cfg := &Config{
Queue:            fifo,
ListerWatcher:    s.listerWatcher,
ObjectType:       s.objectType,
FullResyncPeriod: s.resyncCheckPeriod,
RetryOnError:     false,
ShouldResync:     s.processor.shouldResync,
Process: s.HandleDeltas,
}
func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()
s.controller = New(cfg)
s.controller.(*controller).clock = s.clock
s.started = true
}()
// Separate stop channel because Processor should be stopped strictly after controller
processorStopCh := make(chan struct{})
var wg wait.Group
defer wg.Wait()              // Wait for Processor to stop
defer close(processorStopCh) // Tell Processor to stop
wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
wg.StartWithChannel(processorStopCh, s.processor.run)
defer func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()
s.stopped = true // Don't want any new listeners
}()
s.controller.Run(stopCh)
}

sharedIndexInformer 的啟動邏輯主要做了下面幾件事:

  1. 創建了名為 fifo 的隊列
  2. 創建並運行了一個名為 controller 的實例
  3. 啟動了 cacheMutationDetector
  4. 啟動了 processor

這幾個名詞(或者說組件)前文並沒有提到過,而這四件事情是 Controller Manager 工作的核心內容,因此下面我會分別介紹。

sharedIndexInformer

sharedIndexInformer 是一個共享的 Informer 框架,不同的 Controller 只需要提供一個模板類(比如上文提到的 deploymentInformer ),便可以創建一個符合自己需求的特定 Informer。

sharedIndexInformer 包含了一堆工具來完成 Informer 的任務,其主要代碼在 client-go/tools/cache/shared_informer.go 中。其創建邏輯也在其中:

// NewSharedIndexInformer creates a new instance for the listwatcher.
func NewSharedIndexInformer(lw ListerWatcher, objType runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
realClock := &clock.RealClock{}
sharedIndexInformer := &sharedIndexInformer{
processor:                       &sharedProcessor{clock: realClock},
indexer:                         NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
listerWatcher:                   lw,
objectType:                      objType,
resyncCheckPeriod:               defaultEventHandlerResyncPeriod,
defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
cacheMutationDetector:           NewCacheMutationDetector(fmt.Sprintf("%T", objType)),
clock:                           realClock,
}
return sharedIndexInformer
}

在創建邏輯中,有幾個東西需要留意:

  1. processor:提供了 EventHandler 註冊和事件分發的功能
  2. indexer:提供了資源緩存的功能
  3. listerWatcher:由模板類提供,包含特定資源的 List 和 Watch 方法
  4. objectType:用來標記關注哪種特定資源類型
  5. cacheMutationDetector:監控 Informer 的緩存

除此之外,還包含了上文啟動邏輯中提到了 DeltaFIFO 隊列和 controller,下面就分別介紹。

sharedProcessor

processor 是 sharedIndexInformer 中一個非常有趣的組件,Controller Manager 通過一個 Informer 單例工廠來保證不同的 Controller 共享了同一個 Informer,但是不同的 Controller 對該共享的 Informer 註冊的 Handler 不同,那麼 Informer 應該怎麼管理被註冊的 Handler 呢?

processor 便是用來管理被註冊的 Handler 以及將事件分發給不同 Handler 的組件。

type sharedProcessor struct {
listenersStarted bool
listenersLock    sync.RWMutex
listeners        []*processorListener
syncingListeners []*processorListener
clock            clock.Clock
wg               wait.Group
}

sharedProcessor 的工作核心是圍繞著 listeners 這個 Listener 切片展開的。

當我們註冊一個 Handler 到 Informer 時,最終會被轉換為一個名為 processorListener 結構體的實例:

func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int) *processorListener {
ret := &processorListener{
nextCh:                make(chan interface{}),
addCh:                 make(chan interface{}),
handler:               handler,
pendingNotifications:  *buffer.NewRingGrowing(bufferSize),
requestedResyncPeriod: requestedResyncPeriod,
resyncPeriod:          resyncPeriod,
}
ret.determineNextResync(now)
return ret
}

該實例主要包含兩個 channel 和外面註冊的 Handler 方法。而此處被實例化的 processorListener 對象最終會被添加到 sharedProcessor.listeners 列表中:

func (p *sharedProcessor) addListener(listener *processorListener) {
p.listenersLock.Lock()
defer p.listenersLock.Unlock()
p.addListenerLocked(listener)
if p.listenersStarted {
p.wg.Start(listener.run)
p.wg.Start(listener.pop)
}
}

如圖所示,Controller 中的 Handler 方法最終會被添加到 Listener 中,而 Listener 將會被 append 到 sharedProcessor 的 Listeners 切片中。

KubernetesControllerManager工作原理

前文提到,sharedIndexInformer 啟動時會將 sharedProcessor 運行起來,而 sharedProcessor 的啟動邏輯便是和這些 listener 有關:

func (p *sharedProcessor) run(stopCh <-chan struct{}) {
func() {
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
for _, listener := range p.listeners {
p.wg.Start(listener.run)
p.wg.Start(listener.pop)
}
p.listenersStarted = true
}()
<-stopCh
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
for _, listener := range p.listeners {
close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
}
p.wg.Wait() // Wait for all .pop() and .run() to stop
}

可以看到,sharedProcessor 啟動時會依次執行 listenerrunpop 方法,我們現在看下這兩個方法。

listener 的啟動

因為 listener 包含了 Controller 註冊進來的 Handler 方法,因此 listener 最重要的職能就是當事件發生時來觸發這些方法,而 listener.run 就是不停的從 nextCh 這個 channel 中拿到事件並執行對應的 handler:

func (p *processorListener) run() {
// this call blocks until the channel is closed.  When a panic happens during the notification
// we will catch it, **the offending item will be skipped!**, and after a short delay (one second)
// the next notification will be attempted.  This is usually better than the alternative of never
// delivering again.
stopCh := make(chan struct{})
wait.Until(func() {
// this gives us a few quick retries before a long pause and then a few more quick retries
err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
for next := range p.nextCh {
switch notification := next.(type) {
case updateNotification:
p.handler.OnUpdate(notification.oldObj, notification.newObj)
case addNotification:
p.handler.OnAdd(notification.newObj)
case deleteNotification:
p.handler.OnDelete(notification.oldObj)
default:
utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
}
}
// the only way to get here is if the p.nextCh is empty and closed
return true, nil
})
// the only way to get here is if the p.nextCh is empty and closed
if err == nil {
close(stopCh)
}
}, 1*time.Minute, stopCh)
}

可以看到,listener.run 不停的從 nextCh 這個 channel 中拿到事件,但是 nextCh 這個 channel 裡的事件又是從哪來的呢?listener.pop 的職責便是將事件放入 nextCh 中。

listener.pop 是一段非常精巧和有趣的邏輯:

func (p *processorListener) pop() {
defer utilruntime.HandleCrash()
defer close(p.nextCh) // Tell .run() to stop
var nextCh chan<- interface{}
var notification interface{}
for {
select {
case nextCh <- notification:
// Notification dispatched
var ok bool
notification, ok = p.pendingNotifications.ReadOne()
if !ok { // Nothing to pop
nextCh = nil // Disable this select case
}
case notificationToAdd, ok := <-p.addCh:
if !ok {
return
}
if notification == nil { // No notification to pop (and pendingNotifications is empty)
// Optimize the case - skip adding to pendingNotifications
notification = notificationToAdd
nextCh = p.nextCh
} else { // There is already a notification waiting to be dispatched
p.pendingNotifications.WriteOne(notificationToAdd)
}
}
}
}

listener 之所以包含了兩個 channel:addChnextCh,是因為 Informer 無法預知 listener.handler 的事件消費的速度是否大於事件生產的速度,因此添加了一個名為 pendingNotifications 的緩衝隊列來保存未來得及消費的事件。

KubernetesControllerManager工作原理

pop 方法一方面會不停的從 addCh 中獲得最新事件,以保證不會讓生產方阻塞。然後判斷是否存在 buffer,如果存在則把事件添加到 buffer 中,如果不存在則嘗試推給 nextCh

而另一方面,會判斷 buffer 中是否還有事件,如果還有存量,則不停的傳遞給 nextCh

pop 方法實現了一個帶 buffer 的分發機制,使得事件可以源源不斷的從 addChnextCh。但是問題來了,那 addCh 的事件從哪來呢。

其實來源非常簡單,listener 有一個 add 方法,入參是一個事件,該方法會將新事件推入 addCh 中。而調用該 add 方法的是管理所有 listenersharedProcessor

上面提到過,sharedProcessor 的職責便是管理所有的 Handler 以及分發事件,而真正做分發工作的是 distribute 方法:

func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
if sync {
for _, listener := range p.syncingListeners {
listener.add(obj)
}
} else {
for _, listener := range p.listeners {
listener.add(obj)
}
}
}

到目前為止,我們有一部分比較清晰了:

  1. Controller 將 Handler 註冊給 Informer
  2. Informer 通過 sharedProcessor 維護了所有的 Handler(listener)
  3. Informer 收到事件時,通過 sharedProcessor.distribute 將事件分發下去
  4. Controller 被觸發對應的 Handler 來處理自己的邏輯

那麼剩下的問題就是 Informer 的事件從哪來呢?

DeltaFIFO

在分析 Informer 獲取事件之前,需要提前講一個非常有趣的小工具,就是在 sharedIndexInformer.Run 的時候創建的 fifo 隊列:

fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)

DeltaFIFO 是一個非常有趣的隊列,相關代碼定義在 client-go/tools/cache/delta_fifo.go 中。對於一個隊列來說,最重要的肯定是 Add 方法和 Pop 方法,DeltaFIFO 提供了多個 Add 方法,雖然根據不同的事件類型(add/update/delete/sync)區分不同的方法,但是最終都會執行 queueActionLocked

// queueActionLocked appends to the delta list for the object.
// Caller must lock first.
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
id, err := f.KeyOf(obj)
if err != nil {
return KeyError{obj, err}
}
// If object is supposed to be deleted (last event is Deleted),
// then we should ignore Sync events, because it would result in
// recreation of this object.
if actionType == Sync && f.willObjectBeDeletedLocked(id) {
return nil
}
newDeltas := append(f.items[id], Delta{actionType, obj})
newDeltas = dedupDeltas(newDeltas)
if len(newDeltas) > 0 {
if _, exists := f.items[id]; !exists {
f.queue = append(f.queue, id)
}
f.items[id] = newDeltas
f.cond.Broadcast()
} else {
// We need to remove this from our map (extra items in the queue are
// ignored if they are not in the map).
delete(f.items, id)
}
return nil
}

queueActionLocked 方法的第一個參數 actionType 便是事件類型:

const (
Added   DeltaType = "Added"   // watch api 獲得的創建事件
Updated DeltaType = "Updated" // watch api 獲得的更新事件
Deleted DeltaType = "Deleted" // watch api 獲得的刪除事件
Sync DeltaType = "Sync"       // 觸發了 List Api,需要刷新緩存
)

從事件類型以及入隊列方法可以看出,這是一個帶有業務功能的隊列,並不是單純的“先入先出”,入隊列方法中有兩個非常精巧的設計:

  1. 入隊列的事件會先判斷該資源是否存在未被消費的事件,然後適當處理
  2. 如果 list 方法時發現該資源已經被刪除了,則不再處理

第二點比較好理解,如果觸發了 List 請求,而且發現要被處理的資源已經被刪除了,則就不需要再入隊列處理。而第一點需要結合出隊列方法一起來看:

func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
f.lock.Lock()
defer f.lock.Unlock()
for {
for len(f.queue) == 0 {
// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
// When Close() is called, the f.closed is set and the condition is broadcasted.
// Which causes this loop to continue and return from the Pop().
if f.IsClosed() {
return nil, ErrFIFOClosed
}
f.cond.Wait()
}
id := f.queue[0]
f.queue = f.queue[1:]
if f.initialPopulationCount > 0 {
f.initialPopulationCount--
}
item, ok := f.items[id]
if !ok {
// Item may have been deleted subsequently.
continue
}
delete(f.items, id)
err := process(item)
if e, ok := err.(ErrRequeue); ok {
f.addIfNotPresent(id, item)
err = e.Err
}
// Don't need to copyDeltas here, because we're transferring
// ownership to the caller.
return item, err
}
}

DeltaFIFO 的 Pop 方法有一個入參,即是處理函數,出隊列時,DeltaFIFO 會先根據資源 id 獲得該資源 所有的事件,然後交給處理函數。

工作流程如圖所示:

KubernetesControllerManager工作原理

總體來看,DeltaFIFO 的入隊列方法,會先判斷該資源是否已經在 items 中, 如果已經存在,說明該資源還沒有被消費(還在 queue 中排隊),則直接將事件 append 到 items[resource_id] 中即可。如果發現不在 items 中,便會創建 items[resource_id],並將資源 id append 到 queue 中。

而 DeltaFIFO 出隊列方法,會從 queue 中拿到隊列最前面的資源 id,然後從 items 中拿走該資源所有的事件,最後調用 Pop 方法傳入的 PopProcessFunc 類型的處理函數。

因此,DeltaFIFO 的特點在於,入隊列的是(資源的)事件,而出隊列時是拿到的是最早入隊列的資源的所有事件。這樣的設計保證了不會因為有某個資源瘋狂的製造事件,導致其他資源沒有機會被處理而產生飢餓。

controller

DeltaFIFO 是一個非常重要的組件,真正讓他發揮價值的,便是 Informer 的 controller

雖然 K8s 源碼中的確用的是 controller 這個詞,但是此 controller 並不是 Deployment Controller 這種資源控制器。而是一個承上啟下的事件控制器(從 API Server 拿到事件,下發給 Informer 進行處理)。

controller 的職責就兩個:

  1. 通過 List-Watch 從 Api Server 獲得事件、並將該事件推入 DeltaFIFO 中
  2. sharedIndexInformerHandleDeltas 方法作為參數,來調用 DeltaFIFO 的 Pop 方法

controller 的定義非常簡單,它的核心就是 Reflector

type controller struct {
config         Config
reflector      *Reflector
reflectorMutex sync.RWMutex
clock          clock.Clock
}

Reflector 的代碼比較繁瑣但是功能比較簡單,就是通過 sharedIndexInformer 裡定義的 listerWatcher 進行 List-Watch,並將獲得的事件推入 DeltaFIFO 中。

controller 啟動之後會先將 Reflector 啟動,然後在執行 processLoop,通過一個死循環,不停的將從 DeltaFIFO 讀出需要處理的資源事件,並交給 sharedIndexInformerHandleDeltas 方法(創建 controller 時賦值給了 config.Process)。

func (c *controller) processLoop() {
for {
obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
if err != nil {
if err == ErrFIFOClosed {
return
}
if c.config.RetryOnError {
// This is the safe way to re-enqueue.
c.config.Queue.AddIfNotPresent(obj)
}
}
}
}

如果我們再查看下 sharedIndexInformer 的 HandleDeltas 方法,就會發現整個事件消費流程被打通了:

func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
s.blockDeltas.Lock()
defer s.blockDeltas.Unlock()
// from oldest to newest
for _, d := range obj.(Deltas) {
switch d.Type {
case Sync, Added, Updated:
isSync := d.Type == Sync
s.cacheMutationDetector.AddObject(d.Object)
if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
if err := s.indexer.Update(d.Object); err != nil {
return err
}
s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
} else {
if err := s.indexer.Add(d.Object); err != nil {
return err
}
s.processor.distribute(addNotification{newObj: d.Object}, isSync)
}
case Deleted:
if err := s.indexer.Delete(d.Object); err != nil {
return err
}
s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
}
}
return nil
}

前面我們知道了 processor.distribute 方法可以將事件分發給所有 listener,而 controller 會使用 Reflector 從 ApiServer 拿到事件,併入隊列,然後通過 processLoop 從隊列中拿出要處理的資源的所有事件,最後通過 sharedIndexInformerHandleDeltas 方法,調用了 processor.distribute

因此,我們可以將整個事件流向整理為下圖:

KubernetesControllerManager工作原理

Indexer

以上,我們將事件從接收到分發,中間所有的邏輯已經梳理了一遍,但是在 sharedIndexInformer 的 HandleDeltas 方法中,還有一些邏輯比較令人注意,就是所有的事件都會先對 s.indexer 進行更新,然後在分發。

前面提到 Indexer 是一個線程安全的存儲,作為緩存使用,為了減輕資源控制器(Controller)查詢資源時對 ApiServer 的壓力。

當有任何事件更新時,會先刷新 Indexer 裡的緩存,然後再將事件分發給資源控制器,資源控制器在需要獲得資源詳情的時候,優先從 Indexer 獲得,就可以減少對 APIServer 不必要的查詢請求。

Indexer 存儲的具體實現在 client-go/tools/cache/thread_safe_store.go 中,數據存儲在 threadSafeMap 中:

type threadSafeMap struct {
lock  sync.RWMutex
items map[string]interface{}
// indexers maps a name to an IndexFunc
indexers Indexers
// indices maps a name to an Index
indices Indices
}

從本質上講,threadSafeMap 就是加了一個讀寫鎖的 map。除此之外,還可以定義索引,索引的實現非常有趣,通過兩個字段完成:

  1. Indexers 是一個 map,定義了若干求索引函數,key 為 indexName,value 為求索引的函數(計算資源的索引值)。
  2. Indices 則保存了索引值和數據 key 的映射關係,Indices 是一個兩層的 map,第一層的 key 為 indexName,和 Indexers 對應,確定使用什麼方法計算索引值,value 是一個 map,保存了 “索引值-資源key” 的關聯關係。

相關邏輯比較簡單,可以參考下圖:

KubernetesControllerManager工作原理

MutationDetector

sharedIndexInformerHandleDeltas 方法中,除了向 s.indexer 更新的數據之外,還向 s.cacheMutationDetector 更新了數據。

在一開始講到 sharedIndexInformer 啟動時還會啟動一個 cacheMutationDetector,來監控 indexer 的緩存。

因為 indexer 緩存的其實是一個指針,多個 Controller 訪問 indexer 緩存的資源,其實獲得的是同一個資源實例。如果有一個 Controller 並不本分,修改了資源的屬性,勢必會影響到其他 Controller 的正確性。

MutationDetector 的作用正是定期檢查有沒有緩存被修改,當 Informer 接收到新事件時,MutationDetector 會保存該資源的指針(和 indexer 一樣),以及該資源的深拷貝。通過定期檢查指針指向的資源和開始存儲的深拷貝是否一致,便知道被緩存的資源是否被修改。

不過,具體是否啟用監控是受到環境變量 KUBE_CACHE_MUTATION_DETECTOR 影響的,如果不設置該環境變量,sharedIndexInformer 實例化的是 dummyMutationDetector,在啟動後什麼事情也不做。

如果 KUBE_CACHE_MUTATION_DETECTOR 為 true,則 sharedIndexInformer 實例化的是 defaultCacheMutationDetector,該實例會以 1s 為間隔,定期執行檢查緩存,如果發現緩存被修改,則會觸發一個失敗處理函數,如果該函數沒被定義,則會觸發一個 panic。

總結

本文講解的應該算是狹義的 Controller Manager,畢竟沒有包含具體的資源管理器(Controller),而只是講解 Controller Manager 是怎麼 “Manage Controller” 的。

可以看到 Controller Manager 做了很多工作來保證 Controller 可以只專注於處理自己關心的事件,而這些工作的核心就是 Informer,當理解了 Informer 是如何與其他組件協同工作,那麼 Controller Manager 為資源管理器鋪墊了什麼也就瞭然了。

KubernetesControllerManager工作原理

相關文章

[解鎖新姿勢]回想起被`ifelse`支配的恐懼,我們要打倒ifelse

全國到底有多少人在看直播?我用Node寫了個爬蟲統計了一下

左邊敲打IDE!右邊出現了一個世界!!!

聊聊短地址及其原理