99import select
1010import logging
1111import threading
12+ import errno
13+ import mmap
14+
1215from subprocess import (
1316call ,
1417Popen ,
2629string_types ,
2730defenc ,
2831PY3 ,
32+ bchr ,
2933# just to satisfy flake8 on py3
3034unicode
3135)
4145if sys .platform != 'win32' :
4246WindowsError = OSError
4347
48+ if PY3 :
49+ _bchr = bchr
50+ else :
51+ def _bchr (c ):
52+ return c
53+ # get custom byte character handling
54+
4455
4556# ==============================================================================
4657## @name Utilities
@@ -58,52 +69,73 @@ def handle_process_output(process, stdout_handler, stderr_handler, finalizer):
5869 :param stdout_handler: f(stdout_line_string), or None
5970 :param stderr_hanlder: f(stderr_line_string), or None
6071 :param finalizer: f(proc) - wait for proc to finish"""
61- def read_line_fast (stream ):
62- return stream .readline ()
63-
64- def read_line_slow (stream ):
72+ def parse_lines_from_buffer (fno ,buf ):
6573line = b''
66- while True :
67- char = stream .read (1 )# reads individual single byte strings
68- if not char :
69- break
74+ bi = 0
75+ lb = len (buf )
76+ while bi < lb :
77+ char = _bchr (buf [bi ])
78+ bi += 1
7079
7180if char in (b'\r ' ,b'\n ' )and line :
72- break
81+ yield bi ,line
82+ line = b''
7383else :
7484line += char
7585# END process parsed line
7686# END while file is not done reading
77- return line
7887# end
7988
80- def dispatch_line (stream ,handler ,readline ):
81- # this can possibly block for a while, but since we wake-up with at least one or more lines to handle,
82- # we are good ...
83- line = readline (stream ).decode (defenc )
84- if line and handler :
85- try :
86- handler (line )
87- except Exception :
88- # Keep reading, have to pump the lines empty nontheless
89- log .error ("Line handler exception on line: %s" ,line ,exc_info = True )
90- # end
91- return line
89+ def read_lines_from_fno (fno ,last_buf_list ):
90+ buf = os .read (fno ,mmap .PAGESIZE )
91+ buf = last_buf_list [0 ]+ buf
92+
93+ bi = 0
94+ for bi ,line in parse_lines_from_buffer (fno ,buf ):
95+ yield line
96+ # for each line to parse from the buffer
97+
98+ # keep remainder
99+ last_buf_list [0 ]= buf [bi :]
100+
101+ def dispatch_single_line (line ,handler ):
102+ line = line .decode (defenc )
103+ if line and handler :
104+ try :
105+ handler (line )
106+ except Exception :
107+ # Keep reading, have to pump the lines empty nontheless
108+ log .error ("Line handler exception on line: %s" ,line ,exc_info = True )
109+ # end
92110# end dispatch helper
111+ # end single line helper
112+
113+ def dispatch_lines (fno ,handler ,buf_list ):
114+ lc = 0
115+ for line in read_lines_from_fno (fno ,buf_list ):
116+ dispatch_single_line (line ,handler )
117+ lc += 1
118+ # for each line
119+ return lc
93120# end
94121
95- def deplete_buffer (stream ,handler ,readline ,wg = None ):
122+ def deplete_buffer (fno ,handler ,buf_list ,wg = None ):
96123while True :
97- line = dispatch_line ( stream ,handler ,readline )
98- if not line :
124+ line_count = dispatch_lines ( fno ,handler ,buf_list )
125+ if line_count == 0 :
99126break
100127# end deplete buffer
128+
129+ if buf_list [0 ]:
130+ dispatch_single_line (buf_list [0 ],handler )
131+ # end
132+
101133if wg :
102134wg .done ()
103135# end
104136
105- fdmap = {process .stdout .fileno (): (process . stdout , stdout_handler ,read_line_fast ),
106- process .stderr .fileno (): (process . stderr , stderr_handler ,read_line_slow )}
137+ fdmap = {process .stdout .fileno (): (stdout_handler ,[ b'' ] ),
138+ process .stderr .fileno (): (stderr_handler ,[ b'' ] )}
107139
108140if hasattr (select ,'poll' ):
109141# poll is preferred, as select is limited to file handles up to 1024 ... . This could otherwise be
@@ -118,12 +150,20 @@ def deplete_buffer(stream, handler, readline, wg=None):
118150closed_streams = set ()
119151while True :
120152# no timeout
121- poll_result = poll .poll ()
153+
154+ try :
155+ poll_result = poll .poll ()
156+ except select .error as e :
157+ if e .args [0 ]== errno .EINTR :
158+ continue
159+ raise
160+ # end handle poll exception
161+
122162for fd ,result in poll_result :
123163if result & CLOSED :
124164closed_streams .add (fd )
125165else :
126- dispatch_line ( * fdmap [fd ])
166+ dispatch_lines ( fd , * fdmap [fd ])
127167# end handle closed stream
128168# end for each poll-result tuple
129169
@@ -133,19 +173,22 @@ def deplete_buffer(stream, handler, readline, wg=None):
133173# end endless loop
134174
135175# Depelete all remaining buffers
136- for stream , handler ,readline in fdmap .values ():
137- deplete_buffer (stream ,handler ,readline )
176+ for fno , ( handler ,buf_list ) in fdmap .items ():
177+ deplete_buffer (fno ,handler ,buf_list )
138178# end for each file handle
179+
180+ for fno in fdmap .keys ():
181+ poll .unregister (fno )
182+ # end don't forget to unregister !
139183else :
140184# Oh ... probably we are on windows. select.select() can only handle sockets, we have files
141185# The only reliable way to do this now is to use threads and wait for both to finish
142186# Since the finalizer is expected to wait, we don't have to introduce our own wait primitive
143187# NO: It's not enough unfortunately, and we will have to sync the threads
144188wg = WaitGroup ()
145- for fno in fdmap .keys ():
189+ for fno , ( handler , buf_list ) in fdmap .items ():
146190wg .add (1 )
147- stream ,handler ,readline = fdmap [fno ]
148- t = threading .Thread (target = lambda :deplete_buffer (stream ,handler ,readline ,wg ))
191+ t = threading .Thread (target = lambda :deplete_buffer (fno ,handler ,buf_list ,wg ))
149192t .start ()
150193# end
151194# NOTE: Just joining threads can possibly fail as there is a gap between .start() and when it's