Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit5c4854e

Browse files
committed
Debug pglogical_output
1 parent00e0519 commit5c4854e

File tree

6 files changed

+74
-61
lines changed

6 files changed

+74
-61
lines changed

‎contrib/multimaster/Makefile‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
MODULE_big = multimaster
2-
OBJS = multimaster.o libdtm.o bytebuf.o bgwpool.o sockhub/sockhub.o pglogical_output.o pglogical_proto.o pglogical_receiver.o pglogical_apply.o
2+
OBJS = multimaster.o libdtm.o bytebuf.o bgwpool.o sockhub/sockhub.o pglogical_output.o pglogical_proto.o pglogical_receiver.o pglogical_apply.o pglogical_hooks.o pglogical_config.o
33
#OBJS = multimaster.o pglogical_receiver.o decoder_raw.o libdtm.o bytebuf.o bgwpool.o sockhub/sockhub.o
44
EXTENSION = multimaster
55
DATA = multimaster--1.0.sql

‎contrib/multimaster/pglogical_apply.c‎

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -751,6 +751,7 @@ process_remote_delete(StringInfo s, Relation rel)
751751
CommandCounterIncrement();
752752
}
753753

754+
staticMemoryContextApplyContext;
754755

755756
voidMMExecutor(intid,void*work,size_tsize)
756757
{
@@ -761,6 +762,15 @@ void MMExecutor(int id, void* work, size_t size)
761762
s.len=size;
762763
s.maxlen=-1;
763764

765+
if (ApplyContext==NULL) {
766+
ApplyContext=AllocSetContextCreate(TopMemoryContext,
767+
"MessageContext",
768+
ALLOCSET_DEFAULT_MINSIZE,
769+
ALLOCSET_DEFAULT_INITSIZE,
770+
ALLOCSET_DEFAULT_MAXSIZE);
771+
}
772+
MemoryContextSwitchTo(ApplyContext);
773+
764774
PG_TRY();
765775
{
766776
while (true) {
@@ -802,5 +812,7 @@ void MMExecutor(int id, void* work, size_t size)
802812
AbortCurrentTransaction();
803813
}
804814
PG_END_TRY();
815+
816+
MemoryContextResetAndDeleteChildren(ApplyContext);
805817
}
806818

‎contrib/multimaster/pglogical_output.c‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -433,7 +433,7 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
433433
if (data->api->write_rel)
434434
{
435435
OutputPluginPrepareWrite(ctx, false);
436-
data->api->write_rel(ctx->out,relation);
436+
data->api->write_rel(ctx->out,data,relation);
437437
OutputPluginWrite(ctx, false);
438438
}
439439

‎contrib/multimaster/pglogical_proto.c‎

Lines changed: 26 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ typedef struct PGLogicalProtoMM
4141
boolisLocal;
4242
}PGLogicalProtoMM;
4343

44-
staticvoidpglogical_write_rel(StringInfoout,Relationrel);
44+
staticvoidpglogical_write_rel(StringInfoout,PGLogicalOutputData*data,Relationrel);
4545

4646
staticvoidpglogical_write_begin(StringInfoout,PGLogicalOutputData*data,
4747
ReorderBufferTXN*txn);
@@ -67,29 +67,32 @@ static char decide_datum_transfer(Form_pg_attribute att,
6767
* Write relation description to the output stream.
6868
*/
6969
staticvoid
70-
pglogical_write_rel(StringInfoout,Relationrel)
70+
pglogical_write_rel(StringInfoout,PGLogicalOutputData*data,Relationrel)
7171
{
72-
constchar*nspname;
73-
uint8nspnamelen;
74-
constchar*relname;
75-
uint8relnamelen;
76-
77-
pq_sendbyte(out,'R');/* sending RELATION */
78-
79-
nspname=get_namespace_name(rel->rd_rel->relnamespace);
80-
if (nspname==NULL)
81-
elog(ERROR,"cache lookup failed for namespace %u",
82-
rel->rd_rel->relnamespace);
83-
nspnamelen=strlen(nspname)+1;
84-
85-
relname=NameStr(rel->rd_rel->relname);
86-
relnamelen=strlen(relname)+1;
87-
88-
pq_sendbyte(out,nspnamelen);/* schema name length */
89-
pq_sendbytes(out,nspname,nspnamelen);
90-
91-
pq_sendbyte(out,relnamelen);/* table name length */
92-
pq_sendbytes(out,relname,relnamelen);
72+
PGLogicalProtoMM*mm= (PGLogicalProtoMM*)data->api;
73+
if (!mm->isLocal) {
74+
constchar*nspname;
75+
uint8nspnamelen;
76+
constchar*relname;
77+
uint8relnamelen;
78+
79+
pq_sendbyte(out,'R');/* sending RELATION */
80+
81+
nspname=get_namespace_name(rel->rd_rel->relnamespace);
82+
if (nspname==NULL)
83+
elog(ERROR,"cache lookup failed for namespace %u",
84+
rel->rd_rel->relnamespace);
85+
nspnamelen=strlen(nspname)+1;
86+
87+
relname=NameStr(rel->rd_rel->relname);
88+
relnamelen=strlen(relname)+1;
89+
90+
pq_sendbyte(out,nspnamelen);/* schema name length */
91+
pq_sendbytes(out,nspname,nspnamelen);
92+
93+
pq_sendbyte(out,relnamelen);/* table name length */
94+
pq_sendbytes(out,relname,relnamelen);
95+
}
9396
}
9497

9598
/*

‎contrib/multimaster/pglogical_proto.h‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
#ifndefPG_LOGICAL_PROTO_H
1414
#definePG_LOGICAL_PROTO_H
1515

16-
typedefvoid (*pglogical_write_rel_fn)(StringInfoout,Relationrel);
16+
typedefvoid (*pglogical_write_rel_fn)(StringInfoout,PGLogicalOutputData*data,Relationrel);
1717

1818
typedefvoid (*pglogical_write_begin_fn)(StringInfoout,PGLogicalOutputData*data,
1919
ReorderBufferTXN*txn);

‎contrib/multimaster/pglogical_receiver.c‎

Lines changed: 33 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,7 @@ pglogical_receiver_main(Datum main_arg)
244244
resetPQExpBuffer(query);
245245

246246
/* Start logical replication at specified position */
247-
appendPQExpBuffer(query,"START_REPLICATION SLOT \"%s\" LOGICAL 0/0 ",
247+
appendPQExpBuffer(query,"START_REPLICATION SLOT \"%s\" LOGICAL 0/0(\"startup_params_format\" '1', \"max_proto_version\" '1', \"min_proto_version\" '1')",
248248
args->receiver_slot);
249249
res=PQexec(conn,query->data);
250250
if (PQresultStatus(res)!=PGRES_COPY_BOTH)
@@ -376,51 +376,49 @@ pglogical_receiver_main(Datum main_arg)
376376
walEnd=fe_recvint64(&copybuf[hdr_len]);
377377
hdr_len+=8;/* WALEnd */
378378
hdr_len+=8;/* sendTime */
379-
if (rc<hdr_len+1)
380-
{
381-
ereport(LOG, (errmsg("%s: Streaming header too small",
382-
worker_proc)));
383-
proc_exit(1);
384-
}
385379

386-
stmt=copybuf+hdr_len;
380+
/*ereport(LOG, (errmsg("%s: receive message %c length %d", worker_proc, copybuf[hdr_len], rc - hdr_len)));*/
381+
382+
Assert(rc >=hdr_len);
383+
384+
if (rc>hdr_len)
385+
{
386+
stmt=copybuf+hdr_len;
387387

388388
#ifdefUSE_PGLOGICAL_OUTPUT
389-
ByteBufferAppend(&buf,stmt,rc-hdr_len);
390-
if (stmt[0]=='C')
391-
{
392-
MMExecute(buf.data,buf.used);
393-
ByteBufferReset(&buf);
394-
}
389+
ByteBufferAppend(&buf,stmt,rc-hdr_len);
390+
if (stmt[0]=='C')/* commit */
391+
{
392+
MMExecute(buf.data,buf.used);
393+
ByteBufferReset(&buf);
394+
}
395395
#else
396-
if (strncmp(stmt,"BEGIN ",6)==0) {
397-
TransactionIdxid;
398-
intrc=sscanf(stmt+6,"%u",&xid);
399-
Assert(rc==1);
400-
ByteBufferAppendInt32(&buf,xid);
401-
Assert(!insideTrans);
402-
insideTrans= true;
403-
}elseif (strncmp(stmt,"COMMIT;",7)==0) {
404-
Assert(insideTrans);
405-
Assert(buf.used>4);
406-
buf.data[buf.used-1]='\0';/* replace last ';' with '\0' to make string zero terminated */
407-
MMExecute(buf.data,buf.used);
408-
ByteBufferReset(&buf);
409-
insideTrans= false;
410-
}else {
411-
Assert(insideTrans);
412-
ByteBufferAppend(&buf,stmt,rc-hdr_len/*strlen(stmt)*/);
413-
}
396+
if (strncmp(stmt,"BEGIN ",6)==0) {
397+
TransactionIdxid;
398+
intrc=sscanf(stmt+6,"%u",&xid);
399+
Assert(rc==1);
400+
ByteBufferAppendInt32(&buf,xid);
401+
Assert(!insideTrans);
402+
insideTrans= true;
403+
}elseif (strncmp(stmt,"COMMIT;",7)==0) {
404+
Assert(insideTrans);
405+
Assert(buf.used>4);
406+
buf.data[buf.used-1]='\0';/* replace last ';' with '\0' to make string zero terminated */
407+
MMExecute(buf.data,buf.used);
408+
ByteBufferReset(&buf);
409+
insideTrans= false;
410+
}else {
411+
Assert(insideTrans);
412+
ByteBufferAppend(&buf,stmt,rc-hdr_len/*strlen(stmt)*/);
413+
}
414414
#endif
415+
}
415416
/* Update written position */
416417
output_written_lsn=Max(walEnd,output_written_lsn);
417418
output_fsync_lsn=output_written_lsn;
418419
output_applied_lsn=output_written_lsn;
419420
}
420421

421-
/* Finish process */
422-
pgstat_report_activity(STATE_IDLE,NULL);
423-
424422
/* No data, move to next loop */
425423
if (rc==0)
426424
{

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp