@@ -244,7 +244,7 @@ pglogical_receiver_main(Datum main_arg)
244
244
resetPQExpBuffer (query );
245
245
246
246
/* Start logical replication at specified position */
247
- appendPQExpBuffer (query ,"START_REPLICATION SLOT \"%s\" LOGICAL 0/0 " ,
247
+ appendPQExpBuffer (query ,"START_REPLICATION SLOT \"%s\" LOGICAL 0/0(\"startup_params_format\" '1', \"max_proto_version\" '1', \"min_proto_version\" '1') " ,
248
248
args -> receiver_slot );
249
249
res = PQexec (conn ,query -> data );
250
250
if (PQresultStatus (res )!= PGRES_COPY_BOTH )
@@ -376,51 +376,49 @@ pglogical_receiver_main(Datum main_arg)
376
376
walEnd = fe_recvint64 (& copybuf [hdr_len ]);
377
377
hdr_len += 8 ;/* WALEnd */
378
378
hdr_len += 8 ;/* sendTime */
379
- if (rc < hdr_len + 1 )
380
- {
381
- ereport (LOG , (errmsg ("%s: Streaming header too small" ,
382
- worker_proc )));
383
- proc_exit (1 );
384
- }
385
379
386
- stmt = copybuf + hdr_len ;
380
+ /*ereport(LOG, (errmsg("%s: receive message %c length %d", worker_proc, copybuf[hdr_len], rc - hdr_len)));*/
381
+
382
+ Assert (rc >=hdr_len );
383
+
384
+ if (rc > hdr_len )
385
+ {
386
+ stmt = copybuf + hdr_len ;
387
387
388
388
#ifdef USE_PGLOGICAL_OUTPUT
389
- ByteBufferAppend (& buf ,stmt ,rc - hdr_len );
390
- if (stmt [0 ]== 'C' )
391
- {
392
- MMExecute (buf .data ,buf .used );
393
- ByteBufferReset (& buf );
394
- }
389
+ ByteBufferAppend (& buf ,stmt ,rc - hdr_len );
390
+ if (stmt [0 ]== 'C' )/* commit */
391
+ {
392
+ MMExecute (buf .data ,buf .used );
393
+ ByteBufferReset (& buf );
394
+ }
395
395
#else
396
- if (strncmp (stmt ,"BEGIN " ,6 )== 0 ) {
397
- TransactionId xid ;
398
- int rc = sscanf (stmt + 6 ,"%u" ,& xid );
399
- Assert (rc == 1 );
400
- ByteBufferAppendInt32 (& buf ,xid );
401
- Assert (!insideTrans );
402
- insideTrans = true;
403
- }else if (strncmp (stmt ,"COMMIT;" ,7 )== 0 ) {
404
- Assert (insideTrans );
405
- Assert (buf .used > 4 );
406
- buf .data [buf .used - 1 ]= '\0' ;/* replace last ';' with '\0' to make string zero terminated */
407
- MMExecute (buf .data ,buf .used );
408
- ByteBufferReset (& buf );
409
- insideTrans = false;
410
- }else {
411
- Assert (insideTrans );
412
- ByteBufferAppend (& buf ,stmt ,rc - hdr_len /*strlen(stmt)*/ );
413
- }
396
+ if (strncmp (stmt ,"BEGIN " ,6 )== 0 ) {
397
+ TransactionId xid ;
398
+ int rc = sscanf (stmt + 6 ,"%u" ,& xid );
399
+ Assert (rc == 1 );
400
+ ByteBufferAppendInt32 (& buf ,xid );
401
+ Assert (!insideTrans );
402
+ insideTrans = true;
403
+ }else if (strncmp (stmt ,"COMMIT;" ,7 )== 0 ) {
404
+ Assert (insideTrans );
405
+ Assert (buf .used > 4 );
406
+ buf .data [buf .used - 1 ]= '\0' ;/* replace last ';' with '\0' to make string zero terminated */
407
+ MMExecute (buf .data ,buf .used );
408
+ ByteBufferReset (& buf );
409
+ insideTrans = false;
410
+ }else {
411
+ Assert (insideTrans );
412
+ ByteBufferAppend (& buf ,stmt ,rc - hdr_len /*strlen(stmt)*/ );
413
+ }
414
414
#endif
415
+ }
415
416
/* Update written position */
416
417
output_written_lsn = Max (walEnd ,output_written_lsn );
417
418
output_fsync_lsn = output_written_lsn ;
418
419
output_applied_lsn = output_written_lsn ;
419
420
}
420
421
421
- /* Finish process */
422
- pgstat_report_activity (STATE_IDLE ,NULL );
423
-
424
422
/* No data, move to next loop */
425
423
if (rc == 0 )
426
424
{