Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitb99e7db

Browse files
committed
Fix unreleased lock in GetPreparedTransactionState
1 parent19eab46 commitb99e7db

File tree

4 files changed

+77
-15
lines changed

4 files changed

+77
-15
lines changed

‎contrib/mmts/multimaster.c

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1773,6 +1773,10 @@ void MtmRecoveryCompleted(void)
17731773
MTM_LOG1("Recovery of node %d is completed, disabled mask=%llx, connectivity mask=%llx, endLSN=%lx, live nodes=%d",
17741774
MtmNodeId, (long long)Mtm->disabledNodeMask,
17751775
(long long)SELF_CONNECTIVITY_MASK,GetXLogInsertRecPtr(),Mtm->nLiveNodes);
1776+
if (Mtm->nAllNodes >=3) {
1777+
elog(WARNING,"restartLSNs at the end of recovery: {%lx, %lx, %lx}",
1778+
Mtm->nodes[0].restartLSN,Mtm->nodes[1].restartLSN,Mtm->nodes[2].restartLSN);
1779+
}
17761780
MtmLock(LW_EXCLUSIVE);
17771781
Mtm->recoverySlot=0;
17781782
Mtm->recoveredLSN=GetXLogInsertRecPtr();
@@ -3244,7 +3248,12 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
32443248
||Mtm->recoverySlot==nodeId)
32453249
{
32463250
/* Choose for recovery first available slot or slot of donor node (if any) */
3247-
elog(WARNING,"Process %d starts recovery from node %d",MyProcPid,nodeId);
3251+
if (Mtm->nAllNodes >=3) {
3252+
elog(WARNING,"Process %d starts recovery from node %d restartLSNs={%lx, %lx, %lx}",
3253+
MyProcPid,nodeId,Mtm->nodes[0].restartLSN,Mtm->nodes[1].restartLSN,Mtm->nodes[2].restartLSN);
3254+
}else {
3255+
elog(WARNING,"Process %d starts recovery from node %d",MyProcPid,nodeId);
3256+
}
32483257
Mtm->recoverySlot=nodeId;
32493258
Mtm->nReceivers=0;
32503259
Mtm->nSenders=0;
@@ -3383,7 +3392,7 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
33833392
sscanf(strVal(elem->arg),"%lx",&recoveredLSN);
33843393
MTM_LOG1("Recovered position of node %d is %lx",MtmReplicationNodeId,recoveredLSN);
33853394
if (Mtm->nodes[MtmReplicationNodeId-1].restartLSN<recoveredLSN) {
3386-
MTM_LOG2("[restartlsn]node %d:%lx-> %lx (MtmReplicationStartupHook)",MtmReplicationNodeId,Mtm->nodes[MtmReplicationNodeId-1].restartLSN,recoveredLSN);
3395+
MTM_LOG1("Advance restartLSN fornode %d from%lxto %lx (MtmReplicationStartupHook)",MtmReplicationNodeId,Mtm->nodes[MtmReplicationNodeId-1].restartLSN,recoveredLSN);
33873396
Assert(Mtm->nodes[MtmReplicationNodeId-1].restartLSN==InvalidXLogRecPtr
33883397
||recoveredLSN<Mtm->nodes[MtmReplicationNodeId-1].restartLSN+MtmMaxRecoveryLag);
33893398
Mtm->nodes[MtmReplicationNodeId-1].restartLSN=recoveredLSN;
@@ -3587,9 +3596,13 @@ bool MtmFilterTransaction(char* record, int size)
35873596
origin_node=pq_getmsgbyte(&s);
35883597
origin_lsn=pq_getmsgint64(&s);
35893598

3590-
Assert(replication_node==MtmReplicationNodeId&&
3591-
origin_node!=0&&
3592-
(Mtm->status==MTM_RECOVERY||origin_node==replication_node));
3599+
Assert(replication_node==MtmReplicationNodeId);
3600+
if (!(origin_node!=0&&
3601+
(Mtm->status==MTM_RECOVERY||origin_node==replication_node)))
3602+
{
3603+
elog(WARNING,"Receive redirected commit event %d from node %d origin node %d origin LSN %lx in %s mode",
3604+
event,replication_node,origin_node,origin_lsn,MtmNodeStatusMnem[Mtm->status]);
3605+
}
35933606

35943607
switch (event)
35953608
{
@@ -3616,8 +3629,8 @@ bool MtmFilterTransaction(char* record, int size)
36163629
}
36173630

36183631
if (duplicate) {
3619-
MTM_LOG1("Ignore transaction %s from node %d event=%x because our LSN position %lx for origin node %d is greater or equal than LSN %lx of this transaction (end_lsn=%lx, origin_lsn=%lx)",
3620-
gid,replication_node,event,Mtm->nodes[origin_node-1].restartLSN,origin_node,restart_lsn,end_lsn,origin_lsn);
3632+
MTM_LOG1("Ignore transaction %s from node %d event=%x because our LSN position %lx for origin node %d is greater or equal than LSN %lx of this transaction (end_lsn=%lx, origin_lsn=%lx) mode %s",
3633+
gid,replication_node,event,Mtm->nodes[origin_node-1].restartLSN,origin_node,restart_lsn,end_lsn,origin_lsn,MtmNodeStatusMnem[Mtm->status]);
36213634
}else {
36223635
MTM_LOG2("Apply transaction %s from node %d lsn %lx, event=%x, origin node %d, original lsn=%lx, current lsn=%lx",
36233636
gid,replication_node,end_lsn,event,origin_node,origin_lsn,restart_lsn);

‎contrib/mmts/pglogical_receiver.c

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -342,11 +342,11 @@ pglogical_receiver_main(Datum main_arg)
342342
* Them are either empty, either new node is synchronized using base_backup.
343343
* So we assume that LSNs are the same for local and remote node
344344
*/
345-
originStartPos=Mtm->status==MTM_RECOVERY&&Mtm->donorNodeId==nodeId ?GetXLogInsertRecPtr() :InvalidXLogRecPtr;
345+
originStartPos=(Mtm->status==MTM_RECOVERY&&Mtm->donorNodeId==nodeId) ?GetXLogInsertRecPtr() :InvalidXLogRecPtr;
346346
MTM_LOG1("Start logical receiver at position %lx from node %d",originStartPos,nodeId);
347347
}else {
348348
if (Mtm->nodes[nodeId-1].restartLSN<originStartPos) {
349-
MTM_LOG2("[restartlsn]node %d: %lx-> %lx (pglogical_receiver_mains)",nodeId,Mtm->nodes[nodeId-1].restartLSN,originStartPos);
349+
MTM_LOG1("Advance restartLSN fornode %d:from%lxto %lx (pglogical_receiver_main)",nodeId,Mtm->nodes[nodeId-1].restartLSN,originStartPos);
350350
Mtm->nodes[nodeId-1].restartLSN=originStartPos;
351351
}
352352
MTM_LOG1("Restart logical receiver at position %lx with origin=%d from node %d",originStartPos,originId,nodeId);
@@ -545,16 +545,17 @@ pglogical_receiver_main(Datum main_arg)
545545
}
546546
if (stmt[0]=='Z'|| (stmt[0]=='M'&& (stmt[1]=='L'||stmt[1]=='A'||stmt[1]=='C'))) {
547547
MTM_LOG3("Process '%c' message from %d",stmt[1],nodeId);
548-
if (stmt[1]=='C') {/* concurrent DDL */
548+
if (stmt[0]=='M'&&stmt[1]=='C') {/* concurrent DDL should be executed by parallel workers */
549549
MtmExecute(stmt,rc-hdr_len);
550550
}else {
551-
MtmExecutor(stmt,rc-hdr_len);
551+
MtmExecutor(stmt,rc-hdr_len);/* all other messages can be processed by receiver itself */
552552
}
553553
}else {
554554
ByteBufferAppend(&buf,stmt,rc-hdr_len);
555555
if (stmt[0]=='C')/* commit */
556556
{
557-
if (!MtmFilterTransaction(stmt,rc-hdr_len)) {
557+
if (!MtmFilterTransaction(stmt,rc-hdr_len))
558+
{
558559
if (spill_file >=0) {
559560
ByteBufferAppend(&buf,")",1);
560561
pq_sendbyte(&spill_info,'(');
@@ -574,6 +575,7 @@ pglogical_receiver_main(Datum main_arg)
574575
elog(WARNING,"Commit of prepared transaction takes %ld usec, flags=%x",stop-start,stmt[1]);
575576
}
576577
}else {
578+
Assert(stmt[1]==PGLOGICAL_PREPARE||stmt[1]==PGLOGICAL_COMMIT);/* all other commits should be applied in place */
577579
MtmExecute(buf.data,buf.used);
578580
}
579581
}

‎src/backend/access/transam/twophase.c

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -475,6 +475,7 @@ MarkAsPreparing(TransactionId xid, const char *gid,
475475
{
476476
if (strcmp(gxact->gid,gid)==0)
477477
{
478+
LWLockRelease(TwoPhaseStateLock);
478479
ereport(ERROR,
479480
(errcode(ERRCODE_DUPLICATE_OBJECT),
480481
errmsg("transaction identifier \"%s\" is already in use",
@@ -484,11 +485,14 @@ MarkAsPreparing(TransactionId xid, const char *gid,
484485

485486
/* Get a free gxact from the freelist */
486487
if (TwoPhaseState->freeGXacts==NULL)
488+
{
489+
LWLockRelease(TwoPhaseStateLock);
487490
ereport(ERROR,
488491
(errcode(ERRCODE_OUT_OF_MEMORY),
489492
errmsg("maximum number of prepared transactions reached"),
490493
errhint("Increase max_prepared_transactions (currently %d).",
491494
max_prepared_xacts)));
495+
}
492496
gxact=TwoPhaseState->freeGXacts;
493497
TwoPhaseState->freeGXacts=gxact->next;
494498

@@ -793,6 +797,7 @@ bool GetPreparedTransactionState(char const* gid, char* state)
793797
{
794798
inti;
795799
GlobalTransactiongxact;
800+
boolresult= false;
796801

797802
LWLockAcquire(TwoPhaseStateLock,LW_SHARED);
798803
i=string_hash(gid,0) %max_prepared_xacts;
@@ -801,11 +806,12 @@ bool GetPreparedTransactionState(char const* gid, char* state)
801806
if (strcmp(gxact->gid,gid)==0)
802807
{
803808
strcpy(state,gxact->state_3pc);
804-
return true;
809+
result= true;
810+
break;
805811
}
806812
}
807813
LWLockRelease(TwoPhaseStateLock);
808-
returnfalse;
814+
returnresult;
809815
}
810816

811817

@@ -845,6 +851,9 @@ void SetPreparedTransactionState(char const* gid, char const* state)
845851
START_CRIT_SECTION();
846852
MyPgXact->delayChkpt= true;
847853

854+
hdr->xl_origin.origin_lsn=replorigin_session_origin_lsn;
855+
hdr->xl_origin.origin_timestamp=replorigin_session_origin_timestamp;
856+
848857
XLogBeginInsert();
849858
XLogRegisterData(buf,hdr->total_len-sizeof(pg_crc32c));
850859
XLogIncludeOrigin();

‎src/bin/pg_xlogdump/pg_xlogdump.c

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,12 @@
1818
#include"access/xlogreader.h"
1919
#include"access/xlogrecord.h"
2020
#include"access/xlog_internal.h"
21+
#include"access/xact.h"
2122
#include"access/transam.h"
2223
#include"common/fe_memutils.h"
2324
#include"getopt_long.h"
2425
#include"rmgrdesc.h"
26+
#include"replication/origin.h"
2527

2628

2729
staticconstchar*progname;
@@ -42,6 +44,7 @@ typedef struct XLogDumpConfig
4244
intstop_after_records;
4345
intalready_displayed_records;
4446
boolfollow;
47+
booldump_origin;
4548
boolstats;
4649
boolstats_per_record;
4750

@@ -439,6 +442,35 @@ XLogDumpDisplayRecord(XLogDumpConfig *config, XLogReaderState *record)
439442
XLogRecGetXid(record),
440443
(uint32) (record->ReadRecPtr >>32), (uint32)record->ReadRecPtr,
441444
(uint32) (xl_prev >>32), (uint32)xl_prev);
445+
446+
if (config->dump_origin&&XLogRecGetOrigin(record)!=InvalidRepOriginId) {
447+
switch (info&XLOG_XACT_OPMASK) {
448+
caseXLOG_XACT_COMMIT:
449+
caseXLOG_XACT_COMMIT_PREPARED:
450+
{
451+
xl_xact_commit*xlrec;
452+
xl_xact_parsed_commitparsed;
453+
454+
xlrec= (xl_xact_commit*)XLogRecGetData(record);
455+
ParseCommitRecord(info,xlrec,&parsed);
456+
printf("origin_id=%d, origin_lsn=%llx, ",XLogRecGetOrigin(record), (long long)parsed.origin_lsn);
457+
break;
458+
}
459+
caseXLOG_XACT_ABORT:
460+
caseXLOG_XACT_ABORT_PREPARED:
461+
{
462+
xl_xact_abort*xlrec;
463+
xl_xact_parsed_abortparsed;
464+
465+
xlrec= (xl_xact_abort*)XLogRecGetData(record);
466+
ParseAbortRecord(info,xlrec,&parsed);
467+
468+
printf("origin_id=%d, origin_lsn=%llx, ",XLogRecGetOrigin(record), (long long)parsed.origin_lsn);
469+
break;
470+
}
471+
}
472+
}
473+
442474
printf("desc: %s ",id);
443475

444476
/* the desc routine will printf the description directly to stdout */
@@ -678,6 +710,7 @@ usage(void)
678710
printf(" -b, --bkp-details output detailed information about backup blocks\n");
679711
printf(" -e, --end=RECPTR stop reading at log position RECPTR\n");
680712
printf(" -f, --follow keep retrying after reaching end of WAL\n");
713+
printf(" -o, --origin dump origins\n");
681714
printf(" -n, --limit=N number of records to display\n");
682715
printf(" -p, --path=PATH directory in which to find log segment files\n");
683716
printf(" (default: ./pg_xlog)\n");
@@ -710,6 +743,7 @@ main(int argc, char **argv)
710743
{"bkp-details",no_argument,NULL,'b'},
711744
{"end",required_argument,NULL,'e'},
712745
{"follow",no_argument,NULL,'f'},
746+
{"origin",no_argument,NULL,'o'},
713747
{"help",no_argument,NULL,'?'},
714748
{"limit",required_argument,NULL,'n'},
715749
{"path",required_argument,NULL,'p'},
@@ -740,6 +774,7 @@ main(int argc, char **argv)
740774
config.stop_after_records=-1;
741775
config.already_displayed_records=0;
742776
config.follow= false;
777+
config.dump_origin= false;
743778
config.filter_by_rmgr=-1;
744779
config.filter_by_xid=InvalidTransactionId;
745780
config.filter_by_xid_enabled= false;
@@ -752,7 +787,7 @@ main(int argc, char **argv)
752787
gotobad_argument;
753788
}
754789

755-
while ((option=getopt_long(argc,argv,"be:?fn:p:r:s:t:Vx:z",
790+
while ((option=getopt_long(argc,argv,"be:?f?on:p:r:s:t:Vx:z",
756791
long_options,&optindex))!=-1)
757792
{
758793
switch (option)
@@ -772,6 +807,9 @@ main(int argc, char **argv)
772807
case'f':
773808
config.follow= true;
774809
break;
810+
case'o':
811+
config.dump_origin= true;
812+
break;
775813
case'?':
776814
usage();
777815
exit(EXIT_SUCCESS);

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp