Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit6a36538

Browse files
committed
Merge branch 'master' of github.com:postgrespro/postgres_cluster
2 parentsc0ec3bf +1a8c8c5 commit6a36538

File tree

9 files changed

+184
-36
lines changed

9 files changed

+184
-36
lines changed

‎contrib/mmts/Makefile‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
MODULE_big = multimaster
2-
OBJS = multimaster.o raftable.o arbiter.o bytebuf.o bgwpool.o pglogical_output.o pglogical_proto.o pglogical_receiver.o pglogical_apply.o pglogical_hooks.o pglogical_config.o ddd.o bkb.o
2+
OBJS = multimaster.o raftable.o arbiter.o bytebuf.o bgwpool.o pglogical_output.o pglogical_proto.o pglogical_receiver.o pglogical_apply.o pglogical_hooks.o pglogical_config.opglogical_relid_map.oddd.o bkb.o
33

44
overrideCPPFLAGS += -I../raftable
55

‎contrib/mmts/arbiter.c‎

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -239,11 +239,15 @@ static int MtmReadSocket(int sd, void* buf, int buf_size)
239239
staticvoidMtmSetSocketOptions(intsd)
240240
{
241241
#ifdefTCP_NODELAY
242-
intoptval=1;
243-
if (setsockopt(sd,IPPROTO_TCP,TCP_NODELAY, (charconst*)&optval,sizeof(optval))<0) {
242+
inton=1;
243+
if (setsockopt(sd,IPPROTO_TCP,TCP_NODELAY, (charconst*)&on,sizeof(on))<0) {
244244
elog(WARNING,"Failed to set TCP_NODELAY: %m");
245245
}
246246
#endif
247+
if (setsockopt(sd,SOL_SOCKET,SO_KEEPALIVE, (charconst*)&on,sizeof(on))<0) {
248+
elog(WARNING,"Failed to set SO_KEEPALIVE: %m");
249+
}
250+
247251
if (tcp_keepalives_idle) {
248252
#ifdefTCP_KEEPIDLE
249253
if (setsockopt(sd,IPPROTO_TCP,TCP_KEEPIDLE,

‎contrib/mmts/multimaster.c‎

Lines changed: 64 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -865,10 +865,13 @@ void MtmSendNotificationMessage(MtmTransState* ts, MtmMessageCode cmd)
865865

866866
voidMtmJoinTransaction(GlobalTransactionId*gtid,csn_tglobalSnapshot)
867867
{
868-
MtmLock(LW_EXCLUSIVE);
869-
MtmSyncClock(globalSnapshot);
870-
MtmUnlock();
871-
868+
if (globalSnapshot!=INVALID_CSN) {
869+
MtmLock(LW_EXCLUSIVE);
870+
MtmSyncClock(globalSnapshot);
871+
MtmUnlock();
872+
}else {
873+
globalSnapshot=MtmTx.snapshot;
874+
}
872875
if (!TransactionIdIsValid(gtid->xid)) {
873876
/* In case of recovery InvalidTransactionId is passed */
874877
Assert(Mtm->status==MTM_RECOVERY);
@@ -1877,6 +1880,14 @@ void MtmDropNode(int nodeId, bool dropSlot)
18771880
}
18781881
}
18791882
}
1883+
staticvoid
1884+
MtmOnProcExit(intcode,Datumarg)
1885+
{
1886+
if (MtmReplicationNodeId >=0) {
1887+
elog(WARNING,"WAL-sender to %d is terminated",MtmReplicationNodeId);
1888+
MtmOnNodeDisconnect(MtmReplicationNodeId);
1889+
}
1890+
}
18801891

18811892
staticvoid
18821893
MtmReplicationStartupHook(structPGLogicalStartupHookArgs*args)
@@ -1923,13 +1934,17 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
19231934
elog(NOTICE,"Node %d start logical replication to node %d in normal mode",MtmNodeId,MtmReplicationNodeId);
19241935
}
19251936
MtmUnlock();
1937+
on_proc_exit(MtmOnProcExit,0);
19261938
}
19271939

19281940
staticvoid
19291941
MtmReplicationShutdownHook(structPGLogicalShutdownHookArgs*args)
19301942
{
1931-
elog(WARNING,"Logical replication to node %d is stopped",MtmReplicationNodeId);
1932-
MtmOnNodeDisconnect(MtmReplicationNodeId);
1943+
if (MtmReplicationNodeId >=0) {
1944+
elog(WARNING,"Logical replication to node %d is stopped",MtmReplicationNodeId);
1945+
MtmOnNodeDisconnect(MtmReplicationNodeId);
1946+
MtmReplicationNodeId=-1;/* defuse on_proc_exit hook */
1947+
}
19331948
}
19341949

19351950
staticbool
@@ -2159,14 +2174,34 @@ static bool MtmRunUtilityStmt(PGconn* conn, char const* sql, char **errmsg)
21592174

21602175
*errmsg=palloc0(errlen);
21612176

2162-
/* Strip "ERROR:\t" from beginning and "\n" from end of error string */
2177+
/* Strip "ERROR:" from beginning and "\n" from end of error string */
21632178
strncpy(*errmsg,errstr+8,errlen-1-8);
21642179
}
21652180

21662181
PQclear(result);
21672182
returnret;
21682183
}
21692184

2185+
staticvoid
2186+
MtmNoticeReceiver(void*i,constPGresult*res)
2187+
{
2188+
char*notice=PQresultErrorMessage(res);
2189+
char*stripped_notice;
2190+
intlen=strlen(notice);
2191+
2192+
/* Skip notices from other nodes */
2193+
if ( (*(int*)i)!=MtmNodeId-1)
2194+
return;
2195+
2196+
stripped_notice=palloc0(len);
2197+
2198+
/* Strip "NOTICE: " from beginning and "\n" from end of error string */
2199+
strncpy(stripped_notice,notice+9,len-1-9);
2200+
2201+
elog(NOTICE,stripped_notice);
2202+
pfree(stripped_notice);
2203+
}
2204+
21702205
staticvoidMtmBroadcastUtilityStmt(charconst*sql,boolignoreError)
21712206
{
21722207
inti=0;
@@ -2195,6 +2230,7 @@ static void MtmBroadcastUtilityStmt(char const* sql, bool ignoreError)
21952230
elog(ERROR,"Failed to establish connection '%s' to node %d",Mtm->nodes[i].con.connStr,failedNode);
21962231
}
21972232
}
2233+
PQsetNoticeReceiver(conns[i],MtmNoticeReceiver,&i);
21982234
}
21992235
}
22002236
Assert(i==MtmNodes);
@@ -2211,9 +2247,10 @@ static void MtmBroadcastUtilityStmt(char const* sql, bool ignoreError)
22112247
}
22122248
if (!MtmRunUtilityStmt(conns[i],sql,&utility_errmsg)&& !ignoreError)
22132249
{
2214-
// errorMsg = "Failed to run command at node %d";
2215-
// XXX: add check for our node
2216-
errorMsg=utility_errmsg;
2250+
if (i+1==MtmNodeId)
2251+
errorMsg=utility_errmsg;
2252+
else
2253+
errorMsg="Failed to run command at node %d";
22172254

22182255
failedNode=i;
22192256
break;
@@ -2418,6 +2455,23 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
24182455
skipCommand=stmt->relation->relpersistence==RELPERSISTENCE_TEMP;
24192456
}
24202457
break;
2458+
caseT_IndexStmt:
2459+
{
2460+
Oidrelid;
2461+
Relationrel;
2462+
IndexStmt*stmt= (IndexStmt*)parsetree;
2463+
boolisTopLevel= (context==PROCESS_UTILITY_TOPLEVEL);
2464+
2465+
if (stmt->concurrent)
2466+
PreventTransactionChain(isTopLevel,
2467+
"CREATE INDEX CONCURRENTLY");
2468+
2469+
relid=RelnameGetRelid(stmt->relation->relname);
2470+
rel=heap_open(relid,ShareLock);
2471+
skipCommand=rel->rd_rel->relpersistence==RELPERSISTENCE_TEMP;
2472+
heap_close(rel,NoLock);
2473+
}
2474+
break;
24212475
default:
24222476
skipCommand= false;
24232477
break;

‎contrib/mmts/multimaster.h‎

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,11 @@
88
#include"pglogical_output/hooks.h"
99

1010
#defineMTM_TUPLE_TRACE(fmt, ...)
11-
#if1
11+
#if0
1212
#defineMTM_INFO(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
1313
#defineMTM_TRACE(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
1414
#else
15-
#defineMTM_INFO(fmt, ...)fprintf(stderr, fmt, ## __VA_ARGS__)
15+
#defineMTM_INFO(fmt, ...)
1616
#define MTM_TRACE(fmt, ...)
1717
#endif
1818

‎contrib/mmts/pglogical_apply.c‎

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
#include"parser/parse_relation.h"
5050

5151
#include"multimaster.h"
52+
#include"pglogical_relid_map.h"
5253

5354
typedefstructTupleData
5455
{
@@ -451,19 +452,28 @@ read_rel(StringInfo s, LOCKMODE mode)
451452
intrelnamelen;
452453
intnspnamelen;
453454
RangeVar*rv;
454-
Oidrelid;
455-
456-
rv=makeNode(RangeVar);
457-
458-
nspnamelen=pq_getmsgbyte(s);
459-
rv->schemaname= (char*)pq_getmsgbytes(s,nspnamelen);
460-
461-
relnamelen=pq_getmsgbyte(s);
462-
rv->relname= (char*)pq_getmsgbytes(s,relnamelen);
463-
464-
relid=RangeVarGetRelidExtended(rv,mode, false, false,NULL,NULL);
465-
466-
returnheap_open(relid,NoLock);
455+
Oidremote_relid=pq_getmsgint(s,4);
456+
Oidlocal_relid;
457+
458+
local_relid=pglogical_relid_map_get(remote_relid);
459+
if (local_relid==InvalidOid) {
460+
rv=makeNode(RangeVar);
461+
462+
nspnamelen=pq_getmsgbyte(s);
463+
rv->schemaname= (char*)pq_getmsgbytes(s,nspnamelen);
464+
465+
relnamelen=pq_getmsgbyte(s);
466+
rv->relname= (char*)pq_getmsgbytes(s,relnamelen);
467+
468+
local_relid=RangeVarGetRelidExtended(rv,mode, false, false,NULL,NULL);
469+
pglogical_relid_map_put(remote_relid,local_relid);
470+
}else {
471+
nspnamelen=pq_getmsgbyte(s);
472+
s->cursor+=nspnamelen;
473+
relnamelen=pq_getmsgbyte(s);
474+
s->cursor+=relnamelen;
475+
}
476+
returnheap_open(local_relid,NoLock);
467477
}
468478

469479
staticvoid

‎contrib/mmts/pglogical_proto.c‎

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
#include"utils/typcache.h"
3737

3838
#include"multimaster.h"
39+
#include"pglogical_relid_map.h"
3940

4041
staticboolMtmIsFilteredTxn;
4142

@@ -71,13 +72,15 @@ pglogical_write_rel(StringInfo out, PGLogicalOutputData *data, Relation rel)
7172
uint8nspnamelen;
7273
constchar*relname;
7374
uint8relnamelen;
74-
75+
Oidrelid;
7576
if (MtmIsFilteredTxn) {
7677
return;
7778
}
7879

79-
pq_sendbyte(out,'R');/* sending RELATION */
80-
80+
relid=RelationGetRelid(rel);
81+
pq_sendbyte(out,'R');/* sending RELATION */
82+
pq_sendint(out,relid,sizeofrelid);/* use Oid as relation identifier */
83+
8184
nspname=get_namespace_name(rel->rd_rel->relnamespace);
8285
if (nspname==NULL)
8386
elog(ERROR,"cache lookup failed for namespace %u",
@@ -86,10 +89,10 @@ pglogical_write_rel(StringInfo out, PGLogicalOutputData *data, Relation rel)
8689

8790
relname=NameStr(rel->rd_rel->relname);
8891
relnamelen=strlen(relname)+1;
89-
92+
9093
pq_sendbyte(out,nspnamelen);/* schema name length */
9194
pq_sendbytes(out,nspname,nspnamelen);
92-
95+
9396
pq_sendbyte(out,relnamelen);/* table name length */
9497
pq_sendbytes(out,relname,relnamelen);
9598
}

‎contrib/mmts/pglogical_relid_map.c‎

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*-------------------------------------------------------------------------
2+
*
3+
* pglogical_relid_map.c
4+
* Logical Replication map of local Oids to to remote
5+
*
6+
* Copyright (c) 2012-2015, PostgreSQL Global Development Group
7+
*
8+
* IDENTIFICATION
9+
* pglogical_relid_map.c
10+
*
11+
*-------------------------------------------------------------------------
12+
*/
13+
#include"postgres.h"
14+
#include"utils/hsearch.h"
15+
#include"pglogical_relid_map.h"
16+
17+
staticHTAB*relid_map;
18+
19+
staticvoid
20+
pglogical_relid_map_init(void)
21+
{
22+
HASHCTLctl;
23+
inthash_flags=HASH_ELEM;
24+
25+
Assert(relid_map==NULL);
26+
27+
MemSet(&ctl,0,sizeof(ctl));
28+
ctl.keysize=sizeof(Oid);
29+
ctl.entrysize=sizeof(PGLRelidMapEntry);
30+
31+
#ifPG_VERSION_NUM >=90500
32+
hash_flags |=HASH_BLOBS;
33+
#else
34+
ctl.hash=tag_hash;
35+
hash_flags |=HASH_FUNCTION;
36+
#endif
37+
38+
relid_map=hash_create("pglogical_relid_map",PGL_INIT_RELID_MAP_SIZE,&ctl,hash_flags);
39+
40+
Assert(relid_map!=NULL);
41+
}
42+
43+
Oidpglogical_relid_map_get(Oidrelid)
44+
{
45+
if (relid_map!=NULL) {
46+
PGLRelidMapEntry*entry= (PGLRelidMapEntry*)hash_search(relid_map,&relid,HASH_FIND,NULL);
47+
returnentry ?entry->local_relid :InvalidOid;
48+
}
49+
returnInvalidOid;
50+
}
51+
52+
boolpglogical_relid_map_put(Oidremote_relid,Oidlocal_relid)
53+
{
54+
boolfound;
55+
PGLRelidMapEntry*entry;
56+
if (relid_map==NULL) {
57+
pglogical_relid_map_init();
58+
}
59+
entry=hash_search(relid_map,&remote_relid,HASH_ENTER,&found);
60+
if (found) {
61+
Assert(entry->local_relid==local_relid);
62+
return false;
63+
}
64+
entry->local_relid=local_relid;
65+
return true;
66+
}

‎contrib/mmts/pglogical_relid_map.h‎

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
#ifndefPGLOGICAL_RELID_MAP
2+
#definePGLOGICAL_RELID_MAP
3+
4+
#definePGL_INIT_RELID_MAP_SIZE 256
5+
6+
typedefstructPGLRelidMapEntry {
7+
Oidremote_relid;
8+
Oidlocal_relid;
9+
}PGLRelidMapEntry;
10+
11+
externOidpglogical_relid_map_get(Oidrelid);
12+
externboolpglogical_relid_map_put(Oidremote_relid,Oidlocal_relid);
13+
14+
#endif

‎src/backend/replication/logical/decode.c‎

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -541,9 +541,6 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
541541
(parsed->dbId!=InvalidOid&&parsed->dbId!=ctx->slot->data.database)||
542542
FilterByOrigin(ctx,origin_id))
543543
{
544-
elog(WARNING,"%d: WAL-SENDER ignore record %lx with origin %d: SnapBuildXactNeedsSkip=%d, FilterByOrigin=%d",
545-
getpid(),buf->origptr,origin_id,
546-
SnapBuildXactNeedsSkip(ctx->snapshot_builder,buf->origptr),FilterByOrigin(ctx,origin_id));
547544
for (i=0;i<parsed->nsubxacts;i++)
548545
{
549546
ReorderBufferForget(ctx->reorder,parsed->subxacts[i],buf->origptr);

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp