Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit41c36e3

Browse files
committed
logical messages for UtilityStmts; GUC context
1 parent9c31d02 commit41c36e3

File tree

5 files changed

+115
-81
lines changed

5 files changed

+115
-81
lines changed

‎multimaster.c

Lines changed: 52 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
#include"replication/walsender.h"
5050
#include"replication/walsender_private.h"
5151
#include"replication/slot.h"
52+
#include"replication/message.h"
5253
#include"port/atomics.h"
5354
#include"tcop/utility.h"
5455
#include"nodes/makefuncs.h"
@@ -235,8 +236,8 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
235236
ProcessUtilityContextcontext,ParamListInfoparams,
236237
DestReceiver*dest,char*completionTag);
237238

238-
//static StringInfoMtmGUCBuffer;
239-
//static boolMtmGUCBufferAllocated = false;
239+
staticStringInfoMtmGUCBuffer;
240+
staticboolMtmGUCBufferAllocated= false;
240241

241242
/*
242243
* -------------------------------------------
@@ -3024,53 +3025,55 @@ static void MtmBroadcastUtilityStmt(char const* sql, bool ignoreError)
30243025
}
30253026
}
30263027

3027-
staticboolMtmProcessDDLCommand(charconst*queryString)
3028-
{
3029-
RangeVar*rv;
3030-
Relationrel;
3031-
TupleDesctupDesc;
3032-
HeapTupletup;
3033-
Datumvalues[Natts_mtm_ddl_log];
3034-
boolnulls[Natts_mtm_ddl_log];
3035-
TimestampTzts=GetCurrentTimestamp();
3036-
3037-
rv=makeRangeVar("public",MULTIMASTER_DDL_TABLE,-1);
3038-
rel=heap_openrv_extended(rv,RowExclusiveLock, true);
3028+
staticvoidMtmGUCBufferAppend(constchar*gucQueryString){
30393029

3040-
if (rel==NULL) {
3041-
if (!MtmIsBroadcast()) {
3042-
MtmBroadcastUtilityStmt(queryString, false);
3043-
return true;
3044-
}
3045-
return false;
3030+
if (!MtmGUCBufferAllocated)
3031+
{
3032+
MemoryContextoldcontext;
3033+
oldcontext=MemoryContextSwitchTo(TopMemoryContext);
3034+
MtmGUCBuffer=makeStringInfo();
3035+
MemoryContextSwitchTo(oldcontext);
3036+
MtmGUCBufferAllocated= true;
3037+
appendStringInfoString(MtmGUCBuffer,"RESET SESSION AUTHORIZATION; reset all;");
30463038
}
3047-
3048-
tupDesc=RelationGetDescr(rel);
30493039

3050-
/* Form a tuple. */
3051-
memset(nulls, false,sizeof(nulls));
3052-
3053-
values[Anum_mtm_ddl_log_issued-1]=TimestampTzGetDatum(ts);
3054-
values[Anum_mtm_ddl_log_query-1]=CStringGetTextDatum(queryString);
3040+
appendStringInfoString(MtmGUCBuffer,gucQueryString);
3041+
/* sometimes there is no ';' char at the end. */
3042+
// appendStringInfoString(MtmGUCBuffer, ";");
3043+
}
30553044

3056-
tup=heap_form_tuple(tupDesc,values,nulls);
3045+
staticchar*MtmGUCBufferGet(void){
3046+
if (!MtmGUCBufferAllocated)
3047+
MtmGUCBufferAppend("");
3048+
returnMtmGUCBuffer->data;
3049+
}
30573050

3058-
/* Insert the tuple to the catalog. */
3059-
simple_heap_insert(rel,tup);
3051+
staticboolMtmProcessDDLCommand(charconst*queryString)
3052+
{
3053+
char*queryWithContext;
3054+
char*gucContext;
30603055

3061-
/* Update the indexes. */
3062-
CatalogUpdateIndexes(rel,tup);
3056+
/* Append global GUC to utility stmt. */
3057+
gucContext=MtmGUCBufferGet();
3058+
if (gucContext)
3059+
{
3060+
queryWithContext=palloc(strlen(gucContext)+strlen(queryString)+1);
3061+
strcpy(queryWithContext,gucContext);
3062+
strcat(queryWithContext,queryString);
3063+
}
3064+
else
3065+
{
3066+
queryWithContext= (char*)queryString;
3067+
}
30633068

3064-
/* Cleanup. */
3065-
heap_freetuple(tup);
3066-
heap_close(rel,RowExclusiveLock);
3069+
MTM_LOG1("Sending utility: %s",queryWithContext);
3070+
LogLogicalMessage("MTM:GUC",queryWithContext,strlen(queryWithContext), true);
30673071

30683072
MtmTx.containsDML= true;
30693073
return false;
30703074
}
30713075

30723076

3073-
30743077
/*
30753078
* Genenerate global transaction identifier for two-pahse commit.
30763079
* It should be unique for all nodes
@@ -3170,43 +3173,28 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
31703173
DiscardStmt*stmt= (DiscardStmt*)parsetree;
31713174
skipCommand=stmt->target==DISCARD_TEMP;
31723175

3173-
// skipCommand = true;
3174-
3175-
// if (MtmGUCBufferAllocated)
3176-
// {
3177-
// // XXX: move allocation somewhere to backend startup and check
3178-
// // where buffer is empty in send routines.
3179-
// MtmGUCBufferAllocated = false;
3180-
// pfree(MtmGUCBuffer);
3181-
// }
3182-
3176+
if (!IsTransactionBlock())
3177+
{
3178+
skipCommand= true;
3179+
MtmGUCBufferAppend(queryString);
3180+
}
31833181
}
31843182
break;
31853183
caseT_VariableSetStmt:
31863184
{
31873185
VariableSetStmt*stmt= (VariableSetStmt*)parsetree;
31883186

3189-
skipCommand= true;
3187+
//skipCommand = true;
31903188

31913189
/* Prevent SET TRANSACTION from replication */
31923190
if (stmt->kind==VAR_SET_MULTI)
3193-
// break;
31943191
skipCommand= true;
31953192

3196-
// if (!MtmGUCBufferAllocated)
3197-
// {
3198-
// MemoryContext oldcontext;
3199-
3200-
// oldcontext = MemoryContextSwitchTo(TopMemoryContext);
3201-
// MtmGUCBuffer = makeStringInfo();
3202-
// MemoryContextSwitchTo(oldcontext);
3203-
// MtmGUCBufferAllocated = true;
3204-
// }
3205-
3206-
// appendStringInfoString(MtmGUCBuffer, queryString);
3207-
3208-
// sometimes there is no ';' char at the end.
3209-
// appendStringInfoString(MtmGUCBuffer, ";");
3193+
if (!IsTransactionBlock())
3194+
{
3195+
skipCommand= true;
3196+
MtmGUCBufferAppend(queryString);
3197+
}
32103198
}
32113199
break;
32123200
caseT_CreateTableAsStmt:
@@ -3232,7 +3220,8 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
32323220

32333221
viewParse=parse_analyze((Node*)copyObject(stmt->query),
32343222
queryString,NULL,0);
3235-
skipCommand=isQueryUsingTempRelation(viewParse);
3223+
skipCommand=isQueryUsingTempRelation(viewParse)||
3224+
stmt->view->relpersistence==RELPERSISTENCE_TEMP;
32363225
// ||
32373226
// (stmt->relation->schemaname && strcmp(stmt->relation->schemaname, "pg_temp") == 0);
32383227
}

‎pglogical_apply.c

Lines changed: 31 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ static void UserTableUpdateOpenIndexes(EState *estate, TupleTableSlot *slot);
7070
staticvoidUserTableUpdateIndexes(EState*estate,TupleTableSlot*slot);
7171

7272
staticvoidprocess_remote_begin(StringInfos);
73+
staticvoidprocess_remote_message(StringInfos);
7374
staticvoidprocess_remote_commit(StringInfos);
7475
staticvoidprocess_remote_insert(StringInfos,Relationrel);
7576
staticvoidprocess_remote_update(StringInfos,Relationrel);
@@ -338,7 +339,31 @@ process_remote_begin(StringInfo s)
338339
StartTransactionCommand();
339340
MtmJoinTransaction(&gtid,snapshot);
340341

341-
MTM_LOG3("REMOTE begin node=%d xid=%d snapshot=%ld",gtid.node,gtid.xid,snapshot);
342+
MTM_LOG1("REMOTE begin node=%d xid=%d snapshot=%ld",gtid.node,gtid.xid,snapshot);
343+
}
344+
345+
staticvoid
346+
process_remote_message(StringInfos)
347+
{
348+
constchar*stmt;
349+
intrc;
350+
351+
stmt=pq_getmsgstring(s);
352+
MTM_LOG1("utility: %s",stmt);
353+
MTM_LOG3("%d: Execute utility statement %s",MyProcPid,stmt);
354+
355+
SPI_connect();
356+
rc=SPI_execute(stmt, false,0);
357+
SPI_finish();
358+
if (rc<0)
359+
elog(ERROR,"Failed to execute utility statement %s",stmt);
360+
361+
//XXX: create messages for tables localization too.
362+
// if (strcmp(relname, MULTIMASTER_LOCAL_TABLES_TABLE) == 0) {
363+
// char* schema = TextDatumGetCString(new_tuple.values[Anum_mtm_local_tables_rel_schema-1]);
364+
// char* name = TextDatumGetCString(new_tuple.values[Anum_mtm_local_tables_rel_name-1]);
365+
// MtmMakeTableLocal(schema, name);
366+
// }
342367
}
343368

344369
staticvoid
@@ -617,7 +642,6 @@ process_remote_insert(StringInfo s, Relation rel)
617642
TupleTableSlot*oldslot;
618643
ResultRelInfo*relinfo;
619644
ScanKey*index_keys;
620-
char*relname=RelationGetRelationName(rel);
621645
inti;
622646

623647
estate=create_rel_estate(rel);
@@ -700,22 +724,6 @@ process_remote_insert(StringInfo s, Relation rel)
700724
FreeExecutorState(estate);
701725

702726
CommandCounterIncrement();
703-
704-
if (strcmp(relname,MULTIMASTER_DDL_TABLE)==0) {
705-
char*ddl=TextDatumGetCString(new_tuple.values[Anum_mtm_ddl_log_query-1]);
706-
intrc;
707-
SPI_connect();
708-
MTM_LOG3("%d: Execute utility statement %s",MyProcPid,ddl);
709-
rc=SPI_execute(ddl, false,0);
710-
SPI_finish();
711-
if (rc<0)
712-
elog(ERROR,"Failed to execute utility statement %s",ddl);
713-
}elseif (strcmp(relname,MULTIMASTER_LOCAL_TABLES_TABLE)==0) {
714-
char*schema=TextDatumGetCString(new_tuple.values[Anum_mtm_local_tables_rel_schema-1]);
715-
char*name=TextDatumGetCString(new_tuple.values[Anum_mtm_local_tables_rel_name-1]);
716-
MtmMakeTableLocal(schema,name);
717-
}
718-
719727
}
720728

721729
staticvoid
@@ -999,6 +1007,11 @@ void MtmExecutor(int id, void* work, size_t size)
9991007
s.len=save_len;
10001008
continue;
10011009
}
1010+
case'G':
1011+
{
1012+
process_remote_message(&s);
1013+
continue;
1014+
}
10021015
default:
10031016
elog(ERROR,"unknown action of type %c",action);
10041017
}

‎pglogical_output.c

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333

3434
#include"replication/output_plugin.h"
3535
#include"replication/logical.h"
36+
#include"replication/message.h"
3637
#include"replication/origin.h"
3738

3839
#include"utils/builtins.h"
@@ -64,6 +65,11 @@ static void pg_decode_change(LogicalDecodingContext *ctx,
6465
staticboolpg_decode_origin_filter(LogicalDecodingContext*ctx,
6566
RepOriginIdorigin_id);
6667

68+
staticvoidpg_decode_message(LogicalDecodingContext*ctx,
69+
ReorderBufferTXN*txn,XLogRecPtrmessage_lsn,
70+
booltransactional,constchar*prefix,
71+
Sizesz,constchar*message);
72+
6773
staticvoidsend_startup_message(LogicalDecodingContext*ctx,
6874
PGLogicalOutputData*data,boollast_message);
6975

@@ -81,6 +87,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
8187
cb->commit_cb=pg_decode_commit_txn;
8288
cb->filter_by_origin_cb=pg_decode_origin_filter;
8389
cb->shutdown_cb=pg_decode_shutdown;
90+
cb->message_cb=pg_decode_message;
8491
}
8592

8693
staticbool
@@ -499,6 +506,18 @@ pg_decode_origin_filter(LogicalDecodingContext *ctx,
499506
return false;
500507
}
501508

509+
staticvoid
510+
pg_decode_message(LogicalDecodingContext*ctx,
511+
ReorderBufferTXN*txn,XLogRecPtrlsn,booltransactional,
512+
constchar*prefix,Sizesz,constchar*message)
513+
{
514+
PGLogicalOutputData*data= (PGLogicalOutputData*)ctx->output_plugin_private;
515+
516+
OutputPluginPrepareWrite(ctx, true);
517+
data->api->write_message(ctx->out,prefix,sz,message);
518+
OutputPluginWrite(ctx, true);
519+
}
520+
502521
staticvoid
503522
send_startup_message(LogicalDecodingContext*ctx,
504523
PGLogicalOutputData*data,boollast_message)

‎pglogical_proto.c

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,15 @@ pglogical_write_begin(StringInfo out, PGLogicalOutputData *data,
119119
}
120120
}
121121

122+
staticvoid
123+
pglogical_write_message(StringInfoout,
124+
constchar*prefix,Sizesz,constchar*message)
125+
{
126+
pq_sendbyte(out,'G');
127+
pq_sendbytes(out,message,sz);
128+
pq_sendbyte(out,'\0');
129+
}
130+
122131
/*
123132
* Write COMMIT to the output stream.
124133
*/
@@ -430,6 +439,7 @@ pglogical_init_api(PGLogicalProtoType typ)
430439
MTM_LOG1("%d: PRGLOGICAL init API for slot %s node %d",MyProcPid,MyReplicationSlot->data.name.data,MtmReplicationNodeId);
431440
res->write_rel=pglogical_write_rel;
432441
res->write_begin=pglogical_write_begin;
442+
res->write_message=pglogical_write_message;
433443
res->write_commit=pglogical_write_commit;
434444
res->write_insert=pglogical_write_insert;
435445
res->write_update=pglogical_write_update;

‎pglogical_proto.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ typedef void (*pglogical_write_rel_fn)(StringInfo out, struct PGLogicalOutputDat
2121

2222
typedefvoid (*pglogical_write_begin_fn)(StringInfoout,structPGLogicalOutputData*data,
2323
ReorderBufferTXN*txn);
24+
typedefvoid (*pglogical_write_message_fn)(StringInfoout,
25+
constchar*prefix,Sizesz,constchar*message);
2426
typedefvoid (*pglogical_write_commit_fn)(StringInfoout,structPGLogicalOutputData*data,
2527
ReorderBufferTXN*txn,XLogRecPtrcommit_lsn);
2628

@@ -43,6 +45,7 @@ typedef struct PGLogicalProtoAPI
4345
{
4446
pglogical_write_rel_fnwrite_rel;
4547
pglogical_write_begin_fnwrite_begin;
48+
pglogical_write_message_fnwrite_message;
4649
pglogical_write_commit_fnwrite_commit;
4750
pglogical_write_origin_fnwrite_origin;
4851
pglogical_write_insert_fnwrite_insert;

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp