Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitc9f9831

Browse files
committed
fix partition creation callback invocation
1 parentb94d661 commitc9f9831

File tree

6 files changed

+62
-85
lines changed

6 files changed

+62
-85
lines changed

‎expected/pathman_callbacks.out

Lines changed: 13 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -2,23 +2,11 @@
22
CREATE EXTENSION pg_pathman;
33
CREATE SCHEMA callbacks;
44
/* Check callbacks */
5-
CREATE TABLE callbacks.log(id serial, message text);
6-
CREATE OR REPLACE FUNCTION callbacks.abc_on_part_created_range_callback(
5+
CREATE OR REPLACE FUNCTION callbacks.abc_on_part_created_callback(
76
args JSONB)
87
RETURNS VOID AS $$
9-
DECLARE
10-
start_valueTEXT := args->>'start';
11-
end_valueTEXT := args->'end';
128
BEGIN
13-
INSERT INTO callbacks.log(message)
14-
VALUES (start_value || '-' || end_value);
15-
END
16-
$$ language plpgsql;
17-
CREATE OR REPLACE FUNCTION callbacks.abc_on_part_created_hash_callback(
18-
args JSONB)
19-
RETURNS VOID AS $$
20-
BEGIN
21-
RAISE WARNING 'callback: partition %', args->'partition';
9+
RAISE WARNING 'callback arg: %', args::TEXT;
2210
END
2311
$$ language plpgsql;
2412
/* set callback to be called on RANGE partitions */
@@ -31,7 +19,7 @@ NOTICE: sequence "abc_seq" does not exist, skipping
3119
(1 row)
3220

3321
SELECT set_part_init_callback('callbacks.abc',
34-
'callbacks.abc_on_part_created_range_callback');
22+
'callbacks.abc_on_part_created_callback');
3523
set_part_init_callback
3624
------------------------
3725

@@ -40,32 +28,26 @@ SELECT set_part_init_callback('callbacks.abc',
4028
INSERT INTO callbacks.abc VALUES (123, 1);
4129
INSERT INTO callbacks.abc VALUES (223, 1);
4230
SELECT append_range_partition('callbacks.abc');
31+
WARNING: callback arg: {"parent": "abc", "parttype": "2", "partition": "abc_4", "range_max": "401", "range_min": "301"}
4332
append_range_partition
4433
------------------------
4534
callbacks.abc_4
4635
(1 row)
4736

4837
SELECT prepend_range_partition('callbacks.abc');
38+
WARNING: callback arg: {"parent": "abc", "parttype": "2", "partition": "abc_5", "range_max": "1", "range_min": "-99"}
4939
prepend_range_partition
5040
-------------------------
5141
callbacks.abc_5
5242
(1 row)
5343

5444
SELECT add_range_partition('callbacks.abc', 401, 502);
45+
WARNING: callback arg: {"parent": "abc", "parttype": "2", "partition": "abc_6", "range_max": "502", "range_min": "401"}
5546
add_range_partition
5647
---------------------
5748
callbacks.abc_6
5849
(1 row)
5950

60-
SELECT message FROM callbacks.log ORDER BY id;
61-
message
62-
-----------
63-
201-"301"
64-
301-"401"
65-
-99-"1"
66-
401-"502"
67-
(4 rows)
68-
6951
SELECT drop_partitions('callbacks.abc');
7052
NOTICE: function callbacks.abc_upd_trig_func() does not exist, skipping
7153
NOTICE: 0 rows copied from callbacks.abc_1
@@ -81,23 +63,23 @@ NOTICE: 0 rows copied from callbacks.abc_6
8163

8264
/* set callback to be called on HASH partitions */
8365
SELECT set_part_init_callback('callbacks.abc',
84-
'callbacks.abc_on_part_created_hash_callback');
66+
'callbacks.abc_on_part_created_callback');
8567
set_part_init_callback
8668
------------------------
8769

8870
(1 row)
8971

9072
SELECT create_hash_partitions('callbacks.abc', 'a', 5);
91-
WARNING: callback:partition "abc_0"
92-
WARNING: callback:partition "abc_1"
93-
WARNING: callback:partition "abc_2"
94-
WARNING: callback:partition "abc_3"
95-
WARNING: callback:partition "abc_4"
73+
WARNING: callback arg: {"parent": "abc", "parttype": "1", "partition": "abc_0"}
74+
WARNING: callback arg: {"parent": "abc", "parttype": "1", "partition": "abc_1"}
75+
WARNING: callback arg: {"parent": "abc", "parttype": "1", "partition": "abc_2"}
76+
WARNING: callback arg: {"parent": "abc", "parttype": "1", "partition": "abc_3"}
77+
WARNING: callback arg: {"parent": "abc", "parttype": "1", "partition": "abc_4"}
9678
create_hash_partitions
9779
------------------------
9880
5
9981
(1 row)
10082

10183
DROP SCHEMA callbacks CASCADE;
102-
NOTICE: drop cascades to10 other objects
84+
NOTICE: drop cascades to8 other objects
10385
DROP EXTENSION pg_pathman CASCADE;

‎sql/pathman_callbacks.sql

Lines changed: 4 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -4,26 +4,12 @@ CREATE EXTENSION pg_pathman;
44
CREATESCHEMAcallbacks;
55

66
/* Check callbacks*/
7-
CREATETABLEcallbacks.log(idserial, messagetext);
87

9-
CREATE OR REPLACEFUNCTIONcallbacks.abc_on_part_created_range_callback(
8+
CREATE OR REPLACEFUNCTIONcallbacks.abc_on_part_created_callback(
109
args JSONB)
1110
RETURNS VOIDAS $$
12-
DECLARE
13-
start_valueTEXT := args->>'start';
14-
end_valueTEXT := args->'end';
1511
BEGIN
16-
INSERT INTOcallbacks.log(message)
17-
VALUES (start_value||'-'|| end_value);
18-
END
19-
$$ language plpgsql;
20-
21-
22-
CREATE OR REPLACEFUNCTIONcallbacks.abc_on_part_created_hash_callback(
23-
args JSONB)
24-
RETURNS VOIDAS $$
25-
BEGIN
26-
RAISE WARNING'callback: partition %', args->'partition';
12+
RAISE WARNING'callback arg: %', args::TEXT;
2713
END
2814
$$ language plpgsql;
2915

@@ -33,7 +19,7 @@ CREATE TABLE callbacks.abc(a serial, b int);
3319
SELECT create_range_partitions('callbacks.abc','a',1,100,2);
3420

3521
SELECT set_part_init_callback('callbacks.abc',
36-
'callbacks.abc_on_part_created_range_callback');
22+
'callbacks.abc_on_part_created_callback');
3723

3824
INSERT INTOcallbacks.abcVALUES (123,1);
3925
INSERT INTOcallbacks.abcVALUES (223,1);
@@ -42,14 +28,12 @@ SELECT append_range_partition('callbacks.abc');
4228
SELECT prepend_range_partition('callbacks.abc');
4329
SELECT add_range_partition('callbacks.abc',401,502);
4430

45-
SELECT messageFROMcallbacks.logORDER BY id;
46-
4731
SELECT drop_partitions('callbacks.abc');
4832

4933

5034
/* set callback to be called on HASH partitions*/
5135
SELECT set_part_init_callback('callbacks.abc',
52-
'callbacks.abc_on_part_created_hash_callback');
36+
'callbacks.abc_on_part_created_callback');
5337
SELECT create_hash_partitions('callbacks.abc','a',5);
5438

5539

‎src/pathman_workers.c

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -488,7 +488,7 @@ bgw_main_concurrent_part(Datum main_arg)
488488
failures_count++;
489489
ereport(LOG,
490490
(errmsg("%s: %s",concurrent_part_bgw,error->message),
491-
errdetail("Attempt: %d/%d, sleep time: %s",
491+
errdetail("attempt: %d/%d, sleep time: %s",
492492
failures_count,
493493
PART_WORKER_MAX_ATTEMPTS,
494494
sleep_time_str)));
@@ -507,9 +507,9 @@ bgw_main_concurrent_part(Datum main_arg)
507507
cps_set_status(part_slot,CPS_FREE);
508508

509509
elog(LOG,
510-
"Concurrent partitioning worker has canceled the task because "
511-
"maximum amount of attempts (%d) had been exceeded. "
512-
"See the error message below",
510+
"concurrent partitioning worker has canceled the task because "
511+
"maximum amount of attempts (%d) had been exceeded, "
512+
"see the error message below",
513513
PART_WORKER_MAX_ATTEMPTS);
514514

515515
return;/* exit quickly */
@@ -573,11 +573,9 @@ bgw_main_concurrent_part(Datum main_arg)
573573
Datum
574574
partition_table_concurrently(PG_FUNCTION_ARGS)
575575
{
576-
#definetostr(str) ( #str )/* convert function's name to literal */
577-
578576
Oidrelid=PG_GETARG_OID(0);
579-
intempty_slot_idx=-1;/* do we have a slot for BGWorker? */
580-
inti;
577+
intempty_slot_idx=-1,/* do we have a slot for BGWorker? */
578+
i;
581579

582580
/* Check if relation is a partitioned table */
583581
shout_if_prel_is_invalid(relid,
@@ -617,7 +615,7 @@ partition_table_concurrently(PG_FUNCTION_ARGS)
617615
SpinLockRelease(&concurrent_part_slots[empty_slot_idx].mutex);
618616

619617
elog(ERROR,
620-
"Table \"%s\" is already being partitioned",
618+
"table \"%s\" is already being partitioned",
621619
get_rel_name(relid));
622620
}
623621

@@ -628,7 +626,7 @@ partition_table_concurrently(PG_FUNCTION_ARGS)
628626

629627
/* Looks like we could not find an empty slot */
630628
if (empty_slot_idx<0)
631-
elog(ERROR,"No empty worker slots found");
629+
elog(ERROR,"no empty worker slots found");
632630
else
633631
{
634632
/* Initialize concurrent part slot */
@@ -648,9 +646,9 @@ partition_table_concurrently(PG_FUNCTION_ARGS)
648646

649647
/* Tell user everything's fine */
650648
elog(NOTICE,
651-
"Worker started. You can stop it "
649+
"worker started, you can stop it "
652650
"with the following command: select %s('%s');",
653-
tostr(stop_concurrent_part_task),/* convert function's name to literal */
651+
CppAsString(stop_concurrent_part_task),
654652
get_rel_name(relid));
655653

656654
PG_RETURN_VOID();
@@ -785,7 +783,7 @@ stop_concurrent_part_task(PG_FUNCTION_ARGS)
785783
cur_slot->relid==relid&&
786784
cur_slot->dbid==MyDatabaseId)
787785
{
788-
elog(NOTICE,"Worker will stop after it finishes current batch");
786+
elog(NOTICE,"worker will stop after it finishes current batch");
789787

790788
/* Change worker's state & set 'worker_found' */
791789
cur_slot->worker_status=CPS_STOPPING;
@@ -800,7 +798,7 @@ stop_concurrent_part_task(PG_FUNCTION_ARGS)
800798
PG_RETURN_BOOL(true);
801799
else
802800
{
803-
elog(ERROR,"Cannot find worker for relation \"%s\"",
801+
elog(ERROR,"cannot find worker for relation \"%s\"",
804802
get_rel_name_or_relid(relid));
805803

806804
PG_RETURN_BOOL(false);/* keep compiler happy */

‎src/pl_funcs.c

Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -769,16 +769,25 @@ invoke_on_partition_created_callback(PG_FUNCTION_ARGS)
769769
if (PG_ARGISNULL(ARG_CHILD))
770770
elog(ERROR,"partition should not be null");
771771

772-
/* Both RANGE_START & RANGE_END are not available (HASH) */
773-
if (PG_ARGISNULL(ARG_RANGE_START)&&PG_ARGISNULL(ARG_RANGE_START))
774-
part_type=PT_HASH;
772+
switch (PG_NARGS())
773+
{
774+
case3:
775+
part_type=PT_HASH;
776+
break;
777+
778+
case5:
779+
{
780+
if (PG_ARGISNULL(ARG_RANGE_START)||PG_ARGISNULL(ARG_RANGE_START))
781+
elog(ERROR,"both bounds must be provided for RANGE partition");
775782

776-
/* Either RANGE_START or RANGE_END is missing */
777-
elseif (PG_ARGISNULL(ARG_RANGE_START)||PG_ARGISNULL(ARG_RANGE_START))
778-
elog(ERROR,"both boundaries must be provided for RANGE partition");
783+
part_type=PT_RANGE;
784+
}
785+
break;
779786

780-
/* Both RANGE_START & RANGE_END are provided */
781-
elsepart_type=PT_RANGE;
787+
default:
788+
elog(ERROR,"error in function \"%s\"",
789+
CppAsString(invoke_on_partition_created_callback));
790+
}
782791

783792
/* Build JSONB according to partitioning type */
784793
switch (part_type)
@@ -791,8 +800,8 @@ invoke_on_partition_created_callback(PG_FUNCTION_ARGS)
791800
JSB_INIT_VAL(&val,WJB_VALUE,get_rel_name_or_relid(parent_oid));
792801
JSB_INIT_VAL(&key,WJB_KEY,"partition");
793802
JSB_INIT_VAL(&val,WJB_VALUE,get_rel_name_or_relid(partition_oid));
794-
JSB_INIT_VAL(&key,WJB_KEY,"part_type");
795-
JSB_INIT_VAL(&val,WJB_VALUE,"HASH");
803+
JSB_INIT_VAL(&key,WJB_KEY,"parttype");
804+
JSB_INIT_VAL(&val,WJB_VALUE,PartTypeToCString(PT_HASH));
796805

797806
result=pushJsonbValue(&jsonb_state,WJB_END_OBJECT,NULL);
798807
}
@@ -814,11 +823,11 @@ invoke_on_partition_created_callback(PG_FUNCTION_ARGS)
814823
JSB_INIT_VAL(&val,WJB_VALUE,get_rel_name_or_relid(parent_oid));
815824
JSB_INIT_VAL(&key,WJB_KEY,"partition");
816825
JSB_INIT_VAL(&val,WJB_VALUE,get_rel_name_or_relid(partition_oid));
817-
JSB_INIT_VAL(&key,WJB_KEY,"part_type");
818-
JSB_INIT_VAL(&val,WJB_VALUE,"RANGE");
819-
JSB_INIT_VAL(&key,WJB_KEY,"start");
826+
JSB_INIT_VAL(&key,WJB_KEY,"parttype");
827+
JSB_INIT_VAL(&val,WJB_VALUE,PartTypeToCString(PT_RANGE));
828+
JSB_INIT_VAL(&key,WJB_KEY,"range_min");
820829
JSB_INIT_VAL(&val,WJB_VALUE,start_value);
821-
JSB_INIT_VAL(&key,WJB_KEY,"end");
830+
JSB_INIT_VAL(&key,WJB_KEY,"range_max");
822831
JSB_INIT_VAL(&val,WJB_VALUE,end_value);
823832

824833
result=pushJsonbValue(&jsonb_state,WJB_END_OBJECT,NULL);

‎src/relation_info.c

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -627,19 +627,23 @@ DatumGetPartType(Datum datum)
627627
return (PartType)val;
628628
}
629629

630-
Datum
631-
PartTypeGetTextDatum(PartTypeparttype)
630+
char*
631+
PartTypeToCString(PartTypeparttype)
632632
{
633-
switch(parttype)
633+
staticchar*hash_str="1",
634+
*range_str="2";
635+
636+
switch (parttype)
634637
{
635638
casePT_HASH:
636-
returnCStringGetTextDatum("HASH");
639+
returnhash_str;
637640

638641
casePT_RANGE:
639-
returnCStringGetTextDatum("RANGE");
642+
returnrange_str;
640643

641644
default:
642645
elog(ERROR,"Unknown partitioning type %u",parttype);
646+
returnNULL;/* keep compiler happy */
643647
}
644648
}
645649

‎src/relation_info.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ Oid forget_parent_of_partition(Oid partition, PartParentSearch *status);
137137
Oidget_parent_of_partition(Oidpartition,PartParentSearch*status);
138138

139139
PartTypeDatumGetPartType(Datumdatum);
140-
DatumPartTypeGetTextDatum(PartTypeparttype);
140+
char*PartTypeToCString(PartTypeparttype);
141141

142142
voidshout_if_prel_is_invalid(Oidparent_oid,
143143
constPartRelationInfo*prel,

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp