Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit75895d3

Browse files
knizhnikkelvich
authored andcommitted
New DDL replication mechanism
1 parent3911ec5 commit75895d3

File tree

9 files changed

+134
-51
lines changed

9 files changed

+134
-51
lines changed

‎multimaster--1.0.sql

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,20 @@
11
-- complain if script is sourced in psql, rather than via CREATE EXTENSION
22
\echo Use"CREATE EXTENSION multimaster" to load this file. \quit
33

4-
CREATEFUNCTIONmtm_start_replication() RETURNS void
4+
CREATEFUNCTIONmtm.start_replication() RETURNS void
55
AS'MODULE_PATHNAME','mtm_start_replication'
66
LANGUAGE C;
77

8-
CREATEFUNCTIONmtm_stop_replication() RETURNS void
8+
CREATEFUNCTIONmtm.stop_replication() RETURNS void
99
AS'MODULE_PATHNAME','mtm_stop_replication'
1010
LANGUAGE C;
1111

12-
CREATEFUNCTIONmtm_drop_node(nodeinteger, drop_slot bool default false) RETURNS void
12+
CREATEFUNCTIONmtm.drop_node(nodeinteger, drop_slot bool default false) RETURNS void
1313
AS'MODULE_PATHNAME','mtm_drop_node'
1414
LANGUAGE C;
1515

16-
CREATEFUNCTIONmtm_get_snapshot() RETURNSbigint
16+
CREATEFUNCTIONmtm.get_snapshot() RETURNSbigint
1717
AS'MODULE_PATHNAME','mtm_get_snapshot'
1818
LANGUAGE C;
1919

20+
CREATETABLEIF NOT EXISTSmtm.ddl_log (issuedtimestamp with time zonenot null, querytext);

‎multimaster.c

Lines changed: 62 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,9 @@
4848
#include"replication/slot.h"
4949
#include"port/atomics.h"
5050
#include"tcop/utility.h"
51+
#include"nodes/makefuncs.h"
52+
#include"access/htup_details.h"
53+
#include"catalog/indexing.h"
5154

5255
#include"multimaster.h"
5356

@@ -865,7 +868,7 @@ mtm_drop_node(PG_FUNCTION_ARGS)
865868
dtm->nNodes-=1;
866869
if (!IsTransactionBlock())
867870
{
868-
MtmBroadcastUtilityStmt(psprintf("selectmtm_drop_node(%d,%s)",nodeId,dropSlot ?"true" :"false"), true);
871+
MtmBroadcastUtilityStmt(psprintf("selectmultimaster.drop_node(%d,%s)",nodeId,dropSlot ?"true" :"false"), true);
869872
}
870873
if (dropSlot)
871874
{
@@ -880,7 +883,7 @@ mtm_get_snapshot(PG_FUNCTION_ARGS)
880883
{
881884
PG_RETURN_INT64(dtmTx.snapshot);
882885
}
883-
886+
884887
/*
885888
* Execute statement with specified parameters and check its result
886889
*/
@@ -926,7 +929,7 @@ static void MtmBroadcastUtilityStmt(char const* sql, bool ignoreError)
926929
failedNode=i;
927930
do {
928931
PQfinish(conns[i]);
929-
}while (--i >=0);
932+
}while (--i >=0);
930933
elog(ERROR,"Failed to establish connection '%s' to node %d",conn_str,failedNode);
931934
}
932935
}
@@ -935,7 +938,7 @@ static void MtmBroadcastUtilityStmt(char const* sql, bool ignoreError)
935938
i+=1;
936939
}
937940
Assert(i==MtmNodes);
938-
941+
939942
for (i=0;i<MtmNodes;i++)
940943
{
941944
if (conns[i])
@@ -972,7 +975,7 @@ static void MtmBroadcastUtilityStmt(char const* sql, bool ignoreError)
972975
failedNode=i;
973976
}
974977
}
975-
}
978+
}
976979
for (i=0;i<MtmNodes;i++)
977980
{
978981
if (conns[i])
@@ -986,6 +989,48 @@ static void MtmBroadcastUtilityStmt(char const* sql, bool ignoreError)
986989
}
987990
}
988991

992+
staticvoidMtmProcessDDLCommand(charconst*queryString)
993+
{
994+
RangeVar*rv;
995+
Relationrel;
996+
TupleDesctupDesc;
997+
HeapTupletup;
998+
Datumvalues[Natts_mtm_ddl_log];
999+
boolnulls[Natts_mtm_ddl_log];
1000+
TimestampTzts=GetCurrentTimestamp();
1001+
1002+
rv=makeRangeVar(MULTIMASTER_SCHEMA_NAME,MULTIMASTER_DDL_TABLE,-1);
1003+
rel=heap_openrv_extended(rv,RowExclusiveLock, true);
1004+
1005+
if (rel==NULL) {
1006+
return;
1007+
}
1008+
1009+
tupDesc=RelationGetDescr(rel);
1010+
1011+
/* Form a tuple. */
1012+
memset(nulls, false,sizeof(nulls));
1013+
1014+
values[Anum_mtm_ddl_log_issued-1]=TimestampTzGetDatum(ts);
1015+
values[Anum_mtm_ddl_log_query-1]=CStringGetTextDatum(queryString);
1016+
1017+
tup=heap_form_tuple(tupDesc,values,nulls);
1018+
1019+
/* Insert the tuple to the catalog. */
1020+
simple_heap_insert(rel,tup);
1021+
1022+
/* Update the indexes. */
1023+
CatalogUpdateIndexes(rel,tup);
1024+
1025+
/* Cleanup. */
1026+
heap_freetuple(tup);
1027+
heap_close(rel,RowExclusiveLock);
1028+
1029+
elog(WARNING,"Replicate command: '%s'",queryString);
1030+
1031+
dtmTx.containsDML= true;
1032+
}
1033+
9891034
staticvoidMtmProcessUtility(Node*parsetree,constchar*queryString,
9901035
ProcessUtilityContextcontext,ParamListInfoparams,
9911036
DestReceiver*dest,char*completionTag)
@@ -1013,22 +1058,18 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
10131058
skipCommand= false;
10141059
break;
10151060
}
1016-
if (skipCommand||IsTransactionBlock()) {
1017-
if (PreviousProcessUtilityHook!=NULL)
1018-
{
1019-
PreviousProcessUtilityHook(parsetree,queryString,context,
1020-
params,dest,completionTag);
1021-
}
1022-
else
1023-
{
1024-
standard_ProcessUtility(parsetree,queryString,context,
1025-
params,dest,completionTag);
1026-
}
1027-
if (!skipCommand) {
1028-
dtmTx.isDistributed= false;
1029-
}
1030-
}else {
1031-
MtmBroadcastUtilityStmt(queryString, false);
1061+
if (!skipCommand&& !dtmTx.isReplicated) {
1062+
MtmProcessDDLCommand(queryString);
1063+
}
1064+
if (PreviousProcessUtilityHook!=NULL)
1065+
{
1066+
PreviousProcessUtilityHook(parsetree,queryString,context,
1067+
params,dest,completionTag);
1068+
}
1069+
else
1070+
{
1071+
standard_ProcessUtility(parsetree,queryString,context,
1072+
params,dest,completionTag);
10321073
}
10331074
}
10341075

‎multimaster.control

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
comment = 'Multimaster'
22
default_version = '1.0'
33
module_pathname = '$libdir/multimaster'
4-
relocatable = true
4+
schema = mtm
5+
relocatable = false

‎multimaster.h

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
#include"bgwpool.h"
66

77
#defineMTM_INFO(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
8-
#defineMTM_TRACE(fmt, ...)
8+
#defineMTM_TRACE(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
99
#defineMTM_TUPLE_TRACE(fmt, ...)
1010
/*
1111
#define MTM_TRACE(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
@@ -14,7 +14,14 @@
1414

1515
#defineBIT_SET(mask,bit) ((mask) & ((int64)1 << (bit)))
1616

17-
#defineMULTIMASTER_NAME "mmts"
17+
#defineMULTIMASTER_NAME "mtm"
18+
#defineMULTIMASTER_SCHEMA_NAME "mtm"
19+
#defineMULTIMASTER_DDL_TABLE "ddl_log"
20+
21+
#defineNatts_mtm_ddl_log 2
22+
#defineAnum_mtm_ddl_log_issued1
23+
#defineAnum_mtm_ddl_log_query2
24+
1825

1926
typedefuint64csn_t;/* commit serial number */
2027
#defineINVALID_CSN ((csn_t)-1)

‎pglogical_apply.c

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -472,13 +472,14 @@ process_remote_commit(StringInfo s)
472472
staticvoid
473473
process_remote_insert(StringInfos,Relationrel)
474474
{
475-
EState*estate;
476-
TupleDatanew_tuple;
475+
EState*estate;
476+
TupleDatanew_tuple;
477477
TupleTableSlot*newslot;
478478
TupleTableSlot*oldslot;
479479
ResultRelInfo*relinfo;
480-
ScanKey*index_keys;
481-
inti;
480+
ScanKey*index_keys;
481+
char*relname=RelationGetRelationName(rel);
482+
inti;
482483

483484
estate=create_rel_estate(rel);
484485
newslot=ExecInitExtraTupleSlot(estate);
@@ -560,6 +561,18 @@ process_remote_insert(StringInfo s, Relation rel)
560561
FreeExecutorState(estate);
561562

562563
CommandCounterIncrement();
564+
565+
if (strcmp(relname,MULTIMASTER_DDL_TABLE)==0) {
566+
char*ddl=TextDatumGetCString(new_tuple.values[Anum_mtm_ddl_log_query-1]);
567+
intrc;
568+
SPI_connect();
569+
rc=SPI_execute(ddl, false,0);
570+
SPI_finish();
571+
if (rc!=SPI_OK_UTILITY) {
572+
elog(ERROR,"Failed to execute utility statement %s",ddl);
573+
}
574+
}
575+
563576
}
564577

565578
staticvoid

‎tests/dtmbench.cpp

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -180,28 +180,20 @@ void* writer(void* arg)
180180
voidinitializeDatabase()
181181
{
182182
connectionconn(cfg.connections[0]);
183-
printf("creating extension\n");
184-
{
185-
nontransactiontxn(conn);
186-
exec(txn,"drop extension if exists multimaster");
187-
exec(txn,"create extension multimaster");
188-
}
189-
printf("extension created\n");
190-
191-
printf("creating table t\n");
183+
time_t start =getCurrentTime();
184+
printf("Creating database schema...\n");
192185
{
193186
nontransactiontxn(conn);
194187
exec(txn,"drop table if exists t");
195188
exec(txn,"create table t(u int primary key, v int)");
196189
}
197-
printf("table t created\n");
198-
printf("inserting stuff into t\n");
190+
printf("Populating data...\n");
199191
{
200192
worktxn(conn);
201193
exec(txn,"insert into t (select generate_series(0,%d), %d)", cfg.nAccounts-1,0);
202194
txn.commit();
203195
}
204-
printf("stuff inserted\n");
196+
printf("Initialization completed in %f seconds\n", (start -getCurrentTime())/100000.0);
205197
}
206198

207199
intmain (int argc,char* argv[])

‎tests/perf.results

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,3 +117,24 @@ Bench started at Пн. февр. 15 17:26:11 MSK 2016
117117
astro5:{tps:96460.088384, transactions:1000000, selects:2000000, updates:0, aborts:0, abort_percent: 0, readers:0, writers:200, update_percent:0, accounts:500000, iterations:5000, hosts:3}
118118
Bench finished at Пн. февр. 15 17:26:22 MSK 2016
119119
Bench started at Пн. февр. 15 17:26:41 MSK 2016
120+
Bench started at Пн. февр. 15 17:58:14 MSK 2016
121+
astro5:{tps:93430.358394, transactions:1250000, selects:2500000, updates:0, aborts:0, abort_percent: 0, readers:0, writers:250, update_percent:0, accounts:500000, iterations:5000, hosts:3}
122+
Bench finished at Пн. февр. 15 17:58:28 MSK 2016
123+
Bench started at Пн. февр. 15 17:59:11 MSK 2016
124+
astro5:{tps:81893.902409, transactions:1500000, selects:3000000, updates:0, aborts:0, abort_percent: 0, readers:0, writers:300, update_percent:0, accounts:500000, iterations:5000, hosts:3}
125+
Bench finished at Пн. февр. 15 17:59:29 MSK 2016
126+
Bench started at Пн. февр. 15 17:59:59 MSK 2016
127+
astro5:{tps:105707.597142, transactions:1000000, selects:2000000, updates:0, aborts:0, abort_percent: 0, readers:0, writers:200, update_percent:0, accounts:500000, iterations:5000, hosts:3}
128+
Bench finished at Пн. февр. 15 18:00:09 MSK 2016
129+
Bench started at Пн. февр. 15 18:00:54 MSK 2016
130+
astro5:{tps:92668.464039, transactions:1250000, selects:2500000, updates:0, aborts:0, abort_percent: 0, readers:0, writers:250, update_percent:0, accounts:500000, iterations:5000, hosts:3}
131+
Bench finished at Пн. февр. 15 18:01:08 MSK 2016
132+
Bench started at Пн. февр. 15 18:06:22 MSK 2016
133+
astro5:{tps:121069.442298, transactions:125000000, selects:250000000, updates:0, aborts:0, abort_percent: 0, readers:0, writers:250, update_percent:0, accounts:500000, iterations:500000, hosts:3}
134+
Bench finished at Пн. февр. 15 18:23:35 MSK 2016
135+
Bench started at Пн. февр. 15 18:24:11 MSK 2016
136+
astro5:{tps:122202.228254, transactions:100000000, selects:200000000, updates:0, aborts:0, abort_percent: 0, readers:0, writers:200, update_percent:0, accounts:500000, iterations:500000, hosts:3}
137+
Bench finished at Пн. февр. 15 18:37:50 MSK 2016
138+
Bench started at Пн. февр. 15 18:44:02 MSK 2016
139+
astro5:{tps:121774.204222, transactions:100000000, selects:200000000, updates:0, aborts:0, abort_percent: 0, readers:0, writers:200, update_percent:0, accounts:500000, iterations:500000, hosts:3}
140+
Bench finished at Пн. февр. 15 18:57:44 MSK 2016

‎tests/perf.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@
4646
-name:run transfers
4747
shell:>
4848
~/pg_cluster/install/bin/dtmbench {{connections}}
49-
-w {{ nconns }} -r 0 -n5000 -a 500000 -p {{ up }} |
49+
-w {{ nconns }} -r 0 -n500000 -a 500000 -p {{ up }} |
5050
tee -a perf.results |
5151
sed "s/^/`hostname`:/"
5252
register:transfers_result

‎tests/reinit-mm.sh

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,15 @@ sep=""
1010
for((i=1;i<=n_nodes;i++))
1111
do
1212
port=$((5431+i))
13-
conn_str="$conn_str${sep}dbname=postgres host=127.0.0.1 port=$port sslmode=disable"
13+
conn_str="$conn_str${sep}dbname=postgres host=localhost port=$port sslmode=disable"
1414
sep=","
1515
initdb node$i
1616
done
1717

18-
echo Start DTM
19-
~/postgres_cluster/contrib/arbiter/bin/arbiter -r 0.0.0.0:5431 -i 0 -d dtm2> dtm.log&
20-
sleep 2
18+
#echo Start DTM
19+
#~/postgres_cluster/contrib/arbiter/bin/arbiter -r 0.0.0.0:5431 -i 0 -d dtm 2> dtm.log &
20+
#sleep 2
21+
echo"Starting nodes..."
2122

2223
echo Start nodes
2324
for((i=1;i<=n_nodes;i++))
@@ -31,7 +32,13 @@ do
3132
done
3233

3334
sleep 5
34-
echo Initialize database schema
35-
psql postgres -f init.sql
35+
echo"Create multimaster extension..."
36+
37+
for((i=1;i<=n_nodes;i++))
38+
do
39+
port=$((5431+i))
40+
psql postgres -p$port -c"create extension multimaster"
41+
done
42+
3643

3744
echo Done

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp