Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit418611c

Browse files
committed
Generalize parallel slot result handling.
Instead of having a hard-coded behavior that we ignore missingtables and report all other errors, let the caller decide whatto do by setting a callback.Mark Dilger, reviewed and somewhat revised by me. The larger patchseries of which this is a part has also had review from PeterGeoghegan, Andres Freund, Álvaro Herrera, Michael Paquier, and AmulSul, but I don't know whether any of them have reviewed this bitspecifically.Discussion:http://postgr.es/m/12ED3DA8-25F0-4B68-937D-D907CFBF08E7@enterprisedb.comDiscussion:http://postgr.es/m/5F743835-3399-419C-8324-2D424237E999@enterprisedb.comDiscussion:http://postgr.es/m/70655DF3-33CE-4527-9A4D-DDEB582B6BA0@enterprisedb.com
1 parente955bd4 commit418611c

File tree

4 files changed

+94
-28
lines changed

4 files changed

+94
-28
lines changed

‎src/bin/scripts/reindexdb.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -466,6 +466,7 @@ reindex_one_database(const ConnParams *cparams, ReindexType type,
466466
gotofinish;
467467
}
468468

469+
ParallelSlotSetHandler(free_slot,TableCommandResultHandler,NULL);
469470
run_reindex_command(free_slot->connection,process_type,objname,
470471
echo,verbose,concurrently, true);
471472

‎src/bin/scripts/vacuumdb.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -713,6 +713,7 @@ vacuum_one_database(const ConnParams *cparams,
713713
* Execute the vacuum. All errors are handled in processQueryResult
714714
* through ParallelSlotsGetIdle.
715715
*/
716+
ParallelSlotSetHandler(free_slot,TableCommandResultHandler,NULL);
716717
run_vacuum_command(free_slot->connection,sql.data,
717718
echo,tabname);
718719

‎src/fe_utils/parallel_slot.c

Lines changed: 63 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -30,42 +30,32 @@
3030

3131
staticvoidinit_slot(ParallelSlot*slot,PGconn*conn);
3232
staticintselect_loop(intmaxFd,fd_set*workerset);
33-
staticboolprocessQueryResult(PGconn*conn,PGresult*result);
33+
staticboolprocessQueryResult(ParallelSlot*slot,PGresult*result);
3434

3535
staticvoid
3636
init_slot(ParallelSlot*slot,PGconn*conn)
3737
{
3838
slot->connection=conn;
3939
/* Initially assume connection is idle */
4040
slot->isFree= true;
41+
ParallelSlotClearHandler(slot);
4142
}
4243

4344
/*
44-
* Process (and delete) a query result. Returns true if there's noerror,
45-
* false otherwise -- but errors about trying towork on a missing relation
46-
*are reported and subsequently ignored.
45+
* Process (and delete) a query result. Returns true if there's noproblem,
46+
* false otherwise. It's up to the handler todecide what cosntitutes a
47+
*problem.
4748
*/
4849
staticbool
49-
processQueryResult(PGconn*conn,PGresult*result)
50+
processQueryResult(ParallelSlot*slot,PGresult*result)
5051
{
51-
/*
52-
* If it's an error, report it. Errors about a missing table are harmless
53-
* so we continue processing; but die for other errors.
54-
*/
55-
if (PQresultStatus(result)!=PGRES_COMMAND_OK)
56-
{
57-
char*sqlState=PQresultErrorField(result,PG_DIAG_SQLSTATE);
52+
Assert(slot->handler!=NULL);
5853

59-
pg_log_error("processing of database \"%s\" failed: %s",
60-
PQdb(conn),PQerrorMessage(conn));
61-
62-
if (sqlState&&strcmp(sqlState,ERRCODE_UNDEFINED_TABLE)!=0)
63-
{
64-
PQclear(result);
65-
return false;
66-
}
67-
}
54+
/* On failure, the handler should return NULL after freeing the result */
55+
if (!slot->handler(result,slot->connection,slot->handler_context))
56+
return false;
6857

58+
/* Ok, we have to free it ourself */
6959
PQclear(result);
7060
return true;
7161
}
@@ -76,15 +66,15 @@ processQueryResult(PGconn *conn, PGresult *result)
7666
* Note that this will block if the connection is busy.
7767
*/
7868
staticbool
79-
consumeQueryResult(PGconn*conn)
69+
consumeQueryResult(ParallelSlot*slot)
8070
{
8171
boolok= true;
8272
PGresult*result;
8373

84-
SetCancelConn(conn);
85-
while ((result=PQgetResult(conn))!=NULL)
74+
SetCancelConn(slot->connection);
75+
while ((result=PQgetResult(slot->connection))!=NULL)
8676
{
87-
if (!processQueryResult(conn,result))
77+
if (!processQueryResult(slot,result))
8878
ok= false;
8979
}
9080
ResetCancelConn();
@@ -227,14 +217,15 @@ ParallelSlotsGetIdle(ParallelSlot *slots, int numslots)
227217

228218
if (result!=NULL)
229219
{
230-
/*Check and discard the command result */
231-
if (!processQueryResult(slots[i].connection,result))
220+
/*Handle and discard the command result */
221+
if (!processQueryResult(slots+i,result))
232222
returnNULL;
233223
}
234224
else
235225
{
236226
/* This connection has become idle */
237227
slots[i].isFree= true;
228+
ParallelSlotClearHandler(slots+i);
238229
if (firstFree<0)
239230
firstFree=i;
240231
break;
@@ -329,8 +320,52 @@ ParallelSlotsWaitCompletion(ParallelSlot *slots, int numslots)
329320

330321
for (i=0;i<numslots;i++)
331322
{
332-
if (!consumeQueryResult((slots+i)->connection))
323+
if (!consumeQueryResult(slots+i))
324+
return false;
325+
}
326+
327+
return true;
328+
}
329+
330+
/*
331+
* TableCommandResultHandler
332+
*
333+
* ParallelSlotResultHandler for results of commands (not queries) against
334+
* tables.
335+
*
336+
* Requires that the result status is either PGRES_COMMAND_OK or an error about
337+
* a missing table. This is useful for utilities that compile a list of tables
338+
* to process and then run commands (vacuum, reindex, or whatever) against
339+
* those tables, as there is a race condition between the time the list is
340+
* compiled and the time the command attempts to open the table.
341+
*
342+
* For missing tables, logs an error but allows processing to continue.
343+
*
344+
* For all other errors, logs an error and terminates further processing.
345+
*
346+
* res: PGresult from the query executed on the slot's connection
347+
* conn: connection belonging to the slot
348+
* context: unused
349+
*/
350+
bool
351+
TableCommandResultHandler(PGresult*res,PGconn*conn,void*context)
352+
{
353+
/*
354+
* If it's an error, report it. Errors about a missing table are harmless
355+
* so we continue processing; but die for other errors.
356+
*/
357+
if (PQresultStatus(res)!=PGRES_COMMAND_OK)
358+
{
359+
char*sqlState=PQresultErrorField(res,PG_DIAG_SQLSTATE);
360+
361+
pg_log_error("processing of database \"%s\" failed: %s",
362+
PQdb(conn),PQerrorMessage(conn));
363+
364+
if (sqlState&&strcmp(sqlState,ERRCODE_UNDEFINED_TABLE)!=0)
365+
{
366+
PQclear(res);
333367
return false;
368+
}
334369
}
335370

336371
return true;

‎src/include/fe_utils/parallel_slot.h

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,39 @@
1515
#include"fe_utils/connect_utils.h"
1616
#include"libpq-fe.h"
1717

18+
typedefbool (*ParallelSlotResultHandler) (PGresult*res,PGconn*conn,
19+
void*context);
20+
1821
typedefstructParallelSlot
1922
{
2023
PGconn*connection;/* One connection */
2124
boolisFree;/* Is it known to be idle? */
25+
26+
/*
27+
* Prior to issuing a command or query on 'connection', a handler callback
28+
* function may optionally be registered to be invoked to process the
29+
* results, and context information may optionally be registered for use
30+
* by the handler. If unset, these fields should be NULL.
31+
*/
32+
ParallelSlotResultHandlerhandler;
33+
void*handler_context;
2234
}ParallelSlot;
2335

36+
staticinlinevoid
37+
ParallelSlotSetHandler(ParallelSlot*slot,ParallelSlotResultHandlerhandler,
38+
void*context)
39+
{
40+
slot->handler=handler;
41+
slot->handler_context=context;
42+
}
43+
44+
staticinlinevoid
45+
ParallelSlotClearHandler(ParallelSlot*slot)
46+
{
47+
slot->handler=NULL;
48+
slot->handler_context=NULL;
49+
}
50+
2451
externParallelSlot*ParallelSlotsGetIdle(ParallelSlot*slots,intnumslots);
2552

2653
externParallelSlot*ParallelSlotsSetup(constConnParams*cparams,
@@ -31,5 +58,7 @@ extern void ParallelSlotsTerminate(ParallelSlot *slots, int numslots);
3158

3259
externboolParallelSlotsWaitCompletion(ParallelSlot*slots,intnumslots);
3360

61+
externboolTableCommandResultHandler(PGresult*res,PGconn*conn,
62+
void*context);
3463

3564
#endif/* PARALLEL_SLOT_H */

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp