@@ -299,23 +299,16 @@ pglogical_receiver_main(Datum main_arg)
299
299
}
300
300
301
301
query = createPQExpBuffer ();
302
- if ((mode == REPLMODE_OPEN_EXISTED && timeline != Mtm -> nodes [nodeId - 1 ].timeline )
303
- || mode == REPLMODE_CREATE_NEW )
304
- {/* recreate slot */
305
- timestamp_t start = MtmGetSystemTime ();
306
- appendPQExpBuffer (query ,"DROP_REPLICATION_SLOT \"%s\"" ,slotName );
307
- res = PQexec (conn ,query -> data );
308
- elog (LOG ,"Drop replication slot %s: %ld milliseconds" ,slotName , (long )USEC_TO_MSEC (MtmGetSystemTime ()- start ));
309
- PQclear (res );
310
- resetPQExpBuffer (query );
311
- timeline = Mtm -> nodes [nodeId - 1 ].timeline ;
312
- }
313
- /* My original assumption was that we can perfrom recovery only from existed slot,
314
- * but unfortunately looks like slots can "disapear" together with WAL-sender.
315
- * So let's try to recreate slot always. */
316
- /* if (mode != REPLMODE_REPLICATION) */
317
- {
318
- timestamp_t start = MtmGetSystemTime ();
302
+
303
+ /* Start logical replication at specified position */
304
+ originStartPos = replorigin_get_progress (originId , false);
305
+ if (originStartPos == INVALID_LSN ) {
306
+ /*
307
+ * We are just creating new replication slot.
308
+ * It is assumed that state of local and remote nodes is the same at this moment.
309
+ * They are either empty, either new node is synchronized using base_backup.
310
+ * So we assume that LSNs are the same for local and remote node
311
+ */
319
312
appendPQExpBuffer (query ,"CREATE_REPLICATION_SLOT \"%s\" LOGICAL \"%s\"" ,slotName ,MULTIMASTER_NAME );
320
313
res = PQexec (conn ,query -> data );
321
314
if (PQresultStatus (res )!= PGRES_TUPLES_OK )
@@ -329,30 +322,14 @@ pglogical_receiver_main(Datum main_arg)
329
322
gotoOnError ;
330
323
}
331
324
}
332
- elog (LOG ,"Recreate replication slot %s: %ld milliseconds" ,slotName , (long )USEC_TO_MSEC (MtmGetSystemTime ()- start ));
333
325
PQclear (res );
334
326
resetPQExpBuffer (query );
335
- }
336
-
337
- /* Start logical replication at specified position */
338
- if (originStartPos == INVALID_LSN ) {
339
- originStartPos = replorigin_get_progress (originId , false);
340
- if (originStartPos == INVALID_LSN ) {
341
- /*
342
- * We are just creating new replication slot.
343
- * It is assumed that state of local and remote nodes is the same at this moment.
344
- * Them are either empty, either new node is synchronized using base_backup.
345
- * So we assume that LSNs are the same for local and remote node
346
- */
347
- originStartPos = INVALID_LSN ;
348
- MTM_LOG1 ("Start logical receiver at position %llx from node %d" ,originStartPos ,nodeId );
349
- }else {
350
- if (Mtm -> nodes [nodeId - 1 ].restartLSN < originStartPos ) {
351
- MTM_LOG1 ("Advance restartLSN for node %d: from %llx to %llx (pglogical_receiver_main)" ,nodeId ,Mtm -> nodes [nodeId - 1 ].restartLSN ,originStartPos );
352
- Mtm -> nodes [nodeId - 1 ].restartLSN = originStartPos ;
353
- }
354
- MTM_LOG1 ("Restart logical receiver at position %llx with origin=%d from node %d" ,originStartPos ,originId ,nodeId );
327
+ }else {
328
+ if (Mtm -> nodes [nodeId - 1 ].restartLSN < originStartPos ) {
329
+ MTM_LOG1 ("Advance restartLSN for node %d: from %llx to %llx (pglogical_receiver_main)" ,nodeId ,Mtm -> nodes [nodeId - 1 ].restartLSN ,originStartPos );
330
+ Mtm -> nodes [nodeId - 1 ].restartLSN = originStartPos ;
355
331
}
332
+ MTM_LOG1 ("Restart logical receiver at position %llx with origin=%d from node %d" ,originStartPos ,originId ,nodeId );
356
333
}
357
334
358
335
MTM_LOG1 ("Start replication on slot %s from node %d at position %llx, mode %s, recovered lsn %llx" ,
@@ -371,10 +348,21 @@ pglogical_receiver_main(Datum main_arg)
371
348
res = PQexec (conn ,query -> data );
372
349
if (PQresultStatus (res )!= PGRES_COPY_BOTH )
373
350
{
374
- PQclear (res );
375
- ereport (WARNING , (MTM_ERRMSG ("%s: Could not start logical replication" ,
376
- worker_proc )));
377
- gotoOnError ;
351
+ int i ,n_deleted_slots = 0 ;
352
+
353
+ elog (WARNING ,"Can't find slot on node%d. Shutting down receiver." ,nodeId );
354
+ Mtm -> nodes [nodeId - 1 ].slotDeleted = true;
355
+ for (i = 0 ;i < Mtm -> nAllNodes ;i ++ )
356
+ {
357
+ if (Mtm -> nodes [i ].slotDeleted )
358
+ n_deleted_slots ++ ;
359
+ }
360
+ if (n_deleted_slots == Mtm -> nAllNodes - 1 )
361
+ {
362
+ elog (WARNING ,"All neighbour nopes have no replication slot for us. Exiting." );
363
+ kill (PostmasterPid ,SIGTERM );
364
+ }
365
+ proc_exit (1 );
378
366
}
379
367
PQclear (res );
380
368
resetPQExpBuffer (query );