Source code for asyncpg.pool

# Copyright (C) 2016-present the asyncpg authors and contributors# <see AUTHORS file>## This module is part of asyncpg and is released under# the Apache 2.0 License: http://www.apache.org/licenses/LICENSE-2.0from__future__importannotationsimportasynciofromcollections.abcimportAwaitable,CallableimportfunctoolsimportinspectimportloggingimporttimefromtypesimportTracebackTypefromtypingimportAny,Optional,Typeimportwarningsfrom.importcompatfrom.importconnectionfrom.importexceptionsfrom.importprotocollogger=logging.getLogger(__name__)classPoolConnectionProxyMeta(type):def__new__(mcls,name:str,bases:tuple[Type[Any],...],dct:dict[str,Any],*,wrap:bool=False,)->PoolConnectionProxyMeta:ifwrap:forattrnameindir(connection.Connection):ifattrname.startswith('_')orattrnameindct:continuemeth=getattr(connection.Connection,attrname)ifnotinspect.isfunction(meth):continueiscoroutine=inspect.iscoroutinefunction(meth)wrapper=mcls._wrap_connection_method(attrname,iscoroutine)wrapper=functools.update_wrapper(wrapper,meth)dct[attrname]=wrapperif'__doc__'notindct:dct['__doc__']=connection.Connection.__doc__returnsuper().__new__(mcls,name,bases,dct)@staticmethoddef_wrap_connection_method(meth_name:str,iscoroutine:bool)->Callable[...,Any]:defcall_con_method(self:Any,*args:Any,**kwargs:Any)->Any:# This method will be owned by PoolConnectionProxy class.ifself._conisNone:raiseexceptions.InterfaceError('cannot call Connection.{}(): ''connection has been released back to the pool'.format(meth_name))meth=getattr(self._con.__class__,meth_name)returnmeth(self._con,*args,**kwargs)ifiscoroutine:compat.markcoroutinefunction(call_con_method)returncall_con_methodclassPoolConnectionProxy(connection._ConnectionProxy,metaclass=PoolConnectionProxyMeta,wrap=True):__slots__=('_con','_holder')def__init__(self,holder:PoolConnectionHolder,con:connection.Connection)->None:self._con=conself._holder=holdercon._set_proxy(self)def__getattr__(self,attr:str)->Any:# Proxy all unresolved attributes to the wrapped Connection 
object.returngetattr(self._con,attr)def_detach(self)->Optional[connection.Connection]:ifself._conisNone:returncon,self._con=self._con,Nonecon._set_proxy(None)returncondef__repr__(self)->str:ifself._conisNone:return'<{classname} [released]{id:#x}>'.format(classname=self.__class__.__name__,id=id(self))else:return'<{classname}{con!r}{id:#x}>'.format(classname=self.__class__.__name__,con=self._con,id=id(self))classPoolConnectionHolder:__slots__=('_con','_pool','_loop','_proxy','_max_queries','_setup','_max_inactive_time','_in_use','_inactive_callback','_timeout','_generation')def__init__(self,pool:"Pool",*,max_queries:float,setup:Optional[Callable[[PoolConnectionProxy],Awaitable[None]]],max_inactive_time:float,)->None:self._pool=poolself._con:Optional[connection.Connection]=Noneself._proxy:Optional[PoolConnectionProxy]=Noneself._max_queries=max_queriesself._max_inactive_time=max_inactive_timeself._setup=setupself._inactive_callback:Optional[Callable]=Noneself._in_use:Optional[asyncio.Future]=Noneself._timeout:Optional[float]=Noneself._generation:Optional[int]=Nonedefis_connected(self)->bool:returnself._conisnotNoneandnotself._con.is_closed()defis_idle(self)->bool:returnnotself._in_useasyncdefconnect(self)->None:ifself._conisnotNone:raiseexceptions.InternalClientError('PoolConnectionHolder.connect() called while another ''connection already exists')self._con=awaitself._pool._get_new_connection()self._generation=self._pool._generationself._maybe_cancel_inactive_callback()self._setup_inactive_callback()asyncdefacquire(self)->PoolConnectionProxy:ifself._conisNoneorself._con.is_closed():self._con=Noneawaitself.connect()elifself._generation!=self._pool._generation:# Connections have been expired, re-connect the 
holder.self._pool._loop.create_task(self._con.close(timeout=self._timeout))self._con=Noneawaitself.connect()self._maybe_cancel_inactive_callback()self._proxy=proxy=PoolConnectionProxy(self,self._con)ifself._setupisnotNone:try:awaitself._setup(proxy)except(Exception,asyncio.CancelledError)asex:# If a user-defined `setup` function fails, we don't# know if the connection is safe for re-use, hence# we close it.  A new connection will be created# when `acquire` is called again.try:# Use `close()` to close the connection gracefully.# An exception in `setup` isn't necessarily caused# by an IO or a protocol error.  close() will# do the necessary cleanup via _release_on_close().awaitself._con.close()finally:raiseexself._in_use=self._pool._loop.create_future()returnproxyasyncdefrelease(self,timeout:Optional[float])->None:ifself._in_useisNone:raiseexceptions.InternalClientError('PoolConnectionHolder.release() called on ''a free connection holder')ifself._con.is_closed():# When closing, pool connections perform the necessary# cleanup, so we don't have to do anything else here.returnself._timeout=Noneifself._con._protocol.queries_count>=self._max_queries:# The connection has reached its maximum utilization limit,# so close it.  
Connection.close() will call _release().awaitself._con.close(timeout=timeout)returnifself._generation!=self._pool._generation:# The connection has expired because it belongs to# an older generation (Pool.expire_connections() has# been called.)awaitself._con.close(timeout=timeout)returntry:budget=timeoutifself._con._protocol._is_cancelling():# If the connection is in cancellation state,# wait for the cancellationstarted=time.monotonic()awaitcompat.wait_for(self._con._protocol._wait_for_cancellation(),budget)ifbudgetisnotNone:budget-=time.monotonic()-startedifself._pool._resetisnotNone:asyncwithcompat.timeout(budget):awaitself._con._reset()awaitself._pool._reset(self._con)else:awaitself._con.reset(timeout=budget)except(Exception,asyncio.CancelledError)asex:# If the `reset` call failed, terminate the connection.# A new one will be created when `acquire` is called# again.try:# An exception in `reset` is most likely caused by# an IO error, so terminate the connection.self._con.terminate()finally:raiseex# Free this connection holder and invalidate the# connection proxy.self._release()# Rearm the connection inactivity timer.self._setup_inactive_callback()asyncdefwait_until_released(self)->None:ifself._in_useisNone:returnelse:awaitself._in_useasyncdefclose(self)->None:ifself._conisnotNone:# Connection.close() will call _release_on_close() to# finish holder cleanup.awaitself._con.close()defterminate(self)->None:ifself._conisnotNone:# Connection.terminate() will call _release_on_close() to# finish holder cleanup.self._con.terminate()def_setup_inactive_callback(self)->None:ifself._inactive_callbackisnotNone:raiseexceptions.InternalClientError('pool connection inactivity timer already 
exists')ifself._max_inactive_time:self._inactive_callback=self._pool._loop.call_later(self._max_inactive_time,self._deactivate_inactive_connection)def_maybe_cancel_inactive_callback(self)->None:ifself._inactive_callbackisnotNone:self._inactive_callback.cancel()self._inactive_callback=Nonedef_deactivate_inactive_connection(self)->None:ifself._in_useisnotNone:raiseexceptions.InternalClientError('attempting to deactivate an acquired connection')ifself._conisnotNone:# The connection is idle and not in use, so it's fine to# use terminate() instead of close().self._con.terminate()# Must call clear_connection, because _deactivate_connection# is called when the connection is *not* checked out, and# so terminate() above will not call the below.self._release_on_close()def_release_on_close(self)->None:self._maybe_cancel_inactive_callback()self._release()self._con=Nonedef_release(self)->None:"""Release this connection holder."""ifself._in_useisNone:# The holder is not checked out.returnifnotself._in_use.done():self._in_use.set_result(None)self._in_use=None# Deinitialize the connection proxy.  All subsequent# operations on it will fail.ifself._proxyisnotNone:self._proxy._detach()self._proxy=None# Put ourselves back to the pool queue.self._pool._queue.put_nowait(self)
class Pool:
    """A connection pool.

    Connection pool can be used to manage a set of connections to the database.
    Connections are first acquired from the pool, then used, and then released
    back to the pool.  Once a connection is released, it's reset to close all
    open cursors and other resources *except* prepared statements.

    Pools are created by calling :func:`~asyncpg.pool.create_pool`.
    """

    __slots__ = (
        '_queue', '_loop', '_minsize', '_maxsize',
        '_init', '_connect', '_reset', '_connect_args', '_connect_kwargs',
        '_holders', '_initialized', '_initializing', '_closing',
        '_closed', '_connection_class', '_record_class', '_generation',
        '_setup', '_max_queries', '_max_inactive_connection_lifetime'
    )

    def __init__(self, *connect_args,
                 min_size,
                 max_size,
                 max_queries,
                 max_inactive_connection_lifetime,
                 connect=None,
                 setup=None,
                 init=None,
                 reset=None,
                 loop,
                 connection_class,
                 record_class,
                 **connect_kwargs):
        """Validate the configuration and store it; no I/O happens here.

        Raises ``ValueError`` for out-of-range sizes/limits and ``TypeError``
        when *connection_class* or *record_class* is not a subclass of the
        expected base class.
        """

        if len(connect_args) > 1:
            warnings.warn(
                "Passing multiple positional arguments to asyncpg.Pool "
                "constructor is deprecated and will be removed in "
                "asyncpg 0.17.0. The non-deprecated form is "
                "asyncpg.Pool(<dsn>, **kwargs)",
                DeprecationWarning, stacklevel=2)

        if loop is None:
            loop = asyncio.get_event_loop()
        self._loop = loop

        if max_size <= 0:
            raise ValueError('max_size is expected to be greater than zero')

        if min_size < 0:
            raise ValueError(
                'min_size is expected to be greater or equal to zero')

        if min_size > max_size:
            raise ValueError('min_size is greater than max_size')

        if max_queries <= 0:
            raise ValueError('max_queries is expected to be greater than zero')

        if max_inactive_connection_lifetime < 0:
            raise ValueError(
                'max_inactive_connection_lifetime is expected to be greater '
                'or equal to zero')

        if not issubclass(connection_class, connection.Connection):
            raise TypeError(
                'connection_class is expected to be a subclass of '
                'asyncpg.Connection, got {!r}'.format(connection_class))

        if not issubclass(record_class, protocol.Record):
            raise TypeError(
                'record_class is expected to be a subclass of '
                'asyncpg.Record, got {!r}'.format(record_class))

        self._minsize = min_size
        self._maxsize = max_size

        self._holders = []
        self._initialized = False
        self._initializing = False
        self._queue = None

        self._connection_class = connection_class
        self._record_class = record_class

        self._closing = False
        self._closed = False
        # Bumped by expire_connections(); holders compare against it.
        self._generation = 0

        self._connect = connect if connect is not None else connection.connect
        self._connect_args = connect_args
        self._connect_kwargs = connect_kwargs

        self._setup = setup
        self._init = init
        self._reset = reset

        self._max_queries = max_queries
        self._max_inactive_connection_lifetime = (
            max_inactive_connection_lifetime)

    async def _async__init__(self):
        """Initialize the pool once and return it.

        Raises ``InterfaceError`` if initialization is already in progress
        or if the pool is closed.
        """
        if self._initialized:
            return self
        if self._initializing:
            raise exceptions.InterfaceError(
                'pool is being initialized in another task')
        if self._closed:
            raise exceptions.InterfaceError('pool is closed')
        self._initializing = True
        try:
            await self._initialize()
            return self
        finally:
            # NOTE: the pool is flagged as initialized whether or not
            # _initialize() succeeded.
            self._initializing = False
            self._initialized = True

    async def _initialize(self):
        """Create all holders and pre-connect the first ``min_size`` ones."""
        self._queue = asyncio.LifoQueue(maxsize=self._maxsize)
        for _ in range(self._maxsize):
            ch = PoolConnectionHolder(
                self,
                max_queries=self._max_queries,
                max_inactive_time=self._max_inactive_connection_lifetime,
                setup=self._setup)

            self._holders.append(ch)
            self._queue.put_nowait(ch)

        if self._minsize:
            # Since we use a LIFO queue, the first items in the queue will be
            # the last ones in `self._holders`.  We want to pre-connect the
            # first few connections in the queue, therefore we want to walk
            # `self._holders` in reverse.

            # Connect the first connection holder in the queue so that
            # any connection issues are visible early.
            first_ch: PoolConnectionHolder = self._holders[-1]
            await first_ch.connect()

            if self._minsize > 1:
                # The `minsize - 1` holders that precede `first_ch`, taken
                # in queue order (`first_ch` is already connected).
                pending = reversed(self._holders[-self._minsize:-1])
                await asyncio.gather(*[ch.connect() for ch in pending])

    def is_closing(self):
        """Return ``True`` if the pool is closing or is closed.

        .. versionadded:: 0.28.0
        """
        return self._closed or self._closing

    def get_size(self):
        """Return the current number of connections in this pool.

        .. versionadded:: 0.25.0
        """
        return sum(h.is_connected() for h in self._holders)

    def get_min_size(self):
        """Return the minimum number of connections in this pool.

        .. versionadded:: 0.25.0
        """
        return self._minsize

    def get_max_size(self):
        """Return the maximum allowed number of connections in this pool.

        .. versionadded:: 0.25.0
        """
        return self._maxsize

    def get_idle_size(self):
        """Return the current number of idle connections in this pool.

        .. versionadded:: 0.25.0
        """
        return sum(h.is_connected() and h.is_idle() for h in self._holders)

    def set_connect_args(self, dsn=None, **connect_kwargs):
        r"""Set the new connection arguments for this pool.

        The new connection arguments will be used for all subsequent
        new connection attempts.  Existing connections will remain until
        they expire.  Use :meth:`Pool.expire_connections()
        <asyncpg.pool.Pool.expire_connections>` to expedite the connection
        expiry.

        :param str dsn:
            Connection arguments specified as a single string in
            the following format:
            ``postgres://user:pass@host:port/database?option=value``.

        :param \*\*connect_kwargs:
            Keyword arguments for the :func:`~asyncpg.connection.connect`
            function.

        .. versionadded:: 0.16.0
        """

        self._connect_args = [dsn]
        self._connect_kwargs = connect_kwargs

    async def _get_new_connection(self):
        """Open, type-check and initialize a new connection.

        Raises ``InterfaceError`` when the connect callback returns an
        object that is not an instance of the pool's connection class.
        """
        con = await self._connect(
            *self._connect_args,
            loop=self._loop,
            connection_class=self._connection_class,
            record_class=self._record_class,
            **self._connect_kwargs,
        )
        if not isinstance(con, self._connection_class):
            good = self._connection_class
            good_n = f'{good.__module__}.{good.__name__}'
            bad = type(con)
            if bad.__module__ == "builtins":
                bad_n = bad.__name__
            else:
                bad_n = f'{bad.__module__}.{bad.__name__}'
            raise exceptions.InterfaceError(
                "expected pool connect callback to return an instance of "
                f"'{good_n}', got " f"'{bad_n}'"
            )

        if self._init is not None:
            try:
                await self._init(con)
            except (Exception, asyncio.CancelledError) as ex:
                # If a user-defined `init` function fails, we don't
                # know if the connection is safe for re-use, hence
                # we close it.  A new connection will be created
                # when `acquire` is called again.
                try:
                    # Use `close()` to close the connection gracefully.
                    # An exception in `init` isn't necessarily caused
                    # by an IO or a protocol error.  close() will
                    # do the necessary cleanup via _release_on_close().
                    await con.close()
                finally:
                    # Always surface the original `init` failure.
                    raise ex

        return con

    async def execute(
        self,
        query: str,
        *args,
        timeout: Optional[float] = None,
    ) -> str:
        """Execute an SQL command (or commands).

        Pool performs this operation using one of its connections.  Other than
        that, it behaves identically to
        :meth:`Connection.execute()
        <asyncpg.connection.Connection.execute>`.

        .. versionadded:: 0.10.0
        """
        async with self.acquire() as con:
            return await con.execute(query, *args, timeout=timeout)

    async def executemany(
        self,
        command: str,
        args,
        *,
        timeout: Optional[float] = None,
    ):
        """Execute an SQL *command* for each sequence of arguments in *args*.

        Pool performs this operation using one of its connections.  Other than
        that, it behaves identically to
        :meth:`Connection.executemany()
        <asyncpg.connection.Connection.executemany>`.

        .. versionadded:: 0.10.0
        """
        async with self.acquire() as con:
            return await con.executemany(command, args, timeout=timeout)

    async def fetch(
        self,
        query,
        *args,
        timeout=None,
        record_class=None
    ) -> list:
        """Run a query and return the results as a list of :class:`Record`.

        Pool performs this operation using one of its connections.  Other than
        that, it behaves identically to
        :meth:`Connection.fetch() <asyncpg.connection.Connection.fetch>`.

        .. versionadded:: 0.10.0
        """
        async with self.acquire() as con:
            return await con.fetch(
                query,
                *args,
                timeout=timeout,
                record_class=record_class
            )

    async def fetchval(self, query, *args, column=0, timeout=None):
        """Run a query and return a value in the first row.

        Pool performs this operation using one of its connections.  Other than
        that, it behaves identically to
        :meth:`Connection.fetchval()
        <asyncpg.connection.Connection.fetchval>`.

        .. versionadded:: 0.10.0
        """
        async with self.acquire() as con:
            return await con.fetchval(
                query, *args, column=column, timeout=timeout)

    async def fetchrow(self, query, *args, timeout=None, record_class=None):
        """Run a query and return the first row.

        Pool performs this operation using one of its connections.  Other than
        that, it behaves identically to
        :meth:`Connection.fetchrow() <asyncpg.connection.Connection.fetchrow>`.

        .. versionadded:: 0.10.0
        """
        async with self.acquire() as con:
            return await con.fetchrow(
                query,
                *args,
                timeout=timeout,
                record_class=record_class
            )

    async def fetchmany(self, query, args, *, timeout=None, record_class=None):
        """Run a query for each sequence of arguments in *args*
        and return the results as a list of :class:`Record`.

        Pool performs this operation using one of its connections.  Other than
        that, it behaves identically to
        :meth:`Connection.fetchmany()
        <asyncpg.connection.Connection.fetchmany>`.

        .. versionadded:: 0.30.0
        """
        async with self.acquire() as con:
            return await con.fetchmany(
                query, args, timeout=timeout, record_class=record_class
            )

    async def copy_from_table(
        self,
        table_name,
        *,
        output,
        columns=None,
        schema_name=None,
        timeout=None,
        format=None,
        oids=None,
        delimiter=None,
        null=None,
        header=None,
        quote=None,
        escape=None,
        force_quote=None,
        encoding=None
    ):
        """Copy table contents to a file or file-like object.

        Pool performs this operation using one of its connections.  Other than
        that, it behaves identically to
        :meth:`Connection.copy_from_table()
        <asyncpg.connection.Connection.copy_from_table>`.

        .. versionadded:: 0.24.0
        """
        async with self.acquire() as con:
            return await con.copy_from_table(
                table_name,
                output=output,
                columns=columns,
                schema_name=schema_name,
                timeout=timeout,
                format=format,
                oids=oids,
                delimiter=delimiter,
                null=null,
                header=header,
                quote=quote,
                escape=escape,
                force_quote=force_quote,
                encoding=encoding
            )

    async def copy_from_query(
        self,
        query,
        *args,
        output,
        timeout=None,
        format=None,
        oids=None,
        delimiter=None,
        null=None,
        header=None,
        quote=None,
        escape=None,
        force_quote=None,
        encoding=None
    ):
        """Copy the results of a query to a file or file-like object.

        Pool performs this operation using one of its connections.  Other than
        that, it behaves identically to
        :meth:`Connection.copy_from_query()
        <asyncpg.connection.Connection.copy_from_query>`.

        .. versionadded:: 0.24.0
        """
        async with self.acquire() as con:
            return await con.copy_from_query(
                query,
                *args,
                output=output,
                timeout=timeout,
                format=format,
                oids=oids,
                delimiter=delimiter,
                null=null,
                header=header,
                quote=quote,
                escape=escape,
                force_quote=force_quote,
                encoding=encoding
            )

    async def copy_to_table(
        self,
        table_name,
        *,
        source,
        columns=None,
        schema_name=None,
        timeout=None,
        format=None,
        oids=None,
        freeze=None,
        delimiter=None,
        null=None,
        header=None,
        quote=None,
        escape=None,
        force_quote=None,
        force_not_null=None,
        force_null=None,
        encoding=None,
        where=None
    ):
        """Copy data to the specified table.

        Pool performs this operation using one of its connections.  Other than
        that, it behaves identically to
        :meth:`Connection.copy_to_table()
        <asyncpg.connection.Connection.copy_to_table>`.

        .. versionadded:: 0.24.0
        """
        async with self.acquire() as con:
            return await con.copy_to_table(
                table_name,
                source=source,
                columns=columns,
                schema_name=schema_name,
                timeout=timeout,
                format=format,
                oids=oids,
                freeze=freeze,
                delimiter=delimiter,
                null=null,
                header=header,
                quote=quote,
                escape=escape,
                force_quote=force_quote,
                force_not_null=force_not_null,
                force_null=force_null,
                encoding=encoding,
                where=where
            )

    async def copy_records_to_table(
        self,
        table_name,
        *,
        records,
        columns=None,
        schema_name=None,
        timeout=None,
        where=None
    ):
        """Copy a list of records to the specified table using binary COPY.

        Pool performs this operation using one of its connections.  Other than
        that, it behaves identically to
        :meth:`Connection.copy_records_to_table()
        <asyncpg.connection.Connection.copy_records_to_table>`.

        .. versionadded:: 0.24.0
        """
        async with self.acquire() as con:
            return await con.copy_records_to_table(
                table_name,
                records=records,
                columns=columns,
                schema_name=schema_name,
                timeout=timeout,
                where=where
            )

    def acquire(self, *, timeout=None):
        """Acquire a database connection from the pool.

        :param float timeout: A timeout for acquiring a Connection.
        :return: An instance of :class:`~asyncpg.connection.Connection`.

        Can be used in an ``await`` expression or with an ``async with`` block.

        .. code-block:: python

            async with pool.acquire() as con:
                await con.execute(...)

        Or:

        .. code-block:: python

            con = await pool.acquire()
            try:
                await con.execute(...)
            finally:
                await pool.release(con)
        """
        return PoolAcquireContext(self, timeout)

    async def _acquire(self, timeout):
        """Take a holder from the queue and check it out.

        Raises ``InterfaceError`` if the pool is closing, closed or not
        initialized.
        """

        async def _acquire_impl():
            ch: PoolConnectionHolder = await self._queue.get()
            try:
                proxy: PoolConnectionProxy = await ch.acquire()
            except (Exception, asyncio.CancelledError):
                # The holder was never checked out; hand it back so the
                # slot is not lost.
                self._queue.put_nowait(ch)
                raise
            else:
                # Record the timeout, as we will apply it by default
                # in release().
                ch._timeout = timeout
                return proxy

        if self._closing:
            raise exceptions.InterfaceError('pool is closing')
        self._check_init()

        if timeout is None:
            return await _acquire_impl()
        else:
            return await compat.wait_for(
                _acquire_impl(), timeout=timeout)

    async def release(self, connection, *, timeout=None):
        """Release a database connection back to the pool.

        :param Connection connection:
            A :class:`~asyncpg.connection.Connection` object to release.
        :param float timeout:
            A timeout for releasing the connection.  If not specified, defaults
            to the timeout provided in the corresponding call to the
            :meth:`Pool.acquire() <asyncpg.pool.Pool.acquire>` method.

        .. versionchanged:: 0.14.0
            Added the *timeout* parameter.
        """
        if (type(connection) is not PoolConnectionProxy or
                connection._holder._pool is not self):
            raise exceptions.InterfaceError(
                'Pool.release() received invalid connection: '
                f'{connection!r} is not a member of this pool')

        if connection._con is None:
            # Already released, do nothing.
            return

        self._check_init()

        # Let the connection do its internal housekeeping when it's released.
        connection._con._on_release()

        ch = connection._holder
        if timeout is None:
            timeout = ch._timeout

        # Use asyncio.shield() to guarantee that task cancellation
        # does not prevent the connection from being returned to the
        # pool properly.
        return await asyncio.shield(ch.release(timeout))

    async def close(self):
        """Attempt to gracefully close all connections in the pool.

        Wait until all pool connections are released, close them and
        shut down the pool.  If any error (including cancellation) occurs
        in ``close()`` the pool will terminate by calling
        :meth:`Pool.terminate() <pool.Pool.terminate>`.

        It is advisable to use :func:`python:asyncio.wait_for` to set
        a timeout.

        .. versionchanged:: 0.16.0
            ``close()`` now waits until all pool connections are released
            before closing them and the pool.  Errors raised in ``close()``
            will cause immediate pool termination.
        """
        if self._closed:
            return
        self._check_init()

        self._closing = True

        warning_callback = None
        try:
            warning_callback = self._loop.call_later(
                60, self._warn_on_long_close)

            release_coros = [
                ch.wait_until_released() for ch in self._holders]
            await asyncio.gather(*release_coros)

            close_coros = [
                ch.close() for ch in self._holders]
            await asyncio.gather(*close_coros)

        except (Exception, asyncio.CancelledError):
            self.terminate()
            raise

        finally:
            if warning_callback is not None:
                warning_callback.cancel()
            self._closed = True
            self._closing = False

    def _warn_on_long_close(self):
        logger.warning(
            'Pool.close() is taking over 60 seconds to complete. '
            'Check if you have any unreleased connections left. '
            'Use asyncio.wait_for() to set a timeout for '
            'Pool.close().')

    def terminate(self):
        """Terminate all connections in the pool."""
        if self._closed:
            return
        self._check_init()
        for ch in self._holders:
            ch.terminate()
        self._closed = True

    async def expire_connections(self):
        """Expire all currently open connections.

        Cause all currently open connections to get replaced on the
        next :meth:`~asyncpg.pool.Pool.acquire()` call.

        .. versionadded:: 0.16.0
        """
        self._generation += 1

    def _check_init(self):
        """Raise ``InterfaceError`` unless the pool is initialized and open."""
        if not self._initialized:
            if self._initializing:
                raise exceptions.InterfaceError(
                    'pool is being initialized, but not yet ready: '
                    'likely there is a race between creating a pool and '
                    'using it')
            raise exceptions.InterfaceError('pool is not initialized')
        if self._closed:
            raise exceptions.InterfaceError('pool is closed')

    def _drop_statement_cache(self):
        # Drop statement cache for all connections in the pool.
        for ch in self._holders:
            if ch._con is not None:
                ch._con._drop_local_statement_cache()

    def _drop_type_cache(self):
        # Drop type codec cache for all connections in the pool.
        for ch in self._holders:
            if ch._con is not None:
                ch._con._drop_local_type_cache()

    def __await__(self):
        return self._async__init__().__await__()

    async def __aenter__(self):
        await self._async__init__()
        return self

    async def __aexit__(self, *exc):
        await self.close()
class PoolAcquireContext:
    """Object returned by ``Pool.acquire()``.

    It can either be awaited to obtain a connection directly, or used as
    an ``async with`` context manager that releases the connection back
    to the pool on exit.  An instance is single-use.
    """

    __slots__ = ('timeout', 'connection', 'done', 'pool')

    def __init__(self, pool: Pool, timeout: Optional[float]) -> None:
        self.pool = pool
        self.timeout = timeout
        self.connection = None
        self.done = False

    async def __aenter__(self):
        already_used = self.connection is not None or self.done
        if already_used:
            raise exceptions.InterfaceError('a connection is already acquired')

        acquired = await self.pool._acquire(self.timeout)
        self.connection = acquired
        return acquired

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]] = None,
        exc_val: Optional[BaseException] = None,
        exc_tb: Optional[TracebackType] = None,
    ) -> None:
        # Mark the context as spent and forget the connection before
        # handing it back to the pool.
        self.done = True
        con, self.connection = self.connection, None
        await self.pool.release(con)

    def __await__(self):
        # Plain ``await pool.acquire()``: the caller is responsible for
        # releasing the connection, so it is not stored on the context.
        self.done = True
        acquire_coro = self.pool._acquire(self.timeout)
        return acquire_coro.__await__()
def create_pool(dsn=None, *,
                min_size=10,
                max_size=10,
                max_queries=50000,
                max_inactive_connection_lifetime=300.0,
                connect=None,
                setup=None,
                init=None,
                reset=None,
                loop=None,
                connection_class=connection.Connection,
                record_class=protocol.Record,
                **connect_kwargs):
    r"""Create a connection pool.

    Can be used either with an ``async with`` block:

    .. code-block:: python

        async with asyncpg.create_pool(user='postgres',
                                       command_timeout=60) as pool:
            await pool.fetch('SELECT 1')

    Or to perform multiple operations on a single connection:

    .. code-block:: python

        async with asyncpg.create_pool(user='postgres',
                                       command_timeout=60) as pool:
            async with pool.acquire() as con:
                await con.execute('''
                   CREATE TABLE names (
                      id serial PRIMARY KEY,
                      name VARCHAR (255) NOT NULL)
                ''')
                await con.fetch('SELECT 1')

    Or directly with ``await`` (not recommended):

    .. code-block:: python

        pool = await asyncpg.create_pool(user='postgres', command_timeout=60)
        con = await pool.acquire()
        try:
            await con.fetch('SELECT 1')
        finally:
            await pool.release(con)

    .. warning::
        Prepared statements and cursors returned by
        :meth:`Connection.prepare() <asyncpg.connection.Connection.prepare>`
        and :meth:`Connection.cursor() <asyncpg.connection.Connection.cursor>`
        become invalid once the connection is released.  Likewise, all
        notification and log listeners are removed, and ``asyncpg`` will
        issue a warning if there are any listener callbacks registered on a
        connection that is being released to the pool.

    :param str dsn:
        Connection arguments specified as a single string in
        the following format:
        ``postgres://user:pass@host:port/database?option=value``.

    :param \*\*connect_kwargs:
        Keyword arguments for the :func:`~asyncpg.connection.connect`
        function.

    :param Connection connection_class:
        The class to use for connections.  Must be a subclass of
        :class:`~asyncpg.connection.Connection`.

    :param type record_class:
        If specified, the class to use for records returned by queries on
        the connections in this pool.  Must be a subclass of
        :class:`~asyncpg.Record`.

    :param int min_size:
        Number of connections the pool will be initialized with.

    :param int max_size:
        Max number of connections in the pool.

    :param int max_queries:
        Number of queries after a connection is closed and replaced
        with a new connection.

    :param float max_inactive_connection_lifetime:
        Number of seconds after which inactive connections in the
        pool will be closed.  Pass ``0`` to disable this mechanism.

    :param coroutine connect:
        A coroutine that is called instead of
        :func:`~asyncpg.connection.connect` whenever the pool needs to make a
        new connection.  Must return an instance of type specified by
        *connection_class* or :class:`~asyncpg.connection.Connection` if
        *connection_class* was not specified.

    :param coroutine setup:
        A coroutine to prepare a connection right before it is returned
        from :meth:`Pool.acquire()`.  An example use
        case would be to automatically set up notifications listeners for
        all connections of a pool.

    :param coroutine init:
        A coroutine to initialize a connection when it is created.
        An example use case would be to setup type codecs with
        :meth:`Connection.set_builtin_type_codec() <\
        asyncpg.connection.Connection.set_builtin_type_codec>`
        or :meth:`Connection.set_type_codec() <\
        asyncpg.connection.Connection.set_type_codec>`.

    :param coroutine reset:
        A coroutine to reset a connection before it is returned to the pool by
        :meth:`Pool.release()`.  The function is supposed
        to reset any changes made to the database session so that the next
        acquirer gets the connection in a well-defined state.

        The default implementation calls :meth:`Connection.reset() <\
        asyncpg.connection.Connection.reset>`, which runs the following::

            SELECT pg_advisory_unlock_all();
            CLOSE ALL;
            UNLISTEN *;
            RESET ALL;

        The exact reset query is determined by detected server capabilities,
        and a custom *reset* implementation can obtain the default query
        by calling :meth:`Connection.get_reset_query() <\
        asyncpg.connection.Connection.get_reset_query>`.

    :param loop:
        An asyncio event loop instance.  If ``None``, the default
        event loop will be used.

    :return: An instance of :class:`~asyncpg.pool.Pool`.

    .. versionchanged:: 0.10.0
       An :exc:`~asyncpg.exceptions.InterfaceError` will be raised on any
       attempted operation on a released connection.

    .. versionchanged:: 0.13.0
       An :exc:`~asyncpg.exceptions.InterfaceError` will be raised on any
       attempted operation on a prepared statement or a cursor created
       on a connection that has been released to the pool.

    .. versionchanged:: 0.13.0
       An :exc:`~asyncpg.exceptions.InterfaceWarning` will be produced
       if there are any active listeners (added via
       :meth:`Connection.add_listener()
       <asyncpg.connection.Connection.add_listener>`
       or :meth:`Connection.add_log_listener()
       <asyncpg.connection.Connection.add_log_listener>`) present on the
       connection at the moment of its release to the pool.

    .. versionchanged:: 0.22.0
       Added the *record_class* parameter.

    .. versionchanged:: 0.30.0
       Added the *connect* and *reset* parameters.
    """
    # Everything except the DSN is keyword-only on Pool, so the order of
    # the keywords below is irrelevant; they are grouped by purpose.
    pool = Pool(
        dsn,
        # sizing and recycling limits
        min_size=min_size,
        max_size=max_size,
        max_queries=max_queries,
        max_inactive_connection_lifetime=max_inactive_connection_lifetime,
        # user-supplied lifecycle hooks
        connect=connect,
        init=init,
        setup=setup,
        reset=reset,
        # environment and classes
        loop=loop,
        connection_class=connection_class,
        record_class=record_class,
        **connect_kwargs,
    )
    return pool