1
+ #include "postgres.h"
2
+ #include "fmgr.h"
3
+ #include "miscadmin.h"
4
+ #include "postmaster/postmaster.h"
5
+ #include "postmaster/bgworker.h"
6
+ #include "storage/s_lock.h"
7
+ #include "storage/spin.h"
8
+ #include "storage/pg_sema.h"
9
+ #include "storage/shmem.h"
10
+
1
11
#include "bgwpool.h"
2
12
3
13
typedef struct
@@ -6,7 +16,7 @@ typedef struct
6
16
int id ;
7
17
}BgwExecutorCtx ;
8
18
9
- static void BgwMainLoop (Datum arg )
19
+ static void BgwPoolMainLoop (Datum arg )
10
20
{
11
21
BgwExecutorCtx * ctx = (BgwExecutorCtx * )arg ;
12
22
int id = ctx -> id ;
@@ -19,34 +29,34 @@ static void BgwMainLoop(Datum arg)
19
29
while (true) {
20
30
PGSemaphoreLock (& pool -> available );
21
31
SpinLockAcquire (& pool -> lock );
22
- Assert ( pool -> head != pool -> tail ) ;
23
- size = ( int * ) & pool -> queue [ pool -> head ] ;
24
- void * work = palloc (len );
32
+ size = * ( int * ) & pool -> queue [ pool -> head ] ;
33
+ Assert ( size < pool -> size ) ;
34
+ work = palloc (size );
25
35
if (pool -> head + size + 4 > pool -> size ) {
26
36
memcpy (work ,pool -> queue ,size );
27
- pool -> head = (size & 3 ) & ~ 3 ;
37
+ pool -> head = INTALIGN (size ) ;
28
38
}else {
29
39
memcpy (work ,& pool -> queue [pool -> head + 4 ],size );
30
- pool -> head += 4 + (( size & 3 ) & ~ 3 );
40
+ pool -> head += 4 + INTALIGN ( size );
31
41
}
32
42
if (pool -> size == pool -> head ) {
33
43
pool -> head = 0 ;
34
44
}
35
45
if (pool -> producerBlocked ) {
36
- PGSemaphoreUnlock (& pool -> overflow );
37
46
pool -> producerBlocked = false;
47
+ PGSemaphoreUnlock (& pool -> overflow );
38
48
}
39
49
SpinLockRelease (& pool -> lock );
40
50
pool -> executor (id ,work ,size );
41
51
pfree (work );
42
52
}
43
53
}
44
54
45
- BGWPool * BgwPoolCreate (BgwExecutor executor ,char const * dbname ,size_t queueSize ,size_t nWorkers );
55
+ BgwPool * BgwPoolCreate (BgwExecutor executor ,char const * dbname ,size_t queueSize ,int nWorkers )
46
56
{
47
57
int i ;
48
58
BackgroundWorker worker ;
49
- BGWPool * pool = (BGWPool * )ShmemAlloc (queueSize + sizeof (BGWPool ));
59
+ BgwPool * pool = (BgwPool * )ShmemAlloc (queueSize + sizeof (BgwPool ));
50
60
pool -> executor = executor ;
51
61
PGSemaphoreCreate (& pool -> available );
52
62
PGSemaphoreCreate (& pool -> overflow );
@@ -76,13 +86,13 @@ BGWPool* BgwPoolCreate(BgwExecutor executor, char const* dbname, size_t queueSiz
76
86
return pool ;
77
87
}
78
88
79
- void BgwPoolExecute (BgwPool * pool ,void * work ,size_t size );
89
+ void BgwPoolExecute (BgwPool * pool ,void * work ,size_t size )
80
90
{
81
91
Assert (size + 4 <=pool -> size );
82
92
83
93
SpinLockAcquire (& pool -> lock );
84
94
while (true) {
85
- if ((pool -> head < pool -> tail && pool -> size - pool -> tail < size + 4 && pool -> head < size )
95
+ if ((pool -> head <= pool -> tail && pool -> size - pool -> tail < size + 4 && pool -> head < size )
86
96
|| (pool -> head > pool -> tail && pool -> head - pool -> tail < size + 4 ))
87
97
{
88
98
pool -> producerBlocked = true;
@@ -93,13 +103,18 @@ void BgwPoolExecute(BgwPool* pool, void* work, size_t size);
93
103
* (int * )& pool -> queue [pool -> tail ]= size ;
94
104
if (pool -> size - pool -> tail >=size + 4 ) {
95
105
memcpy (& pool -> queue [pool -> tail + 4 ],work ,size );
96
- pool -> tail += 4 + (size + 3 ) & ~ 3 ;
106
+ pool -> tail += 4 + INTALIGN (size ) ;
97
107
}else {
98
108
memcpy (pool -> queue ,work ,size );
99
- pool -> tail = (size + 3 )& ~3 ;
109
+ pool -> tail = INTALIGN (size );
110
+ }
111
+ if (pool -> tail == pool -> size ) {
112
+ pool -> tail = 0 ;
100
113
}
101
114
PGSemaphoreUnlock (& pool -> available );
115
+ break ;
102
116
}
103
117
}
118
+ SpinLockRelease (& pool -> lock );
104
119
}
105
120