Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit97ee604

Browse files
committed
Some refactoring of logical/worker.c
This moves the main operations of apply_handle_{insert|update|delete},that of inserting, updating, deleting a tuple into/from a givenrelation, into correspondingapply_handle_{insert|update|delete}_internal functions. This allowsperforming those operations on relations that are not directly thetargets of replication, which is something a later patch will use fortargeting partitioned tables.Author: Amit Langote <amitlangote09@gmail.com>Reviewed-by: Rafia Sabih <rafia.pghackers@gmail.com>Reviewed-by: Peter Eisentraut <peter.eisentraut@2ndquadrant.com>Discussion:https://www.postgresql.org/message-id/flat/CA+HiwqH=Y85vRK3mOdjEkqFK+E=ST=eQiHdpj43L=_eJMOOznQ@mail.gmail.com
1 parentd40d564 commit97ee604

File tree

1 file changed

+115
-59
lines changed

1 file changed

+115
-59
lines changed

‎src/backend/replication/logical/worker.c

Lines changed: 115 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,16 @@ static void store_flush_position(XLogRecPtr remote_lsn);
113113

114114
staticvoidmaybe_reread_subscription(void);
115115

116+
staticvoidapply_handle_insert_internal(ResultRelInfo*relinfo,
117+
EState*estate,TupleTableSlot*remoteslot);
118+
staticvoidapply_handle_update_internal(ResultRelInfo*relinfo,
119+
EState*estate,TupleTableSlot*remoteslot,
120+
LogicalRepTupleData*newtup,
121+
LogicalRepRelMapEntry*relmapentry);
122+
staticvoidapply_handle_delete_internal(ResultRelInfo*relinfo,EState*estate,
123+
TupleTableSlot*remoteslot,
124+
LogicalRepRelation*remoterel);
125+
116126
/*
117127
* Should this worker apply changes for given relation.
118128
*
@@ -582,6 +592,7 @@ GetRelationIdentityOrPK(Relation rel)
582592
/*
583593
* Handle INSERT message.
584594
*/
595+
585596
staticvoid
586597
apply_handle_insert(StringInfos)
587598
{
@@ -621,13 +632,10 @@ apply_handle_insert(StringInfo s)
621632
slot_fill_defaults(rel,estate,remoteslot);
622633
MemoryContextSwitchTo(oldctx);
623634

624-
ExecOpenIndices(estate->es_result_relation_info, false);
635+
Assert(rel->localrel->rd_rel->relkind==RELKIND_RELATION);
636+
apply_handle_insert_internal(estate->es_result_relation_info,estate,
637+
remoteslot);
625638

626-
/* Do the insert. */
627-
ExecSimpleRelationInsert(estate,remoteslot);
628-
629-
/* Cleanup. */
630-
ExecCloseIndices(estate->es_result_relation_info);
631639
PopActiveSnapshot();
632640

633641
/* Handle queued AFTER triggers. */
@@ -641,6 +649,20 @@ apply_handle_insert(StringInfo s)
641649
CommandCounterIncrement();
642650
}
643651

652+
/* Workhorse for apply_handle_insert() */
653+
staticvoid
654+
apply_handle_insert_internal(ResultRelInfo*relinfo,
655+
EState*estate,TupleTableSlot*remoteslot)
656+
{
657+
ExecOpenIndices(relinfo, false);
658+
659+
/* Do the insert. */
660+
ExecSimpleRelationInsert(estate,remoteslot);
661+
662+
/* Cleanup. */
663+
ExecCloseIndices(relinfo);
664+
}
665+
644666
/*
645667
* Check if the logical replication relation is updatable and throw
646668
* appropriate error if it isn't.
@@ -684,16 +706,12 @@ apply_handle_update(StringInfo s)
684706
{
685707
LogicalRepRelMapEntry*rel;
686708
LogicalRepRelIdrelid;
687-
Oididxoid;
688709
EState*estate;
689-
EPQStateepqstate;
690710
LogicalRepTupleDataoldtup;
691711
LogicalRepTupleDatanewtup;
692712
boolhas_oldtup;
693-
TupleTableSlot*localslot;
694713
TupleTableSlot*remoteslot;
695714
RangeTblEntry*target_rte;
696-
boolfound;
697715
MemoryContextoldctx;
698716

699717
ensure_transaction();
@@ -719,9 +737,6 @@ apply_handle_update(StringInfo s)
719737
remoteslot=ExecInitExtraTupleSlot(estate,
720738
RelationGetDescr(rel->localrel),
721739
&TTSOpsVirtual);
722-
localslot=table_slot_create(rel->localrel,
723-
&estate->es_tupleTable);
724-
EvalPlanQualInit(&epqstate,estate,NULL,NIL,-1);
725740

726741
/*
727742
* Populate updatedCols so that per-column triggers can fire. This could
@@ -741,28 +756,64 @@ apply_handle_update(StringInfo s)
741756
fill_extraUpdatedCols(target_rte,RelationGetDescr(rel->localrel));
742757

743758
PushActiveSnapshot(GetTransactionSnapshot());
744-
ExecOpenIndices(estate->es_result_relation_info, false);
745759

746760
/* Build the search tuple. */
747761
oldctx=MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
748762
slot_store_cstrings(remoteslot,rel,
749763
has_oldtup ?oldtup.values :newtup.values);
750764
MemoryContextSwitchTo(oldctx);
751765

766+
Assert(rel->localrel->rd_rel->relkind==RELKIND_RELATION);
767+
apply_handle_update_internal(estate->es_result_relation_info,estate,
768+
remoteslot,&newtup,rel);
769+
770+
PopActiveSnapshot();
771+
772+
/* Handle queued AFTER triggers. */
773+
AfterTriggerEndQuery(estate);
774+
775+
ExecResetTupleTable(estate->es_tupleTable, false);
776+
FreeExecutorState(estate);
777+
778+
logicalrep_rel_close(rel,NoLock);
779+
780+
CommandCounterIncrement();
781+
}
782+
783+
/* Workhorse for apply_handle_update() */
784+
staticvoid
785+
apply_handle_update_internal(ResultRelInfo*relinfo,
786+
EState*estate,TupleTableSlot*remoteslot,
787+
LogicalRepTupleData*newtup,
788+
LogicalRepRelMapEntry*relmapentry)
789+
{
790+
Relationlocalrel=relinfo->ri_RelationDesc;
791+
LogicalRepRelation*remoterel=&relmapentry->remoterel;
792+
Oididxoid;
793+
EPQStateepqstate;
794+
TupleTableSlot*localslot;
795+
boolfound;
796+
MemoryContextoldctx;
797+
798+
localslot=table_slot_create(localrel,&estate->es_tupleTable);
799+
EvalPlanQualInit(&epqstate,estate,NULL,NIL,-1);
800+
801+
ExecOpenIndices(relinfo, false);
802+
752803
/*
753804
* Try to find tuple using either replica identity index, primary key or
754805
* if needed, sequential scan.
755806
*/
756-
idxoid=GetRelationIdentityOrPK(rel->localrel);
807+
idxoid=GetRelationIdentityOrPK(localrel);
757808
Assert(OidIsValid(idxoid)||
758-
(rel->remoterel.replident==REPLICA_IDENTITY_FULL&&has_oldtup));
809+
(remoterel->replident==REPLICA_IDENTITY_FULL));
759810

760811
if (OidIsValid(idxoid))
761-
found=RelationFindReplTupleByIndex(rel->localrel,idxoid,
812+
found=RelationFindReplTupleByIndex(localrel,idxoid,
762813
LockTupleExclusive,
763814
remoteslot,localslot);
764815
else
765-
found=RelationFindReplTupleSeq(rel->localrel,LockTupleExclusive,
816+
found=RelationFindReplTupleSeq(localrel,LockTupleExclusive,
766817
remoteslot,localslot);
767818

768819
ExecClearTuple(remoteslot);
@@ -776,8 +827,8 @@ apply_handle_update(StringInfo s)
776827
{
777828
/* Process and store remote tuple in the slot */
778829
oldctx=MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
779-
slot_modify_cstrings(remoteslot,localslot,rel,
780-
newtup.values,newtup.changed);
830+
slot_modify_cstrings(remoteslot,localslot,relmapentry,
831+
newtup->values,newtup->changed);
781832
MemoryContextSwitchTo(oldctx);
782833

783834
EvalPlanQualSetSlot(&epqstate,remoteslot);
@@ -795,23 +846,12 @@ apply_handle_update(StringInfo s)
795846
elog(DEBUG1,
796847
"logical replication did not find row for update "
797848
"in replication target relation \"%s\"",
798-
RelationGetRelationName(rel->localrel));
849+
RelationGetRelationName(localrel));
799850
}
800851

801852
/* Cleanup. */
802-
ExecCloseIndices(estate->es_result_relation_info);
803-
PopActiveSnapshot();
804-
805-
/* Handle queued AFTER triggers. */
806-
AfterTriggerEndQuery(estate);
807-
853+
ExecCloseIndices(relinfo);
808854
EvalPlanQualEnd(&epqstate);
809-
ExecResetTupleTable(estate->es_tupleTable, false);
810-
FreeExecutorState(estate);
811-
812-
logicalrep_rel_close(rel,NoLock);
813-
814-
CommandCounterIncrement();
815855
}
816856

817857
/*
@@ -825,12 +865,8 @@ apply_handle_delete(StringInfo s)
825865
LogicalRepRelMapEntry*rel;
826866
LogicalRepTupleDataoldtup;
827867
LogicalRepRelIdrelid;
828-
Oididxoid;
829868
EState*estate;
830-
EPQStateepqstate;
831869
TupleTableSlot*remoteslot;
832-
TupleTableSlot*localslot;
833-
boolfound;
834870
MemoryContextoldctx;
835871

836872
ensure_transaction();
@@ -855,33 +891,64 @@ apply_handle_delete(StringInfo s)
855891
remoteslot=ExecInitExtraTupleSlot(estate,
856892
RelationGetDescr(rel->localrel),
857893
&TTSOpsVirtual);
858-
localslot=table_slot_create(rel->localrel,
859-
&estate->es_tupleTable);
860-
EvalPlanQualInit(&epqstate,estate,NULL,NIL,-1);
861894

862895
PushActiveSnapshot(GetTransactionSnapshot());
863-
ExecOpenIndices(estate->es_result_relation_info, false);
864896

865-
/*Find thetuple using the replica identity index. */
897+
/*Build thesearch tuple. */
866898
oldctx=MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
867899
slot_store_cstrings(remoteslot,rel,oldtup.values);
868900
MemoryContextSwitchTo(oldctx);
869901

902+
Assert(rel->localrel->rd_rel->relkind==RELKIND_RELATION);
903+
apply_handle_delete_internal(estate->es_result_relation_info,estate,
904+
remoteslot,&rel->remoterel);
905+
906+
PopActiveSnapshot();
907+
908+
/* Handle queued AFTER triggers. */
909+
AfterTriggerEndQuery(estate);
910+
911+
ExecResetTupleTable(estate->es_tupleTable, false);
912+
FreeExecutorState(estate);
913+
914+
logicalrep_rel_close(rel,NoLock);
915+
916+
CommandCounterIncrement();
917+
}
918+
919+
/* Workhorse for apply_handle_delete() */
920+
staticvoid
921+
apply_handle_delete_internal(ResultRelInfo*relinfo,EState*estate,
922+
TupleTableSlot*remoteslot,
923+
LogicalRepRelation*remoterel)
924+
{
925+
Relationlocalrel=relinfo->ri_RelationDesc;
926+
Oididxoid;
927+
EPQStateepqstate;
928+
TupleTableSlot*localslot;
929+
boolfound;
930+
931+
localslot=table_slot_create(localrel,&estate->es_tupleTable);
932+
EvalPlanQualInit(&epqstate,estate,NULL,NIL,-1);
933+
934+
ExecOpenIndices(relinfo, false);
935+
870936
/*
871937
* Try to find tuple using either replica identity index, primary key or
872938
* if needed, sequential scan.
873939
*/
874-
idxoid=GetRelationIdentityOrPK(rel->localrel);
940+
idxoid=GetRelationIdentityOrPK(localrel);
875941
Assert(OidIsValid(idxoid)||
876-
(rel->remoterel.replident==REPLICA_IDENTITY_FULL));
942+
(remoterel->replident==REPLICA_IDENTITY_FULL));
877943

878944
if (OidIsValid(idxoid))
879-
found=RelationFindReplTupleByIndex(rel->localrel,idxoid,
945+
found=RelationFindReplTupleByIndex(localrel,idxoid,
880946
LockTupleExclusive,
881947
remoteslot,localslot);
882948
else
883-
found=RelationFindReplTupleSeq(rel->localrel,LockTupleExclusive,
949+
found=RelationFindReplTupleSeq(localrel,LockTupleExclusive,
884950
remoteslot,localslot);
951+
885952
/* If found delete it. */
886953
if (found)
887954
{
@@ -896,23 +963,12 @@ apply_handle_delete(StringInfo s)
896963
elog(DEBUG1,
897964
"logical replication could not find row for delete "
898965
"in replication target relation \"%s\"",
899-
RelationGetRelationName(rel->localrel));
966+
RelationGetRelationName(localrel));
900967
}
901968

902969
/* Cleanup. */
903-
ExecCloseIndices(estate->es_result_relation_info);
904-
PopActiveSnapshot();
905-
906-
/* Handle queued AFTER triggers. */
907-
AfterTriggerEndQuery(estate);
908-
970+
ExecCloseIndices(relinfo);
909971
EvalPlanQualEnd(&epqstate);
910-
ExecResetTupleTable(estate->es_tupleTable, false);
911-
FreeExecutorState(estate);
912-
913-
logicalrep_rel_close(rel,NoLock);
914-
915-
CommandCounterIncrement();
916972
}
917973

918974
/*

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp