Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit6c7677e

Browse files
author
Gurov Ilya
authored
refactor: incorporate will_accept() checks into publish() (#108)
1 parent0132a46 commit6c7677e

File tree

5 files changed

+47
-69
lines changed

5 files changed

+47
-69
lines changed

‎google/cloud/pubsub_v1/publisher/_batch/base.py‎

Lines changed: 0 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -109,32 +109,6 @@ def status(self):
109109
"""
110110
raiseNotImplementedError
111111

112-
defwill_accept(self,message):
113-
"""Return True if the batch is able to accept the message.
114-
115-
In concurrent implementations, the attributes on the current batch
116-
may be modified by other workers. With this in mind, the caller will
117-
likely want to hold a lock that will make sure the state remains
118-
the same after the "will accept?" question is answered.
119-
120-
Args:
121-
message (~.pubsub_v1.types.PubsubMessage): The Pub/Sub message.
122-
123-
Returns:
124-
bool: Whether this batch can accept the message.
125-
"""
126-
# If this batch is not accepting messages generally, return False.
127-
ifself.status!=BatchStatus.ACCEPTING_MESSAGES:
128-
returnFalse
129-
130-
# If this message will make the batch exceed the ``max_messages``
131-
# setting, return False.
132-
iflen(self.messages)>=self.settings.max_messages:
133-
returnFalse
134-
135-
# Okay, everything is good.
136-
returnTrue
137-
138112
defcancel(self,cancellation_reason):
139113
"""Complete pending futures with an exception.
140114

‎google/cloud/pubsub_v1/publisher/_batch/thread.py‎

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -333,8 +333,8 @@ def publish(self, message):
333333
self._status!=base.BatchStatus.ERROR
334334
),"Publish after stop() or publish error."
335335

336-
ifnotself.will_accept(message):
337-
returnfuture
336+
ifself.status!=base.BatchStatus.ACCEPTING_MESSAGES:
337+
return
338338

339339
size_increase=types.PublishRequest(messages=[message]).ByteSize()
340340

‎tests/unit/pubsub_v1/publisher/batch/test_base.py‎

Lines changed: 0 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -46,33 +46,3 @@ def test_len():
4646
assertlen(batch)==0
4747
batch.publish(types.PubsubMessage(data=b"foo"))
4848
assertlen(batch)==1
49-
50-
51-
deftest_will_accept():
52-
batch=create_batch(status=BatchStatus.ACCEPTING_MESSAGES)
53-
message=types.PubsubMessage()
54-
assertbatch.will_accept(message)isTrue
55-
56-
57-
deftest_will_accept_oversize():
58-
batch=create_batch(
59-
settings=types.BatchSettings(max_bytes=10),
60-
status=BatchStatus.ACCEPTING_MESSAGES,
61-
)
62-
message=types.PubsubMessage(data=b"abcdefghijklmnopqrstuvwxyz")
63-
assertbatch.will_accept(message)isTrue
64-
65-
66-
deftest_will_not_accept_status():
67-
batch=create_batch(status="talk to the hand")
68-
message=types.PubsubMessage()
69-
assertbatch.will_accept(message)isFalse
70-
71-
72-
deftest_will_not_accept_number():
73-
batch=create_batch(
74-
settings=types.BatchSettings(max_messages=-1),
75-
status=BatchStatus.ACCEPTING_MESSAGES,
76-
)
77-
message=types.PubsubMessage(data=b"abc")
78-
assertbatch.will_accept(message)isFalse

‎tests/unit/pubsub_v1/publisher/batch/test_thread.py‎

Lines changed: 45 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -287,18 +287,56 @@ def test_publish_updating_batch_size():
287287
assertbatch.size>0# I do not always trust protobuf.
288288

289289

290-
deftest_publish_not_will_accept():
290+
deftest_publish():
291+
batch=create_batch()
292+
message=types.PubsubMessage()
293+
future=batch.publish(message)
294+
295+
assertlen(batch.messages)==1
296+
assertbatch._futures== [future]
297+
298+
299+
deftest_publish_max_messages_zero():
291300
batch=create_batch(topic="topic_foo",max_messages=0)
292-
base_request_size=types.PublishRequest(topic="topic_foo").ByteSize()
293301

294-
# Publish the message.
295302
message=types.PubsubMessage(data=b"foobarbaz")
303+
withmock.patch.object(batch,"commit")ascommit:
304+
future=batch.publish(message)
305+
306+
assertfutureisnotNone
307+
assertlen(batch.messages)==1
308+
assertbatch._futures== [future]
309+
commit.assert_called_once()
310+
311+
312+
deftest_publish_max_messages_enforced():
313+
batch=create_batch(topic="topic_foo",max_messages=1)
314+
315+
message=types.PubsubMessage(data=b"foobarbaz")
316+
message2=types.PubsubMessage(data=b"foobarbaz2")
317+
318+
future=batch.publish(message)
319+
future2=batch.publish(message2)
320+
321+
assertfutureisnotNone
322+
assertfuture2isNone
323+
assertlen(batch.messages)==1
324+
assertlen(batch._futures)==1
325+
326+
327+
deftest_publish_max_bytes_enforced():
328+
batch=create_batch(topic="topic_foo",max_bytes=15)
329+
330+
message=types.PubsubMessage(data=b"foobarbaz")
331+
message2=types.PubsubMessage(data=b"foobarbaz2")
332+
296333
future=batch.publish(message)
334+
future2=batch.publish(message2)
297335

298-
assertfutureisNone
299-
assertbatch.size==base_request_size
300-
assertbatch.messages==[]
301-
assertbatch._futures==[]
336+
assertfutureisnotNone
337+
assertfuture2isNone
338+
assertlen(batch.messages)==1
339+
assertlen(batch._futures)==1
302340

303341

304342
deftest_publish_exceed_max_messages():

‎tests/unit/pubsub_v1/publisher/test_publisher_client.py‎

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,6 @@ def test_publish():
135135
batch=mock.Mock(spec=client._batch_class)
136136

137137
# Set the mock up to claim indiscriminately that it accepts all messages.
138-
batch.will_accept.return_value=True
139138
batch.publish.side_effect= (future1,future2)
140139

141140
topic="topic/path"
@@ -169,7 +168,6 @@ def test_publish_error_exceeding_flow_control_limits():
169168
client=publisher.Client(credentials=creds,publisher_options=publisher_options)
170169

171170
mock_batch=mock.Mock(spec=client._batch_class)
172-
mock_batch.will_accept.return_value=True
173171
topic="topic/path"
174172
client._set_batch(topic,mock_batch)
175173

@@ -216,7 +214,6 @@ def test_publish_attrs_bytestring():
216214
# Use a mock in lieu of the actual batch class.
217215
batch=mock.Mock(spec=client._batch_class)
218216
# Set the mock up to claim indiscriminately that it accepts all messages.
219-
batch.will_accept.return_value=True
220217

221218
topic="topic/path"
222219
client._set_batch(topic,batch)
@@ -431,7 +428,6 @@ def test_publish_with_ordering_key():
431428
future1.add_done_callback=mock.Mock(spec=["__call__"])
432429
future2.add_done_callback=mock.Mock(spec=["__call__"])
433430

434-
batch.will_accept.return_value=True
435431
batch.publish.side_effect= (future1,future2)
436432

437433
topic="topic/path"

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp