Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitac43c4e

Browse files
committed
Add mmts
0 parents  commitac43c4e

File tree

55 files changed

+8364
-0
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

55 files changed

+8364
-0
lines changed

‎Makefile

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
MODULE_big = multimaster
2+
OBJS = multimaster.o ../arbiter/lib/libarbiter.a ../arbiter/sockhub/libsockhub.a bytebuf.o bgwpool.o pglogical_output.o pglogical_proto.o pglogical_receiver.o pglogical_apply.o pglogical_hooks.o pglogical_config.o
3+
#OBJS = multimaster.o pglogical_receiver.o decoder_raw.o libdtm.o bytebuf.o bgwpool.o sockhub/sockhub.o
4+
5+
overrideCPPFLAGS += -I../arbiter/api -I../arbiter/sockhub
6+
7+
EXTENSION = multimaster
8+
DATA = multimaster--1.0.sql
9+
10+
.PHONY: all
11+
12+
all: multimaster.o multimaster.so
13+
14+
../arbiter/sockhub/libsockhub.a:
15+
make -C ../arbiter/sockhub
16+
17+
../arbiter/lib/libarbiter.a:
18+
make -C ../arbiter
19+
20+
PG_CPPFLAGS = -I$(libpq_srcdir) -DUSE_PGLOGICAL_OUTPUT
21+
SHLIB_LINK =$(libpq)
22+
23+
ifdefUSE_PGXS
24+
PG_CONFIG = pg_config
25+
PGXS :=$(shell$(PG_CONFIG) --pgxs)
26+
include$(PGXS)
27+
else
28+
subdir = contrib/multimaster
29+
top_builddir = ../..
30+
include$(top_builddir)/src/Makefile.global
31+
include$(top_srcdir)/contrib/contrib-global.mk
32+
endif
33+

‎README.md

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
#pg_dtm
2+
3+
###Design
4+
5+
This repo implements distributed transaction manager using Snapshot Sharing mechanism. General concepts and alternative approaches described in postgres wikihttps://wiki.postgresql.org/wiki/DTM.
6+
7+
Backend-DTM protocol description can be found in[dtmd/README](dtmd/README).
8+
9+
###Installation
10+
11+
* Patch postgres using xtm.patch. After that build and install postgres in usual way.
12+
```bash
13+
cd~/code/postgres
14+
patch -p1<~/code/pg_dtm/xtm.patch
15+
```
16+
* Install pg_dtm extension.
17+
```bash
18+
export PATH=/path/to/pgsql/bin/:$PATH
19+
cd~/code/pg_dtm
20+
make&& make install
21+
```
22+
* Run dtmd.
23+
```bash
24+
cd~/code/pg_dtm/dtmd
25+
make
26+
mkdir /tmp/clog
27+
./bin/dtmd&
28+
```
29+
* To run something meaningful you need at leat two postgres instances. Also pg_dtm requires presense in```shared_preload_libraries```.
30+
```bash
31+
initdb -D ./install/data1
32+
initdb -D ./install/data2
33+
echo"port = 5433">> ./install/data2/postgresql.conf
34+
echo"shared_preload_libraries = 'pg_dtm'">> ./install/data1/postgresql.conf
35+
echo"shared_preload_libraries = 'pg_dtm'">> ./install/data2/postgresql.conf
36+
pg_ctl -D ./install/data1 -l ./install/data1/log start
37+
pg_ctl -D ./install/data2 -l ./install/data2/log start
38+
```
39+
40+
####Automatic provisioning
41+
42+
For a cluster-wide deploy we use ansible, more details in tests/deploy_layouts. (Ansible instructions will be later)
43+
44+
###Usage
45+
46+
Now cluster is running and you can use global tx between two nodes. Let's connect to postgres instances at different ports:
47+
48+
```sql
49+
create extension pg_dtm;-- node1
50+
createtableaccounts(user_idint, amountint);-- node1
51+
insert into accounts (select2*generate_series(1,100)-1,0);-- node1, odd user_id's
52+
create extension pg_dtm;-- node2
53+
createtableaccounts(user_idint, amountint);-- node2
54+
insert into accounts (select2*generate_series(1,100),0);-- node2, even user_id's
55+
select dtm_begin_transaction();-- node1, returns global xid, e.g. 42
56+
select dtm_join_transaction(42);-- node2, join global tx
57+
begin;-- node1
58+
begin;-- node2
59+
update accountsset amount=amount-100where user_id=1;-- node1, transfer money from user#1
60+
update accountsset amount=amount+100where user_id=2;-- node2, to user#2
61+
commit;-- node1, blocks until second commit happend
62+
commit;-- node2
63+
```
64+
65+
###Consistency testing
66+
67+
To ensure consistency we use simple bank test: perform a lot of simultaneous transfers between accounts on different servers, while constantly checking total amount of money on all accounts. This test can be found in tests/perf.
68+
69+
```bash
70+
> go run ./tests/perf/*
71+
-C value
72+
Connection string (repeatfor multiple connections)
73+
-a int
74+
The number of bank accounts (default 100000)
75+
-b string
76+
Backend to use. Possible optinos: transfers, fdw, pgshard, readers. (default"transfers")
77+
-gUse DTM to keep global consistency
78+
-iInit database
79+
-lUse'repeatable read' isolation level instead of'read committed'
80+
-n int
81+
The number updates each writer (readerincase of Reades backend) performs (default 10000)
82+
-pUse parallel execs
83+
-r int
84+
The number of readers (default 1)
85+
-s int
86+
StartID. Script will update rows starting from this value
87+
-vShow progress and other stufffor mortals
88+
-w int
89+
The number of writers (default 8)
90+
```
91+
92+
So previous installation can be initialized with:
93+
```
94+
go run ./tests/perf/*.go \
95+
-C"dbname=postgres port=5432" \
96+
-C"dbname=postgres port=5433" \
97+
-g -i
98+
```
99+
and tested with:
100+
```
101+
go run ./tests/perf/*.go \
102+
-C"dbname=postgres port=5432" \
103+
-C"dbname=postgres port=5433" \
104+
-g
105+
```
106+
107+
### Using with postres_fdw.
108+
109+
We also provide a patch, that enables support of global transactions with postres_fdw. After patching and installing postres_fdw it is possible to run sametest via fdw usig key```-b fdw```.
110+
111+
### Using with pg_shard
112+
113+
Citus Data have branchin their pg_shard repo, that interacts with transaction manager. https://github.com/citusdata/pg_shard/tree/transaction_manager_integration
114+
To use this feature one should have following linein postgresql.conf (orset it via GUC)
115+
```
116+
pg_shard.use_dtm_transactions = 1
117+
```

‎bgwpool.c

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
#include"postgres.h"
2+
#include"fmgr.h"
3+
#include"miscadmin.h"
4+
#include"postmaster/postmaster.h"
5+
#include"postmaster/bgworker.h"
6+
#include"storage/s_lock.h"
7+
#include"storage/spin.h"
8+
#include"storage/pg_sema.h"
9+
#include"storage/shmem.h"
10+
11+
#include"bgwpool.h"
12+
13+
typedefstruct
14+
{
15+
BgwPoolConstructorconstructor;
16+
intid;
17+
}BgwPoolExecutorCtx;
18+
19+
size_tn_snapshots;
20+
size_tn_active;
21+
22+
staticvoidBgwPoolMainLoop(Datumarg)
23+
{
24+
BgwPoolExecutorCtx*ctx= (BgwPoolExecutorCtx*)arg;
25+
intid=ctx->id;
26+
BgwPool*pool=ctx->constructor();
27+
intsize;
28+
void*work;
29+
30+
BackgroundWorkerUnblockSignals();
31+
BackgroundWorkerInitializeConnection(pool->dbname,NULL);
32+
33+
elog(WARNING,"Start background worker %d",id);
34+
35+
while(true) {
36+
PGSemaphoreLock(&pool->available);
37+
SpinLockAcquire(&pool->lock);
38+
size=*(int*)&pool->queue[pool->head];
39+
Assert(size<pool->size);
40+
work=palloc(size);
41+
pool->active-=1;
42+
if (pool->head+size+4>pool->size) {
43+
memcpy(work,pool->queue,size);
44+
pool->head=INTALIGN(size);
45+
}else {
46+
memcpy(work,&pool->queue[pool->head+4],size);
47+
pool->head+=4+INTALIGN(size);
48+
}
49+
if (pool->size==pool->head) {
50+
pool->head=0;
51+
}
52+
if (pool->producerBlocked) {
53+
pool->producerBlocked= false;
54+
PGSemaphoreUnlock(&pool->overflow);
55+
}
56+
SpinLockRelease(&pool->lock);
57+
pool->executor(id,work,size);
58+
pfree(work);
59+
}
60+
}
61+
62+
voidBgwPoolInit(BgwPool*pool,BgwPoolExecutorexecutor,charconst*dbname,size_tqueueSize)
63+
{
64+
pool->queue= (char*)ShmemAlloc(queueSize);
65+
pool->executor=executor;
66+
PGSemaphoreCreate(&pool->available);
67+
PGSemaphoreCreate(&pool->overflow);
68+
PGSemaphoreReset(&pool->available);
69+
PGSemaphoreReset(&pool->overflow);
70+
SpinLockInit(&pool->lock);
71+
pool->producerBlocked= false;
72+
pool->head=0;
73+
pool->tail=0;
74+
pool->size=queueSize;
75+
pool->active=0;
76+
strcpy(pool->dbname,dbname);
77+
}
78+
79+
voidBgwPoolStart(intnWorkers,BgwPoolConstructorconstructor)
80+
{
81+
inti;
82+
BackgroundWorkerworker;
83+
84+
MemSet(&worker,0,sizeof(BackgroundWorker));
85+
worker.bgw_flags=BGWORKER_SHMEM_ACCESS |BGWORKER_BACKEND_DATABASE_CONNECTION;
86+
worker.bgw_start_time=BgWorkerStart_ConsistentState;
87+
worker.bgw_main=BgwPoolMainLoop;
88+
worker.bgw_restart_time=10;/* Wait 10 seconds for restart before crash */
89+
90+
for (i=0;i<nWorkers;i++) {
91+
BgwPoolExecutorCtx*ctx= (BgwPoolExecutorCtx*)malloc(sizeof(BgwPoolExecutorCtx));
92+
snprintf(worker.bgw_name,BGW_MAXLEN,"bgw_pool_worker_%d",i+1);
93+
ctx->id=i;
94+
ctx->constructor=constructor;
95+
worker.bgw_main_arg= (Datum)ctx;
96+
RegisterBackgroundWorker(&worker);
97+
}
98+
}
99+
100+
voidBgwPoolExecute(BgwPool*pool,void*work,size_tsize)
101+
{
102+
Assert(size+4 <=pool->size);
103+
104+
SpinLockAcquire(&pool->lock);
105+
while (true) {
106+
if ((pool->head <=pool->tail&&pool->size-pool->tail<size+4&&pool->head<size)
107+
|| (pool->head>pool->tail&&pool->head-pool->tail<size+4))
108+
{
109+
pool->producerBlocked= true;
110+
SpinLockRelease(&pool->lock);
111+
PGSemaphoreLock(&pool->overflow);
112+
SpinLockAcquire(&pool->lock);
113+
}else {
114+
pool->active+=1;
115+
n_snapshots+=1;
116+
n_active+=pool->active;
117+
*(int*)&pool->queue[pool->tail]=size;
118+
if (pool->size-pool->tail >=size+4) {
119+
memcpy(&pool->queue[pool->tail+4],work,size);
120+
pool->tail+=4+INTALIGN(size);
121+
}else {
122+
memcpy(pool->queue,work,size);
123+
pool->tail=INTALIGN(size);
124+
}
125+
if (pool->tail==pool->size) {
126+
pool->tail=0;
127+
}
128+
PGSemaphoreUnlock(&pool->available);
129+
break;
130+
}
131+
}
132+
SpinLockRelease(&pool->lock);
133+
}
134+

‎bgwpool.h

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
#ifndef__BGWPOOL_H__
2+
#define__BGWPOOL_H__
3+
4+
#include"storage/s_lock.h"
5+
#include"storage/spin.h"
6+
#include"storage/pg_sema.h"
7+
8+
typedefvoid(*BgwPoolExecutor)(intid,void*work,size_tsize);
9+
10+
#defineMAX_DBNAME_LEN 30
11+
12+
typedefstruct
13+
{
14+
BgwPoolExecutorexecutor;
15+
volatileslock_tlock;
16+
PGSemaphoreDataavailable;
17+
PGSemaphoreDataoverflow;
18+
size_thead;
19+
size_ttail;
20+
size_tsize;
21+
size_tactive;
22+
boolproducerBlocked;
23+
chardbname[MAX_DBNAME_LEN];
24+
char*queue;
25+
}BgwPool;
26+
27+
typedefBgwPool*(*BgwPoolConstructor)(void);
28+
29+
externvoidBgwPoolStart(intnWorkers,BgwPoolConstructorconstructor);
30+
31+
externvoidBgwPoolInit(BgwPool*pool,BgwPoolExecutorexecutor,charconst*dbname,size_tqueueSize);
32+
33+
externvoidBgwPoolExecute(BgwPool*pool,void*work,size_tsize);
34+
35+
#endif

‎bytebuf.c

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
#include"postgres.h"
2+
#include"bytebuf.h"
3+
4+
#defineINIT_BUF_SIZE 1024
5+
6+
voidByteBufferAlloc(ByteBuffer*buf)
7+
{
8+
buf->size=INIT_BUF_SIZE;
9+
buf->data=palloc(buf->size);
10+
buf->used=0;
11+
}
12+
13+
voidByteBufferAppend(ByteBuffer*buf,void*data,intlen)
14+
{
15+
if (buf->used+len>buf->size) {
16+
buf->size=buf->used+len>buf->size*2 ?buf->used+len :buf->size*2;
17+
buf->data= (char*)repalloc(buf->data,buf->size);
18+
}
19+
memcpy(&buf->data[buf->used],data,len);
20+
buf->used+=len;
21+
}
22+
23+
voidByteBufferAppendInt32(ByteBuffer*buf,intdata)
24+
{
25+
ByteBufferAppend(buf,&data,sizeofdata);
26+
}
27+
28+
voidByteBufferFree(ByteBuffer*buf)
29+
{
30+
pfree(buf->data);
31+
}
32+
33+
voidByteBufferReset(ByteBuffer*buf)
34+
{
35+
buf->used=0;
36+
}

‎bytebuf.h

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
#ifndef__BYTEBUF_H__
2+
#define__BYTEBUF_H__
3+
4+
typedefstruct
5+
{
6+
char*data;
7+
intsize;
8+
intused;
9+
}ByteBuffer;
10+
11+
externvoidByteBufferAlloc(ByteBuffer*buf);
12+
externvoidByteBufferAppend(ByteBuffer*buf,void*data,intlen);
13+
externvoidByteBufferAppendInt32(ByteBuffer*buf,intdata);
14+
externvoidByteBufferFree(ByteBuffer*buf);
15+
externvoidByteBufferReset(ByteBuffer*buf);
16+
17+
#endif

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp