标签:完成 参考 nic 同步 efault period url cas factor
在实际的开发工作中,Informer 主要用在两处:
package main import ( "flag" "fmt" "log" "path/filepath" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/util/homedir" ) func main() { var kubeconfig *string if home := homedir.HomeDir(); home != "" { kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file") } else { kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file") } flag.Parse() config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig) if err != nil { panic(err) } // 初始化 client clientset, err := kubernetes.NewForConfig(config) if err != nil { log.Panic(err.Error()) } stopper := make(chan struct{}) defer close(stopper) // 初始化 informer factory := informers.NewSharedInformerFactory(clientset, 0) nodeInformer := factory.Core().V1().Nodes() informer := nodeInformer.Informer() defer runtime.HandleCrash() // 启动 informer,list & watch go factory.Start(stopper) // 从 apiserver 同步资源,即 list if !cache.WaitForCacheSync(stopper, informer.HasSynced) { runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync")) return } // 使用自定义 handler informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: onAdd, UpdateFunc: func(interface{}, interface{}) { fmt.Println("update not implemented") }, // 此处省略 workqueue 的使用 DeleteFunc: func(interface{}) { fmt.Println("delete not implemented") }, }) // 创建 lister nodeLister := nodeInformer.Lister() // 从 lister 中获取所有 items nodeList, err := nodeLister.List(labels.Everything()) if err != nil { fmt.Println(err) } fmt.Println("nodelist:", nodeList) <-stopper } func onAdd(obj interface{}) { node := obj.(*corev1.Node) fmt.Println("add a node:", node.Name) }
(1) 创建一个控制器
(2) 启动控制器
(3) 当收到变更事件后,执行 callback
(4) worker loop
下面是自定义 controller 使用的一个参考:
var ( masterURL string kubeconfig string ) func init() { flag.StringVar(&kubeconfig, "kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.") flag.StringVar(&masterURL, "master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.") } func main() { flag.Parse() stopCh := signals.SetupSignalHandler() cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig) if err != nil { glog.Fatalf("Error building kubeconfig: %s", err.Error()) } kubeClient, err := kubernetes.NewForConfig(cfg) if err != nil { glog.Fatalf("Error building kubernetes clientset: %s", err.Error()) } // 所谓 Informer,其实就是一个带有本地缓存和索引机制的、可以注册 EventHandler 的 client // informer watch apiserver,每隔 30 秒 resync 一次(list) kubeInformerFactory := informers.NewSharedInformerFactory(kubeClient, time.Second*30) controller := controller.NewController(kubeClient, kubeInformerFactory.Core().V1().Nodes()) // 启动 informer go kubeInformerFactory.Start(stopCh) // start controller if err = controller.Run(2, stopCh); err != nil { glog.Fatalf("Error running controller: %s", err.Error()) } } // NewController returns a new network controller func NewController( kubeclientset kubernetes.Interface, networkclientset clientset.Interface, networkInformer informers.NetworkInformer) *Controller { // Create event broadcaster // Add sample-controller types to the default Kubernetes Scheme so Events can be // logged for sample-controller types. 
utilruntime.Must(networkscheme.AddToScheme(scheme.Scheme)) glog.V(4).Info("Creating event broadcaster") eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartLogging(glog.Infof) eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")}) recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName}) controller := &Controller{ kubeclientset: kubeclientset, networkclientset: networkclientset, networksLister: networkInformer.Lister(), networksSynced: networkInformer.Informer().HasSynced, workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Networks"), recorder: recorder, } glog.Info("Setting up event handlers") // Set up an event handler for when Network resources change networkInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: controller.enqueueNetwork, UpdateFunc: func(old, new interface{}) { oldNetwork := old.(*samplecrdv1.Network) newNetwork := new.(*samplecrdv1.Network) if oldNetwork.ResourceVersion == newNetwork.ResourceVersion { // Periodic resync will send update events for all known Networks. // Two different versions of the same Network will always have different RVs. return } controller.enqueueNetwork(new) }, DeleteFunc: controller.enqueueNetworkForDelete, }) return controller }
自定义 controller 参考:https://github.com/resouer/k8s-controller-custom-resource
标签:完成 参考 nic 同步 efault period url cas factor
原文地址:https://www.cnblogs.com/wuchangblog/p/14609879.html