Posted on • Originally published atdannypsnl.github.io on
Tracing source code of Kubernetes client-go
Whole thing is started fromIngress this feature ofKubernetes.But today I’m not going to talk too much about it, basically just I have to letIngress Controller will send packets to ourRouter so that we could do the thing we want, if you are interested in ourRouter , you can more infos fromour blog and demo by just login to play with it.
Anyway, the thing I’m going to do for this is I have to create a proxy for real kubernetes API server,and modify the real data to what we want. To do that, I have to understand howclient-go(Ingress use client-go to get info, of course) send requests and what it expected. Let’s start!
NOTE: I just mention some part of codes, not explaining whole big piture
epEventHandler:=cache.ResourceEventHandlerFuncs{AddFunc:func(objinterface{}){updateCh.In()<-Event{Type:CreateEvent,Obj:obj,}},DeleteFunc:func(objinterface{}){updateCh.In()<-Event{Type:DeleteEvent,Obj:obj,}},UpdateFunc:func(old,curinterface{}){oep:=old.(*corev1.Endpoints)cep:=cur.(*corev1.Endpoints)if!reflect.DeepEqual(cep.Subsets,oep.Subsets){updateCh.In()<-Event{Type:UpdateEvent,Obj:cur,}}},}
These codes atingress-nginx tagnginx-v0.20.0
(at following context we just use this tag),fileinternal/ingress/controller/store/store.go
line446
The purpose is emit these callbacks intoSharedInformer
to get kubernetes events for updating the datas in store,to generate nginx configuration for load balancing these pods.
Ok, so where we useepEventHandler
? We would see it be passed intostore.informers.Endpoint
atthe same function, line519
store.informers.Endpoint.AddEventHandler(epEventHandler)
Here we should care about two things
- what is
Endpoint
? - how it uses the functions sent into
AddEventHandler
?
Let’s keep dig into the code, we would seeAddEventHandler
is a method of aninterface
:SharedInformer
, yes, we just talk about it, now we see it.SharedInformer
is defined underk8s.io/client-go/tools/cache/shared_informer.go
(remember, here what I’m tracing is theclient-go
underingress-nginx
vendor, so it might outdated with latestclient-go
)
The only implementor ofSharedInformer
issharedIndexInformer
(still at same file), it’s a structure, here is the real code ofAddEventHandler
func(s*sharedIndexInformer)AddEventHandler(handlerResourceEventHandler){s.AddEventHandlerWithResyncPeriod(handler,s.defaultEventHandlerResyncPeriod)}func(s*sharedIndexInformer)AddEventHandlerWithResyncPeriod(handlerResourceEventHandler,resyncPeriodtime.Duration){// ignore, here would do some period syncinglistener:=newProcessListener(handler,resyncPeriod,determineResyncPeriod(resyncPeriod,s.resyncCheckPeriod),s.clock.Now(),initialBufferSize)// ignore, here would emit `listener` into `processer`}funcnewProcessListener(handlerResourceEventHandler,requestedResyncPeriod,resyncPeriodtime.Duration,nowtime.Time,bufferSizeint)*processorListener{ret:=&processorListener{nextCh:make(chaninterface{}),addCh:make(chaninterface{}),handler:handler,pendingNotifications:*buffer.NewRingGrowing(bufferSize),requestedResyncPeriod:requestedResyncPeriod,resyncPeriod:resyncPeriod,}ret.determineNextResync(now)returnret}
To here, we should stop this part, because we can’t get more from these.So I go back to how to usesharedIndexInformer
I found type ofstore.informers
have a methodRun
that would be called by store,that’s mean what it call is the point we care, that’sstore.informers.Endpoint
func(i*Informer)Run(stopChchanstruct{}){// this is *sharedIndexInformer.Rungoi.Endpoint.Run(stopCh)// ignore, all resource is working under the same way}func(s*sharedIndexInformer)Run(stopCh<-chanstruct{}){// this is last line, I ignore others codess.controller.Run(stopCh)}
Then I take a look at how controller works
// Run begins processing items, and will continue until a value is sent down stopCh.// It's an error to call Run more than once.// Run blocks; call via go.func(c*controller)Run(stopCh<-chanstruct{}){deferutilruntime.HandleCrash()gofunc(){<-stopChc.config.Queue.Close()}()r:=NewReflector(c.config.ListerWatcher,c.config.ObjectType,c.config.Queue,c.config.FullResyncPeriod,)r.ShouldResync=c.config.ShouldResyncr.clock=c.clockc.reflectorMutex.Lock()c.reflector=rc.reflectorMutex.Unlock()varwgwait.Groupdeferwg.Wait()wg.StartWithChannel(stopCh,r.Run)wait.Until(c.processLoop,time.Second,stopCh)}
The point iswg.StartWithChannel(stopCh, r.Run)
, inreflector.Run
,it callr.ListAndWatch(stopCh)
, andListAndWatch
is based onlistWatcher
list,err:=r.listerWatcher.List(options)iferr!=nil{returnfmt.Errorf("%s: Failed to list %v: %v",r.name,r.expectedType,err)}
We would go back here later, let’s find out what islisterWatcher
We setstore.informers.Endpoint
by thisstore.informers.Endpoint = infFactory.Core().V1().Endpoints().Informer()
atinternal/ingress/controller/store/store.go:L264
Then we seeinfFactory
, line257
infFactory:=informers.NewSharedInformerFactoryWithOptions(client,resyncPeriod,informers.WithNamespace(namespace),informers.WithTweakListOptions(func(*metav1.ListOptions){}))
informer
:
func(f*endpointsInformer)Informer()cache.SharedIndexInformer{returnf.factory.InformerFor(&corev1.Endpoints{},f.defaultInformer)}// defaultInformerfunc(f*endpointsInformer)defaultInformer(clientkubernetes.Interface,resyncPeriodtime.Duration)cache.SharedIndexInformer{returnNewFilteredEndpointsInformer(client,f.namespace,resyncPeriod,cache.Indexers{cache.NamespaceIndex:cache.MetaNamespaceIndexFunc},f.tweakListOptions)}// NewFilteredEndpointsInformerfuncNewFilteredEndpointsInformer(clientkubernetes.Interface,namespacestring,resyncPeriodtime.Duration,indexerscache.Indexers,tweakListOptionsinternalinterfaces.TweakListOptionsFunc)cache.SharedIndexInformer{returncache.NewSharedIndexInformer(&cache.ListWatch{ListFunc:func(optionsmetav1.ListOptions)(runtime.Object,error){iftweakListOptions!=nil{tweakListOptions(&options)}returnclient.CoreV1().Endpoints(namespace).List(options)},WatchFunc:func(optionsmetav1.ListOptions)(watch.Interface,error){iftweakListOptions!=nil{tweakListOptions(&options)}returnclient.CoreV1().Endpoints(namespace).Watch(options)},},&corev1.Endpoints{},resyncPeriod,indexers,)}
Ha, we gotListWatch
now, it would call an instance of*kubernetes.ClientSet
to get the info it wanted!
Now we can back toListAndWatch
, let’s take a look at the details of it.
In fact, I’m more focused on watch API, because it’s a little bit weird.I found it’s server with keep sending data until client part close the connection.How it did it? Atk8s.io/client-go/tools/cache/reflector.go:L226
for{// give the stopCh a chance to stop the loop, even in case of continue statements further down on errorsselect{case<-stopCh:returnnildefault:}timeoutSeconds:=int64(minWatchTimeout.Seconds()*(rand.Float64()+1.0))options=metav1.ListOptions{ResourceVersion:resourceVersion,// We want to avoid situations of hanging watchers. Stop any wachers that do not// receive any events within the timeout window.TimeoutSeconds:&timeoutSeconds,}r.metrics.numberOfWatches.Inc()w,err:=r.listerWatcher.Watch(options)iferr!=nil{switcherr{caseio.EOF:// watch closed normallycaseio.ErrUnexpectedEOF:glog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v",r.name,r.expectedType,err)default:utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v",r.name,r.expectedType,err))}// If this is "connection refused" error, it means that most likely apiserver is not responsive.// It doesn't make sense to re-list all objects because most likely we will be able to restart// watch where we ended.// If that's the case wait and resend watch request.ifurlError,ok:=err.(*url.Error);ok{ifopError,ok:=urlError.Err.(*net.OpError);ok{iferrno,ok:=opError.Err.(syscall.Errno);ok&&errno==syscall.ECONNREFUSED{time.Sleep(time.Second)continue}}}returnnil}iferr:=r.watchHandler(w,&resourceVersion,resyncerrc,stopCh);err!=nil{iferr!=errorStopRequested{glog.Warningf("%s: watch of %v ended with: %v",r.name,r.expectedType,err)}returnnil}}
Of course is an endless loop, would stop by channel or return.
The tricky part is it check error content, if it’s a probable EOF, it would keep taking data rather stop the connection.
Ok, everything seems to make sense right now, but that’s not enough, I’m very confused by why it could receiving a JSON data by such as a streaming way, so let’s go back to seeclient.CoreV1().Endpoints(namespace).Watch(options)
// Watch returns a watch.Interface that watches the requested endpoints.func(c*endpoints)Watch(optsmetav1.ListOptions)(watch.Interface,error){opts.Watch=truereturnc.client.Get().Namespace(c.ns).Resource("endpoints").VersionedParams(&opts,scheme.ParameterCodec).Watch()}// Watch attempts to begin watching the requested location.// Returns a watch.Interface, or an error.func(r*Request)Watch()(watch.Interface,error){returnr.WatchWithSpecificDecoders(func(bodyio.ReadCloser)streaming.Decoder{framer:=r.serializers.Framer.NewFrameReader(body)returnstreaming.NewDecoder(framer,r.serializers.StreamingSerializer)},r.serializers.Decoder,)}
And I found the point isr.serializers
, and the shit thing is it still is a function send by external code.
If you trace back then you would find it’s from*RESTClient.serializers
, atk8s.io/client-go/rest/client.go
, line225
and227
send this intoNewRequest
And you found it’s created at line108
in same file,serializers, err := createSerializers(config)
funccreateSerializers(configContentConfig)(*Serializers,error){// ignore, we don't care them since we just use `StreamSerializer` of `Serializers`ifinfo.StreamSerializer!=nil{s.StreamingSerializer=info.StreamSerializer.Serializers.Framer=info.StreamSerializer.Framer}returns,nil}
We would see the type ofStreamSerializer
isruntime.Serializer
, it’s an interface, and since we are sending JSON data, so we go to the JSON one implementor of it to see it’sDecode
import(jsoniter"github.com/json-iterator/go")
After seeing that, I know the trace already done, because my question already been answered, they usegithub.com/json-iterator/go
this library
I guess I would talk about something about how to create a kube API proxy with modifying data after completing my proxy of kube API server. (It’s really hard XD)
I guess today the most interesting thing we learned is Go*http.Response
is aReadCloser
!(How Kubernetes did their watch trick)
Anyway, thanks for the read, hope these could help you more detailed understanding Kubernetes client implementation and be a little start point to read more about it.
Top comments(0)
For further actions, you may consider blocking this person and/orreporting abuse