Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit5ff17df

Browse files
knizhnikkelvich
authored andcommitted
Trace logical decoding
1 parentb147076 commit5ff17df

File tree

6 files changed

+92
-20
lines changed

6 files changed

+92
-20
lines changed

‎arbiter.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -129,15 +129,15 @@ static char const* const messageText[] =
129129

130130
staticBackgroundWorkerMtmSender= {
131131
"mtm-sender",
132-
BGWORKER_SHMEM_ACCESS |BGWORKER_BACKEND_DATABASE_CONNECTION,
132+
BGWORKER_SHMEM_ACCESS |BGWORKER_BACKEND_DATABASE_CONNECTION,
133133
BgWorkerStart_ConsistentState,
134134
MULTIMASTER_BGW_RESTART_TIMEOUT,
135135
MtmTransSender
136136
};
137137

138138
staticBackgroundWorkerMtmRecevier= {
139139
"mtm-receiver",
140-
BGWORKER_SHMEM_ACCESS |BGWORKER_BACKEND_DATABASE_CONNECTION,
140+
BGWORKER_SHMEM_ACCESS |BGWORKER_BACKEND_DATABASE_CONNECTION,
141141
BgWorkerStart_ConsistentState,
142142
MULTIMASTER_BGW_RESTART_TIMEOUT,
143143
MtmTransReceiver

‎multimaster.c

Lines changed: 52 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -990,7 +990,7 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
990990
/* Assert(ts->status == TRANSACTION_STATUS_UNKNOWN); */
991991
Assert(ts->status==TRANSACTION_STATUS_UNKNOWN
992992
|| (ts->status==TRANSACTION_STATUS_IN_PROGRESS&&Mtm->status==MTM_RECOVERY));/* ??? Why there is commit without prepare */
993-
if (x->csn>ts->csn) {
993+
if (x->csn>ts->csn||Mtm->status==MTM_RECOVERY) {
994994
ts->csn=x->csn;
995995
MtmSyncClock(ts->csn);
996996
}
@@ -1514,7 +1514,7 @@ bool MtmRefreshClusterStatus(bool nowait)
15141514
MtmWakeUpBackend(ts);
15151515
}
15161516
#if0
1517-
}elseif (TransactionIdIsValid(ts->gtid.xid)&&BIT_CHECK(disabled,ts->gtid.node-1)) {// coordinator of transaction is on disabled node
1517+
}elseif (TransactionIdIsValid(ts->gtid.xid)&&BIT_CHECK(disabled,ts->gtid.node-1)) {/* coordinator of transaction is on disabled node */
15181518
if (ts->gid[0]) {
15191519
if (ts->status==TRANSACTION_STATUS_UNKNOWN||ts->status==TRANSACTION_STATUS_IN_PROGRESS) {
15201520
MTM_LOG1("%d: Abort trasaction %s because its coordinator is at disabled node %d",MyProcPid,ts->gid,ts->gtid.node);
@@ -1604,12 +1604,14 @@ void MtmOnNodeDisconnect(int nodeId)
16041604
MtmAbortTransaction(ts);
16051605
MtmWakeUpBackend(ts);
16061606
}
1607-
}elseif (TransactionIdIsValid(ts->gtid.xid)&&ts->gtid.node==nodeId) {//coordinator of transaction is on disabled node
1607+
#if0
1608+
}elseif (TransactionIdIsValid(ts->gtid.xid)&&ts->gtid.node==nodeId) {/* coordinator of transaction is on disabled node */
16081609
if (ts->gid[0]&&ts->status!=TRANSACTION_STATUS_ABORTED) {
16091610
MtmAbortTransaction(ts);
16101611
MtmTx.status=TRANSACTION_STATUS_ABORTED;/* prevent recursive invocation of MtmAbortPreparedTransaction */
16111612
FinishPreparedTransaction(ts->gid, false);
16121613
}
1614+
#endif
16131615
}
16141616
}
16151617
}
@@ -2284,11 +2286,11 @@ _PG_init(void)
22842286
NULL,
22852287
&MtmConnStrs,
22862288
"",
2287-
PGC_BACKEND,// context
2288-
0,// flags,
2289-
NULL,// GucStringCheckHook check_hook,
2290-
NULL,// GucStringAssignHook assign_hook,
2291-
NULL// GucShowHook show_hook
2289+
PGC_BACKEND,/* context */
2290+
0,/* flags */
2291+
NULL,/* GucStringCheckHook check_hook */
2292+
NULL,/* GucStringAssignHook assign_hook */
2293+
NULL/* GucShowHook show_hook */
22922294
);
22932295

22942296
DefineCustomIntVariable(
@@ -3348,13 +3350,19 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
33483350
/* Save GUC context for consequent DDL execution */
33493351
caseT_DiscardStmt:
33503352
{
3351-
DiscardStmt*stmt= (DiscardStmt*)parsetree;
3352-
skipCommand=stmt->target==DISCARD_TEMP;
3353+
/*
3354+
* DiscardStmt *stmt = (DiscardStmt *) parsetree;
3355+
* skipCommand = stmt->target == DISCARD_TEMP;
3356+
*/
33533357

33543358
if (!IsTransactionBlock())
33553359
{
3356-
skipCommand= true;
3357-
MtmGUCBufferAppend(queryString);
3360+
/*
3361+
* XXX: move allocation somewhere to backend startup and check
3362+
* where buffer is empty in send routines.
3363+
*/
3364+
MtmGUCBufferAllocated= false;
3365+
pfree(MtmGUCBuffer);
33583366
}
33593367
}
33603368
break;
@@ -3364,7 +3372,38 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
33643372

33653373
/* Prevent SET TRANSACTION from replication */
33663374
if (stmt->kind==VAR_SET_MULTI)
3367-
skipCommand= true;
3375+
break;
3376+
3377+
if (!MtmGUCBufferAllocated)
3378+
{
3379+
MemoryContextoldcontext;
3380+
3381+
oldcontext=MemoryContextSwitchTo(TopMemoryContext);
3382+
MtmGUCBuffer=makeStringInfo();
3383+
MemoryContextSwitchTo(oldcontext);
3384+
MtmGUCBufferAllocated= true;
3385+
}
3386+
3387+
appendStringInfoString(MtmGUCBuffer,queryString);
3388+
3389+
/* sometimes there is no ';' char at the end. */
3390+
appendStringInfoString(MtmGUCBuffer,";");
3391+
}
3392+
break;
3393+
caseT_CreateStmt:
3394+
{
3395+
/* Do not replicate temp tables */
3396+
CreateStmt*stmt= (CreateStmt*)parsetree;
3397+
skipCommand=stmt->relation->relpersistence==RELPERSISTENCE_TEMP||
3398+
(stmt->relation->schemaname&&strcmp(stmt->relation->schemaname,"pg_temp")==0);
3399+
}
3400+
break;
3401+
caseT_IndexStmt:
3402+
{
3403+
Oidrelid;
3404+
Relationrel;
3405+
IndexStmt*stmt= (IndexStmt*)parsetree;
3406+
boolisTopLevel= (context==PROCESS_UTILITY_TOPLEVEL);
33683407

33693408
if (stmt->kind==VAR_RESET&&strcmp(stmt->name,"session_authorization")==0)
33703409
MtmGUCBufferClear();

‎pglogical_apply.c

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ typedef struct TupleData
5959
boolchanged[MaxTupleAttributeNumber];
6060
}TupleData;
6161

62-
staticboolinside_tx= false;
62+
staticintMtmTransactionRecords;
6363

6464
staticRelationread_rel(StringInfos,LOCKMODEmode);
6565
staticvoidread_tuple_parts(StringInfos,Relationrel,TupleData*tup);
@@ -543,11 +543,16 @@ process_remote_commit(StringInfo in)
543543
csn_tcsn;
544544
constchar*gid=NULL;
545545
XLogRecPtrend_lsn;
546-
546+
intn_records;
547547
/* read flags */
548548
flags=pq_getmsgbyte(in);
549549
MtmReplicationNodeId=pq_getmsgbyte(in);
550550

551+
n_records=pq_getmsgint(in,4);
552+
if (MtmTransactionRecords!=n_records) {
553+
elog(ERROR,"Transaction %d flags %d contains %d records instead of %d",MtmGetCurrentTransactionId(),flags,MtmTransactionRecords,n_records);
554+
}
555+
551556
/* read fields */
552557
replorigin_session_origin_lsn=pq_getmsgint64(in);/* commit_lsn */
553558
end_lsn=pq_getmsgint64(in);/* end_lsn */
@@ -654,6 +659,8 @@ process_remote_insert(StringInfo s, Relation rel)
654659
ScanKey*index_keys;
655660
inti;
656661

662+
MtmTransactionRecords+=1;
663+
657664
estate=create_rel_estate(rel);
658665
newslot=ExecInitExtraTupleSlot(estate);
659666
oldslot=ExecInitExtraTupleSlot(estate);
@@ -752,6 +759,8 @@ process_remote_update(StringInfo s, Relation rel)
752759
ScanKeyDataskey[INDEX_MAX_KEYS];
753760
HeapTupleremote_tuple=NULL;
754761

762+
MtmTransactionRecords+=1;
763+
755764
action=pq_getmsgbyte(s);
756765

757766
/* old key present, identifying key changed */
@@ -869,6 +878,8 @@ process_remote_delete(StringInfo s, Relation rel)
869878
ScanKeyDataskey[INDEX_MAX_KEYS];
870879
boolfound_old;
871880

881+
MtmTransactionRecords+=1;
882+
872883
estate=create_rel_estate(rel);
873884
oldslot=ExecInitExtraTupleSlot(estate);
874885
ExecSetSlotDescriptor(oldslot,RelationGetDescr(rel));
@@ -956,6 +967,7 @@ void MtmExecutor(int id, void* work, size_t size)
956967
}
957968
MemoryContextSwitchTo(ApplyContext);
958969
replorigin_session_origin=InvalidRepOriginId;
970+
MtmTransactionRecords=0;
959971
PG_TRY();
960972
{
961973
while (true) {

‎pglogical_proto.c

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
#include"pglogical_relid_map.h"
4040

4141
staticboolMtmIsFilteredTxn;
42+
staticintMtmTransactionRecords;
4243

4344
staticvoidpglogical_write_rel(StringInfoout,PGLogicalOutputData*data,Relationrel);
4445

@@ -106,7 +107,8 @@ pglogical_write_begin(StringInfo out, PGLogicalOutputData *data,
106107
{
107108
boolisRecovery=MtmIsRecoveredNode(MtmReplicationNodeId);
108109
csn_tcsn=MtmTransactionSnapshot(txn->xid);
109-
MTM_LOG2("%d: pglogical_write_begin XID=%d node=%d CSN=%ld recovery=%d",MyProcPid,txn->xid,MtmReplicationNodeId,csn,isRecovery);
110+
MTM_LOG1("%d: pglogical_write_begin XID=%d node=%d CSN=%ld recovery=%d restart_decoding_lsn=%lx first_lsn=%lx end_lsn=%lx confirmed_flush=%lx",
111+
MyProcPid,txn->xid,MtmReplicationNodeId,csn,isRecovery,txn->restart_decoding_lsn,txn->first_lsn,txn->end_lsn,MyReplicationSlot->data.confirmed_flush);
110112

111113
if (csn==INVALID_CSN&& !isRecovery) {
112114
MtmIsFilteredTxn= true;
@@ -116,6 +118,7 @@ pglogical_write_begin(StringInfo out, PGLogicalOutputData *data,
116118
pq_sendint(out,isRecovery ?InvalidTransactionId :txn->xid,4);
117119
pq_sendint64(out,csn);
118120
MtmIsFilteredTxn= false;
121+
MtmTransactionRecords=0;
119122
}
120123
}
121124

@@ -137,6 +140,10 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
137140
{
138141
uint8flags=0;
139142

143+
MTM_LOG1("%d: pglogical_write_commit XID=%d node=%d restart_decoding_lsn=%lx first_lsn=%lx end_lsn=%lx confirmed_flush=%lx",
144+
MyProcPid,txn->xid,MtmReplicationNodeId,txn->restart_decoding_lsn,txn->first_lsn,txn->end_lsn,MyReplicationSlot->data.confirmed_flush);
145+
146+
140147
if (txn->xact_action==XLOG_XACT_COMMIT)
141148
flags=PGLOGICAL_COMMIT;
142149
elseif (txn->xact_action==XLOG_XACT_PREPARE)
@@ -150,6 +157,7 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
150157

151158
if (flags==PGLOGICAL_COMMIT||flags==PGLOGICAL_PREPARE) {
152159
if (MtmIsFilteredTxn) {
160+
Assert(MtmTransactionRecords==0);
153161
return;
154162
}
155163
}else {
@@ -161,6 +169,7 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
161169
*/
162170
if (csn==INVALID_CSN&& !isRecovery)
163171
{
172+
Assert(MtmTransactionRecords==0);
164173
return;
165174
}
166175
if (MtmRecoveryCaughtUp(MtmReplicationNodeId,txn->end_lsn)) {
@@ -176,18 +185,23 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
176185
pq_sendbyte(out,flags);
177186
pq_sendbyte(out,MtmNodeId);
178187

188+
Assert(txn->xact_action!=XLOG_XACT_PREPARE||txn->xid<1000||MtmTransactionRecords >=2);
189+
pq_sendint(out,MtmTransactionRecords,4);
190+
179191
/* send fixed fields */
180192
pq_sendint64(out,commit_lsn);
181193
pq_sendint64(out,txn->end_lsn);
182194
pq_sendint64(out,txn->commit_time);
183195

184196
if (txn->xact_action==XLOG_XACT_COMMIT_PREPARED) {
197+
Assert(MtmTransactionRecords==0);
185198
pq_sendint64(out,MtmGetTransactionCSN(txn->xid));
186199
}
187200
if (txn->xact_action!=XLOG_XACT_COMMIT) {
188201
pq_sendstring(out,txn->gid);
189202
}
190203

204+
MtmTransactionRecords=0;
191205
MTM_TXTRACE(txn,"pglogical_write_commit Finish");
192206
}
193207

@@ -199,6 +213,7 @@ pglogical_write_insert(StringInfo out, PGLogicalOutputData *data,
199213
Relationrel,HeapTuplenewtuple)
200214
{
201215
if (!MtmIsFilteredTxn) {
216+
MtmTransactionRecords+=1;
202217
pq_sendbyte(out,'I');/* action INSERT */
203218
pglogical_write_tuple(out,data,rel,newtuple);
204219
}
@@ -212,6 +227,11 @@ pglogical_write_update(StringInfo out, PGLogicalOutputData *data,
212227
Relationrel,HeapTupleoldtuple,HeapTuplenewtuple)
213228
{
214229
if (!MtmIsFilteredTxn) {
230+
MtmTransactionRecords+=1;
231+
232+
MTM_LOG1("%d: pglogical_write_update confirmed_flush=%lx",MyProcPid,MyReplicationSlot->data.confirmed_flush);
233+
234+
215235
pq_sendbyte(out,'U');/* action UPDATE */
216236
/* FIXME support whole tuple (O tuple type) */
217237
if (oldtuple!=NULL)
@@ -233,6 +253,7 @@ pglogical_write_delete(StringInfo out, PGLogicalOutputData *data,
233253
Relationrel,HeapTupleoldtuple)
234254
{
235255
if (!MtmIsFilteredTxn) {
256+
MtmTransactionRecords+=1;
236257
pq_sendbyte(out,'D');/* action DELETE */
237258
pglogical_write_tuple(out,data,rel,oldtuple);
238259
}

‎tests2/docker-entrypoint.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ if [ "${1:0:1}" = '-' ]; then
66
set -- postgres"$@"
77
fi
88

9-
#echo "/pg/%p.%s.%c.%P.core" | sudo tee /proc/sys/kernel/core_pattern
9+
echo"/pg/%p.%s.%c.%P.core"| sudo tee /proc/sys/kernel/core_pattern
1010

1111
if ["$1"='postgres' ];then
1212
mkdir -p"$PGDATA"

‎tests2/test_recovery.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ def setUpClass(self):
1212
"dbname=postgres user=postgres host=127.0.0.1",
1313
"dbname=postgres user=postgres host=127.0.0.1 port=5433",
1414
"dbname=postgres user=postgres host=127.0.0.1 port=5434"
15-
],n_accounts=100000)
15+
],n_accounts=1000)
1616
self.client.bgrun()
1717
time.sleep(5)
1818

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp