Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit260738c

Browse files
committed
Some corrections on optimizations
1 parent04dc7fe commit260738c

File tree

12 files changed

+40
-40
lines changed

12 files changed

+40
-40
lines changed

‎contrib/pg_exchange/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ EXTVERSION = 0.1
66
PGFILEDESC = "pg_exchange - an exchange custom node and rules for the planner"
77
MODULES = pg_exchange
88
OBJS =common.o dmq.o exchange.o expath.o hooks.o nodeDistPlanExec.o\
9-
nodeDummyscan.o partutils.o pg_exchange.o stream.o$(WIN32RES)
9+
nodeDummyscan.o partutils.o pg_exchange.osbuf.ostream.o$(WIN32RES)
1010

1111
fdw_srcdir =$(top_srcdir)/contrib/postgres_fdw/
1212

‎contrib/pg_exchange/dmq.c

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,10 @@
4747
#include"utils/dynahash.h"
4848
#include"utils/ps_status.h"
4949

50-
#defineDMQ_MQ_SIZE ((Size) 65536)
50+
//#define DMQ_MQ_SIZE ((Size) 65536)
51+
#defineDMQ_MQ_SIZE ((Size) 1048576)/* 1 MB */
52+
//#define DMQ_MQ_SIZE ((Size) 8388608) /* 8 MB */
53+
5154
#defineDMQ_MQ_MAGIC 0x646d71
5255

5356
// XXX: move to common
@@ -222,6 +225,7 @@ dmq_shmem_size(void)
222225
{
223226
Sizesize=0;
224227

228+
//size = add_size(size, DMQ_MQ_SIZE * DMQ_MAX_DESTINATIONS * 2);
225229
size=add_size(size,sizeof(structDmqSharedState));
226230
size=add_size(size,hash_estimate_size(DMQ_MAX_SUBS_PER_BACKEND*MaxBackends,
227231
sizeof(DmqStreamSubscription)));

‎contrib/pg_exchange/stream.c

Lines changed: 25 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,13 @@
33
*
44
*/
55

6+
#include"sbuf.h"
67
#include"stream.h"
78
#include"miscadmin.h"
89
#include"unistd.h"
910
#include"utils/memutils.h"/* MemoryContexts */
1011

11-
#defineIsDeliveryMessage(msg)(msg->tot_len ==MinSizeOfSendBuf)
12+
#defineIsDeliveryMessage(msg)(msg->datalen ==0)
1213

1314
staticList*istreams=NIL;
1415
staticList*ostreams=NIL;
@@ -41,6 +42,8 @@ get_stream(List *streams, const char *name)
4142
for (lc=list_head(streams);lc!=NULL;lc=lnext(lc))
4243
{
4344
char*streamName= (char*)lfirst(lc);
45+
46+
/* streamName is a first field of IStream and OStream structures. */
4447
if (strcmp(name,streamName)==0)
4548
returnstreamName;
4649
}
@@ -153,12 +156,13 @@ RecvIfAny(void)
153156
DmqDestinationIddest_id;
154157

155158
/* If message is not delivery message, send delivery. */
156-
dbuf.tot_len=MinSizeOfSendBuf;
159+
dbuf.datalen=0;
157160
dbuf.index=msg->index;
158161
dest_id=dmq_dest_id(sender_id);
159162
Assert(dest_id >=0);
160163

161-
dmq_push_buffer(dest_id,istream->streamName,&dbuf,dbuf.tot_len, false);
164+
dmq_push_buffer(dest_id,istream->streamName,&dbuf,
165+
MinSizeOfSendBuf, false);
162166
}
163167
}
164168
}
@@ -193,17 +197,18 @@ checkDelivery(OStream *ostream)
193197
{
194198
RecvBuf*buf=lfirst(lc);
195199

196-
istream->msgs=list_delete_ptr(istream->msgs,buf);
200+
istream->deliveries=list_delete_ptr(istream->deliveries,buf);
197201
pfree(buf);
198202
}
203+
list_free(temp);
199204
returnfound;
200205
}
201206

202207
staticvoid
203208
StreamRepeatSend(OStream*ostream)
204209
{
205210
while (!dmq_push_buffer(ostream->dest_id,ostream->streamName,ostream->buf,
206-
ostream->buf->tot_len, true))
211+
buf_len(ostream->buf), true))
207212
RecvIfAny();
208213
}
209214

@@ -215,36 +220,31 @@ ISendTuple(DmqDestinationId dest_id, char *stream, TupleTableSlot *slot,
215220
inttupsize;
216221
SendBuf*buf;
217222
OStream*ostream;
223+
inttot_len;
218224

219225
RecvIfAny();
220226

221227
ostream= (OStream*)get_stream(ostreams,stream);
222228
Assert(ostream&& !ostream->buf);
223229

224-
if (!TupIsNull(slot))
225-
{
226-
inttot_len;
230+
Assert(!TupIsNull(slot));
227231

228-
if (slot->tts_tuple==NULL)
229-
ExecMaterializeSlot(slot);
232+
if (slot->tts_tuple==NULL)
233+
ExecMaterializeSlot(slot);
230234

231-
tuple=slot->tts_tuple;
232-
tupsize= offsetof(HeapTupleData,t_data);
233-
234-
tot_len=MinSizeOfSendBuf+tupsize+tuple->t_len;
235-
buf=palloc(tot_len);
236-
buf->tot_len=tot_len;
237-
memcpy(buf->data,tuple,tupsize);
238-
memcpy(buf->data+tupsize,tuple->t_data,tuple->t_len);
239-
}
240-
else
241-
Assert(0);
235+
tuple=slot->tts_tuple;
236+
tupsize= offsetof(HeapTupleData,t_data);
237+
tot_len=MinSizeOfSendBuf+tupsize+tuple->t_len;
238+
buf=palloc(tot_len);
239+
buf->datalen=tot_len-MinSizeOfSendBuf;
240+
memcpy(buf->data,tuple,tupsize);
241+
memcpy(buf->data+tupsize,tuple->t_data,tuple->t_len);
242242

243243
buf->index=++(ostream->index);
244244
buf->needConfirm=needConfirm;
245245
ostream->dest_id=dest_id;
246246

247-
while (!dmq_push_buffer(dest_id,stream,buf,buf->tot_len, true))
247+
while (!dmq_push_buffer(dest_id,stream,buf,buf_len(buf), true))
248248
RecvIfAny();
249249

250250
if (buf->needConfirm)
@@ -284,15 +284,15 @@ SendByteMessage(DmqDestinationId dest_id, char *stream, char tag)
284284
Assert(ostream&& !ostream->buf);
285285

286286
buf=palloc(MinSizeOfSendBuf+1);
287-
buf->tot_len=MinSizeOfSendBuf+1;
287+
buf->datalen=1;
288288
buf->data[0]=tag;
289289
buf->index=++(ostream->index);
290290
buf->needConfirm= true;
291291

292292
ostream->buf=buf;
293293
ostream->dest_id=dest_id;
294294

295-
while (!dmq_push_buffer(dest_id,stream,buf,buf->tot_len, true))
295+
while (!dmq_push_buffer(dest_id,stream,buf,buf_len(buf), true))
296296
RecvIfAny();
297297

298298
wait_for_delivery(ostream);
@@ -335,7 +335,7 @@ RecvByteMessage(const char *streamName, const char *sender)
335335
*/
336336
void
337337
SendTuple(DmqDestinationIddest_id,char*stream,TupleTableSlot*slot,
338-
boolneedConfirm)
338+
boolneedConfirm)
339339
{
340340
OStream*ostream;
341341

‎contrib/pg_exchange/stream.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
typedefstructSendBuf
2222
{
2323
uint32index;
24-
uint32tot_len;
24+
uint32datalen;
2525
boolneedConfirm;
2626
chardata[FLEXIBLE_ARRAY_MEMBER];
2727
}SendBuf;
@@ -35,12 +35,13 @@ typedef struct RecvBuf
3535
}RecvBuf;
3636

3737
#defineMinSizeOfSendBuf offsetof(SendBuf, data)
38+
#definebuf_len(buf) (MinSizeOfSendBuf + buf->datalen)
3839
#defineMinSizeOfRecvBuf offsetof(RecvBuf, data)
3940

4041
typedefstruct
4142
{
4243
charstreamName[STREAM_NAME_MAX_LEN];
43-
uint64index;
44+
uint32index;
4445
SendBuf*buf;
4546
DmqDestinationIddest_id;
4647
}OStream;

‎src/backend/optimizer/path/joinpath.c

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,9 @@ static void consider_parallel_mergejoin(PlannerInfo *root,
6868
JoinTypejointype,
6969
JoinPathExtraData*extra,
7070
Path*inner_cheapest_total);
71+
staticvoidhash_inner_and_outer(PlannerInfo*root,RelOptInfo*joinrel,
72+
RelOptInfo*outerrel,RelOptInfo*innerrel,
73+
JoinTypejointype,JoinPathExtraData*extra);
7174
staticList*select_mergejoin_clauses(PlannerInfo*root,
7275
RelOptInfo*joinrel,
7376
RelOptInfo*outerrel,
@@ -1670,7 +1673,7 @@ consider_parallel_nestloop(PlannerInfo *root,
16701673
* 'jointype' is the type of join to do
16711674
* 'extra' contains additional input values
16721675
*/
1673-
void
1676+
staticvoid
16741677
hash_inner_and_outer(PlannerInfo*root,
16751678
RelOptInfo*joinrel,
16761679
RelOptInfo*outerrel,

‎src/backend/optimizer/plan/planner.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1182,7 +1182,7 @@ inheritance_planner(PlannerInfo *root)
11821182
PlannerInfo**parent_roots=NULL;
11831183

11841184
Assert(parse->commandType!=CMD_INSERT);
1185-
elog(LOG,"inheritance_plnner()");
1185+
11861186
/*
11871187
* We generate a modified instance of the original Query for each target
11881188
* relation, plan that, and put all the plans into a list that will be

‎src/backend/storage/ipc/dsm_impl.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -341,7 +341,7 @@ dsm_impl_posix(dsm_op op, dsm_handle handle, Size request_size,
341341
*/
342342
if (errno==EINTR&&elevel >=ERROR)
343343
CHECK_FOR_INTERRUPTS();
344-
344+
Assert(0);
345345
ereport(elevel,
346346
(errcode_for_dynamic_shared_memory(),
347347
errmsg("could not resize shared memory segment \"%s\" to %zu bytes: %m",

‎src/backend/storage/ipc/latch.c

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@
4343
#include<poll.h>
4444
#endif
4545

46-
//#include "common/pg_socket.h"
4746
#include"miscadmin.h"
4847
#include"pgstat.h"
4948
#include"port/atomics.h"

‎src/backend/tcop/postgres.c

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2738,7 +2738,6 @@ StatementCancelHandler(SIGNAL_ARGS)
27382738
void
27392739
FloatExceptionHandler(SIGNAL_ARGS)
27402740
{
2741-
Assert(0);
27422741
/* We're not returning, so no need to save errno */
27432742
ereport(ERROR,
27442743
(errcode(ERRCODE_FLOATING_POINT_EXCEPTION),
@@ -3190,7 +3189,6 @@ check_stack_depth(void)
31903189
{
31913190
if (stack_is_too_deep())
31923191
{
3193-
Assert(0);
31943192
ereport(ERROR,
31953193
(errcode(ERRCODE_STATEMENT_TOO_COMPLEX),
31963194
errmsg("stack depth limit exceeded"),

‎src/backend/utils/cache/typcache.c

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1621,7 +1621,6 @@ lookup_rowtype_tupdesc_internal(Oid type_id, int32 typmod, bool noError)
16211621
ereport(ERROR,
16221622
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
16231623
errmsg("record type has not been registered")));
1624-
16251624
returnNULL;
16261625
}
16271626
}

‎src/include/executor/nodeSeqscan.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
#include"access/parallel.h"
1818
#include"nodes/execnodes.h"
19-
#include"nodes/extensible.h"
2019

2120
externSeqScanState*ExecInitSeqScan(SeqScan*node,EState*estate,inteflags);
2221
externvoidExecEndSeqScan(SeqScanState*node);

‎src/include/optimizer/paths.h

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -102,9 +102,6 @@ extern void add_paths_to_joinrel(PlannerInfo *root, RelOptInfo *joinrel,
102102
RelOptInfo*outerrel,RelOptInfo*innerrel,
103103
JoinTypejointype,SpecialJoinInfo*sjinfo,
104104
List*restrictlist);
105-
externvoidhash_inner_and_outer(PlannerInfo*root,RelOptInfo*joinrel,
106-
RelOptInfo*outerrel,RelOptInfo*innerrel,
107-
JoinTypejointype,JoinPathExtraData*extra);
108105

109106
/*
110107
* joinrels.c

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp