Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitea29541

Browse files
committed
Another take on fixing the current concurrent read implementation in git.cmd
There have been rather obvious errors in there, as we forgot to unregisterthe filehandles. Now we will read from a buffer ourselves, which should befaster and ideally, doesn't lead to spurious errors anymore.Related togitpython-developers#232
1 parente395ac9 commitea29541

File tree

1 file changed

+77
-34
lines changed

1 file changed

+77
-34
lines changed

‎git/cmd.py

Lines changed: 77 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@
99
importselect
1010
importlogging
1111
importthreading
12+
importerrno
13+
importmmap
14+
1215
fromsubprocessimport (
1316
call,
1417
Popen,
@@ -26,6 +29,7 @@
2629
string_types,
2730
defenc,
2831
PY3,
32+
bchr,
2933
# just to satisfy flake8 on py3
3034
unicode
3135
)
@@ -41,6 +45,13 @@
4145
ifsys.platform!='win32':
4246
WindowsError=OSError
4347

48+
ifPY3:
49+
_bchr=bchr
50+
else:
51+
def_bchr(c):
52+
returnc
53+
# get custom byte character handling
54+
4455

4556
# ==============================================================================
4657
## @name Utilities
@@ -58,52 +69,73 @@ def handle_process_output(process, stdout_handler, stderr_handler, finalizer):
5869
:param stdout_handler: f(stdout_line_string), or None
5970
:param stderr_hanlder: f(stderr_line_string), or None
6071
:param finalizer: f(proc) - wait for proc to finish"""
61-
defread_line_fast(stream):
62-
returnstream.readline()
63-
64-
defread_line_slow(stream):
72+
defparse_lines_from_buffer(fno,buf):
6573
line=b''
66-
whileTrue:
67-
char=stream.read(1)# reads individual single byte strings
68-
ifnotchar:
69-
break
74+
bi=0
75+
lb=len(buf)
76+
whilebi<lb:
77+
char=_bchr(buf[bi])
78+
bi+=1
7079

7180
ifcharin (b'\r',b'\n')andline:
72-
break
81+
yieldbi,line
82+
line=b''
7383
else:
7484
line+=char
7585
# END process parsed line
7686
# END while file is not done reading
77-
returnline
7887
# end
7988

80-
defdispatch_line(stream,handler,readline):
81-
# this can possibly block for a while, but since we wake-up with at least one or more lines to handle,
82-
# we are good ...
83-
line=readline(stream).decode(defenc)
84-
iflineandhandler:
85-
try:
86-
handler(line)
87-
exceptException:
88-
# Keep reading, have to pump the lines empty nontheless
89-
log.error("Line handler exception on line: %s",line,exc_info=True)
90-
# end
91-
returnline
89+
defread_lines_from_fno(fno,last_buf_list):
90+
buf=os.read(fno,mmap.PAGESIZE)
91+
buf=last_buf_list[0]+buf
92+
93+
bi=0
94+
forbi,lineinparse_lines_from_buffer(fno,buf):
95+
yieldline
96+
# for each line to parse from the buffer
97+
98+
# keep remainder
99+
last_buf_list[0]=buf[bi:]
100+
101+
defdispatch_single_line(line,handler):
102+
line=line.decode(defenc)
103+
iflineandhandler:
104+
try:
105+
handler(line)
106+
exceptException:
107+
# Keep reading, have to pump the lines empty nontheless
108+
log.error("Line handler exception on line: %s",line,exc_info=True)
109+
# end
92110
# end dispatch helper
111+
# end single line helper
112+
113+
defdispatch_lines(fno,handler,buf_list):
114+
lc=0
115+
forlineinread_lines_from_fno(fno,buf_list):
116+
dispatch_single_line(line,handler)
117+
lc+=1
118+
# for each line
119+
returnlc
93120
# end
94121

95-
defdeplete_buffer(stream,handler,readline,wg=None):
122+
defdeplete_buffer(fno,handler,buf_list,wg=None):
96123
whileTrue:
97-
line=dispatch_line(stream,handler,readline)
98-
ifnotline:
124+
line_count=dispatch_lines(fno,handler,buf_list)
125+
ifline_count==0:
99126
break
100127
# end deplete buffer
128+
129+
ifbuf_list[0]:
130+
dispatch_single_line(buf_list[0],handler)
131+
# end
132+
101133
ifwg:
102134
wg.done()
103135
# end
104136

105-
fdmap= {process.stdout.fileno(): (process.stdout,stdout_handler,read_line_fast),
106-
process.stderr.fileno(): (process.stderr,stderr_handler,read_line_slow)}
137+
fdmap= {process.stdout.fileno(): (stdout_handler,[b'']),
138+
process.stderr.fileno(): (stderr_handler,[b''])}
107139

108140
ifhasattr(select,'poll'):
109141
# poll is preferred, as select is limited to file handles up to 1024 ... . This could otherwise be
@@ -118,12 +150,20 @@ def deplete_buffer(stream, handler, readline, wg=None):
118150
closed_streams=set()
119151
whileTrue:
120152
# no timeout
121-
poll_result=poll.poll()
153+
154+
try:
155+
poll_result=poll.poll()
156+
exceptselect.errorase:
157+
ife.args[0]==errno.EINTR:
158+
continue
159+
raise
160+
# end handle poll exception
161+
122162
forfd,resultinpoll_result:
123163
ifresult&CLOSED:
124164
closed_streams.add(fd)
125165
else:
126-
dispatch_line(*fdmap[fd])
166+
dispatch_lines(fd,*fdmap[fd])
127167
# end handle closed stream
128168
# end for each poll-result tuple
129169

@@ -133,19 +173,22 @@ def deplete_buffer(stream, handler, readline, wg=None):
133173
# end endless loop
134174

135175
# Depelete all remaining buffers
136-
forstream,handler,readlineinfdmap.values():
137-
deplete_buffer(stream,handler,readline)
176+
forfno, (handler,buf_list)infdmap.items():
177+
deplete_buffer(fno,handler,buf_list)
138178
# end for each file handle
179+
180+
forfnoinfdmap.keys():
181+
poll.unregister(fno)
182+
# end don't forget to unregister !
139183
else:
140184
# Oh ... probably we are on windows. select.select() can only handle sockets, we have files
141185
# The only reliable way to do this now is to use threads and wait for both to finish
142186
# Since the finalizer is expected to wait, we don't have to introduce our own wait primitive
143187
# NO: It's not enough unfortunately, and we will have to sync the threads
144188
wg=WaitGroup()
145-
forfnoinfdmap.keys():
189+
forfno, (handler,buf_list)infdmap.items():
146190
wg.add(1)
147-
stream,handler,readline=fdmap[fno]
148-
t=threading.Thread(target=lambda:deplete_buffer(stream,handler,readline,wg))
191+
t=threading.Thread(target=lambda:deplete_buffer(fno,handler,buf_list,wg))
149192
t.start()
150193
# end
151194
# NOTE: Just joining threads can possibly fail as there is a gap between .start() and when it's

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp