Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitbbd8550

Browse files
committed
Refactor other replication commands to use DestRemoteSimple.
Commita84069d added a new type ofDestReceiver to avoid duplicating the existing code for the SHOWcommand, but it turns out we can leverage that new DestReceivertype in a few more places, saving some code.Michael Paquier, reviewed by Andres Freund and by me.Discussion:http://postgr.es/m/CAB7nPqSdFOQC0evc0r1nJeQyGBqjBrR41MC4rcMqUUpoJaZbtQ%40mail.gmail.comDiscussion:http://postgr.es/m/CAB7nPqT2K4XFT1JgqufFBjsOc-NUKXg5qBDucHPMbk6Xi1kYaA@mail.gmail.com
1 parentc3e3844 commitbbd8550

File tree

3 files changed

+114
-162
lines changed

3 files changed

+114
-162
lines changed

‎src/backend/access/common/printsimple.c

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include"catalog/pg_type.h"
2323
#include"fmgr.h"
2424
#include"libpq/pqformat.h"
25+
#include"utils/builtins.h"
2526

2627
/*
2728
* At startup time, send a RowDescription message.
@@ -99,6 +100,26 @@ printsimple(TupleTableSlot *slot, DestReceiver *self)
99100
}
100101
break;
101102

103+
caseINT4OID:
104+
{
105+
int32num=DatumGetInt32(value);
106+
charstr[12];/* sign, 10 digits and '\0' */
107+
108+
pg_ltoa(num,str);
109+
pq_sendcountedtext(&buf,str,strlen(str), false);
110+
}
111+
break;
112+
113+
caseINT8OID:
114+
{
115+
int64num=DatumGetInt64(value);
116+
charstr[23];/* sign, 21 digits and '\0' */
117+
118+
pg_lltoa(num,str);
119+
pq_sendcountedtext(&buf,str,strlen(str), false);
120+
}
121+
break;
122+
102123
default:
103124
elog(ERROR,"unsupported type OID: %u",attr->atttypid);
104125
}

‎src/backend/access/common/tupdesc.c

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -629,6 +629,14 @@ TupleDescInitBuiltinEntry(TupleDesc desc,
629629
att->attstorage='p';
630630
att->attcollation=InvalidOid;
631631
break;
632+
633+
caseINT8OID:
634+
att->attlen=8;
635+
att->attbyval=FLOAT8PASSBYVAL;
636+
att->attalign='d';
637+
att->attstorage='p';
638+
att->attcollation=InvalidOid;
639+
break;
632640
}
633641
}
634642

‎src/backend/replication/walsender.c

Lines changed: 85 additions & 162 deletions
Original file line numberDiff line numberDiff line change
@@ -302,13 +302,15 @@ WalSndShutdown(void)
302302
staticvoid
303303
IdentifySystem(void)
304304
{
305-
StringInfoDatabuf;
306305
charsysid[32];
307-
chartli[11];
308306
charxpos[MAXFNAMELEN];
309307
XLogRecPtrlogptr;
310308
char*dbname=NULL;
311-
Sizelen;
309+
DestReceiver*dest;
310+
TupOutputState*tstate;
311+
TupleDesctupdesc;
312+
Datumvalues[4];
313+
boolnulls[4];
312314

313315
/*
314316
* Reply with a result set with one row, four columns. First col is system
@@ -328,8 +330,6 @@ IdentifySystem(void)
328330
else
329331
logptr=GetFlushRecPtr();
330332

331-
snprintf(tli,sizeof(tli),"%u",ThisTimeLineID);
332-
333333
snprintf(xpos,sizeof(xpos),"%X/%X", (uint32) (logptr >>32), (uint32)logptr);
334334

335335
if (MyDatabaseId!=InvalidOid)
@@ -346,79 +346,42 @@ IdentifySystem(void)
346346
MemoryContextSwitchTo(cur);
347347
}
348348

349-
/* Send a RowDescription message */
350-
pq_beginmessage(&buf,'T');
351-
pq_sendint(&buf,4,2);/* 4 fields */
352-
353-
/* first field */
354-
pq_sendstring(&buf,"systemid");/* col name */
355-
pq_sendint(&buf,0,4);/* table oid */
356-
pq_sendint(&buf,0,2);/* attnum */
357-
pq_sendint(&buf,TEXTOID,4);/* type oid */
358-
pq_sendint(&buf,-1,2);/* typlen */
359-
pq_sendint(&buf,0,4);/* typmod */
360-
pq_sendint(&buf,0,2);/* format code */
361-
362-
/* second field */
363-
pq_sendstring(&buf,"timeline");/* col name */
364-
pq_sendint(&buf,0,4);/* table oid */
365-
pq_sendint(&buf,0,2);/* attnum */
366-
pq_sendint(&buf,INT4OID,4);/* type oid */
367-
pq_sendint(&buf,4,2);/* typlen */
368-
pq_sendint(&buf,0,4);/* typmod */
369-
pq_sendint(&buf,0,2);/* format code */
370-
371-
/* third field */
372-
pq_sendstring(&buf,"xlogpos");/* col name */
373-
pq_sendint(&buf,0,4);/* table oid */
374-
pq_sendint(&buf,0,2);/* attnum */
375-
pq_sendint(&buf,TEXTOID,4);/* type oid */
376-
pq_sendint(&buf,-1,2);/* typlen */
377-
pq_sendint(&buf,0,4);/* typmod */
378-
pq_sendint(&buf,0,2);/* format code */
349+
dest=CreateDestReceiver(DestRemoteSimple);
350+
MemSet(nulls, false,sizeof(nulls));
379351

380-
/* fourth field */
381-
pq_sendstring(&buf,"dbname");/* col name */
382-
pq_sendint(&buf,0,4);/* table oid */
383-
pq_sendint(&buf,0,2);/* attnum */
384-
pq_sendint(&buf,TEXTOID,4);/* type oid */
385-
pq_sendint(&buf,-1,2);/* typlen */
386-
pq_sendint(&buf,0,4);/* typmod */
387-
pq_sendint(&buf,0,2);/* format code */
388-
pq_endmessage(&buf);
352+
/* need a tuple descriptor representing four columns */
353+
tupdesc=CreateTemplateTupleDesc(4, false);
354+
TupleDescInitBuiltinEntry(tupdesc, (AttrNumber)1,"systemid",
355+
TEXTOID,-1,0);
356+
TupleDescInitBuiltinEntry(tupdesc, (AttrNumber)2,"timeline",
357+
INT4OID,-1,0);
358+
TupleDescInitBuiltinEntry(tupdesc, (AttrNumber)3,"xlogpos",
359+
TEXTOID,-1,0);
360+
TupleDescInitBuiltinEntry(tupdesc, (AttrNumber)4,"dbname",
361+
TEXTOID,-1,0);
389362

390-
/* Send a DataRow message */
391-
pq_beginmessage(&buf,'D');
392-
pq_sendint(&buf,4,2);/* # of columns */
363+
/* prepare for projection of tuples */
364+
tstate=begin_tup_output_tupdesc(dest,tupdesc);
393365

394366
/* column 1: system identifier */
395-
len=strlen(sysid);
396-
pq_sendint(&buf,len,4);
397-
pq_sendbytes(&buf, (char*)&sysid,len);
367+
values[0]=CStringGetTextDatum(sysid);
398368

399369
/* column 2: timeline */
400-
len=strlen(tli);
401-
pq_sendint(&buf,len,4);
402-
pq_sendbytes(&buf, (char*)tli,len);
370+
values[1]=Int32GetDatum(ThisTimeLineID);
403371

404372
/* column 3: xlog position */
405-
len=strlen(xpos);
406-
pq_sendint(&buf,len,4);
407-
pq_sendbytes(&buf, (char*)xpos,len);
373+
values[2]=CStringGetTextDatum(xpos);
408374

409375
/* column 4: database name, or NULL if none */
410376
if (dbname)
411-
{
412-
len=strlen(dbname);
413-
pq_sendint(&buf,len,4);
414-
pq_sendbytes(&buf, (char*)dbname,len);
415-
}
377+
values[3]=CStringGetTextDatum(dbname);
416378
else
417-
{
418-
pq_sendint(&buf,-1,4);
419-
}
379+
nulls[3]= true;
420380

421-
pq_endmessage(&buf);
381+
/* send it to dest */
382+
do_tup_output(tstate,values,nulls);
383+
384+
end_tup_output(tstate);
422385
}
423386

424387

@@ -695,54 +658,41 @@ StartReplication(StartReplicationCmd *cmd)
695658
*/
696659
if (sendTimeLineIsHistoric)
697660
{
698-
chartli_str[11];
699661
charstartpos_str[8+1+8+1];
700-
Sizelen;
662+
DestReceiver*dest;
663+
TupOutputState*tstate;
664+
TupleDesctupdesc;
665+
Datumvalues[2];
666+
boolnulls[2];
701667

702-
snprintf(tli_str,sizeof(tli_str),"%u",sendTimeLineNextTLI);
703668
snprintf(startpos_str,sizeof(startpos_str),"%X/%X",
704669
(uint32) (sendTimeLineValidUpto >>32),
705670
(uint32)sendTimeLineValidUpto);
706671

707-
pq_beginmessage(&buf,'T');/* RowDescription */
708-
pq_sendint(&buf,2,2);/* 2 fields */
709-
710-
/* Field header */
711-
pq_sendstring(&buf,"next_tli");
712-
pq_sendint(&buf,0,4);/* table oid */
713-
pq_sendint(&buf,0,2);/* attnum */
672+
dest=CreateDestReceiver(DestRemoteSimple);
673+
MemSet(nulls, false,sizeof(nulls));
714674

715675
/*
676+
* Need a tuple descriptor representing two columns.
716677
* int8 may seem like a surprising data type for this, but in theory
717678
* int4 would not be wide enough for this, as TimeLineID is unsigned.
718679
*/
719-
pq_sendint(&buf,INT8OID,4);/* type oid */
720-
pq_sendint(&buf,-1,2);
721-
pq_sendint(&buf,0,4);
722-
pq_sendint(&buf,0,2);
723-
724-
pq_sendstring(&buf,"next_tli_startpos");
725-
pq_sendint(&buf,0,4);/* table oid */
726-
pq_sendint(&buf,0,2);/* attnum */
727-
pq_sendint(&buf,TEXTOID,4);/* type oid */
728-
pq_sendint(&buf,-1,2);
729-
pq_sendint(&buf,0,4);
730-
pq_sendint(&buf,0,2);
731-
pq_endmessage(&buf);
680+
tupdesc=CreateTemplateTupleDesc(2, false);
681+
TupleDescInitBuiltinEntry(tupdesc, (AttrNumber)1,"next_tli",
682+
INT8OID,-1,0);
683+
TupleDescInitBuiltinEntry(tupdesc, (AttrNumber)2,"next_tli_startpos",
684+
TEXTOID,-1,0);
732685

733-
/* Data row */
734-
pq_beginmessage(&buf,'D');
735-
pq_sendint(&buf,2,2);/* number of columns */
686+
/* prepare for projection of tuple */
687+
tstate=begin_tup_output_tupdesc(dest,tupdesc);
736688

737-
len=strlen(tli_str);
738-
pq_sendint(&buf,len,4);/* length */
739-
pq_sendbytes(&buf,tli_str,len);
689+
values[0]=Int64GetDatum((int64)sendTimeLineNextTLI);
690+
values[1]=CStringGetTextDatum(startpos_str);
740691

741-
len=strlen(startpos_str);
742-
pq_sendint(&buf,len,4);/* length */
743-
pq_sendbytes(&buf,startpos_str,len);
692+
/* send it to dest */
693+
do_tup_output(tstate,values,nulls);
744694

745-
pq_endmessage(&buf);
695+
end_tup_output(tstate);
746696
}
747697

748698
/* Send CommandComplete message */
@@ -790,8 +740,12 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
790740
{
791741
constchar*snapshot_name=NULL;
792742
charxpos[MAXFNAMELEN];
793-
StringInfoDatabuf;
794-
Sizelen;
743+
char*slot_name;
744+
DestReceiver*dest;
745+
TupOutputState*tstate;
746+
TupleDesctupdesc;
747+
Datumvalues[4];
748+
boolnulls[4];
795749

796750
Assert(!MyReplicationSlot);
797751

@@ -868,82 +822,51 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
868822
(uint32) (MyReplicationSlot->data.confirmed_flush >>32),
869823
(uint32)MyReplicationSlot->data.confirmed_flush);
870824

871-
pq_beginmessage(&buf,'T');
872-
pq_sendint(&buf,4,2);/* 4 fields */
873-
874-
/* first field: slot name */
875-
pq_sendstring(&buf,"slot_name");/* col name */
876-
pq_sendint(&buf,0,4);/* table oid */
877-
pq_sendint(&buf,0,2);/* attnum */
878-
pq_sendint(&buf,TEXTOID,4);/* type oid */
879-
pq_sendint(&buf,-1,2);/* typlen */
880-
pq_sendint(&buf,0,4);/* typmod */
881-
pq_sendint(&buf,0,2);/* format code */
882-
883-
/* second field: LSN at which we became consistent */
884-
pq_sendstring(&buf,"consistent_point");/* col name */
885-
pq_sendint(&buf,0,4);/* table oid */
886-
pq_sendint(&buf,0,2);/* attnum */
887-
pq_sendint(&buf,TEXTOID,4);/* type oid */
888-
pq_sendint(&buf,-1,2);/* typlen */
889-
pq_sendint(&buf,0,4);/* typmod */
890-
pq_sendint(&buf,0,2);/* format code */
891-
892-
/* third field: exported snapshot's name */
893-
pq_sendstring(&buf,"snapshot_name");/* col name */
894-
pq_sendint(&buf,0,4);/* table oid */
895-
pq_sendint(&buf,0,2);/* attnum */
896-
pq_sendint(&buf,TEXTOID,4);/* type oid */
897-
pq_sendint(&buf,-1,2);/* typlen */
898-
pq_sendint(&buf,0,4);/* typmod */
899-
pq_sendint(&buf,0,2);/* format code */
900-
901-
/* fourth field: output plugin */
902-
pq_sendstring(&buf,"output_plugin");/* col name */
903-
pq_sendint(&buf,0,4);/* table oid */
904-
pq_sendint(&buf,0,2);/* attnum */
905-
pq_sendint(&buf,TEXTOID,4);/* type oid */
906-
pq_sendint(&buf,-1,2);/* typlen */
907-
pq_sendint(&buf,0,4);/* typmod */
908-
pq_sendint(&buf,0,2);/* format code */
825+
dest=CreateDestReceiver(DestRemoteSimple);
826+
MemSet(nulls, false,sizeof(nulls));
909827

910-
pq_endmessage(&buf);
911-
912-
/* Send a DataRow message */
913-
pq_beginmessage(&buf,'D');
914-
pq_sendint(&buf,4,2);/* # of columns */
828+
/*
829+
* Need a tuple descriptor representing four columns:
830+
* - first field: the slot name
831+
* - second field: LSN at which we became consistent
832+
* - third field: exported snapshot's name
833+
* - fourth field: output plugin
834+
*/
835+
tupdesc=CreateTemplateTupleDesc(4, false);
836+
TupleDescInitBuiltinEntry(tupdesc, (AttrNumber)1,"slot_name",
837+
TEXTOID,-1,0);
838+
TupleDescInitBuiltinEntry(tupdesc, (AttrNumber)2,"consistent_point",
839+
TEXTOID,-1,0);
840+
TupleDescInitBuiltinEntry(tupdesc, (AttrNumber)3,"snapshot_name",
841+
TEXTOID,-1,0);
842+
TupleDescInitBuiltinEntry(tupdesc, (AttrNumber)4,"output_plugin",
843+
TEXTOID,-1,0);
844+
845+
/* prepare for projection of tuples */
846+
tstate=begin_tup_output_tupdesc(dest,tupdesc);
915847

916848
/* slot_name */
917-
len=strlen(NameStr(MyReplicationSlot->data.name));
918-
pq_sendint(&buf,len,4);/* col1 len */
919-
pq_sendbytes(&buf,NameStr(MyReplicationSlot->data.name),len);
849+
slot_name=NameStr(MyReplicationSlot->data.name);
850+
values[0]=CStringGetTextDatum(slot_name);
920851

921852
/* consistent wal location */
922-
len=strlen(xpos);
923-
pq_sendint(&buf,len,4);
924-
pq_sendbytes(&buf,xpos,len);
853+
values[1]=CStringGetTextDatum(xpos);
925854

926855
/* snapshot name, or NULL if none */
927856
if (snapshot_name!=NULL)
928-
{
929-
len=strlen(snapshot_name);
930-
pq_sendint(&buf,len,4);
931-
pq_sendbytes(&buf,snapshot_name,len);
932-
}
857+
values[2]=CStringGetTextDatum(snapshot_name);
933858
else
934-
pq_sendint(&buf,-1,4);
859+
nulls[2]= true;
935860

936861
/* plugin, or NULL if none */
937862
if (cmd->plugin!=NULL)
938-
{
939-
len=strlen(cmd->plugin);
940-
pq_sendint(&buf,len,4);
941-
pq_sendbytes(&buf,cmd->plugin,len);
942-
}
863+
values[3]=CStringGetTextDatum(cmd->plugin);
943864
else
944-
pq_sendint(&buf,-1,4);
865+
nulls[3]= true;
945866

946-
pq_endmessage(&buf);
867+
/* send it to dest */
868+
do_tup_output(tstate,values,nulls);
869+
end_tup_output(tstate);
947870

948871
ReplicationSlotRelease();
949872
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp