Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Add support for nesting temporary namespaces for ATX transactions.#12

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.

Already on GitHub?Sign in to your account

Open
funny-falcon wants to merge1 commit intomaster
base:master
Choose a base branch
Loading
frompp-3882
Open
Show file tree
Hide file tree
Changes fromall commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 30 additions & 1 deletionexpected/regression_ee.diff
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -570,14 +570,43 @@ diff ../../../src/test/regress/expected/rowsecurity.out ../tmp_check/regress_out
diff ../../../src/test/regress/expected/atx.out ../tmp_check/regress_outdir/results/atx.out
--- ../../../src/test/regress/expected/atx.outCENSORED
+++ ../tmp_check/regress_outdir/results/atx.outCENSORED
@@ -1143,6 +1143,7 @@
@@ -1139,6 +1139,7 @@
RESET client_min_messages;
create database regression_atx_test_database;
ALTER DATABASE "regression_atx_test_database" SET lc_messages TO 'C';
+ERROR: [MTM] failed to prepare transaction at peer node
\c regression_atx_test_database
create table atx_test as select 1 as id;
begin;
diff ../../../src/test/regress/expected/atx4.out ../tmp_check/regress_outdir/results/atx4.out
--- ../../../src/test/regress/expected/atx4.outCENSORED
+++ ../tmp_check/regress_outdir/results/atx4.outCENSORED
@@ -142,8 +142,10 @@
(1 row)

commit autonomous;
+ERROR: [MTM] failed to prepare transaction at peer node
-- Multimaster: t2 table will not be created due to pg_temp_N not found on replicas
commit;
+WARNING: there is no transaction in progress
begin;
-- create temp table in top level temptable but abort
begin autonomous;
@@ -213,11 +215,9 @@
commit;
-- Multimaster: t2 were not created
select * from t2;
- a
-----
- hi
-(1 row)
-
+ERROR: relation "t2" does not exist
+LINE 1: select * from t2;
+ ^
select * from t3;
ERROR: relation "t3" does not exist
LINE 1: select * from t3;
diff ../../../src/test/regress/expected/atx5.out ../tmp_check/regress_outdir/results/atx5.out
--- ../../../src/test/regress/expected/atx5.outCENSORED
+++ ../tmp_check/regress_outdir/results/atx5.outCENSORED
Expand Down
4 changes: 4 additions & 0 deletionsmultimaster--1.0.sql
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -195,6 +195,10 @@ CREATE FUNCTION mtm.set_temp_schema(nsp text) RETURNS void
AS 'MODULE_PATHNAME','mtm_set_temp_schema'
LANGUAGE C;

CREATE FUNCTION mtm.set_temp_schema(nsp text, force bool) RETURNS void
AS 'MODULE_PATHNAME','mtm_set_temp_schema'
LANGUAGE C;

CREATE TABLE mtm.local_tables(
rel_schema name,
rel_name name,
Expand Down
4 changes: 4 additions & 0 deletionssrc/commit.c
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -46,6 +46,7 @@ static bool inside_mtm_begin;
static MtmConfig *mtm_cfg;

MtmCurrentTrans MtmTx;
int MtmTxAtxLevel = 0;

/* holds state defining cleanup actions in case of failure during commit */
static struct MtmCommitState
Expand DownExpand Up@@ -400,6 +401,9 @@ MtmTwoPhaseCommit(void)
StartTransactionCommand();
}

if (MtmTxAtxLevel > 0)
temp_schema_reset(true);

/* prepare for cleanup */
mtm_commit_state.gtx = NULL;
mtm_commit_state.inside_commit_sequence = true;
Expand Down
139 changes: 109 additions & 30 deletionssrc/ddl.c
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -94,6 +94,7 @@ MtmDDLInProgress DDLApplyInProgress;

static char MtmTempSchema[NAMEDATALEN];
static bool TempDropRegistered;
static int TempDropAtxLevel;

static void const *MtmDDLStatement;

Expand DownExpand Up@@ -247,9 +248,11 @@ temp_schema_reset_all(int my_node_id)
"nsp record; "
"begin "
"reset session_authorization; "
"for nsp in select nspname from pg_namespace where nspname ~ '^mtm_tmp_%d_.*' loop "
"for nsp in select nspname from pg_namespace where "
" nspname ~ '^mtm_tmp_%d_.*' and"
" nspname !~ '_toast$' loop "
" perform mtm.set_temp_schema(nsp.nspname); "
" execute format('drop schema if exists %%I cascade',format('%%s_toast',nsp.nspname)); "
" execute format('drop schema if exists %%I cascade', nsp.nspname||'_toast'); "
" execute format('drop schema if exists %%I cascade', nsp.nspname); "
"end loop; "
"end $$; ",
Expand All@@ -258,26 +261,27 @@ temp_schema_reset_all(int my_node_id)
}

/* Drop temp schemas on peer nodes */
staticvoid
temp_schema_reset(void)
void
temp_schema_reset(bool transactional)
{
Assert(TempDropRegistered);
Assert(TempDropAtxLevel == MtmTxAtxLevel);

/*
* reset session_authorization restores permissions if previous ddl
* dropped them; set_temp_schema allows us to see temporary objects,
* otherwise they can't be dropped
*
*Itis important to run it as 'V', otherwise it might interfere with
*later (if drop is due to DISCARD) or earlier command using the schema.
*If dropisdue to DISCARD, it isimportant to run it as 'V', otherwise
*it might interfere with later or earlier command using the schema.
*/
MtmProcessDDLCommand(
psprintf("RESET session_authorization; "
"select mtm.set_temp_schema('%s'); "
"select mtm.set_temp_schema('%s', false); "
"DROP SCHEMA IF EXISTS %s_toast CASCADE; "
"DROP SCHEMA IF EXISTS %s CASCADE;",
MtmTempSchema, MtmTempSchema, MtmTempSchema),
false,
transactional,
false
);
MtmFinishDDLCommand();
Expand All@@ -290,52 +294,127 @@ temp_schema_at_exit(int status, Datum arg)
Assert(TempDropRegistered);
AbortOutOfAnyTransaction();
StartTransactionCommand();
temp_schema_reset();
for (; MtmTxAtxLevel >= 0; MtmTxAtxLevel--)
{
temp_schema_init();
temp_schema_reset(false);
}
CommitTransactionCommand();
}

/* Register cleanup callback and generate temp schema name */
staticvoid
void
temp_schema_init(void)
{
if (!TempDropRegistered)
{
char *temp_schema;

/*
* NB: namespace.c:isMtmTemp() assumes 'mtm_tmp_' prefix for mtm temp
* tables to defuse autovacuum.
*/
temp_schema = psprintf("mtm_tmp_%d_%d",
Mtm->my_node_id, MyBackendId);
memcpy(&MtmTempSchema, temp_schema, strlen(temp_schema) + 1);
before_shmem_exit(temp_schema_at_exit, (Datum) 0);
TempDropRegistered = true;
pfree(temp_schema);
before_shmem_exit(temp_schema_at_exit, (Datum) 0);
}
if (MtmTxAtxLevel == 0)
snprintf(MtmTempSchema, sizeof(MtmTempSchema),
"mtm_tmp_%d_%d", Mtm->my_node_id, MyBackendId);
else
snprintf(MtmTempSchema, sizeof(MtmTempSchema),
"mtm_tmp_%d_%d_%d", Mtm->my_node_id, MyBackendId, MtmTxAtxLevel);
TempDropAtxLevel = MtmTxAtxLevel;
}

/*
* temp_schema_valid check format of temp schema name.
* Namespace name should be either mtm_tmp_\d+_\d+ or
* mtm_tmp_\d+_\d+_\d+ for non-zero atx level.
*/
static bool
temp_schema_valid(const char *temp_namespace, const char **atx_level)
{
const char *c;
const int mtm_tmp_len = strlen("mtm_tmp_");
int underscores = 0;
bool need_digit = true;
boolvalid = true;

*atx_level = NULL;
if (strlen(temp_namespace) + strlen("_toast") + 1 > NAMEDATALEN)
valid = false;
else if(strncmp(temp_namespace, "mtm_tmp_", mtm_tmp_len) != 0)
valid = false;
for (c = temp_namespace+mtm_tmp_len; *c != 0 && valid; c++)
{
if (!need_digit && *c == '_')
{
underscores++;
if (underscores == 2)
*atx_level = c;
need_digit = true;
}
else if ((unsigned)*c - '0' <= '9' - '0')
need_digit = false;
else
valid = false;
}
if (need_digit || underscores < 1 || underscores > 2)
valid = false;
#ifndef PGPRO_EE
if (underscores == 2)
valid = false;
#endif

return valid;
}

Datum
mtm_set_temp_schema(PG_FUNCTION_ARGS)
{
char *temp_namespace = text_to_cstring(PG_GETARG_TEXT_P(0));
char *temp_toast_namespace = psprintf("%s_toast", temp_namespace);
Oidnsp_oid;
Oidtoast_nsp_oid;
boolforce = PG_NARGS() > 1 ? PG_GETARG_BOOL(1) : true;
chartemp_toast_namespace[NAMEDATALEN] = {0};
Oidnsp_oid = InvalidOid;
Oidtoast_nsp_oid = InvalidOid;
const char *atx_level_start = NULL;
#ifdef PGPRO_EE
chartop_temp_namespace[NAMEDATALEN] = {0};
Oidtop_nsp_oid = InvalidOid;
Oidtop_toast_nsp_oid = InvalidOid;
#endif

if (!temp_schema_valid(temp_namespace, &atx_level_start))
mtm_log(ERROR, "mtm_set_temp_schema: wrong namespace name '%s'",
temp_namespace);

if (!SearchSysCacheExists1(NAMESPACENAME, PointerGetDatum(temp_namespace)))
snprintf(temp_toast_namespace, NAMEDATALEN, "%s_toast", temp_namespace);
if (SearchSysCacheExists1(NAMESPACENAME, PointerGetDatum(temp_namespace)))
{
nsp_oid = get_namespace_oid(temp_namespace, false);
toast_nsp_oid = get_namespace_oid(temp_toast_namespace, false);
}
else if (force)
{
nsp_oid = NamespaceCreate(temp_namespace, BOOTSTRAP_SUPERUSERID, true);
toast_nsp_oid = NamespaceCreate(temp_toast_namespace, BOOTSTRAP_SUPERUSERID, true);
CommandCounterIncrement();
}
else

#ifdef PGPRO_EE
if (atx_level_start != NULL)
{
nsp_oid = get_namespace_oid(temp_namespace, false);
toast_nsp_oid = get_namespace_oid(temp_toast_namespace, false);
memcpy(top_temp_namespace, temp_namespace, atx_level_start - temp_namespace);

if (SearchSysCacheExists1(NAMESPACENAME, PointerGetDatum(top_temp_namespace)))
{
top_nsp_oid = get_namespace_oid(top_temp_namespace, false);
strlcat(top_temp_namespace, "_toast", NAMEDATALEN);
top_toast_nsp_oid = get_namespace_oid(top_temp_namespace, false);
}
}

SetTempNamespaceState(nsp_oid, toast_nsp_oid);
SetTempNamespaceForMultimaster();
SetTempNamespaceStateEx(nsp_oid, toast_nsp_oid,
top_nsp_oid, top_toast_nsp_oid,
atx_level_start != NULL);
#else
SetTempNamespace(nsp_oid, toast_nsp_oid);
#endif
PG_RETURN_VOID();
}

Expand DownExpand Up@@ -1030,7 +1109,7 @@ MtmProcessUtilitySender(PlannedStmt *pstmt, const char *queryString,
{
/* nothing to do if temp schema wasn't created at all */
if (TempDropRegistered)
temp_schema_reset();
temp_schema_reset(false);
SkipCommand(true);
MtmGucDiscard();
}
Expand Down
2 changes: 2 additions & 0 deletionssrc/include/ddl.h
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -33,6 +33,8 @@ extern MtmDDLInProgress DDLApplyInProgress;
extern void MtmDDLReplicationInit(void);
extern void MtmDDLReplicationShmemStartup(void);
extern void temp_schema_reset_all(int my_node_id);
extern void temp_schema_reset(bool transactional);
extern void temp_schema_init(void);
extern bool MtmIsRelationLocal(Relation rel);
extern void MtmDDLResetStatement(void);
extern void MtmApplyDDLMessage(const char *messageBody, bool transactional);
Expand Down
1 change: 1 addition & 0 deletionssrc/include/multimaster.h
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -216,6 +216,7 @@ extern MtmShared *Mtm;

/* XXX: to delete */
extern MtmCurrentTrans MtmTx;
extern int MtmTxAtxLevel;
extern MemoryContext MtmApplyContext;

/* bgworker identities */
Expand Down
4 changes: 4 additions & 0 deletionssrc/multimaster.c
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -369,6 +369,8 @@ MtmSuspendTransaction(void)
MtmCurrentTrans *ctx = malloc(sizeof(MtmCurrentTrans));

*ctx = MtmTx;
MtmTxAtxLevel++;
temp_schema_init();
CallXactCallbacks(XACT_EVENT_START);
return ctx;
}
Expand All@@ -378,6 +380,8 @@ MtmResumeTransaction(void *ctx)
{
MtmTx = *(MtmCurrentTrans *) ctx;
free(ctx);
MtmTxAtxLevel--;
temp_schema_init();
}
#endif

Expand Down
17 changes: 15 additions & 2 deletionssrc/state.c
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -3920,6 +3920,7 @@ MtmMonitor(Datum arg)
*/
{
intrc;
uint64 nfuncs;

StartTransactionCommand();
if (SPI_connect() != SPI_OK_CONNECT)
Expand DownExpand Up@@ -3955,15 +3956,27 @@ MtmMonitor(Datum arg)
true, 0);
if (rc < 0 || rc != SPI_OK_SELECT)
mtm_log(ERROR, "Failed to query pg_proc");
if (SPI_processed == 0)
nfuncs = SPI_processed;
if (nfuncs == 0)
{
rc = SPI_execute("CREATE FUNCTION mtm.set_temp_schema(nsp text) RETURNS void "
"AS '$libdir/multimaster','mtm_set_temp_schema' "
"LANGUAGE C; ", false, 0);
if (rc < 0 || rc != SPI_OK_UTILITY)
mtm_log(ERROR, "Failed to create mtm.set_temp_schema()");

mtm_log(LOG, "Creating mtm.set_temp_schema()");
mtm_log(LOG, "Creating mtm.set_temp_schema(nsp)");
}

if (nfuncs <= 1)
{
rc = SPI_execute("CREATE FUNCTION mtm.set_temp_schema(nsp text, force bool) RETURNS void "
"AS '$libdir/multimaster','mtm_set_temp_schema' "
"LANGUAGE C; ", false, 0);
if (rc < 0 || rc != SPI_OK_UTILITY)
mtm_log(ERROR, "Failed to create mtm.set_temp_schema()");

mtm_log(LOG, "Creating mtm.set_temp_schema(nsp, force)");
}

SPI_finish();
Expand Down

[8]ページ先頭

©2009-2025 Movatter.jp