Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit332d916

Browse files
knizhnikkelvich
authored andcommitted
Fix of automatic resolve of node_id
1 parenta939b57 commit332d916

File tree

2 files changed

+84
-45
lines changed

2 files changed

+84
-45
lines changed

‎multimaster.c

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -318,6 +318,13 @@ void MtmReleaseLocks(void)
318318
* locks[N+1..2*N] are used to synchronize access to distributed lock graph at each node
319319
* -------------------------------------------
320320
*/
321+
statictimestamp_tMtmLockLastReportTime;
322+
statictimestamp_tMtmLockElapsedWaitTime;
323+
statictimestamp_tMtmLockMaxWaitTime;
324+
staticsize_tMtmLockHitCount;
325+
326+
#defineDEBUG_MTM_LOCK 1
327+
321328
voidMtmLock(LWLockModemode)
322329
{
323330
if (!MtmAtExitHookRegistered) {
@@ -330,18 +337,28 @@ void MtmLock(LWLockMode mode)
330337
}
331338
else
332339
{
333-
#ifDEBUG_LEVEL>1
340+
#ifDEBUG_MTM_LOCK
334341
timestamp_tstart,stop;
335342
start=MtmGetSystemTime();
336343
#endif
337344
if (MyProc==NULL) {/* Can not wait if have no PGPROC. It can happen at process exit. TODO: without lock we can get race condition and corrupt Mtm state */
338345
return;
339346
}
340347
LWLockAcquire((LWLockId)&Mtm->locks[MTM_STATE_LOCK_ID],mode);
341-
#ifDEBUG_LEVEL>1
348+
#ifDEBUG_MTM_LOCK
342349
stop=MtmGetSystemTime();
343-
if (stop>start+MSEC_TO_USEC(MtmHeartbeatSendTimeout)) {
344-
MTM_LOG1("%d: obtaining %s lock takes %lld microseconds",MyProcPid, (mode==LW_EXCLUSIVE ?"exclusive" :"shared"),stop-start);
350+
MtmLockElapsedWaitTime+=stop-start;
351+
if (stop-start>MtmLockMaxWaitTime) {
352+
MtmLockMaxWaitTime=stop-start;
353+
}
354+
MtmLockHitCount+=1;
355+
if (stop-MtmLockLastReportTime>USECS_PER_SEC) {
356+
MTM_LOG1("%d: average lock wait time %lld usec, maximal lock wait time: %lld usec",
357+
MyProcPid,MtmLockElapsedWaitTime/MtmLockHitCount,MtmLockMaxWaitTime);
358+
MtmLockLastReportTime=stop;
359+
MtmLockMaxWaitTime=0;
360+
MtmLockElapsedWaitTime=0;
361+
MtmLockHitCount=0;
345362
}
346363
#endif
347364
if (mode==LW_EXCLUSIVE) {
@@ -2869,7 +2886,7 @@ static void MtmSplitConnStrs(void)
28692886
}
28702887
for (i=0;i<MtmNodes;i++) {
28712888
MTM_LOG3("Node %d, host %s, port=%d, my port %d",i,MtmConnections[i].hostName,MtmConnections[i].postmasterPort,PostPortNumber);
2872-
if ((strcmp(MtmConnections[i].hostName,buf)==0||strcmp(MtmConnections[i].hostName,"localhost")==0)
2889+
if ((strcmp(MtmConnections[i].hostName,buf)==0||strcmp(MtmConnections[i].hostName,"localhost")==0||strcmp(MtmConnections[i].hostName,"127.0.0.1")==0)
28732890
&&MtmConnections[i].postmasterPort==PostPortNumber)
28742891
{
28752892
if (MtmNodeId==INT_MAX) {
@@ -2880,7 +2897,7 @@ static void MtmSplitConnStrs(void)
28802897
}
28812898
}
28822899
if (MtmNodeId==INT_MAX) {
2883-
MTM_ELOG(ERROR,"multimaster.node_id and host name %s can not be located in connection strings list",buf);
2900+
MTM_ELOG(ERROR,"multimaster.node_idis not specifiedand host name %s can not be located in connection strings list",buf);
28842901
}
28852902
}elseif (MtmNodeId>i) {
28862903
MTM_ELOG(ERROR,"Multimaster node id %d is out of range [%d..%d]",MtmNodeId,1,MtmNodes);

‎tests/dtmbench.cpp

Lines changed: 61 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#include<stdlib.h>
55
#include<inttypes.h>
66
#include<sys/time.h>
7+
#include<unistd.h>
78
#include<pthread.h>
89

910
#include<string>
@@ -22,7 +23,7 @@ template<class T>
2223
classmy_unique_ptr
2324
{
2425
T* ptr;
25-
26+
2627
public:
2728
my_unique_ptr(T* p =NULL) : ptr(p) {}
2829
~my_unique_ptr() {delete ptr; }
@@ -32,7 +33,7 @@ class my_unique_ptr
3233
voidoperator=(my_unique_ptr& other) {
3334
ptr = other.ptr;
3435
other.ptr =NULL;
35-
}
36+
}
3637
};
3738

3839
typedefvoid* (*thread_proc_t)(void*);
@@ -47,7 +48,7 @@ struct thread
4748
size_t aborts;
4849
int id;
4950

50-
voidstart(int tid,thread_proc_t proc) {
51+
voidstart(int tid,thread_proc_t proc) {
5152
id = tid;
5253
updates =0;
5354
selects =0;
@@ -56,7 +57,7 @@ struct thread
5657
pthread_create(&t,NULL, proc,this);
5758
}
5859

59-
voidwait() {
60+
voidwait() {
6061
pthread_join(t,NULL);
6162
}
6263
};
@@ -72,7 +73,7 @@ struct config
7273
bool scatter;
7374
bool avoidDeadlocks;
7475
bool subtransactions;
75-
76+
7677
config() {
7778
nReaders =1;
7879
nWriters =10;
@@ -118,7 +119,7 @@ T execQuery( transaction_base& txn, char const* sql, ...)
118119
va_end(args);
119120
result r = txn.exec(buf);
120121
return r[0][0].as(T());
121-
}
122+
}
122123

123124
void*reader(void* arg)
124125
{
@@ -134,7 +135,7 @@ void* reader(void* arg)
134135
result r = txn.exec("select sum(v) from t");
135136
int64_t sum = r[0][0].as(int64_t());
136137
if (sum != prevSum) {
137-
r = txn.exec("select mtm.get_snapshot()");
138+
r = txn.exec("select mtm.get_snapshot()");
138139
printf("Total=%ld, snapshot=%ld\n", sum, r[0][0].as(int64_t()));
139140
prevSum = sum;
140141
}
@@ -144,7 +145,7 @@ void* reader(void* arg)
144145
}
145146
returnNULL;
146147
}
147-
148+
148149
void*writer(void* arg)
149150
{
150151
thread& t = *(thread*)arg;
@@ -153,17 +154,17 @@ void* writer(void* arg)
153154
conns[i] =newconnection(cfg.connections[i]);
154155
}
155156
for (int i =0; i < cfg.nIterations; i++)
156-
{
157-
//work
157+
{
158+
//work
158159
//transaction<repeatable_read> txn(*conns[random() % conns.size()]);
159160
transaction<read_committed>txn(*conns[random() % conns.size()]);
160161
int srcAcc =random() % cfg.nAccounts;
161162
int dstAcc =random() % cfg.nAccounts;
162-
if (cfg.scatter) {
163+
if (cfg.scatter) {
163164
srcAcc = srcAcc/cfg.nWriters*cfg.nWriters + t.id;
164165
dstAcc = dstAcc/cfg.nWriters*cfg.nWriters + t.id;
165-
}elseif (cfg.subtransactions) {
166-
if (dstAcc < srcAcc) {
166+
}elseif (cfg.subtransactions) {
167+
if (dstAcc < srcAcc) {
167168
int tmp = srcAcc;
168169
srcAcc = dstAcc;
169170
dstAcc = tmp;
@@ -173,7 +174,7 @@ void* writer(void* arg)
173174
subtransactionsubtxn(txn,"withdraw");
174175
exec(subtxn,"update t set v = v - 1 where u=%d", srcAcc);
175176
break;
176-
}catch (pqxx_exceptionconst& x) {
177+
}catch (pqxx_exceptionconst& x) {
177178
t.aborts +=1;
178179
}
179180
}
@@ -182,36 +183,36 @@ void* writer(void* arg)
182183
subtransactionsubtxn(txn,"deposit");
183184
exec(subtxn,"update t set v = v + 1 where u=%d", dstAcc);
184185
break;
185-
}catch (pqxx_exceptionconst& x) {
186+
}catch (pqxx_exceptionconst& x) {
186187
t.aborts +=1;
187188
}
188189
}
189-
txn.commit();
190+
txn.commit();
190191
t.transactions +=1;
191192
continue;
192-
}elseif (cfg.avoidDeadlocks) {
193-
if (dstAcc < srcAcc) {
193+
}elseif (cfg.avoidDeadlocks) {
194+
if (dstAcc < srcAcc) {
194195
int tmp = srcAcc;
195196
srcAcc = dstAcc;
196197
dstAcc = tmp;
197198
}
198199
}
199-
try {
200-
if (random() %100 < cfg.updatePercent) {
200+
try {
201+
if (random() %100 < cfg.updatePercent) {
201202
exec(txn,"update t set v = v - 1 where u=%d", srcAcc);
202203
exec(txn,"update t set v = v + 1 where u=%d", dstAcc);
203204
t.updates +=2;
204-
}else {
205+
}else {
205206
int64_t sum = execQuery<int64_t>(txn,"select v from t where u=%d", srcAcc)
206207
+ execQuery<int64_t>(txn,"select v from t where u=%d", dstAcc);
207-
if (sum > cfg.nIterations*cfg.nWriters || sum < -cfg.nIterations*cfg.nWriters) {
208+
if (sum > cfg.nIterations*cfg.nWriters || sum < -cfg.nIterations*cfg.nWriters) {
208209
printf("Wrong sum=%ld\n", sum);
209210
}
210211
t.selects +=2;
211212
}
212-
txn.commit();
213+
txn.commit();
213214
t.transactions +=1;
214-
}catch (pqxx_exceptionconst& x) {
215+
}catch (pqxx_exceptionconst& x) {
215216
txn.abort();
216217
t.aborts +=1;
217218
i -=1;
@@ -220,7 +221,24 @@ void* writer(void* arg)
220221
}
221222
returnNULL;
222223
}
223-
224+
225+
void*monitor(void* arg)
226+
{
227+
vector<thread>& writers = *(vector<thread>*)arg;
228+
time_t start =time(NULL);
229+
size_t elapsed =0;
230+
while (running) {
231+
sleep(1);
232+
long total =0;
233+
for (int i =0; i < cfg.nWriters; i++) {
234+
total += writers[i].transactions;
235+
}
236+
printf("%d: %5ld TPS\n",int(time(NULL) - start),long(total - elapsed));
237+
elapsed = total;
238+
}
239+
returnNULL;
240+
}
241+
224242
voidinitializeDatabase()
225243
{
226244
connectionconn(cfg.connections[0]);
@@ -251,15 +269,15 @@ int main (int argc, char* argv[])
251269
return1;
252270
}
253271

254-
for (int i =1; i < argc; i++) {
255-
if (argv[i][0] =='-') {
256-
switch (argv[i][1]) {
272+
for (int i =1; i < argc; i++) {
273+
if (argv[i][0] =='-') {
274+
switch (argv[i][1]) {
257275
case'r':
258276
cfg.nReaders =atoi(argv[++i]);
259277
continue;
260278
case'w':
261279
cfg.nWriters =atoi(argv[++i]);
262-
continue;
280+
continue;
263281
case'a':
264282
cfg.nAccounts =atoi(argv[++i]);
265283
continue;
@@ -300,7 +318,7 @@ int main (int argc, char* argv[])
300318
return1;
301319
}
302320

303-
if (initialize) {
321+
if (initialize) {
304322
initializeDatabase();
305323
printf("%d accounts inserted\n", cfg.nAccounts);
306324
return0;
@@ -311,34 +329,38 @@ int main (int argc, char* argv[])
311329

312330
vector<thread>readers(cfg.nReaders);
313331
vector<thread>writers(cfg.nWriters);
332+
pthread_t logger;
333+
314334
size_t nAborts =0;
315335
size_t nUpdates =0;
316336
size_t nSelects =0;
317337
size_t nTransactions =0;
318338

319-
for (int i =0; i < cfg.nReaders; i++) {
339+
for (int i =0; i < cfg.nReaders; i++) {
320340
readers[i].start(i, reader);
321341
}
322-
for (int i =0; i < cfg.nWriters; i++) {
342+
for (int i =0; i < cfg.nWriters; i++) {
323343
writers[i].start(i, writer);
324344
}
325-
326-
for (int i =0; i < cfg.nWriters; i++) {
345+
pthread_create(&logger,NULL, monitor, &writers);
346+
347+
for (int i =0; i < cfg.nWriters; i++) {
327348
writers[i].wait();
328349
nUpdates += writers[i].updates;
329350
nSelects += writers[i].selects;
330351
nAborts += writers[i].aborts;
331352
nTransactions += writers[i].transactions;
332353
}
333-
354+
334355
running =false;
335356

336-
for (int i =0; i < cfg.nReaders; i++) {
357+
for (int i =0; i < cfg.nReaders; i++) {
337358
readers[i].wait();
338359
nSelects += readers[i].selects;
339360
nTransactions += writers[i].transactions;
340361
}
341-
362+
pthread_join(logger,NULL);
363+
342364
time_t elapsed =getCurrentTime() - start;
343365

344366
printf(
@@ -347,10 +369,10 @@ int main (int argc, char* argv[])
347369
"\"readers\":%d,\"writers\":%d,\"update_percent\":%d,\"accounts\":%d,\"iterations\":%d,\"hosts\":%ld}\n",
348370
(double)(nTransactions*USEC)/elapsed,
349371
nTransactions,
350-
nSelects,
372+
nSelects,
351373
nUpdates,
352374
nAborts,
353-
(int)(nAborts*100/nTransactions),
375+
(int)(nAborts*100/nTransactions),
354376
cfg.nReaders,
355377
cfg.nWriters,
356378
cfg.updatePercent,

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp