Source code for asyncpg.transaction

# Copyright (C) 2016-present the asyncpg authors and contributors# <see AUTHORS file>## This module is part of asyncpg and is released under# the Apache 2.0 License: http://www.apache.org/licenses/LICENSE-2.0importenumfrom.importconnresourcefrom.importexceptionsasapg_errorsclassTransactionState(enum.Enum):NEW=0STARTED=1COMMITTED=2ROLLEDBACK=3FAILED=4ISOLATION_LEVELS={'read_committed','read_uncommitted','serializable','repeatable_read',}ISOLATION_LEVELS_BY_VALUE={'read committed':'read_committed','read uncommitted':'read_uncommitted','serializable':'serializable','repeatable read':'repeatable_read',}
[docs]classTransaction(connresource.ConnectionResource):"""Represents a transaction or savepoint block. Transactions are created by calling the :meth:`Connection.transaction() <connection.Connection.transaction>` function. """__slots__=('_connection','_isolation','_readonly','_deferrable','_state','_nested','_id','_managed')def__init__(self,connection,isolation,readonly,deferrable):super().__init__(connection)ifisolationandisolationnotinISOLATION_LEVELS:raiseValueError('isolation is expected to be either of{}, ''got{!r}'.format(ISOLATION_LEVELS,isolation))self._isolation=isolationself._readonly=readonlyself._deferrable=deferrableself._state=TransactionState.NEWself._nested=Falseself._id=Noneself._managed=Falseasyncdef__aenter__(self):ifself._managed:raiseapg_errors.InterfaceError('cannot enter context: already in an `async with` block')self._managed=Trueawaitself.start()asyncdef__aexit__(self,extype,ex,tb):try:self._check_conn_validity('__aexit__')exceptapg_errors.InterfaceError:ifextypeisGeneratorExit:# When a PoolAcquireContext is being exited, and there# is an open transaction in an async generator that has# not been iterated fully, there is a possibility that# Pool.release() would race with this __aexit__(), since# both would be in concurrent tasks. In such case we# yield to Pool.release() to do the ROLLBACK for us.# See https://github.com/MagicStack/asyncpg/issues/232# for an example.returnelse:raisetry:ifextypeisnotNone:awaitself.__rollback()else:awaitself.__commit()finally:self._managed=False
[docs]@connresource.guardedasyncdefstart(self):"""Enter the transaction or savepoint block."""self.__check_state_base('start')ifself._stateisTransactionState.STARTED:raiseapg_errors.InterfaceError('cannot start; the transaction is already started')con=self._connectionifcon._top_xactisNone:ifcon._protocol.is_in_transaction():raiseapg_errors.InterfaceError('cannot use Connection.transaction() in ''a manually started transaction')con._top_xact=selfelse:# Nested transaction blockifself._isolation:top_xact_isolation=con._top_xact._isolationiftop_xact_isolationisNone:top_xact_isolation=ISOLATION_LEVELS_BY_VALUE[awaitself._connection.fetchval('SHOW transaction_isolation;')]ifself._isolation!=top_xact_isolation:raiseapg_errors.InterfaceError('nested transaction has a different isolation level: ''current{!r} != outer{!r}'.format(self._isolation,top_xact_isolation))self._nested=Trueifself._nested:self._id=con._get_unique_id('savepoint')query='SAVEPOINT{};'.format(self._id)else:query='BEGIN'ifself._isolation=='read_committed':query+=' ISOLATION LEVEL READ COMMITTED'elifself._isolation=='read_uncommitted':query+=' ISOLATION LEVEL READ UNCOMMITTED'elifself._isolation=='repeatable_read':query+=' ISOLATION LEVEL REPEATABLE READ'elifself._isolation=='serializable':query+=' ISOLATION LEVEL SERIALIZABLE'ifself._readonly:query+=' READ ONLY'ifself._deferrable:query+=' DEFERRABLE'query+=';'try:awaitself._connection.execute(query)exceptBaseException:self._state=TransactionState.FAILEDraiseelse:self._state=TransactionState.STARTED
def__check_state_base(self,opname):ifself._stateisTransactionState.COMMITTED:raiseapg_errors.InterfaceError('cannot{}; the transaction is already committed'.format(opname))ifself._stateisTransactionState.ROLLEDBACK:raiseapg_errors.InterfaceError('cannot{}; the transaction is already rolled back'.format(opname))ifself._stateisTransactionState.FAILED:raiseapg_errors.InterfaceError('cannot{}; the transaction is in error state'.format(opname))def__check_state(self,opname):ifself._stateisnotTransactionState.STARTED:ifself._stateisTransactionState.NEW:raiseapg_errors.InterfaceError('cannot{}; the transaction is not yet started'.format(opname))self.__check_state_base(opname)asyncdef__commit(self):self.__check_state('commit')ifself._connection._top_xactisself:self._connection._top_xact=Noneifself._nested:query='RELEASE SAVEPOINT{};'.format(self._id)else:query='COMMIT;'try:awaitself._connection.execute(query)exceptBaseException:self._state=TransactionState.FAILEDraiseelse:self._state=TransactionState.COMMITTEDasyncdef__rollback(self):self.__check_state('rollback')ifself._connection._top_xactisself:self._connection._top_xact=Noneifself._nested:query='ROLLBACK TO{};'.format(self._id)else:query='ROLLBACK;'try:awaitself._connection.execute(query)exceptBaseException:self._state=TransactionState.FAILEDraiseelse:self._state=TransactionState.ROLLEDBACK
[docs]@connresource.guardedasyncdefcommit(self):"""Exit the transaction or savepoint block and commit changes."""ifself._managed:raiseapg_errors.InterfaceError('cannot manually commit from within an `async with` block')awaitself.__commit()
[docs]@connresource.guardedasyncdefrollback(self):"""Exit the transaction or savepoint block and rollback changes."""ifself._managed:raiseapg_errors.InterfaceError('cannot manually rollback from within an `async with` block')awaitself.__rollback()
def__repr__(self):attrs=[]attrs.append('state:{}'.format(self._state.name.lower()))ifself._isolationisnotNone:attrs.append(self._isolation)ifself._readonly:attrs.append('readonly')ifself._deferrable:attrs.append('deferrable')ifself.__class__.__module__.startswith('asyncpg.'):mod='asyncpg'else:mod=self.__class__.__module__return'<{}.{}{}{:#x}>'.format(mod,self.__class__.__name__,' '.join(attrs),id(self))