Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitf1cd506

Browse files
committed
Fix memory leaks at stream.c and dmq.c. Now pargres do huge benchmarks
1 parent5b85db5 commitf1cd506

File tree

13 files changed

+476
-182
lines changed

13 files changed

+476
-182
lines changed

‎contrib/pg_exchange/common.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
#include"common.h"
2525

2626

27+
MemoryContextmemory_context=NULL;
2728
ExchangeSharedState*ExchShmem=NULL;
2829

2930
staticbool

‎contrib/pg_exchange/common.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ typedef struct
4242
HTAB*htab;
4343
}ExchangeSharedState;
4444

45+
externMemoryContextmemory_context;
4546
externExchangeSharedState*ExchShmem;
4647

4748
boolplan_tree_walker(Plan*plan,bool (*walker) (),void*context);

‎contrib/pg_exchange/dmq.c

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -794,7 +794,11 @@ dmq_handle_message(StringInfo msg, shm_mq_handle **mq_handles, dsm_segment *seg)
794794
}
795795
}
796796

797-
#defineDMQ_RECV_BUFFER 8192
797+
/*
798+
* recv_buffer can be as large as possible. It is critical for message passing
799+
* effectiveness.
800+
*/
801+
#defineDMQ_RECV_BUFFER (8388608)/* 8 MB */
798802
staticcharrecv_buffer[DMQ_RECV_BUFFER];
799803
staticintrecv_bytes;
800804
staticintread_bytes;
@@ -1244,10 +1248,37 @@ dmq_push(DmqDestinationId dest_id, char *stream_name, char *msg)
12441248
}
12451249

12461250
staticboolpush_state= false;
1247-
staticStringInfoDatabuf;
1251+
staticStringInfoDatabuf= {NULL,0,0,0};
1252+
1253+
/*
1254+
* _initStringInfo
1255+
*
1256+
* Replace call of initStringInfo() routine from stringinfo.c.
1257+
* We need larger strings and need to reduce memory allocations to optimize
1258+
* message passing.
1259+
*/
1260+
staticvoid
1261+
_initStringInfo(StringInfostr,size_tsize)
1262+
{
1263+
if (str->maxlen <=size)
1264+
{
1265+
size_tnewsize= (size*2<1024) ?1024 : (size*2);
1266+
1267+
if (str->data)
1268+
pfree(str->data);
1269+
1270+
/*
1271+
* We try to minimize str->data allocations. It can live all of the
1272+
* backend life.
1273+
*/
1274+
str->data= (char*)MemoryContextAlloc(TopMemoryContext,newsize);
1275+
str->maxlen=newsize;
1276+
}
1277+
resetStringInfo(str);
1278+
}
12481279

12491280
bool
1250-
dmq_push_buffer(DmqDestinationIddest_id,char*stream_name,
1281+
dmq_push_buffer(DmqDestinationIddest_id,constchar*stream_name,
12511282
constvoid*payload,size_tlen,boolnowait)
12521283
{
12531284
shm_mq_resultres;
@@ -1256,7 +1287,7 @@ dmq_push_buffer(DmqDestinationId dest_id, char *stream_name,
12561287
{
12571288
ensure_outq_handle();
12581289

1259-
initStringInfo(&buf);
1290+
_initStringInfo(&buf,len);
12601291
pq_sendbyte(&buf,dest_id);
12611292
pq_send_ascii_string(&buf,stream_name);
12621293
pq_sendbytes(&buf,payload,len);

‎contrib/pg_exchange/dmq.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ dmq_pop(DmqSenderId *sender_id, const void **msg, Size *len, uint64 mask,
4747
externbooldmq_pop_nb(DmqSenderId*sender_id,StringInfomsg,uint64mask);
4848

4949
externvoiddmq_push(DmqDestinationIddest_id,char*stream_name,char*msg);
50-
externbooldmq_push_buffer(DmqDestinationIddest_id,char*stream_name,
50+
externbooldmq_push_buffer(DmqDestinationIddest_id,constchar*stream_name,
5151
constvoid*buffer,size_tlen,boolnowait);
5252

5353
typedefvoid (*dmq_receiver_hook_type) (constchar*);

‎contrib/pg_exchange/exchange.c

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1071,7 +1071,8 @@ init_state_ifany(ExchangeState *state)
10711071
state->hasLocal= true;
10721072
state->init= true;
10731073
}
1074-
intprint1=0;
1074+
1075+
#include"postmaster/postmaster.h"
10751076
staticTupleTableSlot*
10761077
EXCHANGE_Execute(CustomScanState*node)
10771078
{
@@ -1104,13 +1105,11 @@ EXCHANGE_Execute(CustomScanState *node)
11041105
returnnode->ss.ss_ScanTupleSlot;
11051106
case1:
11061107
state->activeRemotes--;
1107-
//elog(LOG, "[%s %d] GOT NULL. activeRemotes: %d, lt=%d, rt=%d hasLocal=%hhu st=%d",\
1108+
//elog(LOG, "[%s %d] GOT NULL. activeRemotes: %d, lt=%d, rt=%d hasLocal=%hhu st=%d",
11081109
//state->stream, state->mode, state->activeRemotes,
11091110
//state->ltuples,
11101111
//state->rtuples, state->hasLocal, state->stuples);
11111112
break;
1112-
case2:/* Close EXCHANGE channel */
1113-
break;
11141113
default:
11151114
/* Any system message */
11161115
break;
@@ -1124,15 +1123,17 @@ EXCHANGE_Execute(CustomScanState *node)
11241123

11251124
if (TupIsNull(slot))
11261125
{
1127-
inti;
1128-
//elog(LOG, "[%s] FINISH Local store: l=%d, r=%d s=%d",
1126+
//elog(LOG, "[%s] FINISH Local store: l=%d, r=%d s=%d, activeRemotes=%d",
11291127
//state->stream, state->ltuples,
1130-
//state->rtuples, state->stuples);
1128+
//state->rtuples, state->stuples, state->activeRemotes);
11311129
if (state->mode!=EXCH_STEALTH)
1130+
{
1131+
inti;
1132+
11321133
for (i=0;i<state->dests->nservers;i++)
11331134
SendByteMessage(state->dests->dests[i].dest_id,
1134-
state->stream,END_OF_TUPLES);
1135-
1135+
state->stream,END_OF_TUPLES, false);
1136+
}
11361137
state->hasLocal= false;
11371138
continue;
11381139
}
@@ -1166,9 +1167,11 @@ EXCHANGE_Execute(CustomScanState *node)
11661167
{
11671168
inti;
11681169
state->stuples++;
1170+
11691171
/* Send tuple to each server that involved. Himself too. */
11701172
for (i=0;i<state->dests->nservers;i++)
1171-
SendTuple(state->dests->dests[i].dest_id,state->stream,slot, false);
1173+
SendTuple(state->dests->dests[i].dest_id,state->stream,slot);
1174+
11721175
returnslot;
11731176
}
11741177
else
@@ -1179,7 +1182,7 @@ EXCHANGE_Execute(CustomScanState *node)
11791182
else
11801183
{
11811184
state->stuples++;
1182-
SendTuple(dest,state->stream,slot, false);
1185+
SendTuple(dest,state->stream,slot);
11831186
}
11841187
}
11851188
returnNULL;

‎contrib/pg_exchange/nodeDistPlanExec.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -854,7 +854,7 @@ init_exchange_channel(PlanState *node, void *context)
854854
else
855855
state->indexes[i]=j;
856856

857-
SendByteMessage(dmq_data->dests[j].dest_id,state->stream,ib);
857+
SendByteMessage(dmq_data->dests[j].dest_id,state->stream,ib, true);
858858
}
859859

860860
for (i=0;i<state->nnodes;i++)

‎contrib/pg_exchange/pg_exchange.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,8 @@ _PG_init(void)
101101

102102
old_dmq_receiver_stop_hook=dmq_receiver_stop_hook;
103103
dmq_receiver_stop_hook=OnNodeDisconnect;
104+
105+
memory_context=AllocSetContextCreate(TopMemoryContext,"PG_EXCHANGE_MEMCONTEXT",ALLOCSET_DEFAULT_SIZES*8);
104106
}
105107

106108
Datum

‎contrib/pg_exchange/sbuf.c

Lines changed: 179 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,187 @@
33
*
44
*/
55

6+
#include"postgres.h"
7+
8+
#include"access/htup_details.h"
9+
#include"nodes/pg_list.h"
10+
#include"utils/memutils.h"/* MemoryContexts */
11+
12+
#include"common.h"
613
#include"sbuf.h"
714

15+
typedefstruct
16+
{
17+
StreamDataPackageheader;
18+
intntuples;
19+
char*curptr;
20+
chardata[FLEXIBLE_ARRAY_MEMBER];
21+
}TupleBuffer;
22+
23+
#defineTupleBufferMinSize offsetof(TupleBuffer, data)
24+
#defineSDP_FREE_SPACE(buf) (StorageSize(buf) - (buf->curptr - &buf->data[0]))
25+
26+
staticList*freebufs=NIL;
27+
28+
staticint
29+
StorageSize(constTupleBuffer*buf)
30+
{
31+
return (SDP_Size(buf)-TupleBufferMinSize);
32+
}
33+
34+
bool
35+
SDP_IsEmpty(constStreamDataPackage*buffer)
36+
{
37+
TupleBuffer*buf= (TupleBuffer*)buffer;
38+
39+
Assert(buf->ntuples >=0);
40+
Assert(SDP_FREE_SPACE(buf) >=0);
41+
returnbuf->ntuples==0;
42+
}
43+
44+
int
45+
SDP_Actual_size(constStreamDataPackage*buffer)
46+
{
47+
TupleBuffer*buf;
48+
49+
if (buffer->datalen <=1)
50+
returnSDP_Size(buffer);
51+
Assert(IsSDPBuf(buffer));
52+
53+
buf= (TupleBuffer*)buffer;
54+
returnSDP_Size(buf)-SDP_FREE_SPACE(buf);
55+
}
56+
57+
/*
58+
* Check correctness of Stream Data Package
59+
*/
60+
bool
61+
IsSDPBuf(constStreamDataPackage*buffer)
62+
{
63+
TupleBuffer*buf;
64+
65+
if (buffer==NULL||buffer->datalen <=1)
66+
return false;
67+
68+
buf= (TupleBuffer*)buffer;
69+
70+
if (buf->curptr==NULL||buf->ntuples<0)
71+
return false;
72+
73+
return true;
74+
}
75+
76+
StreamDataPackage*
77+
SDP_Alloc(intsize)
78+
{
79+
ListCell*lc;
80+
TupleBuffer*buf=NULL;
81+
MemoryContextOldMemoryContext;
82+
83+
OldMemoryContext=MemoryContextSwitchTo(memory_context);
84+
85+
/* To avoid palloc/free overheads we can store buffers */
86+
for (lc=list_head(freebufs);lc!=NULL;lc=lnext(lc))
87+
{
88+
TupleBuffer*freebuf= (TupleBuffer*)lfirst(lc);
89+
90+
if (freebuf->header.datalen<size)
91+
continue;
92+
93+
buf=freebuf;
94+
freebufs=list_delete_ptr(freebufs,freebuf);
95+
break;
96+
}
97+
98+
if (buf==NULL)
99+
{
100+
size=Max((TupleBufferMinSize+size),DEFAULT_PACKAGE_SIZE);
101+
102+
/* No one buffer can be found */
103+
buf=palloc0(size+SDPHeaderSize);
104+
buf->header.datalen=size;
105+
}
106+
107+
buf->header.index=-1;
108+
buf->curptr=&buf->data[0];
109+
buf->ntuples=0;
110+
MemoryContextSwitchTo(OldMemoryContext);
111+
return (StreamDataPackage*)buf;
112+
}
113+
114+
/*
115+
* Return buffer to the free buffers list
116+
*/
8117
void
9-
initTupleBuffer(TupleBuffer*tbuf,size_tmem_size)
118+
SDP_Free(StreamDataPackage*buffer)
119+
{
120+
TupleBuffer*buf= (TupleBuffer*)buffer;
121+
MemoryContextOldMemoryContext;
122+
123+
OldMemoryContext=MemoryContextSwitchTo(memory_context);
124+
125+
Assert(IsSDPBuf(buffer));
126+
buf->curptr=NULL;
127+
buf->ntuples=-1;
128+
freebufs=lappend(freebufs,buf);
129+
130+
MemoryContextSwitchTo(OldMemoryContext);
131+
}
132+
133+
bool
134+
SDP_Store(StreamDataPackage*buffer,HeapTupletuple)
10135
{
11-
tbuf->curptr=&tbuf->data;
12-
/* Will corrected before send to DMQ for 'trim tails' purpose. */
13-
tbuf->size=mem_size;
136+
TupleBuffer*buf= (TupleBuffer*)buffer;
137+
HeapTupledest;
138+
139+
Assert(buf!=NULL&&tuple!=NULL);
140+
/* Check that user not pass pointer to non-allocated buffer */
141+
Assert(buf->curptr!=NULL&&buf->ntuples >=0);
142+
143+
if (SDP_FREE_SPACE(buf)<HEAPTUPLESIZE+tuple->t_len)
144+
return true;
145+
146+
dest= (HeapTuple)buf->curptr;
147+
dest->t_len=tuple->t_len;
148+
dest->t_self=tuple->t_self;
149+
dest->t_tableOid=tuple->t_tableOid;
150+
dest->t_data= (HeapTupleHeader) ((char*)dest+HEAPTUPLESIZE);
151+
buf->curptr+=HEAPTUPLESIZE;
152+
memcpy((char*)dest->t_data, (char*)tuple->t_data,tuple->t_len);
153+
154+
buf->curptr+=tuple->t_len;
155+
buf->ntuples++;
156+
Assert(SDP_FREE_SPACE(buf) >=0);
157+
158+
return false;
159+
}
160+
161+
void
162+
SDP_PrepareToRead(StreamDataPackage*buffer)
163+
{
164+
TupleBuffer*buf= (TupleBuffer*)buffer;
165+
166+
Assert(IsSDPBuf(buffer));
167+
buf->curptr=buf->data;
168+
}
169+
170+
HeapTuple
171+
SDP_Get_tuple(StreamDataPackage*buffer)
172+
{
173+
TupleBuffer*buf;
174+
HeapTupletuple;
175+
176+
Assert(IsSDPBuf(buffer));
177+
178+
if (SDP_IsEmpty(buffer))
179+
returnNULL;
180+
181+
buf= (TupleBuffer*)buffer;
182+
tuple= (HeapTuple)buf->curptr;
183+
tuple->t_data= (HeapTupleHeader) ((char*)tuple+HEAPTUPLESIZE);
184+
185+
buf->curptr+=TupSize(tuple);
186+
buf->ntuples--;
187+
188+
returntuple;
14189
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp