Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit98075f5

Browse files
knizhnikkelvich
authored andcommitted
Fix start of pglogical_receiver
1 parent8fefcf6 commit98075f5

File tree

3 files changed

+37
-23
lines changed

3 files changed

+37
-23
lines changed

‎multimaster.c

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -278,13 +278,13 @@ bool MtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
278278
{
279279
if (ts->csn>dtmTx.snapshot) {
280280
MTM_TUPLE_TRACE("%d: tuple with xid=%d(csn=%ld) is invisibile in snapshot %ld\n",
281-
getpid(),xid,ts->csn,dtmTx.snapshot);
281+
MyProcPid,xid,ts->csn,dtmTx.snapshot);
282282
MtmUnlock();
283283
return true;
284284
}
285285
if (ts->status==TRANSACTION_STATUS_UNKNOWN)
286286
{
287-
MTM_TRACE("%d: wait for in-doubt transaction %u in snapshot %lu\n",getpid(),xid,dtmTx.snapshot);
287+
MTM_TRACE("%d: wait for in-doubt transaction %u in snapshot %lu\n",MyProcPid,xid,dtmTx.snapshot);
288288
MtmUnlock();
289289
#ifTRACE_SLEEP_TIME
290290
{
@@ -316,14 +316,14 @@ bool MtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
316316
{
317317
boolinvisible=ts->status!=TRANSACTION_STATUS_COMMITTED;
318318
MTM_TUPLE_TRACE("%d: tuple with xid=%d(csn= %ld) is %s in snapshot %ld\n",
319-
getpid(),xid,ts->csn,invisible ?"rollbacked" :"committed",dtmTx.snapshot);
319+
MyProcPid,xid,ts->csn,invisible ?"rollbacked" :"committed",dtmTx.snapshot);
320320
MtmUnlock();
321321
returninvisible;
322322
}
323323
}
324324
else
325325
{
326-
MTM_TUPLE_TRACE("%d: visibility check is skept for transaction %u in snapshot %lu\n",getpid(),xid,dtmTx.snapshot);
326+
MTM_TUPLE_TRACE("%d: visibility check is skept for transaction %u in snapshot %lu\n",MyProcPid,xid,dtmTx.snapshot);
327327
break;
328328
}
329329
}
@@ -493,7 +493,7 @@ MtmXactCallback(XactEvent event, void *arg)
493493
staticbool
494494
MtmIsUserTransaction()
495495
{
496-
returnIsNormalProcessingMode()&&dtm->status==MTM_ONLINE&&MtmDoReplication&& !am_walsender&& !IsBackgroundWorker&& !IsAutoVacuumWorkerProcess();
496+
returnIsNormalProcessingMode()&&MtmDoReplication&& !am_walsender&& !IsBackgroundWorker&& !IsAutoVacuumWorkerProcess();
497497
}
498498

499499
staticvoid
@@ -504,6 +504,10 @@ MtmBeginTransaction(MtmCurrentTrans* x)
504504
x->xid=GetCurrentTransactionIdIfAny();
505505
x->isReplicated= false;
506506
x->isDistributed=MtmIsUserTransaction();
507+
if (x->isDistributed&&dtm->status!=MTM_ONLINE) {
508+
MtmUnlock();
509+
elog(ERROR,"Multimaster node is offline");
510+
}
507511
x->containsDML= false;
508512
x->isPrepared= false;
509513
x->snapshot=MtmAssignCSN();
@@ -614,7 +618,7 @@ static void MtmPrecommitTransaction(MtmCurrentTrans* x)
614618

615619
MtmUnlock();
616620

617-
MTM_TRACE("%d: MtmPrepareTransaction prepare commit of %d CSN=%ld\n",getpid(),x->xid,ts->csn);
621+
MTM_TRACE("%d: MtmPrepareTransaction prepare commit of %d CSN=%ld\n",MyProcPid,x->xid,ts->csn);
618622
}
619623

620624
staticvoid
@@ -728,14 +732,14 @@ MtmCommitTransaction(TransactionId xid, int nsubxids, TransactionId *subxids)
728732
ts=hash_search(xid2state,&xid,HASH_FIND,NULL);
729733
Assert(ts!=NULL);/* should be created by MtmPrepareTransaction */
730734

731-
MTM_TRACE("%d: MtmCommitTransaction begin commit of %d CSN=%ld\n",getpid(),xid,ts->csn);
735+
MTM_TRACE("%d: MtmCommitTransaction begin commit of %d CSN=%ld\n",MyProcPid,xid,ts->csn);
732736
MtmAddSubtransactions(ts,subxids,nsubxids);
733737

734738
MtmVoteForTransaction(ts);
735739

736740
MtmUnlock();
737741

738-
MTM_TRACE("%d: MtmCommitTransaction %d status=%d\n",getpid(),xid,ts->status);
742+
MTM_TRACE("%d: MtmCommitTransaction %d status=%d\n",MyProcPid,xid,ts->status);
739743

740744
returnts->status!=TRANSACTION_STATUS_ABORTED;
741745
}
@@ -768,7 +772,7 @@ MtmFinishTransaction(TransactionId xid, int nsubxids, TransactionId *subxids, Xi
768772
MtmSendNotificationMessage(ts);
769773
}
770774
MtmUnlock();
771-
MTM_TRACE("%d: MtmFinishTransaction %d CSN=%ld, status=%d\n",getpid(),xid,ts->csn,status);
775+
MTM_TRACE("%d: MtmFinishTransaction %d CSN=%ld, status=%d\n",MyProcPid,xid,ts->csn,status);
772776
}
773777
}
774778

@@ -777,7 +781,7 @@ MtmFinishTransaction(TransactionId xid, int nsubxids, TransactionId *subxids, Xi
777781
staticvoid
778782
MtmSetTransactionStatus(TransactionIdxid,intnsubxids,TransactionId*subxids,XidStatusstatus,XLogRecPtrlsn)
779783
{
780-
MTM_TRACE("%d: MtmSetTransactionStatus %u(%u) = %u, isDistributed=%d\n",getpid(),xid,dtmTx.xid,status,dtmTx.isDistributed);
784+
MTM_TRACE("%d: MtmSetTransactionStatus %u(%u) = %u, isDistributed=%d\n",MyProcPid,xid,dtmTx.xid,status,dtmTx.isDistributed);
781785
if (xid==dtmTx.xid&&dtmTx.isDistributed&& !dtmTx.isPrepared)
782786
{
783787
if (status==TRANSACTION_STATUS_ABORTED|| !dtmTx.containsDML||dtm->status==MTM_RECOVERY)
@@ -1100,7 +1104,7 @@ void MtmReceiverStarted(int nodeId)
11001104
{
11011105
SpinLockAcquire(&dtm->spinlock);
11021106
if (!BIT_CHECK(dtm->pglogicalNodeMask,nodeId-1)) {
1103-
dtm->pglogicalNodeMask |= (int64)1 << (nodeId-1);
1107+
dtm->pglogicalNodeMask |= (nodemask_t)1 << (nodeId-1);
11041108
if (++dtm->nReceivers==dtm->nNodes-1) {
11051109
elog(WARNING,"All receivers are started, switch to normal mode");
11061110
Assert(dtm->status==MTM_CONNECTED);
@@ -1478,14 +1482,14 @@ MtmVoteForTransaction(MtmTransState* ts)
14781482
MtmSendNotificationMessage(ts);
14791483
}
14801484
}
1481-
MTM_TRACE("%d: Node %d waiting latch...\n",getpid(),MtmNodeId);
1485+
MTM_TRACE("%d: Node %d waiting latch...\n",MyProcPid,MtmNodeId);
14821486
while (!ts->done) {
14831487
MtmUnlock();
14841488
WaitLatch(&MyProc->procLatch,WL_LATCH_SET,-1);
14851489
ResetLatch(&MyProc->procLatch);
14861490
MtmLock(LW_SHARED);
14871491
}
1488-
MTM_TRACE("%d: Node %d receives response...\n",getpid(),MtmNodeId);
1492+
MTM_TRACE("%d: Node %d receives response...\n",MyProcPid,MtmNodeId);
14891493
}
14901494

14911495
HTAB*MtmCreateHash(void)

‎pglogical_receiver.c

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
#include<unistd.h>
1818
#include"postgres.h"
1919
#include"fmgr.h"
20+
#include"miscadmin.h"
2021
#include"libpq-fe.h"
2122
#include"pqexpbuffer.h"
2223
#include"access/xact.h"
@@ -38,7 +39,8 @@
3839
/* Allow load of this module in shared libs */
3940

4041
typedefstructReceiverArgs {
41-
intreceiver_node;
42+
intlocal_node;
43+
intremote_node;
4244
char*receiver_conn_string;
4345
charreceiver_slot[16];
4446
}ReceiverArgs;
@@ -55,7 +57,7 @@ static bool receiver_sync_mode = false;
5557

5658
/* Worker name */
5759
staticchar*worker_name="multimaster";
58-
charworker_proc[16];
60+
charworker_proc[BGW_MAXLEN];
5961

6062
/* Lastly written positions */
6163
staticXLogRecPtroutput_written_lsn=InvalidXLogRecPtr;
@@ -216,7 +218,7 @@ pglogical_receiver_main(Datum main_arg)
216218
pqsignal(SIGHUP,receiver_raw_sighup);
217219
pqsignal(SIGTERM,receiver_raw_sigterm);
218220

219-
sprintf(worker_proc,"mtm_recv_%d",getpid());
221+
sprintf(worker_proc,"mtm_pglogical_receiver_%d_%d",args->local_node,args->remote_node);
220222

221223
/* We're now ready to receive signals */
222224
BackgroundWorkerUnblockSignals();
@@ -229,7 +231,7 @@ pglogical_receiver_main(Datum main_arg)
229231
* Druing recovery we need to open only one replication slot from which node should receive all transactions.
230232
* Slots at other nodes should be removed
231233
*/
232-
mode=MtmReceiverSlotMode(args->receiver_node);
234+
mode=MtmReceiverSlotMode(args->remote_node);
233235

234236
/* Establish connection to remote server */
235237
conn=PQconnectdb(args->receiver_conn_string);
@@ -266,11 +268,18 @@ pglogical_receiver_main(Datum main_arg)
266268
PQclear(res);
267269
resetPQExpBuffer(query);
268270
}
271+
269272
/* Start logical replication at specified position */
273+
StartTransactionCommand();
270274
originId=replorigin_by_name(args->receiver_slot, true);
271275
if (originId!=InvalidRepOriginId) {
272276
originStartPos=replorigin_get_progress(originId, false);
277+
elog(WARNING,"Restart logical receiver at position %lx from node %d",originStartPos,args->remote_node);
278+
}else {
279+
elog(WARNING,"Start logical receiver from node %d",args->remote_node);
273280
}
281+
CommitTransactionCommand();
282+
274283
appendPQExpBuffer(query,"START_REPLICATION SLOT \"%s\" LOGICAL %u/%u (\"startup_params_format\" '1', \"max_proto_version\" '%d', \"min_proto_version\" '%d')",
275284
args->receiver_slot,
276285
(uint32) (originStartPos >>32),
@@ -282,14 +291,14 @@ pglogical_receiver_main(Datum main_arg)
282291
if (PQresultStatus(res)!=PGRES_COPY_BOTH)
283292
{
284293
PQclear(res);
285-
ereport(LOG, (errmsg("%s: Could not start logical replication",
286-
worker_proc)));
294+
ereport(WARNING, (errmsg("%s: Could not start logical replication",
295+
worker_proc)));
287296
proc_exit(1);
288297
}
289298
PQclear(res);
290299
resetPQExpBuffer(query);
291300

292-
MtmReceiverStarted(args->receiver_node);
301+
MtmReceiverStarted(args->remote_node);
293302
ByteBufferAlloc(&buf);
294303
ds=MtmGetState();
295304

@@ -576,10 +585,11 @@ int MtmStartReceivers(char* conns, int node_id)
576585
}
577586
ctx->receiver_conn_string=psprintf("replication=database %.*s", (int)(p-conn_str),conn_str);
578587
sprintf(ctx->receiver_slot,"mtm_slot_%d",node_id);
579-
ctx->receiver_node=node_id;
588+
ctx->local_node=node_id;
589+
ctx->remote_node=i;
580590

581591
/* Worker parameter and registration */
582-
snprintf(worker.bgw_name,BGW_MAXLEN,"mtm_worker_%d_%d",node_id,i);
592+
snprintf(worker.bgw_name,BGW_MAXLEN,"mtm_pglogical_receiver_%d_%d",node_id,i);
583593

584594
worker.bgw_main_arg= (Datum)ctx;
585595
RegisterBackgroundWorker(&worker);

‎tests/dtmbench.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ void initializeDatabase()
184184
printf("Creating database schema...\n");
185185
{
186186
nontransactiontxn(conn);
187-
exec(txn,"drop extension if existsmultimsater");
187+
exec(txn,"drop extension if existsmultimaster");
188188
exec(txn,"create extension multimaster");
189189
exec(txn,"drop table if exists t");
190190
exec(txn,"create table t(u int primary key, v int)");

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp