@@ -42,6 +42,7 @@ static int MtmTransactionRecords;
42
42
static bool MtmIsFilteredTxn ;
43
43
static TransactionId MtmCurrentXid ;
44
44
static bool DDLInProgress = false;
45
+ static Oid MtmSenderTID ;/* transaction identifier for WAL sender */
45
46
46
47
static void pglogical_write_rel (StringInfo out ,PGLogicalOutputData * data ,Relation rel );
47
48
@@ -80,6 +81,7 @@ pglogical_write_rel(StringInfo out, PGLogicalOutputData *data, Relation rel)
80
81
const char * relname ;
81
82
uint8 relnamelen ;
82
83
Oid relid ;
84
+ Oid tid ;
83
85
84
86
if (MtmIsFilteredTxn ) {
85
87
MTM_LOG2 ("%d: pglogical_write_message filtered" ,MyProcPid );
@@ -92,23 +94,32 @@ pglogical_write_rel(StringInfo out, PGLogicalOutputData *data, Relation rel)
92
94
}
93
95
94
96
relid = RelationGetRelid (rel );
97
+
95
98
pq_sendbyte (out ,'R' );/* sending RELATION */
96
99
pq_sendint (out ,relid ,sizeof relid );/* use Oid as relation identifier */
97
100
98
- nspname = get_namespace_name (rel -> rd_rel -> relnamespace );
99
- if (nspname == NULL )
100
- elog (ERROR ,"cache lookup failed for namespace %u" ,
101
+ Assert (MtmSenderTID != InvalidOid );
102
+ tid = pglogical_relid_map_get (relid );
103
+ if (tid == MtmSenderTID ) {/* this relation was already sent in this transaction */
104
+ pq_sendbyte (out ,0 );/* do not need to send relation namespace and name in this case */
105
+ pq_sendbyte (out ,0 );
106
+ }else {
107
+ pglogical_relid_map_put (relid ,MtmSenderTID );
108
+ nspname = get_namespace_name (rel -> rd_rel -> relnamespace );
109
+ if (nspname == NULL )
110
+ elog (ERROR ,"cache lookup failed for namespace %u" ,
101
111
rel -> rd_rel -> relnamespace );
102
- nspnamelen = strlen (nspname )+ 1 ;
103
-
104
- relname = NameStr (rel -> rd_rel -> relname );
105
- relnamelen = strlen (relname )+ 1 ;
106
-
107
- pq_sendbyte (out ,nspnamelen );/* schema name length */
108
- pq_sendbytes (out ,nspname ,nspnamelen );
109
-
110
- pq_sendbyte (out ,relnamelen );/* table name length */
111
- pq_sendbytes (out ,relname ,relnamelen );
112
+ nspnamelen = strlen (nspname )+ 1 ;
113
+
114
+ relname = NameStr (rel -> rd_rel -> relname );
115
+ relnamelen = strlen (relname )+ 1 ;
116
+
117
+ pq_sendbyte (out ,nspnamelen );/* schema name length */
118
+ pq_sendbytes (out ,nspname ,nspnamelen );
119
+
120
+ pq_sendbyte (out ,relnamelen );/* table name length */
121
+ pq_sendbytes (out ,relname ,relnamelen );
122
+ }
112
123
}
113
124
114
125
/*
@@ -128,6 +139,10 @@ pglogical_write_begin(StringInfo out, PGLogicalOutputData *data,
128
139
MtmIsFilteredTxn = true;
129
140
MTM_LOG2 ("%d: pglogical_write_begin XID=%lld filtered" ,MyProcPid , (long64 )txn -> xid );
130
141
}else {
142
+ if (++ MtmSenderTID == InvalidOid ) {
143
+ pglogical_relid_map_reset ();
144
+ MtmSenderTID += 1 ;/* skip InvalidOid */
145
+ }
131
146
MtmCurrentXid = txn -> xid ;
132
147
MtmIsFilteredTxn = false;
133
148
MTM_LOG3 ("%d: pglogical_write_begin XID=%d node=%d CSN=%lld recovery=%d restart_decoding_lsn=%llx first_lsn=%llx end_lsn=%llx confirmed_flush=%llx" ,