@@ -301,23 +301,16 @@ pglogical_receiver_main(Datum main_arg)
301
301
}
302
302
303
303
query = createPQExpBuffer ();
304
- if ((mode == REPLMODE_OPEN_EXISTED && timeline != Mtm -> nodes [nodeId - 1 ].timeline )
305
- || mode == REPLMODE_CREATE_NEW )
306
- {/* recreate slot */
307
- timestamp_t start = MtmGetSystemTime ();
308
- appendPQExpBuffer (query ,"DROP_REPLICATION_SLOT \"%s\"" ,slotName );
309
- res = PQexec (conn ,query -> data );
310
- elog (LOG ,"Drop replication slot %s: %ld milliseconds" ,slotName , (long )USEC_TO_MSEC (MtmGetSystemTime ()- start ));
311
- PQclear (res );
312
- resetPQExpBuffer (query );
313
- timeline = Mtm -> nodes [nodeId - 1 ].timeline ;
314
- }
315
- /* My original assumption was that we can perfrom recovery only from existed slot,
316
- * but unfortunately looks like slots can "disapear" together with WAL-sender.
317
- * So let's try to recreate slot always. */
318
- /* if (mode != REPLMODE_REPLICATION) */
319
- {
320
- timestamp_t start = MtmGetSystemTime ();
304
+
305
+ /* Start logical replication at specified position */
306
+ originStartPos = replorigin_get_progress (originId , false);
307
+ if (originStartPos == INVALID_LSN ) {
308
+ /*
309
+ * We are just creating new replication slot.
310
+ * It is assumed that state of local and remote nodes is the same at this moment.
311
+ * Either both nodes are empty, or the new node was synchronized using base_backup.
312
+ * So we assume that LSNs are the same for local and remote node
313
+ */
321
314
appendPQExpBuffer (query ,"CREATE_REPLICATION_SLOT \"%s\" LOGICAL \"%s\"" ,slotName ,MULTIMASTER_NAME );
322
315
res = PQexec (conn ,query -> data );
323
316
if (PQresultStatus (res )!= PGRES_TUPLES_OK )
@@ -331,30 +324,14 @@ pglogical_receiver_main(Datum main_arg)
331
324
gotoOnError ;
332
325
}
333
326
}
334
- elog (LOG ,"Recreate replication slot %s: %ld milliseconds" ,slotName , (long )USEC_TO_MSEC (MtmGetSystemTime ()- start ));
335
327
PQclear (res );
336
328
resetPQExpBuffer (query );
337
- }
338
-
339
- /* Start logical replication at specified position */
340
- if (originStartPos == INVALID_LSN ) {
341
- originStartPos = replorigin_get_progress (originId , false);
342
- if (originStartPos == INVALID_LSN ) {
343
- /*
344
- * We are just creating new replication slot.
345
- * It is assumed that state of local and remote nodes is the same at this moment.
346
- * Them are either empty, either new node is synchronized using base_backup.
347
- * So we assume that LSNs are the same for local and remote node
348
- */
349
- originStartPos = INVALID_LSN ;
350
- MTM_LOG1 ("Start logical receiver at position %llx from node %d" ,originStartPos ,nodeId );
351
- }else {
352
- if (Mtm -> nodes [nodeId - 1 ].restartLSN < originStartPos ) {
353
- MTM_LOG1 ("Advance restartLSN for node %d: from %llx to %llx (pglogical_receiver_main)" ,nodeId ,Mtm -> nodes [nodeId - 1 ].restartLSN ,originStartPos );
354
- Mtm -> nodes [nodeId - 1 ].restartLSN = originStartPos ;
355
- }
356
- MTM_LOG1 ("Restart logical receiver at position %llx with origin=%d from node %d" ,originStartPos ,originId ,nodeId );
329
+ }else {
330
+ if (Mtm -> nodes [nodeId - 1 ].restartLSN < originStartPos ) {
331
+ MTM_LOG1 ("Advance restartLSN for node %d: from %llx to %llx (pglogical_receiver_main)" ,nodeId ,Mtm -> nodes [nodeId - 1 ].restartLSN ,originStartPos );
332
+ Mtm -> nodes [nodeId - 1 ].restartLSN = originStartPos ;
357
333
}
334
+ MTM_LOG1 ("Restart logical receiver at position %llx with origin=%d from node %d" ,originStartPos ,originId ,nodeId );
358
335
}
359
336
360
337
MTM_LOG1 ("Start replication on slot %s from node %d at position %llx, mode %s, recovered lsn %llx" ,
@@ -373,10 +350,21 @@ pglogical_receiver_main(Datum main_arg)
373
350
res = PQexec (conn ,query -> data );
374
351
if (PQresultStatus (res )!= PGRES_COPY_BOTH )
375
352
{
376
- PQclear (res );
377
- ereport (WARNING , (MTM_ERRMSG ("%s: Could not start logical replication" ,
378
- worker_proc )));
379
- gotoOnError ;
353
+ int i ,n_deleted_slots = 0 ;
354
+
355
+ elog (WARNING ,"Can't find slot on node%d. Shutting down receiver." ,nodeId );
356
+ Mtm -> nodes [nodeId - 1 ].slotDeleted = true;
357
+ for (i = 0 ;i < Mtm -> nAllNodes ;i ++ )
358
+ {
359
+ if (Mtm -> nodes [i ].slotDeleted )
360
+ n_deleted_slots ++ ;
361
+ }
362
+ if (n_deleted_slots == Mtm -> nAllNodes - 1 )
363
+ {
364
+ elog (WARNING ,"All neighbour nopes have no replication slot for us. Exiting." );
365
+ kill (PostmasterPid ,SIGTERM );
366
+ }
367
+ proc_exit (1 );
380
368
}
381
369
PQclear (res );
382
370
resetPQExpBuffer (query );