Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit056eb14

Browse files
committed
When a cursor is opened using dblink_open, only start a transaction
if there isn't one already open. Upon dblink_close, only committhe open transaction if it was started by dblink_open, and onlythen when all cursors opened by dblink_open are closed. The transactionaccounting is done individually for all named connections, plusthe persistent unnamed connection.
1 parentc62b29a commit056eb14

File tree

3 files changed

+193
-39
lines changed

3 files changed

+193
-39
lines changed

‎contrib/dblink/dblink.c

Lines changed: 75 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,9 @@
6060

6161
typedefstructremoteConn
6262
{
63-
PGconn*conn;/* Hold the remote connection */
64-
intautoXactCursors;/*Indicates thenumber of open cursors,
65-
* non-zero means we opened the xact ourselves */
63+
PGconn*conn;/* Hold the remote connection */
64+
intopenCursorCount;/*Thenumber of open cursors */
65+
boolnewXactForCursor;/* Opened a transaction for a cursor */
6666
}remoteConn;
6767

6868
/*
@@ -84,10 +84,8 @@ static Oidget_relid_from_relname(text *relname_text);
8484
staticchar*generate_relation_name(Oidrelid);
8585

8686
/* Global */
87-
List*res_id=NIL;
88-
intres_id_index=0;
89-
PGconn*persistent_conn=NULL;
90-
staticHTAB*remoteConnHash=NULL;
87+
staticremoteConn*pconn=NULL;
88+
staticHTAB*remoteConnHash=NULL;
9189

9290
/*
9391
*Following is list that holds multiple remote connections.
@@ -184,6 +182,16 @@ typedef struct remoteConnHashEnt
184182
} \
185183
} while (0)
186184

185+
#defineDBLINK_INIT \
186+
do { \
187+
if (!pconn) \
188+
{ \
189+
pconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext, sizeof(remoteConn)); \
190+
pconn->conn = NULL; \
191+
pconn->openCursorCount = 0; \
192+
pconn->newXactForCursor = FALSE; \
193+
} \
194+
} while (0)
187195

188196
/*
189197
* Create a persistent connection to another database
@@ -199,6 +207,8 @@ dblink_connect(PG_FUNCTION_ARGS)
199207
PGconn*conn=NULL;
200208
remoteConn*rconn=NULL;
201209

210+
DBLINK_INIT;
211+
202212
if (PG_NARGS()==2)
203213
{
204214
connstr=GET_STR(PG_GETARG_TEXT_P(1));
@@ -234,7 +244,7 @@ dblink_connect(PG_FUNCTION_ARGS)
234244
createNewConnection(connname,rconn);
235245
}
236246
else
237-
persistent_conn=conn;
247+
pconn->conn=conn;
238248

239249
PG_RETURN_TEXT_P(GET_TEXT("OK"));
240250
}
@@ -250,6 +260,8 @@ dblink_disconnect(PG_FUNCTION_ARGS)
250260
remoteConn*rconn=NULL;
251261
PGconn*conn=NULL;
252262

263+
DBLINK_INIT;
264+
253265
if (PG_NARGS()==1)
254266
{
255267
conname=GET_STR(PG_GETARG_TEXT_P(0));
@@ -258,7 +270,7 @@ dblink_disconnect(PG_FUNCTION_ARGS)
258270
conn=rconn->conn;
259271
}
260272
else
261-
conn=persistent_conn;
273+
conn=pconn->conn;
262274

263275
if (!conn)
264276
DBLINK_CONN_NOT_AVAIL;
@@ -270,7 +282,7 @@ dblink_disconnect(PG_FUNCTION_ARGS)
270282
pfree(rconn);
271283
}
272284
else
273-
persistent_conn=NULL;
285+
pconn->conn=NULL;
274286

275287
PG_RETURN_TEXT_P(GET_TEXT("OK"));
276288
}
@@ -292,12 +304,14 @@ dblink_open(PG_FUNCTION_ARGS)
292304
remoteConn*rconn=NULL;
293305
boolfail= true;/* default to backward compatible behavior */
294306

307+
DBLINK_INIT;
308+
295309
if (PG_NARGS()==2)
296310
{
297311
/* text,text */
298312
curname=GET_STR(PG_GETARG_TEXT_P(0));
299313
sql=GET_STR(PG_GETARG_TEXT_P(1));
300-
conn=persistent_conn;
314+
rconn=pconn;
301315
}
302316
elseif (PG_NARGS()==3)
303317
{
@@ -307,16 +321,14 @@ dblink_open(PG_FUNCTION_ARGS)
307321
curname=GET_STR(PG_GETARG_TEXT_P(0));
308322
sql=GET_STR(PG_GETARG_TEXT_P(1));
309323
fail=PG_GETARG_BOOL(2);
310-
conn=persistent_conn;
324+
rconn=pconn;
311325
}
312326
else
313327
{
314328
conname=GET_STR(PG_GETARG_TEXT_P(0));
315329
curname=GET_STR(PG_GETARG_TEXT_P(1));
316330
sql=GET_STR(PG_GETARG_TEXT_P(2));
317331
rconn=getConnectionByName(conname);
318-
if (rconn)
319-
conn=rconn->conn;
320332
}
321333
}
322334
elseif (PG_NARGS()==4)
@@ -327,18 +339,26 @@ dblink_open(PG_FUNCTION_ARGS)
327339
sql=GET_STR(PG_GETARG_TEXT_P(2));
328340
fail=PG_GETARG_BOOL(3);
329341
rconn=getConnectionByName(conname);
330-
if (rconn)
331-
conn=rconn->conn;
332342
}
333343

334-
if (!conn)
344+
if (!rconn|| !rconn->conn)
335345
DBLINK_CONN_NOT_AVAIL;
346+
else
347+
conn=rconn->conn;
336348

337-
res=PQexec(conn,"BEGIN");
338-
if (PQresultStatus(res)!=PGRES_COMMAND_OK)
339-
DBLINK_RES_INTERNALERROR("begin error");
349+
/*If we are not in a transaction, start one */
350+
if (PQtransactionStatus(conn)==PQTRANS_IDLE)
351+
{
352+
res=PQexec(conn,"BEGIN");
353+
if (PQresultStatus(res)!=PGRES_COMMAND_OK)
354+
DBLINK_RES_INTERNALERROR("begin error");
355+
PQclear(res);
356+
rconn->newXactForCursor= TRUE;
357+
}
340358

341-
PQclear(res);
359+
/* if we started a transaction, increment cursor count */
360+
if (rconn->newXactForCursor)
361+
(rconn->openCursorCount)++;
342362

343363
appendStringInfo(str,"DECLARE %s CURSOR FOR %s",curname,sql);
344364
res=PQexec(conn,str->data);
@@ -373,11 +393,13 @@ dblink_close(PG_FUNCTION_ARGS)
373393
remoteConn*rconn=NULL;
374394
boolfail= true;/* default to backward compatible behavior */
375395

396+
DBLINK_INIT;
397+
376398
if (PG_NARGS()==1)
377399
{
378400
/* text */
379401
curname=GET_STR(PG_GETARG_TEXT_P(0));
380-
conn=persistent_conn;
402+
rconn=pconn;
381403
}
382404
elseif (PG_NARGS()==2)
383405
{
@@ -386,15 +408,13 @@ dblink_close(PG_FUNCTION_ARGS)
386408
{
387409
curname=GET_STR(PG_GETARG_TEXT_P(0));
388410
fail=PG_GETARG_BOOL(1);
389-
conn=persistent_conn;
411+
rconn=pconn;
390412
}
391413
else
392414
{
393415
conname=GET_STR(PG_GETARG_TEXT_P(0));
394416
curname=GET_STR(PG_GETARG_TEXT_P(1));
395417
rconn=getConnectionByName(conname);
396-
if (rconn)
397-
conn=rconn->conn;
398418
}
399419
}
400420
if (PG_NARGS()==3)
@@ -404,12 +424,12 @@ dblink_close(PG_FUNCTION_ARGS)
404424
curname=GET_STR(PG_GETARG_TEXT_P(1));
405425
fail=PG_GETARG_BOOL(2);
406426
rconn=getConnectionByName(conname);
407-
if (rconn)
408-
conn=rconn->conn;
409427
}
410428

411-
if (!conn)
429+
if (!rconn|| !rconn->conn)
412430
DBLINK_CONN_NOT_AVAIL;
431+
else
432+
conn=rconn->conn;
413433

414434
appendStringInfo(str,"CLOSE %s",curname);
415435

@@ -428,12 +448,22 @@ dblink_close(PG_FUNCTION_ARGS)
428448

429449
PQclear(res);
430450

431-
/*commit thetransaction */
432-
res=PQexec(conn,"COMMIT");
433-
if (PQresultStatus(res)!=PGRES_COMMAND_OK)
434-
DBLINK_RES_INTERNALERROR("commit error");
451+
/*if we started atransaction, decrement cursor count */
452+
if (rconn->newXactForCursor)
453+
{
454+
(rconn->openCursorCount)--;
435455

436-
PQclear(res);
456+
/* if count is zero, commit the transaction */
457+
if (rconn->openCursorCount==0)
458+
{
459+
rconn->newXactForCursor= FALSE;
460+
461+
res=PQexec(conn,"COMMIT");
462+
if (PQresultStatus(res)!=PGRES_COMMAND_OK)
463+
DBLINK_RES_INTERNALERROR("commit error");
464+
PQclear(res);
465+
}
466+
}
437467

438468
PG_RETURN_TEXT_P(GET_TEXT("OK"));
439469
}
@@ -456,6 +486,8 @@ dblink_fetch(PG_FUNCTION_ARGS)
456486
char*conname=NULL;
457487
remoteConn*rconn=NULL;
458488

489+
DBLINK_INIT;
490+
459491
/* stuff done only on the first call of the function */
460492
if (SRF_IS_FIRSTCALL())
461493
{
@@ -485,7 +517,7 @@ dblink_fetch(PG_FUNCTION_ARGS)
485517
curname=GET_STR(PG_GETARG_TEXT_P(0));
486518
howmany=PG_GETARG_INT32(1);
487519
fail=PG_GETARG_BOOL(2);
488-
conn=persistent_conn;
520+
conn=pconn->conn;
489521
}
490522
else
491523
{
@@ -503,7 +535,7 @@ dblink_fetch(PG_FUNCTION_ARGS)
503535
/* text,int */
504536
curname=GET_STR(PG_GETARG_TEXT_P(0));
505537
howmany=PG_GETARG_INT32(1);
506-
conn=persistent_conn;
538+
conn=pconn->conn;
507539
}
508540

509541
if (!conn)
@@ -648,6 +680,8 @@ dblink_record(PG_FUNCTION_ARGS)
648680
MemoryContextoldcontext;
649681
boolfreeconn= false;
650682

683+
DBLINK_INIT;
684+
651685
/* stuff done only on the first call of the function */
652686
if (SRF_IS_FIRSTCALL())
653687
{
@@ -678,7 +712,7 @@ dblink_record(PG_FUNCTION_ARGS)
678712
/* text,text or text,bool */
679713
if (get_fn_expr_argtype(fcinfo->flinfo,1)==BOOLOID)
680714
{
681-
conn=persistent_conn;
715+
conn=pconn->conn;
682716
sql=GET_STR(PG_GETARG_TEXT_P(0));
683717
fail=PG_GETARG_BOOL(1);
684718
}
@@ -691,7 +725,7 @@ dblink_record(PG_FUNCTION_ARGS)
691725
elseif (PG_NARGS()==1)
692726
{
693727
/* text */
694-
conn=persistent_conn;
728+
conn=pconn->conn;
695729
sql=GET_STR(PG_GETARG_TEXT_P(0));
696730
}
697731
else
@@ -857,6 +891,8 @@ dblink_exec(PG_FUNCTION_ARGS)
857891
boolfreeconn= false;
858892
boolfail= true;/* default to backward compatible behavior */
859893

894+
DBLINK_INIT;
895+
860896
if (PG_NARGS()==3)
861897
{
862898
/* must be text,text,bool */
@@ -869,7 +905,7 @@ dblink_exec(PG_FUNCTION_ARGS)
869905
/* might be text,text or text,bool */
870906
if (get_fn_expr_argtype(fcinfo->flinfo,1)==BOOLOID)
871907
{
872-
conn=persistent_conn;
908+
conn=pconn->conn;
873909
sql=GET_STR(PG_GETARG_TEXT_P(0));
874910
fail=PG_GETARG_BOOL(1);
875911
}
@@ -882,7 +918,7 @@ dblink_exec(PG_FUNCTION_ARGS)
882918
elseif (PG_NARGS()==1)
883919
{
884920
/* must be single text argument */
885-
conn=persistent_conn;
921+
conn=pconn->conn;
886922
sql=GET_STR(PG_GETARG_TEXT_P(0));
887923
}
888924
else

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp