分析kubernetes中的事件机制-源码解析

本文涉及的产品
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
全局流量管理 GTM,标准版 1个月
云解析 DNS,旗舰版 1个月
简介: 我们通过 kubectl describe [资源] 命令,可以在看到Event输出,并且经常依赖event进行问题定位,从event中可以分析整个POD的运行轨迹,为服务的客观测性提供数据来源,由此可见,event在Kubernetes中起着举足轻重的作用。

我们通过 kubectl describe [资源] 命令,可以在看到Event输出,并且经常依赖event进行问题定位,从event中可以分析整个POD的运行轨迹,为服务的客观测性提供数据来源,由此可见,event在Kubernetes中起着举足轻重的作用。

event展示

event并不只是kubelet中都有的,关于event的操作被封装在client-go/tools/record包,我们完全可以在写入自定义的event。

现在让我们来一步步揭开event的面纱。

Event定义

其实event也是一个资源对象,并且通过apiserver将event存储在etcd中,所以我们也可以通过 kubectl get event 命令查看对应的event对象。

以下是一个event的yaml文件:

apiVersion: v1
count: 1
eventTime: null
firstTimestamp: "2020-03-02T13:08:22Z"
involvedObject:
  apiVersion: v1
  kind: Pod
  name: example-foo-d75d8587c-xsf64
  namespace: default
  resourceVersion: "429837"
  uid: ce611c62-6c1a-4bd8-9029-136a1adf7de4
kind: Event
lastTimestamp: "2020-03-02T13:08:22Z"
message: Pod sandbox changed, it will be killed and re-created.
metadata:
  creationTimestamp: "2020-03-02T13:08:30Z"
  name: example-foo-d75d8587c-xsf64.15f87ea1df862b64
  namespace: default
  resourceVersion: "479466"
  selfLink: /api/v1/namespaces/default/events/example-foo-d75d8587c-xsf64.15f87ea1df862b64
  uid: 9fe6f72a-341d-4c49-960b-e185982d331a
reason: SandboxChanged
reportingComponent: ""
reportingInstance: ""
source:
  component: kubelet
  host: minikube
type: Normal


主要字段说明:**

  • involvedObject: 触发event的资源类型
  • lastTimestamp:最后一次触发的时间
  • message:事件说明
  • metadata :event的元信息,name,namespace等
  • reason:event的原因
  • source:上报事件的来源,比如kubelet中的某个节点
  • type:事件类型,Normal或Warning

event字段定义可以看这里:types.go#L5078

接下来我们来看看,整个event是如何下入的。

写入事件

1、这里以kubelet为例,看看是如何进行事件写入的

2、文中代码以Kubernetes 1.17.3为例进行分析

先以一幅图来看下整个的处理流程
event处理过程

创建操作事件的客户端:
kubelet/app/server.go#L461

// makeEventRecorder sets up kubeDeps.Recorder if it's nil. It's a no-op otherwise.
func makeEventRecorder(kubeDeps *kubelet.Dependencies, nodeName types.NodeName) {
    if kubeDeps.Recorder != nil {
        return
    }
    //事件广播
    eventBroadcaster := record.NewBroadcaster()
    //创建EventRecorder
    kubeDeps.Recorder = eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: componentKubelet, Host: string(nodeName)})
    //发送event至log输出
    eventBroadcaster.StartLogging(klog.V(3).Infof)
    if kubeDeps.EventClient != nil {
        klog.V(4).Infof("Sending events to api server.")
        //发送event至apiserver
        eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeDeps.EventClient.Events("")})
    } else {
        klog.Warning("No api server defined - no events will be sent to API server.")
    }
}

通过 makeEventRecorder 创建了 EventRecorder 实例,这是一个事件广播器,通过它提供了StartLogging和StartRecordingToSink两个事件处理函数,分别将event发送给log和apiserver。
NewRecorder创建了 EventRecorder 的实例,它提供了 EventEventf 等方法供事件记录。

EventBroadcaster

我们来看下EventBroadcaster接口定义:event.go#L113

// EventBroadcaster knows how to receive events and send them to any EventSink, watcher, or log.
type EventBroadcaster interface {
    //
    StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface
    StartRecordingToSink(sink EventSink) watch.Interface
    StartLogging(logf func(format string, args ...interface{})) watch.Interface
    NewRecorder(scheme *runtime.Scheme, source v1.EventSource) EventRecorder

    Shutdown()
}

具体实现是通过 eventBroadcasterImpl struct来实现了各个方法。

其中StartLogging 和 StartRecordingToSink 其实就是完成了对事件的消费,EventRecorder实现对事件的写入,中间通过channel实现了生产者消费者模型。

EventRecorder

我们先来看下EventRecorder 接口定义:event.go#L88,提供了一下4个方法

// EventRecorder knows how to record events on behalf of an EventSource.
type EventRecorder interface {
    // Event constructs an event from the given information and puts it in the queue for sending.
    // 'object' is the object this event is about. Event will make a reference-- or you may also
    // pass a reference to the object directly.
    // 'type' of this event, and can be one of Normal, Warning. New types could be added in future
    // 'reason' is the reason this event is generated. 'reason' should be short and unique; it
    // should be in UpperCamelCase format (starting with a capital letter). "reason" will be used
    // to automate handling of events, so imagine people writing switch statements to handle them.
    // You want to make that easy.
    // 'message' is intended to be human readable.
    //
    // The resulting event will be created in the same namespace as the reference object.
    Event(object runtime.Object, eventtype, reason, message string)

    // Eventf is just like Event, but with Sprintf for the message field.
    Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{})

    // PastEventf is just like Eventf, but with an option to specify the event's 'timestamp' field.
    PastEventf(object runtime.Object, timestamp metav1.Time, eventtype, reason, messageFmt string, args ...interface{})

    // AnnotatedEventf is just like eventf, but with annotations attached
    AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{})
}

主要参数说明:

  • object 对应event资源定义中的 involvedObject
  • eventtype 对应event资源定义中的type,可选Normal,Warning.
  • reason :事件原因
  • message :事件消息

我们来看下当我们调用 Event(object runtime.Object, eventtype, reason, message string) 的整个过程。
发现最终都调用到了 generateEvent 方法:event.go#L316

func (recorder *recorderImpl) generateEvent(object runtime.Object, annotations map[string]string, timestamp metav1.Time, eventtype, reason, message string) {
    .....
    event := recorder.makeEvent(ref, annotations, eventtype, reason, message)
    event.Source = recorder.source
    go func() {
        // NOTE: events should be a non-blocking operation
        defer utilruntime.HandleCrash()
        recorder.Action(watch.Added, event)
    }()
}

最终事件在一个 goroutine 中通过调用 recorder.Action 进入处理,这里保证了每次调用event方法都是非阻塞的。
其中 makeEvent 的作用主要是构造了一个event对象,事件name根据InvolvedObject中的name加上时间戳生成:

注意看:对于一些非namespace资源产生的event,event的namespace是default

func (recorder *recorderImpl) makeEvent(ref *v1.ObjectReference, annotations map[string]string, eventtype, reason, message string) *v1.Event {
    t := metav1.Time{Time: recorder.clock.Now()}
    namespace := ref.Namespace
    if namespace == "" {
        namespace = metav1.NamespaceDefault
    }
    return &v1.Event{
        ObjectMeta: metav1.ObjectMeta{
            Name:        fmt.Sprintf("%v.%x", ref.Name, t.UnixNano()),
            Namespace:   namespace,
            Annotations: annotations,
        },
        InvolvedObject: *ref,
        Reason:         reason,
        Message:        message,
        FirstTimestamp: t,
        LastTimestamp:  t,
        Count:          1,
        Type:           eventtype,
    }
}

进一步跟踪Action方法,apimachinery/blob/master/pkg/watch/mux.go#L188:23

// Action distributes the given event among all watchers.
func (m *Broadcaster) Action(action EventType, obj runtime.Object) {
    m.incoming <- Event{action, obj}
}

将event写入到了一个channel里面。
注意:
这个Action方式是apimachinery包中的方法,因为实现的sturt recorderImpl
*watch.Broadcaster 作为一个匿名struct,并且在 NewRecorder 进行 Broadcaster 赋值,这个Broadcaster其实就是 eventBroadcasterImpl 中的Broadcaster

到此,基本清楚了event最终被写入到了 Broadcaster 中的 incoming channel中,下面看下是怎么进行消费的。

消费事件

makeEventRecorder 调用的 StartLoggingStartRecordingToSink 其实就是完成了对事件的消费。

  • StartLogging直接将event输出到日志
  • StartRecordingToSink将事件写入到apiserver

两个方法内部都调用了 StartEventWatcher 方法,并且传入一个 eventHandler 方法对event进行处理

func (e *eventBroadcasterImpl) StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface {
    watcher := e.Watch()
    go func() {
        defer utilruntime.HandleCrash()
        for watchEvent := range watcher.ResultChan() {
            event, ok := watchEvent.Object.(*v1.Event)
            if !ok {
                // This is all local, so there's no reason this should
                // ever happen.
                continue
            }
            eventHandler(event)
        }
    }()
    return watcher
}

其中 watcher.ResultChan 方法就拿到了事件,这里是在一个goroutine中通过func (m *Broadcaster) loop() ==>func (m *Broadcaster) distribute(event Event) 方法调用将event又写入了broadcasterWatcher.result

主要看下 StartRecordingToSink 提供的的eventHandlerrecordToSink 方法:

func recordToSink(sink EventSink, event *v1.Event, eventCorrelator *EventCorrelator, sleepDuration time.Duration) {
    // Make a copy before modification, because there could be multiple listeners.
    // Events are safe to copy like this.
    eventCopy := *event
    event = &eventCopy
    result, err := eventCorrelator.EventCorrelate(event)
    if err != nil {
        utilruntime.HandleError(err)
    }
    if result.Skip {
        return
    }
    tries := 0
    for {
        if recordEvent(sink, result.Event, result.Patch, result.Event.Count > 1, eventCorrelator) {
            break
        }
        tries++
        if tries >= maxTriesPerEvent {
            klog.Errorf("Unable to write event '%#v' (retry limit exceeded!)", event)
            break
        }
        // Randomize the first sleep so that various clients won't all be
        // synced up if the master goes down.
        // 第一次重试增加随机性,防止 apiserver 重启的时候所有的事件都在同一时间发送事件
        if tries == 1 {
            time.Sleep(time.Duration(float64(sleepDuration) * rand.Float64()))
        } else {
            time.Sleep(sleepDuration)
        }
    }
}

其中event被经过了一个 eventCorrelator.EventCorrelate(event) 方法做预处理,主要是聚合相同的事件(避免产生的事件过多,增加 etcd 和 apiserver 的压力,也会导致查看 pod 事件很不清晰)

下面一个for循环就是在进行重试,最大重试次数是12次,调用 recordEvent 方法才真正将event写入到了apiserver。

事件处理

我们来看下EventCorrelate方法:

// EventCorrelate filters, aggregates, counts, and de-duplicates all incoming events
func (c *EventCorrelator) EventCorrelate(newEvent *v1.Event) (*EventCorrelateResult, error) {
    if newEvent == nil {
        return nil, fmt.Errorf("event is nil")
    }
    aggregateEvent, ckey := c.aggregator.EventAggregate(newEvent)
    observedEvent, patch, err := c.logger.eventObserve(aggregateEvent, ckey)
    if c.filterFunc(observedEvent) {
        return &EventCorrelateResult{Skip: true}, nil
    }
    return &EventCorrelateResult{Event: observedEvent, Patch: patch}, err
}

分别调用了 aggregator.EventAggregate logger.eventObservefilterFunc 三个方法,分别作用是:

  1. aggregator.EventAggregate:聚合event,如果在最近 10 分钟出现过 10 个相似的事件(除了 message 和时间戳之外其他关键字段都相同的事件),aggregator 会把它们的 message 设置为 (combined from similar events)+event.Message
  2. logger.eventObserve:它会把相同的事件以及包含 aggregator 被聚合了的相似的事件,通过增加 Count 字段来记录事件发生了多少次。
  3. filterFunc: 这里实现了一个基于令牌桶的限流算法,如果超过设定的速率则丢弃,保证了apiserver的安全。

我们主要来看下aggregator.EventAggregate方法:

func (e *EventAggregator) EventAggregate(newEvent *v1.Event) (*v1.Event, string) {
    now := metav1.NewTime(e.clock.Now())
    var record aggregateRecord
    // eventKey is the full cache key for this event
    //eventKey 是将除了时间戳外所有字段结合在一起
    eventKey := getEventKey(newEvent)
    // aggregateKey is for the aggregate event, if one is needed.
    //aggregateKey 是除了message和时间戳外的字段结合在一起,localKey 是message
    aggregateKey, localKey := e.keyFunc(newEvent)

    // Do we have a record of similar events in our cache?
    e.Lock()
    defer e.Unlock()
    //从cache中根据aggregateKey查询是否存在,如果是相同或者相类似的事件会被放入cache中
    value, found := e.cache.Get(aggregateKey)
    if found {
        record = value.(aggregateRecord)
    }

    //判断上次事件产生的时间是否超过10分钟,如何操作则重新生成一个localKeys集合(集合中存放message)
    maxInterval := time.Duration(e.maxIntervalInSeconds) * time.Second
    interval := now.Time.Sub(record.lastTimestamp.Time)
    if interval > maxInterval {
        record = aggregateRecord{localKeys: sets.NewString()}
    }

    // Write the new event into the aggregation record and put it on the cache
    //将locakKey也就是message放入集合中,如果message相同就是覆盖了
    record.localKeys.Insert(localKey)
    record.lastTimestamp = now
    e.cache.Add(aggregateKey, record)

    // If we are not yet over the threshold for unique events, don't correlate them
    //判断localKeys集合中存放的类似事件是否超过10个,
    if uint(record.localKeys.Len()) < e.maxEvents {
        return newEvent, eventKey
    }

    // do not grow our local key set any larger than max
    record.localKeys.PopAny()

    // create a new aggregate event, and return the aggregateKey as the cache key
    // (so that it can be overwritten.)
    eventCopy := &v1.Event{
        ObjectMeta: metav1.ObjectMeta{
            Name:      fmt.Sprintf("%v.%x", newEvent.InvolvedObject.Name, now.UnixNano()),
            Namespace: newEvent.Namespace,
        },
        Count:          1,
        FirstTimestamp: now,
        InvolvedObject: newEvent.InvolvedObject,
        LastTimestamp:  now,
        //这里会对message加个前缀:(combined from similar events):
        Message:        e.messageFunc(newEvent),
        Type:           newEvent.Type,
        Reason:         newEvent.Reason,
        Source:         newEvent.Source,
    }
    return eventCopy, aggregateKey
}

aggregator.EventAggregate方法中其实就是判断了通过cache和localKeys判断事件是否相似,如果最近 10 分钟出现过 10 个相似的事件就合并并加上前缀,后续通过logger.eventObserve方法进行count累加,如果message也相同,肯定就是直接count++。

总结

好了,event处理的整个流程基本就是这样,我们可以概括一下,可以结合文中的图对比一起看下:

  1. 创建 EventRecorder 对象,通过其提供的 Event 等方法,创建好event对象
  2. 将创建出来的对象发送给 EventBroadcaster 中的channel中
  3. EventBroadcaster 通过后台运行的goroutine,从管道中取出事件,并广播给提前注册好的handler处理
  4. 当输出log的handler收到事件就直接打印事件
  5. EventSink handler收到处理事件就通过预处理之后将事件发送给apiserver
  6. 其中预处理包含三个动作,1、限流 2、聚合 3、计数
  7. apiserver收到事件处理之后就存储在etcd中

回顾event的整个流程,可以看到event并不是保证100%事件写入(从预处理的过程来看),这样做是为了后端服务etcd的可用性,因为event事件在整个集群中产生是非常频繁的,尤其在服务不稳定的时候,而相比Deployment,Pod等其他资源,又没那么的重要。所以这里做了个取舍。

参考文档:

关注公众号,获取最新文章推送
学点程序

相关实践学习
通过Ingress进行灰度发布
本场景您将运行一个简单的应用,部署一个新的应用用于新的发布,并通过Ingress能力实现灰度发布。
容器应用与集群管理
欢迎来到《容器应用与集群管理》课程,本课程是“云原生容器Clouder认证“系列中的第二阶段。课程将向您介绍与容器集群相关的概念和技术,这些概念和技术可以帮助您了解阿里云容器服务ACK/ACK Serverless的使用。同时,本课程也会向您介绍可以采取的工具、方法和可操作步骤,以帮助您了解如何基于容器服务ACK Serverless构建和管理企业级应用。 学习完本课程后,您将能够: 掌握容器集群、容器编排的基本概念 掌握Kubernetes的基础概念及核心思想 掌握阿里云容器服务ACK/ACK Serverless概念及使用方法 基于容器服务ACK Serverless搭建和管理企业级网站应用
相关文章
|
9天前
|
存储 设计模式 算法
【23种设计模式·全精解析 | 行为型模式篇】11种行为型模式的结构概述、案例实现、优缺点、扩展对比、使用场景、源码解析
行为型模式用于描述程序在运行时复杂的流程控制,即描述多个类或对象之间怎样相互协作共同完成单个对象都无法单独完成的任务,它涉及算法与对象间职责的分配。行为型模式分为类行为模式和对象行为模式,前者采用继承机制来在类间分派行为,后者采用组合或聚合在对象间分配行为。由于组合关系或聚合关系比继承关系耦合度低,满足“合成复用原则”,所以对象行为模式比类行为模式具有更大的灵活性。 行为型模式分为: • 模板方法模式 • 策略模式 • 命令模式 • 职责链模式 • 状态模式 • 观察者模式 • 中介者模式 • 迭代器模式 • 访问者模式 • 备忘录模式 • 解释器模式
【23种设计模式·全精解析 | 行为型模式篇】11种行为型模式的结构概述、案例实现、优缺点、扩展对比、使用场景、源码解析
|
9天前
|
设计模式 存储 安全
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
结构型模式描述如何将类或对象按某种布局组成更大的结构。它分为类结构型模式和对象结构型模式,前者采用继承机制来组织接口和类,后者釆用组合或聚合来组合对象。由于组合关系或聚合关系比继承关系耦合度低,满足“合成复用原则”,所以对象结构型模式比类结构型模式具有更大的灵活性。 结构型模式分为以下 7 种: • 代理模式 • 适配器模式 • 装饰者模式 • 桥接模式 • 外观模式 • 组合模式 • 享元模式
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
|
9天前
|
设计模式 存储 安全
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
创建型模式的主要关注点是“怎样创建对象?”,它的主要特点是"将对象的创建与使用分离”。这样可以降低系统的耦合度,使用者不需要关注对象的创建细节。创建型模式分为5种:单例模式、工厂方法模式抽象工厂式、原型模式、建造者模式。
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
|
10天前
|
安全 搜索推荐 数据挖掘
陪玩系统源码开发流程解析,成品陪玩系统源码的优点
我们自主开发的多客陪玩系统源码,整合了市面上主流陪玩APP功能,支持二次开发。该系统适用于线上游戏陪玩、语音视频聊天、心理咨询等场景,提供用户注册管理、陪玩者资料库、预约匹配、实时通讯、支付结算、安全隐私保护、客户服务及数据分析等功能,打造综合性社交平台。随着互联网技术发展,陪玩系统正成为游戏爱好者的新宠,改变游戏体验并带来新的商业模式。
|
8天前
|
Prometheus Kubernetes 监控
OpenAI故障复盘 - 阿里云容器服务与可观测产品如何保障大规模K8s集群稳定性
聚焦近日OpenAI的大规模K8s集群故障,介绍阿里云容器服务与可观测团队在大规模K8s场景下我们的建设与沉淀。以及分享对类似故障问题的应对方案:包括在K8s和Prometheus的高可用架构设计方面、事前事后的稳定性保障体系方面。
|
5天前
|
Kubernetes Ubuntu 网络安全
ubuntu使用kubeadm搭建k8s集群
通过以上步骤,您可以在 Ubuntu 系统上使用 kubeadm 成功搭建一个 Kubernetes 集群。本文详细介绍了从环境准备、安装 Kubernetes 组件、初始化集群到管理和使用集群的完整过程,希望对您有所帮助。在实际应用中,您可以根据具体需求调整配置,进一步优化集群性能和安全性。
38 12
|
10天前
|
Kubernetes 网络协议 应用服务中间件
Kubernetes Ingress:灵活的集群外部网络访问的利器
《Kubernetes Ingress:集群外部访问的利器-打造灵活的集群网络》介绍了如何通过Ingress实现Kubernetes集群的外部访问。前提条件是已拥有Kubernetes集群并安装了kubectl工具。文章详细讲解了Ingress的基本组成(Ingress Controller和资源对象),选择合适的版本,以及具体的安装步骤,如下载配置文件、部署Nginx Ingress Controller等。此外,还提供了常见问题的解决方案,例如镜像下载失败的应对措施。最后,通过部署示例应用展示了Ingress的实际使用方法。
27 2
|
22天前
|
存储 Kubernetes 关系型数据库
阿里云ACK备份中心,K8s集群业务应用数据的一站式灾备方案
本文源自2024云栖大会苏雅诗的演讲,探讨了K8s集群业务为何需要灾备及其重要性。文中强调了集群与业务高可用配置对稳定性的重要性,并指出人为误操作等风险,建议实施周期性和特定情况下的灾备措施。针对容器化业务,提出了灾备的新特性与需求,包括工作负载为核心、云资源信息的备份,以及有状态应用的数据保护。介绍了ACK推出的备份中心解决方案,支持命名空间、标签、资源类型等维度的备份,并具备存储卷数据保护功能,能够满足GitOps流程企业的特定需求。此外,还详细描述了备份中心的使用流程、控制台展示、灾备难点及解决方案等内容,展示了备份中心如何有效应对K8s集群资源和存储卷数据的灾备挑战。
|
2月前
|
Kubernetes 监控 Cloud Native
Kubernetes集群的高可用性与伸缩性实践
Kubernetes集群的高可用性与伸缩性实践
78 1
|
3月前
|
JSON Kubernetes 容灾
ACK One应用分发上线:高效管理多集群应用
ACK One应用分发上线,主要介绍了新能力的使用场景

热门文章

最新文章

推荐镜像

更多