Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitf1208ff

Browse files
Use _interpreters.call().
1 parentfe95f3f commitf1208ff

File tree

2 files changed

+106
-89
lines changed

2 files changed

+106
-89
lines changed

‎Lib/concurrent/futures/interpreter.py

Lines changed: 56 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -45,12 +45,8 @@ def resolve_task(fn, args, kwargs):
4545
# XXX Circle back to this later.
4646
raiseTypeError('scripts not supported')
4747
else:
48-
# Functions defined in the __main__ module can't be pickled,
49-
# so they can't be used here. In the future, we could possibly
50-
# borrow from multiprocessing to work around this.
5148
task= (fn,args,kwargs)
52-
data=pickle.dumps(task)
53-
returndata
49+
returntask
5450

5551
ifinitializerisnotNone:
5652
try:
@@ -65,35 +61,6 @@ def create_context():
6561
returncls(initdata,shared)
6662
returncreate_context,resolve_task
6763

68-
@classmethod
69-
@contextlib.contextmanager
70-
def_capture_exc(cls,resultsid):
71-
try:
72-
yield
73-
exceptBaseExceptionasexc:
74-
# Send the captured exception out on the results queue,
75-
# but still leave it unhandled for the interpreter to handle.
76-
_interpqueues.put(resultsid, (None,exc))
77-
raise# re-raise
78-
79-
@classmethod
80-
def_send_script_result(cls,resultsid):
81-
_interpqueues.put(resultsid, (None,None))
82-
83-
@classmethod
84-
def_call(cls,func,args,kwargs,resultsid):
85-
withcls._capture_exc(resultsid):
86-
res=func(*argsor (),**kwargsor {})
87-
# Send the result back.
88-
withcls._capture_exc(resultsid):
89-
_interpqueues.put(resultsid, (res,None))
90-
91-
@classmethod
92-
def_call_pickled(cls,pickled,resultsid):
93-
withcls._capture_exc(resultsid):
94-
fn,args,kwargs=pickle.loads(pickled)
95-
cls._call(fn,args,kwargs,resultsid)
96-
9764
def__init__(self,initdata,shared=None):
9865
self.initdata=initdata
9966
self.shared=dict(shared)ifsharedelseNone
@@ -104,11 +71,56 @@ def __del__(self):
10471
ifself.interpidisnotNone:
10572
self.finalize()
10673

107-
def_exec(self,script):
108-
assertself.interpidisnotNone
109-
excinfo=_interpreters.exec(self.interpid,script,restrict=True)
74+
def_call(self,fn,args,kwargs):
75+
defdo_call(resultsid,func,*args,**kwargs):
76+
try:
77+
returnfunc(*args,**kwargs)
78+
exceptBaseExceptionasexc:
79+
# Avoid relying on globals.
80+
import_interpreters
81+
import_interpqueues
82+
# Send the captured exception out on the results queue,
83+
# but still leave it unhandled for the interpreter to handle.
84+
try:
85+
_interpqueues.put(resultsid,exc)
86+
except_interpreters.NotShareableError:
87+
# The exception is not shareable.
88+
importsys
89+
importtraceback
90+
print('exception is not shareable:',file=sys.stderr)
91+
traceback.print_exception(exc)
92+
_interpqueues.put(resultsid,None)
93+
raise# re-raise
94+
95+
args= (self.resultsid,fn,*args)
96+
res,excinfo=_interpreters.call(self.interpid,do_call,args,kwargs)
11097
ifexcinfoisnotNone:
11198
raiseExecutionFailed(excinfo)
99+
returnres
100+
101+
def_get_exception(self):
102+
# Wait for the exception data to show up.
103+
whileTrue:
104+
try:
105+
excdata=_interpqueues.get(self.resultsid)
106+
except_interpqueues.QueueNotFoundError:
107+
raise# re-raise
108+
except_interpqueues.QueueErrorasexc:
109+
ifexc.__cause__isnotNoneorexc.__context__isnotNone:
110+
raise# re-raise
111+
ifstr(exc).endswith(' is empty'):
112+
continue
113+
else:
114+
raise# re-raise
115+
exceptModuleNotFoundError:
116+
# interpreters.queues doesn't exist, which means
117+
# QueueEmpty doesn't. Act as though it does.
118+
continue
119+
else:
120+
break
121+
exc,unboundop=excdata
122+
assertunboundopisNone,unboundop
123+
returnexc
112124

113125
definitialize(self):
114126
assertself.interpidisNone,self.interpid
@@ -119,8 +131,6 @@ def initialize(self):
119131
maxsize=0
120132
self.resultsid=_interpqueues.create(maxsize)
121133

122-
self._exec(f'from{__name__} import WorkerContext')
123-
124134
ifself.shared:
125135
_interpreters.set___main___attrs(
126136
self.interpid,self.shared,restrict=True)
@@ -148,37 +158,15 @@ def finalize(self):
148158
pass
149159

150160
defrun(self,task):
151-
data=task
152-
script=f'WorkerContext._call_pickled({data!r},{self.resultsid})'
153-
161+
fn,args,kwargs=task
154162
try:
155-
self._exec(script)
156-
exceptExecutionFailedasexc:
157-
exc_wrapper=exc
158-
else:
159-
exc_wrapper=None
160-
161-
# Return the result, or raise the exception.
162-
whileTrue:
163-
try:
164-
obj=_interpqueues.get(self.resultsid)
165-
except_interpqueues.QueueNotFoundError:
163+
returnself._call(fn,args,kwargs)
164+
exceptExecutionFailedaswrapper:
165+
exc=self._get_exception()
166+
ifexcisNone:
167+
# The exception must have been not shareable.
166168
raise# re-raise
167-
except_interpqueues.QueueError:
168-
continue
169-
exceptModuleNotFoundError:
170-
# interpreters.queues doesn't exist, which means
171-
# QueueEmpty doesn't. Act as though it does.
172-
continue
173-
else:
174-
break
175-
(res,exc),unboundop=obj
176-
assertunboundopisNone,unboundop
177-
ifexcisnotNone:
178-
assertresisNone,res
179-
assertexc_wrapperisnotNone
180-
raiseexcfromexc_wrapper
181-
returnres
169+
raiseexcfromwrapper
182170

183171

184172
classBrokenInterpreterPool(_thread.BrokenThreadPool):

‎Lib/test/test_concurrent_futures/test_interpreter_pool.py

Lines changed: 50 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
importcontextlib
33
importio
44
importos
5-
importpickle
5+
importselect
66
importtime
77
importunittest
88
fromconcurrent.futures.interpreterimport (
@@ -22,10 +22,14 @@ def noop():
2222

2323

2424
defwrite_msg(fd,msg):
25+
importos
2526
os.write(fd,msg+b'\0')
2627

2728

28-
defread_msg(fd):
29+
defread_msg(fd,timeout=10.0):
30+
r,_,_=select.select([fd], [], [],timeout)
31+
iffdnotinr:
32+
raiseTimeoutError('nothing to read')
2933
msg=b''
3034
whilech:=os.read(fd,1):
3135
ifch==b'\0':
@@ -121,19 +125,32 @@ def init2():
121125
nonlocalcount
122126
count+=1
123127

124-
withself.assertRaises(pickle.PicklingError):
125-
self.executor_type(initializer=init1)
126-
withself.assertRaises(pickle.PicklingError):
127-
self.executor_type(initializer=init2)
128+
withcontextlib.redirect_stderr(io.StringIO())asstderr:
129+
withself.executor_type(initializer=init1)asexecutor:
130+
fut=executor.submit(lambda:None)
131+
self.assertIn('NotShareableError',stderr.getvalue())
132+
withself.assertRaises(BrokenInterpreterPool):
133+
fut.result()
134+
135+
withcontextlib.redirect_stderr(io.StringIO())asstderr:
136+
withself.executor_type(initializer=init2)asexecutor:
137+
fut=executor.submit(lambda:None)
138+
self.assertIn('NotShareableError',stderr.getvalue())
139+
withself.assertRaises(BrokenInterpreterPool):
140+
fut.result()
128141

129142
deftest_init_instance_method(self):
130143
classSpam:
131144
definitializer(self):
132145
raiseNotImplementedError
133146
spam=Spam()
134147

135-
withself.assertRaises(pickle.PicklingError):
136-
self.executor_type(initializer=spam.initializer)
148+
withcontextlib.redirect_stderr(io.StringIO())asstderr:
149+
withself.executor_type(initializer=spam.initializer)asexecutor:
150+
fut=executor.submit(lambda:None)
151+
self.assertIn('NotShareableError',stderr.getvalue())
152+
withself.assertRaises(BrokenInterpreterPool):
153+
fut.result()
137154

138155
deftest_init_shared(self):
139156
msg=b'eggs'
@@ -178,8 +195,6 @@ def test_init_exception_in_func(self):
178195
stderr=stderr.getvalue()
179196
self.assertIn('ExecutionFailed: Exception: spam',stderr)
180197
self.assertIn('Uncaught in the interpreter:',stderr)
181-
self.assertIn('The above exception was the direct cause of the following exception:',
182-
stderr)
183198

184199
@unittest.expectedFailure
185200
deftest_submit_script(self):
@@ -208,19 +223,24 @@ def task2():
208223
returnspam
209224

210225
executor=self.executor_type()
211-
withself.assertRaises(pickle.PicklingError):
212-
executor.submit(task1)
213-
withself.assertRaises(pickle.PicklingError):
214-
executor.submit(task2)
226+
227+
fut=executor.submit(task1)
228+
withself.assertRaises(_interpreters.NotShareableError):
229+
fut.result()
230+
231+
fut=executor.submit(task2)
232+
withself.assertRaises(_interpreters.NotShareableError):
233+
fut.result()
215234

216235
deftest_submit_local_instance(self):
217236
classSpam:
218237
def__init__(self):
219238
self.value=True
220239

221240
executor=self.executor_type()
222-
withself.assertRaises(pickle.PicklingError):
223-
executor.submit(Spam)
241+
fut=executor.submit(Spam)
242+
withself.assertRaises(_interpreters.NotShareableError):
243+
fut.result()
224244

225245
deftest_submit_instance_method(self):
226246
classSpam:
@@ -229,8 +249,9 @@ def run(self):
229249
spam=Spam()
230250

231251
executor=self.executor_type()
232-
withself.assertRaises(pickle.PicklingError):
233-
executor.submit(spam.run)
252+
fut=executor.submit(spam.run)
253+
withself.assertRaises(_interpreters.NotShareableError):
254+
fut.result()
234255

235256
deftest_submit_func_globals(self):
236257
executor=self.executor_type()
@@ -242,6 +263,7 @@ def test_submit_func_globals(self):
242263

243264
@unittest.expectedFailure
244265
deftest_submit_exception_in_script(self):
266+
# Scripts are not supported currently.
245267
fut=self.executor.submit('raise Exception("spam")')
246268
withself.assertRaises(Exception)ascaptured:
247269
fut.result()
@@ -289,13 +311,20 @@ def test_idle_thread_reuse(self):
289311
executor.shutdown(wait=True)
290312

291313
deftest_pickle_errors_propagate(self):
292-
# GH-125864: Pickle errors happen before the script tries to execute, so the
293-
# queue used to wait infinitely.
294-
314+
# GH-125864: Pickle errors happen before the script tries to execute,
315+
# so the queue used to wait infinitely.
295316
fut=self.executor.submit(PickleShenanigans(0))
296317
withself.assertRaisesRegex(RuntimeError,"gotcha"):
297318
fut.result()
298319

320+
deftest_no_stale_references(self):
321+
# Weak references don't cross between interpreters.
322+
raiseunittest.SkipTest('not applicable')
323+
324+
deftest_free_reference(self):
325+
# Weak references don't cross between interpreters.
326+
raiseunittest.SkipTest('not applicable')
327+
299328

300329
classAsyncioTest(InterpretersMixin,testasyncio_utils.TestCase):
301330

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp