Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit5903e48

Browse files
committed
Add error reporting to the libdtm snapshot acquisition method. Catch exceptions in the reader thread.
1 parent6034200 commit5903e48

File tree

3 files changed

+57
-41
lines changed

3 files changed

+57
-41
lines changed

‎contrib/pg_dtm/libdtm.c‎

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
#include<assert.h>
99
#include<time.h>
1010

11+
#include"postgres.h"
1112
#include"libdtm.h"
1213
#include"dtmd/include/proto.h"
1314
#include"dtmd/include/dtmdlimits.h"
@@ -428,8 +429,7 @@ void DtmGlobalGetSnapshot(TransactionId xid, Snapshot snapshot, TransactionId *g
428429
return;
429430
failure:
430431
DiscardConnection();
431-
fprintf(
432-
stderr,
432+
elog(ERROR,
433433
"DtmGlobalGetSnapshot: failed to"
434434
" get the snapshot for xid = %d\n",
435435
xid

‎contrib/pg_dtm/tests/dtmbench.cpp‎

Lines changed: 53 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -118,35 +118,40 @@ void* reader(void* arg)
118118
int64_t prevSum =0;
119119

120120
while (running) {
121-
xid_t xid =0;
122-
for (size_t i =0; i < conns.size(); i++) {
123-
worktxn(*conns[i]);
124-
if (i ==0) {
125-
xid =execQuery(txn,"select dtm_begin_transaction()");
126-
}else {
127-
exec(txn,"select dtm_join_transaction(%u)", xid);
121+
try {
122+
xid_t xid =0;
123+
for (size_t i =0; i < conns.size(); i++) {
124+
worktxn(*conns[i]);
125+
if (i ==0) {
126+
xid =execQuery(txn,"select dtm_begin_transaction()");
127+
}else {
128+
exec(txn,"select dtm_join_transaction(%u)", xid);
129+
}
130+
txn.commit();
128131
}
129-
txn.commit();
130-
}
131-
vector< unique_ptr<nontransaction> >txns(conns.size());
132-
vector< unique_ptr<pipeline> >pipes(conns.size());
133-
vector<pipeline::query_id>results(conns.size());
134-
for (size_t i =0; i < conns.size(); i++) {
135-
txns[i] =newnontransaction(*conns[i]);
136-
pipes[i] =newpipeline(*txns[i]);
137-
results[i] = pipes[i]->insert("select sum(v) from t");
138-
}
139-
int64_t sum =0;
140-
for (size_t i =0; i < conns.size(); i++) {
141-
pipes[i]->complete();
142-
result r = pipes[i]->retrieve(results[i]);
143-
sum += r[0][0].as(int64_t());
144-
}
145-
if (sum != prevSum) {
146-
printf("Total=%ld xid=%u\n", sum, xid);
147-
prevSum = sum;
132+
vector< unique_ptr<nontransaction> >txns(conns.size());
133+
vector< unique_ptr<pipeline> >pipes(conns.size());
134+
vector<pipeline::query_id>results(conns.size());
135+
for (size_t i =0; i < conns.size(); i++) {
136+
txns[i] =newnontransaction(*conns[i]);
137+
pipes[i] =newpipeline(*txns[i]);
138+
results[i] = pipes[i]->insert("select sum(v) from t");
139+
}
140+
int64_t sum =0;
141+
for (size_t i =0; i < conns.size(); i++) {
142+
pipes[i]->complete();
143+
result r = pipes[i]->retrieve(results[i]);
144+
sum += r[0][0].as(int64_t());
145+
}
146+
if (sum != prevSum) {
147+
printf("Total=%ld xid=%u\n", sum, xid);
148+
prevSum = sum;
149+
}
150+
t.proceeded +=1;
151+
}catch (pqxx_exceptionconst& x) {
152+
printf("reader exception\n");
153+
continue;
148154
}
149-
t.proceeded +=1;
150155
}
151156
returnNULL;
152157
}
@@ -174,8 +179,13 @@ void* writer(void* arg)
174179
nontransactionsrcTx(*conns[srcCon]);
175180
nontransactiondstTx(*conns[dstCon]);
176181

177-
xid_t xid =execQuery(srcTx,"select dtm_begin_transaction()");
178-
exec(dstTx,"select dtm_join_transaction(%u)", xid);
182+
try {
183+
xid_t xid =execQuery(srcTx,"select dtm_begin_transaction()");
184+
exec(dstTx,"select dtm_join_transaction(%u)", xid);
185+
}catch (pqxx_exceptionconst& x) {
186+
i -=1;
187+
continue;
188+
}
179189

180190
exec(srcTx,"begin transaction isolation level %s", cfg.isolationLevel);
181191
exec(dstTx,"begin transaction isolation level %s", cfg.isolationLevel);
@@ -213,14 +223,19 @@ void* writer(void* arg)
213223
voidinitializeDatabase()
214224
{
215225
for (size_t i =0; i < cfg.connections.size(); i++) {
216-
connectionconn(cfg.connections[i]);
217-
worktxn(conn);
218-
exec(txn,"drop extension if exists pg_dtm");
219-
exec(txn,"create extension pg_dtm");
220-
exec(txn,"drop table if exists t");
221-
exec(txn,"create table t(u int primary key, v int)");
222-
exec(txn,"insert into t (select generate_series(0,%d), %d)", cfg.nAccounts-1,0);
223-
txn.commit();
226+
try {
227+
connectionconn(cfg.connections[i]);
228+
worktxn(conn);
229+
exec(txn,"drop extension if exists pg_dtm");
230+
exec(txn,"create extension pg_dtm");
231+
exec(txn,"drop table if exists t");
232+
exec(txn,"create table t(u int primary key, v int)");
233+
exec(txn,"insert into t (select generate_series(0,%d), %d)", cfg.nAccounts-1,0);
234+
txn.commit();
235+
}catch (pqxx_exceptionconst& x) {
236+
i -=1;
237+
continue;
238+
}
224239
}
225240
}
226241

@@ -308,3 +323,4 @@ int main (int argc, char* argv[])
308323
}
309324
return0;
310325
}
326+
// vim: sts=4 ts=4 sw=4 expandtab

‎contrib/pg_dtm/tests/makefile‎

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ CXXFLAGS=-g -Wall -O2 -pthread
44
all: dtmbench
55

66
dtmbench: dtmbench.cpp
7-
$(CXX)$(CXXFLAGS) -o dtmbench dtmbench.cpp -lpqxx
7+
$(CXX)$(CXXFLAGS) -o dtmbench dtmbench.cpp -lpqxx -lpq
88

99
clean:
10-
rm -f dtmbench
10+
rm -f dtmbench

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp