2626
2727# XXX temporary: a monkey-patched subprocess.Popen
2828if compat .PY34 :
29- from .import tmp_subprocess
29+ from .tmp_subprocess import _Popen
3030else :
31- # Python 3.3 has a different version of Popen
32- from .import tmp_subprocess33 as tmp_subprocess
31+ # shows that we can fallback to an older version of subprocess.Popen
32+ # safely: it will block, but asyncio will still work.
33+ _Popen = subprocess .Popen
3334
3435
3536__all__ = ['SelectorEventLoop' ,
@@ -665,97 +666,108 @@ def _set_inheritable(fd, inheritable):
665666fcntl .fcntl (fd ,fcntl .F_SETFD ,old & ~ cloexec_flag )
666667
667668
668- class _NonBlockingPopen (tmp_subprocess ._Popen ):
669- """A modified Popen which performs IO operations using an event loop."""
670- # TODO can we include the stdin trick in popen?
671- def __init__ (self ,loop ,exec_waiter ,watcher ,* args ,** kwargs ):
672- self ._loop = loop
673- self ._watcher = watcher
674- self ._exec_waiter = exec_waiter
675- super ().__init__ (* args ,** kwargs )
676-
677- def _cleanup_on_exec_failure (self ):
678- super ()._cleanup_on_exec_failure ()
679- self ._exec_waiter = None
680- self ._loop = None
681- self ._watcher = None
682-
683- def _get_exec_err_pipe (self ):
684- errpipe_read ,errpipe_write = self ._loop ._socketpair ()
685- errpipe_read .setblocking (False )
686- _set_inheritable (errpipe_write .fileno (),False )
687- return errpipe_read .detach (),errpipe_write .detach ()
669+ if hasattr (_Popen ,"_wait_exec_done" ):
670+ class _NonBlockingPopen (_Popen ):
671+ """A modified Popen which performs IO operations using an event loop."""
672+ def __init__ (self ,loop ,exec_waiter ,watcher ,* args ,** kwargs ):
673+ self ._loop = loop
674+ self ._watcher = watcher
675+ self ._exec_waiter = exec_waiter
676+ super ().__init__ (* args ,** kwargs )
688677
689- def _wait_exec_done (self ,orig_executable ,cwd ,errpipe_read ):
690- errpipe_data = bytearray ()
691- self ._loop .add_reader (errpipe_read ,self ._read_errpipe ,
692- orig_executable ,cwd ,errpipe_read ,errpipe_data )
693-
694- def _read_errpipe (self ,orig_executable ,cwd ,errpipe_read ,errpipe_data ):
695- try :
696- part = os .read (errpipe_read ,50000 )
697- except BlockingIOError :
698- return
699- except Exception as exc :
700- self ._loop .remove_reader (errpipe_read )
701- os .close (errpipe_read )
702- self ._exec_waiter .set_exception (exc )
703- self ._cleanup_on_exec_failure ()
704- else :
705- if part and len (errpipe_data )<= 50000 :
706- errpipe_data .extend (part )
678+ def _cleanup_on_exec_failure (self ):
679+ super ()._cleanup_on_exec_failure ()
680+ self ._exec_waiter = None
681+ self ._loop = None
682+ self ._watcher = None
683+
684+ def _get_exec_err_pipe (self ):
685+ errpipe_read ,errpipe_write = self ._loop ._socketpair ()
686+ errpipe_read .setblocking (False )
687+ _set_inheritable (errpipe_write .fileno (),False )
688+ return errpipe_read .detach (),errpipe_write .detach ()
689+
690+ def _wait_exec_done (self ,orig_executable ,cwd ,errpipe_read ):
691+ errpipe_data = bytearray ()
692+ self ._loop .add_reader (errpipe_read ,self ._read_errpipe ,
693+ orig_executable ,cwd ,errpipe_read ,
694+ errpipe_data )
695+
696+ def _read_errpipe (self ,orig_executable ,cwd ,errpipe_read ,
697+ errpipe_data ):
698+ try :
699+ part = os .read (errpipe_read ,50000 )
700+ except BlockingIOError :
707701return
702+ except Exception as exc :
703+ self ._loop .remove_reader (errpipe_read )
704+ os .close (errpipe_read )
705+ self ._exec_waiter .set_exception (exc )
706+ self ._cleanup_on_exec_failure ()
707+ else :
708+ if part and len (errpipe_data )<= 50000 :
709+ errpipe_data .extend (part )
710+ return
708711
709- self ._loop .remove_reader (errpipe_read )
710- os .close (errpipe_read )
712+ self ._loop .remove_reader (errpipe_read )
713+ os .close (errpipe_read )
711714
712- if errpipe_data :
713- # asynchronously wait until the process terminated
714- self ._watcher .add_child_handler (
715- self .pid ,self ._check_exec_result ,orig_executable ,
716- cwd ,errpipe_data )
717- else :
715+ if errpipe_data :
716+ # asynchronously wait until the process terminated
717+ self ._watcher .add_child_handler (
718+ self .pid ,self ._check_exec_result ,orig_executable ,
719+ cwd ,errpipe_data )
720+ else :
721+ if not self ._exec_waiter .cancelled ():
722+ self ._exec_waiter .set_result (None )
723+ self ._exec_waiter = None
724+ self ._loop = None
725+ self ._watcher = None
726+
727+ def _check_exec_result (self ,pid ,returncode ,orig_executable ,cwd ,
728+ errpipe_data ):
729+ try :
730+ super ()._check_exec_result (orig_executable ,cwd ,errpipe_data )
731+ except Exception as exc :
718732if not self ._exec_waiter .cancelled ():
719- self ._exec_waiter .set_result (None )
720- self ._exec_waiter = None
721- self ._loop = None
722- self ._watcher = None
723-
724- def _check_exec_result (self ,pid ,returncode ,orig_executable ,cwd ,
725- errpipe_data ):
726- try :
727- super ()._check_exec_result (orig_executable ,cwd ,errpipe_data )
728- except Exception as exc :
729- if not self ._exec_waiter .cancelled ():
730- self ._exec_waiter .set_exception (exc )
731- self ._cleanup_on_exec_failure ()
733+ self ._exec_waiter .set_exception (exc )
734+ self ._cleanup_on_exec_failure ()
735+ else :
736+ _NonBlockingPopen = None
732737
733738
734739class _UnixSubprocessTransport (base_subprocess .BaseSubprocessTransport ):
735740@coroutine
736741def _start (self ,args ,shell ,stdin ,stdout ,stderr ,bufsize ,** kwargs ):
742+ stdin_w = None
743+ if stdin == subprocess .PIPE :
744+ # Use a socket pair for stdin, since not all platforms
745+ # support selecting read events on the write end of a
746+ # socket (which we use in order to detect closing of the
747+ # other end). Notably this is needed on AIX, and works
748+ # just fine on other platforms.
749+ stdin ,stdin_w = self ._loop ._socketpair ()
750+
751+ # Mark the write end of the stdin pipe as non-inheritable,
752+ # needed by close_fds=False on Python 3.3 and older
753+ # (Python 3.4 implements the PEP 446, socketpair returns
754+ # non-inheritable sockets)
755+ _set_inheritable (stdin_w .fileno (),False )
756+
737757with events .get_child_watcher ()as watcher :
738- stdin_w = None
739- if stdin == subprocess .PIPE :
740- # Use a socket pair for stdin, since not all platforms
741- # support selecting read events on the write end of a
742- # socket (which we use in order to detect closing of the
743- # other end). Notably this is needed on AIX, and works
744- # just fine on other platforms.
745- stdin ,stdin_w = self ._loop ._socketpair ()
746-
747- # Mark the write end of the stdin pipe as non-inheritable,
748- # needed by close_fds=False on Python 3.3 and older
749- # (Python 3.4 implements the PEP 446, socketpair returns
750- # non-inheritable sockets)
751- _set_inheritable (stdin_w .fileno (),False )
752- exec_waiter = self ._loop .create_future ()
753758try :
754- self ._proc = _NonBlockingPopen (
755- self ._loop ,exec_waiter ,watcher ,args ,shell = shell ,
756- stdin = stdin ,stdout = stdout ,stderr = stderr ,
757- universal_newlines = False ,bufsize = bufsize ,** kwargs )
758- yield from exec_waiter
759+ if _NonBlockingPopen :
760+ exec_waiter = self ._loop .create_future ()
761+ self ._proc = _NonBlockingPopen (
762+ self ._loop ,exec_waiter ,watcher ,args ,shell = shell ,
763+ stdin = stdin ,stdout = stdout ,stderr = stderr ,
764+ universal_newlines = False ,bufsize = bufsize ,** kwargs )
765+ yield from exec_waiter
766+ else :
767+ self ._proc = subprocess .Popen (
768+ args ,shell = shell ,stdin = stdin ,stdout = stdout ,
769+ stderr = stderr ,universal_newlines = False ,
770+ bufsize = bufsize ,** kwargs )
759771except :
760772self ._failed_before_start = True
761773# TODO stdin is probably closed by proc, but what about stdin_w
@@ -766,9 +778,10 @@ def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
766778else :
767779watcher .add_child_handler (self ._proc .pid ,
768780self ._child_watcher_callback )
769- if stdin_w is not None :
770- stdin .close ()
771- self ._proc .stdin = open (stdin_w .detach (),'wb' ,buffering = bufsize )
781+
782+ if stdin_w is not None :
783+ stdin .close ()
784+ self ._proc .stdin = open (stdin_w .detach (),'wb' ,buffering = bufsize )
772785
773786def _child_watcher_callback (self ,pid ,returncode ):
774787self ._loop .call_soon_threadsafe (self ._process_exited ,returncode )