asyncpg Usage

The interaction with the database normally starts with a call to connect(), which establishes a new database session and returns a new Connection instance, which provides methods to run queries and manage transactions.

    import asyncio
    import asyncpg
    import datetime

    async def main():
        # Establish a connection to an existing database named "test"
        # as a "postgres" user.
        conn = await asyncpg.connect('postgresql://postgres@localhost/test')
        # Execute a statement to create a new table.
        await conn.execute('''
            CREATE TABLE users(
                id serial PRIMARY KEY,
                name text,
                dob date
            )
        ''')

        # Insert a record into the created table.
        await conn.execute('''
            INSERT INTO users(name, dob) VALUES($1, $2)
        ''', 'Bob', datetime.date(1984, 3, 1))

        # Select a row from the table.
        row = await conn.fetchrow(
            'SELECT * FROM users WHERE name = $1', 'Bob')
        # *row* now contains
        # asyncpg.Record(id=1, name='Bob', dob=datetime.date(1984, 3, 1))

        # Close the connection.
        await conn.close()

    asyncio.run(main())

Note

asyncpg uses the native PostgreSQL syntax for query arguments: $n.
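As a rough sketch of how $n placeholders line up with arguments, the hypothetical helper below passes values positionally: the first argument after the query binds to $1, the second to $2. The find_users name and the born_after filter are assumptions made for illustration; the query reuses the users table from the example above, and conn is assumed to be an asyncpg Connection or any object with a compatible fetch() coroutine.

```python
async def find_users(conn, name, born_after):
    """Return users named *name* born after *born_after* as plain dicts.

    `find_users` is a hypothetical helper, not part of asyncpg. `conn` is
    assumed to expose an asyncpg-style `fetch()` coroutine.
    """
    rows = await conn.fetch(
        # $1 and $2 are bound, in order, to the arguments after the query.
        'SELECT id, name, dob FROM users WHERE name = $1 AND dob > $2',
        name,
        born_after,
    )
    # Rows support mapping-style access, so dict() makes a plain copy.
    return [dict(row) for row in rows]
```

Because the values travel separately from the query text, no quoting or string formatting of the arguments is needed.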

Type Conversion

asyncpg automatically converts PostgreSQL types to the corresponding Python types and vice versa. All standard data types are supported out of the box, including arrays, composite types, range types, enumerations and any combination of them. It is possible to supply codecs for non-standard types or override standard codecs. See Custom Type Conversions for more information.

The table below shows the correspondence between PostgreSQL and Python types.

| PostgreSQL Type | Python Type |
|---|---|
| anyarray | list |
| anyenum | str |
| anyrange | asyncpg.Range, tuple |
| anymultirange | list[asyncpg.Range], list[tuple] [1] |
| record | asyncpg.Record, tuple, Mapping |
| bit, varbit | asyncpg.BitString |
| bool | bool |
| box | asyncpg.Box |
| bytea | bytes |
| char, name, varchar, text, xml | str |
| cidr | ipaddress.IPv4Network, ipaddress.IPv6Network |
| inet | ipaddress.IPv4Interface, ipaddress.IPv6Interface, ipaddress.IPv4Address, ipaddress.IPv6Address [2] |
| macaddr | str |
| circle | asyncpg.Circle |
| date | datetime.date |
| time | offset-naïve datetime.time |
| time with time zone | offset-aware datetime.time |
| timestamp | offset-naïve datetime.datetime |
| timestamp with time zone | offset-aware datetime.datetime |
| interval | datetime.timedelta |
| float, double precision | float [3] |
| smallint, integer, bigint | int |
| numeric | Decimal |
| json, jsonb | str |
| line | asyncpg.Line |
| lseg | asyncpg.LineSegment |
| money | str |
| path | asyncpg.Path |
| point | asyncpg.Point |
| polygon | asyncpg.Polygon |
| uuid | uuid.UUID |
| tid | tuple |

All other types are encoded and decoded as text by default.

[1] Since version 0.25.0.

[2] Prior to version 0.20.0, asyncpg erroneously treated inet values with prefix as IPvXNetwork instead of IPvXInterface.

[3] Inexact single-precision float values may have a different representation when decoded into a Python float. This is inherent to the implementation of limited-precision floating point types. If you need the decimal representation to match, cast the expression to double or numeric in your query.
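The effect described in footnote [3] can be reproduced without a database. The sketch below uses the standard struct module to round-trip a Python float through a 4-byte IEEE 754 value, which is assumed here to be a fair stand-in for storing the number in a single-precision column and reading it back. The through_single_precision name is hypothetical.

```python
import struct


def through_single_precision(value):
    """Round-trip *value* through a 4-byte IEEE 754 float."""
    return struct.unpack('<f', struct.pack('<f', value))[0]


original = 1.1
decoded = through_single_precision(original)

# The decoded value is close to, but not exactly, the original, so its
# repr() shows digits that were not in the literal.
print(repr(original), '->', repr(decoded))
print(decoded == original)
```

Values that are exactly representable in single precision, such as 0.5, survive the round trip unchanged.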

Custom Type Conversions

asyncpg allows defining custom type conversion functions both for standard and user-defined types using the Connection.set_type_codec() and Connection.set_builtin_type_codec() methods.

Example: automatic JSON conversion

The example below shows how to configure asyncpg to encode and decode JSON values using the json module.

    import asyncio
    import asyncpg
    import json

    async def main():
        conn = await asyncpg.connect()

        try:
            await conn.set_type_codec(
                'json',
                encoder=json.dumps,
                decoder=json.loads,
                schema='pg_catalog'
            )

            data = {'foo': 'bar', 'spam': 1}
            res = await conn.fetchval('SELECT $1::json', data)

        finally:
            await conn.close()

    asyncio.run(main())

Example: complex types

The example below shows how to configure asyncpg to encode and decode Python complex values to a custom composite type in PostgreSQL.

    import asyncio
    import asyncpg

    async def main():
        conn = await asyncpg.connect()

        try:
            await conn.execute(
                '''
                CREATE TYPE mycomplex AS (
                    r float,
                    i float
                );'''
            )
            # The codec is registered under the name of the composite
            # type created above.
            await conn.set_type_codec(
                'mycomplex',
                encoder=lambda x: (x.real, x.imag),
                decoder=lambda t: complex(t[0], t[1]),
                format='tuple',
            )

            res = await conn.fetchval('SELECT $1::mycomplex', (1+2j))

        finally:
            await conn.close()

    asyncio.run(main())

Example: automatic conversion of PostGIS types

The example below shows how to configure asyncpg to encode and decode the PostGIS geometry type. It works for any Python object that conforms to the geo interface specification and relies on Shapely, although any library that supports reading and writing the WKB format will work.

    import asyncio
    import asyncpg

    import shapely.geometry
    import shapely.wkb
    from shapely.geometry.base import BaseGeometry

    async def main():
        conn = await asyncpg.connect()

        try:
            def encode_geometry(geometry):
                if not hasattr(geometry, '__geo_interface__'):
                    raise TypeError('{g} does not conform to '
                                    'the geo interface'.format(g=geometry))
                shape = shapely.geometry.shape(geometry)
                return shapely.wkb.dumps(shape)

            def decode_geometry(wkb):
                return shapely.wkb.loads(wkb)

            await conn.set_type_codec(
                'geometry',  # also works for 'geography'
                encoder=encode_geometry,
                decoder=decode_geometry,
                format='binary',
            )

            data = shapely.geometry.Point(-73.985661, 40.748447)
            res = await conn.fetchrow(
                '''SELECT 'Empire State Building' AS name,
                          $1::geometry            AS coordinates
                ''',
                data)

            print(res)

        finally:
            await conn.close()

    asyncio.run(main())

Example: decoding numeric columns as floats

By default asyncpg decodes numeric columns as Python Decimal instances. The example below shows how to instruct asyncpg to use floats instead.

    import asyncio
    import asyncpg

    async def main():
        conn = await asyncpg.connect()

        try:
            await conn.set_type_codec(
                'numeric', encoder=str, decoder=float,
                schema='pg_catalog', format='text'
            )

            res = await conn.fetchval("SELECT $1::numeric", 11.123)
            print(res, type(res))

        finally:
            await conn.close()

    asyncio.run(main())
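Switching to floats trades exactness for convenience. The short, database-free comparison below illustrates the kind of rounding difference that the default Decimal decoding avoids; the values are arbitrary and chosen only for illustration.

```python
from decimal import Decimal

# Decimal arithmetic is exact for decimal fractions...
exact = Decimal('0.1') + Decimal('0.2')

# ...whereas binary floats can only approximate them.
approx = 0.1 + 0.2

print(exact == Decimal('0.3'))  # True
print(approx == 0.3)            # False
```

If results feed into monetary or other exact calculations, keeping the default Decimal decoding is likely the safer choice.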

Example: decoding hstore values

hstore is an extension data type used for storing key/value pairs. asyncpg includes a codec to decode and encode hstore values as dict objects. Because hstore is not a builtin type, the codec must be registered on a connection using Connection.set_builtin_type_codec():

    import asyncpg
    import asyncio

    async def run():
        conn = await asyncpg.connect()
        # Assuming the hstore extension exists in the public schema.
        await conn.set_builtin_type_codec(
            'hstore', codec_name='pg_contrib.hstore')
        result = await conn.fetchval("SELECT 'a=>1,b=>2,c=>NULL'::hstore")
        assert result == {'a': '1', 'b': '2', 'c': None}

    asyncio.run(run())

Transactions

To create transactions, the Connection.transaction() method should be used.

The most common way to use transactions is through an async with statement:

    async with connection.transaction():
        await connection.execute("INSERT INTO mytable VALUES(1, 2, 3)")

Note

When not in an explicit transaction block, any changes to the database will be applied immediately. This is also known as auto-commit.
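As a minimal sketch of how the async with form behaves: if the block exits normally the transaction is committed, and if it exits with an exception the transaction is rolled back. The transfer helper and the accounts table below are hypothetical and exist only for illustration; conn is assumed to be an asyncpg Connection.

```python
async def transfer(conn, src_id, dst_id, amount):
    """Move *amount* between two rows of a hypothetical `accounts` table.

    Both UPDATE statements run in one transaction: if either fails, or the
    balance check raises, the block exits with an exception and every
    change made inside it is rolled back.
    """
    async with conn.transaction():
        await conn.execute(
            'UPDATE accounts SET balance = balance - $1 WHERE id = $2',
            amount, src_id,
        )
        balance = await conn.fetchval(
            'SELECT balance FROM accounts WHERE id = $1', src_id)
        if balance < 0:
            # Raising inside the block triggers a rollback.
            raise ValueError('insufficient funds')
        await conn.execute(
            'UPDATE accounts SET balance = balance + $1 WHERE id = $2',
            amount, dst_id,
        )
```

Without the transaction block, each statement would be auto-committed on its own, so a failure after the first UPDATE would leave the debit applied without the matching credit.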

See the Transactions API documentation for more information.

Connection Pools

For server-type applications that handle frequent requests and need the database connection for a short period of time while handling a request, the use of a connection pool is recommended. asyncpg provides an advanced pool implementation, which eliminates the need to use an external connection pooler such as PgBouncer.

To create a connection pool, use the asyncpg.create_pool() function. The resulting Pool object can then be used to borrow connections from the pool.

Below is an example of how asyncpg can be used to implement a simple Web service that computes the requested power of two.

    import asyncio
    import asyncpg
    from aiohttp import web

    async def handle(request):
        """Handle incoming requests."""
        pool = request.app['pool']
        power = int(request.match_info.get('power', 10))

        # Take a connection from the pool.
        async with pool.acquire() as connection:
            # Open a transaction.
            async with connection.transaction():
                # Run the query passing the request argument.
                result = await connection.fetchval('select 2 ^ $1', power)
                return web.Response(
                    text="2 ^ {} is {}".format(power, result))

    async def init_db(app):
        """Initialize a connection pool."""
        app['pool'] = await asyncpg.create_pool(database='postgres',
                                                user='postgres')
        yield
        # Close the pool when the application shuts down.
        await app['pool'].close()

    def init_app():
        """Initialize the application server."""
        app = web.Application()
        # Create a database context
        app.cleanup_ctx.append(init_db)
        # Configure service routes (raw string keeps the \d escape intact)
        app.router.add_route('GET', r'/{power:\d+}', handle)
        app.router.add_route('GET', '/', handle)
        return app

    app = init_app()
    web.run_app(app)
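Type codecs are registered per connection, so with a pool they need to be set up on every connection the pool opens. One way to do this, sketched below, is the init argument of asyncpg.create_pool(), a coroutine that the pool calls for each newly created connection. The init_connection name is hypothetical, and the codec shown is the same JSON codec used in the custom type conversion example.

```python
import json


async def init_connection(conn):
    """Register a JSON codec on a freshly opened pooled connection.

    `init_connection` is a hypothetical name; `conn` is assumed to be the
    asyncpg Connection that the pool has just created.
    """
    await conn.set_type_codec(
        'json',
        encoder=json.dumps,
        decoder=json.loads,
        schema='pg_catalog',
    )


# Assumed usage, inside a coroutine with asyncpg imported:
#     pool = await asyncpg.create_pool(database='postgres',
#                                      user='postgres',
#                                      init=init_connection)
```

With this in place, any connection borrowed through pool.acquire() should already decode json values into Python objects.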

See the Connection Pools API documentation for more information.