Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit5b85db5

Browse files
committed
Bugfix: huge memory allocations on performance benchmark
1 parente8d526b commit5b85db5

File tree

7 files changed

+66
-45
lines changed

7 files changed

+66
-45
lines changed

‎contrib/pg_exchange/dmq.c

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1379,7 +1379,7 @@ DmqSenderId
13791379
dmq_attach_receiver(constchar*sender_name,intmask_pos)
13801380
{
13811381
inti;
1382-
inthandle_id;
1382+
inthandle_id=-1;
13831383

13841384
/* Search for existed receiver. */
13851385
for (i=0;i<dmq_local.n_inhandles;i++)
@@ -1520,19 +1520,19 @@ dmq_remote_id(const char *name)
15201520
}
15211521

15221522
/*
1523-
* Get a message from input queue.Execution blocking until message will not
1524-
* received. Returns false, if an error is occured.
1523+
* Get a message from input queue.If waitMsg = true, execution blocking until
1524+
*message will notreceived. Returns false, if an error is occured.
15251525
*
15261526
* sender_id - identifier of the received message sender.
15271527
* msg - pointer to local buffer that contains received message.
15281528
* len - size of received message.
15291529
*/
15301530
constchar*
1531-
dmq_pop(DmqSenderId*sender_id,void**msg,Size*len,uint64mask,
1531+
dmq_pop(DmqSenderId*sender_id,constvoid**msg,Size*len,uint64mask,
15321532
boolwaitMsg)
15331533
{
15341534
shm_mq_resultres;
1535-
constchar*stream;
1535+
char*stream;
15361536

15371537
Assert(msg&&len);
15381538

@@ -1559,12 +1559,14 @@ dmq_pop(DmqSenderId *sender_id, void **msg, Size *len, uint64 mask,
15591559
if (res==SHM_MQ_SUCCESS)
15601560
{
15611561
/*
1562+
* Set message pointer and length.
15621563
* Stream name is first null-terminated string in
15631564
* the message buffer.
15641565
*/
1565-
stream=data;
1566-
*msg= (void*) (stream+strlen(stream)+1);
1567-
*len-= (char*)(*msg)-stream;
1566+
stream= (char*)data;
1567+
*len-= (strlen(stream)+1);
1568+
Assert(*len>0);
1569+
*msg= ((char*)data+strlen(stream)+1);
15681570
*sender_id=i;
15691571

15701572
mtm_log(DmqTraceIncoming,

‎contrib/pg_exchange/dmq.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ extern char *dmq_receiver_name(DmqDestinationId dest_id);
4242
externDmqDestinationIddmq_remote_id(constchar*name);
4343

4444
externconstchar*
45-
dmq_pop(DmqSenderId*sender_id,void**msg,Size*len,uint64mask,
45+
dmq_pop(DmqSenderId*sender_id,constvoid**msg,Size*len,uint64mask,
4646
boolwaitMsg);
4747
externbooldmq_pop_nb(DmqSenderId*sender_id,StringInfomsg,uint64mask);
4848

‎contrib/pg_exchange/exchange.c

Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
#include"optimizer/cost.h"
2929
#include"optimizer/pathnode.h"
3030
#include"partitioning/partbounds.h"
31+
#include"postgres_fdw.h"
3132
#include"utils/lsyscache.h"
3233
#include"utils/rel.h"
3334
#include"utils/syscache.h"
@@ -514,9 +515,17 @@ exchange_rel_pathlist(PlannerInfo *root, RelOptInfo *rel, Index rti, RangeTblEnt
514515
break;
515516

516517
caseT_ForeignPath:
518+
{
519+
PgFdwRelationInfo*fpinfo=
520+
(PgFdwRelationInfo*)subpath->parent->fdw_private;
521+
517522
serverid=subpath->parent->serverid;
518523
tmpPath=make_local_scan_path(tmpLocalScanPath,
519524
subpath->parent,&indexinfo);
525+
Assert(subpath->parent->fdw_private!=NULL);
526+
tmpPath->rows=fpinfo->rows;
527+
tmpPath->total_cost+=fpinfo->total_cost-fpinfo->startup_cost;
528+
}
520529
break;
521530

522531
default:
@@ -539,7 +548,6 @@ exchange_rel_pathlist(PlannerInfo *root, RelOptInfo *rel, Index rti, RangeTblEnt
539548
PATH_REQ_OUTER(tmpLocalScanPath),0, false,
540549
((AppendPath*)path)->partitioned_rels,-1);
541550
path= (Path*)create_exchange_path(root,rel, (Path*)ap,EXCH_GATHER);
542-
543551
set_exchange_altrel(EXCH_GATHER, (ExchangePath*)path,rel,NULL,NULL,
544552
servers);
545553

@@ -559,6 +567,7 @@ exchange_rel_pathlist(PlannerInfo *root, RelOptInfo *rel, Index rti, RangeTblEnt
559567
path= (Path*)create_distexec_path(root,rel,path,servers);
560568

561569
distributed_pathlist=lappend(distributed_pathlist,path);
570+
bms_free(servers);
562571
}
563572
returndistributed_pathlist;
564573
}
@@ -607,13 +616,13 @@ cost_exchange(PlannerInfo *root, RelOptInfo *baserel, ExchangePath *expath)
607616
* subtree M/N local tuples, send to network [M-M/N] tuples and same to
608617
* receive.
609618
*/
610-
path->rows /=expath->altrel.nparts;
619+
//path->rows /= expath->altrel.nparts;
611620
instances=expath->altrel.nparts;
612621
send_rows=path->rows- (path->rows/instances);
613622
received_rows=send_rows;
614623
local_rows=path->rows/instances;
615624
path->total_cost+= (send_rows+local_rows)*cpu_tuple_cost;
616-
path->total_cost+= (received_rows)*cpu_tuple_cost*4.;
625+
path->total_cost+= (received_rows)*cpu_tuple_cost*10.;
617626
}
618627
break;
619628
default:
@@ -1062,11 +1071,10 @@ init_state_ifany(ExchangeState *state)
10621071
state->hasLocal= true;
10631072
state->init= true;
10641073
}
1065-
1074+
intprint1=0;
10661075
staticTupleTableSlot*
10671076
EXCHANGE_Execute(CustomScanState*node)
10681077
{
1069-
ScanState*ss=&node->ss;
10701078
ScanState*subPlanState=linitial(node->custom_ps);
10711079
ExchangeState*state= (ExchangeState*)node;
10721080
boolreadRemote= false;
@@ -1080,32 +1088,34 @@ EXCHANGE_Execute(CustomScanState *node)
10801088

10811089
readRemote= !readRemote;
10821090

1083-
if ((state->activeRemotes>0)&&readRemote)
1091+
if ((state->activeRemotes>0)/*&& readRemote */)
10841092
{
10851093
intstatus;
1094+
status=RecvTuple(state->stream,node->ss.ss_ScanTupleSlot);
10861095

1087-
slot=RecvTuple(ss->ss_ScanTupleSlot->tts_tupleDescriptor,
1088-
state->stream,&status);
10891096
switch (status)
10901097
{
10911098
case-1:
10921099
/* No tuples currently */
10931100
break;
10941101
case0:
1095-
Assert(!TupIsNull(slot));
1102+
Assert(!TupIsNull(node->ss.ss_ScanTupleSlot));
10961103
state->rtuples++;
1097-
returnslot;
1104+
returnnode->ss.ss_ScanTupleSlot;
10981105
case1:
10991106
state->activeRemotes--;
1100-
//elog(LOG, "[%s] GOT NULL. activeRemotes: %d, lt=%d, rt=%d hasLocal=%hhu st=%d", state->stream,
1101-
//state->activeRemotes, state->ltuples, state->rtuples, state->hasLocal, state->stuples);
1107+
//elog(LOG, "[%s %d] GOT NULL. activeRemotes: %d, lt=%d, rt=%d hasLocal=%hhu st=%d",\
1108+
//state->stream, state->mode, state->activeRemotes,
1109+
//state->ltuples,
1110+
//state->rtuples, state->hasLocal, state->stuples);
11021111
break;
11031112
case2:/* Close EXCHANGE channel */
11041113
break;
11051114
default:
11061115
/* Any system message */
11071116
break;
11081117
}
1118+
slot=NULL;
11091119
}
11101120

11111121
if ((state->hasLocal)&& (!readRemote))
@@ -1170,7 +1180,6 @@ EXCHANGE_Execute(CustomScanState *node)
11701180
{
11711181
state->stuples++;
11721182
SendTuple(dest,state->stream,slot, false);
1173-
//elog(LOG, "Send tuple: %d", state->stuples);
11741183
}
11751184
}
11761185
returnNULL;
@@ -1241,7 +1250,7 @@ EXCHANGE_Explain(CustomScanState *node, List *ancestors, ExplainState *es)
12411250
}
12421251

12431252
appendStringInfo(&str,"mode: %s, stream: %s. ",mode,state->stream);
1244-
appendStringInfo(&str,"qual: %s.",nodeToString(state->partexprs));
1253+
//appendStringInfo(&str, "qual: %s.", nodeToString(state->partexprs));
12451254
ExplainPropertyText("Exchange",str.data,es);
12461255
}
12471256

‎contrib/pg_exchange/nodeDistPlanExec.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -864,6 +864,7 @@ init_exchange_channel(PlanState *node, void *context)
864864
if (j >=0)
865865
{
866866
charc;
867+
867868
while ((c=RecvByteMessage(state->stream,dmq_data->dests[j].node))==0);
868869
Assert(c=='I');
869870
}

‎contrib/pg_exchange/stream.c

Lines changed: 29 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,8 @@ Stream_unsubscribe(const char *streamName)
100100
istreams=list_delete_ptr(istreams,istream);
101101
ostreams=list_delete_ptr(ostreams,ostream);
102102
dmq_stream_unsubscribe(streamName);
103+
list_free(istream->deliveries);
104+
list_free(istream->msgs);
103105
pfree(istream);
104106
pfree(ostream);
105107
return true;
@@ -112,23 +114,29 @@ static void
112114
RecvIfAny(void)
113115
{
114116
constchar*streamName;
115-
SendBuf*msg;
117+
constSendBuf*msg;
116118
Sizelen;
117119
DmqSenderIdsender_id;
118120
IStream*istream;
119121

120122
/* Try to receive a message */
121123
for (;;)
122124
{
123-
streamName=dmq_pop(&sender_id, (void**)(&msg),&len,UINT64_MAX, false);
125+
streamName=dmq_pop(&sender_id, (constvoid**)&msg,&len,UINT64_MAX, false);
124126
if (!streamName)
125-
/* No messagesarrived*/
127+
/* No messages */
126128
return;
127129

128-
/*Any message was received */
130+
/*Message has been received */
129131
Assert(len >=MinSizeOfSendBuf);
130132
istream= (IStream*)get_stream(istreams,streamName);
131-
Assert(istream!=NULL);
133+
134+
if (istream==NULL)
135+
{
136+
/* We can't lose any data except resended byte messages. */
137+
Assert(msg->datalen <=1);
138+
return;
139+
}
132140

133141
if ((msg->index>istream->indexes[sender_id])||IsDeliveryMessage(msg))
134142
{
@@ -137,13 +145,13 @@ RecvIfAny(void)
137145
buf=palloc(MinSizeOfRecvBuf+len-MinSizeOfSendBuf);
138146
buf->index=msg->index;
139147
buf->sid=sender_id;
148+
buf->datalen=msg->datalen;
140149

141150
if (IsDeliveryMessage(msg))
142151
istream->deliveries=lappend(istream->deliveries,buf);
143152
else
144153
{
145-
buf->datalen=len-MinSizeOfSendBuf;
146-
memcpy(&buf->data,&msg->data,buf->datalen);
154+
memcpy(buf->data,msg->data,buf->datalen);
147155
istream->msgs=lappend(istream->msgs,buf);
148156
istream->indexes[sender_id]=buf->index;
149157
}
@@ -249,6 +257,8 @@ ISendTuple(DmqDestinationId dest_id, char *stream, TupleTableSlot *slot,
249257

250258
if (buf->needConfirm)
251259
ostream->buf=buf;
260+
else
261+
pfree(buf);
252262

253263
returnostream;
254264
}
@@ -291,11 +301,11 @@ SendByteMessage(DmqDestinationId dest_id, char *stream, char tag)
291301

292302
ostream->buf=buf;
293303
ostream->dest_id=dest_id;
294-
304+
//elog(LOG, "-> [%s] SendByteMessage: dest_id=%d, tag=%c", stream, dest_id, tag);
295305
while (!dmq_push_buffer(dest_id,stream,buf,buf_len(buf), true))
296306
RecvIfAny();
297-
298307
wait_for_delivery(ostream);
308+
//elog(LOG, "-> SendByteMessage: CONFIRMED");
299309
pfree(ostream->buf);
300310
ostream->buf=NULL;
301311
}
@@ -352,19 +362,19 @@ SendTuple(DmqDestinationId dest_id, char *stream, TupleTableSlot *slot,
352362
* Receive tuple or message from any remote instance.
353363
* Returns NULL, if end-of-transfer received from a instance.
354364
*/
355-
TupleTableSlot*
356-
RecvTuple(TupleDesctupdesc,char*streamName,int*status)
365+
int
366+
RecvTuple(char*streamName,TupleTableSlot*slot)
357367
{
358368
IStream*istream;
359369
ListCell*lc;
360-
TupleTableSlot*slot=NULL;
361370
List*temp=NIL;
371+
intstatus;
362372

363373
RecvIfAny();
364374

365375
istream= (IStream*)get_stream(istreams,streamName);
366376
Assert(istream);
367-
*status=-1;/* No tuples from network */
377+
status=-1;/* No tuples from network */
368378

369379
foreach(lc,istream->msgs)
370380
{
@@ -382,25 +392,24 @@ RecvTuple(TupleDesc tupdesc, char *streamName, int *status)
382392
{
383393
caseEND_OF_TUPLES:
384394
/* No tuples from network */
385-
*status=1;
395+
status=1;
386396
break;
387397
case'Q':
388-
*status=2;
398+
status=2;
389399
break;
390400
default:
391-
*status=3;
401+
status=3;
392402
break;
393403
}
394404

395405
break;
396406
}
397407

398408
Assert(buf->datalen>1);
399-
*status=0;
409+
status=0;
400410
tup=palloc(buf->datalen);
401-
memcpy(tup,buf->data,buf->datalen);
411+
memcpy(tup,&buf->data[0],buf->datalen);
402412
tup->t_data= (HeapTupleHeader) ((char*)tup+tupsize);
403-
slot=MakeSingleTupleTableSlot(tupdesc);
404413
slot=ExecStoreTuple((HeapTuple)tup,slot,InvalidBuffer, true);
405414
break;
406415
}
@@ -415,5 +424,5 @@ RecvTuple(TupleDesc tupdesc, char *streamName, int *status)
415424
}
416425
list_free(temp);
417426

418-
returnslot;
427+
returnstatus;
419428
}

‎contrib/pg_exchange/stream.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,5 +62,5 @@ extern void SendByteMessage(DmqDestinationId dest_id, char *stream, char tag);
6262
externcharRecvByteMessage(constchar*streamName,constchar*sender);
6363
externvoidSendTuple(DmqDestinationIddest_id,char*stream,TupleTableSlot*slot,
6464
boolneedConfirm);
65-
externTupleTableSlot*RecvTuple(TupleDesctupdesc,char*streamName,int*status);
65+
externintRecvTuple(char*streamName,TupleTableSlot*slot);
6666
#endif/* CONTRIB_PG_EXCHANGE_STREAM_H_ */

‎src/backend/nodes/outfuncs.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ static void
6060
write_oid_field(StringInfostr,Oidoid)
6161
{
6262
inti;
63-
char*rulename;
63+
char*rulename=NULL;
6464
Oidev_class=InvalidOid;
6565

6666
if (!portable_output)

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp