Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit5317b56

Browse files
committed
Start socklib as background worker
1 parentbc7a35e commit5317b56

File tree

6 files changed

+136
-56
lines changed

6 files changed

+136
-56
lines changed

‎contrib/pg_dtm/Makefile‎

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
MODULE_big = pg_dtm
2-
OBJS = pg_dtm.o libdtm.o
2+
OBJS = pg_dtm.o libdtm.o sockhub/libsockhub.a
3+
4+
sockhub/libsockhub.a:
5+
make -C sockhub
36

47
EXTENSION = pg_dtm
58
DATA = pg_dtm--1.0.sql

‎contrib/pg_dtm/libdtm.c‎

Lines changed: 74 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,10 @@ typedef struct DTMConnData
2222
intsock;
2323
}DTMConnData;
2424

25+
staticchar*dtmhost=NULL;
26+
staticintdtmport=0;
27+
staticchar*dtm_unix_sock_dir;
28+
2529
typedefunsigned long longxid_t;
2630

2731
// Returns true if the write was successful.
@@ -151,51 +155,73 @@ static bool dtm_read_status(DTMConn dtm, XidStatus *s)
151155
// Connects to the specified DTM.
152156
staticDTMConnDtmConnect(char*host,intport)
153157
{
154-
structaddrinfo*addrs=NULL;
155-
structaddrinfohint;
156-
charportstr[6];
157-
structaddrinfo*a;
158-
159-
memset(&hint,0,sizeof(hint));
160-
hint.ai_socktype=SOCK_STREAM;
161-
hint.ai_family=AF_INET;
162-
snprintf(portstr,6,"%d",port);
163-
hint.ai_protocol=getprotobyname("tcp")->p_proto;
164-
if (getaddrinfo(host,portstr,&hint,&addrs))
165-
{
166-
perror("resolve address");
167-
returnNULL;
168-
}
169-
170-
for (a=addrs;a!=NULL;a=a->ai_next)
171-
{
172-
DTMConndtm;
173-
intone=1;
174-
intsock=socket(a->ai_family,a->ai_socktype,a->ai_protocol);
175-
if (sock==-1)
176-
{
177-
perror("failed to create a socket");
178-
continue;
179-
}
180-
setsockopt(sock,IPPROTO_TCP,TCP_NODELAY,&one,sizeof(one));
181-
182-
if (connect(sock,a->ai_addr,a->ai_addrlen)==-1)
183-
{
184-
perror("failed to connect to an address");
185-
close(sock);
186-
continue;
187-
}
188-
189-
// success
190-
freeaddrinfo(addrs);
191-
dtm=malloc(sizeof(DTMConnData));
192-
dtm->sock=sock;
193-
returndtm;
194-
}
195-
196-
freeaddrinfo(addrs);
197-
fprintf(stderr,"could not connect\n");
198-
returnNULL;
158+
DTMConndtm;
159+
intsd;
160+
161+
if (strcmp(host,"localhost")==0) {
162+
structsockaddrsock;
163+
intlen= offsetof(structsockaddr,sa_data)+snprintf(sock.sa_data,sizeof(sock.sa_data),"%s/p%u",dtm_unix_sock_dir,port);
164+
sock.sa_family=AF_UNIX;
165+
166+
sd=socket(AF_UNIX,SOCK_STREAM,0);
167+
if (sd==-1)
168+
{
169+
perror("failed to create a unix socket");
170+
}
171+
if (connect(sd,&sock,len)==-1)
172+
{
173+
perror("failed to connect to an address");
174+
close(sd);
175+
returnNULL;
176+
}
177+
dtm=malloc(sizeof(DTMConnData));
178+
dtm->sock=sd;
179+
returndtm;
180+
}else {
181+
structaddrinfo*addrs=NULL;
182+
structaddrinfohint;
183+
charportstr[6];
184+
structaddrinfo*a;
185+
186+
memset(&hint,0,sizeof(hint));
187+
hint.ai_socktype=SOCK_STREAM;
188+
hint.ai_family=AF_INET;
189+
snprintf(portstr,6,"%d",port);
190+
hint.ai_protocol=getprotobyname("tcp")->p_proto;
191+
if (getaddrinfo(host,portstr,&hint,&addrs))
192+
{
193+
perror("resolve address");
194+
returnNULL;
195+
}
196+
197+
for (a=addrs;a!=NULL;a=a->ai_next)
198+
{
199+
intone=1;
200+
sd=socket(a->ai_family,a->ai_socktype,a->ai_protocol);
201+
if (sd==-1)
202+
{
203+
perror("failed to create a socket");
204+
continue;
205+
}
206+
setsockopt(sd,IPPROTO_TCP,TCP_NODELAY,&one,sizeof(one));
207+
208+
if (connect(sd,a->ai_addr,a->ai_addrlen)==-1)
209+
{
210+
perror("failed to connect to an address");
211+
close(sd);
212+
continue;
213+
}
214+
215+
// success
216+
freeaddrinfo(addrs);
217+
dtm=malloc(sizeof(DTMConnData));
218+
dtm->sock=sd;
219+
returndtm;
220+
}
221+
freeaddrinfo(addrs);
222+
}
223+
fprintf(stderr,"could not connect\n");
224+
returnNULL;
199225
}
200226

201227
/*
@@ -231,16 +257,14 @@ static bool dtm_query(DTMConn dtm, char cmd, int argc, ...)
231257
return true;
232258
}
233259

234-
staticchar*dtmhost=NULL;
235-
staticintdtmport=0;
236-
237-
voidTuneToDtm(char*host,intport) {
260+
voidDtmGlobalConfig(char*host,intport,char*sock_dir) {
238261
if (dtmhost) {
239262
free(dtmhost);
240263
dtmhost=NULL;
241264
}
242265
dtmhost=strdup(host);
243266
dtmport=port;
267+
dtm_unix_sock_dir=sock_dir;
244268
}
245269

246270
staticDTMConnGetConnection()
@@ -255,7 +279,7 @@ static DTMConn GetConnection()
255279
elog(ERROR,"Failed to connect to DTMD %s:%d",dtmhost,dtmport);
256280
}
257281
}else {
258-
elog(ERROR,"DTMD address not specified");
282+
/*elog(ERROR, "DTMD address not specified"); */
259283
}
260284
}
261285
returndtm;

‎contrib/pg_dtm/libdtm.h‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010

1111
// Sets up the host and port for DTM connection.
1212
// The defaults are "127.0.0.1" and 5431.
13-
voidTuneToDtm(char*host,intport);
13+
voidDtmGlobalConfig(char*host,intport,char*sock_dir);
1414

1515
voidDtmInitSnapshot(Snapshotsnapshot);
1616

‎contrib/pg_dtm/pg_dtm.c‎

Lines changed: 55 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
#include"postgres.h"
1313
#include"fmgr.h"
1414
#include"miscadmin.h"
15+
#include"postmaster/postmaster.h"
16+
#include"postmaster/bgworker.h"
1517
#include"storage/s_lock.h"
1618
#include"storage/spin.h"
1719
#include"storage/lmgr.h"
@@ -39,6 +41,7 @@
3941
#include"storage/pmsignal.h"
4042
#include"storage/proc.h"
4143
#include"utils/syscache.h"
44+
#include"sockhub/sockhub.h"
4245

4346
#include"libdtm.h"
4447

@@ -74,6 +77,7 @@ static bool TransactionIdIsInSnapshot(TransactionId xid, Snapshot snapshot);
7477
staticboolTransactionIdIsInDoubt(TransactionIdxid);
7578

7679
staticvoidDtmShmemStartup(void);
80+
staticvoidDtmBackgroundWorker(Datumarg);
7781

7882
staticshmem_startup_hook_typeprev_shmem_startup_hook;
7983
staticHTAB*xid_in_doubt;
@@ -91,7 +95,17 @@ static TransactionManager DtmTM = { DtmGetTransactionStatus, DtmSetTransactionSt
9195

9296
staticchar*DtmHost;
9397
staticintDtmPort;
94-
98+
staticintDtmBufferSize;
99+
100+
staticBackgroundWorkerDtmWorker= {
101+
"DtmWorker",
102+
0,/* do not need connection to the database */
103+
BgWorkerStart_PostmasterStart,
104+
1,/* restrart in one second (is it possible to restort immediately?) */
105+
DtmBackgroundWorker
106+
};
107+
108+
95109

96110
#defineXTM_TRACE(fmt, ...)
97111
//#define XTM_INFO(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
@@ -623,6 +637,9 @@ static void DtmInitialize()
623637
dtm->nReservedXids=0;
624638
dtm->minXid=InvalidTransactionId;
625639
RegisterXactCallback(DtmXactCallback,NULL);
640+
if (DtmBufferSize!=0) {
641+
RegisterBackgroundWorker(&DtmWorker);
642+
}
626643
}
627644
LWLockRelease(AddinShmemInitLock);
628645

@@ -716,6 +733,21 @@ _PG_init(void)
716733
NULL
717734
);
718735

736+
DefineCustomIntVariable(
737+
"dtm.buffer_size",
738+
"Size of sockhub buffer for connection to DTM daemon, if 0, then direct connection will be used",
739+
NULL,
740+
&DtmBufferSize,
741+
0,
742+
0,
743+
INT_MAX,
744+
PGC_POSTMASTER,
745+
0,
746+
NULL,
747+
NULL,
748+
NULL
749+
);
750+
719751
DefineCustomStringVariable(
720752
"dtm.host",
721753
"The host where DTM daemon resides",
@@ -744,7 +776,7 @@ _PG_init(void)
744776
NULL
745777
);
746778

747-
TuneToDtm(DtmHost,DtmPort);
779+
DtmGlobalConfig(DtmHost,DtmPort,Unix_socket_directories);
748780

749781
/*
750782
* Install hooks.
@@ -835,3 +867,24 @@ Datum dtm_join_transaction(PG_FUNCTION_ARGS)
835867
PG_RETURN_VOID();
836868
}
837869

870+
voidDtmBackgroundWorker(Datumarg)
871+
{
872+
Shubshub;
873+
ShubParamsparams;
874+
charunix_sock_path[MAXPGPATH];
875+
876+
snprintf(unix_sock_path,sizeof(unix_sock_path),"%s/p%d",Unix_socket_directories,DtmPort);
877+
878+
ShubInitParams(&params);
879+
880+
params.host=DtmHost;
881+
params.port=DtmPort;
882+
params.file=unix_sock_path;
883+
params.buffer_size=DtmBufferSize;
884+
885+
DtmGlobalConfig("localhost",DtmPort,Unix_socket_directories);
886+
887+
ShubInitialize(&shub,&params);
888+
889+
ShubLoop(&shub);
890+
}

‎contrib/pg_dtm/sockhub/Makefile‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
CC = gcc
2-
CFLAGS = -c -I. -Wall -O2 -g
2+
CFLAGS = -c -I. -Wall -O2 -g -fPIC
33
LD =$(CC)
44
LDFLAGS = -g
55
AR = ar

‎contrib/pg_dtm/tests/transfers.sh‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,5 @@
22
go run transfers.go \
33
-d'dbname=postgres port=5432' \
44
-d'dbname=postgres port=5433' \
5-
-d'dbname=postgres port=5434' \
5+
-m \
66
-g

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp