Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitfc058eb

Browse files
committed
Resolve conflicts
2 parents7fa827b +df085fc commitfc058eb

File tree

7 files changed

+83
-21
lines changed

7 files changed

+83
-21
lines changed

‎contrib/mmts/arbiter.c‎

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ static void MtmMonitor(Datum arg);
9191
staticvoidMtmSendHeartbeat(void);
9292
staticboolMtmSendToNode(intnode,voidconst*buf,intsize);
9393

94-
94+
/*
9595
static char const* const messageText[] =
9696
{
9797
"INVALID",
@@ -105,6 +105,7 @@ static char const* const messageText[] =
105105
"POLL_REQUEST",
106106
"POLL_STATUS"
107107
};
108+
*/
108109

109110
staticBackgroundWorkerMtmSenderWorker= {
110111
"mtm-sender",
@@ -363,7 +364,7 @@ static void MtmSendHeartbeat()
363364
MTM_LOG2("Send heartbeat to node %d with timestamp %ld",i+1,now);
364365
}
365366
}else {
366-
MTM_LOG1("Do not send heartbeat to node %d, busy mask %ld, status %d",i+1,busy_mask,Mtm->status);
367+
MTM_LOG1("Do not send heartbeat to node %d, busy mask %lld, status %d",i+1, (long long)busy_mask,Mtm->status);
367368
}
368369
}
369370
}
@@ -940,7 +941,9 @@ static void MtmReceiver(Datum arg)
940941
CommitTransactionCommand();
941942
Assert(ts->status==TRANSACTION_STATUS_ABORTED);
942943
}else {
943-
elog(LOG,"Receive response %d for transaction %s for node %d, votedMask=%lx, participantsMask=%lx",msg->status,msg->gid,node,ts->votedMask,ts->participantsMask& ~Mtm->disabledNodeMask);
944+
elog(LOG,"Receive response %d for transaction %s for node %d, votedMask=%llx, participantsMask=%llx",
945+
msg->status,msg->gid,node, (long long)ts->votedMask,
946+
(long long) (ts->participantsMask& ~Mtm->disabledNodeMask) );
944947
continue;
945948
}
946949
}elseif (ts->status==TRANSACTION_STATUS_ABORTED&&msg->status==TRANSACTION_STATUS_COMMITTED) {

‎contrib/mmts/multimaster.c‎

Lines changed: 57 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,7 @@ static ExecutorFinish_hook_type PreviousExecutorFinishHook;
232232
staticProcessUtility_hook_typePreviousProcessUtilityHook;
233233
staticshmem_startup_hook_typePreviousShmemStartupHook;
234234

235+
staticnodemask_tlastKnownMatrix[MAX_NODES];
235236

236237
staticvoidMtmExecutorFinish(QueryDesc*queryDesc);
237238
staticvoidMtmProcessUtility(Node*parsetree,constchar*queryString,
@@ -1368,8 +1369,8 @@ static void MtmEnableNode(int nodeId)
13681369
voidMtmRecoveryCompleted(void)
13691370
{
13701371
inti;
1371-
MTM_LOG1("Recovery of node %d is completed, disabled mask=%lx, connectivity mask=%lx, live nodes=%d",
1372-
MtmNodeId,Mtm->disabledNodeMask,Mtm->connectivityMask,Mtm->nLiveNodes);
1372+
MTM_LOG1("Recovery of node %d is completed, disabled mask=%llx, connectivity mask=%llx, live nodes=%d",
1373+
MtmNodeId,(long long)Mtm->disabledNodeMask, (long long)Mtm->connectivityMask,Mtm->nLiveNodes);
13731374
MtmLock(LW_EXCLUSIVE);
13741375
Mtm->recoverySlot=0;
13751376
BIT_CLEAR(Mtm->disabledNodeMask,MtmNodeId-1);
@@ -1563,7 +1564,8 @@ static bool
15631564
MtmBuildConnectivityMatrix(nodemask_t*matrix,boolnowait)
15641565
{
15651566
inti,j,n=Mtm->nAllNodes;
1566-
fprintf(stderr,"Connectivity matrix:\n");
1567+
boolchanged= false;
1568+
15671569
for (i=0;i<n;i++) {
15681570
if (i+1!=MtmNodeId) {
15691571
void*data=RaftableGet(psprintf("node-mask-%d",i+1),NULL,NULL,nowait);
@@ -1574,12 +1576,27 @@ MtmBuildConnectivityMatrix(nodemask_t* matrix, bool nowait)
15741576
}else {
15751577
matrix[i]=Mtm->connectivityMask;
15761578
}
1577-
for (j=0;j<n;j++) {
1578-
putc(BIT_CHECK(matrix[i],j) ?'X' :'+',stderr);
1579+
1580+
if (lastKnownMatrix[i]!=matrix[i])
1581+
{
1582+
changed= true;
1583+
lastKnownMatrix[i]=matrix[i];
15791584
}
1580-
putc('\n',stderr);
15811585
}
1582-
fputs("-----------------------\n",stderr);
1586+
1587+
/* Print matrix if changed */
1588+
if (changed)
1589+
{
1590+
fprintf(stderr,"Connectivity matrix:\n");
1591+
for (i=0;i<n;i++)
1592+
{
1593+
for (j=0;j<n;j++)
1594+
putc(BIT_CHECK(matrix[i],j) ?'X' :'+',stderr);
1595+
putc('\n',stderr);
1596+
}
1597+
fputs("-----------------------\n",stderr);
1598+
}
1599+
15831600
/* make matrix symetric: required for Bron–Kerbosch algorithm */
15841601
for (i=0;i<n;i++) {
15851602
for (j=0;j<i;j++) {
@@ -1588,8 +1605,9 @@ MtmBuildConnectivityMatrix(nodemask_t* matrix, bool nowait)
15881605
}
15891606
matrix[i] &= ~((nodemask_t)1 <<i);
15901607
}
1608+
15911609
return true;
1592-
}
1610+
}
15931611

15941612

15951613
/**
@@ -1610,6 +1628,11 @@ bool MtmRefreshClusterStatus(bool nowait, int testNodeId)
16101628
}
16111629

16121630
clique=MtmFindMaxClique(matrix,Mtm->nAllNodes,&clique_size);
1631+
1632+
if (clique== (~Mtm->disabledNodeMask& (((nodemask_t)1 <<Mtm->nAllNodes)-1)) )
1633+
/* Nothing is changed */
1634+
return false;
1635+
16131636
if (clique_size >=Mtm->nAllNodes/2+1) {/* have quorum */
16141637
fprintf(stderr,"Old mask: ");
16151638
for (i=0;i<Mtm->nAllNodes;i++) {
@@ -1648,7 +1671,7 @@ bool MtmRefreshClusterStatus(bool nowait, int testNodeId)
16481671
/* Interrupt voting for active transaction and abort them */
16491672
for (ts=Mtm->transListHead;ts!=NULL;ts=ts->next) {
16501673
MTM_LOG3("Active transaction gid='%s', coordinator=%d, xid=%d, status=%d, gtid.xid=%d",
1651-
ts->gid,ts->gtid.nхode,ts->xid,ts->status,ts->gtid.xid);
1674+
ts->gid,ts->gtid.node,ts->xid,ts->status,ts->gtid.xid);
16521675
if (MtmIsCoordinator(ts)) {
16531676
if (!ts->votingCompleted&&disabled!=0&&ts->status!=TRANSACTION_STATUS_ABORTED) {
16541677
MtmAbortTransaction(ts);
@@ -1705,7 +1728,7 @@ void MtmOnNodeDisconnect(int nodeId)
17051728
MtmLock(LW_EXCLUSIVE);
17061729
BIT_SET(Mtm->connectivityMask,nodeId-1);
17071730
BIT_SET(Mtm->reconnectMask,nodeId-1);
1708-
MTM_LOG1("Disconnect node %d connectivity mask %lx",nodeId,Mtm->connectivityMask);
1731+
MTM_LOG1("Disconnect node %d connectivity mask %llx",nodeId, (long long)Mtm->connectivityMask);
17091732
MtmUnlock();
17101733

17111734
if (!RaftableSet(psprintf("node-mask-%d",MtmNodeId),&Mtm->connectivityMask,sizeofMtm->connectivityMask, false))
@@ -1755,7 +1778,7 @@ void MtmOnNodeConnect(int nodeId)
17551778
BIT_CLEAR(Mtm->reconnectMask,nodeId-1);
17561779
MtmUnlock();
17571780

1758-
MTM_LOG1("Reconnect node %d, connectivityMask=%lx",nodeId,Mtm->connectivityMask);
1781+
MTM_LOG1("Reconnect node %d, connectivityMask=%llx",nodeId, (long long)Mtm->connectivityMask);
17591782
RaftableSet(psprintf("node-mask-%d",MtmNodeId),&Mtm->connectivityMask,sizeofMtm->connectivityMask, false);
17601783
}
17611784

@@ -3579,7 +3602,12 @@ static void MtmGucSet(VariableSetStmt *stmt, const char *queryStr)
35793602
hash_search(MtmGucHash,key,HASH_REMOVE,NULL);
35803603
}
35813604
break;
3605+
35823606
caseVAR_RESET_ALL:
3607+
{
3608+
hash_destroy(MtmGucHash);
3609+
MtmGucHashInit();
3610+
}
35833611
break;
35843612

35853613
caseVAR_SET_MULTI:
@@ -3591,7 +3619,11 @@ static void MtmGucSet(VariableSetStmt *stmt, const char *queryStr)
35913619

35923620
staticvoidMtmGucDiscard(DiscardStmt*stmt)
35933621
{
3594-
3622+
if (stmt->target==DISCARD_ALL)
3623+
{
3624+
hash_destroy(MtmGucHash);
3625+
MtmGucHashInit();
3626+
}
35953627
}
35963628

35973629
staticvoidMtmGucClear(void)
@@ -3616,7 +3648,18 @@ static char * MtmGucSerialize(void)
36163648
appendStringInfoString(serialized_gucs,"SET ");
36173649
appendStringInfoString(serialized_gucs,hentry->key);
36183650
appendStringInfoString(serialized_gucs," TO ");
3619-
appendStringInfoString(serialized_gucs,hentry->value);
3651+
3652+
/* quite a crutch */
3653+
if (strcmp(hentry->key,"work_mem")==0)
3654+
{
3655+
appendStringInfoString(serialized_gucs,"'");
3656+
appendStringInfoString(serialized_gucs,hentry->value);
3657+
appendStringInfoString(serialized_gucs,"'");
3658+
}
3659+
else
3660+
{
3661+
appendStringInfoString(serialized_gucs,hentry->value);
3662+
}
36203663
appendStringInfoString(serialized_gucs,"; ");
36213664
}
36223665
}
@@ -3838,6 +3881,7 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
38383881
{
38393882
MTM_LOG1("Xact accessed temp table, stopping replication");
38403883
MtmTx.isDistributed= false;/* Skip */
3884+
MtmTx.snapshot=INVALID_CSN;
38413885
}
38423886

38433887
}

‎contrib/mmts/pglogical_apply.c‎

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -680,6 +680,8 @@ process_remote_insert(StringInfo s, Relation rel)
680680
ScanKey*index_keys;
681681
inti;
682682

683+
PushActiveSnapshot(GetTransactionSnapshot());
684+
683685
estate=create_rel_estate(rel);
684686
newslot=ExecInitExtraTupleSlot(estate);
685687
oldslot=ExecInitExtraTupleSlot(estate);
@@ -755,6 +757,9 @@ process_remote_insert(StringInfo s, Relation rel)
755757

756758
ExecCloseIndices(estate->es_result_relation_info);
757759

760+
if (ActiveSnapshotSet())
761+
PopActiveSnapshot();
762+
758763
heap_close(rel,NoLock);
759764
ExecResetTupleTable(estate->es_tupleTable, true);
760765
FreeExecutorState(estate);

‎contrib/mmts/pglogical_proto.c‎

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,12 +112,14 @@ pglogical_write_begin(StringInfo out, PGLogicalOutputData *data,
112112

113113
if (!isRecovery&&csn==INVALID_CSN) {
114114
MtmIsFilteredTxn= true;
115-
}else {
115+
MTM_LOG3("%d: pglogical_write_begin XID=%d filtered",MyProcPid,txn->xid);
116+
}else {
117+
MTM_LOG3("%d: pglogical_write_begin XID=%d sent",MyProcPid,txn->xid);
118+
MtmIsFilteredTxn= false;
116119
pq_sendbyte(out,'B');/* BEGIN */
117120
pq_sendint(out,MtmNodeId,4);
118121
pq_sendint(out,isRecovery ?InvalidTransactionId :txn->xid,4);
119122
pq_sendint64(out,csn);
120-
MtmIsFilteredTxn= false;
121123
MtmTransactionRecords=0;
122124
}
123125
}
@@ -128,6 +130,12 @@ pglogical_write_message(StringInfo out,
128130
{
129131
if (*prefix=='L') {
130132
MTM_LOG1("Send deadlock message to node %d",MtmReplicationNodeId);
133+
}else {
134+
if (MtmIsFilteredTxn)
135+
{
136+
MTM_LOG3("%d: pglogical_write_message filtered",MyProcPid);
137+
return;
138+
}
131139
}
132140
pq_sendbyte(out,*prefix);
133141
pq_sendint(out,sz,4);

‎contrib/raftable/raftable.c‎

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -332,7 +332,7 @@ static RaftableMessage *raftable_try_query(RaftableMessage *msg, size_t size, si
332332
RaftableMessage*answer;
333333

334334
s=get_connection(timeout);
335-
if (s<0)returnfalse;
335+
if (s<0)returnNULL;
336336

337337
if (timeout_happened(timeout))
338338
{
@@ -619,7 +619,7 @@ pid_t raftable_start(int id)
619619
snprintf(worker.bgw_name,BGW_MAXLEN,"raftable worker %d",sharedcfg->id);
620620
worker.bgw_flags=BGWORKER_SHMEM_ACCESS;
621621
worker.bgw_start_time=BgWorkerStart_ConsistentState;
622-
worker.bgw_restart_time=BGW_NEVER_RESTART;
622+
worker.bgw_restart_time=RAFTABLE_RESTART_TIMEOUT;
623623
worker.bgw_main=raftable_worker_main;
624624
worker.bgw_main_arg=PointerGetDatum(&sharedcfg);
625625

‎contrib/raftable/raftable.h‎

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
#ifndef__RAFTABLE_H__
22
#define__RAFTABLE_H__
33

4+
#defineRAFTABLE_RESTART_TIMEOUT 1
5+
46
/*
57
* Gets value by key. Returns the value or NULL if not found. Gives up after
68
* 'timeout_ms' milliseconds

‎src/test/regress/serial_schedule‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ test: triggers
7070
test: inherit
7171
test: create_table_like
7272
test: typed_table
73-
#test: vacuum # issue#18
73+
test: vacuum
7474
test: drop_if_exists
7575
test: updatable_views
7676
test: rolenames

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp