2626
2727# XXX temporary: a monkey-patched subprocess.Popen
2828if compat .PY34 :
29- from . import tmp_subprocess
29+ from .tmp_subprocess import _Popen
3030else :
31- # Python 3.3 has a different version of Popen
32- from . import tmp_subprocess33 as tmp_subprocess
31+ # shows that we can fallback to an older version of subprocess.Popen
32+ # safely: it will block, but asyncio will still work.
33+ _Popen = subprocess .Popen
3334
3435
3536__all__ = ['SelectorEventLoop' ,
@@ -665,97 +666,108 @@ def _set_inheritable(fd, inheritable):
665666 fcntl .fcntl (fd , fcntl .F_SETFD , old & ~ cloexec_flag )
666667
667668
668- class _NonBlockingPopen (tmp_subprocess ._Popen ):
669- """A modified Popen which performs IO operations using an event loop."""
670- # TODO can we include the stdin trick in popen?
671- def __init__ (self , loop , exec_waiter , watcher , * args , ** kwargs ):
672- self ._loop = loop
673- self ._watcher = watcher
674- self ._exec_waiter = exec_waiter
675- super ().__init__ (* args , ** kwargs )
676-
677- def _cleanup_on_exec_failure (self ):
678- super ()._cleanup_on_exec_failure ()
679- self ._exec_waiter = None
680- self ._loop = None
681- self ._watcher = None
682-
683- def _get_exec_err_pipe (self ):
684- errpipe_read , errpipe_write = self ._loop ._socketpair ()
685- errpipe_read .setblocking (False )
686- _set_inheritable (errpipe_write .fileno (), False )
687- return errpipe_read .detach (), errpipe_write .detach ()
669+ if hasattr (_Popen , "_wait_exec_done" ):
670+ class _NonBlockingPopen (_Popen ):
671+ """A modified Popen which performs IO operations using an event loop."""
672+ def __init__ (self , loop , exec_waiter , watcher , * args , ** kwargs ):
673+ self ._loop = loop
674+ self ._watcher = watcher
675+ self ._exec_waiter = exec_waiter
676+ super ().__init__ (* args , ** kwargs )
688677
689- def _wait_exec_done (self , orig_executable , cwd , errpipe_read ):
690- errpipe_data = bytearray ()
691- self ._loop .add_reader (errpipe_read , self ._read_errpipe ,
692- orig_executable , cwd , errpipe_read , errpipe_data )
693-
694- def _read_errpipe (self , orig_executable , cwd , errpipe_read , errpipe_data ):
695- try :
696- part = os .read (errpipe_read , 50000 )
697- except BlockingIOError :
698- return
699- except Exception as exc :
700- self ._loop .remove_reader (errpipe_read )
701- os .close (errpipe_read )
702- self ._exec_waiter .set_exception (exc )
703- self ._cleanup_on_exec_failure ()
704- else :
705- if part and len (errpipe_data ) <= 50000 :
706- errpipe_data .extend (part )
678+ def _cleanup_on_exec_failure (self ):
679+ super ()._cleanup_on_exec_failure ()
680+ self ._exec_waiter = None
681+ self ._loop = None
682+ self ._watcher = None
683+
684+ def _get_exec_err_pipe (self ):
685+ errpipe_read , errpipe_write = self ._loop ._socketpair ()
686+ errpipe_read .setblocking (False )
687+ _set_inheritable (errpipe_write .fileno (), False )
688+ return errpipe_read .detach (), errpipe_write .detach ()
689+
690+ def _wait_exec_done (self , orig_executable , cwd , errpipe_read ):
691+ errpipe_data = bytearray ()
692+ self ._loop .add_reader (errpipe_read , self ._read_errpipe ,
693+ orig_executable , cwd , errpipe_read ,
694+ errpipe_data )
695+
696+ def _read_errpipe (self , orig_executable , cwd , errpipe_read ,
697+ errpipe_data ):
698+ try :
699+ part = os .read (errpipe_read , 50000 )
700+ except BlockingIOError :
707701 return
702+ except Exception as exc :
703+ self ._loop .remove_reader (errpipe_read )
704+ os .close (errpipe_read )
705+ self ._exec_waiter .set_exception (exc )
706+ self ._cleanup_on_exec_failure ()
707+ else :
708+ if part and len (errpipe_data ) <= 50000 :
709+ errpipe_data .extend (part )
710+ return
708711
709- self ._loop .remove_reader (errpipe_read )
710- os .close (errpipe_read )
712+ self ._loop .remove_reader (errpipe_read )
713+ os .close (errpipe_read )
711714
712- if errpipe_data :
713- # asynchronously wait until the process terminated
714- self ._watcher .add_child_handler (
715- self .pid , self ._check_exec_result , orig_executable ,
716- cwd , errpipe_data )
717- else :
715+ if errpipe_data :
716+ # asynchronously wait until the process terminated
717+ self ._watcher .add_child_handler (
718+ self .pid , self ._check_exec_result , orig_executable ,
719+ cwd , errpipe_data )
720+ else :
721+ if not self ._exec_waiter .cancelled ():
722+ self ._exec_waiter .set_result (None )
723+ self ._exec_waiter = None
724+ self ._loop = None
725+ self ._watcher = None
726+
727+ def _check_exec_result (self , pid , returncode , orig_executable , cwd ,
728+ errpipe_data ):
729+ try :
730+ super ()._check_exec_result (orig_executable , cwd , errpipe_data )
731+ except Exception as exc :
718732 if not self ._exec_waiter .cancelled ():
719- self ._exec_waiter .set_result (None )
720- self ._exec_waiter = None
721- self ._loop = None
722- self ._watcher = None
723-
724- def _check_exec_result (self , pid , returncode , orig_executable , cwd ,
725- errpipe_data ):
726- try :
727- super ()._check_exec_result (orig_executable , cwd , errpipe_data )
728- except Exception as exc :
729- if not self ._exec_waiter .cancelled ():
730- self ._exec_waiter .set_exception (exc )
731- self ._cleanup_on_exec_failure ()
733+ self ._exec_waiter .set_exception (exc )
734+ self ._cleanup_on_exec_failure ()
735+ else :
736+ _NonBlockingPopen = None
732737
733738
734739class _UnixSubprocessTransport (base_subprocess .BaseSubprocessTransport ):
735740 @coroutine
736741 def _start (self , args , shell , stdin , stdout , stderr , bufsize , ** kwargs ):
742+ stdin_w = None
743+ if stdin == subprocess .PIPE :
744+ # Use a socket pair for stdin, since not all platforms
745+ # support selecting read events on the write end of a
746+ # socket (which we use in order to detect closing of the
747+ # other end). Notably this is needed on AIX, and works
748+ # just fine on other platforms.
749+ stdin , stdin_w = self ._loop ._socketpair ()
750+
751+ # Mark the write end of the stdin pipe as non-inheritable,
752+ # needed by close_fds=False on Python 3.3 and older
753+ # (Python 3.4 implements the PEP 446, socketpair returns
754+ # non-inheritable sockets)
755+ _set_inheritable (stdin_w .fileno (), False )
756+
737757 with events .get_child_watcher () as watcher :
738- stdin_w = None
739- if stdin == subprocess .PIPE :
740- # Use a socket pair for stdin, since not all platforms
741- # support selecting read events on the write end of a
742- # socket (which we use in order to detect closing of the
743- # other end). Notably this is needed on AIX, and works
744- # just fine on other platforms.
745- stdin , stdin_w = self ._loop ._socketpair ()
746-
747- # Mark the write end of the stdin pipe as non-inheritable,
748- # needed by close_fds=False on Python 3.3 and older
749- # (Python 3.4 implements the PEP 446, socketpair returns
750- # non-inheritable sockets)
751- _set_inheritable (stdin_w .fileno (), False )
752- exec_waiter = self ._loop .create_future ()
753758 try :
754- self ._proc = _NonBlockingPopen (
755- self ._loop , exec_waiter , watcher , args , shell = shell ,
756- stdin = stdin , stdout = stdout , stderr = stderr ,
757- universal_newlines = False , bufsize = bufsize , ** kwargs )
758- yield from exec_waiter
759+ if _NonBlockingPopen :
760+ exec_waiter = self ._loop .create_future ()
761+ self ._proc = _NonBlockingPopen (
762+ self ._loop , exec_waiter , watcher , args , shell = shell ,
763+ stdin = stdin , stdout = stdout , stderr = stderr ,
764+ universal_newlines = False , bufsize = bufsize , ** kwargs )
765+ yield from exec_waiter
766+ else :
767+ self ._proc = subprocess .Popen (
768+ args , shell = shell , stdin = stdin , stdout = stdout ,
769+ stderr = stderr , universal_newlines = False ,
770+ bufsize = bufsize , ** kwargs )
759771 except :
760772 self ._failed_before_start = True
761773 # TODO stdin is probably closed by proc, but what about stdin_w
@@ -766,9 +778,10 @@ def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
766778 else :
767779 watcher .add_child_handler (self ._proc .pid ,
768780 self ._child_watcher_callback )
769- if stdin_w is not None :
770- stdin .close ()
771- self ._proc .stdin = open (stdin_w .detach (), 'wb' , buffering = bufsize )
781+
782+ if stdin_w is not None :
783+ stdin .close ()
784+ self ._proc .stdin = open (stdin_w .detach (), 'wb' , buffering = bufsize )
772785
773786 def _child_watcher_callback (self , pid , returncode ):
774787 self ._loop .call_soon_threadsafe (self ._process_exited , returncode )
0 commit comments