Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitbbf83ca

Browse files
pg_upgrade: Parallelize data type checks.
This commit makes use of the new task framework in pg_upgrade toparallelize the checks for incompatible data types, i.e., datatypes whose on-disk format has changed, data types that have beenremoved, etc. This step will now process multiple databasesconcurrently when pg_upgrade's --jobs option is provided a valuegreater than 1.Reviewed-by: Daniel Gustafsson, Ilya GladyshevDiscussion:https://postgr.es/m/20240516211638.GA1688936%40nathanxps13
1 parent6ab8f27 commitbbf83ca

File tree

1 file changed

+191
-160
lines changed

1 file changed

+191
-160
lines changed

‎src/bin/pg_upgrade/check.c

Lines changed: 191 additions & 160 deletions
Original file line numberDiff line numberDiff line change
@@ -314,6 +314,147 @@ static DataTypesUsageChecks data_types_usage_checks[] =
314314
}
315315
};
316316

317+
/*
318+
* Private state for check_for_data_types_usage()'s UpgradeTask.
319+
*/
320+
structdata_type_check_state
321+
{
322+
DataTypesUsageChecks*check;/* the check for this step */
323+
bool*result;/* true if check failed for any database */
324+
PQExpBuffer*report;/* buffer for report on failed checks */
325+
};
326+
327+
/*
328+
* Returns a palloc'd query string for the data type check, for use by
329+
* check_for_data_types_usage()'s UpgradeTask.
330+
*/
331+
staticchar*
332+
data_type_check_query(intchecknum)
333+
{
334+
DataTypesUsageChecks*check=&data_types_usage_checks[checknum];
335+
336+
returnpsprintf("WITH RECURSIVE oids AS ( "
337+
/* start with the type(s) returned by base_query */
338+
"%s "
339+
"UNION ALL "
340+
"SELECT * FROM ( "
341+
/* inner WITH because we can only reference the CTE once */
342+
"WITH x AS (SELECT oid FROM oids) "
343+
/* domains on any type selected so far */
344+
"SELECT t.oid FROM pg_catalog.pg_type t, x WHERE typbasetype = x.oid AND typtype = 'd' "
345+
"UNION ALL "
346+
/* arrays over any type selected so far */
347+
"SELECT t.oid FROM pg_catalog.pg_type t, x WHERE typelem = x.oid AND typtype = 'b' "
348+
"UNION ALL "
349+
/* composite types containing any type selected so far */
350+
"SELECT t.oid FROM pg_catalog.pg_type t, pg_catalog.pg_class c, pg_catalog.pg_attribute a, x "
351+
"WHERE t.typtype = 'c' AND "
352+
" t.oid = c.reltype AND "
353+
" c.oid = a.attrelid AND "
354+
" NOT a.attisdropped AND "
355+
" a.atttypid = x.oid "
356+
"UNION ALL "
357+
/* ranges containing any type selected so far */
358+
"SELECT t.oid FROM pg_catalog.pg_type t, pg_catalog.pg_range r, x "
359+
"WHERE t.typtype = 'r' AND r.rngtypid = t.oid AND r.rngsubtype = x.oid"
360+
") foo "
361+
") "
362+
/* now look for stored columns of any such type */
363+
"SELECT n.nspname, c.relname, a.attname "
364+
"FROMpg_catalog.pg_class c, "
365+
"pg_catalog.pg_namespace n, "
366+
"pg_catalog.pg_attribute a "
367+
"WHEREc.oid = a.attrelid AND "
368+
"NOT a.attisdropped AND "
369+
"a.atttypid IN (SELECT oid FROM oids) AND "
370+
"c.relkind IN ("
371+
CppAsString2(RELKIND_RELATION) ", "
372+
CppAsString2(RELKIND_MATVIEW)", "
373+
CppAsString2(RELKIND_INDEX)") AND "
374+
"c.relnamespace = n.oid AND "
375+
/* exclude possible orphaned temp tables */
376+
"n.nspname !~ '^pg_temp_' AND "
377+
"n.nspname !~ '^pg_toast_temp_' AND "
378+
/* exclude system catalogs, too */
379+
"n.nspname NOT IN ('pg_catalog', 'information_schema')",
380+
check->base_query);
381+
}
382+
383+
/*
384+
* Callback function for processing results of queries for
385+
* check_for_data_types_usage()'s UpgradeTask. If the query returned any rows
386+
* (i.e., the check failed), write the details to the report file.
387+
*/
388+
staticvoid
389+
process_data_type_check(DbInfo*dbinfo,PGresult*res,void*arg)
390+
{
391+
structdata_type_check_state*state= (structdata_type_check_state*)arg;
392+
intntups=PQntuples(res);
393+
394+
AssertVariableIsOfType(&process_data_type_check,UpgradeTaskProcessCB);
395+
396+
if (ntups)
397+
{
398+
charoutput_path[MAXPGPATH];
399+
inti_nspname;
400+
inti_relname;
401+
inti_attname;
402+
FILE*script=NULL;
403+
booldb_used= false;
404+
405+
snprintf(output_path,sizeof(output_path),"%s/%s",
406+
log_opts.basedir,
407+
state->check->report_filename);
408+
409+
/*
410+
* Make sure we have a buffer to save reports to now that we found a
411+
* first failing check.
412+
*/
413+
if (*state->report==NULL)
414+
*state->report=createPQExpBuffer();
415+
416+
/*
417+
* If this is the first time we see an error for the check in question
418+
* then print a status message of the failure.
419+
*/
420+
if (!(*state->result))
421+
{
422+
pg_log(PG_REPORT," failed check: %s",_(state->check->status));
423+
appendPQExpBuffer(*state->report,"\n%s\n%s %s\n",
424+
_(state->check->report_text),
425+
_("A list of the problem columns is in the file:"),
426+
output_path);
427+
}
428+
*state->result= true;
429+
430+
i_nspname=PQfnumber(res,"nspname");
431+
i_relname=PQfnumber(res,"relname");
432+
i_attname=PQfnumber(res,"attname");
433+
434+
for (introwno=0;rowno<ntups;rowno++)
435+
{
436+
if (script==NULL&& (script=fopen_priv(output_path,"a"))==NULL)
437+
pg_fatal("could not open file \"%s\": %m",output_path);
438+
439+
if (!db_used)
440+
{
441+
fprintf(script,"In database: %s\n",dbinfo->db_name);
442+
db_used= true;
443+
}
444+
fprintf(script," %s.%s.%s\n",
445+
PQgetvalue(res,rowno,i_nspname),
446+
PQgetvalue(res,rowno,i_relname),
447+
PQgetvalue(res,rowno,i_attname));
448+
}
449+
450+
if (script)
451+
{
452+
fclose(script);
453+
script=NULL;
454+
}
455+
}
456+
}
457+
317458
/*
318459
* check_for_data_types_usage()
319460
*Detect whether there are any stored columns depending on given type(s)
@@ -334,13 +475,15 @@ static DataTypesUsageChecks data_types_usage_checks[] =
334475
* there's no storage involved in a view.
335476
*/
336477
staticvoid
337-
check_for_data_types_usage(ClusterInfo*cluster,DataTypesUsageChecks*checks)
478+
check_for_data_types_usage(ClusterInfo*cluster)
338479
{
339-
boolfound= false;
340480
bool*results;
341-
PQExpBufferDatareport;
342-
DataTypesUsageChecks*tmp=checks;
481+
PQExpBufferreport=NULL;
482+
DataTypesUsageChecks*tmp=data_types_usage_checks;
343483
intn_data_types_usage_checks=0;
484+
UpgradeTask*task=upgrade_task_create();
485+
char**queries=NULL;
486+
structdata_type_check_state*states;
344487

345488
prep_status("Checking data type usage");
346489

@@ -353,175 +496,63 @@ check_for_data_types_usage(ClusterInfo *cluster, DataTypesUsageChecks *checks)
353496

354497
/* Prepare an array to store the results of checks in */
355498
results=pg_malloc0(sizeof(bool)*n_data_types_usage_checks);
499+
queries=pg_malloc0(sizeof(char*)*n_data_types_usage_checks);
500+
states=pg_malloc0(sizeof(structdata_type_check_state)*n_data_types_usage_checks);
356501

357-
/*
358-
* Connect to each database in the cluster and run all defined checks
359-
* against that database before trying the next one.
360-
*/
361-
for (intdbnum=0;dbnum<cluster->dbarr.ndbs;dbnum++)
502+
for (inti=0;i<n_data_types_usage_checks;i++)
362503
{
363-
DbInfo*active_db=&cluster->dbarr.dbs[dbnum];
364-
PGconn*conn=connectToServer(cluster,active_db->db_name);
504+
DataTypesUsageChecks*check=&data_types_usage_checks[i];
365505

366-
for (intchecknum=0;checknum<n_data_types_usage_checks;checknum++)
506+
if (check->threshold_version==MANUAL_CHECK)
367507
{
368-
PGresult*res;
369-
intntups;
370-
inti_nspname;
371-
inti_relname;
372-
inti_attname;
373-
FILE*script=NULL;
374-
booldb_used= false;
375-
charoutput_path[MAXPGPATH];
376-
DataTypesUsageChecks*cur_check=&checks[checknum];
377-
378-
if (cur_check->threshold_version==MANUAL_CHECK)
379-
{
380-
Assert(cur_check->version_hook);
381-
382-
/*
383-
* Make sure that the check applies to the current cluster
384-
* version and skip if not. If no check hook has been defined
385-
* we run the check for all versions.
386-
*/
387-
if (!cur_check->version_hook(cluster))
388-
continue;
389-
}
390-
elseif (cur_check->threshold_version!=ALL_VERSIONS)
391-
{
392-
if (GET_MAJOR_VERSION(cluster->major_version)>cur_check->threshold_version)
393-
continue;
394-
}
395-
else
396-
Assert(cur_check->threshold_version==ALL_VERSIONS);
397-
398-
snprintf(output_path,sizeof(output_path),"%s/%s",
399-
log_opts.basedir,
400-
cur_check->report_filename);
508+
Assert(check->version_hook);
401509

402510
/*
403-
* The type(s) of interest might be wrapped in a domain, array,
404-
* composite, or range, and these container types can be nested
405-
* (to varying extents depending on server version, but that's not
406-
* of concern here). To handle all these cases we need a
407-
* recursive CTE.
511+
* Make sure that the check applies to the current cluster version
512+
* and skip it if not.
408513
*/
409-
res=executeQueryOrDie(conn,
410-
"WITH RECURSIVE oids AS ( "
411-
/* start with the type(s) returned by base_query */
412-
"%s "
413-
"UNION ALL "
414-
"SELECT * FROM ( "
415-
/* inner WITH because we can only reference the CTE once */
416-
"WITH x AS (SELECT oid FROM oids) "
417-
/* domains on any type selected so far */
418-
"SELECT t.oid FROM pg_catalog.pg_type t, x WHERE typbasetype = x.oid AND typtype = 'd' "
419-
"UNION ALL "
420-
/* arrays over any type selected so far */
421-
"SELECT t.oid FROM pg_catalog.pg_type t, x WHERE typelem = x.oid AND typtype = 'b' "
422-
"UNION ALL "
423-
/* composite types containing any type selected so far */
424-
"SELECT t.oid FROM pg_catalog.pg_type t, pg_catalog.pg_class c, pg_catalog.pg_attribute a, x "
425-
"WHERE t.typtype = 'c' AND "
426-
" t.oid = c.reltype AND "
427-
" c.oid = a.attrelid AND "
428-
" NOT a.attisdropped AND "
429-
" a.atttypid = x.oid "
430-
"UNION ALL "
431-
/* ranges containing any type selected so far */
432-
"SELECT t.oid FROM pg_catalog.pg_type t, pg_catalog.pg_range r, x "
433-
"WHERE t.typtype = 'r' AND r.rngtypid = t.oid AND r.rngsubtype = x.oid"
434-
") foo "
435-
") "
436-
/* now look for stored columns of any such type */
437-
"SELECT n.nspname, c.relname, a.attname "
438-
"FROMpg_catalog.pg_class c, "
439-
"pg_catalog.pg_namespace n, "
440-
"pg_catalog.pg_attribute a "
441-
"WHEREc.oid = a.attrelid AND "
442-
"NOT a.attisdropped AND "
443-
"a.atttypid IN (SELECT oid FROM oids) AND "
444-
"c.relkind IN ("
445-
CppAsString2(RELKIND_RELATION) ", "
446-
CppAsString2(RELKIND_MATVIEW)", "
447-
CppAsString2(RELKIND_INDEX)") AND "
448-
"c.relnamespace = n.oid AND "
449-
/* exclude possible orphaned temp tables */
450-
"n.nspname !~ '^pg_temp_' AND "
451-
"n.nspname !~ '^pg_toast_temp_' AND "
452-
/* exclude system catalogs, too */
453-
"n.nspname NOT IN ('pg_catalog', 'information_schema')",
454-
cur_check->base_query);
455-
456-
ntups=PQntuples(res);
514+
if (!check->version_hook(cluster))
515+
continue;
516+
}
517+
elseif (check->threshold_version!=ALL_VERSIONS)
518+
{
519+
if (GET_MAJOR_VERSION(cluster->major_version)>check->threshold_version)
520+
continue;
521+
}
522+
else
523+
Assert(check->threshold_version==ALL_VERSIONS);
457524

458-
/*
459-
* The datatype was found, so extract the data and log to the
460-
* requested filename. We need to open the file for appending
461-
* since the check might have already found the type in another
462-
* database earlier in the loop.
463-
*/
464-
if (ntups)
465-
{
466-
/*
467-
* Make sure we have a buffer to save reports to now that we
468-
* found a first failing check.
469-
*/
470-
if (!found)
471-
initPQExpBuffer(&report);
472-
found= true;
473-
474-
/*
475-
* If this is the first time we see an error for the check in
476-
* question then print a status message of the failure.
477-
*/
478-
if (!results[checknum])
479-
{
480-
pg_log(PG_REPORT," failed check: %s",_(cur_check->status));
481-
appendPQExpBuffer(&report,"\n%s\n%s %s\n",
482-
_(cur_check->report_text),
483-
_("A list of the problem columns is in the file:"),
484-
output_path);
485-
}
486-
results[checknum]= true;
487-
488-
i_nspname=PQfnumber(res,"nspname");
489-
i_relname=PQfnumber(res,"relname");
490-
i_attname=PQfnumber(res,"attname");
491-
492-
for (introwno=0;rowno<ntups;rowno++)
493-
{
494-
if (script==NULL&& (script=fopen_priv(output_path,"a"))==NULL)
495-
pg_fatal("could not open file \"%s\": %m",output_path);
496-
497-
if (!db_used)
498-
{
499-
fprintf(script,"In database: %s\n",active_db->db_name);
500-
db_used= true;
501-
}
502-
fprintf(script," %s.%s.%s\n",
503-
PQgetvalue(res,rowno,i_nspname),
504-
PQgetvalue(res,rowno,i_relname),
505-
PQgetvalue(res,rowno,i_attname));
506-
}
507-
508-
if (script)
509-
{
510-
fclose(script);
511-
script=NULL;
512-
}
513-
}
525+
queries[i]=data_type_check_query(i);
514526

515-
PQclear(res);
516-
}
527+
states[i].check=check;
528+
states[i].result=&results[i];
529+
states[i].report=&report;
517530

518-
PQfinish(conn);
531+
upgrade_task_add_step(task,queries[i],process_data_type_check,
532+
true,&states[i]);
519533
}
520534

521-
if (found)
522-
pg_fatal("Data type checks failed: %s",report.data);
535+
/*
536+
* Connect to each database in the cluster and run all defined checks
537+
* against that database before trying the next one.
538+
*/
539+
upgrade_task_run(task,cluster);
540+
upgrade_task_free(task);
541+
542+
if (report)
543+
{
544+
pg_fatal("Data type checks failed: %s",report->data);
545+
destroyPQExpBuffer(report);
546+
}
523547

524548
pg_free(results);
549+
for (inti=0;i<n_data_types_usage_checks;i++)
550+
{
551+
if (queries[i])
552+
pg_free(queries[i]);
553+
}
554+
pg_free(queries);
555+
pg_free(states);
525556

526557
check_ok();
527558
}
@@ -616,7 +647,7 @@ check_and_dump_old_cluster(void)
616647
check_old_cluster_subscription_state();
617648
}
618649

619-
check_for_data_types_usage(&old_cluster,data_types_usage_checks);
650+
check_for_data_types_usage(&old_cluster);
620651

621652
/*
622653
* PG 14 changed the function signature of encoding conversion functions.

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp