Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitbf09fde

Browse files
committed
Revert "Revert "Merge branch 'PGPROEE9_6' into PGPROEE9_6_MULTIMASTER""
This reverts commit1460198.
1 parenta9317a7 commitbf09fde

38 files changed

+2482
-957
lines changed

‎contrib/Makefile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ SUBDIRS = \
4747
pg_visibility\
4848
pg_wait_sampling\
4949
postgres_fdw\
50+
referee\
5051
rum\
5152
seg\
5253
spi\

‎contrib/mmts/Cluster.pm

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ sub new
6666
my$node = new PostgresNode("node$i",$host,$pgport);
6767
$node->{id} =$i;
6868
$node->{arbiter_port} =$arbiter_port;
69+
$node->{mmconnstr} ="${\$node->connstr('postgres') } arbiter_port=${\$node->{arbiter_port} }";
6970
push(@$nodes,$node);
7071
}
7172

@@ -89,47 +90,54 @@ sub init
8990
}
9091
}
9192

93+
suball_connstrs
94+
{
95+
my ($self) =@_;
96+
my$nodes =$self->{nodes};
97+
returnjoin(',',map {"${\$_->connstr('postgres') } arbiter_port=${\$_->{arbiter_port} }" }@$nodes);
98+
}
99+
100+
92101
subconfigure
93102
{
94103
my ($self) =@_;
95104
my$nodes =$self->{nodes};
96-
my$nnodes =scalar @{$nodes };
97105

98-
my$connstr =join(',',map {"${\$_->connstr('postgres') } arbiter_port=${\$_->{arbiter_port} }" }@$nodes);
106+
my$connstr =$self->all_connstrs();
99107

100108
foreachmy$node (@$nodes)
101109
{
102110
my$id =$node->{id};
103111
my$host =$node->host;
104112
my$pgport =$node->port;
105113
my$arbiter_port =$node->{arbiter_port};
114+
my$unix_sock_dir =$ENV{PGHOST};
106115

107116
$node->append_conf("postgresql.conf",qq(
108117
log_statement = none
109118
listen_addresses = '$host'
110-
unix_socket_directories = ''
119+
unix_socket_directories = '$unix_sock_dir'
111120
port =$pgport
112121
max_prepared_transactions = 10
113122
max_connections = 10
114123
max_worker_processes = 100
115124
wal_level = logical
116-
max_wal_senders =5
125+
max_wal_senders =6
117126
wal_sender_timeout = 0
118127
default_transaction_isolation = 'repeatable read'
119-
max_replication_slots =5
128+
max_replication_slots =6
120129
shared_preload_libraries = 'multimaster'
121130
shared_buffers = 16MB
122131
123132
multimaster.arbiter_port =$arbiter_port
124133
multimaster.workers = 1
125134
multimaster.node_id =$id
126135
multimaster.conn_strings = '$connstr'
127-
multimaster.heartbeat_recv_timeout =2050
136+
multimaster.heartbeat_recv_timeout =1050
128137
multimaster.heartbeat_send_timeout = 250
129-
multimaster.max_nodes =$nnodes
130-
multimaster.ignore_tables_without_pk =true
138+
multimaster.max_nodes =6
139+
multimaster.ignore_tables_without_pk =false
131140
multimaster.queue_size = 4194304
132-
multimaster.min_2pc_timeout = 150000
133141
log_line_prefix = '%t: '
134142
));
135143

‎contrib/mmts/Dockerfile

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@ COPY ./ /pg/mmts/
66
RUN export USE_PGXS=1 && \
77
cd /pg/mmts && make clean && make install
88

9+
RUN export USE_PGXS=1 && \
10+
cd /pg/src/contrib/referee && make clean && make install
11+
912
# pg_regress client assumes such dir exists on server
1013
RUN cp /pg/src/src/test/regress/*.so /pg/install/lib/postgresql/
1114
USER postgres

‎contrib/mmts/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11

22
EXTENSION = multimaster
33
DATA = multimaster--1.0.sql
4-
OBJS = multimaster.o arbiter.o bytebuf.o bgwpool.o pglogical_output.o pglogical_proto.o pglogical_receiver.o pglogical_apply.o pglogical_hooks.o pglogical_config.o pglogical_relid_map.o ddd.o bkb.o spill.o referee.o
4+
OBJS = multimaster.o arbiter.o bytebuf.o bgwpool.o pglogical_output.o pglogical_proto.o pglogical_receiver.o pglogical_apply.o pglogical_hooks.o pglogical_config.o pglogical_relid_map.o ddd.o bkb.o spill.o referee.o state.o
55
MODULE_big = multimaster
66

77
PG_CPPFLAGS = -I$(libpq_srcdir)

‎contrib/mmts/arbiter.c

Lines changed: 41 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@
7676

7777

7878
#include"multimaster.h"
79+
#include"state.h"
7980

8081
#defineMAX_ROUTES 16
8182
#defineINIT_BUFFER_SIZE 1024
@@ -189,7 +190,6 @@ static void MtmDisconnect(int node)
189190
MtmUnregisterSocket(sockets[node]);
190191
pg_closesocket(sockets[node],MtmUseRDMA);
191192
sockets[node]=-1;
192-
MtmOnNodeDisconnect(node+1);
193193
}
194194

195195
staticintMtmWaitSocket(intsd,boolforWrite,timestamp_ttimeoutMsec)
@@ -316,25 +316,22 @@ static void MtmCheckResponse(MtmArbiterMessage* resp)
316316
}else {
317317
BIT_CLEAR(Mtm->currentLockNodeMask,resp->node-1);
318318
}
319-
if (
320-
(BIT_CHECK(resp->disabledNodeMask,MtmNodeId-1)||Mtm->status==MTM_IN_MINORITY )
321-
&& !BIT_CHECK(Mtm->disabledNodeMask,resp->node-1)
322-
&&Mtm->status!=MTM_RECOVERY
323-
&&Mtm->status!=MTM_RECOVERED
324-
&&Mtm->nodes[MtmNodeId-1].lastStatusChangeTime+MSEC_TO_USEC(MtmNodeDisableDelay)<MtmGetSystemTime())
325-
{
326-
MTM_ELOG(WARNING,"Node %d thinks that I'm dead, while I'm %s (message %s)",resp->node,MtmNodeStatusMnem[Mtm->status],MtmMessageKindMnem[resp->code]);
327-
BIT_SET(Mtm->disabledNodeMask,MtmNodeId-1);
328-
Mtm->nConfigChanges+=1;
329-
MtmSwitchClusterMode(MTM_RECOVERY);
330-
}elseif (BIT_CHECK(Mtm->disabledNodeMask,resp->node-1)&&sockets[resp->node-1]<0) {
331-
/* We receive heartbeat from disabled node.
319+
320+
// if (BIT_CHECK(resp->disabledNodeMask, MtmNodeId-1))
321+
// {
322+
// MtmStateProcessEvent(MTM_REMOTE_DISABLE);
323+
// }
324+
325+
if (BIT_CHECK(Mtm->disabledNodeMask,resp->node-1)&&
326+
sockets[resp->node-1]<0)
327+
{
328+
/* We've received heartbeat from disabled node.
332329
* Looks like it is restarted.
333330
* Try to reconnect to it.
334331
*/
335332
MTM_ELOG(WARNING,"Receive heartbeat from disabled node %d",resp->node);
336333
BIT_SET(Mtm->reconnectMask,resp->node-1);
337-
}
334+
}
338335
}
339336

340337
staticvoidMtmScheduleHeartbeat()
@@ -543,17 +540,9 @@ static void MtmOpenConnections()
543540
for (i=0;i<nNodes;i++) {
544541
if (i+1!=MtmNodeId&&i<Mtm->nAllNodes) {
545542
sockets[i]=MtmConnectSocket(i,Mtm->nodes[i].con.arbiterPort);
546-
if (sockets[i]<0) {
547-
MtmOnNodeDisconnect(i+1);
548-
}
549543
}
550544
}
551-
if (Mtm->nLiveNodes<Mtm->nAllNodes/2+1) {/* no quorum */
552-
MTM_ELOG(WARNING,"Node is out of quorum: only %d nodes of %d are accessible",Mtm->nLiveNodes,Mtm->nAllNodes);
553-
MtmSwitchClusterMode(MTM_IN_MINORITY);
554-
}elseif (Mtm->status==MTM_INITIALIZATION) {
555-
MtmSwitchClusterMode(MTM_CONNECTED);
556-
}
545+
MtmStateProcessEvent(MTM_ARBITER_RECEIVER_START);
557546
}
558547

559548

@@ -586,7 +575,6 @@ static bool MtmSendToNode(int node, void const* buf, int size)
586575
}
587576
sockets[node]=MtmConnectSocket(node,Mtm->nodes[node].con.arbiterPort);
588577
if (sockets[node]<0) {
589-
MtmOnNodeDisconnect(node+1);
590578
result= false;
591579
break;
592580
}
@@ -716,16 +704,18 @@ static void MtmSender(Datum arg)
716704
{
717705
intnNodes=MtmMaxNodes;
718706
inti;
707+
MtmBuffer*txBuffer;
719708

720709
MtmBackgroundWorker= true;
721710

722-
MtmBuffer*txBuffer= (MtmBuffer*)palloc0(sizeof(MtmBuffer)*nNodes);
711+
txBuffer= (MtmBuffer*)palloc0(sizeof(MtmBuffer)*nNodes);
723712
MTM_ELOG(LOG,"Start arbiter sender %d",MyProcPid);
724713
InitializeTimeouts();
725714

726715
pqsignal(SIGINT,SetStop);
727716
pqsignal(SIGQUIT,SetStop);
728717
pqsignal(SIGTERM,SetStop);
718+
pqsignal(SIGHUP,PostgresSigHupHandler);
729719

730720
/* We're now ready to receive signals */
731721
BackgroundWorkerUnblockSignals();
@@ -744,6 +734,12 @@ static void MtmSender(Datum arg)
744734
PGSemaphoreLock(&Mtm->sendSemaphore);
745735
CHECK_FOR_INTERRUPTS();
746736

737+
if (ConfigReloadPending)
738+
{
739+
ConfigReloadPending= false;
740+
ProcessConfigFile(PGC_SIGHUP);
741+
}
742+
747743
MtmCheckHeartbeat();
748744
/*
749745
* Use shared lock to improve locality,
@@ -805,6 +801,7 @@ static void MtmMonitor(Datum arg)
805801
pqsignal(SIGINT,SetStop);
806802
pqsignal(SIGQUIT,SetStop);
807803
pqsignal(SIGTERM,SetStop);
804+
pqsignal(SIGHUP,PostgresSigHupHandler);
808805

809806
MtmBackgroundWorker= true;
810807

@@ -819,6 +816,13 @@ static void MtmMonitor(Datum arg)
819816
if (rc&WL_POSTMASTER_DEATH) {
820817
break;
821818
}
819+
820+
if (ConfigReloadPending)
821+
{
822+
ConfigReloadPending= false;
823+
ProcessConfigFile(PGC_SIGHUP);
824+
}
825+
822826
MtmRefreshClusterStatus();
823827
}
824828
}
@@ -844,6 +848,7 @@ static void MtmReceiver(Datum arg)
844848
pqsignal(SIGINT,SetStop);
845849
pqsignal(SIGQUIT,SetStop);
846850
pqsignal(SIGTERM,SetStop);
851+
pqsignal(SIGHUP,PostgresSigHupHandler);
847852

848853
MtmBackgroundWorker= true;
849854

@@ -879,7 +884,14 @@ static void MtmReceiver(Datum arg)
879884
for (j=0;j<n;j++) {
880885
if (events[j].events&EPOLLIN)
881886
#else
882-
fd_setevents;
887+
fd_setevents;
888+
889+
if (ConfigReloadPending)
890+
{
891+
ConfigReloadPending= false;
892+
ProcessConfigFile(PGC_SIGHUP);
893+
}
894+
883895
do {
884896
structtimevaltv;
885897
events=inset;
@@ -1006,7 +1018,7 @@ static void MtmReceiver(Datum arg)
10061018
default:
10071019
break;
10081020
}
1009-
if (BIT_CHECK(msg->disabledNodeMask,node-1)) {
1021+
if (BIT_CHECK(msg->disabledNodeMask,node-1)||BIT_CHECK(Mtm->disabledNodeMask,node-1)) {
10101022
MTM_ELOG(WARNING,"Ignore message from dead node %d\n",node);
10111023
continue;
10121024
}
@@ -1084,7 +1096,7 @@ static void MtmReceiver(Datum arg)
10841096
if (ts->status!=TRANSACTION_STATUS_ABORTED) {
10851097
MTM_LOG1("Arbiter receive abort message for transaction %s (%llu) from node %d",ts->gid, (long64)ts->xid,node);
10861098
Assert(ts->status==TRANSACTION_STATUS_IN_PROGRESS);
1087-
ts->aborted_by_node=node;
1099+
ts->abortedByNode=node;
10881100
MtmAbortTransaction(ts);
10891101
}
10901102
if ((ts->participantsMask& ~Mtm->disabledNodeMask& ~ts->votedMask)==0) {
@@ -1161,4 +1173,3 @@ static void MtmReceiver(Datum arg)
11611173
}
11621174
proc_exit(1);/* force restart of this bgwroker */
11631175
}
1164-

‎contrib/mmts/bgwpool.c

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
#include"bgwpool.h"
1616
#include"multimaster.h"
17+
#include"utils/guc.h"
1718

1819
boolMtmIsLogicalReceiver;
1920
intMtmMaxWorkers;
@@ -43,14 +44,21 @@ static void BgwPoolMainLoop(BgwPool* pool)
4344
pqsignal(SIGINT,BgwShutdownWorker);
4445
pqsignal(SIGQUIT,BgwShutdownWorker);
4546
pqsignal(SIGTERM,BgwShutdownWorker);
47+
pqsignal(SIGHUP,PostgresSigHupHandler);
4648

4749
BackgroundWorkerUnblockSignals();
4850
BackgroundWorkerInitializeConnection(pool->dbname,pool->dbuser);
4951
ActivePortal=&fakePortal;
5052
ActivePortal->status=PORTAL_ACTIVE;
5153
ActivePortal->sourceText="";
5254

53-
while (true) {
55+
while (true) {
56+
if (ConfigReloadPending)
57+
{
58+
ConfigReloadPending= false;
59+
ProcessConfigFile(PGC_SIGHUP);
60+
}
61+
5462
PGSemaphoreLock(&pool->available);
5563
SpinLockAcquire(&pool->lock);
5664
if (pool->shutdown) {

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp