Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit6296fba

Browse files
committed
fix pglogical protocol between decoder and receiver
1 parentf682eef commit6296fba

File tree

2 files changed

+25
-98
lines changed

2 files changed

+25
-98
lines changed

‎contrib/multimaster/pglogical_proto.c‎

Lines changed: 5 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -131,17 +131,7 @@ pglogical_write_insert(StringInfo out, PGLogicalOutputData *data,
131131
{
132132
PGLogicalProtoMM*mm= (PGLogicalProtoMM*)data->api;
133133
if (!mm->isLocal) {
134-
uint8flags=0;
135-
136134
pq_sendbyte(out,'I');/* action INSERT */
137-
138-
/* send the flags field */
139-
pq_sendbyte(out,flags);
140-
141-
/* use Oid as relation identifier */
142-
pq_sendint(out,RelationGetRelid(rel),4);
143-
144-
pq_sendbyte(out,'N');/* new tuple follows */
145135
pglogical_write_tuple(out,data,rel,newtuple);
146136
}
147137
}
@@ -155,15 +145,7 @@ pglogical_write_update(StringInfo out, PGLogicalOutputData *data,
155145
{
156146
PGLogicalProtoMM*mm= (PGLogicalProtoMM*)data->api;
157147
if (!mm->isLocal) {
158-
uint8flags=0;
159148
pq_sendbyte(out,'U');/* action UPDATE */
160-
161-
/* send the flags field */
162-
pq_sendbyte(out,flags);
163-
164-
/* use Oid as relation identifier */
165-
pq_sendint(out,RelationGetRelid(rel),4);
166-
167149
/* FIXME support whole tuple (O tuple type) */
168150
if (oldtuple!=NULL)
169151
{
@@ -184,18 +166,7 @@ pglogical_write_delete(StringInfo out, PGLogicalOutputData *data,
184166
{
185167
PGLogicalProtoMM*mm= (PGLogicalProtoMM*)data->api;
186168
if (!mm->isLocal) {
187-
uint8flags=0;
188-
189169
pq_sendbyte(out,'D');/* action DELETE */
190-
191-
/* send the flags field */
192-
pq_sendbyte(out,flags);
193-
194-
/* use Oid as relation identifier */
195-
pq_sendint(out,RelationGetRelid(rel),4);
196-
197-
/* FIXME support whole tuple (O tuple type) */
198-
pq_sendbyte(out,'K');/* old key follows */
199170
pglogical_write_tuple(out,data,rel,oldtuple);
200171
}
201172
}
@@ -207,20 +178,6 @@ pglogical_write_delete(StringInfo out, PGLogicalOutputData *data,
207178
staticvoid
208179
write_startup_message(StringInfoout,List*msg)
209180
{
210-
#if0
211-
ListCell*lc;
212-
213-
pq_sendbyte(out,'S');/* message type field */
214-
pq_sendbyte(out,1);/* startup message version */
215-
foreach (lc,msg)
216-
{
217-
DefElem*param= (DefElem*)lfirst(lc);
218-
Assert(IsA(param->arg,String)&&strVal(param->arg)!=NULL);
219-
/* null-terminated key and value pairs, in client_encoding */
220-
pq_sendstring(out,param->defname);
221-
pq_sendstring(out,strVal(param->arg));
222-
}
223-
#endif
224181
}
225182

226183
/*
@@ -289,11 +246,10 @@ pglogical_write_tuple(StringInfo out, PGLogicalOutputData *data,
289246
transfer_type=decide_datum_transfer(att,typclass,
290247
data->allow_internal_basetypes,
291248
data->allow_binary_basetypes);
292-
249+
pq_sendbyte(out,transfer_type);
293250
switch (transfer_type)
294251
{
295-
case'i':
296-
pq_sendbyte(out,'i');/* internal-format binary data follows */
252+
case'b':/* internal-format binary data follows */
297253

298254
/* pass by value */
299255
if (att->attbyval)
@@ -338,13 +294,11 @@ pglogical_write_tuple(StringInfo out, PGLogicalOutputData *data,
338294

339295
break;
340296

341-
case'b':
297+
case's':/* binary send/recv data follows */
342298
{
343299
bytea*outputbytes;
344300
intlen;
345301

346-
pq_sendbyte(out,'b');/* binary send/recv data follows */
347-
348302
outputbytes=OidSendFunctionCall(typclass->typsend,
349303
values[i]);
350304

@@ -360,8 +314,6 @@ pglogical_write_tuple(StringInfo out, PGLogicalOutputData *data,
360314
char*outputstr;
361315
intlen;
362316

363-
pq_sendbyte(out,'t');/* 'text' data follows */
364-
365317
outputstr=OidOutputFunctionCall(typclass->typoutput,
366318
values[i]);
367319
len=strlen(outputstr)+1;
@@ -391,7 +343,7 @@ decide_datum_transfer(Form_pg_attribute att, Form_pg_type typclass,
391343
att->atttypid<FirstNormalObjectId&&
392344
typclass->typelem==InvalidOid)
393345
{
394-
return'i';
346+
return'b';
395347
}
396348
/*
397349
* Use send/recv, if allowed, if the type is plain or builtin.
@@ -404,7 +356,7 @@ decide_datum_transfer(Form_pg_attribute att, Form_pg_type typclass,
404356
(att->atttypid<FirstNormalObjectId||typclass->typtype!='c')&&
405357
(att->atttypid<FirstNormalObjectId||typclass->typelem==InvalidOid))
406358
{
407-
return'b';
359+
return's';
408360
}
409361

410362
return't';

‎contrib/multimaster/pglogical_receiver.c‎

Lines changed: 20 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -66,9 +66,9 @@ static void UserTableUpdateIndexes(EState *estate, TupleTableSlot *slot);
6666

6767
staticvoidprocess_remote_begin(StringInfos);
6868
staticvoidprocess_remote_commit(StringInfos);
69-
staticvoidprocess_remote_insert(StringInfos);
70-
staticvoidprocess_remote_update(StringInfos);
71-
staticvoidprocess_remote_delete(StringInfos);
69+
staticvoidprocess_remote_insert(StringInfos,Relationrel);
70+
staticvoidprocess_remote_update(StringInfos,Relationrel);
71+
staticvoidprocess_remote_delete(StringInfos,Relationrel);
7272

7373
/*
7474
* Search the index 'idxrel' for a tuple identified by 'skey' in 'rel'.
@@ -346,7 +346,7 @@ read_tuple_parts(StringInfo s, Relation rel, TupleData *tup)
346346
memset(tup->isnull,1,sizeof(tup->isnull));
347347
memset(tup->changed,1,sizeof(tup->changed));
348348

349-
rnatts=pq_getmsgint(s,4);
349+
rnatts=pq_getmsgint(s,2);
350350

351351
if (desc->natts!=rnatts)
352352
elog(ERROR,"tuple natts mismatch, %u vs %u",desc->natts,rnatts);
@@ -370,7 +370,6 @@ read_tuple_parts(StringInfo s, Relation rel, TupleData *tup)
370370
tup->isnull[i]= true;
371371
tup->changed[i]= false;
372372
tup->values[i]=0xdeadbeef;/* make bad usage more obvious */
373-
374373
break;
375374

376375
case'b':/* binary format */
@@ -463,25 +462,16 @@ process_remote_commit(StringInfo s)
463462
}
464463

465464
staticvoid
466-
process_remote_insert(StringInfos)
465+
process_remote_insert(StringInfos,Relationrel)
467466
{
468-
charaction;
469467
EState*estate;
470468
TupleDatanew_tuple;
471469
TupleTableSlot*newslot;
472470
TupleTableSlot*oldslot;
473-
Relationrel;
474471
ResultRelInfo*relinfo;
475472
ScanKey*index_keys;
476473
inti;
477474

478-
rel=read_rel(s,RowExclusiveLock);
479-
480-
action=pq_getmsgbyte(s);
481-
if (action!='N')
482-
elog(ERROR,"expected new tuple but got %d",
483-
action);
484-
485475
estate=create_rel_estate(rel);
486476
newslot=ExecInitExtraTupleSlot(estate);
487477
oldslot=ExecInitExtraTupleSlot(estate);
@@ -565,7 +555,7 @@ process_remote_insert(StringInfo s)
565555
}
566556

567557
staticvoid
568-
process_remote_update(StringInfos)
558+
process_remote_update(StringInfos,Relationrel)
569559
{
570560
charaction;
571561
EState*estate;
@@ -576,13 +566,10 @@ process_remote_update(StringInfo s)
576566
TupleDataold_tuple;
577567
TupleDatanew_tuple;
578568
Oididxoid;
579-
Relationrel;
580569
Relationidxrel;
581570
ScanKeyDataskey[INDEX_MAX_KEYS];
582571
HeapTupleremote_tuple=NULL;
583572

584-
rel=read_rel(s,RowExclusiveLock);
585-
586573
action=pq_getmsgbyte(s);
587574

588575
/* old key present, identifying key changed */
@@ -690,32 +677,16 @@ process_remote_update(StringInfo s)
690677
}
691678

692679
staticvoid
693-
process_remote_delete(StringInfos)
680+
process_remote_delete(StringInfos,Relationrel)
694681
{
695-
charaction;
696682
EState*estate;
697683
TupleDataoldtup;
698684
TupleTableSlot*oldslot;
699685
Oididxoid;
700-
Relationrel;
701686
Relationidxrel;
702687
ScanKeyDataskey[INDEX_MAX_KEYS];
703688
boolfound_old;
704689

705-
rel=read_rel(s,RowExclusiveLock);
706-
707-
action=pq_getmsgbyte(s);
708-
709-
if (action!='K'&&action!='E')
710-
elog(ERROR,"expected action K or E got %c",action);
711-
712-
if (action=='E')
713-
{
714-
elog(WARNING,"got delete without pkey");
715-
heap_close(rel,NoLock);
716-
return;
717-
}
718-
719690
estate=create_rel_estate(rel);
720691
oldslot=ExecInitExtraTupleSlot(estate);
721692
ExecSetSlotDescriptor(oldslot,RelationGetDescr(rel));
@@ -784,6 +755,7 @@ process_remote_delete(StringInfo s)
784755
voidMMExecutor(intid,void*work,size_tsize)
785756
{
786757
StringInfoDatas;
758+
Relationrel=NULL;
787759
initStringInfo(&s);
788760
s.data=work;
789761
s.len=size;
@@ -796,26 +768,29 @@ void MMExecutor(int id, void* work, size_t size)
796768

797769
switch (action) {
798770
/* BEGIN */
799-
case'B':
771+
case'B':
800772
process_remote_begin(&s);
801773
continue;
802774
/* COMMIT */
803-
case'C':
775+
case'C':
804776
process_remote_commit(&s);
805777
break;
806778
/* INSERT */
807-
case'I':
808-
process_remote_insert(&s);
779+
case'I':
780+
process_remote_insert(&s,rel);
809781
continue;
810782
/* UPDATE */
811-
case'U':
812-
process_remote_update(&s);
783+
case'U':
784+
process_remote_update(&s,rel);
813785
continue;
814786
/* DELETE */
815-
case'D':
816-
process_remote_delete(&s);
787+
case'D':
788+
process_remote_delete(&s,rel);
789+
continue;
790+
case'R':
791+
rel=read_rel(&s,RowExclusiveLock);
817792
continue;
818-
default:
793+
default:
819794
elog(ERROR,"unknown action of type %c",action);
820795
}
821796
break;

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp