Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit151af9e

Browse files
committed
Fix bug in deadlock detection algorithm
1 parent480cda0 commit151af9e

File tree

5 files changed

+44
-24
lines changed

5 files changed

+44
-24
lines changed

‎contrib/multimaster/bgwpool.c‎

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,20 +12,23 @@
1212

1313
typedefstruct
1414
{
15-
BgwPool*pool;
15+
BgwPoolConstructorconstructor;
1616
intid;
17-
}BgwExecutorCtx;
17+
}BgwPoolExecutorCtx;
1818

1919
staticvoidBgwPoolMainLoop(Datumarg)
2020
{
21-
BgwExecutorCtx*ctx= (BgwExecutorCtx*)arg;
21+
BgwPoolExecutorCtx*ctx= (BgwPoolExecutorCtx*)arg;
2222
intid=ctx->id;
23-
BgwPool*pool=ctx->pool;
23+
BgwPool*pool=ctx->constructor();
2424
intsize;
2525
void*work;
2626

27+
BackgroundWorkerUnblockSignals();
2728
BackgroundWorkerInitializeConnection(pool->dbname,NULL);
2829

30+
elog(WARNING,"Start background worker %d",id);
31+
2932
while(true) {
3033
PGSemaphoreLock(&pool->available);
3134
SpinLockAcquire(&pool->lock);
@@ -52,11 +55,9 @@ static void BgwPoolMainLoop(Datum arg)
5255
}
5356
}
5457

55-
BgwPool*BgwPoolCreate(BgwExecutorexecutor,charconst*dbname,size_tqueueSize,intnWorkers)
58+
voidBgwPoolInit(BgwPool*pool,BgwPoolExecutorexecutor,charconst*dbname,size_tqueueSize)
5659
{
57-
inti;
58-
BackgroundWorkerworker;
59-
BgwPool*pool= (BgwPool*)ShmemAlloc(queueSize+sizeof(BgwPool));
60+
pool->queue= (char*)ShmemAlloc(queueSize);
6061
pool->executor=executor;
6162
PGSemaphoreCreate(&pool->available);
6263
PGSemaphoreCreate(&pool->overflow);
@@ -68,22 +69,27 @@ BgwPool* BgwPoolCreate(BgwExecutor executor, char const* dbname, size_t queueSiz
6869
pool->tail=0;
6970
pool->size=queueSize;
7071
strcpy(pool->dbname,dbname);
72+
}
73+
74+
voidBgwPoolStart(intnWorkers,BgwPoolConstructorconstructor)
75+
{
76+
inti;
77+
BackgroundWorkerworker;
7178

7279
MemSet(&worker,0,sizeof(BackgroundWorker));
7380
worker.bgw_flags=BGWORKER_SHMEM_ACCESS |BGWORKER_BACKEND_DATABASE_CONNECTION;
7481
worker.bgw_start_time=BgWorkerStart_ConsistentState;
7582
worker.bgw_main=BgwPoolMainLoop;
7683
worker.bgw_restart_time=10;/* Wait 10 seconds for restart before crash */
77-
84+
7885
for (i=0;i<nWorkers;i++) {
79-
BgwExecutorCtx*ctx= (BgwExecutorCtx*)malloc(sizeof(BgwExecutorCtx));
86+
BgwPoolExecutorCtx*ctx= (BgwPoolExecutorCtx*)malloc(sizeof(BgwPoolExecutorCtx));
8087
snprintf(worker.bgw_name,BGW_MAXLEN,"bgw_pool_worker_%d",i+1);
8188
ctx->id=i;
82-
ctx->pool=pool;
89+
ctx->constructor=constructor;
8390
worker.bgw_main_arg= (Datum)ctx;
8491
RegisterBackgroundWorker(&worker);
8592
}
86-
returnpool;
8793
}
8894

8995
voidBgwPoolExecute(BgwPool*pool,void*work,size_tsize)

‎contrib/multimaster/bgwpool.h‎

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,33 @@
11
#ifndef__BGWPOOL_H__
22
#define__BGWPOOL_H__
33

4-
typedefvoid(*BgwExecutor)(intid,void*work,size_tsize);
4+
#include"storage/s_lock.h"
5+
#include"storage/spin.h"
6+
#include"storage/pg_sema.h"
7+
8+
typedefvoid(*BgwPoolExecutor)(intid,void*work,size_tsize);
59

610
#defineMAX_DBNAME_LEN 30
711

812
typedefstruct
913
{
10-
BgwExecutorexecutor;
14+
BgwPoolExecutorexecutor;
1115
volatileslock_tlock;
1216
PGSemaphoreDataavailable;
1317
PGSemaphoreDataoverflow;
1418
size_thead;
1519
size_ttail;
1620
size_tsize;
17-
boolproducerBlocked;
18-
chardbname[MAX_DBNAME_LEN];
19-
charqueue[1];
21+
boolproducerBlocked;
22+
chardbname[MAX_DBNAME_LEN];
23+
char*queue;
2024
}BgwPool;
2125

22-
externBgwPool*BgwPoolCreate(BgwExecutorexecutor,charconst*dbname,size_tqueueSize,intnWorkers);
26+
typedefBgwPool*(*BgwPoolConstructor)(void);
27+
28+
externvoidBgwPoolStart(intnWorkers,BgwPoolConstructorconstructor);
29+
30+
externvoidBgwPoolInit(BgwPool*pool,BgwPoolExecutorexecutor,charconst*dbname,size_tqueueSize);
2331

2432
externvoidBgwPoolExecute(BgwPool*pool,void*work,size_tsize);
2533

‎contrib/multimaster/dtmd/src/ddd.c‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ void addSubgraph(Graph* graph, nodeid_t node_id, xid_t* xids, int n_xids)
116116
if (--e->dst->nIncomingEdges==0&&l2_list_is_empty(&e->dst->outgoingEdges)) {
117117
freeVertex(graph,e->dst);
118118
}
119-
if (e->src->nIncomingEdges==0&&l2_list_is_empty(&e->src->outgoingEdges)) {
119+
if (e->dst!=e->src&&e->src->nIncomingEdges==0&&l2_list_is_empty(&e->src->outgoingEdges)) {
120120
freeVertex(graph,e->src);
121121
}
122122
freeEdge(graph,e);

‎contrib/multimaster/multimaster.c‎

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ typedef struct
6161
intnNodes;
6262
pg_atomic_uint32nReceivers;
6363
boolinitialized;
64-
BgwPool*pool;
64+
BgwPoolpool;
6565
}DtmState;
6666

6767
typedefstruct
@@ -104,6 +104,7 @@ static void DtmBackgroundWorker(Datum arg);
104104

105105
staticvoidMMMarkTransAsLocal(TransactionIdxid);
106106
staticvoidMMExecutor(intid,void*work,size_tsize);
107+
staticBgwPool*MMPoolConstructor(void);
107108

108109
staticshmem_startup_hook_typeprev_shmem_startup_hook;
109110

@@ -716,7 +717,7 @@ static void DtmInitialize()
716717
dtm->nNodes=MMNodes;
717718
pg_atomic_write_u32(&dtm->nReceivers,0);
718719
dtm->initialized= false;
719-
dtm->pool=BgwPoolCreate(MMExecutor,MMDatabaseName,MMQueueSize,MMWorkers);
720+
BgwPoolInit(&dtm->pool,MMExecutor,MMDatabaseName,MMQueueSize);
720721
RegisterXactCallback(DtmXactCallback,NULL);
721722
}
722723
LWLockRelease(AddinShmemInitLock);
@@ -953,6 +954,7 @@ _PG_init(void)
953954
if (MMNodes<2) {
954955
elog(ERROR,"Multimaster should have at least two nodes");
955956
}
957+
BgwPoolStart(MMWorkers,MMPoolConstructor);
956958

957959
if (DtmBufferSize!=0)
958960
{
@@ -1203,7 +1205,6 @@ static void MMExecutor(int id, void* work, size_t size)
12031205
SPI_finish();
12041206
PopActiveSnapshot();
12051207
if (rc!=SPI_OK_INSERT&&rc!=SPI_OK_UPDATE&&rc!=SPI_OK_DELETE) {
1206-
FlushErrorState();
12071208
ereport(LOG, (errmsg("Executor %d: failed to apply transaction %u",
12081209
id,xid)));
12091210
AbortCurrentTransaction();
@@ -1213,6 +1214,7 @@ static void MMExecutor(int id, void* work, size_t size)
12131214
}
12141215
PG_CATCH();
12151216
{
1217+
FlushErrorState();
12161218
if (rc==SPI_ERROR_TRANSACTION) {
12171219
SPI_finish();
12181220
PopActiveSnapshot();
@@ -1224,6 +1226,10 @@ static void MMExecutor(int id, void* work, size_t size)
12241226

12251227
externvoidMMExecute(void*work,intsize)
12261228
{
1227-
BgwPoolExecute(dtm->pool,work,size);
1229+
BgwPoolExecute(&dtm->pool,work,size);
12281230
}
12291231

1232+
staticBgwPool*MMPoolConstructor(void)
1233+
{
1234+
return&dtm->pool;
1235+
}

‎contrib/pg_dtm/dtmd/src/ddd.c‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ void addSubgraph(Graph* graph, nodeid_t node_id, xid_t* xids, int n_xids)
116116
if (--e->dst->nIncomingEdges==0&&l2_list_is_empty(&e->dst->outgoingEdges)) {
117117
freeVertex(graph,e->dst);
118118
}
119-
if (e->src->nIncomingEdges==0&&l2_list_is_empty(&e->src->outgoingEdges)) {
119+
if (e->dst!=e->src&&e->src->nIncomingEdges==0&&l2_list_is_empty(&e->src->outgoingEdges)) {
120120
freeVertex(graph,e->src);
121121
}
122122
freeEdge(graph,e);

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp