Movatterモバイル変換


[0]ホーム

URL:


Skip to content
DEV Community
Log in Create account

DEV Community

林子篆
林子篆

Posted on • Originally published atdannypsnl.github.io on

     

Tracing source code of Kubernetes client-go

Whole thing is started fromIngress this feature ofKubernetes.But today I’m not going to talk too much about it, basically just I have to letIngress Controller will send packets to ourRouter so that we could do the thing we want, if you are interested in ourRouter , you can more infos fromour blog and demo by just login to play with it.

Anyway, the thing I’m going to do for this is I have to create a proxy for real kubernetes API server,and modify the real data to what we want. To do that, I have to understand howclient-go(Ingress use client-go to get info, of course) send requests and what it expected. Let’s start!

NOTE: I just mention some part of codes, not explaining whole big piture

epEventHandler:=cache.ResourceEventHandlerFuncs{AddFunc:func(objinterface{}){updateCh.In()<-Event{Type:CreateEvent,Obj:obj,}},DeleteFunc:func(objinterface{}){updateCh.In()<-Event{Type:DeleteEvent,Obj:obj,}},UpdateFunc:func(old,curinterface{}){oep:=old.(*corev1.Endpoints)cep:=cur.(*corev1.Endpoints)if!reflect.DeepEqual(cep.Subsets,oep.Subsets){updateCh.In()<-Event{Type:UpdateEvent,Obj:cur,}}},}
Enter fullscreen modeExit fullscreen mode

These codes atingress-nginx tagnginx-v0.20.0(at following context we just use this tag),fileinternal/ingress/controller/store/store.go line446

The purpose is emit these callbacks intoSharedInformer to get kubernetes events for updating the datas in store,to generate nginx configuration for load balancing these pods.

Ok, so where we useepEventHandler? We would see it be passed intostore.informers.Endpoint atthe same function, line519

store.informers.Endpoint.AddEventHandler(epEventHandler)
Enter fullscreen modeExit fullscreen mode

Here we should care about two things

  • what isEndpoint?
  • how it uses the functions sent intoAddEventHandler?

Let’s keep dig into the code, we would seeAddEventHandler is a method of aninterface:SharedInformer, yes, we just talk about it, now we see it.SharedInformer is defined underk8s.io/client-go/tools/cache/shared_informer.go(remember, here what I’m tracing is theclient-go underingress-nginx vendor, so it might outdated with latestclient-go)

The only implementor ofSharedInformer issharedIndexInformer(still at same file), it’s a structure, here is the real code ofAddEventHandler

func(s*sharedIndexInformer)AddEventHandler(handlerResourceEventHandler){s.AddEventHandlerWithResyncPeriod(handler,s.defaultEventHandlerResyncPeriod)}func(s*sharedIndexInformer)AddEventHandlerWithResyncPeriod(handlerResourceEventHandler,resyncPeriodtime.Duration){// ignore, here would do some period syncinglistener:=newProcessListener(handler,resyncPeriod,determineResyncPeriod(resyncPeriod,s.resyncCheckPeriod),s.clock.Now(),initialBufferSize)// ignore, here would emit `listener` into `processer`}funcnewProcessListener(handlerResourceEventHandler,requestedResyncPeriod,resyncPeriodtime.Duration,nowtime.Time,bufferSizeint)*processorListener{ret:=&processorListener{nextCh:make(chaninterface{}),addCh:make(chaninterface{}),handler:handler,pendingNotifications:*buffer.NewRingGrowing(bufferSize),requestedResyncPeriod:requestedResyncPeriod,resyncPeriod:resyncPeriod,}ret.determineNextResync(now)returnret}
Enter fullscreen modeExit fullscreen mode

To here, we should stop this part, because we can’t get more from these.So I go back to how to usesharedIndexInformer

I found type ofstore.informers have a methodRun that would be called by store,that’s mean what it call is the point we care, that’sstore.informers.Endpoint

func(i*Informer)Run(stopChchanstruct{}){// this is *sharedIndexInformer.Rungoi.Endpoint.Run(stopCh)// ignore, all resource is working under the same way}func(s*sharedIndexInformer)Run(stopCh<-chanstruct{}){// this is last line, I ignore others codess.controller.Run(stopCh)}
Enter fullscreen modeExit fullscreen mode

Then I take a look at how controller works

// Run begins processing items, and will continue until a value is sent down stopCh.// It's an error to call Run more than once.// Run blocks; call via go.func(c*controller)Run(stopCh<-chanstruct{}){deferutilruntime.HandleCrash()gofunc(){<-stopChc.config.Queue.Close()}()r:=NewReflector(c.config.ListerWatcher,c.config.ObjectType,c.config.Queue,c.config.FullResyncPeriod,)r.ShouldResync=c.config.ShouldResyncr.clock=c.clockc.reflectorMutex.Lock()c.reflector=rc.reflectorMutex.Unlock()varwgwait.Groupdeferwg.Wait()wg.StartWithChannel(stopCh,r.Run)wait.Until(c.processLoop,time.Second,stopCh)}
Enter fullscreen modeExit fullscreen mode

The point iswg.StartWithChannel(stopCh, r.Run), inreflector.Run,it callr.ListAndWatch(stopCh), andListAndWatch is based onlistWatcher

list,err:=r.listerWatcher.List(options)iferr!=nil{returnfmt.Errorf("%s: Failed to list %v: %v",r.name,r.expectedType,err)}
Enter fullscreen modeExit fullscreen mode

We would go back here later, let’s find out what islisterWatcher

We setstore.informers.Endpoint by thisstore.informers.Endpoint = infFactory.Core().V1().Endpoints().Informer() atinternal/ingress/controller/store/store.go:L264

Then we seeinfFactory, line257

infFactory:=informers.NewSharedInformerFactoryWithOptions(client,resyncPeriod,informers.WithNamespace(namespace),informers.WithTweakListOptions(func(*metav1.ListOptions){}))
Enter fullscreen modeExit fullscreen mode

informer:

func(f*endpointsInformer)Informer()cache.SharedIndexInformer{returnf.factory.InformerFor(&corev1.Endpoints{},f.defaultInformer)}// defaultInformerfunc(f*endpointsInformer)defaultInformer(clientkubernetes.Interface,resyncPeriodtime.Duration)cache.SharedIndexInformer{returnNewFilteredEndpointsInformer(client,f.namespace,resyncPeriod,cache.Indexers{cache.NamespaceIndex:cache.MetaNamespaceIndexFunc},f.tweakListOptions)}// NewFilteredEndpointsInformerfuncNewFilteredEndpointsInformer(clientkubernetes.Interface,namespacestring,resyncPeriodtime.Duration,indexerscache.Indexers,tweakListOptionsinternalinterfaces.TweakListOptionsFunc)cache.SharedIndexInformer{returncache.NewSharedIndexInformer(&cache.ListWatch{ListFunc:func(optionsmetav1.ListOptions)(runtime.Object,error){iftweakListOptions!=nil{tweakListOptions(&options)}returnclient.CoreV1().Endpoints(namespace).List(options)},WatchFunc:func(optionsmetav1.ListOptions)(watch.Interface,error){iftweakListOptions!=nil{tweakListOptions(&options)}returnclient.CoreV1().Endpoints(namespace).Watch(options)},},&corev1.Endpoints{},resyncPeriod,indexers,)}
Enter fullscreen modeExit fullscreen mode

Ha, we gotListWatch now, it would call an instance of*kubernetes.ClientSet to get the info it wanted!

Now we can back toListAndWatch, let’s take a look at the details of it.

In fact, I’m more focused on watch API, because it’s a little bit weird.I found it’s server with keep sending data until client part close the connection.How it did it? Atk8s.io/client-go/tools/cache/reflector.go:L226

for{// give the stopCh a chance to stop the loop, even in case of continue statements further down on errorsselect{case<-stopCh:returnnildefault:}timeoutSeconds:=int64(minWatchTimeout.Seconds()*(rand.Float64()+1.0))options=metav1.ListOptions{ResourceVersion:resourceVersion,// We want to avoid situations of hanging watchers. Stop any wachers that do not// receive any events within the timeout window.TimeoutSeconds:&timeoutSeconds,}r.metrics.numberOfWatches.Inc()w,err:=r.listerWatcher.Watch(options)iferr!=nil{switcherr{caseio.EOF:// watch closed normallycaseio.ErrUnexpectedEOF:glog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v",r.name,r.expectedType,err)default:utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v",r.name,r.expectedType,err))}// If this is "connection refused" error, it means that most likely apiserver is not responsive.// It doesn't make sense to re-list all objects because most likely we will be able to restart// watch where we ended.// If that's the case wait and resend watch request.ifurlError,ok:=err.(*url.Error);ok{ifopError,ok:=urlError.Err.(*net.OpError);ok{iferrno,ok:=opError.Err.(syscall.Errno);ok&&errno==syscall.ECONNREFUSED{time.Sleep(time.Second)continue}}}returnnil}iferr:=r.watchHandler(w,&resourceVersion,resyncerrc,stopCh);err!=nil{iferr!=errorStopRequested{glog.Warningf("%s: watch of %v ended with: %v",r.name,r.expectedType,err)}returnnil}}
Enter fullscreen modeExit fullscreen mode

Of course is an endless loop, would stop by channel or return.

The tricky part is it check error content, if it’s a probable EOF, it would keep taking data rather stop the connection.

Ok, everything seems to make sense right now, but that’s not enough, I’m very confused by why it could receiving a JSON data by such as a streaming way, so let’s go back to seeclient.CoreV1().Endpoints(namespace).Watch(options)

// Watch returns a watch.Interface that watches the requested endpoints.func(c*endpoints)Watch(optsmetav1.ListOptions)(watch.Interface,error){opts.Watch=truereturnc.client.Get().Namespace(c.ns).Resource("endpoints").VersionedParams(&opts,scheme.ParameterCodec).Watch()}// Watch attempts to begin watching the requested location.// Returns a watch.Interface, or an error.func(r*Request)Watch()(watch.Interface,error){returnr.WatchWithSpecificDecoders(func(bodyio.ReadCloser)streaming.Decoder{framer:=r.serializers.Framer.NewFrameReader(body)returnstreaming.NewDecoder(framer,r.serializers.StreamingSerializer)},r.serializers.Decoder,)}
Enter fullscreen modeExit fullscreen mode

And I found the point isr.serializers, and the shit thing is it still is a function send by external code.

If you trace back then you would find it’s from*RESTClient.serializers, atk8s.io/client-go/rest/client.go, line225 and227 send this intoNewRequest

And you found it’s created at line108 in same file,serializers, err := createSerializers(config)

funccreateSerializers(configContentConfig)(*Serializers,error){// ignore, we don't care them since we just use `StreamSerializer` of `Serializers`ifinfo.StreamSerializer!=nil{s.StreamingSerializer=info.StreamSerializer.Serializers.Framer=info.StreamSerializer.Framer}returns,nil}
Enter fullscreen modeExit fullscreen mode

We would see the type ofStreamSerializer isruntime.Serializer, it’s an interface, and since we are sending JSON data, so we go to the JSON one implementor of it to see it’sDecode

import(jsoniter"github.com/json-iterator/go")
Enter fullscreen modeExit fullscreen mode

After seeing that, I know the trace already done, because my question already been answered, they usegithub.com/json-iterator/go this library

I guess I would talk about something about how to create a kube API proxy with modifying data after completing my proxy of kube API server. (It’s really hard XD)

I guess today the most interesting thing we learned is Go*http.Response is aReadCloser!(How Kubernetes did their watch trick)

Anyway, thanks for the read, hope these could help you more detailed understanding Kubernetes client implementation and be a little start point to read more about it.

Top comments(0)

Subscribe
pic
Create template

Templates let you quickly answer FAQs or store snippets for re-use.

Dismiss

Are you sure you want to hide this comment? It will become hidden in your post, but will still be visible via the comment'spermalink.

For further actions, you may consider blocking this person and/orreporting abuse

I am a programming language theory lover; good at system software like Networking, OS.
  • Location
    Taiwan
  • Education
    Kaohsiung Medical University
  • Joined

More from林子篆

DEV Community

We're a place where coders share, stay up-to-date and grow their careers.

Log in Create account

[8]ページ先頭

©2009-2025 Movatter.jp