Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit877f5cb

Browse files
committed
Add dtm_recovery
1 parentc7ed87e commit877f5cb

File tree

4 files changed

+171
-6
lines changed

4 files changed

+171
-6
lines changed

‎dtm_recovery/dtm_recovery.cpp‎

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
#include<iostream>
2+
#include<string>
3+
#include<vector>
4+
#include<set>
5+
6+
#include<pqxx/connection>
7+
#include<pqxx/transaction>
8+
#include<pqxx/nontransaction>
9+
10+
usingnamespacestd;
11+
usingnamespacepqxx;
12+
13+
intmain (int argc,char* argv[])
14+
{
15+
if (argc ==1){
16+
printf("Use -h to show usage options\n");
17+
return1;
18+
}
19+
vector<string> connections;
20+
set<string> prepared_xacts;
21+
set<string> committed_xacts;
22+
bool verbose =false;
23+
for (int i =1; i < argc; i++) {
24+
if (argv[i][0] =='-') {
25+
switch (argv[i][1]) {
26+
case'C':
27+
case'c':
28+
connections.push_back(string(argv[++i]));
29+
continue;
30+
case'v':
31+
verbose =true;
32+
continue;
33+
}
34+
}
35+
printf("Perform recovery of pg_tsdtm cluster.\n"
36+
"Usage: dtm_recovery {options}\n"
37+
"Options:\n"
38+
"\t-c STR\tdatabase connection string\n"
39+
"\t-v\tverbose mode: print extra information while processing\n");
40+
return1;
41+
}
42+
if (verbose) {
43+
cout <<"Collecting information about prepared transactions...\n";
44+
}
45+
for (vector<string>::iterator ic = connections.begin(); ic != connections.end(); ++ic)
46+
{
47+
if (verbose) {
48+
cout <<"Connecting to" << *ic <<"...\n";
49+
}
50+
connectioncon(*ic);
51+
worktxn(con);
52+
result r = txn.exec("select gid from pg_prepared_xacts");
53+
for (result::const_iterator it = r.begin(); it != r.end(); ++it)
54+
{
55+
string gid = it.at("gid").as(string());
56+
prepared_xacts.insert(gid);
57+
}
58+
txn.commit();
59+
}
60+
if (verbose) {
61+
cout <<"Prepared transactions:";
62+
for (set<string>::iterator it = prepared_xacts.begin(); it != prepared_xacts.end(); ++it)
63+
{
64+
cout << *it <<",";
65+
}
66+
cout <<"\nChecking which of them are committed...\n";
67+
}
68+
for (vector<string>::iterator ic = connections.begin(); ic != connections.end(); ++ic)
69+
{
70+
if (verbose) {
71+
cout <<"Connecting to" << *ic <<"...\n";
72+
}
73+
connectioncon(*ic);
74+
worktxn(con);
75+
con.prepare("commit-check","select * from pg_committed_xacts where gid=$1");
76+
for (set<string>::iterator it = prepared_xacts.begin(); it != prepared_xacts.end(); ++it)
77+
{
78+
string gid = *it;
79+
result r = txn.prepared("commit-check")(gid).exec();
80+
if (!r.empty()) {
81+
committed_xacts.insert(gid);
82+
}
83+
}
84+
txn.commit();
85+
}
86+
if (verbose) {
87+
cout <<"Committed transactions:";
88+
for (set<string>::iterator it = committed_xacts.begin(); it != committed_xacts.end(); ++it)
89+
{
90+
cout << *it <<",";
91+
}
92+
cout <<"\nCommitting them at all nodes...\n";
93+
}
94+
for (vector<string>::iterator ic = connections.begin(); ic != connections.end(); ++ic)
95+
{
96+
if (verbose) {
97+
cout <<"Connecting to" << *ic <<"...\n";
98+
}
99+
connectioncon(*ic);
100+
worktxn(con);
101+
con.prepare("commit-check","select * from pg_committed_xacts where gid=$1");
102+
con.prepare("commit-prepared","commit prepared $1");
103+
con.prepare("rollback-prepared","rollback prepared $1");
104+
result r = txn.exec("select gid from pg_prepared_xacts");
105+
for (result::const_iterator it = r.begin(); it != r.end(); ++it)
106+
{
107+
string gid = it.at("gid").as(string());
108+
result rc = txn.prepared("commit-check")(gid).exec();
109+
if (rc.empty()) {
110+
if (committed_xacts.find(gid) != committed_xacts.end()) {
111+
if (verbose) {
112+
cout <<"Commit transaction" << gid <<"\n";
113+
}
114+
txn.prepared("commit-prepared")(gid);
115+
}else {
116+
if (verbose) {
117+
cout <<"Rollback transaction" << gid <<"\n";
118+
}
119+
txn.prepared("rollback-prepared")(gid);
120+
}
121+
}
122+
}
123+
txn.commit();
124+
}
125+
if (verbose) {
126+
cout <<"Recovery completed\n";
127+
}
128+
return0;
129+
}

‎dtm_recovery/makefile‎

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
CXX=g++
2+
CXXFLAGS=-g -Wall -O0 -pthread
3+
4+
all: dtm_recovery
5+
6+
dtm_recovery: dtm_recovery.cpp
7+
$(CXX)$(CXXFLAGS) -o dtm_recovery dtm_recovery.cpp -lpqxx
8+
9+
clean:
10+
rm -f dtm_recovery

‎pg_dtm.c‎

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
#include"access/xlog.h"
2626
#include"access/clog.h"
2727
#include"access/twophase.h"
28+
#include"executor/spi.h"
2829
#include"utils/hsearch.h"
2930
#include"utils/tqual.h"
3031
#include<utils/guc.h>
@@ -81,6 +82,7 @@ static DtmNodeState* local;
8182
staticDtmCurrentTransdtm_tx;
8283
staticuint64totalSleepInterrupts;
8384
staticintDtmVacuumDelay;
85+
staticboolDtmRecordCommits;
8486

8587
staticSnapshotDtmGetSnapshot(Snapshotsnapshot);
8688
staticTransactionIdDtmGetOldestXmin(Relationrel,boolignoreVacuum);
@@ -211,6 +213,19 @@ _PG_init(void)
211213
NULL
212214
);
213215

216+
DefineCustomBoolVariable(
217+
"dtm.record_commits",
218+
"Store information about committed global transactions in pg_committed_xacts table",
219+
NULL,
220+
&DtmRecordCommit,
221+
false,
222+
PGC_BACKEND,
223+
0,
224+
NULL,
225+
NULL,
226+
NULL
227+
);
228+
214229

215230
/*
216231
* Install hooks.
@@ -684,6 +699,17 @@ void DtmLocalEndPrepare(GlobalTransactionId gtid, cid_t cid)
684699
DTM_TRACE((stderr,"Prepare transaction %u(%s) with CSN %lu\n",id->xid,gtid,cid));
685700
}
686701
SpinLockRelease(&local->lock);
702+
if (DtmRecordCommits) {
703+
charstmt[MAX_GTID_SIZE+64];
704+
intrc;
705+
sprintf(stmt,"insert into pg_committed_xacts values ('%s')",gtid);
706+
SPI_connect();
707+
rc=SPI_execute(stmt, true,0);
708+
SPI_finish();
709+
if (rc!=SPI_OK_INSERT) {
710+
elog(ERROR,"Failed to insert GTID %s in table pg_committed_xacts",gtid);
711+
}
712+
}
687713
}
688714

689715
voidDtmLocalCommitPrepared(DtmCurrentTrans*x,GlobalTransactionIdgtid)

‎tests/dtmbench.cpp‎

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,17 @@ using namespace std;
1717
usingnamespacepqxx;
1818

1919
template<classT>
20-
classunique_ptr
20+
classmy_unique_ptr
2121
{
2222
T* ptr;
2323

2424
public:
25-
unique_ptr(T* p =NULL) : ptr(p) {}
26-
~unique_ptr() {delete ptr; }
25+
my_unique_ptr(T* p =NULL) : ptr(p) {}
26+
~my_unique_ptr() {delete ptr; }
2727
T&operator*() {return *ptr; }
2828
T*operator->() {return ptr; }
2929
voidoperator=(T* p) { ptr = p; }
30-
voidoperator=(unique_ptr& other) {
30+
voidoperator=(my_unique_ptr& other) {
3131
ptr = other.ptr;
3232
other.ptr =NULL;
3333
}
@@ -122,15 +122,15 @@ int64_t execQuery( transaction_base& txn, char const* sql, ...)
122122
void*reader(void* arg)
123123
{
124124
thread& t = *(thread*)arg;
125-
vector<unique_ptr<connection> >conns(cfg.connections.size());
125+
vector<my_unique_ptr<connection> >conns(cfg.connections.size());
126126
for (size_t i =0; i < conns.size(); i++) {
127127
conns[i] =newconnection(cfg.connections[i]);
128128
}
129129
int64_t prevSum =0;
130130

131131
while (running) {
132132
csn_t snapshot =0;
133-
vector<unique_ptr<work> >txns(conns.size());
133+
vector<my_unique_ptr<work> >txns(conns.size());
134134
time_t start =getCurrentTime();
135135
for (size_t i =0; i < conns.size(); i++) {
136136
txns[i] =newwork(*conns[i]);

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp