|
10 | 10 | *
|
11 | 11 | *
|
12 | 12 | * IDENTIFICATION
|
13 |
| - * $PostgreSQL: pgsql/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c,v 1.8 2010/03/21 00:17:58 petere Exp $ |
| 13 | + * $PostgreSQL: pgsql/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c,v 1.9 2010/04/19 14:10:45 mha Exp $ |
14 | 14 | *
|
15 | 15 | *-------------------------------------------------------------------------
|
16 | 16 | */
|
@@ -54,6 +54,7 @@ static void libpqrcv_disconnect(void);
|
54 | 54 |
|
55 | 55 | /* Prototypes for private functions */
|
56 | 56 | staticboollibpq_select(inttimeout_ms);
|
| 57 | +staticPGresult*libpqrcv_PQexec(constchar*query); |
57 | 58 |
|
58 | 59 | /*
|
59 | 60 | * Module load callback
|
@@ -97,7 +98,7 @@ libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
|
97 | 98 | * Get the system identifier and timeline ID as a DataRow message from the
|
98 | 99 | * primary server.
|
99 | 100 | */
|
100 |
| -res=PQexec(streamConn,"IDENTIFY_SYSTEM"); |
| 101 | +res=libpqrcv_PQexec("IDENTIFY_SYSTEM"); |
101 | 102 | if (PQresultStatus(res)!=PGRES_TUPLES_OK)
|
102 | 103 | {
|
103 | 104 | PQclear(res);
|
@@ -149,11 +150,14 @@ libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
|
149 | 150 | /* Start streaming from the point requested by startup process */
|
150 | 151 | snprintf(cmd,sizeof(cmd),"START_REPLICATION %X/%X",
|
151 | 152 | startpoint.xlogid,startpoint.xrecoff);
|
152 |
| -res=PQexec(streamConn,cmd); |
| 153 | +res=libpqrcv_PQexec(cmd); |
153 | 154 | if (PQresultStatus(res)!=PGRES_COPY_OUT)
|
| 155 | +{ |
| 156 | +PQclear(res); |
154 | 157 | ereport(ERROR,
|
155 | 158 | (errmsg("could not start WAL streaming: %s",
|
156 | 159 | PQerrorMessage(streamConn))));
|
| 160 | +} |
157 | 161 | PQclear(res);
|
158 | 162 |
|
159 | 163 | justconnected= true;
|
@@ -224,6 +228,84 @@ libpq_select(int timeout_ms)
|
224 | 228 | return true;
|
225 | 229 | }
|
226 | 230 |
|
| 231 | +/* |
| 232 | + * Send a query and wait for the results by using the asynchronous libpq |
| 233 | + * functions and the backend version of select(). |
| 234 | + * |
| 235 | + * We must not use the regular blocking libpq functions like PQexec() |
| 236 | + * since they are uninterruptible by signals on some platforms, such as |
| 237 | + * Windows. |
| 238 | + * |
| 239 | + * We must also not use vanilla select() here since it cannot handle the |
| 240 | + * signal emulation layer on Windows. |
| 241 | + * |
| 242 | + * The function is modeled on PQexec() in libpq, but only implements |
| 243 | + * those parts that are in use in the walreceiver. |
| 244 | + * |
| 245 | + * Queries are always executed on the connection in streamConn. |
| 246 | + */ |
| 247 | +staticPGresult* |
| 248 | +libpqrcv_PQexec(constchar*query) |
| 249 | +{ |
| 250 | +PGresult*result=NULL; |
| 251 | +PGresult*lastResult=NULL; |
| 252 | + |
| 253 | +/* |
| 254 | + * PQexec() silently discards any prior query results on the |
| 255 | + * connection. This is not required for walreceiver since it's |
| 256 | + * expected that walsender won't generate any such junk results. |
| 257 | + */ |
| 258 | + |
| 259 | +/* |
| 260 | + * Submit a query. Since we don't use non-blocking mode, this also |
| 261 | + * can block. But its risk is relatively small, so we ignore that |
| 262 | + * for now. |
| 263 | + */ |
| 264 | +if (!PQsendQuery(streamConn,query)) |
| 265 | +returnNULL; |
| 266 | + |
| 267 | +for (;;) |
| 268 | +{ |
| 269 | +/* |
| 270 | + * Receive data until PQgetResult is ready to get the result |
| 271 | + * without blocking. |
| 272 | + */ |
| 273 | +while (PQisBusy(streamConn)) |
| 274 | +{ |
| 275 | +/* |
| 276 | + * We don't need to break down the sleep into smaller increments, |
| 277 | + * and check for interrupts after each nap, since we can just |
| 278 | + * elog(FATAL) within SIGTERM signal handler if the signal |
| 279 | + * arrives in the middle of establishment of replication connection. |
| 280 | + */ |
| 281 | +if (!libpq_select(-1)) |
| 282 | +continue;/* interrupted */ |
| 283 | +if (PQconsumeInput(streamConn)==0) |
| 284 | +returnNULL;/* trouble */ |
| 285 | +} |
| 286 | + |
| 287 | +/* |
| 288 | + * Emulate the PQexec()'s behavior of returning the last result |
| 289 | + * when there are many. |
| 290 | + * Since walsender will never generate multiple results, we skip |
| 291 | + * the concatenation of error messages. |
| 292 | + */ |
| 293 | +result=PQgetResult(streamConn); |
| 294 | +if (result==NULL) |
| 295 | +break;/* query is complete */ |
| 296 | + |
| 297 | +PQclear(lastResult); |
| 298 | +lastResult=result; |
| 299 | + |
| 300 | +if (PQresultStatus(lastResult)==PGRES_COPY_IN|| |
| 301 | +PQresultStatus(lastResult)==PGRES_COPY_OUT|| |
| 302 | +PQstatus(streamConn)==CONNECTION_BAD) |
| 303 | +break; |
| 304 | +} |
| 305 | + |
| 306 | +returnlastResult; |
| 307 | +} |
| 308 | + |
227 | 309 | /*
|
228 | 310 | * Disconnect connection to primary, if any.
|
229 | 311 | */
|
|