Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit02f0b9e

Browse files
author
Mikhail Rutman
committed
Disable timeouts for multimaster
To avoid disconnection between multimaster componentsthe following timeouts were disabled for the multimaster:- statement_timeout;- lock_timeout;- idle_in_transaction_session_timeout;- idle_session_timeout.
1 parent524f915 commit02f0b9e

11 files changed

+189
-9
lines changed

‎Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ OBJS = src/multimaster.o src/dmq.o src/commit.o src/bytebuf.o src/bgwpool.o \
44
src/pglogical_output.o src/pglogical_proto.o src/pglogical_receiver.o\
55
src/pglogical_apply.o src/pglogical_hooks.o src/pglogical_config.o\
66
src/pglogical_relid_map.o src/ddd.o src/bkb.o src/spill.o src/state.o\
7-
src/resolver.o src/ddl.o src/syncpoint.o src/global_tx.o
7+
src/resolver.o src/ddl.o src/syncpoint.o src/global_tx.o src/mtm_utils.o
88
MODULE_big = multimaster
99

1010
ifndefUSE_PGXS# hmm, user didn't requested to use pgxs

‎src/bgwpool.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
#include"multimaster.h"
3232
#include"state.h"
3333
#include"logger.h"
34+
#include"mtm_utils.h"
3435

3536
/*
3637
* Store the size of tx body, position of it in the tx list and transaction
@@ -324,6 +325,7 @@ BgwPoolMainLoop(BgwPool *poolDesc)
324325
void
325326
BgwPoolDynamicWorkerMainLoop(Datumarg)
326327
{
328+
MtmDisableTimeouts();
327329
BgwPoolMainLoop((BgwPool*)DatumGetPointer(arg));
328330
}
329331

‎src/ddl.c

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -666,7 +666,6 @@ MtmProcessUtility(PlannedStmt *pstmt, const char *queryString,
666666
QueryEnvironment*queryEnv,DestReceiver*dest,
667667
QueryCompletion*qc)
668668
{
669-
670669
/*
671670
* Quick exit if multimaster is not enabled.
672671
* XXX it's better to do MtmIsEnabled here, but this needs cache access

‎src/dmq.c

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
#include"dmq.h"
3232
#include"logger.h"
3333
#include"compat.h"
34+
#include"mtm_utils.h"
3435

3536
#include"access/transam.h"
3637
#include"libpq/libpq.h"
@@ -527,6 +528,8 @@ dmq_sender_main(Datum main_arg)
527528
pqsignal(SIGTERM,die);
528529
BackgroundWorkerUnblockSignals();
529530

531+
MtmDisableTimeouts();
532+
530533
memcpy(&heartbeat_send_timeout,MyBgworkerEntry->bgw_extra,sizeof(int));
531534
memcpy(&connect_timeout,MyBgworkerEntry->bgw_extra+sizeof(int),sizeof(int));
532535

@@ -796,7 +799,7 @@ dmq_sender_main(Datum main_arg)
796799
intpos=event.pos;
797800

798801
pqtime=dmq_now();
799-
status=PQconnectPoll(conns[conn_id].pgconn);
802+
status=MtmPQconnectPoll(conns[conn_id].pgconn);
800803
mtm_log(DmqPqTiming,"[DMQ] [TIMING] pqp = %f ms",dmq_now()-pqtime);
801804

802805
mtm_log(DmqStateIntermediate,
@@ -1386,6 +1389,11 @@ dmq_receiver_loop(PG_FUNCTION_ARGS)
13861389
doublelast_message_at=dmq_now();
13871390
void*extra=NULL;
13881391

1392+
/*
1393+
* We do not call MtmDisbaleTimeouts() here because of connection to this
1394+
* client is made by MtmPQconnectPoll() that sets all needed timeouts.
1395+
*/
1396+
13891397
sender_name=text_to_cstring(PG_GETARG_TEXT_PP(0));
13901398
recv_timeout=PG_GETARG_INT32(1);
13911399

‎src/include/mtm_utils.h

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*-------------------------------------------------------------------------
2+
*
3+
* mtm_utils.h
4+
*Utility functions:
5+
*- disable global timeouts settings;
6+
*- libpq connect function wrappers.
7+
*
8+
*
9+
* Copyright (c) 2022, Postgres Professional
10+
*
11+
*-------------------------------------------------------------------------
12+
*/
13+
#ifndefMTM_UTILS_H
14+
#defineMTM_UTILS_H
15+
16+
#include"libpq/pqformat.h"
17+
#include"libpq-fe.h"
18+
19+
externvoidMtmDisableTimeouts(void);
20+
21+
externPostgresPollingStatusTypeMtmPQconnectPoll(PGconn*conn);
22+
externPGconn*MtmPQconnectdb(constchar*conninfo);
23+
24+
#endif

‎src/mtm_utils.c

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
/*----------------------------------------------------------------------------
2+
*
3+
* mtm_utils.c
4+
* Utility functions
5+
*
6+
* Copyright (c) 2022, Postgres Professional
7+
*
8+
*----------------------------------------------------------------------------
9+
*/
10+
11+
#include"logger.h"
12+
#include"mtm_utils.h"
13+
14+
#include"utils/timeout.h"
15+
16+
/*
17+
* Disables timeouts on a client side:
18+
* - statement_timeout;
19+
* - lock_timeout;
20+
* - idle_in_transaction_session_timeout;
21+
* - idle_session_timeout.
22+
*
23+
* This timeouts, when set in the postgres config file, affect all process.
24+
* The multimaster needs his sessions not to be interrupted, so we disable
25+
* these timeouts.
26+
*
27+
* This function raises an error on PQExec failed.
28+
*/
29+
staticvoid
30+
disable_client_timeouts(PGconn*conn)
31+
{
32+
PGresult*res;
33+
34+
res=PQexec(conn,"SET statement_timeout = 0");
35+
if (PQresultStatus(res)!=PGRES_COMMAND_OK)
36+
{
37+
char*msg=pchomp(PQerrorMessage(conn));
38+
mtm_log(ERROR,"failed to set statement_timeout: %s",msg);
39+
}
40+
41+
res=PQexec(conn,"SET lock_timeout = 0");
42+
if (PQresultStatus(res)!=PGRES_COMMAND_OK)
43+
{
44+
char*msg=pchomp(PQerrorMessage(conn));
45+
mtm_log(ERROR,"failed to set lock_timeout: %s",msg);
46+
}
47+
48+
res=PQexec(conn,"SET idle_in_transaction_session_timeout = 0");
49+
if (PQresultStatus(res)!=PGRES_COMMAND_OK)
50+
{
51+
char*msg=pchomp(PQerrorMessage(conn));
52+
mtm_log(ERROR,"failed to set idle_in_transaction_session_timeout: %s",msg);
53+
}
54+
55+
res=PQexec(conn,"SET idle_session_timeout = 0");
56+
if (PQresultStatus(res)!=PGRES_COMMAND_OK)
57+
{
58+
char*msg=pchomp(PQerrorMessage(conn));
59+
mtm_log(ERROR,"failed to set idle_session_timeout: %s",msg);
60+
}
61+
}
62+
63+
/*
64+
* Disable timeouts for a current process
65+
* - statement_timeout;
66+
* - lock_timeout;
67+
* - idle_in_transaction_session_timeout;
68+
* - idle_session_timeout.
69+
*
70+
* We disable these timeout for the same reason as in the disable_client_timeout()
71+
*/
72+
externvoid
73+
MtmDisableTimeouts(void)
74+
{
75+
if (get_timeout_active(STATEMENT_TIMEOUT))
76+
{
77+
disable_timeout(STATEMENT_TIMEOUT, false);
78+
}
79+
80+
if (get_timeout_active(LOCK_TIMEOUT))
81+
{
82+
disable_timeout(LOCK_TIMEOUT, false);
83+
}
84+
85+
if (get_timeout_active(IDLE_IN_TRANSACTION_SESSION_TIMEOUT))
86+
{
87+
disable_timeout(IDLE_IN_TRANSACTION_SESSION_TIMEOUT, false);
88+
}
89+
90+
if (get_timeout_active(IDLE_SESSION_TIMEOUT))
91+
{
92+
disable_timeout(IDLE_SESSION_TIMEOUT, false);
93+
}
94+
}
95+
96+
/*
97+
* Wrapper on PQconnectPoll
98+
*
99+
* On connect disables timeouts on a client side
100+
*/
101+
PostgresPollingStatusType
102+
MtmPQconnectPoll(PGconn*conn)
103+
{
104+
PostgresPollingStatusTypestatus;
105+
106+
status=PQconnectPoll(conn);
107+
if (status!=PGRES_POLLING_OK)
108+
returnstatus;
109+
110+
disable_client_timeouts(conn);
111+
112+
returnstatus;
113+
}
114+
115+
/*
116+
* Wrapper on PQconnectdb
117+
*
118+
* On connect disables timeouts on a client side
119+
*/
120+
PGconn*
121+
MtmPQconnectdb(constchar*conninfo)
122+
{
123+
PGconn*conn;
124+
125+
conn=PQconnectdb(conninfo);
126+
if (PQstatus(conn)!=CONNECTION_OK)
127+
returnconn;
128+
129+
disable_client_timeouts(conn);
130+
131+
returnconn;
132+
}
133+

‎src/multimaster.c

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
#include"commit.h"
4949
#include"messaging.h"
5050
#include"syncpoint.h"
51+
#include"mtm_utils.h"
5152

5253
#include"compat.h"
5354

@@ -333,7 +334,6 @@ MtmSleep(int64 usec)
333334
}
334335
}
335336

336-
337337
/*
338338
* These were once used to setup mtm state in parallel workers, but as long as
339339
* they are read-only we don't really need it (historically it imported csn
@@ -970,7 +970,7 @@ mtm_init_cluster(PG_FUNCTION_ARGS)
970970
intj;
971971

972972
/* connect */
973-
peer_conns[i]=PQconnectdb(conninfo);
973+
peer_conns[i]=MtmPQconnectdb(conninfo);
974974
if (PQstatus(peer_conns[i])!=CONNECTION_OK)
975975
{
976976
char*msg=pchomp(PQerrorMessage(peer_conns[i]));
@@ -1300,7 +1300,7 @@ mtm_join_node(PG_FUNCTION_ARGS)
13001300
if (new_node==NULL)
13011301
mtm_log(ERROR,"new node %d not found",new_node_id);
13021302
conninfo=new_node->conninfo;
1303-
conn=PQconnectdb(conninfo);
1303+
conn=MtmPQconnectdb(conninfo);
13041304
if (PQstatus(conn)!=CONNECTION_OK)
13051305
{
13061306
char*msg=pchomp(PQerrorMessage(conn));
@@ -1495,7 +1495,7 @@ mtm_ping(PG_FUNCTION_ARGS)
14951495
if (!BIT_CHECK(curr_gen.members,peer->node_id-1))
14961496
continue;
14971497

1498-
conn=PQconnectdb(peer->conninfo);
1498+
conn=MtmPQconnectdb(peer->conninfo);
14991499
if (PQstatus(conn)!=CONNECTION_OK)
15001500
{
15011501
char*msg=pchomp(PQerrorMessage(conn));
@@ -2554,7 +2554,7 @@ _mtm_get_snapshots(const MtmConfig *mcfg, PGconn **conns, char **snapnames,
25542554
for (i=0;i<mcfg->n_nodes;i++)
25552555
{
25562556
/* Establish connection to each node */
2557-
conns[i]=PQconnectdb(mcfg->nodes[i].conninfo);
2557+
conns[i]=MtmPQconnectdb(mcfg->nodes[i].conninfo);
25582558

25592559
if (conns[i]==NULL||PQstatus(conns[i])==CONNECTION_BAD)
25602560
{
@@ -2680,7 +2680,7 @@ mtm_check_query(PG_FUNCTION_ARGS)
26802680
intpos=index[i];
26812681

26822682
/* Establish connection to each online node */
2683-
conn=PQconnectdb(cfg->nodes[pos].conninfo);
2683+
conn=MtmPQconnectdb(cfg->nodes[pos].conninfo);
26842684

26852685
if (conn==NULL||PQstatus(conn)==CONNECTION_BAD)
26862686
{

‎src/pglogical_output.c

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
#include"multimaster.h"
5858
#include"logger.h"
5959
#include"state.h"
60+
#include"mtm_utils.h"
6061

6162
externvoid_PG_output_plugin_init(OutputPluginCallbacks*cb);
6263

@@ -143,6 +144,8 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
143144
cb->shutdown_cb=pg_decode_shutdown;
144145
cb->message_cb=pg_decode_message;
145146
cb->caughtup_cb=pg_decode_caughtup;
147+
148+
MtmDisableTimeouts();
146149
}
147150

148151
#if0

‎src/pglogical_receiver.c

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
#include"compat.h"
5757
#include"syncpoint.h"
5858
#include"global_tx.h"
59+
#include"mtm_utils.h"
5960

6061
#defineERRCODE_DUPLICATE_OBJECT_STR "42710"
6162

@@ -584,6 +585,8 @@ pglogical_receiver_main(Datum main_arg)
584585
*/
585586
on_shmem_exit(pglogical_receiver_at_exit,PointerGetDatum(rctx));
586587

588+
MtmDisableTimeouts();
589+
587590
MtmIsReceiver= true;
588591
/* Run as replica session replication role. */
589592
SetConfigOption("session_replication_role","replica",

‎src/resolver.c

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
#include"commit.h"
3131
#include"global_tx.h"
3232
#include"messaging.h"
33+
#include"mtm_utils.h"
3334

3435
staticMtmConfig*mtm_cfg=NULL;
3536
staticboolsend_requests;
@@ -637,6 +638,8 @@ ResolverMain(Datum main_arg)
637638
Oiddb_id,
638639
user_id;
639640

641+
MtmDisableTimeouts();
642+
640643
/* init this worker */
641644
pqsignal(SIGHUP,ResolverSigHupHandler);
642645
pqsignal(SIGTERM,die);

‎src/state.c

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
#include"syncpoint.h"
4646
#include"logger.h"
4747
#include"messaging.h"
48+
#include"mtm_utils.h"
4849

4950
charconst*constMtmNeighborEventMnem[]=
5051
{
@@ -1672,6 +1673,8 @@ CampaignerMain(Datum main_arg)
16721673
TimestampTzlast_campaign_at=0;
16731674
intrc=WL_TIMEOUT;
16741675

1676+
MtmDisableTimeouts();
1677+
16751678
MtmBackgroundWorker= true;
16761679
mtm_log(MtmStateMessage,"campaigner started");
16771680
before_shmem_exit(CampaignerOnExit, (Datum)0);
@@ -3417,6 +3420,8 @@ ReplierMain(Datum main_arg)
34173420
ALLOCSET_DEFAULT_SIZES);
34183421
booljob_pending;
34193422

3423+
MtmDisableTimeouts();
3424+
34203425
MtmBackgroundWorker= true;
34213426
before_shmem_exit(ReplierOnExit, (Datum)0);
34223427
mtm_log(MtmStateMessage,"replier started");

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp