Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitac2d3d1

Browse files
Use _PyObject_GetXIDataWithFallback().
1 parenta66bae8 commitac2d3d1

File tree

10 files changed

+486
-438
lines changed

10 files changed

+486
-438
lines changed

‎Lib/concurrent/futures/interpreter.py

Lines changed: 13 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,6 @@ def __str__(self):
3636
""".strip())
3737

3838

39-
UNBOUND=2# error; this should not happen.
40-
41-
4239
classWorkerContext(_thread.WorkerContext):
4340

4441
@classmethod
@@ -47,23 +44,13 @@ def resolve_task(fn, args, kwargs):
4744
ifisinstance(fn,str):
4845
# XXX Circle back to this later.
4946
raiseTypeError('scripts not supported')
50-
ifargsorkwargs:
51-
raiseValueError(f'a script does not take args or kwargs, got{args!r} and{kwargs!r}')
52-
data=textwrap.dedent(fn)
53-
kind='script'
54-
# Make sure the script compiles.
55-
# Ideally we wouldn't throw away the resulting code
56-
# object. However, there isn't much to be done until
57-
# code objects are shareable and/or we do a better job
58-
# of supporting code objects in _interpreters.exec().
59-
compile(data,'<string>','exec')
6047
else:
6148
# Functions defined in the __main__ module can't be pickled,
6249
# so they can't be used here. In the future, we could possibly
6350
# borrow from multiprocessing to work around this.
64-
data=pickle.dumps((fn,args,kwargs))
65-
kind='function'
66-
return(data,kind)
51+
task=(fn,args,kwargs)
52+
data=pickle.dumps(task)
53+
returndata
6754

6855
ifinitializerisnotNone:
6956
try:
@@ -86,24 +73,20 @@ def _capture_exc(cls, resultsid):
8673
exceptBaseExceptionasexc:
8774
# Send the captured exception out on the results queue,
8875
# but still leave it unhandled for the interpreter to handle.
89-
err=pickle.dumps(exc)
90-
_interpqueues.put(resultsid, (None,err),1,UNBOUND)
76+
_interpqueues.put(resultsid, (None,exc))
9177
raise# re-raise
9278

9379
@classmethod
9480
def_send_script_result(cls,resultsid):
95-
_interpqueues.put(resultsid, (None,None),0,UNBOUND)
81+
_interpqueues.put(resultsid, (None,None))
9682

9783
@classmethod
9884
def_call(cls,func,args,kwargs,resultsid):
9985
withcls._capture_exc(resultsid):
10086
res=func(*argsor (),**kwargsor {})
10187
# Send the result back.
102-
try:
103-
_interpqueues.put(resultsid, (res,None),0,UNBOUND)
104-
except_interpreters.NotShareableError:
105-
res=pickle.dumps(res)
106-
_interpqueues.put(resultsid, (res,None),1,UNBOUND)
88+
withcls._capture_exc(resultsid):
89+
_interpqueues.put(resultsid, (res,None))
10790

10891
@classmethod
10992
def_call_pickled(cls,pickled,resultsid):
@@ -134,8 +117,7 @@ def initialize(self):
134117
_interpreters.incref(self.interpid)
135118

136119
maxsize=0
137-
fmt=0
138-
self.resultsid=_interpqueues.create(maxsize,fmt,UNBOUND)
120+
self.resultsid=_interpqueues.create(maxsize)
139121

140122
self._exec(f'from{__name__} import WorkerContext')
141123

@@ -166,17 +148,8 @@ def finalize(self):
166148
pass
167149

168150
defrun(self,task):
169-
data,kind=task
170-
ifkind=='script':
171-
raiseNotImplementedError('script kind disabled')
172-
script=f"""
173-
with WorkerContext._capture_exc({self.resultsid}):
174-
{textwrap.indent(data,' ')}
175-
WorkerContext._send_script_result({self.resultsid})"""
176-
elifkind=='function':
177-
script=f'WorkerContext._call_pickled({data!r},{self.resultsid})'
178-
else:
179-
raiseNotImplementedError(kind)
151+
data=task
152+
script=f'WorkerContext._call_pickled({data!r},{self.resultsid})'
180153

181154
try:
182155
self._exec(script)
@@ -199,15 +172,13 @@ def run(self, task):
199172
continue
200173
else:
201174
break
202-
(res,excdata),pickled,unboundop=obj
175+
(res,exc),unboundop=obj
203176
assertunboundopisNone,unboundop
204-
ifexcdataisnotNone:
177+
ifexcisnotNone:
205178
assertresisNone,res
206-
assertpickled
207179
assertexc_wrapperisnotNone
208-
exc=pickle.loads(excdata)
209180
raiseexcfromexc_wrapper
210-
returnpickle.loads(res)ifpickledelseres
181+
returnres
211182

212183

213184
classBrokenInterpreterPool(_thread.BrokenThreadPool):

‎Lib/test/support/interpreters/channels.py

Lines changed: 55 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -55,15 +55,23 @@ def create(*, unbounditems=UNBOUND):
5555
"""
5656
unbound=_serialize_unbound(unbounditems)
5757
unboundop,=unbound
58-
cid=_channels.create(unboundop)
59-
recv,send=RecvChannel(cid),SendChannel(cid,_unbound=unbound)
58+
cid=_channels.create(unboundop,-1)
59+
recv,send=RecvChannel(cid),SendChannel(cid)
60+
send._set_unbound(unboundop,unbounditems)
6061
returnrecv,send
6162

6263

6364
deflist_all():
6465
"""Return a list of (recv, send) for all open channels."""
65-
return [(RecvChannel(cid),SendChannel(cid,_unbound=unbound))
66-
forcid,unboundin_channels.list_all()]
66+
channels= []
67+
forcid,unboundop,_in_channels.list_all():
68+
chan=_,send=RecvChannel(cid),SendChannel(cid)
69+
ifnothasattr(send,'_unboundop'):
70+
send._set_unbound(unboundop)
71+
else:
72+
assertsend._unbound[0]==op
73+
channels.append(chan)
74+
returnchannels
6775

6876

6977
class_ChannelEnd:
@@ -175,78 +183,95 @@ class SendChannel(_ChannelEnd):
175183

176184
_end='send'
177185

178-
def__new__(cls,cid,*,_unbound=None):
179-
if_unboundisNone:
180-
try:
181-
op=_channels.get_channel_defaults(cid)
182-
_unbound= (op,)
183-
exceptChannelNotFoundError:
184-
_unbound=_serialize_unbound(UNBOUND)
185-
self=super().__new__(cls,cid)
186-
self._unbound=_unbound
187-
returnself
186+
# def __new__(cls, cid, *, _unbound=None):
187+
# if _unbound is None:
188+
# try:
189+
# op = _channels.get_channel_defaults(cid)
190+
# _unbound = (op,)
191+
# except ChannelNotFoundError:
192+
# _unbound = _serialize_unbound(UNBOUND)
193+
# self = super().__new__(cls, cid)
194+
# self._unbound = _unbound
195+
# return self
196+
197+
def_set_unbound(self,op,items=None):
198+
assertnothasattr(self,'_unbound')
199+
ifitemsisNone:
200+
items=_resolve_unbound(op)
201+
unbound= (op,items)
202+
self._unbound=unbound
203+
returnunbound
204+
205+
@property
206+
defunbounditems(self):
207+
try:
208+
_,items=self._unbound
209+
exceptAttributeError:
210+
op,_=_channels.get_queue_defaults(self._id)
211+
_,items=self._set_unbound(op)
212+
returnitems
188213

189214
@property
190215
defis_closed(self):
191216
info=self._info
192217
returninfo.closedorinfo.closing
193218

194219
defsend(self,obj,timeout=None,*,
195-
unbound=None,
220+
unbounditems=None,
196221
):
197222
"""Send the object (i.e. its data) to the channel's receiving end.
198223
199224
This blocks until the object is received.
200225
"""
201-
ifunboundisNone:
202-
unboundop,=self._unbound
226+
ifunbounditemsisNone:
227+
unboundop=-1
203228
else:
204-
unboundop,=_serialize_unbound(unbound)
229+
unboundop,=_serialize_unbound(unbounditems)
205230
_channels.send(self._id,obj,unboundop,timeout=timeout,blocking=True)
206231

207232
defsend_nowait(self,obj,*,
208-
unbound=None,
233+
unbounditems=None,
209234
):
210235
"""Send the object to the channel's receiving end.
211236
212237
If the object is immediately received then return True
213238
(else False). Otherwise this is the same as send().
214239
"""
215-
ifunboundisNone:
216-
unboundop,=self._unbound
240+
ifunbounditemsisNone:
241+
unboundop=-1
217242
else:
218-
unboundop,=_serialize_unbound(unbound)
243+
unboundop,=_serialize_unbound(unbounditems)
219244
# XXX Note that at the moment channel_send() only ever returns
220245
# None. This should be fixed when channel_send_wait() is added.
221246
# See bpo-32604 and gh-19829.
222247
return_channels.send(self._id,obj,unboundop,blocking=False)
223248

224249
defsend_buffer(self,obj,timeout=None,*,
225-
unbound=None,
250+
unbounditems=None,
226251
):
227252
"""Send the object's buffer to the channel's receiving end.
228253
229254
This blocks until the object is received.
230255
"""
231-
ifunboundisNone:
232-
unboundop,=self._unbound
256+
ifunbounditemsisNone:
257+
unboundop=-1
233258
else:
234-
unboundop,=_serialize_unbound(unbound)
259+
unboundop,=_serialize_unbound(unbounditems)
235260
_channels.send_buffer(self._id,obj,unboundop,
236261
timeout=timeout,blocking=True)
237262

238263
defsend_buffer_nowait(self,obj,*,
239-
unbound=None,
264+
unbounditems=None,
240265
):
241266
"""Send the object's buffer to the channel's receiving end.
242267
243268
If the object is immediately received then return True
244269
(else False). Otherwise this is the same as send().
245270
"""
246-
ifunboundisNone:
247-
unboundop,=self._unbound
271+
ifunbounditemsisNone:
272+
unboundop=-1
248273
else:
249-
unboundop,=_serialize_unbound(unbound)
274+
unboundop,=_serialize_unbound(unbounditems)
250275
return_channels.send_buffer(self._id,obj,unboundop,blocking=False)
251276

252277
defclose(self):

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp