Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitf4c9a73

Browse files
committed
2 parents09cf6e4 +71cf250 commitf4c9a73

File tree

5 files changed

+116
-71
lines changed

5 files changed

+116
-71
lines changed

‎contrib/mmts/arbiter.c‎

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,12 @@ MtmResolveHostByName(const char *hostname, unsigned* addrs, unsigned* n_addrs)
164164
return1;
165165
}
166166

167+
staticintstop=0;
168+
staticvoidSetStop(intsig)
169+
{
170+
stop=1;
171+
}
172+
167173
#ifUSE_EPOLL
168174
staticintepollfd;
169175
#else
@@ -530,17 +536,24 @@ static void MtmBroadcastMessage(MtmBuffer* txBuffer, MtmTransState* ts)
530536

531537
staticvoidMtmTransSender(Datumarg)
532538
{
539+
sigset_tsset;
533540
intnNodes=MtmNodes;
534541
inti;
535542
MtmBuffer*txBuffer= (MtmBuffer*)palloc(sizeof(MtmBuffer)*nNodes);
536-
543+
544+
signal(SIGINT,SetStop);
545+
signal(SIGQUIT,SetStop);
546+
signal(SIGTERM,SetStop);
547+
sigfillset(&sset);
548+
sigprocmask(SIG_UNBLOCK,&sset,NULL);
549+
537550
MtmOpenConnections();
538551

539552
for (i=0;i<nNodes;i++) {
540553
txBuffer[i].used=0;
541554
}
542555

543-
while (true) {
556+
while (!stop) {
544557
MtmTransState*ts;
545558
PGSemaphoreLock(&Mtm->votingSemaphore);
546559
CHECK_FOR_INTERRUPTS();
@@ -605,6 +618,7 @@ static bool MtmRecovery()
605618

606619
staticvoidMtmTransReceiver(Datumarg)
607620
{
621+
sigset_tsset;
608622
intnNodes=MtmNodes;
609623
intnResponses;
610624
inti,j,n,rc;
@@ -617,14 +631,20 @@ static void MtmTransReceiver(Datum arg)
617631
FD_ZERO(&inset);
618632
max_fd=0;
619633
#endif
634+
635+
signal(SIGINT,SetStop);
636+
signal(SIGQUIT,SetStop);
637+
signal(SIGTERM,SetStop);
638+
sigfillset(&sset);
639+
sigprocmask(SIG_UNBLOCK,&sset,NULL);
620640

621641
MtmAcceptIncomingConnections();
622642

623643
for (i=0;i<nNodes;i++) {
624644
rxBuffer[i].used=0;
625645
}
626646

627-
while (true) {
647+
while (!stop) {
628648
#ifUSE_EPOLL
629649
n=epoll_wait(epollfd,events,nNodes,MtmKeepaliveTimeout/1000);
630650
if (n<0) {

‎contrib/mmts/t/000_deadlock.pl‎

Lines changed: 64 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
use PostgresNode;
55
use TestLib;
6-
use Test::Moretests=>2;
6+
use Test::Moretests=>1;
77

88
use DBI;
99
use DBD::Pg':async';
@@ -14,23 +14,23 @@ sub query_row
1414
my$sth =$dbi->prepare($sql) ||die;
1515
$sth->execute(@keys) ||die;
1616
my$ret =$sth->fetchrow_array ||undef;
17-
print"query_row('$sql') ->$ret\n";
17+
diag("query_row('$sql') ->$ret\n");
1818
return$ret;
1919
}
2020

2121
subquery_exec
2222
{
2323
my ($dbi,$sql) =@_;
2424
my$rv =$dbi->do($sql) ||die;
25-
print"query_exec('$sql')\n";
25+
diag("query_exec('$sql') =$rv\n");
2626
return$rv;
2727
}
2828

2929
subquery_exec_async
3030
{
3131
my ($dbi,$sql) =@_;
3232
my$rv =$dbi->do($sql, {pg_async=> PG_ASYNC}) ||die;
33-
print"query_exec('$sql')\n";
33+
diag("query_exec_async('$sql')\n");
3434
return$rv;
3535
}
3636

@@ -44,7 +44,7 @@ sub allocate_ports
4444
{
4545
my$port =int(rand() * 16384) + 49152;
4646
nextif$allocated_ports{$port};
47-
print"#Checking for port$port\n";
47+
diag("Checking for port$port\n");
4848
if (!TestLib::run_log(['pg_isready','-h',$host,'-p',$port]))
4949
{
5050
$allocated_ports{$port} = 1;
@@ -73,8 +73,8 @@ sub allocate_ports
7373
my$mm_connstr =join(',',map {"${\$_->connstr('postgres') }" }@nodes);
7474
my$raft_peers =join(',',map {join(':',$_->{id},$_->host,$_->{raftport}) }@nodes);
7575

76-
print("#mm_connstr =$mm_connstr\n");
77-
print("#raft_peers =$raft_peers\n");
76+
diag("mm_connstr =$mm_connstr\n");
77+
diag("raft_peers =$raft_peers\n");
7878

7979
# Init and Configure
8080
foreachmy$node (@nodes)
@@ -89,22 +89,19 @@ sub allocate_ports
8989
listen_addresses = '$host'
9090
unix_socket_directories = ''
9191
port =$pgport
92-
max_connections = 200
93-
shared_buffers = 1GB
94-
max_prepared_transactions = 200
95-
max_worker_processes = 100
92+
max_prepared_transactions = 10
93+
max_worker_processes = 10
9694
wal_level = logical
9795
fsync = off
98-
max_wal_size = 100GB
99-
min_wal_size = 1GB
10096
max_wal_senders = 10
10197
wal_sender_timeout = 0
10298
max_replication_slots = 10
10399
shared_preload_libraries = 'raftable,multimaster'
104-
multimaster.workers =8
105-
multimaster.queue_size =104857600 #100mb
100+
multimaster.workers =4
101+
multimaster.queue_size =10485760 #10mb
106102
multimaster.node_id =$id
107103
multimaster.conn_strings = '$mm_connstr'
104+
multimaster.use_raftable = true
108105
raftable.id =$id
109106
raftable.peers = '$raft_peers'
110107
));
@@ -122,27 +119,55 @@ sub allocate_ports
122119
$node->start();
123120
}
124121

125-
$nodes[0]->psql("create table t(k int primary key, v text)");
126-
$nodes[0]->psql("insert into t values (1, 'hello'), (2, 'world')");
127-
128-
#my @conns = map { DBI->connect('DBI:Pg:' . $_->connstr()) } @nodes;
129-
#
130-
#query_exec($conns[0], "begin");
131-
#query_exec($conns[1], "begin");
132-
#
133-
#query_exec($conns[0], "update t set v = 'foo' where k = 1");
134-
#query_exec($conns[1], "update t set v = 'bar' where k = 2");
135-
#
136-
#query_exec($conns[0], "update t set v = 'bar' where k = 2");
137-
#query_exec($conns[1], "update t set v = 'foo' where k = 1");
138-
#
139-
#query_exec_async($conns[0], "commit");
140-
#query_exec_async($conns[1], "commit");
141-
#
142-
#my $ready = 0;
143-
#$ready++ if $conns[0]->pg_ready;
144-
#$ready++ if $conns[1]->pg_ready;
145-
#
146-
#is($ready, 1, "one of the connections is deadlocked");
147-
#
148-
#sleep(2);
122+
my ($rc,$out,$err);
123+
sleep(10);
124+
125+
$nodes[0]->psql('postgres',"create table t(k int primary key, v text)");
126+
$nodes[0]->psql('postgres',"insert into t values (1, 'hello'), (2, 'world')");
127+
128+
my@conns =map { DBI->connect('DBI:Pg:' .$_->connstr()) }@nodes;
129+
130+
query_exec($conns[0],"begin");
131+
query_exec($conns[1],"begin");
132+
133+
query_exec($conns[0],"update t set v = 'asd' where k = 1");
134+
query_exec($conns[1],"update t set v = 'bsd'");
135+
136+
query_exec($conns[0],"update t set v = 'bar' where k = 2");
137+
query_exec($conns[1],"update t set v = 'foo'");
138+
139+
query_exec_async($conns[0],"commit");
140+
query_exec_async($conns[1],"commit");
141+
142+
my$timeout = 5;
143+
while ($timeout > 0)
144+
{
145+
my$r0 =$conns[0]->pg_ready();
146+
my$r1 =$conns[1]->pg_ready();
147+
if ($r0 &&$r1) {
148+
last;
149+
}
150+
diag("queries still running: [0]=$r0 [1]=$r1\n");
151+
sleep(1);
152+
}
153+
154+
if ($timeout > 0)
155+
{
156+
diag("queries finished\n");
157+
158+
my$succeeded = 0;
159+
$succeeded++if$conns[0]->pg_result();
160+
$succeeded++if$conns[1]->pg_result();
161+
162+
pass("queries finished");
163+
}
164+
else
165+
{
166+
diag("queries timed out\n");
167+
$conns[0]->pg_cancel()unless$conns[0]->pg_ready();
168+
$conns[1]->pg_cancel()unless$conns[1]->pg_ready();
169+
170+
fail("queries timed out");
171+
}
172+
173+
query_row($conns[0],"select * from t where k = 1");

‎contrib/mmts/t/001_basic_recovery.pl‎

Lines changed: 24 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
use warnings;
33
use PostgresNode;
44
use TestLib;
5-
use Test::Moretests=>2;
5+
use Test::Moretests=>3;
66
use DBI;
77
use DBD::Pg':async';
88

@@ -54,6 +54,7 @@ sub PostgresNode::inet_connstr {
5454
for (my$i=0;$i <$nnodes;$i++) {
5555
$nodes[$i]->append_conf('postgresql.conf',$pgconf_common);
5656
$nodes[$i]->append_conf('postgresql.conf',qq(
57+
#port = ${\$nodes[$i]->port }
5758
multimaster.node_id = @{[$i + 1 ]}
5859
multimaster.conn_strings = '$mm_connstr'
5960
#multimaster.arbiter_port = ${\$nodes[0]->port }
@@ -69,8 +70,8 @@ sub PostgresNode::inet_connstr {
6970
###############################################################################
7071

7172
my$psql_out;
72-
# XXX:change topoll_untill
73-
sleep(7);
73+
# XXX:create extension on start andpoll_untill status is Online
74+
sleep(5);
7475

7576
###############################################################################
7677
# Replication check
@@ -79,11 +80,9 @@ sub PostgresNode::inet_connstr {
7980
$nodes[0]->psql('postgres',"
8081
create extension multimaster;
8182
create table if not exists t(k int primary key, v int);
82-
insert into t values(1, 10);
83-
");
84-
83+
insert into t values(1, 10);");
8584
$nodes[1]->psql('postgres',"select v from t where k=1;",stdout=> \$psql_out);
86-
is($psql_out,'10',"Checksanity while all nodes are up.");
85+
is($psql_out,'10',"Checkreplication while all nodes are up.");
8786

8887
###############################################################################
8988
# Isolation regress checks
@@ -97,30 +96,27 @@ sub PostgresNode::inet_connstr {
9796

9897
$nodes[2]->teardown_node;
9998

100-
# $nodes[0]->poll_query_until('postgres',
101-
# "select disconnected = true from mtm.get_nodes_state() where id=3;")
102-
# or die "Timed out while waiting for node to disconnect";
103-
104-
$nodes[0]->psql('postgres',"
105-
insert into t values(2, 20);
106-
");
99+
diag("sleeping");
100+
sleep(15);
107101

102+
diag("inserting 2");
103+
$nodes[0]->psql('postgres',"insert into t values(2, 20);");
104+
diag("selecting");
108105
$nodes[1]->psql('postgres',"select v from t where k=2;",stdout=> \$psql_out);
109-
is($psql_out,'20',"Check that we can commit after one node disconnect.");
110-
111-
112-
113-
114-
115-
116-
117-
118-
119-
120-
121-
122-
106+
diag("selected");
107+
is($psql_out,'20',"Check replication after node failure.");
123108

109+
###############################################################################
110+
# Work after node start
111+
###############################################################################
124112

113+
$nodes[2]->start;
114+
sleep(15);# XXX: here we can poll
115+
diag("inserting 3");
116+
$nodes[0]->psql('postgres',"insert into t values(3, 30);");
117+
diag("selecting");
118+
$nodes[2]->psql('postgres',"select v from t where k=3;",stdout=> \$psql_out);
119+
diag("selected");
120+
is($psql_out,'30',"Check replication after failed node recovery.");
125121

126122

‎contrib/raftable/raft/src/raft.c‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -812,7 +812,7 @@ static void raft_handle_update(raft_t r, raft_msg_update_t *m) {
812812
raft_entry_t*e=&r->log.newentry;
813813
raft_update_t*u=&e->update;
814814

815-
if (!m->snapshot&& !raft_appendable(r,m->previndex,m->prevterm)) gotofinish;
815+
if (!m->empty&& !m->snapshot&& !raft_appendable(r,m->previndex,m->prevterm)) gotofinish;
816816

817817
if (reply.progress.entries>0) {
818818
reply.term=RAFT_LOG(r,reply.progress.entries-1).term;

‎contrib/raftable/worker.c‎

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -457,10 +457,12 @@ void parse_peers(HostPort *peers, char *peerstr)
457457
char*host;
458458
intid,port;
459459
inti;
460+
peerstr=pstrdup(peerstr);
460461

461462
for (i=0;i<RAFTABLE_PEERS_MAX;i++)
462463
peers[i].up= false;
463464

465+
464466
fprintf(stderr,"parsing '%s'\n",peerstr);
465467
peer=strtok_r(peerstr,",",&state);
466468
while (peer)
@@ -488,4 +490,6 @@ void parse_peers(HostPort *peers, char *peerstr)
488490

489491
peer=strtok_r(NULL,",",&state);
490492
}
493+
494+
pfree(peerstr);
491495
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp