Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit763ef75

Browse files
committed
Using a wait-group seems to properly sync the threads for buffer depletion
1 parentc86bea6 commit763ef75

File tree

2 files changed

+43
-9
lines changed

2 files changed

+43
-9
lines changed

‎git/cmd.py

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@
1818

1919
from .utilimport (
2020
LazyMixin,
21-
stream_copy
21+
stream_copy,
22+
WaitGroup
2223
)
2324
from .excimportGitCommandError
2425
fromgit.compatimport (
@@ -84,12 +85,14 @@ def dispatch_line(fno):
8485
# end dispatch helper
8586
# end
8687

87-
defdeplete_buffer(fno):
88+
defdeplete_buffer(fno,wg=None):
8889
whileTrue:
8990
line=dispatch_line(fno)
9091
ifnotline:
9192
break
9293
# end deplete buffer
94+
ifwg:
95+
wg.done()
9396
# end
9497

9598
fdmap= {process.stdout.fileno() : (process.stdout,stdout_handler,read_line_fast),
@@ -131,15 +134,16 @@ def deplete_buffer(fno):
131134
# The only reliable way to do this now is to use threads and wait for both to finish
132135
# Since the finalizer is expected to wait, we don't have to introduce our own wait primitive
133136
# NO: It's not enough unfortunately, and we will have to sync the threads
134-
threads=list()
137+
wg=WaitGroup()
135138
forfnoinfdmap.keys():
136-
t=threading.Thread(target=lambda:deplete_buffer(fno))
137-
threads.append(t)
139+
wg.add(1)
140+
t=threading.Thread(target=lambda:deplete_buffer(fno,wg))
138141
t.start()
139142
# end
140-
fortinthreads:
141-
t.join()
142-
# end
143+
# NOTE: Just joining threads can possibly fail as there is a gap between .start() and when it's
144+
# actually started, which could make the wait() call to just return because the thread is not yet
145+
# active
146+
wg.wait()
143147
# end
144148

145149
returnfinalizer(process)

‎git/util.py

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
importshutil
1313
importplatform
1414
importgetpass
15+
importthreading
1516

1617
# NOTE: Some of the unused imports might be used/imported by others.
1718
# Handle once test-cases are back up and running.
@@ -32,7 +33,7 @@
3233
__all__= ("stream_copy","join_path","to_native_path_windows","to_native_path_linux",
3334
"join_path_native","Stats","IndexFileSHA1Writer","Iterable","IterableList",
3435
"BlockingLockFile","LockFile",'Actor','get_user_id','assure_directory_exists',
35-
'RemoteProgress','rmtree')
36+
'RemoteProgress','rmtree','WaitGroup')
3637

3738
#{ Utility Methods
3839

@@ -699,3 +700,32 @@ def iter_items(cls, repo, *args, **kwargs):
699700
raiseNotImplementedError("To be implemented by Subclass")
700701

701702
#} END classes
703+
704+
705+
classWaitGroup(object):
706+
"""WaitGroup is like Go sync.WaitGroup.
707+
708+
Without all the useful corner cases.
709+
By Peter Teichman, taken from https://gist.github.com/pteichman/84b92ae7cef0ab98f5a8
710+
"""
711+
def__init__(self):
712+
self.count=0
713+
self.cv=threading.Condition()
714+
715+
defadd(self,n):
716+
self.cv.acquire()
717+
self.count+=n
718+
self.cv.release()
719+
720+
defdone(self):
721+
self.cv.acquire()
722+
self.count-=1
723+
ifself.count==0:
724+
self.cv.notify_all()
725+
self.cv.release()
726+
727+
defwait(self):
728+
self.cv.acquire()
729+
whileself.count>0:
730+
self.cv.wait()
731+
self.cv.release()

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp