Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit0a45dd2

Browse files
committed
Merge branch 'master' of github.com:postgrespro/postgres_cluster
2 parents6681d70 +1ae3274 commit0a45dd2

File tree

6 files changed

+237
-5
lines changed

6 files changed

+237
-5
lines changed

‎contrib/mmts/multimaster--1.0.sql‎

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,10 @@ CREATE FUNCTION mtm.make_table_local(relation regclass) RETURNS void
4040
AS'MODULE_PATHNAME','mtm_make_table_local'
4141
LANGUAGE C;
4242

43+
CREATEFUNCTIONmtm.dump_lock_graph() RETURNStext
44+
AS'MODULE_PATHNAME','mtm_dump_lock_graph'
45+
LANGUAGE C;
46+
4347
CREATETABLEIF NOT EXISTSmtm.ddl_log (issuedtimestamp with time zonenot null, querytext);
4448

4549
CREATETABLEIF NOT EXISTSmtm.local_tables(rel_schematext, rel_nametext,primary key(rel_schema, rel_name));

‎contrib/mmts/multimaster.c‎

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ PG_FUNCTION_INFO_V1(mtm_get_snapshot);
108108
PG_FUNCTION_INFO_V1(mtm_get_nodes_state);
109109
PG_FUNCTION_INFO_V1(mtm_get_cluster_state);
110110
PG_FUNCTION_INFO_V1(mtm_make_table_local);
111+
PG_FUNCTION_INFO_V1(mtm_dump_lock_graph);
111112

112113
staticSnapshotMtmGetSnapshot(Snapshotsnapshot);
113114
staticvoidMtmInitialize(void);
@@ -877,7 +878,9 @@ void MtmJoinTransaction(GlobalTransactionId* gtid, csn_t globalSnapshot)
877878
}
878879
if (!TransactionIdIsValid(gtid->xid)) {
879880
/* In case of recovery InvalidTransactionId is passed */
880-
Assert(Mtm->status==MTM_RECOVERY);
881+
if (Mtm->status!=MTM_RECOVERY) {
882+
elog(PANIC,"Node %d tries to recover node %d which is in %s mode",MtmReplicationNode,MtmNodeId,MtmNodeStatusMnem[Mtm->status]);
883+
}
881884
}elseif (Mtm->status==MTM_RECOVERY) {
882885
/* When recovery is completed we get normal transaction ID and switch to normal mode */
883886
MtmRecoveryCompleted();
@@ -2154,6 +2157,31 @@ Datum mtm_make_table_local(PG_FUNCTION_ARGS)
21542157
return false;
21552158
}
21562159

2160+
Datummtm_dump_lock_graph(PG_FUNCTION_ARGS)
2161+
{
2162+
StringInfos=makeStringInfo();
2163+
inti;
2164+
for (i=0;i<MtmNodes;i++)
2165+
{
2166+
size_tsize;
2167+
char*data=RaftableGet(psprintf("lock-graph-%d",i+1),&size,NULL, true);
2168+
if (!data)continue;
2169+
GlobalTransactionId*gtid= (GlobalTransactionId*)data;
2170+
GlobalTransactionId*last= (GlobalTransactionId*)(data+size);
2171+
appendStringInfo(s,"node-%d lock graph: ",i+1);
2172+
while (gtid!=last) {
2173+
GlobalTransactionId*src=gtid++;
2174+
appendStringInfo(s,"%d:%d -> ",src->node,src->xid);
2175+
while (gtid->node!=0) {
2176+
GlobalTransactionId*dst=gtid++;
2177+
appendStringInfo(s,"%d:%d, ",dst->node,dst->xid);
2178+
}
2179+
gtid+=1;
2180+
}
2181+
appendStringInfo(s,"\n");
2182+
}
2183+
returnCStringGetTextDatum(s->data);
2184+
}
21572185

21582186
/*
21592187
* -------------------------------------------
@@ -2316,7 +2344,7 @@ static bool MtmProcessDDLCommand(char const* queryString)
23162344
rel=heap_openrv_extended(rv,RowExclusiveLock, true);
23172345

23182346
if (rel==NULL) {
2319-
if (!IsTransactionBlock()) {
2347+
if (!MtmIsBroadcast()) {
23202348
MtmBroadcastUtilityStmt(queryString, false);
23212349
return true;
23222350
}

‎contrib/mmts/t/002_dtmbench.pl‎

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ sub allocate_ports
6060
listen_addresses = '$host'
6161
unix_socket_directories = ''
6262
port =$pgport
63-
max_prepared_transactions =10
63+
max_prepared_transactions =1000
6464
max_worker_processes = 10
6565
wal_level = logical
6666
fsync = off
@@ -103,13 +103,13 @@ sub allocate_ports
103103
push(@argv,'-n', 1000,'-a', 1000,'-w', 10,'-r', 1);
104104

105105
diag("running dtmbench -i");
106-
if (TestLib::run_log([@argv,'-i']))
106+
if (!TestLib::run_log([@argv,'-i']))
107107
{
108108
BAIL_OUT("dtmbench -i failed");
109109
}
110110

111111
diag("running dtmbench");
112-
if (TestLib::run_log(\@argv,'>', \$out))
112+
if (!TestLib::run_log(\@argv,'>', \$out))
113113
{
114114
fail("dtmbench failed");
115115
}

‎contrib/mmts/t/003_pgbench.pl‎

Lines changed: 190 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,190 @@
1+
use strict;
2+
use warnings;
3+
4+
use PostgresNode;
5+
use TestLib;
6+
use Test::Moretests=> 2;
7+
use IPC::Runqw(start finish);
8+
use Cwd;
9+
10+
my%allocated_ports = ();
11+
suballocate_ports
12+
{
13+
my@allocated_now = ();
14+
my ($host,$ports_to_alloc) =@_;
15+
16+
while ($ports_to_alloc > 0)
17+
{
18+
my$port =int(rand() * 16384) + 49152;
19+
nextif$allocated_ports{$port};
20+
diag("checking for port$port\n");
21+
if (!TestLib::run_log(['pg_isready','-h',$host,'-p',$port]))
22+
{
23+
$allocated_ports{$port} = 1;
24+
push(@allocated_now,$port);
25+
$ports_to_alloc--;
26+
}
27+
}
28+
29+
return@allocated_now;
30+
}
31+
32+
my$nnodes = 2;
33+
my@nodes = ();
34+
35+
diag("creating nodes");
36+
foreachmy$i (1..$nnodes)
37+
{
38+
my$host ="127.0.0.1";
39+
my ($pgport,$raftport) = allocate_ports($host, 2);
40+
my$node = new PostgresNode("node$i",$host,$pgport);
41+
$node->{id} =$i;
42+
$node->{raftport} =$raftport;
43+
push(@nodes,$node);
44+
}
45+
46+
my$mm_connstr =join(',',map {"${\$_->connstr('postgres') }" }@nodes);
47+
my$raft_peers =join(',',map {join(':',$_->{id},$_->host,$_->{raftport}) }@nodes);
48+
49+
diag("mm_connstr =$mm_connstr\n");
50+
diag("raft_peers =$raft_peers\n");
51+
52+
diag("initting and configuring nodes");
53+
foreachmy$node (@nodes)
54+
{
55+
my$id =$node->{id};
56+
my$host =$node->host;
57+
my$pgport =$node->port;
58+
my$raftport =$node->{raftport};
59+
60+
$node->init(hba_permit_replication=> 0);
61+
$node->append_conf("postgresql.conf",qq(
62+
listen_addresses = '$host'
63+
unix_socket_directories = ''
64+
port =$pgport
65+
max_prepared_transactions = 1000
66+
max_worker_processes = 10
67+
wal_level = logical
68+
fsync = off
69+
max_wal_senders = 10
70+
wal_sender_timeout = 0
71+
max_replication_slots = 10
72+
shared_preload_libraries = 'raftable,multimaster'
73+
multimaster.workers = 4
74+
multimaster.queue_size = 10485760 # 10mb
75+
multimaster.node_id =$id
76+
multimaster.conn_strings = '$mm_connstr'
77+
multimaster.use_raftable = true
78+
multimaster.ignore_tables_without_pk = true
79+
raftable.id =$id
80+
raftable.peers = '$raft_peers'
81+
));
82+
83+
$node->append_conf("pg_hba.conf",qq(
84+
local replication all trust
85+
host replication all 127.0.0.1/32 trust
86+
host replication all ::1/128 trust
87+
));
88+
}
89+
90+
diag("starting nodes");
91+
foreachmy$node (@nodes)
92+
{
93+
$node->start();
94+
}
95+
96+
my ($rc,$out,$err);
97+
98+
diag("sleeping 10");
99+
sleep(10);
100+
101+
diag("preparing the tables");
102+
if ($nodes[0]->psql('postgres',"create table t (k int primary key, v int)"))
103+
{
104+
BAIL_OUT('failed to create t');
105+
}
106+
107+
if ($nodes[0]->psql('postgres',"insert into t (select generate_series(0, 999), 0)"))
108+
{
109+
BAIL_OUT('failed to fill t');
110+
}
111+
112+
if ($nodes[0]->psql('postgres',"create table reader_log (v int)"))
113+
{
114+
BAIL_OUT('failed to create reader_log');
115+
}
116+
117+
subreader
118+
{
119+
my ($node,$inref,$outref) =@_;
120+
121+
my$clients = 1;
122+
my$jobs = 1;
123+
my$seconds = 30;
124+
my$tps = 10;
125+
my@argv = (
126+
'pgbench',
127+
'-n',
128+
-c=>$clients,
129+
-j=>$jobs,
130+
-T=>$seconds,
131+
-h=>$node->host(),
132+
-p=>$node->port(),
133+
-f=>'tests/writer.pgb',
134+
-R=>$tps,
135+
'postgres',
136+
);
137+
138+
diag("running[" . getcwd() ."]:" .join('',@argv));
139+
140+
return start(\@argv,$inref,$outref);
141+
}
142+
143+
subwriter
144+
{
145+
my ($node,$inref,$outref) =@_;
146+
147+
my$clients = 10;
148+
my$jobs = 10;
149+
my$seconds = 30;
150+
my@argv = (
151+
'pgbench',
152+
'-n',
153+
-c=>$clients,
154+
-j=>$jobs,
155+
-T=>$seconds,
156+
-h=>$node->host(),
157+
-p=>$node->port(),
158+
-f=>'tests/reader.pgb',
159+
'postgres',
160+
);
161+
162+
diag("running[" . getcwd() ."]:" .join('',@argv));
163+
164+
return start(\@argv,$inref,$outref);
165+
}
166+
167+
diag("starting benches");
168+
my$in ='';
169+
my$out ='';
170+
my@benches = ();
171+
foreachmy$node (@nodes)
172+
{
173+
push(@benches, writer($node, \$in, \$out));
174+
push(@benches, reader($node, \$in, \$out));
175+
}
176+
177+
diag("finishing benches");
178+
foreachmy$bench (@benches)
179+
{
180+
finish($bench) || BAIL_OUT("pgbench exited with$?");
181+
}
182+
diag($out);
183+
184+
diag("checking readers' logs");
185+
186+
($rc,$out,$err) =$nodes[0]->psql('postgres',"select count(*) from reader_log where v != 0;");
187+
is($out, 0,"there is nothing except zeros in reader_log");
188+
189+
($rc,$out,$err) =$nodes[0]->psql('postgres',"select count(*) from reader_log where v = 0;");
190+
isnt($out, 0,"there are some zeros in reader_log");

‎contrib/mmts/tests/reader.pgb‎

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
begin;
2+
insert into reader_log select sum(v) from t;
3+
commit;

‎contrib/mmts/tests/writer.pgb‎

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
\setrandom src 0 999
2+
\setrandom dst 0 999
3+
\setrandom amount 1 10
4+
begin;
5+
update t set v = v - :amount where k=:src;
6+
update t set v = v + :amount where k=:dst;
7+
commit;

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp