Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit4dd556a

Browse files
Use _interpreters.call().
1 parent4152f17 commit4dd556a

File tree

2 files changed

+52
-69
lines changed

2 files changed

+52
-69
lines changed

‎Lib/concurrent/futures/interpreter.py

Lines changed: 46 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -45,12 +45,8 @@ def resolve_task(fn, args, kwargs):
4545
# XXX Circle back to this later.
4646
raiseTypeError('scripts not supported')
4747
else:
48-
# Functions defined in the __main__ module can't be pickled,
49-
# so they can't be used here. In the future, we could possibly
50-
# borrow from multiprocessing to work around this.
5148
task= (fn,args,kwargs)
52-
data=pickle.dumps(task)
53-
returndata
49+
returntask
5450

5551
ifinitializerisnotNone:
5652
try:
@@ -65,35 +61,6 @@ def create_context():
6561
returncls(initdata,shared)
6662
returncreate_context,resolve_task
6763

68-
@classmethod
69-
@contextlib.contextmanager
70-
def_capture_exc(cls,resultsid):
71-
try:
72-
yield
73-
exceptBaseExceptionasexc:
74-
# Send the captured exception out on the results queue,
75-
# but still leave it unhandled for the interpreter to handle.
76-
_interpqueues.put(resultsid, (None,exc))
77-
raise# re-raise
78-
79-
@classmethod
80-
def_send_script_result(cls,resultsid):
81-
_interpqueues.put(resultsid, (None,None))
82-
83-
@classmethod
84-
def_call(cls,func,args,kwargs,resultsid):
85-
withcls._capture_exc(resultsid):
86-
res=func(*argsor (),**kwargsor {})
87-
# Send the result back.
88-
withcls._capture_exc(resultsid):
89-
_interpqueues.put(resultsid, (res,None))
90-
91-
@classmethod
92-
def_call_pickled(cls,pickled,resultsid):
93-
withcls._capture_exc(resultsid):
94-
fn,args,kwargs=pickle.loads(pickled)
95-
cls._call(fn,args,kwargs,resultsid)
96-
9764
def__init__(self,initdata,shared=None):
9865
self.initdata=initdata
9966
self.shared=dict(shared)ifsharedelseNone
@@ -104,11 +71,46 @@ def __del__(self):
10471
ifself.interpidisnotNone:
10572
self.finalize()
10673

107-
def_exec(self,script):
108-
assertself.interpidisnotNone
109-
excinfo=_interpreters.exec(self.interpid,script,restrict=True)
74+
def_call(self,fn,args,kwargs):
75+
defdo_call(resultsid,func,*args,**kwargs):
76+
try:
77+
returnfunc(*args,**kwargs)
78+
exceptBaseExceptionasexc:
79+
# Avoid relying on globals.
80+
import_interpqueues
81+
# Send the captured exception out on the results queue,
82+
# but still leave it unhandled for the interpreter to handle.
83+
try:
84+
_interpqueues.put(resultsid,exc)
85+
exceptTypeError:
86+
# The exception is not shareable.
87+
_interpqueues.put(resultsid,None)
88+
raise# re-raise
89+
90+
args= (self.resultsid,fn,*args)
91+
res,excinfo=_interpreters.call(self.interpid,do_call,args,kwargs)
11092
ifexcinfoisnotNone:
11193
raiseExecutionFailed(excinfo)
94+
returnres
95+
96+
def_get_exception(self):
97+
# Wait for the exception data to show up.
98+
whileTrue:
99+
try:
100+
excdata=_interpqueues.get(self.resultsid)
101+
except_interpqueues.QueueNotFoundError:
102+
raise# re-raise
103+
except_interpqueues.QueueError:
104+
continue
105+
exceptModuleNotFoundError:
106+
# interpreters.queues doesn't exist, which means
107+
# QueueEmpty doesn't. Act as though it does.
108+
continue
109+
else:
110+
break
111+
exc,unboundop=excdata
112+
assertunboundopisNone,unboundop
113+
returnexc
112114

113115
definitialize(self):
114116
assertself.interpidisNone,self.interpid
@@ -119,8 +121,6 @@ def initialize(self):
119121
maxsize=0
120122
self.resultsid=_interpqueues.create(maxsize)
121123

122-
self._exec(f'from{__name__} import WorkerContext')
123-
124124
ifself.shared:
125125
_interpreters.set___main___attrs(
126126
self.interpid,self.shared,restrict=True)
@@ -148,37 +148,15 @@ def finalize(self):
148148
pass
149149

150150
defrun(self,task):
151-
data=task
152-
script=f'WorkerContext._call_pickled({data!r},{self.resultsid})'
153-
151+
fn,args,kwargs=task
154152
try:
155-
self._exec(script)
156-
exceptExecutionFailedasexc:
157-
exc_wrapper=exc
158-
else:
159-
exc_wrapper=None
160-
161-
# Return the result, or raise the exception.
162-
whileTrue:
163-
try:
164-
obj=_interpqueues.get(self.resultsid)
165-
except_interpqueues.QueueNotFoundError:
153+
returnself._call(fn,args,kwargs)
154+
exceptExecutionFailedaswrapper:
155+
exc=self._get_exception()
156+
ifexcisNone:
157+
# The exception must be not shareable.
166158
raise# re-raise
167-
except_interpqueues.QueueError:
168-
continue
169-
exceptModuleNotFoundError:
170-
# interpreters.queues doesn't exist, which means
171-
# QueueEmpty doesn't. Act as though it does.
172-
continue
173-
else:
174-
break
175-
(res,exc),unboundop=obj
176-
assertunboundopisNone,unboundop
177-
ifexcisnotNone:
178-
assertresisNone,res
179-
assertexc_wrapperisnotNone
180-
raiseexcfromexc_wrapper
181-
returnres
159+
raiseexcfromwrapper
182160

183161

184162
classBrokenInterpreterPool(_thread.BrokenThreadPool):

‎Lib/test/test_concurrent_futures/test_interpreter_pool.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
importio
44
importos
55
importpickle
6+
importselect
67
importtime
78
importunittest
89
fromconcurrent.futures.interpreterimport (
@@ -22,10 +23,14 @@ def noop():
2223

2324

2425
defwrite_msg(fd,msg):
26+
importos
2527
os.write(fd,msg+b'\0')
2628

2729

28-
defread_msg(fd):
30+
defread_msg(fd,timeout=10.0):
31+
r,_,_=select.select([fd], [], [],timeout)
32+
iffdnotinr:
33+
raiseTimeoutError('nothing to read')
2934
msg=b''
3035
whilech:=os.read(fd,1):
3136
ifch==b'\0':

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp