Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit 94173d3

Browse files
committed
Fix assorted issues in parallel vacuumdb.
Avoid storing the result of PQsocket() in a pgsocket variable; it's declared as int, and the no-socket test is properly written as "x < 0" not "x == PGINVALID_SOCKET". This accidentally had no bad effect because we never got to init_slot() with a bad connection, but it's still wrong.

Actually, it seems like we should avoid storing the result for a long period at all. The function's not so expensive that it's worth avoiding, and the existing coding technique here would fail if anyone tried to PQreset the connection during the life of the program. Hence, just re-call PQsocket every time we construct a select(2) mask.

Speaking of select(), GetIdleSlot imagined that it could compute the select mask once and continue to use it over multiple calls to select_loop(), which is pretty bogus since that would stomp on the mask on return. This could only matter if the function's outer loop iterated more than once, which is unlikely (it'd take some connection receiving data, but not enough to complete its command). But if it did happen, we'd acquire "tunnel vision" and stop watching the other connections for query termination, with the effect of losing parallelism.

Another way in which GetIdleSlot could lose parallelism is that once PQisBusy returns false, it would lock in on that connection and do PQgetResult until that returns NULL; in some cases that could result in blocking. (Perhaps this can never happen in vacuumdb due to the limited set of commands that it can issue, but I'm not quite sure of that, and even if true today it's not a future-proof assumption.) Refactor the code to do that properly, so that it risks blocking in PQgetResult only in cases where we need to wait anyway.

Another loss-of-parallelism problem, which *is* easily demonstrable, is that any setup queries issued during prepare_vacuum_command() were always issued on the last-to-be-created connection, whether or not that was idle.
Long-running operations on that connection thus prevented issuance of additional operations on the other ones, except in the limited cases where no preparatory query was needed. Instead, wait till we've identified a free connection and use that one.

Also, avoid core dump due to undersized malloc request in the case that no tables are identified to be vacuumed.

The bogus no-socket test was noted by CharSyam, the other problems identified in my own code review. Back-patch to 9.5 where parallel vacuumdb was introduced.

Discussion: https://postgr.es/m/CAMrLSE6etb33-192DTEUGkV-TsvEcxtBDxGWG1tgNOMnQHwgDA@mail.gmail.com
1 parent 5635c7a, commit 94173d3

File tree

1 file changed

+114
-78
lines changed

1 file changed

+114
-78
lines changed

‎src/bin/scripts/vacuumdb.c

Lines changed: 114 additions & 78 deletions
Original file line number | Diff line number | Diff line change
@@ -28,9 +28,8 @@
2828
/* Parallel vacuuming stuff */
2929
typedefstructParallelSlot
3030
{
31-
PGconn*connection;
32-
pgsocketsock;
33-
boolisFree;
31+
PGconn*connection;/* One connection */
32+
boolisFree;/* Is it known to be idle? */
3433
}ParallelSlot;
3534

3635
/* vacuum options controlled by user flags */
@@ -71,13 +70,16 @@ static void run_vacuum_command(PGconn *conn, const char *sql, bool echo,
7170
staticParallelSlot*GetIdleSlot(ParallelSlotslots[],intnumslots,
7271
constchar*progname);
7372

73+
staticboolProcessQueryResult(PGconn*conn,PGresult*result,
74+
constchar*progname);
75+
7476
staticboolGetQueryResult(PGconn*conn,constchar*progname);
7577

7678
staticvoidDisconnectDatabase(ParallelSlot*slot);
7779

7880
staticintselect_loop(intmaxFd,fd_set*workerset,bool*aborting);
7981

80-
staticvoidinit_slot(ParallelSlot*slot,PGconn*conn,constchar*progname);
82+
staticvoidinit_slot(ParallelSlot*slot,PGconn*conn);
8183

8284
staticvoidhelp(constchar*progname);
8385

@@ -343,7 +345,7 @@ vacuum_one_database(const char *dbname, vacuumingOptions *vacopts,
343345
PQExpBufferDatasql;
344346
PGconn*conn;
345347
SimpleStringListCell*cell;
346-
ParallelSlot*slots=NULL;
348+
ParallelSlot*slots;
347349
SimpleStringListdbtables= {NULL,NULL};
348350
inti;
349351
boolfailed= false;
@@ -387,7 +389,6 @@ vacuum_one_database(const char *dbname, vacuumingOptions *vacopts,
387389
PQExpBufferDatabuf;
388390
PGresult*res;
389391
intntups;
390-
inti;
391392

392393
initPQExpBuffer(&buf);
393394

@@ -432,15 +433,17 @@ vacuum_one_database(const char *dbname, vacuumingOptions *vacopts,
432433
* for the first slot. If not in parallel mode, the first slot in the
433434
* array contains the connection.
434435
*/
436+
if (concurrentCons <=0)
437+
concurrentCons=1;
435438
slots= (ParallelSlot*)pg_malloc(sizeof(ParallelSlot)*concurrentCons);
436-
init_slot(slots,conn,progname);
439+
init_slot(slots,conn);
437440
if (parallel)
438441
{
439442
for (i=1;i<concurrentCons;i++)
440443
{
441444
conn=connectDatabase(dbname,host,port,username,prompt_password,
442445
progname,echo, false, true);
443-
init_slot(slots+i,conn,progname);
446+
init_slot(slots+i,conn);
444447
}
445448
}
446449

@@ -462,11 +465,8 @@ vacuum_one_database(const char *dbname, vacuumingOptions *vacopts,
462465
cell=tables ?tables->head :NULL;
463466
do
464467
{
465-
ParallelSlot*free_slot;
466468
constchar*tabname=cell ?cell->val :NULL;
467-
468-
prepare_vacuum_command(&sql,conn,vacopts,tabname,
469-
tables==&dbtables,progname,echo);
469+
ParallelSlot*free_slot;
470470

471471
if (CancelRequested)
472472
{
@@ -498,10 +498,17 @@ vacuum_one_database(const char *dbname, vacuumingOptions *vacopts,
498498
else
499499
free_slot=slots;
500500

501+
/*
502+
* Prepare the vacuum command. Note that in some cases this requires
503+
* query execution, so be sure to use the free connection.
504+
*/
505+
prepare_vacuum_command(&sql,free_slot->connection,vacopts,tabname,
506+
tables==&dbtables,progname,echo);
507+
501508
/*
502509
* Execute the vacuum. If not in parallel mode, this terminates the
503510
* program in case of an error. (The parallel case handles query
504-
* errors inGetQueryResult through GetIdleSlot.)
511+
* errors inProcessQueryResult through GetIdleSlot.)
505512
*/
506513
run_vacuum_command(free_slot->connection,sql.data,
507514
echo,tabname,progname,parallel);
@@ -514,13 +521,11 @@ vacuum_one_database(const char *dbname, vacuumingOptions *vacopts,
514521
{
515522
intj;
516523

524+
/* wait for all connections to finish */
517525
for (j=0;j<concurrentCons;j++)
518526
{
519-
/* wait for all connection to return the results */
520527
if (!GetQueryResult((slots+j)->connection,progname))
521528
gotofinish;
522-
523-
(slots+j)->isFree= true;
524529
}
525530
}
526531

@@ -691,7 +696,8 @@ prepare_vacuum_command(PQExpBuffer sql, PGconn *conn,
691696
}
692697

693698
/*
694-
* Execute a vacuum/analyze command to the server.
699+
* Send a vacuum/analyze command to the server. In async mode, return after
700+
* sending the command; else, wait for it to finish.
695701
*
696702
* Any errors during command execution are reported to stderr. If async is
697703
* false, this function exits the program after reporting the error.
@@ -739,10 +745,6 @@ run_vacuum_command(PGconn *conn, const char *sql, bool echo,
739745
* this happens, we read the whole set and mark as free all sockets that become
740746
* available.
741747
*
742-
* Process the slot list, if any free slot is available then return the slotid
743-
* else perform the select on all the socket's and wait until at least one slot
744-
* becomes available.
745-
*
746748
* If an error occurs, NULL is returned.
747749
*/
748750
staticParallelSlot*
@@ -751,31 +753,43 @@ GetIdleSlot(ParallelSlot slots[], int numslots,
751753
{
752754
inti;
753755
intfirstFree=-1;
754-
fd_setslotset;
755-
pgsocketmaxFd;
756-
757-
for (i=0;i<numslots;i++)
758-
if ((slots+i)->isFree)
759-
returnslots+i;
760-
761-
FD_ZERO(&slotset);
762756

763-
maxFd=slots->sock;
757+
/* Any connection already known free? */
764758
for (i=0;i<numslots;i++)
765759
{
766-
FD_SET((slots+i)->sock,&slotset);
767-
if ((slots+i)->sock>maxFd)
768-
maxFd= (slots+i)->sock;
760+
if (slots[i].isFree)
761+
returnslots+i;
769762
}
770763

771764
/*
772765
* No free slot found, so wait until one of the connections has finished
773766
* its task and return the available slot.
774767
*/
775-
for (firstFree=-1;firstFree<0;)
768+
while (firstFree<0)
776769
{
770+
fd_setslotset;
771+
intmaxFd=0;
777772
boolaborting;
778773

774+
/* We must reconstruct the fd_set for each call to select_loop */
775+
FD_ZERO(&slotset);
776+
777+
for (i=0;i<numslots;i++)
778+
{
779+
intsock=PQsocket(slots[i].connection);
780+
781+
/*
782+
* We don't really expect any connections to lose their sockets
783+
* after startup, but just in case, cope by ignoring them.
784+
*/
785+
if (sock<0)
786+
continue;
787+
788+
FD_SET(sock,&slotset);
789+
if (sock>maxFd)
790+
maxFd=sock;
791+
}
792+
779793
SetCancelConn(slots->connection);
780794
i=select_loop(maxFd,&slotset,&aborting);
781795
ResetCancelConn();
@@ -793,64 +807,93 @@ GetIdleSlot(ParallelSlot slots[], int numslots,
793807

794808
for (i=0;i<numslots;i++)
795809
{
796-
if (!FD_ISSET((slots+i)->sock,&slotset))
797-
continue;
798-
799-
PQconsumeInput((slots+i)->connection);
800-
if (PQisBusy((slots+i)->connection))
801-
continue;
810+
intsock=PQsocket(slots[i].connection);
802811

803-
(slots+i)->isFree= true;
812+
if (sock >=0&&FD_ISSET(sock,&slotset))
813+
{
814+
/* select() says input is available, so consume it */
815+
PQconsumeInput(slots[i].connection);
816+
}
804817

805-
if (!GetQueryResult((slots+i)->connection,progname))
806-
returnNULL;
818+
/* Collect result(s) as long as any are available */
819+
while (!PQisBusy(slots[i].connection))
820+
{
821+
PGresult*result=PQgetResult(slots[i].connection);
807822

808-
if (firstFree<0)
809-
firstFree=i;
823+
if (result!=NULL)
824+
{
825+
/* Check and discard the command result */
826+
if (!ProcessQueryResult(slots[i].connection,result,
827+
progname))
828+
returnNULL;
829+
}
830+
else
831+
{
832+
/* This connection has become idle */
833+
slots[i].isFree= true;
834+
if (firstFree<0)
835+
firstFree=i;
836+
break;
837+
}
838+
}
810839
}
811840
}
812841

813842
returnslots+firstFree;
814843
}
815844

845+
/*
846+
* ProcessQueryResult
847+
*
848+
* Process (and delete) a query result. Returns true if there's no error,
849+
* false otherwise -- but errors about trying to vacuum a missing relation
850+
* are reported and subsequently ignored.
851+
*/
852+
staticbool
853+
ProcessQueryResult(PGconn*conn,PGresult*result,constchar*progname)
854+
{
855+
/*
856+
* If it's an error, report it. Errors about a missing table are harmless
857+
* so we continue processing; but die for other errors.
858+
*/
859+
if (PQresultStatus(result)!=PGRES_COMMAND_OK)
860+
{
861+
char*sqlState=PQresultErrorField(result,PG_DIAG_SQLSTATE);
862+
863+
fprintf(stderr,_("%s: vacuuming of database \"%s\" failed: %s"),
864+
progname,PQdb(conn),PQerrorMessage(conn));
865+
866+
if (sqlState&&strcmp(sqlState,ERRCODE_UNDEFINED_TABLE)!=0)
867+
{
868+
PQclear(result);
869+
return false;
870+
}
871+
}
872+
873+
PQclear(result);
874+
return true;
875+
}
876+
816877
/*
817878
* GetQueryResult
818879
*
819-
* Process the query result. Returns true if there's no error, false
820-
* otherwise -- but errors about trying to vacuum a missing relation are
821-
* reported and subsequently ignored.
880+
* Pump the conn till it's dry of results; return false if any are errors.
881+
* Note that this will block if the conn is busy.
822882
*/
823883
staticbool
824884
GetQueryResult(PGconn*conn,constchar*progname)
825885
{
886+
boolok= true;
826887
PGresult*result;
827888

828889
SetCancelConn(conn);
829890
while ((result=PQgetResult(conn))!=NULL)
830891
{
831-
/*
832-
* If errors are found, report them. Errors about a missing table are
833-
* harmless so we continue processing; but die for other errors.
834-
*/
835-
if (PQresultStatus(result)!=PGRES_COMMAND_OK)
836-
{
837-
char*sqlState=PQresultErrorField(result,PG_DIAG_SQLSTATE);
838-
839-
fprintf(stderr,_("%s: vacuuming of database \"%s\" failed: %s"),
840-
progname,PQdb(conn),PQerrorMessage(conn));
841-
842-
if (sqlState&&strcmp(sqlState,ERRCODE_UNDEFINED_TABLE)!=0)
843-
{
844-
PQclear(result);
845-
return false;
846-
}
847-
}
848-
849-
PQclear(result);
892+
if (!ProcessQueryResult(conn,result,progname))
893+
ok= false;
850894
}
851895
ResetCancelConn();
852-
853-
return true;
896+
returnok;
854897
}
855898

856899
/*
@@ -942,18 +985,11 @@ select_loop(int maxFd, fd_set *workerset, bool *aborting)
942985
}
943986

944987
staticvoid
945-
init_slot(ParallelSlot*slot,PGconn*conn,constchar*progname)
988+
init_slot(ParallelSlot*slot,PGconn*conn)
946989
{
947990
slot->connection=conn;
991+
/* Initially assume connection is idle */
948992
slot->isFree= true;
949-
slot->sock=PQsocket(conn);
950-
951-
if (slot->sock<0)
952-
{
953-
fprintf(stderr,_("%s: invalid socket: %s"),progname,
954-
PQerrorMessage(conn));
955-
exit(1);
956-
}
957993
}
958994

959995
staticvoid

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp