标签:项目 类型 cancel 故障 信号 分发 蓝牙 结合 目标
感谢分享原文-http://bjbsair.com/2020-04-03/tech-info/29914.html
Kubeedge除了在kubernetes的方面做了各种异步通信通道,保障offline后的业务连续性之外;还定义了一系列的设备抽象,用来管理边缘设备。而且,其v1.0版本正朝着边缘端服务网格,以及函数式计算等方向发展。
官方文档:https://docs.kubeedge.io/en/latest/
架构
整体架构图比较明了,在不考虑edgesite的情况下,其架构分为了云端和边缘端。其实可以理解为kubernetes的管理侧和kubelet节点侧(对应edge端)。但是请注意,这里的场景是边缘计算,意味着edge端的网络环境难以保障。
云边通信
于是就衍生出了cloud端的cloud Hub与edge端的Edge Hub。这两个模块之间通过websocket或者quic通信,相当于建立了一条底层通信隧道,供k8s和其他应用通信。当然,使用什么协议通信不是重点,重点是如何保障当着之间的链路都无法保障的时候,业务不受到影响,这就是MetaManager的要解决的问题了。
边缘端
云端
Controller
然后再说controller,其实准确的说controller是包括了用于edge端与API-Server同步信息的edgeController与用于DeviceTwin与API-Server同步device CRD信息的deviceController组成。这两个模块相对也比较简单,后面具体讲解。
各个模块实现
边缘端
beehive模块在整个kubeedge中扮演了非常重要的作用,它实现了一套Module管理的接口,程序中各个模块的启动、运行、模块间的通信等都是由其统一封装管理。下图是kubeedge的edge端代码的main启动流程,这里涉及到的modules就是由beehive提供。
可以看到,在初始化的时候,分别加载了各个edge端modules的init函数,用来注册其modules到heehive框架中。然后在core.Run中遍历启动(StartModules)。
另外,值得提及的是,用于模块间通信,发送message到group/module的功能,在beehive中,其实是通过channel来通信的。这也是golang推荐的goroutine间通信的方式。
EdgeHub
重点是启动了两个go routine,用来实现往两个方向的消息接收和分发。这里go ehc.routeToEdge对应从隧道端点接收cloud端发往edge端的消息,然后调用ehc.dispatch解析出消息的目标module并基于heehive模块module间消息的通信机制来转发出去。
同理,go ehc.routeToCloud实现将edge端消息基于隧道转发到cloud端的cloudHub模块处理。当然,该模块中实现了对同步消息的response等到超时处理的逻辑,当在未超时期间收到response消息,会转发到消息发送端模块。比较暴力的是,一旦发送消息到cloud失败,该goroutine会退出,通知所有模块,当前与cloud端是未连接状态,然后重新发起连接。
metaManager在与cloud的连接断开期间,会使用本地DB中的数据,不会发起往cloud端的查询。
Edged
这块基本是调用kubelet的代码,实现较多的是启动流程。另外,将之前kubelet的client作为fake的假接口,转而将数据都通过metaClient来存储数据到metaManager,从而代理之前直接访问api-server的操作。这块的学习可以参考之前分析kubelet架构的一篇文章Kubelet源码架构简介。
这里差异化的一块代码在e.syncPod的实现,通过读取metaManager和EdgeController的pod任务列表,来执行对本地pod的操作。同时,这些pod关联的configmap和secret也会随着处理pod的过程而一并处理。对pod的操作也是基于一个操作类别的queue,比如e.podAddWorkerRun就启动了一个用于消费添加pod的queue的goroutine。外部的封装基本就这样,内部完全通过引用kubelet原生的包来处理。
MetaManager
从代码架构看起来,该模块比较简单,首先在外层按照一定周期给自己发送消息,触发定时同步pod状态到cloud端。另外,在mainLoop中启动一个独立的goroutine接收外部消息,并执行处理逻辑。
处理逻辑基于消息类型分类,分别包括:
重点来说增删查改,拿添加举例。当接收到要添加某个资源时,会将资源解析出来,组织成为key、type、value的三元组,以一种类似于模拟NoSQL的方式保存到本地的SqlLite数据库中。这样保存的目的也是为了方便快速检索和增删。保存完之后,需要对应发送response消息到请求消息的源模块。
EventBus与ServiceBus
eventBus用于对接MQTT Broker与beehive,MQTT broker有几种启动模式,从代码实现的角度分为:
在内嵌MQTT broker模式下,eventBus启动了golang实现的broker包gomqtt用来作为外部MQTT设备的接入,具体用法请参考其github项目首页。两种模式下eventBus都做了一些共性的操作,具体包括:
SubTopics = []string{
"$hw/events/upload/#",
"$hw/events/device/+/state/update",
"$hw/events/device/+/twin/+",
"$hw/events/node/+/membership/get",
"SYS/dis/upload_records",
}
2.当接收到对应的event时,触发回调函数onSubscribe
3.回调函数中,对event做了简单的分类,分别发送到不同的目的地(DeviceTwin或EventHub)
所有 $hw/events/device/+/twin/+和 $hw/events/node/+/membership/gettopic 的event发送到DeviceTwin,其他的event直接发送到EventHub再同步到Cloud端。
当然,该部分也包括了创建客户端,往MQTT broker发布events的接口,这里就不展开了。
ServiceBus启动一个goroutine来接受来自beehive的消息,然后基于消息中带的参数,通过调用http client将消息通过REST-API发送到本地127.0.0.1上的目标APP。这相当于一个客户端,而APP是一个http Rest-API server,所有的操作和设备状态都需要客户端调用接口来下发和获取。
DeviceTwin包含一下几个部分的功能:
由于这个部分和设备更紧密相关,为何要分着几个类别,都是如何抽象,这块的理解方面还不够透彻,我们暂时只关注其主业务逻辑,官方文档对这块有比较详细的描述devicetwin。
云端
入口
这里重点关注在init中加载了cloudHub、controller(也就是edgeController)和devicecontroller三个部分。然后和edge端一样,都是beehive的套路,调用StartModules来启动所有的模块。
CloudHub
handler.WebSocketHandler.ServeEvent在websocket server上接收新边缘node的连接,然后为新node分配channel queue。再进一步将消息交给负责内容读写的逻辑处理。
channelq.NewChannelEventQueue为每一个边缘node维护了一个对应的channel queue(这里默认有10个消息的缓存),然后调用go q.dispatchMessage 来接收由controller发送到clouHub的消息,基于消息内容解析其目的node,然后将消息发送到node对应的channel排队处理。
clouHub的核心逻辑包括这两部分,读和写:
如果cloudHub往node发送消息失败,就会触发EventHandler的CancelNode操作;如果结合edgeHub端的行为的话,我们知道edgeHub会重新发起到cloud端的新连接,然后重新走一遍同步流程。
Controller(EdgeController)
controller的核心逻辑是upstream和downstream。
DeviceController
deviceController同edgeController,只是其关心的资源不再是k8s的workload的子资源,而是为device定义的CRD,包括:device和deviceModel。由于主要逻辑都通edgeControler,这里不再做详细介绍。感谢分享原文-http://bjbsair.com/2020-04-03/tech-info/29914.html
Kubeedge除了在kubernetes的方面做了各种异步通信通道,保障offline后的业务连续性之外;还定义了一系列的设备抽象,用来管理边缘设备。而且,其v1.0版本正朝着边缘端服务网格,以及函数式计算等方向发展。
官方文档:https://docs.kubeedge.io/en/latest/
架构
整体架构图比较明了,在不考虑edgesite的情况下,其架构分为了云端和边缘端。其实可以理解为kubernetes的管理侧和kubelet节点侧(对应edge端)。但是请注意,这里的场景是边缘计算,意味着edge端的网络环境难以保障。
云边通信
于是就衍生出了cloud端的cloud Hub与edge端的Edge Hub。这两个模块之间通过websocket或者quic通信,相当于建立了一条底层通信隧道,供k8s和其他应用通信。当然,使用什么协议通信不是重点,重点是如何保障当着之间的链路都无法保障的时候,业务不受到影响,这就是MetaManager的要解决的问题了。
边缘端
云端
Controller
然后再说controller,其实准确的说controller是包括了用于edge端与API-Server同步信息的edgeController与用于DeviceTwin与API-Server同步device CRD信息的deviceController组成。这两个模块相对也比较简单,后面具体讲解。
各个模块实现
边缘端
beehive模块在整个kubeedge中扮演了非常重要的作用,它实现了一套Module管理的接口,程序中各个模块的启动、运行、模块间的通信等都是由其统一封装管理。下图是kubeedge的edge端代码的main启动流程,这里涉及到的modules就是由beehive提供。
可以看到,在初始化的时候,分别加载了各个edge端modules的init函数,用来注册其modules到heehive框架中。然后在core.Run中遍历启动(StartModules)。
另外,值得提及的是,用于模块间通信,发送message到group/module的功能,在beehive中,其实是通过channel来通信的。这也是golang推荐的goroutine间通信的方式。
EdgeHub
重点是启动了两个go routine,用来实现往两个方向的消息接收和分发。这里go ehc.routeToEdge对应从隧道端点接收cloud端发往edge端的消息,然后调用ehc.dispatch解析出消息的目标module并基于heehive模块module间消息的通信机制来转发出去。
同理,go ehc.routeToCloud实现将edge端消息基于隧道转发到cloud端的cloudHub模块处理。当然,该模块中实现了对同步消息的response等到超时处理的逻辑,当在未超时期间收到response消息,会转发到消息发送端模块。比较暴力的是,一旦发送消息到cloud失败,该goroutine会退出,通知所有模块,当前与cloud端是未连接状态,然后重新发起连接。
metaManager在与cloud的连接断开期间,会使用本地DB中的数据,不会发起往cloud端的查询。
Edged
这块基本是调用kubelet的代码,实现较多的是启动流程。另外,将之前kubelet的client作为fake的假接口,转而将数据都通过metaClient来存储数据到metaManager,从而代理之前直接访问api-server的操作。这块的学习可以参考之前分析kubelet架构的一篇文章Kubelet源码架构简介。
这里差异化的一块代码在e.syncPod的实现,通过读取metaManager和EdgeController的pod任务列表,来执行对本地pod的操作。同时,这些pod关联的configmap和secret也会随着处理pod的过程而一并处理。对pod的操作也是基于一个操作类别的queue,比如e.podAddWorkerRun就启动了一个用于消费添加pod的queue的goroutine。外部的封装基本就这样,内部完全通过引用kubelet原生的包来处理。
MetaManager
从代码架构看起来,该模块比较简单,首先在外层按照一定周期给自己发送消息,触发定时同步pod状态到cloud端。另外,在mainLoop中启动一个独立的goroutine接收外部消息,并执行处理逻辑。
处理逻辑基于消息类型分类,分别包括:
重点来说增删查改,拿添加举例。当接收到要添加某个资源时,会将资源解析出来,组织成为key、type、value的三元组,以一种类似于模拟NoSQL的方式保存到本地的SqlLite数据库中。这样保存的目的也是为了方便快速检索和增删。保存完之后,需要对应发送response消息到请求消息的源模块。
EventBus与ServiceBus
eventBus用于对接MQTT Broker与beehive,MQTT broker有几种启动模式,从代码实现的角度分为:
在内嵌MQTT broker模式下,eventBus启动了golang实现的broker包gomqtt用来作为外部MQTT设备的接入,具体用法请参考其github项目首页。两种模式下eventBus都做了一些共性的操作,具体包括:
SubTopics = []string{
"$hw/events/upload/#",
"$hw/events/device/+/state/update",
"$hw/events/device/+/twin/+",
"$hw/events/node/+/membership/get",
"SYS/dis/upload_records",
}
2.当接收到对应的event时,触发回调函数onSubscribe
3.回调函数中,对event做了简单的分类,分别发送到不同的目的地(DeviceTwin或EventHub)
所有 $hw/events/device/+/twin/+和 $hw/events/node/+/membership/gettopic 的event发送到DeviceTwin,其他的event直接发送到EventHub再同步到Cloud端。
当然,该部分也包括了创建客户端,往MQTT broker发布events的接口,这里就不展开了。
ServiceBus启动一个goroutine来接受来自beehive的消息,然后基于消息中带的参数,通过调用http client将消息通过REST-API发送到本地127.0.0.1上的目标APP。这相当于一个客户端,而APP是一个http Rest-API server,所有的操作和设备状态都需要客户端调用接口来下发和获取。
DeviceTwin包含一下几个部分的功能:
由于这个部分和设备更紧密相关,为何要分着几个类别,都是如何抽象,这块的理解方面还不够透彻,我们暂时只关注其主业务逻辑,官方文档对这块有比较详细的描述devicetwin。
云端
入口
这里重点关注在init中加载了cloudHub、controller(也就是edgeController)和devicecontroller三个部分。然后和edge端一样,都是beehive的套路,调用StartModules来启动所有的模块。
CloudHub
handler.WebSocketHandler.ServeEvent在websocket server上接收新边缘node的连接,然后为新node分配channel queue。再进一步将消息交给负责内容读写的逻辑处理。
channelq.NewChannelEventQueue为每一个边缘node维护了一个对应的channel queue(这里默认有10个消息的缓存),然后调用go q.dispatchMessage 来接收由controller发送到clouHub的消息,基于消息内容解析其目的node,然后将消息发送到node对应的channel排队处理。
clouHub的核心逻辑包括这两部分,读和写:
如果cloudHub往node发送消息失败,就会触发EventHandler的CancelNode操作;如果结合edgeHub端的行为的话,我们知道edgeHub会重新发起到cloud端的新连接,然后重新走一遍同步流程。
Controller(EdgeController)
controller的核心逻辑是upstream和downstream。
DeviceController
deviceController同edgeController,只是其关心的资源不再是k8s的workload的子资源,而是为device定义的CRD,包括:device和deviceModel。由于主要逻辑都通edgeControler,这里不再做详细介绍。
标签:项目 类型 cancel 故障 信号 分发 蓝牙 结合 目标
原文地址:https://www.cnblogs.com/lihanlin/p/12657682.html