Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit7baa36d

Browse files
pg_upgrade: Parallelize subscription check.
This commit makes use of the new task framework in pg_upgrade toparallelize the part of check_old_cluster_subscription_state() thatverifies each of the subscribed tables is in the 'i' (initialize)or 'r' (ready) state. This check will now process multipledatabases concurrently when pg_upgrade's --jobs option is provideda value greater than 1.Reviewed-by: Daniel Gustafsson, Ilya GladyshevDiscussion:https://postgr.es/m/20240516211638.GA1688936%40nathanxps13
1 parent6d3d2e8 commit7baa36d

File tree

1 file changed

+111
-95
lines changed

1 file changed

+111
-95
lines changed

‎src/bin/pg_upgrade/check.c

Lines changed: 111 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -1905,6 +1905,38 @@ check_old_cluster_for_valid_slots(void)
19051905
check_ok();
19061906
}
19071907

1908+
/*
1909+
* Callback function for processing results of query for
1910+
* check_old_cluster_subscription_state()'s UpgradeTask. If the query returned
1911+
* any rows (i.e., the check failed), write the details to the report file.
1912+
*/
1913+
staticvoid
1914+
process_old_sub_state_check(DbInfo*dbinfo,PGresult*res,void*arg)
1915+
{
1916+
UpgradeTaskReport*report= (UpgradeTaskReport*)arg;
1917+
intntup=PQntuples(res);
1918+
inti_srsubstate=PQfnumber(res,"srsubstate");
1919+
inti_subname=PQfnumber(res,"subname");
1920+
inti_nspname=PQfnumber(res,"nspname");
1921+
inti_relname=PQfnumber(res,"relname");
1922+
1923+
AssertVariableIsOfType(&process_old_sub_state_check,UpgradeTaskProcessCB);
1924+
1925+
for (inti=0;i<ntup;i++)
1926+
{
1927+
if (report->file==NULL&&
1928+
(report->file=fopen_priv(report->path,"w"))==NULL)
1929+
pg_fatal("could not open file \"%s\": %m",report->path);
1930+
1931+
fprintf(report->file,"The table sync state \"%s\" is not allowed for database:\"%s\" subscription:\"%s\" schema:\"%s\" relation:\"%s\"\n",
1932+
PQgetvalue(res,i,i_srsubstate),
1933+
dbinfo->db_name,
1934+
PQgetvalue(res,i,i_subname),
1935+
PQgetvalue(res,i,i_nspname),
1936+
PQgetvalue(res,i,i_relname));
1937+
}
1938+
}
1939+
19081940
/*
19091941
* check_old_cluster_subscription_state()
19101942
*
@@ -1915,115 +1947,99 @@ check_old_cluster_for_valid_slots(void)
19151947
staticvoid
19161948
check_old_cluster_subscription_state(void)
19171949
{
1918-
FILE*script=NULL;
1919-
charoutput_path[MAXPGPATH];
1950+
UpgradeTask*task=upgrade_task_create();
1951+
UpgradeTaskReportreport;
1952+
constchar*query;
1953+
PGresult*res;
1954+
PGconn*conn;
19201955
intntup;
19211956

19221957
prep_status("Checking for subscription state");
19231958

1924-
snprintf(output_path,sizeof(output_path),"%s/%s",
1959+
report.file=NULL;
1960+
snprintf(report.path,sizeof(report.path),"%s/%s",
19251961
log_opts.basedir,
19261962
"subs_invalid.txt");
1927-
for (intdbnum=0;dbnum<old_cluster.dbarr.ndbs;dbnum++)
1928-
{
1929-
PGresult*res;
1930-
DbInfo*active_db=&old_cluster.dbarr.dbs[dbnum];
1931-
PGconn*conn=connectToServer(&old_cluster,active_db->db_name);
1932-
1933-
/* We need to check for pg_replication_origin only once. */
1934-
if (dbnum==0)
1935-
{
1936-
/*
1937-
* Check that all the subscriptions have their respective
1938-
* replication origin.
1939-
*/
1940-
res=executeQueryOrDie(conn,
1941-
"SELECT d.datname, s.subname "
1942-
"FROM pg_catalog.pg_subscription s "
1943-
"LEFT OUTER JOIN pg_catalog.pg_replication_origin o "
1944-
"ON o.roname = 'pg_' || s.oid "
1945-
"INNER JOIN pg_catalog.pg_database d "
1946-
"ON d.oid = s.subdbid "
1947-
"WHERE o.roname IS NULL;");
1948-
1949-
ntup=PQntuples(res);
1950-
for (inti=0;i<ntup;i++)
1951-
{
1952-
if (script==NULL&& (script=fopen_priv(output_path,"w"))==NULL)
1953-
pg_fatal("could not open file \"%s\": %m",output_path);
1954-
fprintf(script,"The replication origin is missing for database:\"%s\" subscription:\"%s\"\n",
1955-
PQgetvalue(res,i,0),
1956-
PQgetvalue(res,i,1));
1957-
}
1958-
PQclear(res);
1959-
}
1960-
1961-
/*
1962-
* We don't allow upgrade if there is a risk of dangling slot or
1963-
* origin corresponding to initial sync after upgrade.
1964-
*
1965-
* A slot/origin not created yet refers to the 'i' (initialize) state,
1966-
* while 'r' (ready) state refers to a slot/origin created previously
1967-
* but already dropped. These states are supported for pg_upgrade. The
1968-
* other states listed below are not supported:
1969-
*
1970-
* a) SUBREL_STATE_DATASYNC: A relation upgraded while in this state
1971-
* would retain a replication slot, which could not be dropped by the
1972-
* sync worker spawned after the upgrade because the subscription ID
1973-
* used for the slot name won't match anymore.
1974-
*
1975-
* b) SUBREL_STATE_SYNCDONE: A relation upgraded while in this state
1976-
* would retain the replication origin when there is a failure in
1977-
* tablesync worker immediately after dropping the replication slot in
1978-
* the publisher.
1979-
*
1980-
* c) SUBREL_STATE_FINISHEDCOPY: A tablesync worker spawned to work on
1981-
* a relation upgraded while in this state would expect an origin ID
1982-
* with the OID of the subscription used before the upgrade, causing
1983-
* it to fail.
1984-
*
1985-
* d) SUBREL_STATE_SYNCWAIT, SUBREL_STATE_CATCHUP and
1986-
* SUBREL_STATE_UNKNOWN: These states are not stored in the catalog,
1987-
* so we need not allow these states.
1988-
*/
1989-
res=executeQueryOrDie(conn,
1990-
"SELECT r.srsubstate, s.subname, n.nspname, c.relname "
1991-
"FROM pg_catalog.pg_subscription_rel r "
1992-
"LEFT JOIN pg_catalog.pg_subscription s"
1993-
"ON r.srsubid = s.oid "
1994-
"LEFT JOIN pg_catalog.pg_class c"
1995-
"ON r.srrelid = c.oid "
1996-
"LEFT JOIN pg_catalog.pg_namespace n"
1997-
"ON c.relnamespace = n.oid "
1998-
"WHERE r.srsubstate NOT IN ('i', 'r') "
1999-
"ORDER BY s.subname");
2000-
2001-
ntup=PQntuples(res);
2002-
for (inti=0;i<ntup;i++)
2003-
{
2004-
if (script==NULL&& (script=fopen_priv(output_path,"w"))==NULL)
2005-
pg_fatal("could not open file \"%s\": %m",output_path);
2006-
2007-
fprintf(script,"The table sync state \"%s\" is not allowed for database:\"%s\" subscription:\"%s\" schema:\"%s\" relation:\"%s\"\n",
2008-
PQgetvalue(res,i,0),
2009-
active_db->db_name,
2010-
PQgetvalue(res,i,1),
2011-
PQgetvalue(res,i,2),
2012-
PQgetvalue(res,i,3));
2013-
}
20141963

2015-
PQclear(res);
2016-
PQfinish(conn);
1964+
/*
1965+
* Check that all the subscriptions have their respective replication
1966+
* origin. This check only needs to run once.
1967+
*/
1968+
conn=connectToServer(&old_cluster,old_cluster.dbarr.dbs[0].db_name);
1969+
res=executeQueryOrDie(conn,
1970+
"SELECT d.datname, s.subname "
1971+
"FROM pg_catalog.pg_subscription s "
1972+
"LEFT OUTER JOIN pg_catalog.pg_replication_origin o "
1973+
"ON o.roname = 'pg_' || s.oid "
1974+
"INNER JOIN pg_catalog.pg_database d "
1975+
"ON d.oid = s.subdbid "
1976+
"WHERE o.roname IS NULL;");
1977+
ntup=PQntuples(res);
1978+
for (inti=0;i<ntup;i++)
1979+
{
1980+
if (report.file==NULL&&
1981+
(report.file=fopen_priv(report.path,"w"))==NULL)
1982+
pg_fatal("could not open file \"%s\": %m",report.path);
1983+
fprintf(report.file,"The replication origin is missing for database:\"%s\" subscription:\"%s\"\n",
1984+
PQgetvalue(res,i,0),
1985+
PQgetvalue(res,i,1));
20171986
}
1987+
PQclear(res);
1988+
PQfinish(conn);
20181989

2019-
if (script)
1990+
/*
1991+
* We don't allow upgrade if there is a risk of dangling slot or origin
1992+
* corresponding to initial sync after upgrade.
1993+
*
1994+
* A slot/origin not created yet refers to the 'i' (initialize) state,
1995+
* while 'r' (ready) state refers to a slot/origin created previously but
1996+
* already dropped. These states are supported for pg_upgrade. The other
1997+
* states listed below are not supported:
1998+
*
1999+
* a) SUBREL_STATE_DATASYNC: A relation upgraded while in this state would
2000+
* retain a replication slot, which could not be dropped by the sync
2001+
* worker spawned after the upgrade because the subscription ID used for
2002+
* the slot name won't match anymore.
2003+
*
2004+
* b) SUBREL_STATE_SYNCDONE: A relation upgraded while in this state would
2005+
* retain the replication origin when there is a failure in tablesync
2006+
* worker immediately after dropping the replication slot in the
2007+
* publisher.
2008+
*
2009+
* c) SUBREL_STATE_FINISHEDCOPY: A tablesync worker spawned to work on a
2010+
* relation upgraded while in this state would expect an origin ID with
2011+
* the OID of the subscription used before the upgrade, causing it to
2012+
* fail.
2013+
*
2014+
* d) SUBREL_STATE_SYNCWAIT, SUBREL_STATE_CATCHUP and
2015+
* SUBREL_STATE_UNKNOWN: These states are not stored in the catalog, so we
2016+
* need not allow these states.
2017+
*/
2018+
query="SELECT r.srsubstate, s.subname, n.nspname, c.relname "
2019+
"FROM pg_catalog.pg_subscription_rel r "
2020+
"LEFT JOIN pg_catalog.pg_subscription s"
2021+
" ON r.srsubid = s.oid "
2022+
"LEFT JOIN pg_catalog.pg_class c"
2023+
" ON r.srrelid = c.oid "
2024+
"LEFT JOIN pg_catalog.pg_namespace n"
2025+
" ON c.relnamespace = n.oid "
2026+
"WHERE r.srsubstate NOT IN ('i', 'r') "
2027+
"ORDER BY s.subname";
2028+
2029+
upgrade_task_add_step(task,query,process_old_sub_state_check,
2030+
true,&report);
2031+
2032+
upgrade_task_run(task,&old_cluster);
2033+
upgrade_task_free(task);
2034+
2035+
if (report.file)
20202036
{
2021-
fclose(script);
2037+
fclose(report.file);
20222038
pg_log(PG_REPORT,"fatal");
20232039
pg_fatal("Your installation contains subscriptions without origin or having relations not in i (initialize) or r (ready) state.\n"
20242040
"You can allow the initial sync to finish for all relations and then restart the upgrade.\n"
20252041
"A list of the problematic subscriptions is in the file:\n"
2026-
" %s",output_path);
2042+
" %s",report.path);
20272043
}
20282044
else
20292045
check_ok();

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp