Stream API loses data when used with an SSL socket #2414

Open
Labels
help wanted: Denotes an issue that needs help from a contributor. Must meet "help wanted" guidelines.
kind/bug: Categorizes issue or PR as related to a bug.
@Novelfor

Description


What happened (please include outputs or screenshots):
I use the stream API to implement "kubectl exec". When I test my code, it sometimes loses data. After debugging, I think the cause is that the Kubernetes client handles a non-blocking SSL socket incorrectly.

According to the Python documentation on SSL sockets (https://docs.python.org/3/library/ssl.html#ssl-nonblocking), "SSL socket may still have data available for reading without select() being aware of it".
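The behaviour quoted from the documentation can be sketched roughly as follows. This is an illustration only, not code from the Kubernetes client: `has_readable_data` is a hypothetical helper name, and it duck-types on a `pending()` method (which `ssl.SSLSocket` provides) so that it also works with plain sockets.

```python
import select
import socket


def has_readable_data(sock, timeout=0.0):
    """Return True if a read on `sock` can make progress right now.

    Hypothetical helper for illustration; not part of any library.
    """
    # ssl.SSLSocket exposes pending(): the number of already-decrypted bytes
    # buffered inside the SSL layer. select() only watches the underlying
    # file descriptor, so it cannot see these bytes.
    pending = getattr(sock, "pending", None)
    if callable(pending) and pending() > 0:
        return True  # data is already buffered; do not wait on the fd

    # Nothing buffered: fall back to waiting on the file descriptor.
    readable, _, _ = select.select([sock], [], [], timeout)
    return bool(readable)


if __name__ == "__main__":
    a, b = socket.socketpair()
    print(has_readable_data(a))       # nothing has been sent yet
    b.sendall(b"x")
    print(has_readable_data(a, 1.0))  # one byte is waiting on the fd
    a.close()
    b.close()
```

A plain socket has no `pending()` method, so the helper falls through to `select()`; with an SSL socket the buffered bytes are checked first.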

Current code:

    def update(self, timeout=0):

This can be fixed by using the sock.pending() function:

    def update(self, timeout=0):
        """Update channel buffers with at most one complete frame of input."""
        if not self.is_open():
            return
        if not self.sock.connected:
            self._connected = False
            return

        # The options here are:
        # select.select() - this will work on most OS, however, it has a
        #                   limitation of only able to read fd numbers up to 1024.
        #                   i.e. does not scale well. This was the original
        #                   implementation.
        # select.poll()   - this will work on most unix based OS, but not as
        #                   efficient as epoll. Will work for fd numbers above 1024.
        # select.epoll()  - newest and most efficient way of polling.
        #                   However, only works on linux.
        ssl_pending = 0
        if self.sock.is_ssl():
            ssl_pending = self.sock.sock.pending()

        if hasattr(select, "poll"):
            poll = select.poll()
            poll.register(self.sock.sock, select.POLLIN)
            if timeout is not None:
                timeout *= 1_000  # poll method uses milliseconds as the time unit
            r = poll.poll(timeout)
            poll.unregister(self.sock.sock)
        else:
            r, _, _ = select.select(
                (self.sock.sock, ), (), (), timeout)

        if r or ssl_pending > 0:
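One possible refinement of the proposed fix, offered as a sketch rather than as the client's actual behaviour: when `ssl_pending > 0`, the `poll.poll(timeout)` call above still waits for the full timeout if no new bytes arrive on the raw socket (and indefinitely if `timeout` is `None`). The hypothetical helper below, `effective_poll_timeout_ms`, shows how the wait could be reduced to zero in that case.

```python
def effective_poll_timeout_ms(timeout, ssl_pending):
    """Timeout to pass to poll.poll(), in milliseconds.

    Hypothetical helper for illustration. `timeout` is in seconds, or None
    to block until the socket becomes readable.
    """
    if ssl_pending > 0:
        # Decrypted data is already buffered in the SSL layer, so only
        # check the fd without waiting.
        return 0
    if timeout is None:
        return None  # poll.poll(None) blocks until an event arrives
    return timeout * 1_000  # poll.poll() expects milliseconds


if __name__ == "__main__":
    print(effective_poll_timeout_ms(1.0, 0))   # no buffered data: wait 1 s
    print(effective_poll_timeout_ms(1.0, 42))  # buffered data: do not wait
```

The same idea applies to the `select.select()` branch, where the timeout would stay in seconds.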

What you expected to happen:

How to reproduce it (as minimally and precisely as possible):
A simple test is to exec vim: the cursor is lost easily. This requires a simple TTY client, so here is my code. Both vim and htop reproduce the data loss easily.

    from kubernetes.stream import ws_client
    import termios
    import pty
    import fcntl
    import struct
    import json
    import signal
    import tty
    import select
    import os
    import threading
    import yaml
    import sys
    import time
    from datetime import datetime, timezone


    def append_file(data):
        with open("test.txt", "a") as f:
            now = datetime.now(tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
            f.write(f"{now}{data}\n")


    class InteractiveShell:
        def __init__(self, client: ws_client.WSClient, has_stdin=True, has_tty=True, outfile=None, errfile=None):
            self.client = client
            self.has_stdin = has_stdin
            self.has_tty = has_tty
            self.master_fd = None
            self.keep_ping = threading.Thread(target=self._keep_ping, daemon=True)
            self.keep_ping.start()
            if errfile is None:
                self.errfile = pty.STDERR_FILENO
            else:
                self.errfile = errfile
            if outfile is None:
                self.outfile = pty.STDOUT_FILENO
            else:
                self.outfile = outfile

        def _keep_ping(self):
            while True:
                try:
                    self.client.write_channel(6, "ping")
                    time.sleep(60 * 10)
                except Exception as e:
                    break

        def _set_pty_size(self, a=None, b=None):
            """
            Sets the window size of the child pty based on the window size of
            our own controlling terminal.
            """
            if not self.has_tty:
                return
            packed = fcntl.ioctl(pty.STDOUT_FILENO, termios.TIOCGWINSZ, struct.pack('HHHH', 0, 0, 0, 0))
            rows, cols, h_pixels, v_pixels = struct.unpack('HHHH', packed)
            self.client.write_channel(ws_client.RESIZE_CHANNEL, json.dumps({"Height": rows, "Width": cols}))

        def spawn(self, argv=None):
            if self.has_tty:
                old_handler = signal.signal(signal.SIGWINCH, self._set_pty_size)
                try:
                    self.old_settings = tty.tcgetattr(pty.STDIN_FILENO)
                    tty.setraw(pty.STDIN_FILENO)
                except tty.error:
                    pass
                self._set_pty_size()
            ret_data = None
            returncode = -1
            try:
                ret_data = self.main_loop()
            finally:
                if self.has_tty:
                    termios.tcsetattr(sys.stdin, termios.TCSADRAIN, self.old_settings)
            if ret_data is None:
                err = self.client.read_channel(ws_client.ERROR_CHANNEL)
                ret_data = yaml.safe_load(err)
            if ret_data is None or ret_data['status'] == "Success":
                returncode = 0
            else:
                returncode = int(ret_data['details']['causes'][0]['message'])
            return returncode

        def forward_stdin_thread(self):
            while True:
                rfds, _, _ = select.select([pty.STDIN_FILENO], [], [], 1.0)
                if len(rfds) == 0:
                    continue
                if pty.STDIN_FILENO in rfds and self.has_stdin:
                    data = os.read(pty.STDIN_FILENO, 1024)
                    append_file(f"STDIN:{data}")
                    if data:
                        if data == b"0":
                            termios.tcsetattr(sys.stdin, termios.TCSADRAIN, self.old_settings)
                            from IPython import embed
                            embed()
                            op_code, rdata = self.client.sock.recv_data_frame(False)
                            append_file(f"0 received, op_code:{op_code}, data:{rdata}")
                        self.client.write_stdin(data)
                    else:
                        break

        def main_loop(self):
            forward_thread = threading.Thread(target=self.forward_stdin_thread, daemon=True)
            forward_thread.start()
            while True:
                self.client.update(timeout=1.0)
                if self.client.peek_channel(ws_client.STDOUT_CHANNEL):
                    data = self.client.read_channel(ws_client.STDOUT_CHANNEL)
                    if data:
                        append_file(f"STDOUT:{data}\n")
                        self.write_stdout(data)
                elif self.client.peek_channel(ws_client.STDERR_CHANNEL):
                    error_data = self.client.read_channel(ws_client.STDERR_CHANNEL)
                    if error_data:
                        self.write_stderr(error_data)
                elif self.client.peek_channel(ws_client.RESIZE_CHANNEL):
                    resize_data = self.client.read_channel(ws_client.RESIZE_CHANNEL)
                    if resize_data:
                        resize_info = json.loads(resize_data)
                        rows = resize_info.get("Height", 24)
                        cols = resize_info.get("Width", 80)
                elif self.client.peek_channel(ws_client.ERROR_CHANNEL):
                    error_data = self.client.read_channel(ws_client.ERROR_CHANNEL)
                    error_msg = yaml.safe_load(error_data)
                    return error_msg

        def write_stdout(self, data):
            os.write(self.outfile, data)

        def write_stderr(self, data):
            os.write(self.errfile, data)

        def forward_stdin(self, data):
            assert self.client is not None
            self.client.write_stdin(data)


    if __name__ == "__main__":
        from kubernetes import client, config
        from kubernetes.stream import stream
        pod_name = "tlaunch-4fc21ba6-0"
        namespace = "ws-xingyuan"
        config.load_kube_config("/etc/kube.conf")
        core_api = client.CoreV1Api()
        client = stream(
            core_api.connect_get_namespaced_pod_exec,
            name=pod_name,
            namespace=namespace,
            command="zsh",
            stderr=True,
            stdin=True,
            stdout=True,
            tty=True,
            _preload_content=False,
            binary=True
        )
        shell = InteractiveShell(client, has_stdin=True, has_tty=True)
        return_code = shell.spawn()

I also wrote a test.py, but the problem is harder to reproduce with it. Use exec to run the test code and enter any key; the script sends the current time back to the host. Sometimes (it takes roughly 30 to 50 inputs, depending on host CPU speed) the data is not received until the next input.

    from datetime import datetime, timezone
    import sys

    if __name__ == "__main__":
        try:
            while True:
                input()
                print(datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S.%f") + "padding test", flush=True)
        except KeyboardInterrupt:
            sys.exit(254)
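To make the delay described above measurable rather than judged by eye, the receiving side could compare each timestamp printed by test.py with the local receive time. The sketch below is not part of the original reproduction; `delivery_delay` is a hypothetical helper, and it assumes the pod and host clocks are reasonably synchronised.

```python
from datetime import datetime, timezone

# Format and suffix taken from the print() call in test.py.
STAMP_FORMAT = "%Y-%m-%d %H:%M:%S.%f"
SUFFIX = "padding test"


def delivery_delay(line, received_at):
    """Seconds between the timestamp printed in the pod and local receipt.

    Hypothetical helper: `line` is one decoded line of test.py output and
    `received_at` is a timezone-aware datetime recorded by the client.
    """
    stamp = line.strip()  # a TTY typically appends "\r\n"
    if stamp.endswith(SUFFIX):
        stamp = stamp[: -len(SUFFIX)]
    sent_at = datetime.strptime(stamp, STAMP_FORMAT).replace(tzinfo=timezone.utc)
    return (received_at - sent_at).total_seconds()


if __name__ == "__main__":
    line = "2025-01-01 00:00:00.250000padding test\r\n"
    received = datetime(2025, 1, 1, 0, 0, 1, tzinfo=timezone.utc)
    print(delivery_delay(line, received))
```

A line that is only delivered after the next keypress would show a delay close to the time between the two inputs, instead of a few milliseconds.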

Anything else we need to know?:

Environment:
