@@ -45,12 +45,8 @@ def resolve_task(fn, args, kwargs):
45
45
# XXX Circle back to this later.
46
46
raise TypeError ('scripts not supported' )
47
47
else :
48
- # Functions defined in the __main__ module can't be pickled,
49
- # so they can't be used here. In the future, we could possibly
50
- # borrow from multiprocessing to work around this.
51
48
task = (fn ,args ,kwargs )
52
- data = pickle .dumps (task )
53
- return data
49
+ return task
54
50
55
51
if initializer is not None :
56
52
try :
@@ -65,35 +61,6 @@ def create_context():
65
61
return cls (initdata ,shared )
66
62
return create_context ,resolve_task
67
63
68
- @classmethod
69
- @contextlib .contextmanager
70
- def _capture_exc (cls ,resultsid ):
71
- try :
72
- yield
73
- except BaseException as exc :
74
- # Send the captured exception out on the results queue,
75
- # but still leave it unhandled for the interpreter to handle.
76
- _interpqueues .put (resultsid , (None ,exc ))
77
- raise # re-raise
78
-
79
- @classmethod
80
- def _send_script_result (cls ,resultsid ):
81
- _interpqueues .put (resultsid , (None ,None ))
82
-
83
- @classmethod
84
- def _call (cls ,func ,args ,kwargs ,resultsid ):
85
- with cls ._capture_exc (resultsid ):
86
- res = func (* args or (),** kwargs or {})
87
- # Send the result back.
88
- with cls ._capture_exc (resultsid ):
89
- _interpqueues .put (resultsid , (res ,None ))
90
-
91
- @classmethod
92
- def _call_pickled (cls ,pickled ,resultsid ):
93
- with cls ._capture_exc (resultsid ):
94
- fn ,args ,kwargs = pickle .loads (pickled )
95
- cls ._call (fn ,args ,kwargs ,resultsid )
96
-
97
64
def __init__ (self ,initdata ,shared = None ):
98
65
self .initdata = initdata
99
66
self .shared = dict (shared )if shared else None
@@ -104,11 +71,42 @@ def __del__(self):
104
71
if self .interpid is not None :
105
72
self .finalize ()
106
73
107
- def _exec (self ,script ):
108
- assert self .interpid is not None
109
- excinfo = _interpreters .exec (self .interpid ,script ,restrict = True )
74
+ def _call (self ,fn ,args ,kwargs ):
75
+ def do_call (resultsid ,func ,* args ,** kwargs ):
76
+ try :
77
+ return func (* args ,** kwargs )
78
+ except BaseException as exc :
79
+ # Avoid relying on globals.
80
+ import _interpqueues
81
+ # Send the captured exception out on the results queue,
82
+ # but still leave it unhandled for the interpreter to handle.
83
+ _interpqueues .put (resultsid ,err )
84
+ raise # re-raise
85
+
86
+ args = (self .resultsid ,fn ,* args )
87
+ res ,excinfo = _interpreters .call (self .interpid ,do_call ,args ,kwargs )
110
88
if excinfo is not None :
111
89
raise ExecutionFailed (excinfo )
90
+ return res
91
+
92
+ def _get_exception (self ):
93
+ # Wait for the exception data to show up.
94
+ while True :
95
+ try :
96
+ excdata = _interpqueues .get (self .resultsid )
97
+ except _interpqueues .QueueNotFoundError :
98
+ raise # re-raise
99
+ except _interpqueues .QueueError :
100
+ continue
101
+ except ModuleNotFoundError :
102
+ # interpreters.queues doesn't exist, which means
103
+ # QueueEmpty doesn't. Act as though it does.
104
+ continue
105
+ else :
106
+ break
107
+ exc ,unboundop = excdata
108
+ assert unboundop is None ,unboundop
109
+ return exc
112
110
113
111
def initialize (self ):
114
112
assert self .interpid is None ,self .interpid
@@ -148,37 +146,12 @@ def finalize(self):
148
146
pass
149
147
150
148
def run (self ,task ):
151
- data = task
152
- script = f'WorkerContext._call_pickled({ data !r} ,{ self .resultsid } )'
153
-
149
+ fn ,args ,kwargs = task
154
150
try :
155
- self ._exec (script )
156
- except ExecutionFailed as exc :
157
- exc_wrapper = exc
158
- else :
159
- exc_wrapper = None
160
-
161
- # Return the result, or raise the exception.
162
- while True :
163
- try :
164
- obj = _interpqueues .get (self .resultsid )
165
- except _interpqueues .QueueNotFoundError :
166
- raise # re-raise
167
- except _interpqueues .QueueError :
168
- continue
169
- except ModuleNotFoundError :
170
- # interpreters.queues doesn't exist, which means
171
- # QueueEmpty doesn't. Act as though it does.
172
- continue
173
- else :
174
- break
175
- (res ,exc ),unboundop = obj
176
- assert unboundop is None ,unboundop
177
- if exc is not None :
178
- assert res is None ,res
179
- assert exc_wrapper is not None
180
- raise exc from exc_wrapper
181
- return res
151
+ return self ._call (fn ,args ,kwargs )
152
+ except ExecutionFailed as wrapper :
153
+ exc = self ._get_exception ()
154
+ raise exc from wrapper
182
155
183
156
184
157
class BrokenInterpreterPool (_thread .BrokenThreadPool ):