Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitc58c63f

Browse files
gh-84570: Add Timeouts to SendChannel.send() and RecvChannel.recv() (gh-110567)
1 parent7029c1a commitc58c63f

File tree

8 files changed

+202
-47
lines changed

8 files changed

+202
-47
lines changed

‎Include/internal/pycore_pythread.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,12 @@ extern int _PyThread_at_fork_reinit(PyThread_type_lock *lock);
8989
// unset: -1 seconds, in nanoseconds
9090
#definePyThread_UNSET_TIMEOUT ((_PyTime_t)(-1 * 1000 * 1000 * 1000))
9191

92+
// Exported for the _xxinterpchannels module.
93+
PyAPI_FUNC(int)PyThread_ParseTimeoutArg(
94+
PyObject*arg,
95+
intblocking,
96+
PY_TIMEOUT_T*timeout);
97+
9298
/* Helper to acquire an interruptible lock with a timeout. If the lock acquire
9399
* is interrupted, signal handlers are run, and if they raise an exception,
94100
* PY_LOCK_INTR is returned. Otherwise, PY_LOCK_ACQUIRED or PY_LOCK_FAILURE

‎Lib/test/support/interpreters.py

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -170,15 +170,25 @@ class RecvChannel(_ChannelEnd):
170170

171171
_end='recv'
172172

173-
defrecv(self,*,_sentinel=object(),_delay=10/1000):# 10 milliseconds
173+
defrecv(self,timeout=None,*,
174+
_sentinel=object(),
175+
_delay=10/1000,# 10 milliseconds
176+
):
174177
"""Return the next object from the channel.
175178
176179
This blocks until an object has been sent, if none have been
177180
sent already.
178181
"""
182+
iftimeoutisnotNone:
183+
timeout=int(timeout)
184+
iftimeout<0:
185+
raiseValueError(f'timeout value must be non-negative')
186+
end=time.time()+timeout
179187
obj=_channels.recv(self._id,_sentinel)
180188
whileobjis_sentinel:
181189
time.sleep(_delay)
190+
iftimeoutisnotNoneandtime.time()>=end:
191+
raiseTimeoutError
182192
obj=_channels.recv(self._id,_sentinel)
183193
returnobj
184194

@@ -203,12 +213,12 @@ class SendChannel(_ChannelEnd):
203213

204214
_end='send'
205215

206-
defsend(self,obj):
216+
defsend(self,obj,timeout=None):
207217
"""Send the object (i.e. its data) to the channel's receiving end.
208218
209219
This blocks until the object is received.
210220
"""
211-
_channels.send(self._id,obj,blocking=True)
221+
_channels.send(self._id,obj,timeout=timeout,blocking=True)
212222

213223
defsend_nowait(self,obj):
214224
"""Send the object to the channel's receiving end.
@@ -221,12 +231,12 @@ def send_nowait(self, obj):
221231
# See bpo-32604 and gh-19829.
222232
return_channels.send(self._id,obj,blocking=False)
223233

224-
defsend_buffer(self,obj):
234+
defsend_buffer(self,obj,timeout=None):
225235
"""Send the object's buffer to the channel's receiving end.
226236
227237
This blocks until the object is received.
228238
"""
229-
_channels.send_buffer(self._id,obj,blocking=True)
239+
_channels.send_buffer(self._id,obj,timeout=timeout,blocking=True)
230240

231241
defsend_buffer_nowait(self,obj):
232242
"""Send the object's buffer to the channel's receiving end.

‎Lib/test/test__xxinterpchannels.py

Lines changed: 108 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -864,22 +864,97 @@ def f():
864864

865865
self.assertEqual(received,obj)
866866

867+
deftest_send_timeout(self):
868+
obj=b'spam'
869+
870+
withself.subTest('non-blocking with timeout'):
871+
cid=channels.create()
872+
withself.assertRaises(ValueError):
873+
channels.send(cid,obj,blocking=False,timeout=0.1)
874+
875+
withself.subTest('timeout hit'):
876+
cid=channels.create()
877+
withself.assertRaises(TimeoutError):
878+
channels.send(cid,obj,blocking=True,timeout=0.1)
879+
withself.assertRaises(channels.ChannelEmptyError):
880+
received=channels.recv(cid)
881+
print(repr(received))
882+
883+
withself.subTest('timeout not hit'):
884+
cid=channels.create()
885+
deff():
886+
recv_wait(cid)
887+
t=threading.Thread(target=f)
888+
t.start()
889+
channels.send(cid,obj,blocking=True,timeout=10)
890+
t.join()
891+
892+
deftest_send_buffer_timeout(self):
893+
try:
894+
self._has_run_once_timeout
895+
exceptAttributeError:
896+
# At the moment, this test leaks a few references.
897+
# It looks like the leak originates with the addition
898+
# of _channels.send_buffer() (gh-110246), whereas the
899+
# tests were added afterward. We want this test even
900+
# if the refleak isn't fixed yet, so we skip here.
901+
raiseunittest.SkipTest('temporarily skipped due to refleaks')
902+
else:
903+
self._has_run_once_timeout=True
904+
905+
obj=bytearray(b'spam')
906+
907+
withself.subTest('non-blocking with timeout'):
908+
cid=channels.create()
909+
withself.assertRaises(ValueError):
910+
channels.send_buffer(cid,obj,blocking=False,timeout=0.1)
911+
912+
withself.subTest('timeout hit'):
913+
cid=channels.create()
914+
withself.assertRaises(TimeoutError):
915+
channels.send_buffer(cid,obj,blocking=True,timeout=0.1)
916+
withself.assertRaises(channels.ChannelEmptyError):
917+
received=channels.recv(cid)
918+
print(repr(received))
919+
920+
withself.subTest('timeout not hit'):
921+
cid=channels.create()
922+
deff():
923+
recv_wait(cid)
924+
t=threading.Thread(target=f)
925+
t.start()
926+
channels.send_buffer(cid,obj,blocking=True,timeout=10)
927+
t.join()
928+
867929
deftest_send_closed_while_waiting(self):
868930
obj=b'spam'
869931
wait=self.build_send_waiter(obj)
870-
cid=channels.create()
871-
deff():
872-
wait()
873-
channels.close(cid,force=True)
874-
t=threading.Thread(target=f)
875-
t.start()
876-
withself.assertRaises(channels.ChannelClosedError):
877-
channels.send(cid,obj,blocking=True)
878-
t.join()
932+
933+
withself.subTest('without timeout'):
934+
cid=channels.create()
935+
deff():
936+
wait()
937+
channels.close(cid,force=True)
938+
t=threading.Thread(target=f)
939+
t.start()
940+
withself.assertRaises(channels.ChannelClosedError):
941+
channels.send(cid,obj,blocking=True)
942+
t.join()
943+
944+
withself.subTest('with timeout'):
945+
cid=channels.create()
946+
deff():
947+
wait()
948+
channels.close(cid,force=True)
949+
t=threading.Thread(target=f)
950+
t.start()
951+
withself.assertRaises(channels.ChannelClosedError):
952+
channels.send(cid,obj,blocking=True,timeout=30)
953+
t.join()
879954

880955
deftest_send_buffer_closed_while_waiting(self):
881956
try:
882-
self._has_run_once
957+
self._has_run_once_closed
883958
exceptAttributeError:
884959
# At the moment, this test leaks a few references.
885960
# It looks like the leak originates with the addition
@@ -888,19 +963,32 @@ def test_send_buffer_closed_while_waiting(self):
888963
# if the refleak isn't fixed yet, so we skip here.
889964
raiseunittest.SkipTest('temporarily skipped due to refleaks')
890965
else:
891-
self._has_run_once=True
966+
self._has_run_once_closed=True
892967

893968
obj=bytearray(b'spam')
894969
wait=self.build_send_waiter(obj,buffer=True)
895-
cid=channels.create()
896-
deff():
897-
wait()
898-
channels.close(cid,force=True)
899-
t=threading.Thread(target=f)
900-
t.start()
901-
withself.assertRaises(channels.ChannelClosedError):
902-
channels.send_buffer(cid,obj,blocking=True)
903-
t.join()
970+
971+
withself.subTest('without timeout'):
972+
cid=channels.create()
973+
deff():
974+
wait()
975+
channels.close(cid,force=True)
976+
t=threading.Thread(target=f)
977+
t.start()
978+
withself.assertRaises(channels.ChannelClosedError):
979+
channels.send_buffer(cid,obj,blocking=True)
980+
t.join()
981+
982+
withself.subTest('with timeout'):
983+
cid=channels.create()
984+
deff():
985+
wait()
986+
channels.close(cid,force=True)
987+
t=threading.Thread(target=f)
988+
t.start()
989+
withself.assertRaises(channels.ChannelClosedError):
990+
channels.send_buffer(cid,obj,blocking=True,timeout=30)
991+
t.join()
904992

905993
#-------------------
906994
# close

‎Lib/test/test_interpreters.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1022,6 +1022,11 @@ def test_send_recv_nowait_different_interpreters(self):
10221022
self.assertEqual(obj2,b'eggs')
10231023
self.assertNotEqual(id(obj2),int(out))
10241024

1025+
deftest_recv_timeout(self):
1026+
r,_=interpreters.create_channel()
1027+
withself.assertRaises(TimeoutError):
1028+
r.recv(timeout=1)
1029+
10251030
deftest_recv_channel_does_not_exist(self):
10261031
ch=interpreters.RecvChannel(1_000_000)
10271032
withself.assertRaises(interpreters.ChannelNotFoundError):

‎Modules/_queuemodule.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,8 @@ _queue_SimpleQueue_get_impl(simplequeueobject *self, PyTypeObject *cls,
214214
PY_TIMEOUT_Tmicroseconds;
215215
PyThreadState*tstate=PyThreadState_Get();
216216

217+
// XXX Use PyThread_ParseTimeoutArg().
218+
217219
if (block==0) {
218220
/* Non-blocking */
219221
microseconds=0;

‎Modules/_threadmodule.c

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -88,14 +88,15 @@ lock_acquire_parse_args(PyObject *args, PyObject *kwds,
8888
char*kwlist[]= {"blocking","timeout",NULL};
8989
intblocking=1;
9090
PyObject*timeout_obj=NULL;
91-
const_PyTime_tunset_timeout=_PyTime_FromSeconds(-1);
92-
93-
*timeout=unset_timeout ;
94-
9591
if (!PyArg_ParseTupleAndKeywords(args,kwds,"|pO:acquire",kwlist,
9692
&blocking,&timeout_obj))
9793
return-1;
9894

95+
// XXX Use PyThread_ParseTimeoutArg().
96+
97+
const_PyTime_tunset_timeout=_PyTime_FromSeconds(-1);
98+
*timeout=unset_timeout;
99+
99100
if (timeout_obj
100101
&&_PyTime_FromSecondsObject(timeout,
101102
timeout_obj,_PyTime_ROUND_TIMEOUT)<0)
@@ -108,7 +109,7 @@ lock_acquire_parse_args(PyObject *args, PyObject *kwds,
108109
}
109110
if (*timeout<0&&*timeout!=unset_timeout) {
110111
PyErr_SetString(PyExc_ValueError,
111-
"timeout value must bepositive");
112+
"timeout value must bea non-negative number");
112113
return-1;
113114
}
114115
if (!blocking)

‎Modules/_xxinterpchannelsmodule.c

Lines changed: 26 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -242,9 +242,8 @@ add_new_type(PyObject *mod, PyType_Spec *spec, crossinterpdatafunc shared,
242242
}
243243

244244
staticint
245-
wait_for_lock(PyThread_type_lockmutex)
245+
wait_for_lock(PyThread_type_lockmutex,PY_TIMEOUT_Ttimeout)
246246
{
247-
PY_TIMEOUT_Ttimeout=PyThread_UNSET_TIMEOUT;
248247
PyLockStatusres=PyThread_acquire_lock_timed_with_retries(mutex,timeout);
249248
if (res==PY_LOCK_INTR) {
250249
/* KeyboardInterrupt, etc. */
@@ -1883,7 +1882,8 @@ _channel_clear_sent(_channels *channels, int64_t cid, _waiting_t *waiting)
18831882
}
18841883

18851884
staticint
1886-
_channel_send_wait(_channels*channels,int64_tcid,PyObject*obj)
1885+
_channel_send_wait(_channels*channels,int64_tcid,PyObject*obj,
1886+
PY_TIMEOUT_Ttimeout)
18871887
{
18881888
// We use a stack variable here, so we must ensure that &waiting
18891889
// is not held by any channel item at the point this function exits.
@@ -1901,7 +1901,7 @@ _channel_send_wait(_channels *channels, int64_t cid, PyObject *obj)
19011901
}
19021902

19031903
/* Wait until the object is received. */
1904-
if (wait_for_lock(waiting.mutex)<0) {
1904+
if (wait_for_lock(waiting.mutex,timeout)<0) {
19051905
assert(PyErr_Occurred());
19061906
_waiting_finish_releasing(&waiting);
19071907
/* The send() call is failing now, so make sure the item
@@ -2816,25 +2816,29 @@ receive end.");
28162816
staticPyObject*
28172817
channel_send(PyObject*self,PyObject*args,PyObject*kwds)
28182818
{
2819-
// XXX Add a timeout arg.
2820-
staticchar*kwlist[]= {"cid","obj","blocking",NULL};
2821-
int64_tcid;
2819+
staticchar*kwlist[]= {"cid","obj","blocking","timeout",NULL};
28222820
structchannel_id_converter_datacid_data= {
28232821
.module=self,
28242822
};
28252823
PyObject*obj;
28262824
intblocking=1;
2827-
if (!PyArg_ParseTupleAndKeywords(args,kwds,"O&O|$p:channel_send",kwlist,
2825+
PyObject*timeout_obj=NULL;
2826+
if (!PyArg_ParseTupleAndKeywords(args,kwds,"O&O|$pO:channel_send",kwlist,
28282827
channel_id_converter,&cid_data,&obj,
2829-
&blocking)) {
2828+
&blocking,&timeout_obj)) {
2829+
returnNULL;
2830+
}
2831+
2832+
int64_tcid=cid_data.cid;
2833+
PY_TIMEOUT_Ttimeout;
2834+
if (PyThread_ParseTimeoutArg(timeout_obj,blocking,&timeout)<0) {
28302835
returnNULL;
28312836
}
2832-
cid=cid_data.cid;
28332837

28342838
/* Queue up the object. */
28352839
interr=0;
28362840
if (blocking) {
2837-
err=_channel_send_wait(&_globals.channels,cid,obj);
2841+
err=_channel_send_wait(&_globals.channels,cid,obj,timeout);
28382842
}
28392843
else {
28402844
err=_channel_send(&_globals.channels,cid,obj,NULL);
@@ -2855,20 +2859,25 @@ By default this waits for the object to be received.");
28552859
staticPyObject*
28562860
channel_send_buffer(PyObject*self,PyObject*args,PyObject*kwds)
28572861
{
2858-
staticchar*kwlist[]= {"cid","obj","blocking",NULL};
2859-
int64_tcid;
2862+
staticchar*kwlist[]= {"cid","obj","blocking","timeout",NULL};
28602863
structchannel_id_converter_datacid_data= {
28612864
.module=self,
28622865
};
28632866
PyObject*obj;
28642867
intblocking=1;
2868+
PyObject*timeout_obj=NULL;
28652869
if (!PyArg_ParseTupleAndKeywords(args,kwds,
2866-
"O&O|$p:channel_send_buffer",kwlist,
2870+
"O&O|$pO:channel_send_buffer",kwlist,
28672871
channel_id_converter,&cid_data,&obj,
2868-
&blocking)) {
2872+
&blocking,&timeout_obj)) {
2873+
returnNULL;
2874+
}
2875+
2876+
int64_tcid=cid_data.cid;
2877+
PY_TIMEOUT_Ttimeout;
2878+
if (PyThread_ParseTimeoutArg(timeout_obj,blocking,&timeout)<0) {
28692879
returnNULL;
28702880
}
2871-
cid=cid_data.cid;
28722881

28732882
PyObject*tempobj=PyMemoryView_FromObject(obj);
28742883
if (tempobj==NULL) {
@@ -2878,7 +2887,7 @@ channel_send_buffer(PyObject *self, PyObject *args, PyObject *kwds)
28782887
/* Queue up the object. */
28792888
interr=0;
28802889
if (blocking) {
2881-
err=_channel_send_wait(&_globals.channels,cid,tempobj);
2890+
err=_channel_send_wait(&_globals.channels,cid,tempobj,timeout);
28822891
}
28832892
else {
28842893
err=_channel_send(&_globals.channels,cid,tempobj,NULL);

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp