Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit74ae6ab

Browse files
committed
Merge branch 'more_tests' of github.com:postgrespro/postgres_cluster into more_tests
2 parentsbf32a65 +bcbc076 commit74ae6ab

File tree

8 files changed

+140
-112
lines changed

8 files changed

+140
-112
lines changed

‎contrib/mmts/Cluster.pm‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,7 @@ sub stop
233233
}
234234
}
235235
}
236-
236+
sleep(2);
237237
return$ok;
238238
}
239239

‎contrib/mmts/bgwpool.c‎

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#include"postmaster/bgworker.h"
66
#include"storage/s_lock.h"
77
#include"storage/spin.h"
8+
#include"storage/proc.h"
89
#include"storage/pg_sema.h"
910
#include"storage/shmem.h"
1011
#include"datatype/timestamp.h"
@@ -16,23 +17,41 @@
1617
boolMtmIsLogicalReceiver;
1718
intMtmMaxWorkers;
1819

20+
staticBgwPool*pool;
21+
22+
staticvoidBgwShutdownWorker(intsig)
23+
{
24+
BgwPoolStop(pool);
25+
}
26+
1927
staticvoidBgwPoolMainLoop(BgwPool*pool)
2028
{
2129
intsize;
2230
void*work;
2331
staticPortalDatafakePortal;
32+
sigset_tsset;
2433

2534
MtmIsLogicalReceiver= true;
2635

36+
signal(SIGINT,BgwShutdownWorker);
37+
signal(SIGQUIT,BgwShutdownWorker);
38+
signal(SIGTERM,BgwShutdownWorker);
39+
40+
sigfillset(&sset);
41+
sigprocmask(SIG_UNBLOCK,&sset,NULL);
42+
2743
BackgroundWorkerUnblockSignals();
2844
BackgroundWorkerInitializeConnection(pool->dbname,pool->dbuser);
2945
ActivePortal=&fakePortal;
3046
ActivePortal->status=PORTAL_ACTIVE;
3147
ActivePortal->sourceText="";
3248

33-
while(true) {
49+
while(true) {
3450
PGSemaphoreLock(&pool->available);
3551
SpinLockAcquire(&pool->lock);
52+
if (pool->shutdown) {
53+
break;
54+
}
3655
size=*(int*)&pool->queue[pool->head];
3756
Assert(size<pool->size);
3857
work=malloc(size);
@@ -64,6 +83,7 @@ static void BgwPoolMainLoop(BgwPool* pool)
6483
pool->lastPeakTime=0;
6584
SpinLockRelease(&pool->lock);
6685
}
86+
SpinLockRelease(&pool->lock);
6787
}
6888

6989
voidBgwPoolInit(BgwPool*pool,BgwPoolExecutorexecutor,charconst*dbname,charconst*dbuser,size_tqueueSize,size_tnWorkers)
@@ -75,6 +95,7 @@ void BgwPoolInit(BgwPool* pool, BgwPoolExecutor executor, char const* dbname, c
7595
PGSemaphoreReset(&pool->available);
7696
PGSemaphoreReset(&pool->overflow);
7797
SpinLockInit(&pool->lock);
98+
pool->shutdown= false;
7899
pool->producerBlocked= false;
79100
pool->head=0;
80101
pool->tail=0;
@@ -167,7 +188,7 @@ void BgwPoolExecute(BgwPool* pool, void* work, size_t size)
167188
}
168189

169190
SpinLockAcquire(&pool->lock);
170-
while (true) {
191+
while (!pool->shutdown) {
171192
if ((pool->head <=pool->tail&&pool->size-pool->tail<size+4&&pool->head<size)
172193
|| (pool->head>pool->tail&&pool->head-pool->tail<size+4))
173194
{
@@ -204,3 +225,11 @@ void BgwPoolExecute(BgwPool* pool, void* work, size_t size)
204225
SpinLockRelease(&pool->lock);
205226
}
206227

228+
voidBgwPoolStop(BgwPool*pool)
229+
{
230+
SpinLockAcquire(&pool->lock);
231+
pool->shutdown= true;
232+
SpinLockRelease(&pool->lock);
233+
PGSemaphoreUnlock(&pool->available);
234+
PGSemaphoreUnlock(&pool->overflow);
235+
}

‎contrib/mmts/bgwpool.h‎

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ typedef struct
3434
time_tlastPeakTime;
3535
timestamp_tlastDynamicWorkerStartTime;
3636
boolproducerBlocked;
37+
boolshutdown;
3738
chardbname[MAX_DBNAME_LEN];
3839
chardbuser[MAX_DBUSER_LEN];
3940
char*queue;
@@ -51,4 +52,5 @@ extern size_t BgwPoolGetQueueSize(BgwPool* pool);
5152

5253
externtimestamp_tBgwGetLastPeekTime(BgwPool*pool);
5354

55+
externvoidBgwPoolStop(BgwPool*pool);
5456
#endif

‎contrib/mmts/multimaster.c‎

Lines changed: 33 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -583,7 +583,9 @@ MtmAdjustOldestXid(TransactionId xid)
583583

584584
for (ts=Mtm->transListHead;
585585
ts!=NULL
586+
&& (ts->status==TRANSACTION_STATUS_ABORTED||ts->status==TRANSACTION_STATUS_COMMITTED)
586587
&&ts->csn<oldestSnapshot
588+
&& !ts->isPinned
587589
&&TransactionIdPrecedes(ts->xid,xid);
588590
prev=ts,ts=ts->next)
589591
{
@@ -653,6 +655,7 @@ static void MtmAddSubtransactions(MtmTransState* ts, TransactionId* subxids, int
653655
sts= (MtmTransState*)hash_search(MtmXid2State,&subxids[i],HASH_ENTER,&found);
654656
Assert(!found);
655657
sts->isActive= false;
658+
sts->isPinned= false;
656659
sts->status=ts->status;
657660
sts->csn=ts->csn;
658661
sts->votingCompleted= true;
@@ -814,6 +817,7 @@ MtmCreateTransState(MtmCurrentTrans* x)
814817
ts->isLocal= true;
815818
ts->isPrepared= false;
816819
ts->isTwoPhase=x->isTwoPhase;
820+
ts->isPinned= false;
817821
ts->votingCompleted= false;
818822
if (!found) {
819823
ts->isEnqueued= false;
@@ -963,8 +967,13 @@ Mtm2PCVoting(MtmCurrentTrans* x, MtmTransState* ts)
963967
{
964968
intresult=0;
965969
intnConfigChanges=Mtm->nConfigChanges;
966-
timestamp_telapsed,start=MtmGetSystemTime();
967-
timestamp_tdeadline=0;
970+
timestamp_tprepareTime=ts->csn-ts->snapshot;
971+
timestamp_ttimeout=Max(prepareTime+MSEC_TO_USEC(MtmMin2PCTimeout),prepareTime*MtmMax2PCRatio/100);
972+
timestamp_tdeadline=MtmGetSystemTime()+timeout;
973+
timestamp_tnow;
974+
975+
Assert(ts->csn>ts->snapshot);
976+
968977
/* Wait votes from all nodes until: */
969978
while (!MtmVotingCompleted(ts)
970979
&& (ts->isPrepared||nConfigChanges==Mtm->nConfigChanges))
@@ -980,19 +989,16 @@ Mtm2PCVoting(MtmCurrentTrans* x, MtmTransState* ts)
980989
if (result&WL_LATCH_SET) {
981990
ResetLatch(&MyProc->procLatch);
982991
}
983-
elapsed=MtmGetSystemTime()-start;
992+
now=MtmGetSystemTime();
984993
MtmLock(LW_EXCLUSIVE);
985-
if (deadline==0&&ts->votedMask!=0) {
986-
deadline=Max(MSEC_TO_USEC(MtmMin2PCTimeout),elapsed*MtmMax2PCRatio/100);
987-
}else {
994+
if (now>deadline) {
988995
if (ts->isPrepared) {
989996
/* resend precommit message */
990997
MtmSend2PCMessage(ts,MSG_PRECOMMIT);
991998
}else {
992-
if (elapsed>deadline) {
993-
elog(WARNING,"Commit of distributed transaction is canceled because of %ld msec timeout expiration",USEC_TO_MSEC(elapsed));
994-
MtmAbortTransaction(ts);
995-
}
999+
elog(WARNING,"Commit of distributed transaction is canceled because of %ld msec timeout expiration",USEC_TO_MSEC(timeout));
1000+
MtmAbortTransaction(ts);
1001+
break;
9961002
}
9971003
}
9981004
}
@@ -1005,7 +1011,7 @@ Mtm2PCVoting(MtmCurrentTrans* x, MtmTransState* ts)
10051011
}else {
10061012
if (Mtm->status!=MTM_ONLINE) {
10071013
elog(WARNING,"Commit of distributed transaction is canceled because node is switched to %s mode",MtmNodeStatusMnem[Mtm->status]);
1008-
}elseif (nConfigChanges!=Mtm->nConfigChanges){
1014+
}else {
10091015
elog(WARNING,"Commit of distributed transaction is canceled because cluster configuration was changed");
10101016
}
10111017
MtmAbortTransaction(ts);
@@ -1202,6 +1208,7 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
12021208
ts->status=TRANSACTION_STATUS_ABORTED;
12031209
ts->isLocal= true;
12041210
ts->isPrepared= false;
1211+
ts->isPinned= false;
12051212
ts->snapshot=x->snapshot;
12061213
ts->isTwoPhase=x->isTwoPhase;
12071214
ts->csn=MtmAssignCSN();
@@ -1280,7 +1287,7 @@ void MtmSend2PCMessage(MtmTransState* ts, MtmMessageCode cmd)
12801287
}
12811288
}
12821289

1283-
voidMtmBroadcastPollMessage(MtmTransState*ts)
1290+
staticvoidMtmBroadcastPollMessage(MtmTransState*ts)
12841291
{
12851292
inti;
12861293
MtmArbiterMessagemsg;
@@ -1293,7 +1300,7 @@ void MtmBroadcastPollMessage(MtmTransState* ts)
12931300

12941301
for (i=0;i<Mtm->nAllNodes;i++)
12951302
{
1296-
if (BIT_CHECK(ts->participantsMask& ~Mtm->disabledNodeMask& ~ts->votedMask,i))
1303+
if (BIT_CHECK(ts->participantsMask& ~Mtm->disabledNodeMask,i))
12971304
{
12981305
msg.node=i+1;
12991306
MTM_LOG3("Send request for transaction %s to node %d",msg.gid,msg.node);
@@ -1480,15 +1487,17 @@ static void MtmPollStatusOfPreparedTransactions(int disabledNodeId)
14801487
Assert(ts->gid[0]);
14811488
if (ts->status==TRANSACTION_STATUS_IN_PROGRESS) {
14821489
elog(LOG,"Abort transaction %s because its coordinator is disabled and it is not prepared at node %d",ts->gid,MtmNodeId);
1483-
//MtmUnlock();
1490+
ts->isPinned= true;
1491+
MtmUnlock();
14841492
MtmFinishPreparedTransaction(ts, false);
1485-
//MtmLock(LW_EXCLUSIVE);
1493+
MtmLock(LW_EXCLUSIVE);
1494+
ts->isPinned= false;
14861495
}else {
14871496
MTM_LOG1("Poll state of transaction %d (%s)",ts->xid,ts->gid);
14881497
MtmBroadcastPollMessage(ts);
14891498
}
14901499
}else {
1491-
MTM_LOG2("Skip transaction %d (%s) with status %d gtid.node=%d gtid.xid=%d votedMask=%lx",
1500+
MTM_LOG1("Skip transaction %d (%s) with status %d gtid.node=%d gtid.xid=%d votedMask=%lx",
14921501
ts->xid,ts->gid,ts->status,ts->gtid.node,ts->gtid.xid,ts->votedMask);
14931502
}
14941503
}
@@ -3216,8 +3225,13 @@ bool MtmFilterTransaction(char* record, int size)
32163225
duplicate= true;
32173226
}
32183227

3219-
MTM_LOG2("%s transaction %s from node %d lsn %lx, flags=%x, origin node %d, original lsn=%lx, current lsn=%lx",
3220-
duplicate ?"Ignore" :"Apply",gid,replication_node,end_lsn,flags,origin_node,origin_lsn,restart_lsn);
3228+
if (duplicate) {
3229+
MTM_LOG1("Ignore transaction %s from node %d lsn %lx, flags=%x, origin node %d, original lsn=%lx, current lsn=%lx",
3230+
gid,replication_node,end_lsn,flags,origin_node,origin_lsn,restart_lsn);
3231+
}else {
3232+
MTM_LOG2("Apply transaction %s from node %d lsn %lx, flags=%x, origin node %d, original lsn=%lx, current lsn=%lx",
3233+
gid,replication_node,end_lsn,flags,origin_node,origin_lsn,restart_lsn);
3234+
}
32213235
returnduplicate;
32223236
}
32233237

@@ -3831,7 +3845,7 @@ static bool MtmTwoPhaseCommit(MtmCurrentTrans* x)
38313845
}else {
38323846
CommitTransactionCommand();
38333847
if (x->isSuspended) {
3834-
elog(WARNING,"Transaction %s is left in prepared state because coordinatoronde is not online",x->gid);
3848+
elog(WARNING,"Transaction %s is left in prepared state because coordinatornode is not online",x->gid);
38353849
}else {
38363850
StartTransactionCommand();
38373851
if (x->status==TRANSACTION_STATUS_ABORTED) {

‎contrib/mmts/multimaster.h‎

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,8 @@ typedef struct MtmTransState
223223
boolisEnqueued;/* Transaction is inserted in queue */
224224
boolisPrepared;/* Transaction is prepared: now it is safe to commit transaction */
225225
boolisActive;/* Transaction is active */
226-
boolisTwoPhase;/* user level 2PC */
226+
boolisTwoPhase;/* User level 2PC */
227+
boolisPinned;/* Transaction oid potected from GC */
227228
nodemask_tparticipantsMask;/* Mask of nodes involved in transaction */
228229
nodemask_tvotedMask;/* Mask of voted nodes */
229230
TransactionIdxids[1];/* [Mtm->nAllNodes]: transaction ID at replicas */
@@ -331,7 +332,6 @@ extern void MtmExecutor(void* work, size_t size);
331332
externvoidMtmSend2PCMessage(MtmTransState*ts,MtmMessageCodecmd);
332333
externvoidMtmSendMessage(MtmArbiterMessage*msg);
333334
externvoidMtmAdjustSubtransactions(MtmTransState*ts);
334-
externvoidMtmBroadcastPollMessage(MtmTransState*ts);
335335
externvoidMtmLock(LWLockModemode);
336336
externvoidMtmUnlock(void);
337337
externvoidMtmLockNode(intnodeId,LWLockModemode);

‎contrib/mmts/tests2/lib/bank_client.py‎

Lines changed: 19 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -111,20 +111,24 @@ def status(self):
111111
whileself.running:
112112
msg=yieldfromself.child_pipe.coro_recv()
113113
ifmsg=='status':
114-
# print('evloop: got status request')
115-
serialized_aggs= {}
116-
forname,aggregateinself.aggregates.items():
117-
serialized_aggs[name]=aggregate.as_dict()
118-
aggregate.clear_values()
114+
serialized_aggs= []
115+
116+
forconn_id,conn_aggsinself.aggregates.items():
117+
serialized_aggs.append({})
118+
foraggname,agginconn_aggs.items():
119+
serialized_aggs[conn_id][aggname]=agg.as_dict()
120+
agg.clear_values()
121+
119122
self.child_pipe.send(serialized_aggs)
120-
# print('evloop: sent status response')
121123
else:
122124
print('evloop: unknown message')
123125

124126
@asyncio.coroutine
125127
defexec_tx(self,tx_block,aggname_prefix,conn_i):
126128
aggname="%s_%i"% (aggname_prefix,conn_i)
127-
agg=self.aggregates[aggname]=MtmTxAggregate(aggname)
129+
ifconn_inotinself.aggregates:
130+
self.aggregates[conn_i]= {}
131+
agg=self.aggregates[conn_i][aggname_prefix]=MtmTxAggregate(aggname)
128132
pool=yieldfromaiopg.create_pool(self.dsns[conn_i])
129133
conn=yieldfrompool.acquire()
130134
cur=yieldfromconn.cursor()
@@ -167,8 +171,8 @@ def total_tx(self, conn, cur, agg):
167171
total=yieldfromcur.fetchone()
168172
iftotal[0]!=0:
169173
agg.isolation+=1
170-
#print(self.oops)
171-
#print('Isolation error, total = ', total[0])
174+
print(self.oops)
175+
print('Isolation error, total = ',total[0])
172176
# yield from cur.execute('select * from mtm.get_nodes_state()')
173177
# nodes_state = yield from cur.fetchall()
174178
# for i, col in enumerate(self.nodes_state_fields):
@@ -177,7 +181,6 @@ def total_tx(self, conn, cur, agg):
177181
# print("%19s" % nodes_state[j][i], end="\t")
178182
# print("\n")
179183

180-
181184
defrun(self):
182185
# asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
183186
self.loop=asyncio.get_event_loop()
@@ -196,13 +199,9 @@ def bgrun(self):
196199
self.evloop_process=multiprocessing.Process(target=self.run,args=())
197200
self.evloop_process.start()
198201

199-
# XXX: introduce periodic report from client?
200202
defget_aggregates(self,print=True):
201-
# print('test: sending status request')
202203
self.parent_pipe.send('status')
203-
# print('test: awaitng status response')
204204
resp=self.parent_pipe.recv()
205-
# print('test: got status response')
206205
ifprint:
207206
MtmClient.print_aggregates(resp)
208207
returnresp
@@ -216,7 +215,7 @@ def stop(self):
216215
self.evloop_process.terminate()
217216

218217
@classmethod
219-
defprint_aggregates(cls,serialized_agg):
218+
defprint_aggregates(cls,aggs):
220219
columns= ['running_latency','max_latency','isolation','finish']
221220

222221
# print table header
@@ -225,23 +224,17 @@ def print_aggregates(cls, serialized_agg):
225224
print(col,end="\t")
226225
print("\n",end="")
227226

228-
serialized_agg
229-
230-
foraggnameinsorted(serialized_agg.keys()):
231-
agg=serialized_agg[aggname]
232-
print("%s\t"%aggname,end="")
233-
forcolincolumns:
234-
ifcolinagg:
227+
forconn_id,agg_conninenumerate(aggs):
228+
foraggname,agginagg_conn.items():
229+
print("Node %d: %s\t"% (conn_id+1,aggname),end="")
230+
forcolincolumns:
235231
ifisinstance(agg[col],float):
236232
print("%.2f\t"% (agg[col],),end="\t")
237233
else:
238234
print(agg[col],end="\t")
239-
else:
240-
print("-\t",end="")
241-
print("")
235+
print("")
242236
print("")
243237

244-
245238
if__name__=="__main__":
246239
c=MtmClient(['dbname=postgres user=postgres host=127.0.0.1',
247240
'dbname=postgres user=postgres host=127.0.0.1 port=5433',

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp