Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitfd4405d

Browse files
committed
make sure PartitionFilter is enabled while performing [concurrent] partitioning, fixes for ConcurrentPartWorker: pythonish tests pass
1 parent0042c46 commitfd4405d

File tree

4 files changed

+44
-20
lines changed

4 files changed

+44
-20
lines changed

‎init.sql

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,8 @@ BEGIN
247247
RETURN;
248248
END
249249
$$
250-
LANGUAGE plpgsql;
250+
LANGUAGE plpgsql
251+
SETpg_pathman.enable_partitionfilter=on;/* ensures that PartitionFilter is ON*/
251252

252253
/*
253254
* Old school way to distribute rows to partitions.
@@ -275,7 +276,8 @@ BEGIN
275276
RETURN;
276277
END
277278
$$
278-
LANGUAGE plpgsql;
279+
LANGUAGE plpgsql
280+
SETpg_pathman.enable_partitionfilter=on;/* ensures that PartitionFilter is ON*/
279281

280282
/*
281283
* Disable pathman partitioning for specified relation.
@@ -541,7 +543,7 @@ BEGIN
541543
RETURN v_part_count;
542544
END
543545
$$ LANGUAGE plpgsql
544-
SETpg_pathman.enable_partitionfilter= off;
546+
SETpg_pathman.enable_partitionfilter= off;/* ensures that PartitionFilter is OFF*/
545547

546548

547549

‎src/pathman_workers.c

Lines changed: 34 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -421,9 +421,11 @@ bgw_main_concurrent_part(Datum main_arg)
421421
/* Do the job */
422422
do
423423
{
424-
Oidtypes[2]= {OIDOID,INT4OID };
425-
Datumvals[2]= {part_slot->relid,part_slot->batch_size };
426-
boolnulls[2]= { false,false };
424+
MemoryContextold_mcxt;
425+
426+
Oidtypes[2]= {OIDOID,INT4OID };
427+
Datumvals[2]= {part_slot->relid,part_slot->batch_size };
428+
boolnulls[2]= { false,false };
427429

428430
/* Reset loop variables */
429431
failed= false;
@@ -432,22 +434,25 @@ bgw_main_concurrent_part(Datum main_arg)
432434
/* Start new transaction (syscache access etc.) */
433435
StartTransactionCommand();
434436

437+
/* We'll need this to recover from errors */
438+
old_mcxt=CurrentMemoryContext;
439+
435440
SPI_connect();
436441
PushActiveSnapshot(GetTransactionSnapshot());
437442

438443
/* Prepare the query if needed */
439444
if (sql==NULL)
440445
{
441-
MemoryContextoldcontext;
446+
MemoryContextcurrent_mcxt;
442447

443448
/*
444449
* Allocate as SQL query in top memory context because current
445450
* context will be destroyed after transaction finishes
446451
*/
447-
oldcontext=MemoryContextSwitchTo(TopMemoryContext);
452+
current_mcxt=MemoryContextSwitchTo(TopMemoryContext);
448453
sql=psprintf("SELECT %s._partition_data_concurrent($1::oid, p_limit:=$2)",
449454
get_namespace_name(get_pathman_schema()));
450-
MemoryContextSwitchTo(oldcontext);
455+
MemoryContextSwitchTo(current_mcxt);
451456
}
452457

453458
/* Exec ret = _partition_data_concurrent() */
@@ -471,21 +476,33 @@ bgw_main_concurrent_part(Datum main_arg)
471476
}
472477
PG_CATCH();
473478
{
474-
ErrorData*error;
475-
476-
EmitErrorReport();
479+
ErrorData*error;
480+
char*sleep_time_str;
477481

482+
/* Switch to the original context & copy edata */
483+
MemoryContextSwitchTo(old_mcxt);
478484
error=CopyErrorData();
479-
elog(LOG,"%s: %s",concurrent_part_bgw,error->message);
480485
FlushErrorState();
486+
487+
/* Print messsage for this BGWorker to server log */
488+
sleep_time_str=datum_to_cstring(Float8GetDatum(part_slot->sleep_time),
489+
FLOAT8OID);
490+
ereport(LOG,
491+
(errmsg("%s: %s",concurrent_part_bgw,error->message),
492+
errdetail("Attempt: %d/%d, sleep time: %s",
493+
failures_count+1,
494+
PART_WORKER_MAX_ATTEMPTS,
495+
sleep_time_str)));
496+
pfree(sleep_time_str);/* free the time string */
497+
481498
FreeErrorData(error);
482499

483500
/*
484501
* The most common exception we can catch here is a deadlock with
485502
* concurrent user queries. Check that attempts count doesn't exceed
486503
* some reasonable value
487504
*/
488-
if (failures_count++>PART_WORKER_MAX_ATTEMPTS)
505+
if (failures_count++ >=PART_WORKER_MAX_ATTEMPTS)
489506
{
490507
/* Mark slot as FREE */
491508
part_slot->worker_status=WS_FREE;
@@ -510,8 +527,11 @@ bgw_main_concurrent_part(Datum main_arg)
510527
if (failed)
511528
{
512529
#ifdefUSE_ASSERT_CHECKING
513-
elog(DEBUG2,"%s: could not relocate batch, total: %lu [%u]",
514-
concurrent_part_bgw,part_slot->total_rows,MyProcPid);
530+
elog(DEBUG1,"%s: could not relocate batch (%d/%d), total: %lu [%u]",
531+
concurrent_part_bgw,
532+
failures_count,PART_WORKER_MAX_ATTEMPTS,/* current/max */
533+
part_slot->total_rows,
534+
MyProcPid);
515535
#endif
516536

517537
/* Abort transaction and sleep for a second */
@@ -528,7 +548,7 @@ bgw_main_concurrent_part(Datum main_arg)
528548
part_slot->total_rows+=rows;
529549

530550
#ifdefUSE_ASSERT_CHECKING
531-
elog(DEBUG2,"%s: relocated %d rows, total: %lu [%u]",
551+
elog(DEBUG1,"%s: relocated %d rows, total: %lu [%u]",
532552
concurrent_part_bgw,rows,part_slot->total_rows,MyProcPid);
533553
#endif
534554
}

‎src/pathman_workers.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ typedef struct
8282
#definePART_WORKER_SLOTS10
8383

8484
/* Max number of attempts per batch */
85-
#definePART_WORKER_MAX_ATTEMPTS100
85+
#definePART_WORKER_MAX_ATTEMPTS60
8686

8787

8888
/*

‎tests/partitioning_test.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,10 @@ def test_concurrent(self):
5353
whileTrue:
5454
# update some rows to check for deadlocks
5555
node.safe_psql('postgres',
56-
'''update abc set t = 'test'
57-
where id in (select (random() * 300000)::int from generate_series(1, 3000))''')
56+
'''
57+
update abc set t = 'test'
58+
where id in (select (random() * 300000)::int from generate_series(1, 3000))
59+
''')
5860

5961
count=node.execute('postgres','select count(*) from pathman_concurrent_part_tasks')
6062

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp