Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commiteeeb868

Browse files
committed
Merge branch 'xtm' of gitlab.postgrespro.ru:pgpro-dev/postgrespro into xtm
2 parents1e3909c +c2f8668 commiteeeb868

File tree

13 files changed

+131
-81
lines changed

13 files changed

+131
-81
lines changed

‎contrib/pg_xtm/dtmd/include/transaction.h‎

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ typedef struct Transaction {
1515
xid_txid;
1616

1717
intsize;// number of paritcipants
18-
intmax_size;// maximal number of participants
1918

2019
// for + against ≤ size
2120
intvotes_for;

‎contrib/pg_xtm/dtmd/src/main.c‎

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -257,18 +257,11 @@ static char *onbegin(void *stream, void *clientdata, cmd_t *cmd) {
257257
);
258258

259259
CHECK(
260-
cmd->argc==1,
260+
cmd->argc==0,
261261
clientdata,
262262
"BEGIN: wrong number of arguments"
263263
);
264264

265-
intsize=cmd->argv[0];
266-
CHECK(
267-
size <=MAX_NODES,
268-
clientdata,
269-
"BEGIN: 'size' > MAX_NODES"
270-
);
271-
272265
CHECK(
273266
CLIENT_XID(clientdata)==INVALID_XID,
274267
clientdata,
@@ -280,7 +273,6 @@ static char *onbegin(void *stream, void *clientdata, cmd_t *cmd) {
280273

281274
prev_gxid=t->xid=next_gxid++;
282275
t->snapshots_count=0;
283-
t->max_size=size;
284276
t->size=1;
285277

286278
CLIENT_SNAPSENT(clientdata)=0;
@@ -438,11 +430,6 @@ static char *onsnapshot(void *stream, void *clientdata, cmd_t *cmd) {
438430
CLIENT_SNAPSENT(clientdata)=0;
439431
CLIENT_XID(clientdata)=t->xid;
440432
t->size+=1;
441-
CHECK(
442-
t->size <=t->max_size,
443-
clientdata,
444-
"SNAPSHOT: too many participants"
445-
);
446433
}
447434

448435
CHECK(

‎contrib/pg_xtm/dtmd/src/transaction.c‎

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ void transaction_clear(Transaction *t) {
2828

2929
t->xid=INVALID_XID;
3030
t->size=0;
31-
t->max_size=0;
3231
t->votes_for=0;
3332
t->votes_against=0;
3433
t->snapshots_count=0;

‎contrib/pg_xtm/libdtm.c‎

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -281,15 +281,15 @@ void DtmInitSnapshot(Snapshot snapshot)
281281
// transaction id, fills the 'snapshot' and 'gxmin' on success. 'gxmin' is the
282282
// smallest xmin among all snapshots known to DTM. Returns INVALID_XID
283283
// otherwise.
284-
TransactionIdDtmGlobalStartTransaction(intnParticipants,Snapshotsnapshot,TransactionId*gxmin)
284+
TransactionIdDtmGlobalStartTransaction(Snapshotsnapshot,TransactionId*gxmin)
285285
{
286286
boolok;
287287
xid_txid;
288288
xid_tnumber;
289289
DTMConndtm=GetConnection();
290290

291291
// query
292-
if (!dtm_query(dtm,'b',1,nParticipants)) gotofailure;
292+
if (!dtm_query(dtm,'b',0)) gotofailure;
293293

294294
// response
295295
if (!dtm_read_bool(dtm,&ok)) gotofailure;

‎contrib/pg_xtm/libdtm.h‎

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,11 @@
1010

1111
voidDtmInitSnapshot(Snapshotsnapshot);
1212

13-
// Starts a new global transaction of nParticipants size. Returns the
13+
// Starts a new global transaction. Returns the
1414
// transaction id, fills the 'snapshot' and 'gxmin' on success. 'gxmin' is the
1515
// smallest xmin among all snapshots known to DTM. Returns INVALID_XID
1616
// otherwise.
17-
TransactionIdDtmGlobalStartTransaction(intnParticipants,Snapshotsnapshot,TransactionId*gxmin);
17+
TransactionIdDtmGlobalStartTransaction(Snapshotsnapshot,TransactionId*gxmin);
1818

1919
// Asks the DTM for a fresh snapshot. Fills the 'snapshot' and 'gxmin' on
2020
// success. 'gxmin' is the smallest xmin among all snapshots known to DTM.

‎contrib/pg_xtm/pg_dtm--1.0.sql‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
-- complain if script is sourced in psql, rather than via CREATE EXTENSION
22
\echo Use"CREATE EXTENSION pg_dtm" to load this file. \quit
33

4-
CREATEFUNCTIONdtm_begin_transaction(n_participantsinteger) RETURNSinteger
4+
CREATEFUNCTIONdtm_begin_transaction() RETURNSinteger
55
AS'MODULE_PATHNAME','dtm_begin_transaction'
66
LANGUAGE C;
77

‎contrib/pg_xtm/pg_dtm.c‎

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ static bool DtmTransactionIdIsInProgress(TransactionId xid);
7474
staticTransactionIdDtmGetNextXid(void);
7575
staticTransactionIdDtmGetNewTransactionId(boolisSubXact);
7676
staticTransactionIdDtmGetOldestXmin(Relationrel,boolignoreVacuum);
77+
staticTransactionIdDtmGetGlobalTransactionId(void);
7778

7879
staticboolTransactionIdIsInSnapshot(TransactionIdxid,Snapshotsnapshot);
7980
staticboolTransactionIdIsInDoubt(TransactionIdxid);
@@ -92,7 +93,7 @@ static bool DtmGlobalXidAssigned;
9293
staticintDtmLocalXidReserve;
9394
staticintDtmCurcid;
9495
staticSnapshotDtmLastSnapshot;
95-
staticTransactionManagerDtmTM= {DtmGetTransactionStatus,DtmSetTransactionStatus,DtmGetSnapshot,DtmGetNewTransactionId,DtmGetOldestXmin,DtmTransactionIdIsInProgress };
96+
staticTransactionManagerDtmTM= {DtmGetTransactionStatus,DtmSetTransactionStatus,DtmGetSnapshot,DtmGetNewTransactionId,DtmGetOldestXmin,DtmTransactionIdIsInProgress,DtmGetGlobalTransactionId };
9697

9798

9899
#defineXTM_TRACE(fmt, ...)
@@ -323,6 +324,12 @@ static TransactionId DtmGetNextXid()
323324
returnxid;
324325
}
325326

327+
TransactionId
328+
DtmGetGlobalTransactionId()
329+
{
330+
returnDtmNextXid;
331+
}
332+
326333
TransactionId
327334
DtmGetNewTransactionId(boolisSubXact)
328335
{
@@ -667,8 +674,8 @@ static void DtmInitialize()
667674
staticvoid
668675
DtmXactCallback(XactEventevent,void*arg)
669676
{
677+
XTM_INFO("%d: DtmXactCallbackevent=%d isGlobal=%d, nextxid=%d\n",getpid(),event,DtmGlobalXidAssigned,DtmNextXid);
670678
if (event==XACT_EVENT_COMMIT||event==XACT_EVENT_ABORT) {
671-
XTM_INFO("%d: DtmXactCallbackevent=%d isGlobal=%d, nextxid=%d\n",getpid(),event,DtmGlobalXidAssigned,DtmNextXid);
672679
if (DtmGlobalXidAssigned) {
673680
DtmGlobalXidAssigned= false;
674681
}elseif (TransactionIdIsValid(DtmNextXid)) {
@@ -780,10 +787,9 @@ dtm_get_current_snapshot_xmax(PG_FUNCTION_ARGS)
780787
Datum
781788
dtm_begin_transaction(PG_FUNCTION_ARGS)
782789
{
783-
intnParticipants=PG_GETARG_INT32(0);
784790
Assert(!TransactionIdIsValid(DtmNextXid));
785791

786-
DtmNextXid=DtmGlobalStartTransaction(nParticipants,&DtmSnapshot,&dtm->minXid);
792+
DtmNextXid=DtmGlobalStartTransaction(&DtmSnapshot,&dtm->minXid);
787793
Assert(TransactionIdIsValid(DtmNextXid));
788794
XTM_INFO("%d: Start global transaction %d, dtm->minXid=%d\n",getpid(),DtmNextXid,dtm->minXid);
789795

‎contrib/pg_xtm/sockhub/sockhub.c‎

Lines changed: 59 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -125,13 +125,11 @@ static void reconnect(Shub* shub)
125125
}while (rc<0&&errno==EINTR);
126126

127127
if (rc >=0||errno==EINPROGRESS) {
128-
if (rc >=0) {
129-
}
130128
break;
131129
}
132130
}
133131
if (rc<0) {
134-
if (errno!=ENOENT&&errno!=ECONNREFUSED) {
132+
if (errno!=ENOENT&&errno!=ECONNREFUSED&&errno!=EINPROGRESS) {
135133
shub->params->error_handler("Connection can not be establish",SHUB_FATAL_ERROR);
136134
}
137135
if (max_attempts--!=0) {
@@ -187,6 +185,7 @@ void ShubInitialize(Shub* shub, ShubParams* params)
187185
FD_ZERO(&shub->inset);
188186
FD_SET(shub->input,&shub->inset);
189187

188+
shub->output=-1;
190189
reconnect(shub);
191190

192191
shub->in_buffer=malloc(params->buffer_size);
@@ -207,52 +206,58 @@ void ShubLoop(Shub* shub)
207206
while (1) {
208207
fd_setevents;
209208
structtimevaltm;
210-
inti,max_fd,rc;
211-
unsignedintn,size;
209+
inti,rc;
210+
intmax_fd=shub->max_fd;
212211

213212
tm.tv_sec=shub->params->delay/1000;
214213
tm.tv_usec=shub->params->delay %1000*1000;
215214

216215
events=shub->inset;
217-
rc=select(shub->max_fd+1,&events,NULL,NULL,shub->in_buffer_used==0 ?NULL :&tm);
216+
rc=select(max_fd+1,&events,NULL,NULL,shub->in_buffer_used==0 ?NULL :&tm);
218217
if (rc<0) {
219218
if (errno!=EINTR) {
220219
shub->params->error_handler("Select failed",SHUB_RECOVERABLE_ERROR);
221220
recovery(shub);
222221
}
223222
}else {
224223
if (rc>0) {
225-
for (i=0,max_fd=shub->max_fd;i <=max_fd;i++) {
224+
for (i=0;i <=max_fd;i++) {
226225
if (FD_ISSET(i,&events)) {
227-
if (i==shub->input) {
226+
if (i==shub->input) {/* accept incomming connection */
228227
ints=accept(i,NULL,NULL);
229228
if (s<0) {
230229
shub->params->error_handler("Failed to accept socket",SHUB_RECOVERABLE_ERROR);
231230
}else {
232-
if (s>max_fd) {
231+
if (s>shub->max_fd) {
233232
shub->max_fd=s;
234233
}
235234
FD_SET(s,&shub->inset);
236235
}
237-
}elseif (i==shub->output) {
236+
}elseif (i==shub->output) {/* receive response from server */
237+
/* try to read as much as possible */
238238
intavailable=recv(shub->output,shub->out_buffer+shub->out_buffer_used,buffer_size-shub->out_buffer_used,0);
239239
intpos=0;
240240
if (available <=0) {
241241
shub->params->error_handler("Failed to read inet socket",SHUB_RECOVERABLE_ERROR);
242242
reconnect(shub);
243+
continue;
243244
}
244245
shub->out_buffer_used+=available;
246+
247+
/* loop through all received responses */
245248
while (pos+sizeof(ShubMessageHdr) <=shub->out_buffer_used) {
246-
ShubMessageHdr*hdr= (ShubMessageHdr*)(shub->out_buffer+pos);
249+
ShubMessageHdr*hdr= (ShubMessageHdr*)&shub->out_buffer[pos];
247250
intchan=hdr->chan;
248-
n=pos+sizeof(ShubMessageHdr)+hdr->size <=shub->out_buffer_used ?hdr->size+sizeof(ShubMessageHdr) :shub->out_buffer_used-pos;
251+
unsignedintn=pos+sizeof(ShubMessageHdr)+hdr->size <=shub->out_buffer_used
252+
?hdr->size+sizeof(ShubMessageHdr)
253+
:shub->out_buffer_used-pos;
249254
if (!write_socket(chan, (char*)hdr,n)) {
250255
shub->params->error_handler("Failed to write to local socket",SHUB_RECOVERABLE_ERROR);
251256
close_socket(shub,chan);
252257
chan=-1;
253258
}
254-
/* read rest of message if it doesn't fit in buffer */
255259
if (n!=hdr->size+sizeof(ShubMessageHdr)) {
260+
/* read rest of message if it doesn't fit in the buffer */
256261
inttail=hdr->size+sizeof(ShubMessageHdr)-n;
257262
do {
258263
n=tail<buffer_size ?tail :buffer_size;
@@ -274,56 +279,73 @@ void ShubLoop(Shub* shub)
274279
}
275280
pos+=n;
276281
}
282+
/* Move partly fetched message header (if any) to the beginning of byffer */
277283
memcpy(shub->out_buffer,shub->out_buffer+pos,shub->out_buffer_used-pos);
278284
shub->out_buffer_used-=pos;
279-
}else {
285+
}else {/* receive request from client */
280286
ShubMessageHdr*hdr= (ShubMessageHdr*)&shub->in_buffer[shub->in_buffer_used];
281-
if (!read_socket(i, (char*)hdr,sizeof(ShubMessageHdr))) {
287+
intchan=i;
288+
if (!read_socket(chan, (char*)hdr,sizeof(ShubMessageHdr))) {/* fetch message header */
282289
shub->params->error_handler("Failed to read local socket",SHUB_RECOVERABLE_ERROR);
283290
close_socket(shub,i);
284291
}else {
285-
size=hdr->size;
286-
hdr->chan=i;
292+
unsignedintsize=hdr->size;
293+
hdr->chan=chan;/* remember socket descriptor from which this message was read */
287294
if (size+shub->in_buffer_used+sizeof(ShubMessageHdr)>buffer_size) {
288-
if (shub->in_buffer_used!=0) {
295+
/* message doesn't completely fit in buffer */
296+
if (shub->in_buffer_used!=0) {/* if buffer is not empty...*/
297+
/* ... then send it */
289298
while (!write_socket(shub->output,shub->in_buffer,shub->in_buffer_used)) {
290299
shub->params->error_handler("Failed to write to inet socket",SHUB_RECOVERABLE_ERROR);
291300
reconnect(shub);
292301
}
302+
/* move received message header to the beginning of the buffer */
293303
memcpy(shub->in_buffer,shub->in_buffer+shub->in_buffer_used,sizeof(ShubMessageHdr));
294304
shub->in_buffer_used=0;
295305
}
296306
}
297307
shub->in_buffer_used+=sizeof(ShubMessageHdr);
298308

299-
while (1) {
309+
do {
300310
unsignedintn=size+shub->in_buffer_used>buffer_size ?buffer_size-shub->in_buffer_used :size;
301-
if (!read_socket(i,shub->in_buffer+shub->in_buffer_used,n)) {
311+
/* fetch message body */
312+
if (chan >=0&& !read_socket(chan,shub->in_buffer+shub->in_buffer_used,n)) {
302313
shub->params->error_handler("Failed to read local socket",SHUB_RECOVERABLE_ERROR);
303-
close_socket(shub,i);
304-
break;
305-
}else {
306-
if (n!=size) {
307-
while (!write_socket(shub->output,shub->in_buffer,n)) {
308-
shub->params->error_handler("Failed to write to inet socket",SHUB_RECOVERABLE_ERROR);
309-
reconnect(shub);
310-
}
311-
size-=n;
312-
shub->in_buffer_used=0;
313-
}else {
314-
shub->in_buffer_used+=n;
314+
close_socket(shub,chan);
315+
if (hdr!=NULL) {/* if message header is not yet sent to the server... */
316+
/* ... then skip this message */
317+
shub->in_buffer_used= (char*)hdr-shub->in_buffer;
315318
break;
319+
}else {/* if message was partly sent to the server, we can not skip it, so we have to send garbage to the server */
320+
chan=-1;/* do not try to read rest of body of this message */
316321
}
322+
}
323+
shub->in_buffer_used+=n;
324+
size-=n;
325+
/* if there is no more free space in the buffer to receive new message header... */
326+
if (shub->in_buffer_used+sizeof(ShubMessageHdr)>buffer_size) {
327+
328+
/* ... then send buffer to the server */
329+
while (!write_socket(shub->output,shub->in_buffer,shub->in_buffer_used)) {
330+
shub->params->error_handler("Failed to write to inet socket",SHUB_RECOVERABLE_ERROR);
331+
reconnect(shub);
332+
}
333+
hdr=NULL;/* message is partly sent to the server: can not skip it any more */
334+
shub->in_buffer_used=0;
317335
}
318-
}
336+
}while (size!=0);/* repeat until all message body is received */
319337
}
320338
}
321339
}
322340
}
323-
}elseif (shub->in_buffer_used!=0) {
324-
while (!write_socket(shub->output,shub->in_buffer,shub->in_buffer_used)) {
325-
shub->params->error_handler("Failed to write to inet socket",SHUB_RECOVERABLE_ERROR);
326-
reconnect(shub);
341+
}else {/* timeout expired */
342+
if (shub->in_buffer_used!=0) {/* if buffer is not empty... */
343+
/* ...then send it */
344+
while (!write_socket(shub->output,shub->in_buffer,shub->in_buffer_used)) {
345+
shub->params->error_handler("Failed to write to inet socket",SHUB_RECOVERABLE_ERROR);
346+
reconnect(shub);
347+
}
348+
shub->in_buffer_used=0;
327349
}
328350
}
329351
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp