7
7
#include "storage/spin.h"
8
8
#include "storage/pg_sema.h"
9
9
#include "storage/shmem.h"
10
+ #include "datatype/timestamp.h"
10
11
11
12
#include "bgwpool.h"
12
13
13
14
bool MtmIsLogicalReceiver ;
15
+ int MtmMaxWorkers ;
14
16
15
- typedef struct
17
+ static void BgwPoolMainLoop ( BgwPool * pool )
16
18
{
17
- BgwPoolConstructor constructor ;
18
- int id ;
19
- }BgwPoolExecutorCtx ;
20
-
21
- static void BgwPoolMainLoop (Datum arg )
22
- {
23
- BgwPoolExecutorCtx * ctx = (BgwPoolExecutorCtx * )arg ;
24
- int id = ctx -> id ;
25
- BgwPool * pool = ctx -> constructor ();
26
19
int size ;
27
20
void * work ;
28
21
@@ -58,7 +51,7 @@ static void BgwPoolMainLoop(Datum arg)
58
51
pool -> lastPeakTime = 0 ;
59
52
}
60
53
SpinLockRelease (& pool -> lock );
61
- pool -> executor (id , work ,size );
54
+ pool -> executor (work ,size );
62
55
free (work );
63
56
SpinLockAcquire (& pool -> lock );
64
57
pool -> active -= 1 ;
@@ -84,6 +77,7 @@ void BgwPoolInit(BgwPool* pool, BgwPoolExecutor executor, char const* dbname, c
84
77
pool -> pending = 0 ;
85
78
pool -> nWorkers = nWorkers ;
86
79
pool -> lastPeakTime = 0 ;
80
+ pool -> lastDynamicWorkerStartTime = 0 ;
87
81
strncpy (pool -> dbname ,dbname ,MAX_DBNAME_LEN );
88
82
strncpy (pool -> dbuser ,dbuser ,MAX_DBUSER_LEN );
89
83
}
@@ -93,6 +87,17 @@ timestamp_t BgwGetLastPeekTime(BgwPool* pool)
93
87
return pool -> lastPeakTime ;
94
88
}
95
89
90
+ static void BgwPoolStaticWorkerMainLoop (Datum arg )
91
+ {
92
+ BgwPoolConstructor constructor = (BgwPoolConstructor )DatumGetPointer (arg );
93
+ BgwPoolMainLoop (constructor ());
94
+ }
95
+
96
+ static void BgwPoolDynamicWorkerMainLoop (Datum arg )
97
+ {
98
+ BgwPoolMainLoop ((BgwPool * )DatumGetPointer (arg ));
99
+ }
100
+
96
101
void BgwPoolStart (int nWorkers ,BgwPoolConstructor constructor )
97
102
{
98
103
int i ;
@@ -101,15 +106,12 @@ void BgwPoolStart(int nWorkers, BgwPoolConstructor constructor)
101
106
MemSet (& worker ,0 ,sizeof (BackgroundWorker ));
102
107
worker .bgw_flags = BGWORKER_SHMEM_ACCESS |BGWORKER_BACKEND_DATABASE_CONNECTION ;
103
108
worker .bgw_start_time = BgWorkerStart_ConsistentState ;
104
- worker .bgw_main = BgwPoolMainLoop ;
109
+ worker .bgw_main = BgwPoolStaticWorkerMainLoop ;
105
110
worker .bgw_restart_time = MULTIMASTER_BGW_RESTART_TIMEOUT ;
106
111
107
112
for (i = 0 ;i < nWorkers ;i ++ ) {
108
- BgwPoolExecutorCtx * ctx = (BgwPoolExecutorCtx * )malloc (sizeof (BgwPoolExecutorCtx ));
109
113
snprintf (worker .bgw_name ,BGW_MAXLEN ,"bgw_pool_worker_%d" ,i + 1 );
110
- ctx -> id = i ;
111
- ctx -> constructor = constructor ;
112
- worker .bgw_main_arg = (Datum )ctx ;
114
+ worker .bgw_main_arg = PointerGetDatum (constructor );
113
115
RegisterBackgroundWorker (& worker );
114
116
}
115
117
}
@@ -124,14 +126,36 @@ size_t BgwPoolGetQueueSize(BgwPool* pool)
124
126
}
125
127
126
128
129
+ static void BgwStartExtraWorker (BgwPool * pool )
130
+ {
131
+ if (pool -> nWorkers < MtmMaxWorkers ) {
132
+ timestamp_t now = MtmGetSystemTime ();
133
+ if (pool -> lastDynamicWorkerStartTime + MULTIMASTER_BGW_RESTART_TIMEOUT * USECS_PER_SEC < now ) {
134
+ BackgroundWorker worker ;
135
+ BackgroundWorkerHandle * handle ;
136
+ MemSet (& worker ,0 ,sizeof (BackgroundWorker ));
137
+ worker .bgw_flags = BGWORKER_SHMEM_ACCESS |BGWORKER_BACKEND_DATABASE_CONNECTION ;
138
+ worker .bgw_start_time = BgWorkerStart_ConsistentState ;
139
+ worker .bgw_main = BgwPoolDynamicWorkerMainLoop ;
140
+ worker .bgw_restart_time = MULTIMASTER_BGW_RESTART_TIMEOUT ;
141
+ snprintf (worker .bgw_name ,BGW_MAXLEN ,"bgw_pool_dynworker_%d" , (int )++ pool -> nWorkers );
142
+ worker .bgw_main_arg = PointerGetDatum (pool );
143
+ pool -> lastDynamicWorkerStartTime = now ;
144
+ if (!RegisterDynamicBackgroundWorker (& worker ,& handle )) {
145
+ elog (WARNING ,"Failed to start dynamic background worker" );
146
+ }
147
+ }
148
+ }
149
+ }
150
+
127
151
void BgwPoolExecute (BgwPool * pool ,void * work ,size_t size )
128
152
{
129
153
if (size + 4 > pool -> size ) {
130
154
/*
131
155
* Size of work is larger than size of shared buffer:
132
156
* run it immediately
133
157
*/
134
- pool -> executor (0 , work ,size );
158
+ pool -> executor (work ,size );
135
159
return ;
136
160
}
137
161
@@ -149,6 +173,9 @@ void BgwPoolExecute(BgwPool* pool, void* work, size_t size)
149
173
SpinLockAcquire (& pool -> lock );
150
174
}else {
151
175
pool -> pending += 1 ;
176
+ if (pool -> active == pool -> nWorkers ) {
177
+ BgwStartExtraWorker (pool );
178
+ }
152
179
if (pool -> lastPeakTime == 0 && pool -> active == pool -> nWorkers && pool -> pending != 0 ) {
153
180
pool -> lastPeakTime = MtmGetSystemTime ();
154
181
}