- Notifications
You must be signed in to change notification settings - Fork3.4k
Description
What happened (please include outputs or screenshots):
I use the stream API to implement "kubectl exec". When I test my code, it sometimes loses data. After debugging, I think the cause is that the kubernetes client does not correctly handle non-blocking SSL sockets.
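According to the Python documentation on SSL sockets (https://docs.python.org/3/library/ssl.html#ssl-nonblocking), "SSL socket may still have data available for reading without select() being aware of it". In other words, bytes that OpenSSL has already pulled off the TCP socket and decrypted can sit in the SSL buffer while select()/poll() reports the socket as not readable.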
Current code:
defupdate(self,timeout=0): |
This can be fixed by also checking the SSL socket's pending() method:
defupdate(self,timeout=0):"""Update channel buffers with at most one complete frame of input."""ifnotself.is_open():returnifnotself.sock.connected:self._connected=Falsereturn# The options here are:# select.select() - this will work on most OS, however, it has a# limitation of only able to read fd numbers up to 1024.# i.e. does not scale well. This was the original# implementation.# select.poll() - this will work on most unix based OS, but not as# efficient as epoll. Will work for fd numbers above 1024.# select.epoll() - newest and most efficient way of polling.# However, only works on linux.ssl_pending=0ifself.sock.is_ssl():ssl_pending=self.sock.sock.pending()ifhasattr(select,"poll"):poll=select.poll()poll.register(self.sock.sock,select.POLLIN)iftimeoutisnotNone:timeout*=1_000# poll method uses milliseconds as the time unitr=poll.poll(timeout)poll.unregister(self.sock.sock)else:r,_,_=select.select( (self.sock.sock, ), (), (),timeout)ifrorssl_pending>0:
What you expected to happen:
How to reproduce it (as minimally and precisely as possible):
A simple test is to exec vim: the cursor gets lost easily. Doing this requires a small tty client, so here is my code. Running vim or htop through it reproduces the data loss easily.
import fcntl
import json
import os
import pty
import select
import signal
import struct
import sys
import termios
import threading
import time
import tty
from datetime import datetime, timezone

import yaml
from kubernetes.stream import ws_client


def append_file(data):
    """Append ``data`` to ./test.txt, prefixed with the current UTC time."""
    with open("test.txt", "a") as f:
        now = datetime.now(tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
        f.write(f"{now}{data}\n")


class InteractiveShell:
    """Bridge the local terminal to a Kubernetes exec websocket session.

    Local stdin is forwarded to the remote side from a background thread.
    stdout/stderr/resize/error frames are drained on the thread that calls
    ``spawn()``.
    """

    def __init__(self, client: ws_client.WSClient, has_stdin=True, has_tty=True,
                 outfile=None, errfile=None):
        """Store the session and start the keepalive thread.

        Args:
            client: an open exec websocket returned by
                ``stream(..., _preload_content=False)``.
            has_stdin: forward local stdin to the remote process.
            has_tty: put the local terminal in raw mode and propagate window
                size changes.
            outfile: fd for remote stdout (defaults to our stdout).
            errfile: fd for remote stderr (defaults to our stderr).
        """
        self.client = client
        self.has_stdin = has_stdin
        self.has_tty = has_tty
        self.master_fd = None
        # termios attributes saved by spawn(); stays None when stdin is not a
        # tty, so restore code must check before using it.
        self.old_settings = None
        self.keep_ping = threading.Thread(target=self._keep_ping, daemon=True)
        self.keep_ping.start()
        self.errfile = pty.STDERR_FILENO if errfile is None else errfile
        self.outfile = pty.STDOUT_FILENO if outfile is None else outfile

    def _keep_ping(self):
        """Write "ping" on channel 6 every 10 minutes until a write fails."""
        # NOTE(review): channel 6 is not one of the ws_client channel constants
        # used elsewhere in this script — confirm the server accepts it.
        while True:
            try:
                self.client.write_channel(6, "ping")
            except Exception:
                # Best effort keepalive: any write failure (e.g. closed
                # socket) just ends this thread.
                break
            time.sleep(60 * 10)

    def _restore_terminal(self):
        """Put stdin back into the termios mode saved by spawn(), if any."""
        if self.old_settings is not None:
            termios.tcsetattr(sys.stdin, termios.TCSADRAIN, self.old_settings)

    def _set_pty_size(self, a=None, b=None):
        """Send the local terminal's rows/columns to the remote pty.

        Also installed as the SIGWINCH handler, which is why it accepts the
        (unused) ``signum, frame`` arguments as ``a`` and ``b``.
        """
        if not self.has_tty:
            return
        packed = fcntl.ioctl(pty.STDOUT_FILENO, termios.TIOCGWINSZ,
                             struct.pack("HHHH", 0, 0, 0, 0))
        rows, cols, _, _ = struct.unpack("HHHH", packed)
        self.client.write_channel(ws_client.RESIZE_CHANNEL,
                                  json.dumps({"Height": rows, "Width": cols}))

    @staticmethod
    def _exit_code(status):
        """Map the status document read from the error channel to an exit code.

        ``None`` or ``status == "Success"`` map to 0. Otherwise the code is
        taken from ``details.causes[0].message``. Documents without that
        numeric field map to -1 instead of raising.
        """
        if status is None:
            return 0
        try:
            if status["status"] == "Success":
                return 0
            return int(status["details"]["causes"][0]["message"])
        except (KeyError, IndexError, TypeError, ValueError):
            return -1

    def spawn(self, argv=None):
        """Run the interactive session and return the remote exit code.

        ``argv`` is accepted for interface compatibility and is unused.
        The local terminal mode and the previous SIGWINCH handler are
        restored even if the session raises.
        """
        old_handler = None
        if self.has_tty:
            old_handler = signal.signal(signal.SIGWINCH, self._set_pty_size)
            try:
                self.old_settings = tty.tcgetattr(pty.STDIN_FILENO)
                tty.setraw(pty.STDIN_FILENO)
            except tty.error:
                # stdin is not a terminal; run without raw mode.
                pass
        self._set_pty_size()

        ret_data = None
        try:
            ret_data = self.main_loop()
        finally:
            if self.has_tty:
                self._restore_terminal()
                # signal.signal() returns None when the previous handler was
                # not installed from Python; that value cannot be reinstalled.
                if old_handler is not None:
                    signal.signal(signal.SIGWINCH, old_handler)

        if ret_data is None:
            err = self.client.read_channel(ws_client.ERROR_CHANNEL)
            ret_data = yaml.safe_load(err)
        return self._exit_code(ret_data)

    def forward_stdin_thread(self):
        """Copy local stdin to the remote stdin until local EOF."""
        if not self.has_stdin:
            # Nothing to forward. Looping here would spin, because select()
            # keeps reporting the unread stdin as readable.
            return
        while True:
            rfds, _, _ = select.select([pty.STDIN_FILENO], [], [], 1.0)
            if pty.STDIN_FILENO not in rfds:
                continue
            data = os.read(pty.STDIN_FILENO, 1024)
            append_file(f"STDIN:{data}")
            if not data:
                break  # local EOF
            if data == b"0":
                # Debug hook: leave raw mode, drop into IPython, then read one
                # raw frame straight off the websocket.
                # NOTE(review): that frame is read outside WSClient.update(),
                # on a second thread, and is never put into the client's
                # channel buffers. Output it consumes never reaches
                # main_loop, so losses seen after using this hook are
                # self-inflicted.
                self._restore_terminal()
                from IPython import embed
                embed()
                op_code, rdata = self.client.sock.recv_data_frame(False)
                append_file(f"0 received, op_code:{op_code}, data:{rdata}")
            self.client.write_stdin(data)

    def main_loop(self):
        """Pump remote output to the local fds until the session ends.

        Returns the parsed status document from the error channel, or None if
        the connection closed without sending one.
        """
        forward_thread = threading.Thread(target=self.forward_stdin_thread, daemon=True)
        forward_thread.start()
        while True:
            self.client.update(timeout=1.0)
            if self.client.peek_channel(ws_client.STDOUT_CHANNEL):
                data = self.client.read_channel(ws_client.STDOUT_CHANNEL)
                if data:
                    append_file(f"STDOUT:{data}\n")
                    self.write_stdout(data)
            elif self.client.peek_channel(ws_client.STDERR_CHANNEL):
                error_data = self.client.read_channel(ws_client.STDERR_CHANNEL)
                if error_data:
                    self.write_stderr(error_data)
            elif self.client.peek_channel(ws_client.RESIZE_CHANNEL):
                # Drain resize frames so they do not accumulate; their
                # contents are not used.
                self.client.read_channel(ws_client.RESIZE_CHANNEL)
            elif self.client.peek_channel(ws_client.ERROR_CHANNEL):
                error_data = self.client.read_channel(ws_client.ERROR_CHANNEL)
                return yaml.safe_load(error_data)
            elif not self.client.is_open():
                # Nothing left buffered and the socket is closed. update()
                # now returns immediately, so stop instead of spinning forever.
                return None

    def write_stdout(self, data):
        """Write remote stdout bytes to the configured output fd."""
        os.write(self.outfile, data)

    def write_stderr(self, data):
        """Write remote stderr bytes to the configured error fd."""
        os.write(self.errfile, data)

    def forward_stdin(self, data):
        """Send ``data`` to the remote process's stdin."""
        assert self.client is not None
        self.client.write_stdin(data)


if __name__ == "__main__":
    from kubernetes import client, config
    from kubernetes.stream import stream

    pod_name = "tlaunch-4fc21ba6-0"
    namespace = "ws-xingyuan"
    config.load_kube_config("/etc/kube.conf")
    core_api = client.CoreV1Api()
    # Bound to `ws` rather than `client` so the kubernetes.client module
    # imported above is not shadowed.
    ws = stream(
        core_api.connect_get_namespaced_pod_exec,
        name=pod_name,
        namespace=namespace,
        command="zsh",
        stderr=True,
        stdin=True,
        stdout=True,
        tty=True,
        _preload_content=False,
        binary=True,
    )
    try:
        return_code = InteractiveShell(ws, has_stdin=True, has_tty=True).spawn()
    finally:
        ws.close()
    sys.exit(return_code)
I also wrote a test.py, but the problem is harder to reproduce with it. Use exec to run the test script and send it lines of input; for each line it sends the current time back. Occasionally (roughly once every 30–50 inputs, depending on host CPU speed), that output is not received until the next input is sent.
from datetime import datetime, timezone
import sys


def main() -> int:
    """Print a UTC timestamp line for every line read from stdin.

    Each input line produces one flushed output line of the form
    ``YYYY-MM-DD HH:MM:SS.ffffff`` followed by ``padding test``.

    Returns:
        0 when stdin reaches EOF (e.g. the exec session closes stdin), or
        254 on Ctrl-C.
    """
    try:
        while True:
            input()
            print(datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S.%f") + "padding test",
                  flush=True)
    except EOFError:
        # stdin closed: finish cleanly instead of dying with a traceback.
        return 0
    except KeyboardInterrupt:
        return 254


if __name__ == "__main__":
    sys.exit(main())
Anything else we need to know?:
Environment: