Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitf1faa89

Browse files
committed
Merge branch 'PGPROEE9_6_MULTIMASTER' into PGPROEE9_6
2 parents62d4762 +7f077cf commitf1faa89

File tree

11 files changed

+391
-132
lines changed

11 files changed

+391
-132
lines changed

‎contrib/mmts/Cluster.pm

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ use Cwd;
1010

1111
use Socket;
1212

13+
use IPC::Run;
14+
1315
subcheck_port
1416
{
1517
my ($host,$port) =@_;
@@ -27,7 +29,6 @@ sub check_port
2729
}
2830

2931
close(SOCK);
30-
diag("checking for port$port =$available\n");
3132
return$available;
3233
}
3334

@@ -148,6 +149,7 @@ sub start
148149
foreachmy$node (@$nodes)
149150
{
150151
$node->start();
152+
diag"Starting node with connstr 'dbname=postgres port=@{[$node->port() ]} host=@{[$node->host() ]}'";
151153
}
152154
}
153155

@@ -272,4 +274,37 @@ sub poll
272274
return 0;
273275
}
274276

277+
subpgbench()
278+
{
279+
my ($self,$node,@args) =@_;
280+
my$pgbench_handle =$self->pgbench_async($node,@args);
281+
$self->pgbench_await($pgbench_handle);
282+
}
283+
284+
subpgbench_async()
285+
{
286+
my ($self,$node,@args) =@_;
287+
288+
my ($in,$out,$err,$rc);
289+
$in ='';
290+
$out ='';
291+
292+
my@pgbench_command = (
293+
'pgbench',
294+
@args,
295+
-h=>$self->{nodes}->[$node]->host(),
296+
-p=>$self->{nodes}->[$node]->port(),
297+
'postgres',
298+
);
299+
# diag("running pgbench init");
300+
my$handle = IPC::Run::start(\@pgbench_command,$in,$out);
301+
return$handle;
302+
}
303+
304+
subpgbench_await()
305+
{
306+
my ($self,$pgbench_handle) =@_;
307+
IPC::Run::finish($pgbench_handle) || BAIL_OUT("pgbench exited with$?");
308+
}
309+
275310
1;

‎contrib/mmts/arbiter.c

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -314,7 +314,8 @@ static void MtmCheckResponse(MtmArbiterMessage* resp)
314314
}else {
315315
BIT_CLEAR(Mtm->currentLockNodeMask,resp->node-1);
316316
}
317-
if (BIT_CHECK(resp->disabledNodeMask,MtmNodeId-1)
317+
if (
318+
(BIT_CHECK(resp->disabledNodeMask,MtmNodeId-1)||Mtm->status==MTM_IN_MINORITY )
318319
&& !BIT_CHECK(Mtm->disabledNodeMask,resp->node-1)
319320
&&Mtm->status!=MTM_RECOVERY
320321
&&Mtm->status!=MTM_RECOVERED
@@ -714,6 +715,8 @@ static void MtmSender(Datum arg)
714715
intnNodes=MtmMaxNodes;
715716
inti;
716717

718+
MtmBackgroundWorker= true;
719+
717720
MtmBuffer*txBuffer= (MtmBuffer*)palloc0(sizeof(MtmBuffer)*nNodes);
718721
MTM_ELOG(LOG,"Start arbiter sender %d",MyProcPid);
719722
InitializeTimeouts();
@@ -801,6 +804,8 @@ static void MtmMonitor(Datum arg)
801804
pqsignal(SIGQUIT,SetStop);
802805
pqsignal(SIGTERM,SetStop);
803806

807+
MtmBackgroundWorker= true;
808+
804809
/* We're now ready to receive signals */
805810
BackgroundWorkerUnblockSignals();
806811

@@ -837,7 +842,9 @@ static void MtmReceiver(Datum arg)
837842
pqsignal(SIGINT,SetStop);
838843
pqsignal(SIGQUIT,SetStop);
839844
pqsignal(SIGTERM,SetStop);
840-
845+
846+
MtmBackgroundWorker= true;
847+
841848
/* We're now ready to receive signals */
842849
BackgroundWorkerUnblockSignals();
843850

@@ -1075,6 +1082,7 @@ static void MtmReceiver(Datum arg)
10751082
if (ts->status!=TRANSACTION_STATUS_ABORTED) {
10761083
MTM_LOG1("Arbiter receive abort message for transaction %s (%llu) from node %d",ts->gid, (long64)ts->xid,node);
10771084
Assert(ts->status==TRANSACTION_STATUS_IN_PROGRESS);
1085+
ts->aborted_by_node=node;
10781086
MtmAbortTransaction(ts);
10791087
}
10801088
if ((ts->participantsMask& ~Mtm->disabledNodeMask& ~ts->votedMask)==0) {

‎contrib/mmts/bgwpool.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ static void BgwPoolMainLoop(BgwPool* pool)
3636

3737
MTM_ELOG(LOG,"Start background worker %d, shutdown=%d",MyProcPid,pool->shutdown);
3838

39+
MtmBackgroundWorker= true;
3940
MtmIsLogicalReceiver= true;
4041
MtmPool=pool;
4142

‎contrib/mmts/multimaster--1.0.sql

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,19 +18,28 @@ CREATE FUNCTION mtm.stop_replication() RETURNS void
1818
AS'MODULE_PATHNAME','mtm_stop_replication'
1919
LANGUAGE C;
2020

21+
-- Stop replication to the node. Node is didsabled, If drop_slot is true, then replication slot is dropped and node can be recovered using basebackup and recover_node function.
22+
-- If drop_slot is false and limit for maximal slot gap was not reached, then node can be restarted using resume_node function.
2123
CREATEFUNCTIONmtm.stop_node(nodeinteger, drop_slot bool default false) RETURNS void
2224
AS'MODULE_PATHNAME','mtm_stop_node'
2325
LANGUAGE C;
2426

27+
-- Add new node to the cluster. Number of nodes should not exeed maximal number of nodes in the cluster.
2528
CREATEFUNCTIONmtm.add_node(conn_strtext) RETURNS void
2629
AS'MODULE_PATHNAME','mtm_add_node'
2730
LANGUAGE C;
2831

29-
-- Create replication slot for the node which was previouslystopped
32+
-- Create replication slot for the node which was previouslystalled (its replicatoin slot was deleted)
3033
CREATEFUNCTIONmtm.recover_node(nodeinteger) RETURNS void
3134
AS'MODULE_PATHNAME','mtm_recover_node'
3235
LANGUAGE C;
3336

37+
-- Resume previously stopped node with live replication slot. If node was not stopped, this function has no effect.
38+
-- It doesn't create slot and returns error if node is stalled (slot eas dropped)
39+
CREATEFUNCTIONmtm.resume_node(nodeinteger) RETURNS void
40+
AS'MODULE_PATHNAME','mtm_resume_node'
41+
LANGUAGE C;
42+
3443

3544
CREATEFUNCTIONmtm.get_snapshot() RETURNSbigint
3645
AS'MODULE_PATHNAME','mtm_get_snapshot'
@@ -63,11 +72,11 @@ CREATE FUNCTION mtm.get_trans_by_xid(xid bigint) RETURNS mtm.trans_state
6372
AS'MODULE_PATHNAME','mtm_get_trans_by_xid'
6473
LANGUAGE C;
6574

66-
CREATEFUNCTIONmtm.get_cluster_state() RETURNSmtm.cluster_state
75+
CREATEFUNCTIONmtm.get_cluster_state() RETURNSmtm.cluster_state
6776
AS'MODULE_PATHNAME','mtm_get_cluster_state'
6877
LANGUAGE C;
6978

70-
CREATEFUNCTIONmtm.collect_cluster_info() RETURNS SETOFmtm.cluster_state
79+
CREATEFUNCTIONmtm.collect_cluster_info() RETURNS SETOFmtm.cluster_state
7180
AS'MODULE_PATHNAME','mtm_collect_cluster_info'
7281
LANGUAGE C;
7382

@@ -103,9 +112,9 @@ CREATE FUNCTION mtm.referee_poll(xid bigint) RETURNS bigint
103112
AS'MODULE_PATHNAME','mtm_referee_poll'
104113
LANGUAGE C;
105114

106-
CREATETABLEIF NOT EXISTSmtm.local_tables(rel_schematext, rel_nametext,primary key(rel_schema, rel_name));
115+
CREATETABLEIF NOT EXISTSmtm.local_tables(rel_schemaname, rel_namename,primary key(rel_schema, rel_name));
107116

108-
CREATE OR REPLACEFUNCTIONmtm.alter_sequences() RETURNSbooleanAS
117+
CREATE OR REPLACEFUNCTIONmtm.alter_sequences() RETURNSbooleanAS
109118
$$
110119
DECLARE
111120
seq_class record;

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp