Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitde5e166

Browse files
committed
logical messages for UtilityStmts; GUC context
1 parent96e2407 commitde5e166

File tree

5 files changed

+115
-81
lines changed

5 files changed

+115
-81
lines changed

‎contrib/mmts/multimaster.c

Lines changed: 52 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
#include"replication/walsender.h"
5050
#include"replication/walsender_private.h"
5151
#include"replication/slot.h"
52+
#include"replication/message.h"
5253
#include"port/atomics.h"
5354
#include"tcop/utility.h"
5455
#include"nodes/makefuncs.h"
@@ -235,8 +236,8 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
235236
ProcessUtilityContextcontext,ParamListInfoparams,
236237
DestReceiver*dest,char*completionTag);
237238

238-
//static StringInfoMtmGUCBuffer;
239-
//static boolMtmGUCBufferAllocated = false;
239+
staticStringInfoMtmGUCBuffer;
240+
staticboolMtmGUCBufferAllocated= false;
240241

241242
/*
242243
* -------------------------------------------
@@ -2979,53 +2980,55 @@ static void MtmBroadcastUtilityStmt(char const* sql, bool ignoreError)
29792980
}
29802981
}
29812982

2982-
staticboolMtmProcessDDLCommand(charconst*queryString)
2983-
{
2984-
RangeVar*rv;
2985-
Relationrel;
2986-
TupleDesctupDesc;
2987-
HeapTupletup;
2988-
Datumvalues[Natts_mtm_ddl_log];
2989-
boolnulls[Natts_mtm_ddl_log];
2990-
TimestampTzts=GetCurrentTimestamp();
2991-
2992-
rv=makeRangeVar("public",MULTIMASTER_DDL_TABLE,-1);
2993-
rel=heap_openrv_extended(rv,RowExclusiveLock, true);
2983+
staticvoidMtmGUCBufferAppend(constchar*gucQueryString){
29942984

2995-
if (rel==NULL) {
2996-
if (!MtmIsBroadcast()) {
2997-
MtmBroadcastUtilityStmt(queryString, false);
2998-
return true;
2999-
}
3000-
return false;
2985+
if (!MtmGUCBufferAllocated)
2986+
{
2987+
MemoryContextoldcontext;
2988+
oldcontext=MemoryContextSwitchTo(TopMemoryContext);
2989+
MtmGUCBuffer=makeStringInfo();
2990+
MemoryContextSwitchTo(oldcontext);
2991+
MtmGUCBufferAllocated= true;
2992+
appendStringInfoString(MtmGUCBuffer,"RESET SESSION AUTHORIZATION; reset all;");
30012993
}
3002-
3003-
tupDesc=RelationGetDescr(rel);
30042994

3005-
/* Form a tuple. */
3006-
memset(nulls, false,sizeof(nulls));
3007-
3008-
values[Anum_mtm_ddl_log_issued-1]=TimestampTzGetDatum(ts);
3009-
values[Anum_mtm_ddl_log_query-1]=CStringGetTextDatum(queryString);
2995+
appendStringInfoString(MtmGUCBuffer,gucQueryString);
2996+
/* sometimes there is no ';' char at the end. */
2997+
// appendStringInfoString(MtmGUCBuffer, ";");
2998+
}
30102999

3011-
tup=heap_form_tuple(tupDesc,values,nulls);
3000+
staticchar*MtmGUCBufferGet(void){
3001+
if (!MtmGUCBufferAllocated)
3002+
MtmGUCBufferAppend("");
3003+
returnMtmGUCBuffer->data;
3004+
}
30123005

3013-
/* Insert the tuple to the catalog. */
3014-
simple_heap_insert(rel,tup);
3006+
staticboolMtmProcessDDLCommand(charconst*queryString)
3007+
{
3008+
char*queryWithContext;
3009+
char*gucContext;
30153010

3016-
/* Update the indexes. */
3017-
CatalogUpdateIndexes(rel,tup);
3011+
/* Append global GUC to utility stmt. */
3012+
gucContext=MtmGUCBufferGet();
3013+
if (gucContext)
3014+
{
3015+
queryWithContext=palloc(strlen(gucContext)+strlen(queryString)+1);
3016+
strcpy(queryWithContext,gucContext);
3017+
strcat(queryWithContext,queryString);
3018+
}
3019+
else
3020+
{
3021+
queryWithContext= (char*)queryString;
3022+
}
30183023

3019-
/* Cleanup. */
3020-
heap_freetuple(tup);
3021-
heap_close(rel,RowExclusiveLock);
3024+
MTM_LOG1("Sending utility: %s",queryWithContext);
3025+
LogLogicalMessage("MTM:GUC",queryWithContext,strlen(queryWithContext), true);
30223026

30233027
MtmTx.containsDML= true;
30243028
return false;
30253029
}
30263030

30273031

3028-
30293032
/*
30303033
* Genenerate global transaction identifier for two-pahse commit.
30313034
* It should be unique for all nodes
@@ -3129,43 +3132,28 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
31293132
DiscardStmt*stmt= (DiscardStmt*)parsetree;
31303133
skipCommand=stmt->target==DISCARD_TEMP;
31313134

3132-
// skipCommand = true;
3133-
3134-
// if (MtmGUCBufferAllocated)
3135-
// {
3136-
// // XXX: move allocation somewhere to backend startup and check
3137-
// // where buffer is empty in send routines.
3138-
// MtmGUCBufferAllocated = false;
3139-
// pfree(MtmGUCBuffer);
3140-
// }
3141-
3135+
if (!IsTransactionBlock())
3136+
{
3137+
skipCommand= true;
3138+
MtmGUCBufferAppend(queryString);
3139+
}
31423140
}
31433141
break;
31443142
caseT_VariableSetStmt:
31453143
{
31463144
VariableSetStmt*stmt= (VariableSetStmt*)parsetree;
31473145

3148-
skipCommand= true;
3146+
//skipCommand = true;
31493147

31503148
/* Prevent SET TRANSACTION from replication */
31513149
if (stmt->kind==VAR_SET_MULTI)
3152-
// break;
31533150
skipCommand= true;
31543151

3155-
// if (!MtmGUCBufferAllocated)
3156-
// {
3157-
// MemoryContext oldcontext;
3158-
3159-
// oldcontext = MemoryContextSwitchTo(TopMemoryContext);
3160-
// MtmGUCBuffer = makeStringInfo();
3161-
// MemoryContextSwitchTo(oldcontext);
3162-
// MtmGUCBufferAllocated = true;
3163-
// }
3164-
3165-
// appendStringInfoString(MtmGUCBuffer, queryString);
3166-
3167-
// sometimes there is no ';' char at the end.
3168-
// appendStringInfoString(MtmGUCBuffer, ";");
3152+
if (!IsTransactionBlock())
3153+
{
3154+
skipCommand= true;
3155+
MtmGUCBufferAppend(queryString);
3156+
}
31693157
}
31703158
break;
31713159
caseT_CreateTableAsStmt:
@@ -3191,7 +3179,8 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
31913179

31923180
viewParse=parse_analyze((Node*)copyObject(stmt->query),
31933181
queryString,NULL,0);
3194-
skipCommand=isQueryUsingTempRelation(viewParse);
3182+
skipCommand=isQueryUsingTempRelation(viewParse)||
3183+
stmt->view->relpersistence==RELPERSISTENCE_TEMP;
31953184
// ||
31963185
// (stmt->relation->schemaname && strcmp(stmt->relation->schemaname, "pg_temp") == 0);
31973186
}

‎contrib/mmts/pglogical_apply.c

Lines changed: 31 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ static void UserTableUpdateOpenIndexes(EState *estate, TupleTableSlot *slot);
7070
staticvoidUserTableUpdateIndexes(EState*estate,TupleTableSlot*slot);
7171

7272
staticvoidprocess_remote_begin(StringInfos);
73+
staticvoidprocess_remote_message(StringInfos);
7374
staticvoidprocess_remote_commit(StringInfos);
7475
staticvoidprocess_remote_insert(StringInfos,Relationrel);
7576
staticvoidprocess_remote_update(StringInfos,Relationrel);
@@ -338,7 +339,31 @@ process_remote_begin(StringInfo s)
338339
StartTransactionCommand();
339340
MtmJoinTransaction(&gtid,snapshot);
340341

341-
MTM_LOG3("REMOTE begin node=%d xid=%d snapshot=%ld",gtid.node,gtid.xid,snapshot);
342+
MTM_LOG1("REMOTE begin node=%d xid=%d snapshot=%ld",gtid.node,gtid.xid,snapshot);
343+
}
344+
345+
staticvoid
346+
process_remote_message(StringInfos)
347+
{
348+
constchar*stmt;
349+
intrc;
350+
351+
stmt=pq_getmsgstring(s);
352+
MTM_LOG1("utility: %s",stmt);
353+
MTM_LOG3("%d: Execute utility statement %s",MyProcPid,stmt);
354+
355+
SPI_connect();
356+
rc=SPI_execute(stmt, false,0);
357+
SPI_finish();
358+
if (rc<0)
359+
elog(ERROR,"Failed to execute utility statement %s",stmt);
360+
361+
//XXX: create messages for tables localization too.
362+
// if (strcmp(relname, MULTIMASTER_LOCAL_TABLES_TABLE) == 0) {
363+
// char* schema = TextDatumGetCString(new_tuple.values[Anum_mtm_local_tables_rel_schema-1]);
364+
// char* name = TextDatumGetCString(new_tuple.values[Anum_mtm_local_tables_rel_name-1]);
365+
// MtmMakeTableLocal(schema, name);
366+
// }
342367
}
343368

344369
staticvoid
@@ -610,7 +635,6 @@ process_remote_insert(StringInfo s, Relation rel)
610635
TupleTableSlot*oldslot;
611636
ResultRelInfo*relinfo;
612637
ScanKey*index_keys;
613-
char*relname=RelationGetRelationName(rel);
614638
inti;
615639

616640
estate=create_rel_estate(rel);
@@ -693,22 +717,6 @@ process_remote_insert(StringInfo s, Relation rel)
693717
FreeExecutorState(estate);
694718

695719
CommandCounterIncrement();
696-
697-
if (strcmp(relname,MULTIMASTER_DDL_TABLE)==0) {
698-
char*ddl=TextDatumGetCString(new_tuple.values[Anum_mtm_ddl_log_query-1]);
699-
intrc;
700-
SPI_connect();
701-
MTM_LOG3("%d: Execute utility statement %s",MyProcPid,ddl);
702-
rc=SPI_execute(ddl, false,0);
703-
SPI_finish();
704-
if (rc<0)
705-
elog(ERROR,"Failed to execute utility statement %s",ddl);
706-
}elseif (strcmp(relname,MULTIMASTER_LOCAL_TABLES_TABLE)==0) {
707-
char*schema=TextDatumGetCString(new_tuple.values[Anum_mtm_local_tables_rel_schema-1]);
708-
char*name=TextDatumGetCString(new_tuple.values[Anum_mtm_local_tables_rel_name-1]);
709-
MtmMakeTableLocal(schema,name);
710-
}
711-
712720
}
713721

714722
staticvoid
@@ -987,6 +995,11 @@ void MtmExecutor(int id, void* work, size_t size)
987995
s.len=save_len;
988996
continue;
989997
}
998+
case'G':
999+
{
1000+
process_remote_message(&s);
1001+
continue;
1002+
}
9901003
default:
9911004
elog(ERROR,"unknown action of type %c",action);
9921005
}

‎contrib/mmts/pglogical_output.c

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333

3434
#include"replication/output_plugin.h"
3535
#include"replication/logical.h"
36+
#include"replication/message.h"
3637
#include"replication/origin.h"
3738

3839
#include"utils/builtins.h"
@@ -64,6 +65,11 @@ static void pg_decode_change(LogicalDecodingContext *ctx,
6465
staticboolpg_decode_origin_filter(LogicalDecodingContext*ctx,
6566
RepOriginIdorigin_id);
6667

68+
staticvoidpg_decode_message(LogicalDecodingContext*ctx,
69+
ReorderBufferTXN*txn,XLogRecPtrmessage_lsn,
70+
booltransactional,constchar*prefix,
71+
Sizesz,constchar*message);
72+
6773
staticvoidsend_startup_message(LogicalDecodingContext*ctx,
6874
PGLogicalOutputData*data,boollast_message);
6975

@@ -81,6 +87,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
8187
cb->commit_cb=pg_decode_commit_txn;
8288
cb->filter_by_origin_cb=pg_decode_origin_filter;
8389
cb->shutdown_cb=pg_decode_shutdown;
90+
cb->message_cb=pg_decode_message;
8491
}
8592

8693
staticbool
@@ -499,6 +506,18 @@ pg_decode_origin_filter(LogicalDecodingContext *ctx,
499506
return false;
500507
}
501508

509+
staticvoid
510+
pg_decode_message(LogicalDecodingContext*ctx,
511+
ReorderBufferTXN*txn,XLogRecPtrlsn,booltransactional,
512+
constchar*prefix,Sizesz,constchar*message)
513+
{
514+
PGLogicalOutputData*data= (PGLogicalOutputData*)ctx->output_plugin_private;
515+
516+
OutputPluginPrepareWrite(ctx, true);
517+
data->api->write_message(ctx->out,prefix,sz,message);
518+
OutputPluginWrite(ctx, true);
519+
}
520+
502521
staticvoid
503522
send_startup_message(LogicalDecodingContext*ctx,
504523
PGLogicalOutputData*data,boollast_message)

‎contrib/mmts/pglogical_proto.c

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,15 @@ pglogical_write_begin(StringInfo out, PGLogicalOutputData *data,
119119
}
120120
}
121121

122+
staticvoid
123+
pglogical_write_message(StringInfoout,
124+
constchar*prefix,Sizesz,constchar*message)
125+
{
126+
pq_sendbyte(out,'G');
127+
pq_sendbytes(out,message,sz);
128+
pq_sendbyte(out,'\0');
129+
}
130+
122131
/*
123132
* Write COMMIT to the output stream.
124133
*/
@@ -429,6 +438,7 @@ pglogical_init_api(PGLogicalProtoType typ)
429438
MTM_LOG1("%d: PRGLOGICAL init API for slot %s node %d",MyProcPid,MyReplicationSlot->data.name.data,MtmReplicationNodeId);
430439
res->write_rel=pglogical_write_rel;
431440
res->write_begin=pglogical_write_begin;
441+
res->write_message=pglogical_write_message;
432442
res->write_commit=pglogical_write_commit;
433443
res->write_insert=pglogical_write_insert;
434444
res->write_update=pglogical_write_update;

‎contrib/mmts/pglogical_proto.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ typedef void (*pglogical_write_rel_fn)(StringInfo out, struct PGLogicalOutputDat
2121

2222
typedefvoid (*pglogical_write_begin_fn)(StringInfoout,structPGLogicalOutputData*data,
2323
ReorderBufferTXN*txn);
24+
typedefvoid (*pglogical_write_message_fn)(StringInfoout,
25+
constchar*prefix,Sizesz,constchar*message);
2426
typedefvoid (*pglogical_write_commit_fn)(StringInfoout,structPGLogicalOutputData*data,
2527
ReorderBufferTXN*txn,XLogRecPtrcommit_lsn);
2628

@@ -43,6 +45,7 @@ typedef struct PGLogicalProtoAPI
4345
{
4446
pglogical_write_rel_fnwrite_rel;
4547
pglogical_write_begin_fnwrite_begin;
48+
pglogical_write_message_fnwrite_message;
4649
pglogical_write_commit_fnwrite_commit;
4750
pglogical_write_origin_fnwrite_origin;
4851
pglogical_write_insert_fnwrite_insert;

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp