Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit43855dd

Browse files
feat: add new_transaction support (#499)
1 parentf4f3bc7 commit43855dd

File tree

12 files changed

+580
-59
lines changed

12 files changed

+580
-59
lines changed

‎google/cloud/datastore/aggregation.py‎

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -442,13 +442,11 @@ def _next_page(self):
442442
returnNone
443443

444444
query_pb=self._build_protobuf()
445-
transaction=self.client.current_transaction
446-
iftransactionisNone:
447-
transaction_id=None
448-
else:
449-
transaction_id=transaction.id
445+
transaction_id,new_transaction_options=helpers.get_transaction_options(
446+
self.client.current_transaction
447+
)
450448
read_options=helpers.get_read_options(
451-
self._eventual,transaction_id,self._read_time
449+
self._eventual,transaction_id,self._read_time,new_transaction_options
452450
)
453451

454452
partition_id=entity_pb2.PartitionId(

‎google/cloud/datastore/batch.py‎

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,19 @@ def mutations(self):
192192
"""
193193
returnself._mutations
194194

195+
def_allow_mutations(self)->bool:
196+
"""
197+
This method is called to see if the batch is in a proper state to allow
198+
`put` and `delete` operations.
199+
200+
the Transaction subclass overrides this method to support
201+
the `begin_later` flag.
202+
203+
:rtype: bool
204+
:returns: True if the batch is in a state to allow mutations.
205+
"""
206+
returnself._status==self._IN_PROGRESS
207+
195208
defput(self,entity):
196209
"""Remember an entity's state to be saved during :meth:`commit`.
197210
@@ -218,7 +231,7 @@ def put(self, entity):
218231
progress, if entity has no key assigned, or if the key's
219232
``project`` does not match ours.
220233
"""
221-
ifself._status!=self._IN_PROGRESS:
234+
ifnotself._allow_mutations():
222235
raiseValueError("Batch must be in progress to put()")
223236

224237
ifentity.keyisNone:
@@ -248,7 +261,7 @@ def delete(self, key):
248261
progress, if key is not complete, or if the key's
249262
``project`` does not match ours.
250263
"""
251-
ifself._status!=self._IN_PROGRESS:
264+
ifnotself._allow_mutations():
252265
raiseValueError("Batch must be in progress to delete()")
253266

254267
ifkey.is_partial:
@@ -370,10 +383,12 @@ def __enter__(self):
370383

371384
def__exit__(self,exc_type,exc_val,exc_tb):
372385
try:
373-
ifexc_typeisNone:
374-
self.commit()
375-
else:
376-
self.rollback()
386+
# commit or rollback if not in terminal state
387+
ifself._statusnotin (self._ABORTED,self._FINISHED):
388+
ifexc_typeisNone:
389+
self.commit()
390+
else:
391+
self.rollback()
377392
finally:
378393
self._client._pop_batch()
379394

‎google/cloud/datastore/client.py‎

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ def _extended_lookup(
122122
missing=None,
123123
deferred=None,
124124
eventual=False,
125-
transaction_id=None,
125+
transaction=None,
126126
retry=None,
127127
timeout=None,
128128
read_time=None,
@@ -158,10 +158,10 @@ def _extended_lookup(
158158
consistency. If True, request ``EVENTUAL`` read
159159
consistency.
160160
161-
:typetransaction_id: str
162-
:paramtransaction_id: If passed, make the request in the scope of
163-
the given transaction. Incompatible with
164-
``eventual==True`` or ``read_time``.
161+
:typetransaction: Transaction
162+
:paramtransaction: If passed, make the request in the scope of
163+
the given transaction. Incompatible with
164+
``eventual==True`` or ``read_time``.
165165
166166
:type retry: :class:`google.api_core.retry.Retry`
167167
:param retry:
@@ -177,7 +177,7 @@ def _extended_lookup(
177177
:type read_time: datetime
178178
:param read_time:
179179
(Optional) Read time to use for read consistency. Incompatible with
180-
``eventual==True`` or ``transaction_id``.
180+
``eventual==True`` or ``transaction``.
181181
This feature is in private preview.
182182
183183
:type database: str
@@ -199,8 +199,14 @@ def _extended_lookup(
199199

200200
results= []
201201

202+
transaction_id=None
203+
transaction_id,new_transaction_options=helpers.get_transaction_options(
204+
transaction
205+
)
206+
read_options=helpers.get_read_options(
207+
eventual,transaction_id,read_time,new_transaction_options
208+
)
202209
loop_num=0
203-
read_options=helpers.get_read_options(eventual,transaction_id,read_time)
204210
whileloop_num<_MAX_LOOPS:# loop against possible deferred.
205211
loop_num+=1
206212
request= {
@@ -214,6 +220,10 @@ def _extended_lookup(
214220
**kwargs,
215221
)
216222

223+
# set new transaction id if we just started a transaction
224+
iftransactionandlookup_response.transaction:
225+
transaction._begin_with_id(lookup_response.transaction)
226+
217227
# Accumulate the new results.
218228
results.extend(result.entityforresultinlookup_response.found)
219229

@@ -570,7 +580,7 @@ def get_multi(
570580
eventual=eventual,
571581
missing=missing,
572582
deferred=deferred,
573-
transaction_id=transactionandtransaction.id,
583+
transaction=transaction,
574584
retry=retry,
575585
timeout=timeout,
576586
read_time=read_time,

‎google/cloud/datastore/helpers.py‎

Lines changed: 48 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,9 @@ def entity_to_protobuf(entity):
230230
returnentity_pb
231231

232232

233-
defget_read_options(eventual,transaction_id,read_time=None):
233+
defget_read_options(
234+
eventual,transaction_id,read_time=None,new_transaction_options=None
235+
):
234236
"""Validate rules for read options, and assign to the request.
235237
236238
Helper method for ``lookup()`` and ``run_query``.
@@ -245,33 +247,55 @@ def get_read_options(eventual, transaction_id, read_time=None):
245247
:type read_time: datetime
246248
:param read_time: Read data from the specified time (may be null). This feature is in private preview.
247249
250+
:type new_transaction_options: :class:`google.cloud.datastore_v1.types.TransactionOptions`
251+
:param new_transaction_options: Options for a new transaction.
252+
248253
:rtype: :class:`.datastore_pb2.ReadOptions`
249254
:returns: The read options corresponding to the inputs.
250255
:raises: :class:`ValueError` if more than one of ``eventual==True``,
251-
``transaction``, and ``read_time`` is specified.
256+
``transaction_id``,``read_time``,and ``new_transaction_options`` is specified.
252257
"""
253-
iftransaction_idisNone:
254-
ifeventual:
255-
ifread_timeisnotNone:
256-
raiseValueError("eventual must be False when read_time is specified")
257-
else:
258-
returndatastore_pb2.ReadOptions(
259-
read_consistency=datastore_pb2.ReadOptions.ReadConsistency.EVENTUAL
260-
)
261-
else:
262-
ifread_timeisNone:
263-
returndatastore_pb2.ReadOptions()
264-
else:
265-
read_time_pb=timestamp_pb2.Timestamp()
266-
read_time_pb.FromDatetime(read_time)
267-
returndatastore_pb2.ReadOptions(read_time=read_time_pb)
268-
else:
269-
ifeventual:
270-
raiseValueError("eventual must be False when in a transaction")
271-
elifread_timeisnotNone:
272-
raiseValueError("transaction and read_time are mutual exclusive")
273-
else:
274-
returndatastore_pb2.ReadOptions(transaction=transaction_id)
258+
is_set= [
259+
bool(x)forxin (eventual,transaction_id,read_time,new_transaction_options)
260+
]
261+
ifsum(is_set)>1:
262+
raiseValueError(
263+
"At most one of eventual, transaction, or read_time is allowed."
264+
)
265+
new_options=datastore_pb2.ReadOptions()
266+
iftransaction_idisnotNone:
267+
new_options.transaction=transaction_id
268+
ifread_timeisnotNone:
269+
read_time_pb=timestamp_pb2.Timestamp()
270+
read_time_pb.FromDatetime(read_time)
271+
new_options.read_time=read_time_pb
272+
ifnew_transaction_optionsisnotNone:
273+
new_options.new_transaction=new_transaction_options
274+
ifeventual:
275+
new_options.read_consistency= (
276+
datastore_pb2.ReadOptions.ReadConsistency.EVENTUAL
277+
)
278+
returnnew_options
279+
280+
281+
defget_transaction_options(transaction):
282+
"""
283+
Get the transaction_id or new_transaction_options field from an active transaction object,
284+
for use in get_read_options
285+
286+
These are mutually-exclusive fields, so one or both will be None.
287+
288+
:rtype: Tuple[Optional[bytes], Optional[google.cloud.datastore_v1.types.TransactionOptions]]
289+
:returns: The transaction_id and new_transaction_options fields from the transaction object.
290+
"""
291+
transaction_id,new_transaction_options=None,None
292+
iftransactionisnotNone:
293+
iftransaction.idisnotNone:
294+
transaction_id=transaction.id
295+
eliftransaction._begin_laterandtransaction._status==transaction._INITIAL:
296+
# If the transaction has not yet been begun, we can use the new_transaction_options field.
297+
new_transaction_options=transaction._options
298+
returntransaction_id,new_transaction_options
275299

276300

277301
defkey_from_protobuf(pb):

‎google/cloud/datastore/query.py‎

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -778,13 +778,12 @@ def _next_page(self):
778778
returnNone
779779

780780
query_pb=self._build_protobuf()
781-
transaction=self.client.current_transaction
782-
iftransactionisNone:
783-
transaction_id=None
784-
else:
785-
transaction_id=transaction.id
781+
new_transaction_options=None
782+
transaction_id,new_transaction_options=helpers.get_transaction_options(
783+
self.client.current_transaction
784+
)
786785
read_options=helpers.get_read_options(
787-
self._eventual,transaction_id,self._read_time
786+
self._eventual,transaction_id,self._read_time,new_transaction_options
788787
)
789788

790789
partition_id=entity_pb2.PartitionId(

‎google/cloud/datastore/transaction.py‎

Lines changed: 56 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
# limitations under the License.
1414

1515
"""Create / interact with Google Cloud Datastore transactions."""
16-
1716
fromgoogle.cloud.datastore.batchimportBatch
1817
fromgoogle.cloud.datastore_v1.typesimportTransactionOptions
1918
fromgoogle.protobufimporttimestamp_pb2
@@ -149,15 +148,23 @@ class Transaction(Batch):
149148
:param read_time: (Optional) Time at which the transaction reads entities.
150149
Only allowed when ``read_only=True``. This feature is in private preview.
151150
151+
:type begin_later: bool
152+
:param begin_later: (Optional) If True, the transaction will be started
153+
lazily (i.e. when the first RPC is made). If False,
154+
the transaction will be started as soon as the context manager
155+
is entered. `self.begin()` can also be called manually to begin
156+
the transaction at any time. Default is False.
157+
152158
:raises: :class:`ValueError` if read_time is specified when
153159
``read_only=False``.
154160
"""
155161

156162
_status=None
157163

158-
def__init__(self,client,read_only=False,read_time=None):
164+
def__init__(self,client,read_only=False,read_time=None,begin_later=False):
159165
super(Transaction,self).__init__(client)
160166
self._id=None
167+
self._begin_later=begin_later
161168

162169
ifread_only:
163170
ifread_timeisnotNone:
@@ -180,8 +187,8 @@ def __init__(self, client, read_only=False, read_time=None):
180187
defid(self):
181188
"""Getter for the transaction ID.
182189
183-
:rtype:str
184-
:returns: The ID of the current transaction.
190+
:rtype:bytes or None
191+
:returns: The ID of the current transaction, or None if not started.
185192
"""
186193
returnself._id
187194

@@ -240,6 +247,21 @@ def begin(self, retry=None, timeout=None):
240247
self._status=self._ABORTED
241248
raise
242249

250+
def_begin_with_id(self,transaction_id):
251+
"""
252+
Attach newly created transaction to an existing transaction ID.
253+
254+
This is used when begin_later is True, when the first lookup request
255+
associated with this transaction creates a new transaction ID.
256+
257+
:type transaction_id: bytes
258+
:param transaction_id: ID of the transaction to attach to.
259+
"""
260+
ifself._statusisnotself._INITIAL:
261+
raiseValueError("Transaction already begun.")
262+
self._id=transaction_id
263+
self._status=self._IN_PROGRESS
264+
243265
defrollback(self,retry=None,timeout=None):
244266
"""Rolls back the current transaction.
245267
@@ -258,6 +280,12 @@ def rollback(self, retry=None, timeout=None):
258280
Note that if ``retry`` is specified, the timeout applies
259281
to each individual attempt.
260282
"""
283+
# if transaction has not started, abort it
284+
ifself._status==self._INITIAL:
285+
self._status=self._ABORTED
286+
self._id=None
287+
returnNone
288+
261289
kwargs=_make_retry_timeout_kwargs(retry,timeout)
262290

263291
try:
@@ -296,6 +324,15 @@ def commit(self, retry=None, timeout=None):
296324
Note that if ``retry`` is specified, the timeout applies
297325
to each individual attempt.
298326
"""
327+
# if transaction has not begun, either begin now, or abort if empty
328+
ifself._status==self._INITIAL:
329+
ifnotself._mutations:
330+
self._status=self._ABORTED
331+
self._id=None
332+
returnNone
333+
else:
334+
self.begin()
335+
299336
kwargs=_make_retry_timeout_kwargs(retry,timeout)
300337

301338
try:
@@ -321,3 +358,18 @@ def put(self, entity):
321358
raiseRuntimeError("Transaction is read only")
322359
else:
323360
super(Transaction,self).put(entity)
361+
362+
def__enter__(self):
363+
ifnotself._begin_later:
364+
self.begin()
365+
self._client._push_batch(self)
366+
returnself
367+
368+
def_allow_mutations(self):
369+
"""
370+
Mutations can be added to a transaction if it is in IN_PROGRESS state,
371+
or if it is in INITIAL state and the begin_later flag is set.
372+
"""
373+
returnself._status==self._IN_PROGRESSor (
374+
self._begin_laterandself._status==self._INITIAL
375+
)

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp