Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit340e694

Browse files
committed
PGPRO-2065: fix async pgut_execute_parallel()
1 parent2b969c4 commit340e694

File tree

5 files changed

+126
-54
lines changed

5 files changed

+126
-54
lines changed

‎src/backup.c

Lines changed: 42 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -945,22 +945,18 @@ do_block_validation(void)
945945
pgBackupGetPath(&current,database_path,lengthof(database_path),
946946
DATABASE_DIR);
947947

948-
/* initializebackup list */
948+
/* initializefile list */
949949
backup_files_list=parray_new();
950950

951951
/* list files with the logical path. omit $PGDATA */
952952
dir_list_file(backup_files_list,instance_config.pgdata, true, true, false,0);
953953

954954
/*
955-
* Sort pathname ascending. It is necessary to create intermediate
956-
* directories sequentially.
955+
* Sort pathname ascending.
957956
*
958957
* For example:
959958
* 1 - create 'base'
960959
* 2 - create 'base/1'
961-
*
962-
* Sorted array is used at least in parse_backup_filelist_filenames(),
963-
* extractPageMap(), make_pagemap_from_ptrack().
964960
*/
965961
parray_qsort(backup_files_list,pgFileComparePath);
966962
/* Extract information about files in backup_list parsing their names:*/
@@ -976,7 +972,6 @@ do_block_validation(void)
976972
/* Sort by size for load balancing */
977973
parray_qsort(backup_files_list,pgFileCompareSize);
978974

979-
980975
/* init thread args with own file lists */
981976
threads= (pthread_t*)palloc(sizeof(pthread_t)*num_threads);
982977
threads_args= (backup_files_arg*)palloc(sizeof(backup_files_arg)*num_threads);
@@ -1021,15 +1016,8 @@ do_block_validation(void)
10211016
/* TODO write better info message */
10221017
if (check_isok)
10231018
elog(INFO,"Data files are valid");
1024-
1025-
if (!check_isok)
1026-
{
1027-
if (thread_interrupted||interrupted)
1028-
elog(ERROR,"Checkdb failed");
1029-
else
1030-
elog(ERROR,"Data files are corrupted");
1031-
}
1032-
1019+
else
1020+
elog(ERROR,"Checkdb failed");
10331021

10341022
if (backup_files_list)
10351023
{
@@ -1053,7 +1041,7 @@ do_amcheck(void)
10531041
boolfirst_db_with_amcheck= true;
10541042
PGconn*db_conn=NULL;
10551043
booldb_skipped= false;
1056-
1044+
10571045
pgBackupGetPath(&current,database_path,lengthof(database_path),
10581046
DATABASE_DIR);
10591047

@@ -1065,7 +1053,7 @@ do_amcheck(void)
10651053

10661054
n_databases=PQntuples(res_db);
10671055

1068-
elog(INFO,"Startchecking PostgreSQL instance with amcheck");
1056+
elog(INFO,"Startamchecking PostgreSQL instance");
10691057

10701058
/* For each database check indexes. In parallel. */
10711059
for(i=0;i<n_databases;i++)
@@ -1112,8 +1100,6 @@ do_amcheck(void)
11121100
arg->ret=1;
11131101
}
11141102

1115-
pgut_atexit_push(threads_conn_disconnect,NULL);
1116-
11171103
/* Run threads */
11181104
for (j=0;j<num_threads;j++)
11191105
{
@@ -1131,18 +1117,25 @@ do_amcheck(void)
11311117
if (threads_args[j].ret>0)
11321118
check_isok= false;
11331119
}
1120+
1121+
/* cleanup */
11341122
pgut_disconnect(db_conn);
1123+
1124+
if (interrupted)
1125+
break;
11351126
}
11361127

11371128
/* TODO write better info message */
1138-
if (check_isok&& !db_skipped)
1139-
elog(INFO,"Indexes are valid");
1129+
if (db_skipped)
1130+
elog(WARNING,"Some databases were not checked");
11401131

1141-
if (!check_isok&& !interrupted)
1142-
elog(ERROR,"Some indexes are corrupted");
1132+
if (!check_isok)
1133+
elog(ERROR,"Checkdb --amcheck failed");
1134+
else
1135+
elog(INFO,"Checkdb --amcheck executed");
11431136

1144-
if (db_skipped)
1145-
elog(ERROR,"Some databases were not checked");
1137+
if (check_isok&& !interrupted&& !db_skipped)
1138+
elog(INFO,"Indexes are valid");
11461139
}
11471140

11481141
/* Entry point of pg_probackup CHECKDB subcommand. */
@@ -1153,7 +1146,7 @@ do_checkdb(bool need_amcheck)
11531146
{
11541147

11551148
if (skip_block_validation&& !need_amcheck)
1156-
elog(ERROR,"--skip-block-validation must be used with --amcheck option");
1149+
elog(ERROR,"Option '--skip-block-validation' must be used with'--amcheck' option");
11571150

11581151
pgdata_basic_setup();
11591152

@@ -2545,19 +2538,17 @@ backup_disconnect(bool fatal, void *userdata)
25452538
staticvoid
25462539
threads_conn_disconnect(boolfatal,void*userdata)
25472540
{
2548-
//int i;
2549-
2550-
//elog(VERBOSE, "threads_conn_disconnect, num_threads %d", num_threads);
2551-
// for (i = 0; i < num_threads; i++)
2552-
// {
2553-
// backup_files_arg *arg = &(threads_args[i]);
2554-
//
2555-
// if (arg->backup_conn)
2556-
// {
2557-
// pgut_cancel(arg->backup_conn);
2558-
// pgut_disconnect(arg->backup_conn);
2559-
// }
2560-
// }
2541+
inti;
2542+
2543+
backup_files_arg*arguments= (backup_files_arg*)userdata;
2544+
2545+
elog(VERBOSE,"threads_conn_disconnect, num_threads %d",arguments->thread_num);
2546+
2547+
if (arguments->backup_conn)
2548+
{
2549+
pgut_cancel(arguments->backup_conn);
2550+
pgut_disconnect(arguments->backup_conn);
2551+
}
25612552
}
25622553

25632554
staticvoid*
@@ -2663,7 +2654,7 @@ check_indexes(void *arg)
26632654
arguments->thread_num);
26642655

26652656
if (progress)
2666-
elog(INFO,"Thread [%d]. Progress: (%d/%d).Processing index '%s.%s'",
2657+
elog(INFO,"Thread [%d]. Progress: (%d/%d).Amchecking index '%s.%s'",
26672658
arguments->thread_num,i+1,n_indexes,
26682659
ind->amcheck_nspname,ind->name);
26692660

@@ -3489,8 +3480,7 @@ get_index_list(PGresult *res_db, int db_number,
34893480
instance_config.pguser);
34903481

34913482
res=pgut_execute(db_conn,"SELECT "
3492-
"extname, nspname, extversion, "
3493-
"extversion::numeric in (1.1, 1) as old_version "
3483+
"extname, nspname, extversion "
34943484
"FROM pg_namespace n "
34953485
"JOIN pg_extension e "
34963486
"ON n.oid=e.extnamespace "
@@ -3520,9 +3510,13 @@ get_index_list(PGresult *res_db, int db_number,
35203510
strcmp(PQgetvalue(res,0,2),"1")!=0)
35213511
heapallindexed_is_supported= true;
35223512

3523-
elog(INFO,"Checking database %s using module '%s' version %s from schema '%s'",
3513+
elog(INFO,"Amchecking database %s using module '%s' version %s from schema '%s'",
35243514
dbname,PQgetvalue(res,0,0),PQgetvalue(res,0,2),PQgetvalue(res,0,1));
35253515

3516+
if (!heapallindexed_is_supported&&heapallindexed)
3517+
elog(WARNING,"Module '%s' verion %s in schema '%s' do not support 'heapallindexed' option",
3518+
PQgetvalue(res,0,0),PQgetvalue(res,0,2),PQgetvalue(res,0,1));
3519+
35263520
/*
35273521
* In order to avoid duplicates, select global indexes
35283522
* (tablespace pg_global with oid 1664) only once
@@ -3549,7 +3543,7 @@ get_index_list(PGresult *res_db, int db_number,
35493543
"JOIN pg_tablespace tbl ON tbl.oid=cls.reltablespace "
35503544
"WHERE am.amname='btree' AND cls.relpersistence != 't' "
35513545
//"AND idx.indisready AND idx.indisvalid "
3552-
"AND tbl.spcname !='pg_global'",0,NULL);
3546+
"AND tbl.spcname !='pg_global'",0,NULL);
35533547
}
35543548

35553549
/* add info needed to check indexes into index_list */
@@ -3598,6 +3592,9 @@ amcheck_one_index(backup_files_arg *arguments,
35983592
/* second argument is heapallindexed */
35993593
params[1]=heapallindexed ?"true" :"false";
36003594

3595+
if (interrupted)
3596+
elog(ERROR,"Interrupted");
3597+
36013598
if (heapallindexed_is_supported)
36023599
{
36033600
query=palloc(strlen(ind->amcheck_nspname)+strlen("SELECT .bt_index_check($1, $2)")+1);

‎src/data.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -324,7 +324,7 @@ prepare_page(backup_files_arg *arguments,
324324

325325
/* check for interrupt */
326326
if (interrupted||thread_interrupted)
327-
elog(ERROR,"Interrupted duringbackup");
327+
elog(ERROR,"Interrupted duringpage reading");
328328

329329
/*
330330
* Read the page and verify its header and checksum.

‎src/help.c

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -421,7 +421,9 @@ help_checkdb(void)
421421
printf(_(" --skip-block-validation skip file-level block checking\n"));
422422
printf(_(" can be used only with '--amcheck' option\n"));
423423
printf(_(" --amcheck in addition to file-level block checking\n"));
424-
printf(_(" check btree indexes using 'amcheck' or 'amcheck_next' extension\n"));
424+
printf(_(" check btree indexes via function 'bt_index_check()'\n"));
425+
printf(_(" using amcheck' or 'amcheck_next' extension\n"));
426+
printf(_(" --parent use 'bt_index_parent_check()' instead of 'bt_index_check()'\n"));
425427
printf(_(" --heapallindexed also check that heap is indexed\n"));
426428
printf(_(" can be used only with '--amcheck' option\n"));
427429

‎src/pg_probackup.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ bool skip_external_dirs = false;
9393
/* checkdb options */
9494
booldo_amcheck= false;
9595
boolheapallindexed= false;
96+
boolamcheck_parent= false;
9697

9798
/* delete options */
9899
booldelete_wal= false;
@@ -165,6 +166,7 @@ static ConfigOption cmd_options[] =
165166
/* checkdb options */
166167
{'b',157,"amcheck",&do_amcheck,SOURCE_CMD_STRICT },
167168
{'b',158,"heapallindexed",&heapallindexed,SOURCE_CMD_STRICT },
169+
{'b',159,"parent",&amcheck_parent,SOURCE_CMD_STRICT },
168170
/* delete options */
169171
{'b',145,"wal",&delete_wal,SOURCE_CMD_STRICT },
170172
{'b',146,"expired",&delete_expired,SOURCE_CMD_STRICT },

‎src/utils/pgut.c

Lines changed: 78 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
#include"logger.h"
2222

2323

24-
constchar*PROGRAM_NAME=NULL;
24+
constchar*PROGRAM_NAME="pg_probackup";
2525

2626
staticchar*password=NULL;
2727
boolprompt_password= true;
@@ -43,6 +43,9 @@ static void on_interrupt(void);
4343
staticvoidon_cleanup(void);
4444
staticpqsigfuncoldhandler=NULL;
4545

46+
//bool is_result_ready(PGconn *conn);
47+
voiddiscard_response(PGconn*conn);
48+
4649
void
4750
pgut_init(void)
4851
{
@@ -363,6 +366,12 @@ pgut_execute_parallel(PGconn* conn,
363366
{
364367
PGresult*res;
365368
intret=0;
369+
PGconn*connections[1];
370+
371+
structtimevaltimeout;
372+
373+
timeout.tv_sec=100;
374+
timeout.tv_usec=200;
366375

367376
if (interrupted&& !in_cleanup)
368377
elog(ERROR,"interrupted");
@@ -390,35 +399,49 @@ pgut_execute_parallel(PGconn* conn,
390399
//on_before_exec(conn, thread_cancel_conn);
391400
if (async)
392401
{
402+
/* clean any old data */
403+
discard_response(conn);
404+
393405
if (nParams==0)
394-
ret=PQsendQuery(conn,query);
406+
PQsendQuery(conn,query);
395407
else
396-
ret=PQsendQueryParams(conn,query,nParams,NULL,params,NULL,NULL,
408+
PQsendQueryParams(conn,query,nParams,NULL,params,NULL,NULL,
397409
/*
398410
* Specify zero to obtain results in text format,
399411
* or one to obtain results in binary format.
400412
*/
401413
(text_result) ?0 :1);
402414

403415
/* wait for processing */
404-
while (true)
416+
//while(!is_result_ready(conn))
417+
//{
418+
//if (interrupted)
419+
//{
420+
//pgut_cancel(conn);
421+
//pgut_disconnect(conn);
422+
//elog(ERROR, "Interrupted");
423+
//}
424+
//}
425+
426+
/* wait for processing, TODO: timeout */
427+
for (;;)
405428
{
406-
407429
if (interrupted)
408430
{
409431
pgut_cancel(conn);
432+
pgut_disconnect(conn);
410433
elog(ERROR,"interrupted");
411434
}
412435

413436
if (!PQconsumeInput(conn))
414437
elog(ERROR,"query failed: %squery was: %s",
415-
PQerrorMessage(conn),query);
438+
PQerrorMessage(conn),query);
416439

417440
/* query is no done */
418441
if (!PQisBusy(conn))
419442
break;
420443

421-
usleep(1000);
444+
usleep(10000);
422445
}
423446

424447
res=PQgetResult(conn);
@@ -1023,3 +1046,51 @@ select_win32(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, con
10231046
}
10241047

10251048
#endif/* WIN32 */
1049+
1050+
void
1051+
discard_response(PGconn*conn)
1052+
{
1053+
PGresult*res;
1054+
1055+
do
1056+
{
1057+
res=PQgetResult(conn);
1058+
if (res)
1059+
PQclear(res);
1060+
}while (res);
1061+
}
1062+
1063+
//bool is_result_ready(PGconn * conn)
1064+
//{
1065+
// int sock;
1066+
// struct timeval timeout;
1067+
// fd_set read_mask;
1068+
//
1069+
// if (!PQisBusy(conn))
1070+
// return true;
1071+
//
1072+
// sock = PQsocket(conn);
1073+
//
1074+
// timeout.tv_sec = (time_t)1;
1075+
// timeout.tv_usec = 0;
1076+
//
1077+
// FD_ZERO(&read_mask);
1078+
// FD_SET(sock, &read_mask);
1079+
//
1080+
// if (select(sock + 1, &read_mask, NULL, NULL, &timeout) == 0)
1081+
// return false;
1082+
// else if (FD_ISSET(sock, &read_mask))
1083+
// {
1084+
// if (PQconsumeInput(conn))
1085+
// {
1086+
// if (PQisBusy(conn))
1087+
// return false;
1088+
// else
1089+
// return true;
1090+
// }
1091+
// else
1092+
// return false;
1093+
// }
1094+
// else
1095+
// return false;
1096+
//}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp