Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit3453c52

Browse files
committed
Add support for asynchronous working
1 parent1444759 commit3453c52

File tree

2 files changed

+335
-293
lines changed

2 files changed

+335
-293
lines changed

‎testgres/testgres.py

Lines changed: 100 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -45,16 +45,10 @@
4545
fromenumimportEnum
4646
fromdistutils.versionimportLooseVersion
4747

48-
# Try to use psycopg2 by default. If psycopg2 isn't available then use
49-
# pg8000 which is slower but much more portable because uses only
50-
# pure-Python code
5148
try:
52-
importpsycopg2aspglib
49+
importasyncpgaspglib
5350
exceptImportError:
54-
try:
55-
importpg8000aspglib
56-
exceptImportError:
57-
raiseImportError("You must have psycopg2 or pg8000 modules installed")
51+
raiseImportError("You must have asyncpg module installed")
5852

5953
# ports used by nodes
6054
bound_ports=set()
@@ -193,26 +187,34 @@ def __init__(self,
193187
password=None):
194188

195189
# Use default user if not specified
196-
username=usernameordefault_username()
197-
190+
self.username=usernameordefault_username()
191+
self.dbname=dbname
192+
self.host=host
193+
self.password=password
198194
self.parent_node=parent_node
195+
self.connection=None
196+
self.current_transaction=None
199197

200-
self.connection=pglib.connect(
201-
database=dbname,
202-
user=username,
203-
port=parent_node.port,
204-
host=host,
205-
password=password)
198+
asyncdefinit_connection(self):
199+
ifself.connection:
200+
return
206201

207-
self.cursor=self.connection.cursor()
202+
self.connection=awaitpglib.connect(
203+
database=self.dbname,
204+
user=self.username,
205+
port=self.parent_node.port,
206+
host=self.host,
207+
password=self.password)
208208

209-
def__enter__(self):
209+
asyncdef__aenter__(self):
210210
returnself
211211

212-
def__exit__(self,type,value,traceback):
213-
self.close()
212+
asyncdef__aexit__(self,type,value,traceback):
213+
awaitself.close()
214+
215+
asyncdefbegin(self,isolation_level=IsolationLevel.ReadCommitted):
216+
awaitself.init_connection()
214217

215-
defbegin(self,isolation_level=IsolationLevel.ReadCommitted):
216218
# yapf: disable
217219
levels= [
218220
'read uncommitted',
@@ -245,37 +247,45 @@ def begin(self, isolation_level=IsolationLevel.ReadCommitted):
245247

246248
# Set isolation level
247249
cmd='SET TRANSACTION ISOLATION LEVEL {}'
248-
self.cursor.execute(cmd.format(isolation_level))
250+
self.current_transaction=self.connection.transaction()
251+
awaitself.current_transaction.start()
252+
awaitself.connection.execute(cmd.format(isolation_level))
249253

250254
returnself
251255

252-
defcommit(self):
253-
self.connection.commit()
256+
asyncdefcommit(self):
257+
ifnotself.current_transaction:
258+
raiseQueryException("transaction is not started")
254259

255-
returnself
256-
257-
defrollback(self):
258-
self.connection.rollback()
260+
awaitself.current_transaction.commit()
261+
self.current_transaction=None
259262

260-
returnself
263+
asyncdefrollback(self):
264+
ifnotself.current_transaction:
265+
raiseQueryException("transaction is not started")
261266

262-
defexecute(self,query,*args):
263-
self.cursor.execute(query,args)
267+
awaitself.current_transaction.rollback()
268+
self.current_transaction=None
264269

265-
try:
266-
res=self.cursor.fetchall()
267-
268-
# pg8000 might return tuples
269-
ifisinstance(res,tuple):
270-
res= [tuple(t)fortinres]
270+
asyncdefexecute(self,query,*args):
271+
awaitself.init_connection()
272+
ifself.current_transaction:
273+
returnawaitself.connection.execute(query,*args)
274+
else:
275+
asyncwithself.connection.transaction():
276+
returnawaitself.connection.execute(query,*args)
271277

272-
returnres
273-
exceptException:
274-
returnNone
278+
asyncdeffetch(self,query,*args):
279+
awaitself.init_connection()
280+
ifself.current_transaction:
281+
returnawaitself.connection.fetch(query,*args)
282+
else:
283+
asyncwithself.connection.transaction():
284+
returnawaitself.connection.fetch(query,*args)
275285

276-
defclose(self):
277-
self.cursor.close()
278-
self.connection.close()
286+
asyncdefclose(self):
287+
ifself.connection:
288+
awaitself.connection.close()
279289

280290

281291
classNodeBackup(object):
@@ -943,7 +953,7 @@ def restore(self, dbname, filename, username=None):
943953

944954
self.psql(dbname=dbname,filename=filename,username=username)
945955

946-
defpoll_query_until(self,
956+
asyncdefpoll_query_until(self,
947957
dbname,
948958
query,
949959
username=None,
@@ -973,41 +983,54 @@ def poll_query_until(self,
973983

974984
attempts=0
975985
whilemax_attempts==0orattempts<max_attempts:
976-
try:
977-
res=self.execute(dbname=dbname,
978-
query=query,
979-
username=username,
980-
commit=True)
981-
982-
ifexpectedisNoneandresisNone:
983-
return# done
986+
res=awaitself.fetch(dbname=dbname,
987+
query=query,
988+
username=username,
989+
commit=True)
984990

985-
ifresisNone:
986-
raiseQueryException('Query returned None')
991+
ifexpectedisNoneandresisNone:
992+
return# done
987993

988-
iflen(res)==0:
989-
raiseQueryException('Query returned0 rows')
994+
ifresisNone:
995+
raiseQueryException('Query returnedNone')
990996

991-
iflen(res[0])==0:
992-
raiseQueryException('Query returned 0columns')
997+
iflen(res)==0:
998+
raiseQueryException('Query returned 0rows')
993999

994-
ifres[0][0]:
995-
return# done
1000+
iflen(res[0])==0:
1001+
raiseQueryException('Query returned 0 columns')
9961002

997-
exceptpglib.ProgrammingErrorase:
998-
ifraise_programming_error:
999-
raisee
1000-
1001-
exceptpglib.InternalErrorase:
1002-
ifraise_internal_error:
1003-
raisee
1003+
ifres[0][0]:
1004+
return# done
10041005

10051006
time.sleep(sleep_time)
10061007
attempts+=1
10071008

10081009
raiseTimeoutException('Query timeout')
10091010

1010-
defexecute(self,dbname,query,username=None,commit=True):
1011+
asyncdefexecute(self,dbname,query,username=None,commit=True):
1012+
"""
1013+
Execute a query
1014+
1015+
Args:
1016+
dbname: database name to connect to.
1017+
query: query to be executed.
1018+
username: database user name.
1019+
commit: should we commit this query?
1020+
1021+
Returns:
1022+
A list of tuples representing rows.
1023+
"""
1024+
1025+
asyncwithself.connect(dbname,username)asnode_con:
1026+
ifcommit:
1027+
awaitnode_con.begin()
1028+
1029+
awaitnode_con.execute(query)
1030+
ifcommit:
1031+
awaitnode_con.commit()
1032+
1033+
asyncdeffetch(self,dbname,query,username=None,commit=True):
10111034
"""
10121035
Execute a query and return all rows as list.
10131036
@@ -1021,10 +1044,13 @@ def execute(self, dbname, query, username=None, commit=True):
10211044
A list of tuples representing rows.
10221045
"""
10231046

1024-
withself.connect(dbname,username)asnode_con:
1025-
res=node_con.execute(query)
1047+
asyncwithself.connect(dbname,username)asnode_con:
1048+
ifcommit:
1049+
awaitnode_con.begin()
1050+
1051+
res=awaitnode_con.fetch(query)
10261052
ifcommit:
1027-
node_con.commit()
1053+
awaitnode_con.commit()
10281054
returnres
10291055

10301056
defbackup(self,username=None,xlog_method=DEFAULT_XLOG_METHOD):
@@ -1059,7 +1085,7 @@ def replicate(self, name, username=None,
10591085
backup=self.backup(username=username,xlog_method=xlog_method)
10601086
returnbackup.spawn_replica(name,use_logging=use_logging)
10611087

1062-
defcatchup(self,username=None):
1088+
asyncdefcatchup(self,username=None):
10631089
"""
10641090
Wait until async replica catches up with its master.
10651091
"""
@@ -1080,8 +1106,8 @@ def catchup(self, username=None):
10801106
raiseCatchUpException("Master node is not specified")
10811107

10821108
try:
1083-
lsn=master.execute('postgres',poll_lsn)[0][0]
1084-
self.poll_query_until(dbname='postgres',
1109+
lsn=(awaitmaster.fetch('postgres',poll_lsn))[0][0]
1110+
awaitself.poll_query_until(dbname='postgres',
10851111
username=username,
10861112
query=wait_lsn.format(lsn),
10871113
max_attempts=0)# infinite

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp