# Copyright (C) 2016-present the asyncpg authors and contributors# <see AUTHORS file>## This module is part of asyncpg and is released under# the Apache 2.0 License: http://www.apache.org/licenses/LICENSE-2.0importcollectionsfrom.importconnresourcefrom.importexceptions[docs]classCursorFactory(connresource.ConnectionResource):"""A cursor interface for the results of a query. A cursor interface can be used to initiate efficient traversal of the results of a large query. """__slots__=('_state','_args','_prefetch','_query','_timeout','_record_class',)def__init__(self,connection,query,state,args,prefetch,timeout,record_class):super().__init__(connection)self._args=argsself._prefetch=prefetchself._query=queryself._timeout=timeoutself._state=stateself._record_class=record_classifstateisnotNone:state.attach()@connresource.guardeddef__aiter__(self):prefetch=50ifself._prefetchisNoneelseself._prefetchreturnCursorIterator(self._connection,self._query,self._state,self._args,self._record_class,prefetch,self._timeout,)@connresource.guardeddef__await__(self):ifself._prefetchisnotNone:raiseexceptions.InterfaceError('prefetch argument can only be specified for iterable cursor')cursor=Cursor(self._connection,self._query,self._state,self._args,self._record_class,)returncursor._init(self._timeout).__await__()def__del__(self):ifself._stateisnotNone:self._state.detach()self._connection._maybe_gc_stmt(self._state) classBaseCursor(connresource.ConnectionResource):__slots__=('_state','_args','_portal_name','_exhausted','_query','_record_class',)def__init__(self,connection,query,state,args,record_class):super().__init__(connection)self._args=argsself._state=stateifstateisnotNone:state.attach()self._portal_name=Noneself._exhausted=Falseself._query=queryself._record_class=record_classdef_check_ready(self):ifself._stateisNone:raiseexceptions.InterfaceError('cursor: no associated prepared statement')ifself._state.closed:raiseexceptions.InterfaceError('cursor: the prepared statement is closed')ifnotself._connection._top_xact:raiseexceptions.NoActiveSQLTransactionError('cursor cannot be created outside of a transaction')asyncdef_bind_exec(self,n,timeout):self._check_ready()ifself._portal_name:raiseexceptions.InterfaceError('cursor already has an open portal')con=self._connectionprotocol=con._protocolself._portal_name=con._get_unique_id('portal')buffer,_,self._exhausted=awaitprotocol.bind_execute(self._state,self._args,self._portal_name,n,True,timeout)returnbufferasyncdef_bind(self,timeout):self._check_ready()ifself._portal_name:raiseexceptions.InterfaceError('cursor already has an open portal')con=self._connectionprotocol=con._protocolself._portal_name=con._get_unique_id('portal')buffer=awaitprotocol.bind(self._state,self._args,self._portal_name,timeout)returnbufferasyncdef_exec(self,n,timeout):self._check_ready()ifnotself._portal_name:raiseexceptions.InterfaceError('cursor does not have an open portal')protocol=self._connection._protocolbuffer,_,self._exhausted=awaitprotocol.execute(self._state,self._portal_name,n,True,timeout)returnbufferasyncdef_close_portal(self,timeout):self._check_ready()ifnotself._portal_name:raiseexceptions.InterfaceError('cursor does not have an open portal')protocol=self._connection._protocolawaitprotocol.close_portal(self._portal_name,timeout)self._portal_name=Nonedef__repr__(self):attrs=[]ifself._exhausted:attrs.append('exhausted')attrs.append('')# to separate from idifself.__class__.__module__.startswith('asyncpg.'):mod='asyncpg'else:mod=self.__class__.__module__return'<{}.{} "{!s:.30}"{}{:#x}>'.format(mod,self.__class__.__name__,self._state.query,' '.join(attrs),id(self))def__del__(self):ifself._stateisnotNone:self._state.detach()self._connection._maybe_gc_stmt(self._state)classCursorIterator(BaseCursor):__slots__=('_buffer','_prefetch','_timeout')def__init__(self,connection,query,state,args,record_class,prefetch,timeout):super().__init__(connection,query,state,args,record_class)ifprefetch<=0:raiseexceptions.InterfaceError('prefetch argument must be greater than zero')self._buffer=collections.deque()self._prefetch=prefetchself._timeout=timeout@connresource.guardeddef__aiter__(self):returnself@connresource.guardedasyncdef__anext__(self):ifself._stateisNone:self._state=awaitself._connection._get_statement(self._query,self._timeout,named=True,record_class=self._record_class,)self._state.attach()ifnotself._portal_nameandnotself._exhausted:buffer=awaitself._bind_exec(self._prefetch,self._timeout)self._buffer.extend(buffer)ifnotself._bufferandnotself._exhausted:buffer=awaitself._exec(self._prefetch,self._timeout)self._buffer.extend(buffer)ifself._portal_nameandself._exhausted:awaitself._close_portal(self._timeout)ifself._buffer:returnself._buffer.popleft()raiseStopAsyncIteration[docs]classCursor(BaseCursor):"""An open *portal* into the results of a query."""__slots__=()asyncdef_init(self,timeout):ifself._stateisNone:self._state=awaitself._connection._get_statement(self._query,timeout,named=True,record_class=self._record_class,)self._state.attach()self._check_ready()awaitself._bind(timeout)returnself[docs]@connresource.guardedasyncdeffetch(self,n,*,timeout=None):r"""Return the next *n* rows as a list of :class:`Record` objects. :param float timeout: Optional timeout value in seconds. :return: A list of :class:`Record` instances. """self._check_ready()ifn<=0:raiseexceptions.InterfaceError('n must be greater than zero')ifself._exhausted:return[]recs=awaitself._exec(n,timeout)iflen(recs)<n:self._exhausted=Truereturnrecs [docs]@connresource.guardedasyncdeffetchrow(self,*,timeout=None):r"""Return the next row. :param float timeout: Optional timeout value in seconds. :return: A :class:`Record` instance. """self._check_ready()ifself._exhausted:returnNonerecs=awaitself._exec(1,timeout)iflen(recs)<1:self._exhausted=TruereturnNonereturnrecs[0] [docs]@connresource.guardedasyncdefforward(self,n,*,timeout=None)->int:r"""Skip over the next *n* rows. :param float timeout: Optional timeout value in seconds. :return: A number of rows actually skipped over (<= *n*). """self._check_ready()ifn<=0:raiseexceptions.InterfaceError('n must be greater than zero')protocol=self._connection._protocolstatus=awaitprotocol.query('MOVE FORWARD{:d}{}'.format(n,self._portal_name),timeout)advanced=int(status.split()[1])ifadvanced<n:self._exhausted=Truereturnadvanced