Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit517d806

Browse files
committed
Make sockhub working again.
1 parent0dc4435 commit517d806

File tree

5 files changed

+81
-45
lines changed

5 files changed

+81
-45
lines changed

‎contrib/pg_dtm/dtmd/src/clog.c‎

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,5 +207,8 @@ xid_t clog_find_last_used(clog_t clog) {
207207
last_used=xid;
208208
}
209209
}
210+
if (last_used<MIN_XID) {
211+
last_used=MIN_XID;
212+
}
210213
returnlast_used;
211214
}

‎contrib/pg_dtm/libdtm.c‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -310,7 +310,7 @@ static DTMConn GetConnection()
310310
}
311311
else
312312
{
313-
elog(WARNING,"Failed to connect to DTMD at unix%d",c->port);
313+
elog(WARNING,"Failed to connect to DTMD at unixsocket");
314314
}
315315
}
316316
}

‎contrib/pg_dtm/pg_dtm.c‎

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@ static TransactionManager DtmTM = {
120120
};
121121

122122
staticchar*DtmServers;
123+
staticchar*DtmServersCopy;
123124
staticintDtmBufferSize;
124125

125126
staticBackgroundWorkerDtmWorker= {
@@ -836,7 +837,7 @@ _PG_init(void)
836837

837838
DefineCustomStringVariable(
838839
"dtm.servers",
839-
"Thespace separated host:port pairs where DTM daemons reside",
840+
"Thecomma separated host:port pairs where DTM daemons reside",
840841
NULL,
841842
&DtmServers,
842843
"127.0.0.1:5431",
@@ -847,6 +848,7 @@ _PG_init(void)
847848
NULL// GucShowHook show_hook
848849
);
849850

851+
DtmServersCopy=strdup(DtmServers);
850852
if (DtmBufferSize!=0)
851853
{
852854
DtmGlobalConfig(DtmServers,Unix_socket_directories);
@@ -956,7 +958,7 @@ void DtmBackgroundWorker(Datum arg)
956958

957959
ShubInitParams(&params);
958960

959-
params.hosts=DtmServers;
961+
ShubParamsSetHosts(&params,DtmServersCopy);
960962
params.file=unix_sock_path;
961963
params.buffer_size=DtmBufferSize;
962964

‎contrib/pg_dtm/sockhub/sockhub.c‎

Lines changed: 61 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,58 @@ void ShubInitParams(ShubParams* params)
3535
params->queue_size=100;
3636
params->max_attempts=10;
3737
params->error_handler=default_error_handler;
38+
params->leader=NULL;
3839
}
3940

41+
voidShubParamsSetHosts(ShubParams*params,char*hoststring)
42+
{
43+
char*hstate,*pstate;
44+
char*hostport,*host,*portstr;
45+
intport;
46+
47+
char*hosts=strdup(hoststring);
48+
fprintf(stderr,"sockhub parsing hosts = '%s'\n",hosts);
49+
hostport=strtok_r(hosts,",",&hstate);
50+
51+
while (hostport) {
52+
fprintf(stderr,"hostport = '%s'\n",hostport);
53+
host=strtok_r(hostport,":",&pstate);
54+
if (!host) {
55+
fprintf(stderr,"wrong host in host list\n");
56+
break;
57+
}
58+
59+
portstr=strtok_r(NULL,":",&pstate);
60+
if (portstr) {
61+
port=atoi(portstr);
62+
}else {
63+
port=5431;
64+
}
65+
66+
fprintf(stderr,"adding host %s:%d\n",host,port);
67+
host_t*h=malloc(sizeof(host_t));
68+
h->host=strdup(host);
69+
h->port=port;
70+
if (params->leader) {
71+
// update pointers from
72+
h->prev=params->leader->prev;
73+
h->next=params->leader;
74+
75+
// update pointers to
76+
h->prev->next=h;
77+
h->next->prev=h;
78+
}else {
79+
// the list is empty
80+
params->leader=h;
81+
h->prev=h;
82+
h->next=h;
83+
}
84+
85+
hostport=strtok_r(NULL,",",&hstate);
86+
}
87+
88+
free(hosts);
89+
}
4090

4191
staticintresolve_host_by_name(constchar*hostname,unsigned*addrs,unsigned*n_addrs)
4292
{
@@ -103,57 +153,37 @@ int ShubWriteSocket(int sd, void const* buf, int size)
103153
return1;
104154
}
105155

106-
107156
staticvoidreconnect(Shub*shub)
108157
{
109-
staticintskip_hosts=0;
110-
printf("will connect to host #%d\n",skip_hosts);
111-
112158
structsockaddr_insock_inet;
113159
unsignedaddrs[128];
114160
unsignedi,n_addrs=sizeof(addrs) /sizeof(addrs[0]);
115161
intmax_attempts=shub->params->max_attempts;
116-
char*hosts=strdup(shub->params->hosts);
117162
if (shub->output >=0) {
118163
close_socket(shub,shub->output);
119164
}
120165

121166
sock_inet.sin_family=AF_INET;
122167

123-
char*hstate,*pstate;
124-
char*hostport,*host,*portstr;
125-
intport;
126-
hostport=strtok_r(hosts,",",&hstate);
127-
inthosti=0;
128-
while (hostport) {
129-
ShubErrorSeverityseverity=SHUB_RECOVERABLE_ERROR;
168+
while (shub->params->leader) {
169+
char*host=shub->params->leader->host;
170+
intport=shub->params->leader->port;
130171

131-
if (hosti<skip_hosts) {
132-
gototrynext;
133-
}
172+
fprintf(stderr,"shub leader = %s:%d\n",host,port);
134173

135-
host=strtok_r(hostport,":",&pstate);
136-
if (!host) {
137-
severity=SHUB_FATAL_ERROR;
138-
break;
139-
}
174+
shub->params->leader=shub->params->leader->next;
140175

141-
portstr=strtok_r(NULL,":",&pstate);
142-
if (portstr) {
143-
port=atoi(portstr);
144-
}else {
145-
port=5431;
146-
}
176+
ShubErrorSeverityseverity=SHUB_RECOVERABLE_ERROR;
147177
sock_inet.sin_port=htons(port);
148178

149179
if (!resolve_host_by_name(host,addrs,&n_addrs)) {
150180
shub->params->error_handler("Failed to resolve host by name",severity);
151-
gototrynext;
181+
continue;
152182
}
153183
shub->output=socket(AF_INET,SOCK_STREAM,0);
154184
if (shub->output<0) {
155185
shub->params->error_handler("Failed to create inet socket",severity);
156-
gototrynext;
186+
continue;
157187
}
158188
while (1) {
159189
intrc=-1;
@@ -170,32 +200,22 @@ static void reconnect(Shub* shub)
170200
if (rc<0) {
171201
if (errno!=ENOENT&&errno!=ECONNREFUSED&&errno!=EINPROGRESS) {
172202
shub->params->error_handler("Connection can not be establish",severity);
173-
gototrynext;
203+
continue;
174204
}
175205
if (max_attempts--!=0) {
176206
sleep(1);
177207
}else {
178208
shub->params->error_handler("Failed to connect to host",severity);
179-
gototrynext;
209+
continue;
180210
}
181211
}else {
182212
intoptval=1;
183213
setsockopt(shub->output,IPPROTO_TCP,TCP_NODELAY, (charconst*)&optval,sizeof(optval));
184214
FD_SET(shub->output,&shub->inset);
185-
gotofinish;
215+
return;
186216
}
187217
}
188-
trynext:
189-
hostport=strtok_r(NULL,",",&hstate);
190-
hosti++;
191-
}
192-
finish:
193-
if (hosti<skip_hosts) {
194-
skip_hosts=0;
195-
}else {
196-
skip_hosts++;
197218
}
198-
free(hosts);
199219
}
200220

201221
staticvoidnotify_disconnect(Shub*shub,intchan)

‎contrib/pg_dtm/sockhub/sockhub.h‎

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,22 @@ typedef enum
2424

2525
typedefvoid(*ShubErrorHandler)(charconst*msg,ShubErrorSeverityseverity);
2626

27+
typedefstructhost_t
28+
{
29+
char*host;
30+
intport;
31+
structhost_t*next;
32+
structhost_t*prev;
33+
}host_t;
34+
2735
typedefstruct
2836
{
2937
intbuffer_size;
3038
intdelay;
3139
intqueue_size;
3240
intmax_attempts;
3341
charconst*file;
34-
charconst*hosts;
42+
host_t*leader;
3543
ShubErrorHandlererror_handler;
3644
}ShubParams;
3745

@@ -53,7 +61,10 @@ int ShubReadSocket(int sd, void* buf, int size);
5361
intShubWriteSocket(intsd,voidconst*buf,intsize);
5462

5563
voidShubInitParams(ShubParams*params);
64+
voidShubParamsSetHosts(ShubParams*params,char*hoststring);
5665
voidShubInitialize(Shub*shub,ShubParams*params);
5766
voidShubLoop(Shub*shub);
5867

5968
#endif
69+
70+
// vim: sts=4 ts=4 sw=4 expandtab

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp