Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit1157be8

Browse files
committed
Add client -> stream pointer update while moving streams in memory in arbiter code.
1 parentb34d59e commit1157be8

File tree

5 files changed

+48
-7
lines changed

5 files changed

+48
-7
lines changed

‎contrib/pg_dtm/dtmd/src/main.c‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ static void debug_cmd(client_t client, int argc, xid_t *argv) {
165165
#defineCHECK(COND,CLIENT,MSG) \
166166
do { \
167167
if (!(COND)) { \
168-
shout("[%d] %s, returning'-'\n", CLIENT_ID(CLIENT), MSG); \
168+
shout("[%d] %s, returningRES_FAILED\n", CLIENT_ID(CLIENT), MSG); \
169169
client_message_shortcut(CLIENT, RES_FAILED); \
170170
return; \
171171
} \

‎contrib/pg_dtm/dtmd/src/server.c‎

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,16 @@ static void server_stream_destroy(server_t server, stream_t stream) {
214214
free(stream->output.data);
215215
}
216216

217+
staticvoidstream_move(stream_tdst,stream_tsrc) {
218+
inti;
219+
*dst=*src;
220+
for (i=0;i<MAX_TRANSACTIONS;i++) {
221+
if (dst->clients[i].stream) {
222+
dst->clients[i].stream=dst;
223+
}
224+
}
225+
}
226+
217227
staticvoidserver_close_bad_streams(server_tserver) {
218228
inti;
219229
for (i=server->streamsnum-1;i >=0;i--) {
@@ -223,6 +233,7 @@ static void server_close_bad_streams(server_t server) {
223233
if (i!=server->streamsnum-1) {
224234
// move the last one here
225235
*stream=server->streams[server->streamsnum-1];
236+
stream_move(stream,server->streams+server->streamsnum-1);
226237
}
227238
server->streamsnum--;
228239
}
@@ -372,6 +383,15 @@ static client_t stream_get_client(stream_t stream, unsigned int chan, bool *isne
372383
returnclient;
373384
}
374385

386+
staticvoidhexdump(intlen,char*data) {
387+
fprintf(stderr,"hex:");
388+
inti;
389+
for (i=0;i<len;i++) {
390+
fprintf(stderr," %02x",data[i]);
391+
}
392+
fprintf(stderr,"\n");
393+
}
394+
375395
staticboolserver_stream_handle(server_tserver,stream_tstream) {
376396
debug("a stream ready to recv\n");
377397

@@ -466,8 +486,8 @@ void server_loop(server_t server) {
466486
}
467487
}
468488

469-
server_flush(server);
470489
server_close_bad_streams(server);
490+
server_flush(server);
471491
}
472492
}
473493

‎contrib/pg_dtm/libdtm.c‎

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,10 @@ static int dtm_recv_results(DTMConn dtm, int maxlen, xid_t *results) {
131131
elog(ERROR,"Failed to recv results header from arbiter");
132132
return0;
133133
}
134+
if (newbytes==0) {
135+
elog(ERROR,"Arbiter closed connection during recv");
136+
return0;
137+
}
134138
recved+=newbytes;
135139
}
136140

@@ -147,6 +151,10 @@ static int dtm_recv_results(DTMConn dtm, int maxlen, xid_t *results) {
147151
elog(ERROR,"Failed to recv results body from arbiter");
148152
return0;
149153
}
154+
if (newbytes==0) {
155+
elog(ERROR,"Arbiter closed connection during recv");
156+
return0;
157+
}
150158
recved+=newbytes;
151159
}
152160
returnneeded /sizeof(xid_t);
@@ -156,6 +164,7 @@ static bool dtm_send_command(DTMConn dtm, xid_t cmd, int argc, ...)
156164
{
157165
va_listargv;
158166
inti;
167+
intsent;
159168
charbuf[COMMAND_BUFFER_SIZE];
160169
intdatasize;
161170
char*cursor=buf;
@@ -181,7 +190,16 @@ static bool dtm_send_command(DTMConn dtm, xid_t cmd, int argc, ...)
181190
assert(msg->size+sizeof(ShubMessageHdr)==datasize);
182191
assert(datasize <=COMMAND_BUFFER_SIZE);
183192

184-
returnwrite(dtm->sock,buf,datasize)==datasize;
193+
sent=0;
194+
while (sent<datasize) {
195+
intnewbytes=write(dtm->sock,buf+sent,datasize-sent);
196+
if (newbytes==-1) {
197+
elog(ERROR,"Failed to send a command to arbiter");
198+
return false;
199+
}
200+
sent+=newbytes;
201+
}
202+
return true;
185203
}
186204

187205
voidDtmGlobalConfig(char*host,intport,char*sock_dir) {
@@ -395,7 +413,7 @@ XidStatus DtmGlobalGetTransStatus(TransactionId xid, bool wait)
395413
caseRES_TRANSACTION_INPROGRESS:
396414
returnTRANSACTION_STATUS_IN_PROGRESS;
397415
caseRES_TRANSACTION_UNKNOWN:
398-
returnTRANSACTION_STATUS_IN_PROGRESS;
416+
returnTRANSACTION_STATUS_UNKNOWN;
399417
default:
400418
gotofailure;
401419
}

‎contrib/pg_dtm/tests/transfers.go‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,7 @@ func prepare_one(connstr string, wg *sync.WaitGroup) {
207207
}
208208
exec(conn,"drop table if exists t")
209209
exec(conn,"create table t(u int primary key, v int)")
210-
exec(conn,"insert into t (select generate_series(0,1000000), $1)",cfg.Accounts.Balance)
210+
exec(conn,"insert into t (select generate_series(0,$1-1), $2)",cfg.Accounts.Num,cfg.Accounts.Balance)
211211
/*
212212
exec(conn, "begin transaction isolation level " + cfg.Isolation)
213213

‎contrib/pg_dtm/tests/transfers.sh‎

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
#!/bin/sh
2+
23
go run transfers.go \
34
-d'dbname=postgres port=5432' \
45
-d'dbname=postgres port=5433' \
5-
-m \
6-
-w 128 \
6+
-v \
7+
-m \
8+
-u 1000 \
9+
-w 10 \
710
-g

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp