22. Concurrent Programming with asyncio and Pipelining
Concurrent programming with asyncio and Oracle Database Pipelining can significantly enhance the overall performance and responsiveness of applications.
22.1. Concurrent Programming with asyncio
The Asynchronous I/O (asyncio) Python library can be used with python-oracledb Thin mode for concurrent programming. This library allows you to run operations concurrently, for example to run a long-running operation in the background without blocking the rest of the application. With asyncio, you can easily write concurrent code with the async and await syntax. See Python's Developing with asyncio documentation for useful tips.
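The following plain-asyncio sketch (it does not use python-oracledb) shows the background pattern described above: asyncio.create_task() schedules a long-running coroutine, and the rest of the application keeps working until it actually needs the result. The coroutine long_running_operation() is a hypothetical stand-in that only sleeps; in a real application it might wrap a database call.

```python
import asyncio


async def long_running_operation() -> str:
    # Hypothetical stand-in for slow work; it only sleeps
    await asyncio.sleep(0.2)
    return "report ready"


async def main() -> list:
    events = []
    # Schedule the slow operation as a background task; this does not block
    task = asyncio.create_task(long_running_operation())
    # Meanwhile, the rest of the application continues to run
    for i in range(3):
        events.append(f"handled request {i}")
        await asyncio.sleep(0.01)  # yield control so the task can make progress
    # Wait for the background result only when it is actually needed
    events.append(await task)
    return events


print(asyncio.run(main()))
```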
The python-oracledb asynchronous API is a part of the standard python-oracledb module. All the synchronous methods that require a round-trip to the database have corresponding asynchronous counterparts. You can choose whether to use the synchronous API or the asynchronous API in your code. It is recommended not to use both at the same time in your application.
The asynchronous API classes are AsyncConnection, AsyncConnectionPool, AsyncCursor, and AsyncLOB.
Unlike their synchronous counterparts, asynchronous connections and cursors are not automatically closed at the end of scope. These asynchronous resources should either be explicitly closed, or be created using a context manager in a with block.
Note
Concurrent programming with asyncio is only supported in python-oracledb Thin mode.
22.1.1. Connecting to Oracle Database Asynchronously
With python-oracledb, you can create an asynchronous connection to Oracle Database using either standalone connections or pooled connections. (For discussion of synchronous programming, see Connecting to Oracle Database.)
22.1.1.1. Standalone Connections
Standalone connections are useful for applications that need only a single connection to a database.
An asynchronous standalone connection can be created by calling the asynchronous method oracledb.connect_async(), which establishes a connection to the database and returns an AsyncConnection object. Once connections are created, all objects created by these connections follow the asynchronous programming model. Subject to appropriate use of await for calls that require a round-trip to the database, asynchronous connections are used in the same way that synchronous programs use standalone connections.
Asynchronous connections should be released when they are no longer needed to ensure Oracle Database gracefully cleans up. A preferred method is to use an asynchronous context manager. For example:
    import asyncio
    import getpass

    import oracledb

    userpwd = getpass.getpass("Password for hr: ")

    async def main():
        async with oracledb.connect_async(user="hr", password=userpwd,
                                          dsn="localhost/orclpdb") as connection:
            with connection.cursor() as cursor:
                await cursor.execute("select user from dual")
                async for result in cursor:
                    print(result)

    asyncio.run(main())
This code ensures that once the block is completed, the connection is closed and resources are reclaimed by the database. In addition, any attempt to use the variable connection outside of the block will fail.
If you do not use a context manager, you should explicitly close connections when they are no longer needed, for example:
    # Inside an async function
    connection = await oracledb.connect_async(user="hr", password=userpwd,
                                              dsn="localhost/orclpdb")
    cursor = connection.cursor()
    await cursor.execute("select user from dual")
    async for result in cursor:
        print(result)
    cursor.close()
    await connection.close()
Note that asynchronous connections are not automatically closed at the end of scope. This is different from synchronous connection behavior.
22.1.1.2. Connection Pools
Connection pooling allows applications to create and maintain a pool of open connections to the database. Connection pooling is important for performance and scalability when applications need to handle a large number of users who do database work for short periods of time but have relatively long periods when the connections are not needed. The high availability features of pools also make small pools useful for applications that want a few connections available for infrequent use and require them to be immediately usable when acquired.
An asynchronous connection pool can be created by calling oracledb.create_pool_async(), which returns an AsyncConnectionPool object. Note that this method is synchronous and does not use await. Once the pool has been created, your application can get a connection from it by calling AsyncConnectionPool.acquire(). After your application has used a connection, it should be released back to the pool to make it available for other users. This can be done by explicitly closing the connection or by using an asynchronous context manager, for example:
    import asyncio
    import getpass

    import oracledb

    userpwd = getpass.getpass("Password for hr: ")

    async def main():
        pool = oracledb.create_pool_async(user="hr", password=userpwd,
                                          dsn="localhost/orclpdb",
                                          min=1, max=4, increment=1)
        async with pool.acquire() as connection:
            with connection.cursor() as cursor:
                await cursor.execute("select user from dual")
                async for result in cursor:
                    print(result)
        await pool.close()

    asyncio.run(main())
22.1.2. Executing SQL Using Asynchronous Methods
This section covers executing SQL using the asynchronous programming model. For discussion of synchronous programming, see Executing SQL.
Your application communicates with Oracle Database by executing SQL statements. Statements such as queries (statements beginning with SELECT or WITH), Data Manipulation Language (DML), and Data Definition Language (DDL) are executed using the asynchronous methods AsyncCursor.execute() or AsyncCursor.executemany(). Rows can be iterated over, or fetched using one of the methods AsyncCursor.fetchone(), AsyncCursor.fetchmany(), or AsyncCursor.fetchall(). Note that explicitly opened asynchronous cursors are not automatically closed at the end of scope. This is different from synchronous behavior. Asynchronous cursors should either be explicitly closed, or be created using a context manager in a with block.
You can also use shortcut methods on the AsyncConnection object such as AsyncConnection.execute() or AsyncConnection.executemany(). Rows can be fetched using one of the shortcut methods AsyncConnection.fetchone(), AsyncConnection.fetchmany(), AsyncConnection.fetchall(), AsyncConnection.fetch_df_all(), or AsyncConnection.fetch_df_batches().
An example of using AsyncConnection.fetchall():
    import asyncio
    import getpass

    import oracledb

    userpwd = getpass.getpass("Password for hr: ")

    async def main():
        async with oracledb.connect_async(user="hr", password=userpwd,
                                          dsn="localhost/orclpdb") as connection:
            res = await connection.fetchall("select * from locations")
            print(res)

    asyncio.run(main())
An example that uses asyncio for concurrency and shows the execution of multiple coroutines:
    import asyncio
    import getpass

    import oracledb

    userpwd = getpass.getpass("Password for hr: ")

    # Number of coroutines to run
    CONCURRENCY = 5

    # Query the unique session identifier/serial number combination of a connection
    SQL = """SELECT UNIQUE CURRENT_TIMESTAMP AS CT, sid||'-'||serial# AS SIDSER
             FROM V$SESSION_CONNECT_INFO
             WHERE sid = SYS_CONTEXT('USERENV', 'SID')"""

    # Show the unique session identifier/serial number of each connection that the
    # pool opens
    async def init_session(connection, requested_tag):
        res = await connection.fetchone(SQL)
        print(res[0].strftime("%H:%M:%S.%f"), '- init_session with SID-SERIAL#', res[1])

    # The coroutine simply shows the session identifier/serial number of the
    # connection returned by the pool.acquire() call
    async def query(pool):
        async with pool.acquire() as connection:
            await connection.callproc("dbms_session.sleep", [1])
            res = await connection.fetchone(SQL)
            print(res[0].strftime("%H:%M:%S.%f"), '- query with SID-SERIAL#', res[1])

    async def main():
        pool = oracledb.create_pool_async(user="hr", password=userpwd,
                                          dsn="localhost/orclpdb",
                                          min=1, max=CONCURRENCY,
                                          session_callback=init_session)
        coroutines = [query(pool) for i in range(CONCURRENCY)]
        await asyncio.gather(*coroutines)
        await pool.close()

    asyncio.run(main())
When you run this, you will see that multiple connections (identified by the unique Session Identifier and Serial Number combination) are opened and are used by query(). For example:
    12:09:29.711525 - init_session with SID-SERIAL# 36-38096
    12:09:29.909769 - init_session with SID-SERIAL# 33-56225
    12:09:30.085537 - init_session with SID-SERIAL# 14-31431
    12:09:30.257232 - init_session with SID-SERIAL# 285-40270
    12:09:30.434538 - init_session with SID-SERIAL# 282-32608
    12:09:30.730166 - query with SID-SERIAL# 36-38096
    12:09:30.933957 - query with SID-SERIAL# 33-56225
    12:09:31.115008 - query with SID-SERIAL# 14-31431
    12:09:31.283593 - query with SID-SERIAL# 285-40270
    12:09:31.457474 - query with SID-SERIAL# 282-32608
Your results may vary depending on how fast your environment is.
See async_gather.py for a runnable example.
22.1.3. Managing Transactions Using Asynchronous Methods
This section covers managing transactions using the asynchronous programming model. For discussion of synchronous programming, see Managing Transactions.
When AsyncCursor.execute() or AsyncCursor.executemany() executes a SQL statement, a transaction is started or continued. By default, python-oracledb does not commit this transaction to the database. The AsyncConnection.commit() and AsyncConnection.rollback() methods can be used to explicitly commit or roll back a transaction:
    async def main():
        async with oracledb.connect_async(user="hr", password=userpwd,
                                          dsn="localhost/orclpdb") as connection:
            with connection.cursor() as cursor:
                await cursor.execute("INSERT INTO mytab (name) VALUES ('John')")
            await connection.commit()
When a database connection is closed, such as with AsyncConnection.close(), or when variables referencing the connection go out of scope, any uncommitted transaction will be rolled back.
An alternative way to commit is to set the attribute AsyncConnection.autocommit of the connection to True. This ensures all DML statements (INSERT, UPDATE, and so on) are committed as they are executed.
Note that irrespective of the autocommit value, Oracle Database will always commit an open transaction when a DDL statement is executed.
When executing multiple DML statements that constitute a single transaction, it is recommended to use autocommit mode only for the last DML statement in the sequence of operations. Unnecessarily committing causes extra database load, and can destroy transactional consistency.
22.2. Pipelining Database Operations
Pipelining allows an application to send multiple, independent statements to Oracle Database with one call. The database can be kept busy without waiting for the application to receive a result set and send the next statement. While the database processes the pipeline of statements, the application can continue with non-database work. When the database has executed all the pipelined operations, their results are returned to the application.
Pipelined operations are executed sequentially by the database; they do not execute concurrently. Only local tasks in the application can run at the same time as the database is working.
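To make the distinction concrete, here is a toy simulation in plain asyncio, not python-oracledb. The hypothetical coroutine fake_database() handles its operations strictly one after another, as the database does with a pipeline, while the hypothetical local_work() runs in the application at the same time. The sleep durations are arbitrary stand-ins for processing time.

```python
import asyncio


async def fake_database(operations, log):
    # Hypothetical stand-in for the server: operations run strictly in sequence
    results = []
    for op in operations:
        log.append(f"db start {op}")
        await asyncio.sleep(0.05)  # pretend processing time
        log.append(f"db end {op}")
        results.append(f"result of {op}")
    return results


async def local_work(log):
    # Non-database work done by the application while the "database" is busy
    log.append("local start")
    await asyncio.sleep(0.02)
    log.append("local end")
    return "local done"


async def main():
    log = []
    db_results, local_result = await asyncio.gather(
        fake_database(["op1", "op2"], log), local_work(log)
    )
    return log, db_results, local_result


log, db_results, local_result = asyncio.run(main())
print(log)
```

In the printed log, the second "database" operation starts only after the first ends, while the local work begins before the first database operation has finished.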
Effective use of Oracle Database Pipelining can increase the responsiveness of an application and improve overall system throughput. Pipelining is useful when many small operations are being performed in rapid succession. It is most beneficial when the network to the database is slow, because it needs fewer round-trips than executing the equivalent SQL statements individually with calls like AsyncCursor.execute().
Pipelining is only supported in python-oracledb Thin mode with asyncio.
See Oracle Call Interface Pipelining for more information about Oracle Database Pipelining.
Note
True pipelining only occurs when you are connected to Oracle Database 23ai.
When you connect to an older database, operations are sequentially executed by python-oracledb. Each operation concludes before the next is sent to the database. There is no reduction in round-trips and no performance benefit. This usage is only recommended for code portability, such as when preparing for a database upgrade.
22.2.1. Using Pipelines
To create a pipeline to process a set of database operations, use oracledb.create_pipeline().
    pipeline = oracledb.create_pipeline()
You can then add various operations to the pipeline using add_callfunc(), add_callproc(), add_commit(), add_execute(), add_executemany(), add_fetchall(), add_fetchmany(), and add_fetchone(). For example:
    pipeline.add_execute("insert into mytable (mycol) values (1234)")
    pipeline.add_fetchone("select user from dual")
    pipeline.add_fetchmany("select employee_id from employees", num_rows=20)
Note that queries that return results do not use add_execute(); they are added with add_fetchone(), add_fetchmany(), or add_fetchall().
Only one set of query results can be returned from each query operation. For example, add_fetchmany() will only fetch the first set of query records, up to the limit specified by the method's num_rows parameter. Similarly, for add_fetchone() only the first row can ever be fetched. It is not possible to fetch more data from these operations. To prevent the database processing rows that cannot be fetched by the application, consider adding appropriate WHERE conditions or using a FETCH NEXT clause in the statement (see Limiting Rows).
Query results or OUT binds from one operation cannot be passed to subsequent operations in the same pipeline.
To execute the pipeline, call AsyncConnection.run_pipeline().
    results = await connection.run_pipeline(pipeline)
The operations are all sent to the database and executed. The method returns a list of PipelineOpResult objects, one entry per operation. The objects contain information about the execution of the relevant operation, such as any error number, PL/SQL function return value, or any query rows and column metadata.
The Connection.call_timeout value has no effect on pipeline operations. To limit the time for a pipeline, use an asyncio timeout, available from Python 3.11.
To tune fetching of rows with Pipeline.add_fetchall(), set defaults.arraysize or pass the arraysize parameter.
22.2.1.1. Pipelining Examples
An example of pipelining is:
    import asyncio
    import getpass

    import oracledb

    userpwd = getpass.getpass("Password for hr: ")

    async def main():
        # Create a pipeline and define the operations
        pipeline = oracledb.create_pipeline()
        pipeline.add_fetchone("select temperature from weather")
        pipeline.add_fetchall("select name from friends where active = true")
        pipeline.add_fetchmany("select story from news order by popularity", num_rows=5)

        connection = await oracledb.connect_async(user="hr", password=userpwd,
                                                  dsn="localhost/orclpdb")

        # Run the operations in the pipeline
        result_1, result_2, result_3 = await connection.run_pipeline(pipeline)

        # Print the database responses
        print("Current temperature:", result_1.rows)
        print("Active friends:", result_2.rows)
        print("Top news stories:", result_3.rows)

        await connection.close()

    asyncio.run(main())
See pipelining_basic.py for a runnable example.
To allow an application to continue with non-database work before processing any responses from the database, use code similar to:
    import asyncio
    import getpass

    import oracledb

    userpwd = getpass.getpass("Password for hr: ")

    async def run_thing_one():
        return "thing_one"

    async def run_thing_two():
        return "thing_two"

    async def main():
        connection = await oracledb.connect_async(user="hr", password=userpwd,
                                                  dsn="localhost/orclpdb")

        pipeline = oracledb.create_pipeline()
        pipeline.add_fetchone("select user from dual")
        pipeline.add_fetchone("select sysdate from dual")

        # Run the pipeline and non-database operations concurrently
        return_values = await asyncio.gather(
            run_thing_one(), run_thing_two(), connection.run_pipeline(pipeline)
        )

        for r in return_values:
            if isinstance(r, list):  # the pipeline return list
                for result in r:
                    if result.rows:
                        for row in result.rows:
                            print(*row, sep="\t")
            else:
                print(r)  # a local operation result

        await connection.close()

    asyncio.run(main())
The output will be similar to:
    thing_one
    thing_two
    HR
    2024-10-29 03:34:43
See pipelining_parallel.py for a runnable example.
22.2.2. Using OUT Binds with Pipelines
To fetch OUT binds from executed statements, create an explicit cursor and use Cursor.var(). These variables are associated with the connection and can be used by the other cursors created internally for each pipelined operation. For example:
    cursor = connection.cursor()
    v1 = cursor.var(oracledb.DB_TYPE_BOOLEAN)
    v2 = cursor.var(oracledb.DB_TYPE_VARCHAR)

    pipeline = oracledb.create_pipeline()
    pipeline.add_execute("""
            begin
              :1 := true;
              :2 := 'Python';
            end;
            """, [v1, v2])
    pipeline.add_fetchone("select 1234 from dual")
    results = await connection.run_pipeline(pipeline)
    for r in results:
        if r.rows:
            print(r.rows)
    print(v1.getvalue(), v2.getvalue())
This prints:
    [(1234,)]
    True Python
OUT binds from one operation cannot be used in subsequent operations. For example, the following would print only True because the WHERE condition of the SQL statement is not matched:
    cursor = connection.cursor()
    v1 = cursor.var(oracledb.DB_TYPE_BOOLEAN)

    pipeline = oracledb.create_pipeline()
    pipeline.add_execute("""
            begin
              :1 := TRUE;
            end;
            """, [v1])
    pipeline.add_fetchone("select 1234 from dual where :1 = TRUE", [v1])
    results = await connection.run_pipeline(pipeline)
    for r in results:
        if r.rows:
            print(r.rows)
    print(v1.getvalue())  # prints True
22.2.3. Pipeline Error Handling
The continue_on_error parameter to AsyncConnection.run_pipeline() determines whether subsequent operations should continue to run after a failure in one operation has occurred. When set to the default value False, if any error is returned in any operation in the pipeline, then the database terminates all subsequent operations.
For example:
    # Stop on error
    pipeline.add_fetchall("select 1234 from does_not_exist")
    pipeline.add_fetchone("select 5678 from dual")
    r1, r2 = await connection.run_pipeline(pipeline)
will execute only the first operation and will raise the error:
    oracledb.exceptions.DatabaseError: ORA-00942: table or view "HR"."DOES_NOT_EXIST" does not exist
    Help: https://docs.oracle.com/error-help/db/ora-00942/
whereas this code:
    # Continue on error
    pipeline.add_fetchall("select 1234 from does_not_exist")
    pipeline.add_fetchone("select 5678 from dual")
    r1, r2 = await connection.run_pipeline(pipeline, continue_on_error=True)
    print(r1.error)
    print(r2.rows)
will execute all operations and will display:
    ORA-00942: table or view "HR"."DOES_NOT_EXIST" does not exist
    Help: https://docs.oracle.com/error-help/db/ora-00942/
    [(5678,)]
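When continue_on_error=True is used, each PipelineOpResult needs to be checked individually. The sketch below is a hypothetical helper, split_results(), that relies only on the error and rows attributes used above, and assumes error is None for operations that succeeded. SimpleNamespace objects stand in for real results so that it runs without a database.

```python
from types import SimpleNamespace


def split_results(results):
    """Separate pipeline results into successes and failures by position."""
    succeeded, failed = [], []
    for position, result in enumerate(results):
        if result.error is not None:
            failed.append((position, result.error))
        else:
            succeeded.append((position, result.rows))
    return succeeded, failed


# Stand-ins shaped like r1 and r2 from the example above
fake_results = [
    SimpleNamespace(error="ORA-00942: table or view does not exist", rows=None),
    SimpleNamespace(error=None, rows=[(5678,)]),
]
succeeded, failed = split_results(fake_results)
print(succeeded)  # [(1, [(5678,)])]
print(failed)     # [(0, 'ORA-00942: table or view does not exist')]
```

With a real connection, the call would be split_results(await connection.run_pipeline(pipeline, continue_on_error=True)), where each error entry is the error object reported by python-oracledb rather than a string.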
PL/SQL Compilation Warnings
PL/SQL compilation warnings can be identified by checking the PipelineOpResult attribute PipelineOpResult.warning. For example:
    pipeline.add_execute("""create or replace procedure myproc as
                            begin
                              bogus;
                            end;""")
    (result,) = await connection.run_pipeline(pipeline)

    print(result.warning.full_code)
    print(result.warning)
will print:
    DPY-7000
    DPY-7000: creation succeeded with compilation errors
See pipelining_error.py for a runnable example showing warnings and errors.
22.2.4. Pipeline Cursor Usage
For each operation added to a pipeline, with the exception of Pipeline.add_commit(), a cursor will be opened when AsyncConnection.run_pipeline() is called. For example, the following code will open two cursors:
    pipeline = oracledb.create_pipeline()
    pipeline.add_execute("insert into t1 (c1) values (1234)")
    pipeline.add_fetchone("select user from dual")
    await connection.run_pipeline(pipeline)
Make sure your pipeline length does not exceed your cursor limit. Set the database parameter open_cursors appropriately.
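If a workload has more operations than the cursor limit comfortably allows, one approach (an assumption on our part, not something the pipelining API does for you) is to split the operations into several smaller pipelines and run them one after another. The generator batched_operations() below is a hypothetical helper that only does the splitting; it is plain Python and needs no database.

```python
from itertools import islice


def batched_operations(operations, max_per_pipeline):
    """Yield lists containing at most max_per_pipeline operations each."""
    if max_per_pipeline < 1:
        raise ValueError("max_per_pipeline must be at least 1")
    iterator = iter(operations)
    # islice takes the next chunk; an empty chunk means the input is exhausted
    while batch := list(islice(iterator, max_per_pipeline)):
        yield batch


statements = [f"insert into t1 (c1) values ({n})" for n in range(5)]
for batch in batched_operations(statements, 2):
    # With python-oracledb, each batch would go into its own pipeline here
    print(len(batch), batch[0])
```

Each batch could then be added to a new pipeline from oracledb.create_pipeline() and executed with await connection.run_pipeline(). This assumes the cursors opened for one pipeline are released once it completes, which is worth verifying in your environment. On Python 3.12 and later, itertools.batched() offers similar chunking (yielding tuples).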
22.2.5. Pipeline Round-trips
The complete set of operations in a pipeline will be performed in a single round-trip when AsyncConnection.run_pipeline() is called, with the following exceptions:
- Queries that contain LOBs require an additional round-trip
- Queries that contain DbObject values may require multiple round-trips
- Queries with add_fetchall() may require multiple round-trips
The reduction in round-trips is the significant contributor to pipelining's performance improvement in comparison to explicitly executing the equivalent SQL statements individually. With high-speed networks there may be little performance benefit to using pipelining; however, the database and network efficiencies can help overall system scalability.
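As a rough illustration of why the round-trip reduction matters most on slow networks, the hypothetical function below estimates only the time spent waiting on the network. It ignores database processing time and the extra round-trips listed above, and the latency figures are illustrative assumptions, not measurements.

```python
def network_wait_ms(num_operations, round_trip_ms, pipelined):
    """Estimate time spent waiting on the network, in milliseconds.

    Individually executed statements need one round-trip each; a pipeline
    is modelled as a single round-trip (ignoring LOB, DbObject and large
    add_fetchall() cases).
    """
    round_trips = 1 if pipelined else num_operations
    return round_trips * round_trip_ms


for label, rtt in (("slow network", 50.0), ("fast network", 0.2)):
    individual_ms = network_wait_ms(20, rtt, pipelined=False)
    pipelined_ms = network_wait_ms(20, rtt, pipelined=True)
    print(f"{label}: {individual_ms:.1f} ms individually vs {pipelined_ms:.1f} ms pipelined")
```

Under these assumptions, 20 operations save close to a second of waiting on a 50 ms link but only a few milliseconds on a 0.2 ms link, which matches the observation that fast networks see little direct benefit.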
Note that the traditional method of monitoring round-trips by taking snapshots of the V$SESSTAT view is not accurate for pipelines.