1919#include "catalog/pg_class_d.h"
2020
2121#include "common.h"
22+ #include "fe_utils/connect.h"
2223#include "fe_utils/simple_list.h"
2324#include "fe_utils/string_utils.h"
2425
@@ -61,10 +62,8 @@ static void vacuum_all_databases(vacuumingOptions *vacopts,
6162int concurrentCons ,
6263const char * progname ,bool echo ,bool quiet );
6364
64- static void prepare_vacuum_command (PQExpBuffer sql ,PGconn * conn ,
65- vacuumingOptions * vacopts ,const char * table ,
66- bool table_pre_qualified ,
67- const char * progname ,bool echo );
65+ static void prepare_vacuum_command (PQExpBuffer sql ,int serverVersion ,
66+ vacuumingOptions * vacopts ,const char * table );
6867
6968static void run_vacuum_command (PGconn * conn ,const char * sql ,bool echo ,
7069const char * table ,const char * progname ,bool async );
@@ -359,13 +358,18 @@ vacuum_one_database(const char *dbname, vacuumingOptions *vacopts,
359358const char * progname ,bool echo ,bool quiet )
360359{
361360PQExpBufferData sql ;
361+ PQExpBufferData buf ;
362+ PQExpBufferData catalog_query ;
363+ PGresult * res ;
362364PGconn * conn ;
363365SimpleStringListCell * cell ;
364366ParallelSlot * slots ;
365367SimpleStringList dbtables = {NULL ,NULL };
366368int i ;
369+ int ntups ;
367370bool failed = false;
368371bool parallel = concurrentCons > 1 ;
372+ bool tables_listed = false;
369373const char * stage_commands []= {
370374"SET default_statistics_target=1; SET vacuum_cost_delay=0;" ,
371375"SET default_statistics_target=10; RESET vacuum_cost_delay;" ,
@@ -410,53 +414,132 @@ vacuum_one_database(const char *dbname, vacuumingOptions *vacopts,
410414fflush (stdout );
411415}
412416
413- initPQExpBuffer (& sql );
414-
415417/*
416- * If a table list is not provided and we're using multiple connections,
417- * prepare the list of tables by querying the catalogs.
418+ * Prepare the list of tables to process by querying the catalogs.
419+ *
420+ * Since we execute the constructed query with the default search_path
421+ * (which could be unsafe), everything in this query MUST be fully
422+ * qualified.
423+ *
424+ * First, build a WITH clause for the catalog query if any tables were
425+ * specified, with a set of values made of relation names and their
426+ * optional set of columns. This is used to match any provided column
427+ * lists with the generated qualified identifiers and to filter for the
428+ * tables provided via --table. If a listed table does not exist, the
429+ * catalog query will fail.
418430 */
419- if (parallel && (!tables || !tables -> head ))
431+ initPQExpBuffer (& catalog_query );
432+ for (cell = tables ?tables -> head :NULL ;cell ;cell = cell -> next )
420433{
421- PQExpBufferData buf ;
422- PGresult * res ;
423- int ntups ;
424-
425- initPQExpBuffer (& buf );
426-
427- res = executeQuery (conn ,
428- "SELECT c.relname, ns.nspname"
429- " FROM pg_class c, pg_namespace ns\n"
430- " WHERE relkind IN ("
431- CppAsString2 (RELKIND_RELATION )", "
432- CppAsString2 (RELKIND_MATVIEW )")"
433- " AND c.relnamespace = ns.oid\n"
434- " ORDER BY c.relpages DESC;" ,
435- progname ,echo );
436-
437- ntups = PQntuples (res );
438- for (i = 0 ;i < ntups ;i ++ )
439- {
440- appendPQExpBufferStr (& buf ,
441- fmtQualifiedId (PQgetvalue (res ,i ,1 ),
442- PQgetvalue (res ,i ,0 )));
434+ char * just_table ;
435+ const char * just_columns ;
443436
444- simple_string_list_append (& dbtables ,buf .data );
445- resetPQExpBuffer (& buf );
437+ /*
438+ * Split relation and column names given by the user, this is used to
439+ * feed the CTE with values on which are performed pre-run validity
440+ * checks as well. For now these happen only on the relation name.
441+ */
442+ splitTableColumnsSpec (cell -> val ,PQclientEncoding (conn ),
443+ & just_table ,& just_columns );
444+
445+ if (!tables_listed )
446+ {
447+ appendPQExpBuffer (& catalog_query ,
448+ "WITH listed_tables (table_oid, column_list) "
449+ "AS (\n VALUES (" );
450+ tables_listed = true;
446451}
452+ else
453+ appendPQExpBuffer (& catalog_query ,",\n (" );
447454
448- termPQExpBuffer ( & buf );
449- tables = & dbtables ;
455+ appendStringLiteralConn ( & catalog_query , just_table , conn );
456+ appendPQExpBuffer ( & catalog_query , "::pg_catalog.regclass, " ) ;
450457
451- /*
452- * If there are more connections than vacuumable relations, we don't
453- * need to use them all.
454- */
458+ if (just_columns && just_columns [0 ]!= '\0' )
459+ appendStringLiteralConn (& catalog_query ,just_columns ,conn );
460+ else
461+ appendPQExpBufferStr (& catalog_query ,"NULL" );
462+
463+ appendPQExpBufferStr (& catalog_query ,"::pg_catalog.text)" );
464+
465+ pg_free (just_table );
466+ }
467+
468+ /* Finish formatting the CTE */
469+ if (tables_listed )
470+ appendPQExpBuffer (& catalog_query ,"\n)\n" );
471+
472+ appendPQExpBuffer (& catalog_query ,"SELECT c.relname, ns.nspname" );
473+
474+ if (tables_listed )
475+ appendPQExpBuffer (& catalog_query ,", listed_tables.column_list" );
476+
477+ appendPQExpBuffer (& catalog_query ,
478+ " FROM pg_catalog.pg_class c\n"
479+ " JOIN pg_catalog.pg_namespace ns"
480+ " ON c.relnamespace OPERATOR(pg_catalog.=) ns.oid\n" );
481+
482+ /* Used to match the tables listed by the user */
483+ if (tables_listed )
484+ appendPQExpBuffer (& catalog_query ," JOIN listed_tables"
485+ " ON listed_tables.table_oid OPERATOR(pg_catalog.=) c.oid\n" );
486+
487+ appendPQExpBuffer (& catalog_query ," WHERE c.relkind OPERATOR(pg_catalog.=) ANY (array["
488+ CppAsString2 (RELKIND_RELATION )", "
489+ CppAsString2 (RELKIND_MATVIEW )"])\n" );
490+
491+ /*
492+ * Execute the catalog query. We use the default search_path for this
493+ * query for consistency with table lookups done elsewhere by the user.
494+ */
495+ appendPQExpBuffer (& catalog_query ," ORDER BY c.relpages DESC;" );
496+ executeCommand (conn ,"RESET search_path;" ,progname ,echo );
497+ res = executeQuery (conn ,catalog_query .data ,progname ,echo );
498+ termPQExpBuffer (& catalog_query );
499+ PQclear (executeQuery (conn ,ALWAYS_SECURE_SEARCH_PATH_SQL ,
500+ progname ,echo ));
501+
502+ /*
503+ * If no rows are returned, there are no matching tables, so we are done.
504+ */
505+ ntups = PQntuples (res );
506+ if (ntups == 0 )
507+ {
508+ PQclear (res );
509+ PQfinish (conn );
510+ return ;
511+ }
512+
513+ /*
514+ * Build qualified identifiers for each table, including the column list
515+ * if given.
516+ */
517+ initPQExpBuffer (& buf );
518+ for (i = 0 ;i < ntups ;i ++ )
519+ {
520+ appendPQExpBufferStr (& buf ,
521+ fmtQualifiedId (PQgetvalue (res ,i ,1 ),
522+ PQgetvalue (res ,i ,0 )));
523+
524+ if (tables_listed && !PQgetisnull (res ,i ,2 ))
525+ appendPQExpBufferStr (& buf ,PQgetvalue (res ,i ,2 ));
526+
527+ simple_string_list_append (& dbtables ,buf .data );
528+ resetPQExpBuffer (& buf );
529+ }
530+ termPQExpBuffer (& buf );
531+ PQclear (res );
532+
533+ /*
534+ * If there are more connections than vacuumable relations, we don't need
535+ * to use them all.
536+ */
537+ if (parallel )
538+ {
455539if (concurrentCons > ntups )
456540concurrentCons = ntups ;
457541if (concurrentCons <=1 )
458542parallel = false;
459- PQclear (res );
460543}
461544
462545/*
@@ -493,10 +576,12 @@ vacuum_one_database(const char *dbname, vacuumingOptions *vacopts,
493576stage_commands [stage ],progname ,echo );
494577}
495578
496- cell = tables ?tables -> head :NULL ;
579+ initPQExpBuffer (& sql );
580+
581+ cell = dbtables .head ;
497582do
498583{
499- const char * tabname = cell ? cell -> val : NULL ;
584+ const char * tabname = cell -> val ;
500585ParallelSlot * free_slot ;
501586
502587if (CancelRequested )
@@ -529,12 +614,8 @@ vacuum_one_database(const char *dbname, vacuumingOptions *vacopts,
529614else
530615free_slot = slots ;
531616
532- /*
533- * Prepare the vacuum command. Note that in some cases this requires
534- * query execution, so be sure to use the free connection.
535- */
536- prepare_vacuum_command (& sql ,free_slot -> connection ,vacopts ,tabname ,
537- tables == & dbtables ,progname ,echo );
617+ prepare_vacuum_command (& sql ,PQserverVersion (free_slot -> connection ),
618+ vacopts ,tabname );
538619
539620/*
540621 * Execute the vacuum. If not in parallel mode, this terminates the
@@ -544,8 +625,7 @@ vacuum_one_database(const char *dbname, vacuumingOptions *vacopts,
544625run_vacuum_command (free_slot -> connection ,sql .data ,
545626echo ,tabname ,progname ,parallel );
546627
547- if (cell )
548- cell = cell -> next ;
628+ cell = cell -> next ;
549629}while (cell != NULL );
550630
551631if (parallel )
@@ -653,14 +733,12 @@ vacuum_all_databases(vacuumingOptions *vacopts,
653733 * Construct a vacuum/analyze command to run based on the given options, in the
654734 * given string buffer, which may contain previous garbage.
655735 *
656- *An optional table namecan bepassed; this must be already be properly
657- *quoted. The command is semicolon-terminated.
736+ *The table nameused must bealready properly quoted. The command generated
737+ *depends on the server version involved and it is semicolon-terminated.
658738 */
659739static void
660- prepare_vacuum_command (PQExpBuffer sql ,PGconn * conn ,
661- vacuumingOptions * vacopts ,const char * table ,
662- bool table_pre_qualified ,
663- const char * progname ,bool echo )
740+ prepare_vacuum_command (PQExpBuffer sql ,int serverVersion ,
741+ vacuumingOptions * vacopts ,const char * table )
664742{
665743const char * paren = " (" ;
666744const char * comma = ", " ;
@@ -673,12 +751,12 @@ prepare_vacuum_command(PQExpBuffer sql, PGconn *conn,
673751appendPQExpBufferStr (sql ,"ANALYZE" );
674752
675753/* parenthesized grammar of ANALYZE is supported since v11 */
676- if (PQserverVersion ( conn ) >=110000 )
754+ if (serverVersion >=110000 )
677755{
678756if (vacopts -> skip_locked )
679757{
680758/* SKIP_LOCKED is supported since v12 */
681- Assert (PQserverVersion ( conn ) >=120000 );
759+ Assert (serverVersion >=120000 );
682760appendPQExpBuffer (sql ,"%sSKIP_LOCKED" ,sep );
683761sep = comma ;
684762}
@@ -701,19 +779,19 @@ prepare_vacuum_command(PQExpBuffer sql, PGconn *conn,
701779appendPQExpBufferStr (sql ,"VACUUM" );
702780
703781/* parenthesized grammar of VACUUM is supported since v9.0 */
704- if (PQserverVersion ( conn ) >=90000 )
782+ if (serverVersion >=90000 )
705783{
706784if (vacopts -> disable_page_skipping )
707785{
708786/* DISABLE_PAGE_SKIPPING is supported since v9.6 */
709- Assert (PQserverVersion ( conn ) >=90600 );
787+ Assert (serverVersion >=90600 );
710788appendPQExpBuffer (sql ,"%sDISABLE_PAGE_SKIPPING" ,sep );
711789sep = comma ;
712790}
713791if (vacopts -> skip_locked )
714792{
715793/* SKIP_LOCKED is supported since v12 */
716- Assert (PQserverVersion ( conn ) >=120000 );
794+ Assert (serverVersion >=120000 );
717795appendPQExpBuffer (sql ,"%sSKIP_LOCKED" ,sep );
718796sep = comma ;
719797}
@@ -753,15 +831,7 @@ prepare_vacuum_command(PQExpBuffer sql, PGconn *conn,
753831}
754832}
755833
756- if (table )
757- {
758- appendPQExpBufferChar (sql ,' ' );
759- if (table_pre_qualified )
760- appendPQExpBufferStr (sql ,table );
761- else
762- appendQualifiedRelation (sql ,table ,conn ,progname ,echo );
763- }
764- appendPQExpBufferChar (sql ,';' );
834+ appendPQExpBuffer (sql ," %s;" ,table );
765835}
766836
767837/*