Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitb99ad93

Browse files
knizhnikkelvich
authored andcommitted
Support table copy between multimaster nodes
1 parentb85c8af commitb99ad93

File tree

9 files changed

+196
-42
lines changed

9 files changed

+196
-42
lines changed

‎arbitrator/arbitrator.cpp

Lines changed: 71 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
#include<stdio.h>
33
#include<stdarg.h>
44
#include<stdlib.h>
5+
#include<unistd.h>
56
#include<inttypes.h>
67
#include<sys/time.h>
78
#include<pthread.h>
@@ -34,18 +35,34 @@ class my_unique_ptr
3435
public:
3536
my_unique_ptr(T* p =NULL) : ptr(p) {}
3637
~my_unique_ptr() {delete ptr; }
37-
T&operator*() {return *ptr; }
38-
T*operator->() {return ptr; }
39-
voidoperator=(T* p) { ptr = p; }
38+
T&operator*()const {return *ptr; }
39+
T*operator->()const {return ptr; }
40+
booloperator ==(Tconst* p)const {return ptr == p; }
41+
booloperator !=(Tconst* p)const {return ptr != p; }
42+
voidoperator=(T* p) {
43+
delete ptr;
44+
ptr = p;
45+
}
4046
voidoperator=(my_unique_ptr& other) {
47+
delete ptr;
4148
ptr = other.ptr;
4249
other.ptr =NULL;
4350
}
4451
};
4552

53+
structconfig
54+
{
55+
int timeout;
56+
vector<string> connections;
57+
58+
config() {
59+
timeout =1000000;// 1 second
60+
}
61+
};
62+
4663
intmain (int argc,char* argv[])
4764
{
48-
vector<string> connection_strings;
65+
config cfg;
4966

5067
if (argc ==1){
5168
printf("Use -h to show usage options\n");
@@ -56,26 +73,27 @@ int main (int argc, char* argv[])
5673
if (argv[i][0] =='-') {
5774
switch (argv[i][1]) {
5875
case't':
59-
cfs.timeout =atoi(argv[++i]);
76+
cfg.timeout =atoi(argv[++i]);
6077
continue;
6178
case'c':
6279
cfg.connections.push_back(string(argv[++i]));
6380
continue;
6481
}
6582
}
6683
printf("Options:\n"
67-
"\t-t TIMEOUT\ttimeout inseconds of waiting database connection string\n"
84+
"\t-t TIMEOUT\ttimeout inmicroseconds of waiting database connection string (default: 1 second)\n"
6885
"\t-c STR\tdatabase connection string\n");
6986
return1;
7087
}
7188

72-
size_t nConns =connection_strings.size();
89+
size_t nConns =cfg.connections.size();
7390
vector< my_unique_ptr<connection> >conns(nConns);
7491
for (size_t i =0; i < nConns; i++) {
75-
conns[i] =newconnection(connection_strings[i]);
92+
conns[i] =newconnection(cfg.connections[i]);
7693
}
7794
nodemask_t disabledMask =0;
78-
nodemask_t enabledMask =0;
95+
nodemask_t newEnabledMask =0;
96+
nodemask_t oldEnabledMask =0;
7997

8098
while (true) {
8199
vector< my_unique_ptr<nontransaction> >txns(conns.size());
@@ -84,46 +102,63 @@ int main (int argc, char* argv[])
84102
char sql[128];
85103
sprintf(sql,"select mtm.arbitrator_poll(%lld)", disabledMask);
86104

105+
// Initiate queries to all live nodes
87106
for (size_t i =0; i < nConns; i++) {
88-
if (BIT_CHECK(disabledMask, i)) {
89-
if (BIT_CHECK(enabledMask, i)) {
107+
// Some of live node reestablished connection with dead node, so arbitrator should also try to connect to this node
108+
if (conns[i] ==NULL) {
109+
if (BIT_CHECK(newEnabledMask, i)) {
90110
try {
91-
delete conns[i];
92-
conns[i] =newconnection(connection_strings[i]);
111+
conns[i] =newconnection(cfg.connections[i]);
93112
BIT_CLEAR(disabledMask, i);
113+
fprintf(stdout,"Reestablish connection with node %d\n", (int)i+1);
94114
}catch (pqxx_exceptionconst& x) {
95-
conns[i] =NULL;
96-
fprintf(stderr,"Failed to connect to node %d: %s\n", (int)i+1, x.base().what());
115+
if (conns[i] ==NULL) {
116+
conns[i] =NULL;
117+
fprintf(stderr,"Failed to connect to node %d: %s\n", (int)i+1, x.base().what());
118+
}
97119
}
98120
}
99-
}
100-
if (!BIT_CHECK(disabledMask, i)) {
121+
}else {
101122
txns[i] =newnontransaction(*conns[i]);
102123
pipes[i] =newpipeline(*txns[i]);
103124
queries[i] = pipes[i]->insert(sql);
104125
}
105-
sleep(cfg.timeout);
106-
enabledMask =0;
107-
for (size_t i =0; i < nConns; i++) {
108-
if (!BIT_CHECK(didsabledMask, i)) {
109-
if (!pipes[i]->is_finished(queries[i]))
110-
{
111-
fprintf(stderr,"Doesn't receive response from node %d within %d seconds\n", (int)i+1, cfs.timeout);
112-
BIT_SET(disabledMask, i);
113-
delete conns[i];
126+
}
127+
// Wait some time
128+
usleep(cfg.timeout);
129+
oldEnabledMask = newEnabledMask;
130+
newEnabledMask = ~0;
131+
for (size_t i =0; i < nConns; i++) {
132+
if (!BIT_CHECK(disabledMask, i)) {
133+
if (!pipes[i]->is_finished(queries[i])) {
134+
fprintf(stderr,"Doesn't receive response from node %d within %d microseconds\n", (int)i+1, cfg.timeout);
135+
BIT_SET(disabledMask, i);
136+
conns[i] =NULL;
137+
}else {
138+
try {
139+
result r = pipes[i]->retrieve(queries[i]);
140+
newEnabledMask &= r[0][0].as(nodemask_t());
141+
}catch (pqxx_exceptionconst& x) {
114142
conns[i] =NULL;
115-
}else {
116-
try {
117-
result r = pipes[i]->retrieve(results[i]);
118-
enabledMask |= r[0][0].as(nodemask_t());
119-
}catch (pqxx_exceptionconst& x) {
120-
delete conns[i];
121-
conns[i] =NULL;
122-
fprintf(stderr,"Failed to retrieve result from node %d: %s\n", (int)i+1, x.base().what());
123-
}
124-
}
143+
fprintf(stderr,"Failed to retrieve result from node %d: %s\n", (int)i+1, x.base().what());
144+
}
125145
}
126146
}
127147
}
148+
if (newEnabledMask == ~0) {
149+
if (oldEnabledMask != ~0) {
150+
fprintf(stdout,"There are no more live nodes\n");
151+
}
152+
// No live nodes:
153+
disabledNodeMask =0;
154+
}else {
155+
if (newEnabledMask != oldEnabledMask) {
156+
for (size_t i =0; i < nConns; i++) {
157+
if (BIT_CHECK(newEnabledMask ^ oldEnabledMask, i)) {
158+
fprintf(stdout,"Node %d is %s\n", (int)i+1,BIT_CHECK(newEnabledMask, i) ?"enabled" :"disabled");
159+
}
160+
}
161+
}
162+
}
128163
}
129164
}

‎multimaster--1.0.sql

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,14 @@ CREATE FUNCTION mtm.make_table_local(relation regclass) RETURNS void
7575
AS'MODULE_PATHNAME','mtm_make_table_local'
7676
LANGUAGE C;
7777

78+
CREATEFUNCTIONmtm.broadcast_table(srcTable regclass, dstNodesMaskbigint) RETURNS void
79+
AS'MODULE_PATHNAME','mtm_broadcast_table'
80+
LANGUAGE C;
81+
82+
CREATEFUNCTIONmtm.copy_table(srcTable regclass, dstNodeinteger) RETURNS void
83+
AS'MODULE_PATHNAME','mtm_copy_table'
84+
LANGUAGE C;
85+
7886
CREATEFUNCTIONmtm.dump_lock_graph() RETURNStext
7987
AS'MODULE_PATHNAME','mtm_dump_lock_graph'
8088
LANGUAGE C;

‎multimaster.c

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,8 @@ PG_FUNCTION_INFO_V1(mtm_dump_lock_graph);
129129
PG_FUNCTION_INFO_V1(mtm_inject_2pc_error);
130130
PG_FUNCTION_INFO_V1(mtm_check_deadlock);
131131
PG_FUNCTION_INFO_V1(mtm_arbitrator_poll);
132+
PG_FUNCTION_INFO_V1(mtm_broadcast_table);
133+
PG_FUNCTION_INFO_V1(mtm_copy_table);
132134

133135
staticSnapshotMtmGetSnapshot(Snapshotsnapshot);
134136
staticvoidMtmInitialize(void);
@@ -4382,6 +4384,24 @@ mtm_collect_cluster_info(PG_FUNCTION_ARGS)
43824384
}
43834385
}
43844386

4387+
Datummtm_broadcast_table(PG_FUNCTION_ARGS)
4388+
{
4389+
MtmCopyRequestcopy;
4390+
copy.sourceTable=PG_GETARG_OID(0);
4391+
copy.targetNodes=PG_GETARG_INT64(1);
4392+
LogLogicalMessage("B", (char*)&copy,sizeof(copy), false);
4393+
PG_RETURN_VOID();
4394+
}
4395+
4396+
Datummtm_copy_table(PG_FUNCTION_ARGS)
4397+
{
4398+
MtmCopyRequestcopy;
4399+
copy.sourceTable=PG_GETARG_OID(0);
4400+
copy.targetNodes= (nodemask_t)1 << (PG_GETARG_INT32(1)-1);
4401+
LogLogicalMessage("B", (char*)&copy,sizeof(copy), false);
4402+
PG_RETURN_VOID();
4403+
}
4404+
43854405

43864406
Datummtm_make_table_local(PG_FUNCTION_ARGS)
43874407
{

‎multimaster.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,13 @@ typedef struct
207207
}MtmConnectionInfo;
208208

209209

210+
typedefstruct
211+
{
212+
OidsourceTable;
213+
nodemask_ttargetNodes;
214+
}MtmCopyRequest;
215+
216+
210217
typedefstruct
211218
{
212219
MtmConnectionInfocon;

‎pglogical_apply.c

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
#include"catalog/catversion.h"
1515
#include"catalog/dependency.h"
1616
#include"catalog/index.h"
17+
#include"catalog/heap.h"
1718
#include"catalog/namespace.h"
1819
#include"catalog/pg_type.h"
1920

@@ -367,6 +368,49 @@ process_remote_begin(StringInfo s)
367368
return true;
368369
}
369370

371+
staticvoid
372+
process_broadcast_table(StringInfos)
373+
{
374+
Relationrel;
375+
charch;
376+
EState*estate;
377+
TupleDatanew_tuple;
378+
TupleTableSlot*newslot;
379+
TupleTableSlot*oldslot;
380+
HeapTupletup;
381+
382+
StartTransactionCommand();
383+
384+
ch=pq_getmsgbyte(s);
385+
Assert(ch=='R');
386+
rel=read_rel(s,AccessExclusiveLock);
387+
388+
heap_truncate_one_rel(rel);
389+
390+
estate=create_rel_estate(rel);
391+
newslot=ExecInitExtraTupleSlot(estate);
392+
oldslot=ExecInitExtraTupleSlot(estate);
393+
ExecSetSlotDescriptor(newslot,RelationGetDescr(rel));
394+
ExecSetSlotDescriptor(oldslot,RelationGetDescr(rel));
395+
396+
ExecOpenIndices(estate->es_result_relation_info, false);
397+
398+
while (s->cursor!=s->len) {
399+
read_tuple_parts(s,rel,&new_tuple);
400+
tup=heap_form_tuple(RelationGetDescr(rel),
401+
new_tuple.values,new_tuple.isnull);
402+
ExecStoreTuple(tup,newslot,InvalidBuffer, true);
403+
simple_heap_insert(rel,newslot->tts_tuple);
404+
UserTableUpdateOpenIndexes(estate,newslot);
405+
}
406+
407+
ExecCloseIndices(estate->es_result_relation_info);
408+
ExecResetTupleTable(estate->es_tupleTable, true);
409+
FreeExecutorState(estate);
410+
411+
CommitTransactionCommand();
412+
}
413+
370414
staticbool
371415
process_remote_message(StringInfos)
372416
{
@@ -377,6 +421,12 @@ process_remote_message(StringInfo s)
377421

378422
switch (action)
379423
{
424+
case'B':
425+
{
426+
process_broadcast_table(s);
427+
standalone= true;
428+
break;
429+
}
380430
case'C':
381431
{
382432
MTM_LOG1("%d: Executing non-tx utility statement %s",MyProcPid,messageBody);

‎pglogical_output.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -557,7 +557,7 @@ pg_decode_message(LogicalDecodingContext *ctx,
557557
PGLogicalOutputData*data= (PGLogicalOutputData*)ctx->output_plugin_private;
558558

559559
MtmOutputPluginPrepareWrite(ctx, true, !transactional);
560-
data->api->write_message(ctx->out,prefix,sz,message);
560+
data->api->write_message(ctx->out,data,prefix,sz,message);
561561
MtmOutputPluginWrite(ctx, true, !transactional);
562562
}
563563

‎pglogical_proto.c

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
#include"utils/syscache.h"
3535
#include"utils/timestamp.h"
3636
#include"utils/typcache.h"
37+
#include"utils/snapmgr.h"
3738

3839
#include"multimaster.h"
3940
#include"pglogical_relid_map.h"
@@ -61,7 +62,7 @@ static void pglogical_write_delete(StringInfo out, PGLogicalOutputData *data,
6162
Relationrel,HeapTupleoldtuple);
6263

6364
staticvoidpglogical_write_tuple(StringInfoout,PGLogicalOutputData*data,
64-
Relationrel,HeapTupletuple);
65+
Relationrel,HeapTupletuple);
6566
staticchardecide_datum_transfer(Form_pg_attributeatt,
6667
Form_pg_typetypclass,
6768
boolallow_internal_basetypes,
@@ -167,8 +168,38 @@ pglogical_write_begin(StringInfo out, PGLogicalOutputData *data,
167168
}
168169
}
169170

171+
staticvoidpglogical_broadcast_table(StringInfoout,PGLogicalOutputData*data,MtmCopyRequest*copy)
172+
{
173+
if (BIT_CHECK(copy->targetNodes,MtmReplicationNodeId-1)) {
174+
HeapScanDescscandesc;
175+
HeapTupletuple;
176+
Relationrel;
177+
178+
StartTransactionCommand();
179+
180+
rel=heap_open(copy->sourceTable,ShareLock);
181+
182+
pq_sendbyte(out,'M');
183+
pq_sendbyte(out,'B');
184+
pq_sendint(out,sizeof(*copy),4);
185+
pq_sendbytes(out, (char*)copy,sizeof(*copy));
186+
187+
pglogical_write_rel(out,data,rel);
188+
189+
scandesc=heap_beginscan(rel,GetTransactionSnapshot(),0,NULL);
190+
while ((tuple=heap_getnext(scandesc,ForwardScanDirection))!=NULL)
191+
{
192+
pglogical_write_tuple(out,data,rel,tuple);
193+
}
194+
heap_endscan(scandesc);
195+
heap_close(rel,ShareLock);
196+
197+
CommitTransactionCommand();
198+
}
199+
}
200+
170201
staticvoid
171-
pglogical_write_message(StringInfoout,
202+
pglogical_write_message(StringInfoout,PGLogicalOutputData*data,
172203
constchar*prefix,Sizesz,constchar*message)
173204
{
174205
MtmLastRelId=InvalidOid;
@@ -199,6 +230,9 @@ pglogical_write_message(StringInfo out,
199230
* so no need to send that to replicas.
200231
*/
201232
return;
233+
case'B':
234+
pglogical_broadcast_table(out,data, (MtmCopyRequest*)message);
235+
return;
202236
}
203237
pq_sendbyte(out,'M');
204238
pq_sendbyte(out,*prefix);

‎pglogical_proto.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ typedef void (*pglogical_write_rel_fn)(StringInfo out, struct PGLogicalOutputDat
2121

2222
typedefvoid (*pglogical_write_begin_fn)(StringInfoout,structPGLogicalOutputData*data,
2323
ReorderBufferTXN*txn);
24-
typedefvoid (*pglogical_write_message_fn)(StringInfoout,
24+
typedefvoid (*pglogical_write_message_fn)(StringInfoout,structPGLogicalOutputData*data,
2525
constchar*prefix,Sizesz,constchar*message);
2626
typedefvoid (*pglogical_write_commit_fn)(StringInfoout,structPGLogicalOutputData*data,
2727
ReorderBufferTXN*txn,XLogRecPtrcommit_lsn);

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp