@@ -302,13 +302,15 @@ WalSndShutdown(void)
302
302
static void
303
303
IdentifySystem (void )
304
304
{
305
- StringInfoData buf ;
306
305
char sysid [32 ];
307
- char tli [11 ];
308
306
char xpos [MAXFNAMELEN ];
309
307
XLogRecPtr logptr ;
310
308
char * dbname = NULL ;
311
- Size len ;
309
+ DestReceiver * dest ;
310
+ TupOutputState * tstate ;
311
+ TupleDesc tupdesc ;
312
+ Datum values [4 ];
313
+ bool nulls [4 ];
312
314
313
315
/*
314
316
* Reply with a result set with one row, four columns. First col is system
@@ -328,8 +330,6 @@ IdentifySystem(void)
328
330
else
329
331
logptr = GetFlushRecPtr ();
330
332
331
- snprintf (tli ,sizeof (tli ),"%u" ,ThisTimeLineID );
332
-
333
333
snprintf (xpos ,sizeof (xpos ),"%X/%X" , (uint32 ) (logptr >>32 ), (uint32 )logptr );
334
334
335
335
if (MyDatabaseId != InvalidOid )
@@ -346,79 +346,42 @@ IdentifySystem(void)
346
346
MemoryContextSwitchTo (cur );
347
347
}
348
348
349
- /* Send a RowDescription message */
350
- pq_beginmessage (& buf ,'T' );
351
- pq_sendint (& buf ,4 ,2 );/* 4 fields */
352
-
353
- /* first field */
354
- pq_sendstring (& buf ,"systemid" );/* col name */
355
- pq_sendint (& buf ,0 ,4 );/* table oid */
356
- pq_sendint (& buf ,0 ,2 );/* attnum */
357
- pq_sendint (& buf ,TEXTOID ,4 );/* type oid */
358
- pq_sendint (& buf ,-1 ,2 );/* typlen */
359
- pq_sendint (& buf ,0 ,4 );/* typmod */
360
- pq_sendint (& buf ,0 ,2 );/* format code */
361
-
362
- /* second field */
363
- pq_sendstring (& buf ,"timeline" );/* col name */
364
- pq_sendint (& buf ,0 ,4 );/* table oid */
365
- pq_sendint (& buf ,0 ,2 );/* attnum */
366
- pq_sendint (& buf ,INT4OID ,4 );/* type oid */
367
- pq_sendint (& buf ,4 ,2 );/* typlen */
368
- pq_sendint (& buf ,0 ,4 );/* typmod */
369
- pq_sendint (& buf ,0 ,2 );/* format code */
370
-
371
- /* third field */
372
- pq_sendstring (& buf ,"xlogpos" );/* col name */
373
- pq_sendint (& buf ,0 ,4 );/* table oid */
374
- pq_sendint (& buf ,0 ,2 );/* attnum */
375
- pq_sendint (& buf ,TEXTOID ,4 );/* type oid */
376
- pq_sendint (& buf ,-1 ,2 );/* typlen */
377
- pq_sendint (& buf ,0 ,4 );/* typmod */
378
- pq_sendint (& buf ,0 ,2 );/* format code */
349
+ dest = CreateDestReceiver (DestRemoteSimple );
350
+ MemSet (nulls , false,sizeof (nulls ));
379
351
380
- /* fourth field */
381
- pq_sendstring (& buf ,"dbname" );/* col name */
382
- pq_sendint (& buf ,0 ,4 );/* table oid */
383
- pq_sendint (& buf ,0 ,2 );/* attnum */
384
- pq_sendint (& buf ,TEXTOID ,4 );/* type oid */
385
- pq_sendint (& buf ,-1 ,2 );/* typlen */
386
- pq_sendint (& buf ,0 ,4 );/* typmod */
387
- pq_sendint (& buf ,0 ,2 );/* format code */
388
- pq_endmessage (& buf );
352
+ /* need a tuple descriptor representing four columns */
353
+ tupdesc = CreateTemplateTupleDesc (4 , false);
354
+ TupleDescInitBuiltinEntry (tupdesc , (AttrNumber )1 ,"systemid" ,
355
+ TEXTOID ,-1 ,0 );
356
+ TupleDescInitBuiltinEntry (tupdesc , (AttrNumber )2 ,"timeline" ,
357
+ INT4OID ,-1 ,0 );
358
+ TupleDescInitBuiltinEntry (tupdesc , (AttrNumber )3 ,"xlogpos" ,
359
+ TEXTOID ,-1 ,0 );
360
+ TupleDescInitBuiltinEntry (tupdesc , (AttrNumber )4 ,"dbname" ,
361
+ TEXTOID ,-1 ,0 );
389
362
390
- /* Send a DataRow message */
391
- pq_beginmessage (& buf ,'D' );
392
- pq_sendint (& buf ,4 ,2 );/* # of columns */
363
+ /* prepare for projection of tuples */
364
+ tstate = begin_tup_output_tupdesc (dest ,tupdesc );
393
365
394
366
/* column 1: system identifier */
395
- len = strlen (sysid );
396
- pq_sendint (& buf ,len ,4 );
397
- pq_sendbytes (& buf , (char * )& sysid ,len );
367
+ values [0 ]= CStringGetTextDatum (sysid );
398
368
399
369
/* column 2: timeline */
400
- len = strlen (tli );
401
- pq_sendint (& buf ,len ,4 );
402
- pq_sendbytes (& buf , (char * )tli ,len );
370
+ values [1 ]= Int32GetDatum (ThisTimeLineID );
403
371
404
372
/* column 3: xlog position */
405
- len = strlen (xpos );
406
- pq_sendint (& buf ,len ,4 );
407
- pq_sendbytes (& buf , (char * )xpos ,len );
373
+ values [2 ]= CStringGetTextDatum (xpos );
408
374
409
375
/* column 4: database name, or NULL if none */
410
376
if (dbname )
411
- {
412
- len = strlen (dbname );
413
- pq_sendint (& buf ,len ,4 );
414
- pq_sendbytes (& buf , (char * )dbname ,len );
415
- }
377
+ values [3 ]= CStringGetTextDatum (dbname );
416
378
else
417
- {
418
- pq_sendint (& buf ,-1 ,4 );
419
- }
379
+ nulls [3 ]= true;
420
380
421
- pq_endmessage (& buf );
381
+ /* send it to dest */
382
+ do_tup_output (tstate ,values ,nulls );
383
+
384
+ end_tup_output (tstate );
422
385
}
423
386
424
387
@@ -695,54 +658,41 @@ StartReplication(StartReplicationCmd *cmd)
695
658
*/
696
659
if (sendTimeLineIsHistoric )
697
660
{
698
- char tli_str [11 ];
699
661
char startpos_str [8 + 1 + 8 + 1 ];
700
- Size len ;
662
+ DestReceiver * dest ;
663
+ TupOutputState * tstate ;
664
+ TupleDesc tupdesc ;
665
+ Datum values [2 ];
666
+ bool nulls [2 ];
701
667
702
- snprintf (tli_str ,sizeof (tli_str ),"%u" ,sendTimeLineNextTLI );
703
668
snprintf (startpos_str ,sizeof (startpos_str ),"%X/%X" ,
704
669
(uint32 ) (sendTimeLineValidUpto >>32 ),
705
670
(uint32 )sendTimeLineValidUpto );
706
671
707
- pq_beginmessage (& buf ,'T' );/* RowDescription */
708
- pq_sendint (& buf ,2 ,2 );/* 2 fields */
709
-
710
- /* Field header */
711
- pq_sendstring (& buf ,"next_tli" );
712
- pq_sendint (& buf ,0 ,4 );/* table oid */
713
- pq_sendint (& buf ,0 ,2 );/* attnum */
672
+ dest = CreateDestReceiver (DestRemoteSimple );
673
+ MemSet (nulls , false,sizeof (nulls ));
714
674
715
675
/*
676
+ * Need a tuple descriptor representing two columns.
716
677
* int8 may seem like a surprising data type for this, but in theory
717
678
* int4 would not be wide enough for this, as TimeLineID is unsigned.
718
679
*/
719
- pq_sendint (& buf ,INT8OID ,4 );/* type oid */
720
- pq_sendint (& buf ,-1 ,2 );
721
- pq_sendint (& buf ,0 ,4 );
722
- pq_sendint (& buf ,0 ,2 );
723
-
724
- pq_sendstring (& buf ,"next_tli_startpos" );
725
- pq_sendint (& buf ,0 ,4 );/* table oid */
726
- pq_sendint (& buf ,0 ,2 );/* attnum */
727
- pq_sendint (& buf ,TEXTOID ,4 );/* type oid */
728
- pq_sendint (& buf ,-1 ,2 );
729
- pq_sendint (& buf ,0 ,4 );
730
- pq_sendint (& buf ,0 ,2 );
731
- pq_endmessage (& buf );
680
+ tupdesc = CreateTemplateTupleDesc (2 , false);
681
+ TupleDescInitBuiltinEntry (tupdesc , (AttrNumber )1 ,"next_tli" ,
682
+ INT8OID ,-1 ,0 );
683
+ TupleDescInitBuiltinEntry (tupdesc , (AttrNumber )2 ,"next_tli_startpos" ,
684
+ TEXTOID ,-1 ,0 );
732
685
733
- /* Data row */
734
- pq_beginmessage (& buf ,'D' );
735
- pq_sendint (& buf ,2 ,2 );/* number of columns */
686
+ /* prepare for projection of tuple */
687
+ tstate = begin_tup_output_tupdesc (dest ,tupdesc );
736
688
737
- len = strlen (tli_str );
738
- pq_sendint (& buf ,len ,4 );/* length */
739
- pq_sendbytes (& buf ,tli_str ,len );
689
+ values [0 ]= Int64GetDatum ((int64 )sendTimeLineNextTLI );
690
+ values [1 ]= CStringGetTextDatum (startpos_str );
740
691
741
- len = strlen (startpos_str );
742
- pq_sendint (& buf ,len ,4 );/* length */
743
- pq_sendbytes (& buf ,startpos_str ,len );
692
+ /* send it to dest */
693
+ do_tup_output (tstate ,values ,nulls );
744
694
745
- pq_endmessage ( & buf );
695
+ end_tup_output ( tstate );
746
696
}
747
697
748
698
/* Send CommandComplete message */
@@ -790,8 +740,12 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
790
740
{
791
741
const char * snapshot_name = NULL ;
792
742
char xpos [MAXFNAMELEN ];
793
- StringInfoData buf ;
794
- Size len ;
743
+ char * slot_name ;
744
+ DestReceiver * dest ;
745
+ TupOutputState * tstate ;
746
+ TupleDesc tupdesc ;
747
+ Datum values [4 ];
748
+ bool nulls [4 ];
795
749
796
750
Assert (!MyReplicationSlot );
797
751
@@ -868,82 +822,51 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
868
822
(uint32 ) (MyReplicationSlot -> data .confirmed_flush >>32 ),
869
823
(uint32 )MyReplicationSlot -> data .confirmed_flush );
870
824
871
- pq_beginmessage (& buf ,'T' );
872
- pq_sendint (& buf ,4 ,2 );/* 4 fields */
873
-
874
- /* first field: slot name */
875
- pq_sendstring (& buf ,"slot_name" );/* col name */
876
- pq_sendint (& buf ,0 ,4 );/* table oid */
877
- pq_sendint (& buf ,0 ,2 );/* attnum */
878
- pq_sendint (& buf ,TEXTOID ,4 );/* type oid */
879
- pq_sendint (& buf ,-1 ,2 );/* typlen */
880
- pq_sendint (& buf ,0 ,4 );/* typmod */
881
- pq_sendint (& buf ,0 ,2 );/* format code */
882
-
883
- /* second field: LSN at which we became consistent */
884
- pq_sendstring (& buf ,"consistent_point" );/* col name */
885
- pq_sendint (& buf ,0 ,4 );/* table oid */
886
- pq_sendint (& buf ,0 ,2 );/* attnum */
887
- pq_sendint (& buf ,TEXTOID ,4 );/* type oid */
888
- pq_sendint (& buf ,-1 ,2 );/* typlen */
889
- pq_sendint (& buf ,0 ,4 );/* typmod */
890
- pq_sendint (& buf ,0 ,2 );/* format code */
891
-
892
- /* third field: exported snapshot's name */
893
- pq_sendstring (& buf ,"snapshot_name" );/* col name */
894
- pq_sendint (& buf ,0 ,4 );/* table oid */
895
- pq_sendint (& buf ,0 ,2 );/* attnum */
896
- pq_sendint (& buf ,TEXTOID ,4 );/* type oid */
897
- pq_sendint (& buf ,-1 ,2 );/* typlen */
898
- pq_sendint (& buf ,0 ,4 );/* typmod */
899
- pq_sendint (& buf ,0 ,2 );/* format code */
900
-
901
- /* fourth field: output plugin */
902
- pq_sendstring (& buf ,"output_plugin" );/* col name */
903
- pq_sendint (& buf ,0 ,4 );/* table oid */
904
- pq_sendint (& buf ,0 ,2 );/* attnum */
905
- pq_sendint (& buf ,TEXTOID ,4 );/* type oid */
906
- pq_sendint (& buf ,-1 ,2 );/* typlen */
907
- pq_sendint (& buf ,0 ,4 );/* typmod */
908
- pq_sendint (& buf ,0 ,2 );/* format code */
825
+ dest = CreateDestReceiver (DestRemoteSimple );
826
+ MemSet (nulls , false,sizeof (nulls ));
909
827
910
- pq_endmessage (& buf );
911
-
912
- /* Send a DataRow message */
913
- pq_beginmessage (& buf ,'D' );
914
- pq_sendint (& buf ,4 ,2 );/* # of columns */
828
+ /*
829
+ * Need a tuple descriptor representing four columns:
830
+ * - first field: the slot name
831
+ * - second field: LSN at which we became consistent
832
+ * - third field: exported snapshot's name
833
+ * - fourth field: output plugin
834
+ */
835
+ tupdesc = CreateTemplateTupleDesc (4 , false);
836
+ TupleDescInitBuiltinEntry (tupdesc , (AttrNumber )1 ,"slot_name" ,
837
+ TEXTOID ,-1 ,0 );
838
+ TupleDescInitBuiltinEntry (tupdesc , (AttrNumber )2 ,"consistent_point" ,
839
+ TEXTOID ,-1 ,0 );
840
+ TupleDescInitBuiltinEntry (tupdesc , (AttrNumber )3 ,"snapshot_name" ,
841
+ TEXTOID ,-1 ,0 );
842
+ TupleDescInitBuiltinEntry (tupdesc , (AttrNumber )4 ,"output_plugin" ,
843
+ TEXTOID ,-1 ,0 );
844
+
845
+ /* prepare for projection of tuples */
846
+ tstate = begin_tup_output_tupdesc (dest ,tupdesc );
915
847
916
848
/* slot_name */
917
- len = strlen (NameStr (MyReplicationSlot -> data .name ));
918
- pq_sendint (& buf ,len ,4 );/* col1 len */
919
- pq_sendbytes (& buf ,NameStr (MyReplicationSlot -> data .name ),len );
849
+ slot_name = NameStr (MyReplicationSlot -> data .name );
850
+ values [0 ]= CStringGetTextDatum (slot_name );
920
851
921
852
/* consistent wal location */
922
- len = strlen (xpos );
923
- pq_sendint (& buf ,len ,4 );
924
- pq_sendbytes (& buf ,xpos ,len );
853
+ values [1 ]= CStringGetTextDatum (xpos );
925
854
926
855
/* snapshot name, or NULL if none */
927
856
if (snapshot_name != NULL )
928
- {
929
- len = strlen (snapshot_name );
930
- pq_sendint (& buf ,len ,4 );
931
- pq_sendbytes (& buf ,snapshot_name ,len );
932
- }
857
+ values [2 ]= CStringGetTextDatum (snapshot_name );
933
858
else
934
- pq_sendint ( & buf , -1 , 4 ) ;
859
+ nulls [ 2 ] = true ;
935
860
936
861
/* plugin, or NULL if none */
937
862
if (cmd -> plugin != NULL )
938
- {
939
- len = strlen (cmd -> plugin );
940
- pq_sendint (& buf ,len ,4 );
941
- pq_sendbytes (& buf ,cmd -> plugin ,len );
942
- }
863
+ values [3 ]= CStringGetTextDatum (cmd -> plugin );
943
864
else
944
- pq_sendint ( & buf , -1 , 4 ) ;
865
+ nulls [ 3 ] = true ;
945
866
946
- pq_endmessage (& buf );
867
+ /* send it to dest */
868
+ do_tup_output (tstate ,values ,nulls );
869
+ end_tup_output (tstate );
947
870
948
871
ReplicationSlotRelease ();
949
872
}