@@ -45,12 +45,8 @@ def resolve_task(fn, args, kwargs):
45
45
# XXX Circle back to this later.
46
46
raise TypeError ('scripts not supported' )
47
47
else :
48
- # Functions defined in the __main__ module can't be pickled,
49
- # so they can't be used here. In the future, we could possibly
50
- # borrow from multiprocessing to work around this.
51
48
task = (fn ,args ,kwargs )
52
- data = pickle .dumps (task )
53
- return data
49
+ return task
54
50
55
51
if initializer is not None :
56
52
try :
@@ -65,35 +61,6 @@ def create_context():
65
61
return cls (initdata ,shared )
66
62
return create_context ,resolve_task
67
63
68
- @classmethod
69
- @contextlib .contextmanager
70
- def _capture_exc (cls ,resultsid ):
71
- try :
72
- yield
73
- except BaseException as exc :
74
- # Send the captured exception out on the results queue,
75
- # but still leave it unhandled for the interpreter to handle.
76
- _interpqueues .put (resultsid , (None ,exc ))
77
- raise # re-raise
78
-
79
- @classmethod
80
- def _send_script_result (cls ,resultsid ):
81
- _interpqueues .put (resultsid , (None ,None ))
82
-
83
- @classmethod
84
- def _call (cls ,func ,args ,kwargs ,resultsid ):
85
- with cls ._capture_exc (resultsid ):
86
- res = func (* args or (),** kwargs or {})
87
- # Send the result back.
88
- with cls ._capture_exc (resultsid ):
89
- _interpqueues .put (resultsid , (res ,None ))
90
-
91
- @classmethod
92
- def _call_pickled (cls ,pickled ,resultsid ):
93
- with cls ._capture_exc (resultsid ):
94
- fn ,args ,kwargs = pickle .loads (pickled )
95
- cls ._call (fn ,args ,kwargs ,resultsid )
96
-
97
64
def __init__ (self ,initdata ,shared = None ):
98
65
self .initdata = initdata
99
66
self .shared = dict (shared )if shared else None
@@ -104,11 +71,46 @@ def __del__(self):
104
71
if self .interpid is not None :
105
72
self .finalize ()
106
73
107
- def _exec (self ,script ):
108
- assert self .interpid is not None
109
- excinfo = _interpreters .exec (self .interpid ,script ,restrict = True )
74
+ def _call (self ,fn ,args ,kwargs ):
75
+ def do_call (resultsid ,func ,* args ,** kwargs ):
76
+ try :
77
+ return func (* args ,** kwargs )
78
+ except BaseException as exc :
79
+ # Avoid relying on globals.
80
+ import _interpqueues
81
+ # Send the captured exception out on the results queue,
82
+ # but still leave it unhandled for the interpreter to handle.
83
+ try :
84
+ _interpqueues .put (resultsid ,exc )
85
+ except TypeError :
86
+ # The exception is not shareable.
87
+ _interpqueues .put (resultsid ,None )
88
+ raise # re-raise
89
+
90
+ args = (self .resultsid ,fn ,* args )
91
+ res ,excinfo = _interpreters .call (self .interpid ,do_call ,args ,kwargs )
110
92
if excinfo is not None :
111
93
raise ExecutionFailed (excinfo )
94
+ return res
95
+
96
+ def _get_exception (self ):
97
+ # Wait for the exception data to show up.
98
+ while True :
99
+ try :
100
+ excdata = _interpqueues .get (self .resultsid )
101
+ except _interpqueues .QueueNotFoundError :
102
+ raise # re-raise
103
+ except _interpqueues .QueueError :
104
+ continue
105
+ except ModuleNotFoundError :
106
+ # interpreters.queues doesn't exist, which means
107
+ # QueueEmpty doesn't. Act as though it does.
108
+ continue
109
+ else :
110
+ break
111
+ exc ,unboundop = excdata
112
+ assert unboundop is None ,unboundop
113
+ return exc
112
114
113
115
def initialize (self ):
114
116
assert self .interpid is None ,self .interpid
@@ -119,8 +121,6 @@ def initialize(self):
119
121
maxsize = 0
120
122
self .resultsid = _interpqueues .create (maxsize )
121
123
122
- self ._exec (f'from{ __name__ } import WorkerContext' )
123
-
124
124
if self .shared :
125
125
_interpreters .set___main___attrs (
126
126
self .interpid ,self .shared ,restrict = True )
@@ -148,37 +148,15 @@ def finalize(self):
148
148
pass
149
149
150
150
def run (self ,task ):
151
- data = task
152
- script = f'WorkerContext._call_pickled({ data !r} ,{ self .resultsid } )'
153
-
151
+ fn ,args ,kwargs = task
154
152
try :
155
- self ._exec (script )
156
- except ExecutionFailed as exc :
157
- exc_wrapper = exc
158
- else :
159
- exc_wrapper = None
160
-
161
- # Return the result, or raise the exception.
162
- while True :
163
- try :
164
- obj = _interpqueues .get (self .resultsid )
165
- except _interpqueues .QueueNotFoundError :
153
+ return self ._call (fn ,args ,kwargs )
154
+ except ExecutionFailed as wrapper :
155
+ exc = self ._get_exception ()
156
+ if exc is None :
157
+ # The exception must be not shareable.
166
158
raise # re-raise
167
- except _interpqueues .QueueError :
168
- continue
169
- except ModuleNotFoundError :
170
- # interpreters.queues doesn't exist, which means
171
- # QueueEmpty doesn't. Act as though it does.
172
- continue
173
- else :
174
- break
175
- (res ,exc ),unboundop = obj
176
- assert unboundop is None ,unboundop
177
- if exc is not None :
178
- assert res is None ,res
179
- assert exc_wrapper is not None
180
- raise exc from exc_wrapper
181
- return res
159
+ raise exc from wrapper
182
160
183
161
184
162
class BrokenInterpreterPool (_thread .BrokenThreadPool ):