12
12
13
13
#include "copy_stmt_hooking.h"
14
14
#include "init.h"
15
+ #include "partition_filter.h"
15
16
#include "relation_info.h"
16
17
17
18
#include "access/htup_details.h"
26
27
#include "nodes/makefuncs.h"
27
28
#include "utils/builtins.h"
28
29
#include "utils/lsyscache.h"
30
+ #include "utils/memutils.h"
29
31
#include "utils/rel.h"
30
32
#include "utils/rls.h"
31
33
32
34
#include "libpq/libpq.h"
33
35
34
36
37
+ static uint64 PathmanCopyFrom (CopyState cstate ,
38
+ Relation parent_rel ,
39
+ List * range_table ,
40
+ bool old_protocol );
41
+ static ResultRelInfoHolder * select_partition_for_copy (const PartRelationInfo * prel ,
42
+ ResultPartsStorage * parts_storage ,
43
+ Datum value ,EState * estate );
44
+
45
+
35
46
/*
36
47
* Is pg_pathman supposed to handle this COPY stmt?
37
48
*/
@@ -283,6 +294,11 @@ PathmanDoCopy(const CopyStmt *stmt, const char *queryString, uint64 *processed)
283
294
/* COPY ... FROM ... */
284
295
if (is_from )
285
296
{
297
+ bool is_old_protocol ;
298
+
299
+ is_old_protocol = PG_PROTOCOL_MAJOR (FrontendProtocol )< 3 &&
300
+ stmt -> filename == NULL ;
301
+
286
302
/* There should be relation */
287
303
Assert (rel );
288
304
@@ -293,9 +309,7 @@ PathmanDoCopy(const CopyStmt *stmt, const char *queryString, uint64 *processed)
293
309
294
310
cstate = BeginCopyFrom (rel ,stmt -> filename ,stmt -> is_program ,
295
311
stmt -> attlist ,stmt -> options );
296
- /* TODO: copy files to DB */
297
- heap_close (rel ,NoLock );
298
- * processed = 0 ;
312
+ * processed = PathmanCopyFrom (cstate ,rel ,range_table ,is_old_protocol );
299
313
EndCopyFrom (cstate );
300
314
}
301
315
/* COPY ... TO ... */
@@ -314,4 +328,233 @@ PathmanDoCopy(const CopyStmt *stmt, const char *queryString, uint64 *processed)
314
328
/* Call standard DoCopy using a new CopyStmt */
315
329
DoCopy (& modified_copy_stmt ,queryString ,processed );
316
330
}
331
+
332
+ /*
333
+ * Close the relation. If reading, we can release the AccessShareLock we
334
+ * got; if writing, we should hold the lock until end of transaction to
335
+ * ensure that updates will be committed before lock is released.
336
+ */
337
+ if (rel != NULL )
338
+ heap_close (rel , (is_from ?NoLock :AccessShareLock ));
339
+ }
340
+
341
+ /*
342
+ * Copy FROM file to relation.
343
+ */
344
+ static uint64
345
+ PathmanCopyFrom (CopyState cstate ,Relation parent_rel ,
346
+ List * range_table ,bool old_protocol )
347
+ {
348
+ HeapTuple tuple ;
349
+ TupleDesc tupDesc ;
350
+ Datum * values ;
351
+ bool * nulls ;
352
+
353
+ ResultPartsStorage parts_storage ;
354
+ ResultRelInfo * parent_result_rel ;
355
+
356
+ EState * estate = CreateExecutorState ();/* for ExecConstraints() */
357
+ ExprContext * econtext ;
358
+ TupleTableSlot * myslot ;
359
+ MemoryContext oldcontext = CurrentMemoryContext ;
360
+
361
+ uint64 processed = 0 ;
362
+
363
+
364
+ tupDesc = RelationGetDescr (parent_rel );
365
+
366
+ parent_result_rel = makeNode (ResultRelInfo );
367
+ InitResultRelInfo (parent_result_rel ,
368
+ parent_rel ,
369
+ 1 ,/* dummy rangetable index */
370
+ 0 );
371
+ ExecOpenIndices (parent_result_rel , false);
372
+
373
+ estate -> es_result_relations = parent_result_rel ;
374
+ estate -> es_num_result_relations = 1 ;
375
+ estate -> es_result_relation_info = parent_result_rel ;
376
+ estate -> es_range_table = range_table ;
377
+
378
+ /* Initialize ResultPartsStorage */
379
+ init_result_parts_storage (& parts_storage ,estate , false,
380
+ ResultPartsStorageStandard ,
381
+ check_acl_for_partition ,NULL );
382
+ parts_storage .saved_rel_info = parent_result_rel ;
383
+
384
+ /* Set up a tuple slot too */
385
+ myslot = ExecInitExtraTupleSlot (estate );
386
+ ExecSetSlotDescriptor (myslot ,tupDesc );
387
+ /* Triggers might need a slot as well */
388
+ estate -> es_trig_tuple_slot = ExecInitExtraTupleSlot (estate );
389
+
390
+ /* Prepare to catch AFTER triggers. */
391
+ AfterTriggerBeginQuery ();
392
+
393
+ /*
394
+ * Check BEFORE STATEMENT insertion triggers. It's debatable whether we
395
+ * should do this for COPY, since it's not really an "INSERT" statement as
396
+ * such. However, executing these triggers maintains consistency with the
397
+ * EACH ROW triggers that we already fire on COPY.
398
+ */
399
+ ExecBSInsertTriggers (estate ,parent_result_rel );
400
+
401
+ values = (Datum * )palloc (tupDesc -> natts * sizeof (Datum ));
402
+ nulls = (bool * )palloc (tupDesc -> natts * sizeof (bool ));
403
+
404
+ econtext = GetPerTupleExprContext (estate );
405
+
406
+ for (;;)
407
+ {
408
+ TupleTableSlot * slot ;
409
+ bool skip_tuple ;
410
+ Oid tuple_oid = InvalidOid ;
411
+
412
+ const PartRelationInfo * prel ;
413
+ ResultRelInfoHolder * rri_holder_child ;
414
+ ResultRelInfo * child_result_rel ;
415
+
416
+ CHECK_FOR_INTERRUPTS ();
417
+
418
+ ResetPerTupleExprContext (estate );
419
+
420
+ /* Fetch PartRelationInfo for parent relation */
421
+ prel = get_pathman_relation_info (RelationGetRelid (parent_rel ));
422
+
423
+ /* Switch into its memory context */
424
+ MemoryContextSwitchTo (GetPerTupleMemoryContext (estate ));
425
+
426
+ if (!NextCopyFrom (cstate ,econtext ,values ,nulls ,& tuple_oid ))
427
+ break ;
428
+
429
+ /* Search for a matching partition */
430
+ rri_holder_child = select_partition_for_copy (prel ,& parts_storage ,
431
+ values [prel -> attnum - 1 ],
432
+ estate );
433
+ child_result_rel = rri_holder_child -> result_rel_info ;
434
+ estate -> es_result_relation_info = child_result_rel ;
435
+
436
+ /* And now we can form the input tuple. */
437
+ tuple = heap_form_tuple (tupDesc ,values ,nulls );
438
+ if (tuple_oid != InvalidOid )
439
+ HeapTupleSetOid (tuple ,tuple_oid );
440
+
441
+ /*
442
+ * Constraints might reference the tableoid column, so initialize
443
+ * t_tableOid before evaluating them.
444
+ */
445
+ tuple -> t_tableOid = RelationGetRelid (child_result_rel -> ri_RelationDesc );
446
+
447
+ /* Triggers and stuff need to be invoked in query context. */
448
+ MemoryContextSwitchTo (oldcontext );
449
+
450
+ /* Place tuple in tuple slot --- but slot shouldn't free it */
451
+ slot = myslot ;
452
+ ExecStoreTuple (tuple ,slot ,InvalidBuffer , false);
453
+
454
+ skip_tuple = false;
455
+
456
+ /* BEFORE ROW INSERT Triggers */
457
+ if (child_result_rel -> ri_TrigDesc &&
458
+ child_result_rel -> ri_TrigDesc -> trig_insert_before_row )
459
+ {
460
+ slot = ExecBRInsertTriggers (estate ,child_result_rel ,slot );
461
+
462
+ if (slot == NULL )/* "do nothing" */
463
+ skip_tuple = true;
464
+ else /* trigger might have changed tuple */
465
+ tuple = ExecMaterializeSlot (slot );
466
+ }
467
+
468
+ /* Proceed if we still have a tuple */
469
+ if (!skip_tuple )
470
+ {
471
+ List * recheckIndexes = NIL ;
472
+
473
+ /* Check the constraints of the tuple */
474
+ if (child_result_rel -> ri_RelationDesc -> rd_att -> constr )
475
+ ExecConstraints (child_result_rel ,slot ,estate );
476
+
477
+ /* OK, store the tuple and create index entries for it */
478
+ simple_heap_insert (child_result_rel -> ri_RelationDesc ,tuple );
479
+
480
+ if (child_result_rel -> ri_NumIndices > 0 )
481
+ recheckIndexes = ExecInsertIndexTuples (slot ,& (tuple -> t_self ),
482
+ estate , false,NULL ,
483
+ NIL );
484
+
485
+ /* AFTER ROW INSERT Triggers */
486
+ ExecARInsertTriggers (estate ,child_result_rel ,tuple ,
487
+ recheckIndexes );
488
+
489
+ list_free (recheckIndexes );
490
+
491
+ /*
492
+ * We count only tuples not suppressed by a BEFORE INSERT trigger;
493
+ * this is the same definition used by execMain.c for counting
494
+ * tuples inserted by an INSERT command.
495
+ */
496
+ processed ++ ;
497
+ }
498
+ }
499
+
500
+ MemoryContextSwitchTo (oldcontext );
501
+
502
+ /*
503
+ * In the old protocol, tell pqcomm that we can process normal protocol
504
+ * messages again.
505
+ */
506
+ if (old_protocol )
507
+ pq_endmsgread ();
508
+
509
+ /* Execute AFTER STATEMENT insertion triggers */
510
+ ExecASInsertTriggers (estate ,parent_result_rel );
511
+
512
+ /* Handle queued AFTER triggers */
513
+ AfterTriggerEndQuery (estate );
514
+
515
+ pfree (values );
516
+ pfree (nulls );
517
+
518
+ ExecResetTupleTable (estate -> es_tupleTable , false);
519
+ fini_result_parts_storage (& parts_storage );
520
+
521
+ FreeExecutorState (estate );
522
+
523
+ return processed ;
524
+ }
525
+
526
+ /*
527
+ * Smart wrapper for scan_result_parts_storage().
528
+ */
529
+ static ResultRelInfoHolder *
530
+ select_partition_for_copy (const PartRelationInfo * prel ,
531
+ ResultPartsStorage * parts_storage ,
532
+ Datum value ,EState * estate )
533
+ {
534
+ ExprContext * econtext ;
535
+ ResultRelInfoHolder * rri_holder ;
536
+ Oid selected_partid = InvalidOid ;
537
+ Oid * parts ;
538
+ int nparts ;
539
+
540
+ econtext = GetPerTupleExprContext (estate );
541
+
542
+ /* Search for matching partitions using partitioned column */
543
+ parts = find_partitions_for_value (value ,prel ,econtext ,& nparts );
544
+
545
+ if (nparts > 1 )
546
+ elog (ERROR ,"PATHMAN COPY selected more than one partition" );
547
+ else if (nparts == 0 )
548
+ elog (ERROR ,
549
+ "There is no suitable partition for key '%s'" ,
550
+ datum_to_cstring (value ,prel -> atttype ));
551
+ else
552
+ selected_partid = parts [0 ];
553
+
554
+ /* Replace parent table with a suitable partition */
555
+ MemoryContextSwitchTo (estate -> es_query_cxt );
556
+ rri_holder = scan_result_parts_storage (selected_partid ,parts_storage );
557
+ MemoryContextSwitchTo (GetPerTupleMemoryContext (estate ));
558
+
559
+ return rri_holder ;
317
560
}