Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit5c4fd94

Browse files
committed
Add bgwpool
1 parentdce42c7 commit5c4fd94

File tree

5 files changed

+173
-6
lines changed

5 files changed

+173
-6
lines changed

‎contrib/multimaster/bgwpool.c‎

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
#include"bgwpool.h"
2+
3+
typedefstruct
4+
{
5+
BgwPool*pool;
6+
intid;
7+
}BgwExecutorCtx;
8+
9+
staticvoidBgwMainLoop(Datumarg)
10+
{
11+
BgwExecutorCtx*ctx= (BgwExecutorCtx*)arg;
12+
intid=ctx->id;
13+
BgwPool*pool=ctx->pool;
14+
intsize;
15+
void*work;
16+
17+
BackgroundWorkerInitializeConnection(pool->dbname,NULL);
18+
19+
while(true) {
20+
PGSemaphoreLock(&pool->available);
21+
SpinLockAcquire(&pool->lock);
22+
Assert(pool->head!=pool->tail);
23+
size= (int*)&pool->buf[pool->head];
24+
void*work=palloc(len);
25+
if (pool->head+size+4>pool->size) {
26+
memcpy(work,pool->buf,size);
27+
pool->head= (size&3)& ~3;
28+
}else {
29+
memcpy(work,&pool->buf[pool->head+4],size);
30+
pool->head+=4+ ((size&3)& ~3);
31+
}
32+
if (pool->size==pool->head) {
33+
pool->head=0;
34+
}
35+
if (pool->producerBlocked) {
36+
PGSemaphoreUnlock(&pool->overflow);
37+
pool->producerBlocked= false;
38+
}
39+
SpinLockRelease(&pool->lock);
40+
pool->executor(id,work,size);
41+
pfree(work);
42+
}
43+
}
44+
45+
BGWPool*BgwPoolCreate(BgwExecutorexecutor,charconst*dbname,size_tbufSize,size_tnWorkers);
46+
{
47+
inti;
48+
BackgroundWorkerworker;
49+
BGWPool*pool= (BGWPool*)ShmemAlloc(bufSize+sizeof(BGWPool));
50+
pool->executor=executor;
51+
PGSemaphoreCreate(&pool->available);
52+
PGSemaphoreCreate(&pool->overflow);
53+
PGSemaphoreReset(&pool->available);
54+
PGSemaphoreReset(&pool->overflow);
55+
SpinLockInit(&pool->lock);
56+
pool->producerBlocked= false;
57+
pool->head=0;
58+
pool->tail=0;
59+
pool->size=bufSize;
60+
strcpy(pool->dbname,dbname);
61+
62+
MemSet(&worker,0,sizeof(BackgroundWorker));
63+
worker.bgw_flags=BGWORKER_SHMEM_ACCESS |BGWORKER_BACKEND_DATABASE_CONNECTION;
64+
worker.bgw_start_time=BgWorkerStart_ConsistentState;
65+
worker.bgw_main=BgwPoolMainLoop;
66+
worker.bgw_restart_time=10;/* Wait 10 seconds for restart before crash */
67+
68+
for (i=0;i<nWorkers;i++) {
69+
BgwExecutorCtx*ctx= (BgwExecutorCtx*)malloc(sizeof(BgwExecutorCtx));
70+
snprintf(worker.bgw_name,BGW_MAXLEN,"bgw_pool_worker_%d",i+1);
71+
ctx->id=i;
72+
ctx->pool=pool;
73+
worker.bgw_main_arg= (Datum)ctx;
74+
RegisterBackgroundWorker(&worker);
75+
}
76+
returnpool;
77+
}
78+
79+
voidBgwPoolExecute(BgwPool*pool,void*work,size_tsize);
80+
{
81+
Assert(size+4 <=pool->size);
82+
83+
SpinLockAcquire(&pool->lock);
84+
while (true) {
85+
if ((pool->head<pool->tail&&pool->size-pool->tail<size+4&&pool->head<size)
86+
|| (pool->head>pool->tail&&pool->head-pool->tail<size+4))
87+
{
88+
pool->producerBlocked= true;
89+
SpinLockRelease(&pool->lock);
90+
PGSemaphoreLock(&pool->overflow);
91+
SpinLockAcquire(&pool->lock);
92+
}else {
93+
*(int*)&pool->buf[pool->tail]=size;
94+
if (pool->size-pool->tail >=size+4) {
95+
memcpy(&pool->buf[pool->tail+4],work,size);
96+
pool->tail+=4+ (size+3)& ~3;
97+
}else {
98+
memcpy(pool->buf,work,size);
99+
pool->tail= (size+3)& ~3;
100+
}
101+
PGSemaphoreUnlock(&pool->available);
102+
}
103+
}
104+
}
105+

‎contrib/multimaster/bgwpool.h‎

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
#ifndef__BGWPOOL_H__
2+
#define__BGWPOOL_H__
3+
4+
typedefvoid(*BgwExecutor)(intid,void*work,size_tsize);
5+
6+
#defineMAX_DBNAME_LEN 30
7+
8+
typedefstruct
9+
{
10+
BgwExecutorexecutor;
11+
volatileslock_tlock;
12+
PGSemaphoreDataavailable;
13+
PGSemaphoreDataoverflow;
14+
size_thead;
15+
size_ttail;
16+
size_tsize;
17+
boolproducerBlocked;
18+
chardbname[MAX_DBNAME_LEN];
19+
charqueue[1];
20+
}BgwPool;
21+
22+
BgwPool*BgwPoolCreate(BgwExecutorexecutor,charconst*dbname,size_tqueueSize,intnWorkers);
23+
24+
voidBgwPoolExecute(BgwPool*pool,void*work,size_tsize);
25+
26+
#endif

‎contrib/multimaster/multimaster.c‎

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ typedef struct
5959
intnNodes;
6060
pg_atomic_uint32nReceivers;
6161
boolinitialized;
62+
6263
}DtmState;
6364

6465
typedefstruct
@@ -142,6 +143,8 @@ bool MMDoReplication;
142143
staticchar*MMConnStrs;
143144
staticintMMNodeId;
144145
staticintMMNodes;
146+
staticintMMQueueSize;
147+
staticintMMWorkers;
145148

146149
staticchar*DtmHost;
147150
staticintDtmPort;
@@ -837,6 +840,36 @@ _PG_init(void)
837840
RequestAddinShmemSpace(DTM_SHMEM_SIZE);
838841
RequestAddinLWLocks(2);
839842

843+
DefineCustomIntVariable(
844+
"multimaster.workers",
845+
"Number of multimaster executor workers per node",
846+
NULL,
847+
&MMWorkers,
848+
8,
849+
1,
850+
INT_MAX,
851+
PGC_BACKEND,
852+
0,
853+
NULL,
854+
NULL,
855+
NULL
856+
);
857+
858+
DefineCustomIntVariable(
859+
"multimaster.queue_size",
860+
"Multimaster queue size",
861+
NULL,
862+
&MMQueueSize,
863+
1024*1024,
864+
1024,
865+
INT_MAX,
866+
PGC_BACKEND,
867+
0,
868+
NULL,
869+
NULL,
870+
NULL
871+
);
872+
840873
DefineCustomIntVariable(
841874
"multimaster.local_xid_reserve",
842875
"Number of XIDs reserved by node for local transactions",
@@ -927,6 +960,8 @@ _PG_init(void)
927960
if (MMNodes<2) {
928961
elog(ERROR,"Multimaster should have at least two nodes");
929962
}
963+
dtm->pool=BgwPoolCreate(MMExecutor,MMDatabaseName,MMQueueSize,MMWorkers);
964+
930965
if (DtmBufferSize!=0)
931966
{
932967
DtmGlobalConfig(NULL,DtmPort,Unix_socket_directories);

‎contrib/multimaster/multimaster.h‎

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,6 @@ extern void MMJoinTransaction(TransactionId xid);
1111
externboolMMIsLocalTransaction(TransactionIdxid);
1212
externvoidMMReceiverStarted(void);
1313

14+
externcharconst*MMDatabaseName;
15+
1416
#endif

‎contrib/multimaster/receiver_raw.c‎

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@ static volatile sig_atomic_t got_sigterm = false;
4747
staticvolatilesig_atomic_tgot_sighup= false;
4848

4949
/* GUC variables */
50-
staticchar*receiver_database;
5150
staticintreceiver_idle_time=1;
5251
staticboolreceiver_sync_mode= true;
5352

@@ -215,7 +214,7 @@ receiver_raw_main(Datum main_arg)
215214
BackgroundWorkerUnblockSignals();
216215

217216
/* Connect to a database */
218-
BackgroundWorkerInitializeConnection(receiver_database,NULL);
217+
BackgroundWorkerInitializeConnection(MMDatabaseName,NULL);
219218

220219
/* Establish connection to remote server */
221220
conn=PQconnectdb(args->receiver_conn_string);
@@ -561,17 +560,17 @@ int MMStartReceivers(char* conns, int node_id)
561560
}
562561
if (++i!=node_id) {
563562
ReceiverArgs*ctx= (ReceiverArgs*)malloc(sizeof(ReceiverArgs));
564-
if (receiver_database==NULL) {
563+
if (MMDatabaseName==NULL) {
565564
char*dbname=strstr(conn_str,"dbname=");
566565
char*eon;
567566
intlen;
568567
Assert(dbname!=NULL);
569568
dbname+=7;
570569
eon=strchr(dbname,' ');
571570
len=eon-dbname;
572-
receiver_database= (char*)malloc(len+1);
573-
memcpy(receiver_database,dbname,len);
574-
receiver_database[len]='\0';
571+
MMDatabaseName= (char*)malloc(len+1);
572+
memcpy(MMDatabaseName,dbname,len);
573+
MMDatabaseName[len]='\0';
575574
}
576575
*p='\0';
577576
ctx->receiver_conn_string=conn_str;

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp