Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

gh-132775: Use _PyObject_GetXIData (With Fallback)#134440

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.

Already on GitHub?Sign in to your account

Merged
Show file tree
Hide file tree
Changes fromall commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 13 additions & 42 deletionsLib/concurrent/futures/interpreter.py
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -36,9 +36,6 @@ def __str__(self):
""".strip())


UNBOUND=2# error; this should not happen.


classWorkerContext(_thread.WorkerContext):

@classmethod
Expand All@@ -47,23 +44,13 @@ def resolve_task(fn, args, kwargs):
ifisinstance(fn,str):
# XXX Circle back to this later.
raiseTypeError('scripts not supported')
ifargsorkwargs:
raiseValueError(f'a script does not take args or kwargs, got{args!r} and{kwargs!r}')
data=textwrap.dedent(fn)
kind='script'
# Make sure the script compiles.
# Ideally we wouldn't throw away the resulting code
# object. However, there isn't much to be done until
# code objects are shareable and/or we do a better job
# of supporting code objects in _interpreters.exec().
compile(data,'<string>','exec')
else:
# Functions defined in the __main__ module can't be pickled,
# so they can't be used here. In the future, we could possibly
# borrow from multiprocessing to work around this.
data=pickle.dumps((fn,args,kwargs))
kind='function'
return(data,kind)
task=(fn,args,kwargs)
data=pickle.dumps(task)
returndata

ifinitializerisnotNone:
try:
Expand All@@ -86,24 +73,20 @@ def _capture_exc(cls, resultsid):
exceptBaseExceptionasexc:
# Send the captured exception out on the results queue,
# but still leave it unhandled for the interpreter to handle.
err=pickle.dumps(exc)
_interpqueues.put(resultsid, (None,err),1,UNBOUND)
_interpqueues.put(resultsid, (None,exc))
raise# re-raise

@classmethod
def_send_script_result(cls,resultsid):
_interpqueues.put(resultsid, (None,None),0,UNBOUND)
_interpqueues.put(resultsid, (None,None))

@classmethod
def_call(cls,func,args,kwargs,resultsid):
withcls._capture_exc(resultsid):
res=func(*argsor (),**kwargsor {})
# Send the result back.
try:
_interpqueues.put(resultsid, (res,None),0,UNBOUND)
except_interpreters.NotShareableError:
res=pickle.dumps(res)
_interpqueues.put(resultsid, (res,None),1,UNBOUND)
withcls._capture_exc(resultsid):
_interpqueues.put(resultsid, (res,None))

@classmethod
def_call_pickled(cls,pickled,resultsid):
Expand DownExpand Up@@ -134,8 +117,7 @@ def initialize(self):
_interpreters.incref(self.interpid)

maxsize=0
fmt=0
self.resultsid=_interpqueues.create(maxsize,fmt,UNBOUND)
self.resultsid=_interpqueues.create(maxsize)

self._exec(f'from{__name__} import WorkerContext')

Expand DownExpand Up@@ -166,17 +148,8 @@ def finalize(self):
pass

defrun(self,task):
data,kind=task
ifkind=='script':
raiseNotImplementedError('script kind disabled')
script=f"""
with WorkerContext._capture_exc({self.resultsid}):
{textwrap.indent(data,' ')}
WorkerContext._send_script_result({self.resultsid})"""
elifkind=='function':
script=f'WorkerContext._call_pickled({data!r},{self.resultsid})'
else:
raiseNotImplementedError(kind)
data=task
script=f'WorkerContext._call_pickled({data!r},{self.resultsid})'

try:
self._exec(script)
Expand All@@ -199,15 +172,13 @@ def run(self, task):
continue
else:
break
(res,excdata),pickled,unboundop=obj
(res,exc),unboundop=obj
assertunboundopisNone,unboundop
ifexcdataisnotNone:
ifexcisnotNone:
assertresisNone,res
assertpickled
assertexc_wrapperisnotNone
exc=pickle.loads(excdata)
raiseexcfromexc_wrapper
returnpickle.loads(res)ifpickledelseres
returnres


classBrokenInterpreterPool(_thread.BrokenThreadPool):
Expand Down
85 changes: 55 additions & 30 deletionsLib/test/support/interpreters/channels.py
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -55,15 +55,23 @@ def create(*, unbounditems=UNBOUND):
"""
unbound=_serialize_unbound(unbounditems)
unboundop,=unbound
cid=_channels.create(unboundop)
recv,send=RecvChannel(cid),SendChannel(cid,_unbound=unbound)
cid=_channels.create(unboundop,-1)
recv,send=RecvChannel(cid),SendChannel(cid)
send._set_unbound(unboundop,unbounditems)
returnrecv,send


deflist_all():
"""Return a list of (recv, send) for all open channels."""
return [(RecvChannel(cid),SendChannel(cid,_unbound=unbound))
forcid,unboundin_channels.list_all()]
channels= []
forcid,unboundop,_in_channels.list_all():
chan=_,send=RecvChannel(cid),SendChannel(cid)
ifnothasattr(send,'_unboundop'):
send._set_unbound(unboundop)
else:
assertsend._unbound[0]==op
channels.append(chan)
returnchannels


class_ChannelEnd:
Expand DownExpand Up@@ -175,78 +183,95 @@ class SendChannel(_ChannelEnd):

_end='send'

def__new__(cls,cid,*,_unbound=None):
if_unboundisNone:
try:
op=_channels.get_channel_defaults(cid)
_unbound= (op,)
exceptChannelNotFoundError:
_unbound=_serialize_unbound(UNBOUND)
self=super().__new__(cls,cid)
self._unbound=_unbound
returnself
# def __new__(cls, cid, *, _unbound=None):
# if _unbound is None:
# try:
# op = _channels.get_channel_defaults(cid)
# _unbound = (op,)
# except ChannelNotFoundError:
# _unbound = _serialize_unbound(UNBOUND)
# self = super().__new__(cls, cid)
# self._unbound = _unbound
# return self

def_set_unbound(self,op,items=None):
assertnothasattr(self,'_unbound')
ifitemsisNone:
items=_resolve_unbound(op)
unbound= (op,items)
self._unbound=unbound
returnunbound

@property
defunbounditems(self):
try:
_,items=self._unbound
exceptAttributeError:
op,_=_channels.get_queue_defaults(self._id)
_,items=self._set_unbound(op)
returnitems

@property
defis_closed(self):
info=self._info
returninfo.closedorinfo.closing

defsend(self,obj,timeout=None,*,
unbound=None,
unbounditems=None,
):
"""Send the object (i.e. its data) to the channel's receiving end.
This blocks until the object is received.
"""
ifunboundisNone:
unboundop,=self._unbound
ifunbounditemsisNone:
unboundop=-1
else:
unboundop,=_serialize_unbound(unbound)
unboundop,=_serialize_unbound(unbounditems)
_channels.send(self._id,obj,unboundop,timeout=timeout,blocking=True)

defsend_nowait(self,obj,*,
unbound=None,
unbounditems=None,
):
"""Send the object to the channel's receiving end.
If the object is immediately received then return True
(else False). Otherwise this is the same as send().
"""
ifunboundisNone:
unboundop,=self._unbound
ifunbounditemsisNone:
unboundop=-1
else:
unboundop,=_serialize_unbound(unbound)
unboundop,=_serialize_unbound(unbounditems)
# XXX Note that at the moment channel_send() only ever returns
# None. This should be fixed when channel_send_wait() is added.
# See bpo-32604 and gh-19829.
return_channels.send(self._id,obj,unboundop,blocking=False)

defsend_buffer(self,obj,timeout=None,*,
unbound=None,
unbounditems=None,
):
"""Send the object's buffer to the channel's receiving end.
This blocks until the object is received.
"""
ifunboundisNone:
unboundop,=self._unbound
ifunbounditemsisNone:
unboundop=-1
else:
unboundop,=_serialize_unbound(unbound)
unboundop,=_serialize_unbound(unbounditems)
_channels.send_buffer(self._id,obj,unboundop,
timeout=timeout,blocking=True)

defsend_buffer_nowait(self,obj,*,
unbound=None,
unbounditems=None,
):
"""Send the object's buffer to the channel's receiving end.
If the object is immediately received then return True
(else False). Otherwise this is the same as send().
"""
ifunboundisNone:
unboundop,=self._unbound
ifunbounditemsisNone:
unboundop=-1
else:
unboundop,=_serialize_unbound(unbound)
unboundop,=_serialize_unbound(unbounditems)
return_channels.send_buffer(self._id,obj,unboundop,blocking=False)

defclose(self):
Expand Down
Loading
Loading

[8]ページ先頭

©2009-2025 Movatter.jp