@@ -65,6 +65,8 @@ static void EXCHANGE_InitializeWorker(CustomScanState *node,
65
65
static Node * EXCHANGE_Create_state (CustomScan * node );
66
66
67
67
68
+ #define END_OF_TUPLES 'E'
69
+ #define END_OF_EXCHANGE 'Q'
68
70
void
69
71
EXCHANGE_Init_methods (void )
70
72
{
@@ -95,6 +97,7 @@ EXCHANGE_Init_methods(void)
95
97
DistExec_Init_methods ();
96
98
}
97
99
100
+ #include "nodes/relation.h"
98
101
/*
99
102
* Add one path for a base relation target: replace all ForeignScan nodes by
100
103
* local Scan nodes.
@@ -116,7 +119,7 @@ add_exchange_paths(PlannerInfo *root, RelOptInfo *rel, Index rti, RangeTblEntry
116
119
*/
117
120
return ;
118
121
119
- elog (INFO ,"INSERT EXCHANGE" );
122
+ elog (INFO ,"INSERT EXCHANGE. paths: %d" , list_length ( rel -> pathlist ) );
120
123
121
124
/* Traverse all possible paths and search for APPEND */
122
125
foreach (lc ,rel -> pathlist )
@@ -125,16 +128,15 @@ add_exchange_paths(PlannerInfo *root, RelOptInfo *rel, Index rti, RangeTblEntry
125
128
Path * tmpLocalScanPath = NULL ;
126
129
AppendPath * appendPath = NULL ;
127
130
ListCell * lc1 ;
128
- List * private_data = NIL ;
129
-
130
- Assert (path -> pathtype != T_MergeAppend );/* Do it later */
131
+ Bitmapset * servers = NULL ;
132
+ List * subpaths = NIL ;
131
133
132
134
if (path -> pathtype != T_Append )
133
135
continue ;
134
136
135
- appendPath = makeNode ( AppendPath );
136
- memcpy ( appendPath ,path , sizeof ( AppendPath ));
137
- appendPath -> subpaths = NIL ;
137
+ //elog(INFO, "-> IE. path params: %hhu, ptype: %d, tcost: %f, scost: %f",
138
+ //path->param_info != NULL , path->pathtype,
139
+ //path->total_cost, path->startup_cost) ;
138
140
139
141
/*
140
142
* Traverse all APPEND subpaths, check for scan-type and search for
@@ -145,7 +147,9 @@ add_exchange_paths(PlannerInfo *root, RelOptInfo *rel, Index rti, RangeTblEntry
145
147
Path * subpath = (Path * )lfirst (lc1 );
146
148
Path * tmpPath ;
147
149
Oid serverid = InvalidOid ;
148
-
150
+ elog (INFO ,"--> IE. subpath params: %hhu, ptype: %d, tcost: %f, scost: %f" ,
151
+ subpath -> param_info != NULL ,subpath -> pathtype ,
152
+ subpath -> total_cost ,subpath -> startup_cost );
149
153
if ((subpath -> pathtype != T_ForeignScan )&& (tmpLocalScanPath ))
150
154
/* Check assumption No.1 */
151
155
Assert (tmpLocalScanPath -> pathtype == subpath -> pathtype );
@@ -159,8 +163,11 @@ add_exchange_paths(PlannerInfo *root, RelOptInfo *rel, Index rti, RangeTblEntry
159
163
160
164
case T_ForeignScan :
161
165
serverid = subpath -> parent -> serverid ;
166
+ if (PATH_REQ_OUTER (subpath )!= NULL )
167
+ continue ;
162
168
tmpPath = (Path * )makeNode (SeqScan );
163
- tmpPath = create_seqscan_path (root ,subpath -> parent ,subpath -> parent -> lateral_relids ,0 );
169
+ tmpPath = create_seqscan_path (root ,subpath -> parent ,
170
+ PATH_REQ_OUTER (subpath ),0 );
164
171
break ;
165
172
166
173
default :
@@ -170,22 +177,25 @@ add_exchange_paths(PlannerInfo *root, RelOptInfo *rel, Index rti, RangeTblEntry
170
177
if (!tmpLocalScanPath )
171
178
tmpLocalScanPath = tmpPath ;
172
179
173
- appendPath -> subpaths = lappend (appendPath -> subpaths ,tmpPath );
174
- if (OidIsValid (serverid ))
175
- private_data = lappend_oid (private_data ,serverid );
180
+ subpaths = lappend (subpaths ,tmpPath );
181
+ //appendPath->subpaths = lappend(appendPath->subpaths, tmpPath);
182
+ if (OidIsValid (serverid )&& !bms_is_member ((int )serverid ,servers ))
183
+ servers = bms_add_member (servers ,serverid );
176
184
}
177
185
178
- if (private_data == NIL )
186
+ if (servers == NULL )
179
187
{
180
- pfree (appendPath );
181
188
elog (INFO ,"NO one foreign source found" );
182
189
continue ;
183
190
}
184
191
else
185
- elog (INFO ,"Source found: %d" ,list_length ( private_data ));
192
+ elog (INFO ,"Source found: %d" ,bms_num_members ( servers ));
186
193
194
+ appendPath = create_append_path (root ,rel ,subpaths ,NIL ,
195
+ PATH_REQ_OUTER (tmpLocalScanPath ),0 , false,
196
+ ((AppendPath * )path )-> partitioned_rels ,-1 );
187
197
path = create_exchange_path (root ,rel , (Path * )appendPath );
188
- path = create_distexec_path (root ,rel ,path ,private_data );
198
+ path = create_distexec_path (root ,rel ,path ,servers );
189
199
add_path (rel ,path );
190
200
}
191
201
}
@@ -206,8 +216,8 @@ cost_exchange(PlannerInfo *root, RelOptInfo *baserel, Path *path)
206
216
207
217
/* Now I do not want to think about cost estimations. */
208
218
path -> rows = baserel -> tuples ;
209
- path -> startup_cost = 0 .0001 ;
210
- path -> total_cost = path -> startup_cost + 0 .0001* path -> rows ;
219
+ path -> startup_cost = 10000 .0001 ;
220
+ path -> total_cost = path -> startup_cost + 100000 .0001* path -> rows ;
211
221
}
212
222
213
223
/* XXX: Need to be placed in shared memory */
@@ -342,7 +352,7 @@ EXCHANGE_Begin(CustomScanState *node, EState *estate, int eflags)
342
352
{
343
353
CustomScan * cscan = (CustomScan * )node -> ss .ps .plan ;
344
354
Plan * scan_plan ;
345
- bool explain_only = ((eflags & EXEC_FLAG_EXPLAIN_ONLY )!= 0 );
355
+ // boolexplain_only = ((eflags & EXEC_FLAG_EXPLAIN_ONLY) != 0);
346
356
PlanState * planState ;
347
357
ExchangeState * state = (ExchangeState * )node ;
348
358
TupleDesc scan_tupdesc ;
@@ -353,7 +363,7 @@ EXCHANGE_Begin(CustomScanState *node, EState *estate, int eflags)
353
363
planState = (PlanState * )ExecInitNode (scan_plan ,estate ,eflags );
354
364
node -> custom_ps = lappend (node -> custom_ps ,planState );
355
365
356
- Assert ( Stream_subscribe (state -> stream ) );
366
+ Stream_subscribe (state -> stream );
357
367
358
368
state -> init = false;
359
369
state -> ltuples = 0 ;
@@ -375,23 +385,33 @@ distribution_fn_gather(TupleTableSlot *slot, DMQDestCont *dcont)
375
385
return -1 ;
376
386
}
377
387
378
- static TupleTableSlot *
379
- EXCHANGE_Execute ( CustomScanState * node )
388
+ static void
389
+ init_state_ifany ( ExchangeState * state )
380
390
{
381
- ScanState * ss = & node -> ss ;
382
- ScanState * subPlanState = linitial (node -> custom_ps );
383
- ExchangeState * state = (ExchangeState * )node ;
384
- bool readRemote = true;
385
-
386
391
if (!state -> init )
387
392
{
388
393
EphemeralNamedRelation enr = get_ENR (state -> estate -> es_queryEnv ,destsName );
394
+
395
+ Assert (enr != NULL && enr -> reldata != NULL );
389
396
state -> dests = (DMQDestCont * )enr -> reldata ;
390
- state -> init = true;
391
397
state -> hasLocal = true;
392
398
state -> activeRemotes = state -> dests -> nservers ;
399
+ state -> init = true;
400
+ //elog(INFO, "[%d] EXCHANGE Init", getpid());
393
401
}
394
402
403
+ }
404
+
405
+ static TupleTableSlot *
406
+ EXCHANGE_Execute (CustomScanState * node )
407
+ {
408
+ ScanState * ss = & node -> ss ;
409
+ ScanState * subPlanState = linitial (node -> custom_ps );
410
+ ExchangeState * state = (ExchangeState * )node ;
411
+ bool readRemote = true;
412
+
413
+ init_state_ifany (state );
414
+
395
415
for (;;)
396
416
{
397
417
TupleTableSlot * slot = NULL ;
@@ -405,22 +425,23 @@ EXCHANGE_Execute(CustomScanState *node)
405
425
406
426
slot = RecvTuple (ss -> ss_ScanTupleSlot -> tts_tupleDescriptor ,
407
427
state -> stream ,& status );
408
- if (status == 0 )
428
+ switch (status )
409
429
{
410
- if (TupIsNull (slot ))
411
- {
412
- state -> activeRemotes -- ;
413
- //elog(LOG, "Finish remote receiving. r=%d", state->rtuples);
414
- }
415
- else
416
- {
417
- state -> rtuples ++ ;
418
- //elog(LOG, "GOT tuple from another node. r=%d", state->rtuples);
419
- return slot ;
420
- }
430
+ case -1 :
431
+ /* No tuples currently */
432
+ break ;
433
+ case 0 :
434
+ Assert (!TupIsNull (slot ));
435
+ state -> rtuples ++ ;
436
+ return slot ;
437
+ case 1 :
438
+ state -> activeRemotes -- ;
439
+ break ;
440
+ case 2 :/* Close EXCHANGE channel */
441
+ break ;
442
+ default :
443
+ Assert (0 );
421
444
}
422
- //else
423
- //elog(LOG, "No remote tuples for now");
424
445
}
425
446
426
447
if ((state -> hasLocal )&& (!readRemote ))
@@ -429,9 +450,9 @@ EXCHANGE_Execute(CustomScanState *node)
429
450
if (TupIsNull (slot ))
430
451
{
431
452
int i ;
432
- //elog(LOG, "FINISH Local store: l=%d, r=%d", state->ltuples, state->rtuples);
453
+ // elog(LOG, "[%s] FINISH Local store: l=%d, r=%d", state->stream , state->ltuples, state->rtuples);
433
454
for (i = 0 ;i < state -> dests -> nservers ;i ++ )
434
- SendTuple (state -> dests -> dests [i ].dest_id ,state -> stream ,NULL );
455
+ SendByteMessage (state -> dests -> dests [i ].dest_id ,state -> stream ,END_OF_TUPLES );
435
456
state -> hasLocal = false;
436
457
continue ;
437
458
}
@@ -444,7 +465,8 @@ EXCHANGE_Execute(CustomScanState *node)
444
465
445
466
if ((state -> activeRemotes == 0 )&& (!state -> hasLocal ))
446
467
{
447
- elog (LOG ,"Exchange returns NULL: %d %d" ,state -> ltuples ,state -> rtuples );
468
+ elog (LOG ,"[%s] Exchange returns NULL: %d %d" ,state -> stream ,
469
+ state -> ltuples ,state -> rtuples );
448
470
return NULL ;
449
471
}
450
472
@@ -457,7 +479,6 @@ EXCHANGE_Execute(CustomScanState *node)
457
479
return slot ;
458
480
else
459
481
{
460
- //elog(LOG, "Send real tuple");
461
482
SendTuple (dest ,state -> stream ,slot );
462
483
}
463
484
}
@@ -471,20 +492,25 @@ EXCHANGE_End(CustomScanState *node)
471
492
472
493
Assert (list_length (node -> custom_ps )== 1 );
473
494
ExecEndNode (linitial (node -> custom_ps ));
474
- Assert (Stream_unsubscribe (state -> stream ));
475
- elog (LOG ,"EXCHANGE_END" );
476
- /*
477
- * Clean out exchange state
478
- */
495
+ Stream_unsubscribe (state -> stream );
496
+
497
+ elog (INFO ,"EXCHANGE_END" );
479
498
}
480
499
481
500
static void
482
501
EXCHANGE_Rescan (CustomScanState * node )
483
502
{
484
- PlanState * outerPlan = outerPlanState (node );
503
+ ExchangeState * state = (ExchangeState * )node ;
504
+ PlanState * subPlan = (PlanState * )linitial (node -> custom_ps );
485
505
486
- if (outerPlan -> chgParam == NULL )
487
- ExecReScan (outerPlan );
506
+ init_state_ifany (state );
507
+ elog (INFO ,"Rescan exchange! %d" ,getpid ());
508
+ if (subPlan -> chgParam == NULL )
509
+ ExecReScan (subPlan );
510
+ state -> activeRemotes = state -> dests -> nservers ;
511
+ state -> ltuples = 0 ;
512
+ state -> rtuples = 0 ;
513
+ state -> hasLocal = true;
488
514
}
489
515
490
516
static void
@@ -500,8 +526,10 @@ static void
500
526
EXCHANGE_Explain(CustomScanState *node, List *ancestors, ExplainState *es)
{
	/*
	 * EXPLAIN callback of the EXCHANGE node: emits a single "Exchange" text
	 * property whose value is the node's stream field formatted as a string.
	 * The ancestors argument is not used here.
	 */
	ExchangeState  *exch = (ExchangeState *) node;
	StringInfoData	buf;

	initStringInfo(&buf);
	appendStringInfo(&buf, "stream: %s. ", exch->stream);
	ExplainPropertyText("Exchange", buf.data, es);
}
507
535