Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitc19018d

Browse files
committed
Add bgwpool
1 parent13682d8 commitc19018d

File tree

8 files changed

+150
-112
lines changed

8 files changed

+150
-112
lines changed

‎contrib/multimaster/Makefile‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
MODULE_big = multimaster
2-
OBJS = multimaster.o receiver_raw.o decoder_raw.o libdtm.o sockhub/sockhub.o
2+
OBJS = multimaster.o receiver_raw.o decoder_raw.o libdtm.obytebuf.o bgwpool.osockhub/sockhub.o
33

44
EXTENSION = multimaster
55
DATA = multimaster--1.0.sql

‎contrib/multimaster/bgwpool.c‎

Lines changed: 28 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,13 @@
1+
#include"postgres.h"
2+
#include"fmgr.h"
3+
#include"miscadmin.h"
4+
#include"postmaster/postmaster.h"
5+
#include"postmaster/bgworker.h"
6+
#include"storage/s_lock.h"
7+
#include"storage/spin.h"
8+
#include"storage/pg_sema.h"
9+
#include"storage/shmem.h"
10+
111
#include"bgwpool.h"
212

313
typedefstruct
@@ -6,7 +16,7 @@ typedef struct
616
intid;
717
}BgwExecutorCtx;
818

9-
staticvoidBgwMainLoop(Datumarg)
19+
staticvoidBgwPoolMainLoop(Datumarg)
1020
{
1121
BgwExecutorCtx*ctx= (BgwExecutorCtx*)arg;
1222
intid=ctx->id;
@@ -19,34 +29,34 @@ static void BgwMainLoop(Datum arg)
1929
while(true) {
2030
PGSemaphoreLock(&pool->available);
2131
SpinLockAcquire(&pool->lock);
22-
Assert(pool->head!=pool->tail);
23-
size= (int*)&pool->queue[pool->head];
24-
void*work=palloc(len);
32+
size=*(int*)&pool->queue[pool->head];
33+
Assert(size<pool->size);
34+
work=palloc(size);
2535
if (pool->head+size+4>pool->size) {
2636
memcpy(work,pool->queue,size);
27-
pool->head= (size&3)& ~3;
37+
pool->head=INTALIGN(size);
2838
}else {
2939
memcpy(work,&pool->queue[pool->head+4],size);
30-
pool->head+=4+((size&3)& ~3);
40+
pool->head+=4+INTALIGN(size);
3141
}
3242
if (pool->size==pool->head) {
3343
pool->head=0;
3444
}
3545
if (pool->producerBlocked) {
36-
PGSemaphoreUnlock(&pool->overflow);
3746
pool->producerBlocked= false;
47+
PGSemaphoreUnlock(&pool->overflow);
3848
}
3949
SpinLockRelease(&pool->lock);
4050
pool->executor(id,work,size);
4151
pfree(work);
4252
}
4353
}
4454

45-
BGWPool*BgwPoolCreate(BgwExecutorexecutor,charconst*dbname,size_tqueueSize,size_tnWorkers);
55+
BgwPool*BgwPoolCreate(BgwExecutorexecutor,charconst*dbname,size_tqueueSize,intnWorkers)
4656
{
4757
inti;
4858
BackgroundWorkerworker;
49-
BGWPool*pool= (BGWPool*)ShmemAlloc(queueSize+sizeof(BGWPool));
59+
BgwPool*pool= (BgwPool*)ShmemAlloc(queueSize+sizeof(BgwPool));
5060
pool->executor=executor;
5161
PGSemaphoreCreate(&pool->available);
5262
PGSemaphoreCreate(&pool->overflow);
@@ -76,13 +86,13 @@ BGWPool* BgwPoolCreate(BgwExecutor executor, char const* dbname, size_t queueSiz
7686
returnpool;
7787
}
7888

79-
voidBgwPoolExecute(BgwPool*pool,void*work,size_tsize);
89+
voidBgwPoolExecute(BgwPool*pool,void*work,size_tsize)
8090
{
8191
Assert(size+4 <=pool->size);
8292

8393
SpinLockAcquire(&pool->lock);
8494
while (true) {
85-
if ((pool->head<pool->tail&&pool->size-pool->tail<size+4&&pool->head<size)
95+
if ((pool->head <=pool->tail&&pool->size-pool->tail<size+4&&pool->head<size)
8696
|| (pool->head>pool->tail&&pool->head-pool->tail<size+4))
8797
{
8898
pool->producerBlocked= true;
@@ -93,13 +103,18 @@ void BgwPoolExecute(BgwPool* pool, void* work, size_t size);
93103
*(int*)&pool->queue[pool->tail]=size;
94104
if (pool->size-pool->tail >=size+4) {
95105
memcpy(&pool->queue[pool->tail+4],work,size);
96-
pool->tail+=4+ (size+3)& ~3;
106+
pool->tail+=4+INTALIGN(size);
97107
}else {
98108
memcpy(pool->queue,work,size);
99-
pool->tail= (size+3)& ~3;
109+
pool->tail=INTALIGN(size);
110+
}
111+
if (pool->tail==pool->size) {
112+
pool->tail=0;
100113
}
101114
PGSemaphoreUnlock(&pool->available);
115+
break;
102116
}
103117
}
118+
SpinLockRelease(&pool->lock);
104119
}
105120

‎contrib/multimaster/bgwpool.h‎

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@ typedef struct
1919
charqueue[1];
2020
}BgwPool;
2121

22-
BgwPool*BgwPoolCreate(BgwExecutorexecutor,charconst*dbname,size_tqueueSize,intnWorkers);
22+
externBgwPool*BgwPoolCreate(BgwExecutorexecutor,charconst*dbname,size_tqueueSize,intnWorkers);
2323

24-
voidBgwPoolExecute(BgwPool*pool,void*work,size_tsize);
24+
externvoidBgwPoolExecute(BgwPool*pool,void*work,size_tsize);
2525

2626
#endif

‎contrib/multimaster/bytebuf.c‎

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
#include"postgres.h"
2+
#include"bytebuf.h"
3+
4+
#defineINIT_BUF_SIZE 1024
5+
6+
voidByteBufferAlloc(ByteBuffer*buf)
7+
{
8+
buf->size=INIT_BUF_SIZE;
9+
buf->data=palloc(buf->size);
10+
buf->used=0;
11+
}
12+
13+
voidByteBufferAppend(ByteBuffer*buf,void*data,intlen)
14+
{
15+
if (buf->used+len>buf->size) {
16+
buf->size=buf->used+len>buf->size*2 ?buf->used+len :buf->size*2;
17+
buf->data= (char*)repalloc(buf->data,buf->size);
18+
}
19+
memcpy(&buf->data[buf->used],data,len);
20+
buf->used+=len;
21+
}
22+
23+
voidByteBufferAppendInt32(ByteBuffer*buf,intdata)
24+
{
25+
ByteBufferAppend(buf,&data,sizeofdata);
26+
}
27+
28+
voidByteBufferFree(ByteBuffer*buf)
29+
{
30+
pfree(buf->data);
31+
}
32+
33+
voidByteBufferReset(ByteBuffer*buf)
34+
{
35+
buf->used=0;
36+
}

‎contrib/multimaster/bytebuf.h‎

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
#ifndef__BYTEBUF_H__
2+
#define__BYTEBUF_H__
3+
4+
typedefstruct
5+
{
6+
char*data;
7+
intsize;
8+
intused;
9+
}ByteBuffer;
10+
11+
externvoidByteBufferAlloc(ByteBuffer*buf);
12+
externvoidByteBufferAppend(ByteBuffer*buf,void*data,intlen);
13+
externvoidByteBufferAppendInt32(ByteBuffer*buf,intdata);
14+
externvoidByteBufferFree(ByteBuffer*buf);
15+
externvoidByteBufferReset(ByteBuffer*buf);
16+
17+
#endif

‎contrib/multimaster/multimaster.c‎

Lines changed: 49 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
#include"access/xlog.h"
2929
#include"storage/proc.h"
3030
#include"storage/procarray.h"
31+
#include"executor/spi.h"
3132
#include"executor/executor.h"
3233
#include"access/twophase.h"
3334
#include"utils/guc.h"
@@ -48,6 +49,7 @@
4849

4950
#include"libdtm.h"
5051
#include"multimaster.h"
52+
#include"bgwpool.h"
5153

5254
typedefstruct
5355
{
@@ -59,16 +61,9 @@ typedef struct
5961
intnNodes;
6062
pg_atomic_uint32nReceivers;
6163
boolinitialized;
62-
64+
BgwPool*pool;
6365
}DtmState;
6466

65-
typedefstruct
66-
{
67-
char*data;
68-
intsize;
69-
intused;
70-
}ByteBuffer;
71-
7267
typedefstruct
7368
{
7469
TransactionIdxid;
@@ -107,12 +102,8 @@ static bool TransactionIdIsInDoubt(TransactionId xid);
107102
staticvoidDtmShmemStartup(void);
108103
staticvoidDtmBackgroundWorker(Datumarg);
109104

110-
staticvoidByteBufferAlloc(ByteBuffer*buf);
111-
staticvoidByteBufferAppend(ByteBuffer*buf,void*data,intlen);
112-
staticvoidByteBufferAppendInt32(ByteBuffer*buf,intdata);
113-
staticvoidByteBufferFree(ByteBuffer*buf);
114-
115105
staticvoidMMMarkTransAsLocal(TransactionIdxid);
106+
staticvoidMMExecutor(intid,void*work,size_tsize);
116107

117108
staticshmem_startup_hook_typeprev_shmem_startup_hook;
118109

@@ -139,6 +130,7 @@ static TransactionManager DtmTM = {
139130
};
140131

141132
boolMMDoReplication;
133+
char*MMDatabaseName;
142134

143135
staticchar*MMConnStrs;
144136
staticintMMNodeId;
@@ -147,8 +139,8 @@ static int MMQueueSize;
147139
staticintMMWorkers;
148140

149141
staticchar*DtmHost;
150-
staticintDtmPort;
151-
staticintDtmBufferSize;
142+
staticintDtmPort;
143+
staticintDtmBufferSize;
152144

153145
staticExecutorFinish_hook_typePreviousExecutorFinishHook=NULL;
154146
staticvoidMMExecutorFinish(QueryDesc*queryDesc);
@@ -1092,33 +1084,6 @@ void DtmBackgroundWorker(Datum arg)
10921084
ShubLoop(&shub);
10931085
}
10941086

1095-
staticvoidByteBufferAlloc(ByteBuffer*buf)
1096-
{
1097-
buf->size=1024;
1098-
buf->data=palloc(buf->size);
1099-
buf->used=0;
1100-
}
1101-
1102-
staticvoidByteBufferAppend(ByteBuffer*buf,void*data,intlen)
1103-
{
1104-
if (buf->used+len>buf->size) {
1105-
buf->size=buf->used+len>buf->size*2 ?buf->used+len :buf->size*2;
1106-
buf->data= (char*)repalloc(buf->data,buf->size);
1107-
}
1108-
memcpy(&buf->data[buf->used],data,len);
1109-
buf->used+=len;
1110-
}
1111-
1112-
staticvoidByteBufferAppendInt32(ByteBuffer*buf,intdata)
1113-
{
1114-
ByteBufferAppend(buf,&data,sizeofdata);
1115-
}
1116-
1117-
staticvoidByteBufferFree(ByteBuffer*buf)
1118-
{
1119-
pfree(buf->data);
1120-
}
1121-
11221087
staticvoidDtmSerializeLock(PROCLOCK*proclock,void*arg)
11231088
{
11241089
ByteBuffer*buf= (ByteBuffer*)arg;
@@ -1219,3 +1184,45 @@ MMExecutorFinish(QueryDesc *queryDesc)
12191184
}
12201185
}
12211186

1187+
staticvoidMMExecutor(intid,void*work,size_tsize)
1188+
{
1189+
TransactionIdxid=*(TransactionId*)work;
1190+
char*stmts= (char*)work+4;
1191+
intrc=SPI_ERROR_TRANSACTION;
1192+
MMJoinTransaction(xid);
1193+
1194+
SetCurrentStatementStartTimestamp();
1195+
StartTransactionCommand();
1196+
SPI_connect();
1197+
PushActiveSnapshot(GetTransactionSnapshot());
1198+
1199+
PG_TRY();
1200+
{
1201+
rc=SPI_execute(stmts, false,0);
1202+
SPI_finish();
1203+
PopActiveSnapshot();
1204+
if (rc!=SPI_OK_INSERT&&rc!=SPI_OK_UPDATE&&rc!=SPI_OK_DELETE) {
1205+
FlushErrorState();
1206+
ereport(LOG, (errmsg("Executor %d: failed to apply transaction %u",
1207+
id,xid)));
1208+
AbortCurrentTransaction();
1209+
}else {
1210+
CommitTransactionCommand();
1211+
}
1212+
}
1213+
PG_CATCH();
1214+
{
1215+
if (rc==SPI_ERROR_TRANSACTION) {
1216+
SPI_finish();
1217+
PopActiveSnapshot();
1218+
}
1219+
AbortCurrentTransaction();
1220+
}
1221+
PG_END_TRY();
1222+
}
1223+
1224+
externvoidMMExecute(void*work,intsize)
1225+
{
1226+
BgwPoolExecute(dtm->pool,work,size);
1227+
}
1228+

‎contrib/multimaster/multimaster.h‎

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
#ifndef__MULTIMASTER_H__
22
#define__MULTIMASTER_H__
33

4+
#include"bytebuf.h"
5+
46
#defineXTM_TRACE(fmt, ...)
57
#defineXTM_INFO(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
68
//#define XTM_INFO(fmt, ...)
@@ -10,7 +12,8 @@ extern void MMBeginTransaction(void);
1012
externvoidMMJoinTransaction(TransactionIdxid);
1113
externboolMMIsLocalTransaction(TransactionIdxid);
1214
externvoidMMReceiverStarted(void);
15+
externvoidMMExecute(void*work,intsize);
1316

14-
externcharconst*MMDatabaseName;
17+
externchar*MMDatabaseName;
1518

1619
#endif

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp