
Parquet reader: allow Prefetch to be scheduled in parallel (via TaskExecutor)#19942


Draft

carlopi wants to merge 1 commit into duckdb:main from carlopi:parquet_prefetch_sometimes_in_parallel

Conversation

@carlopi (Contributor) commented Nov 26, 2025 (edited)

Incremental step on top of the infrastructure added previously in #19422 and partially in #19566.

This allows some code paths of ParquetReader to create a task for each range to be prefetched and have those executed in parallel, with the main task resuming afterwards.

This is most visible when doing a query like:

```sql
SELECT column_2, column_5, column_12 FROM 'https://some-endpoint.com/my_file.parquet' LIMIT 1000000;
```

where one needs to fetch only certain columns that are spaced apart AND row-group-level parallelism can't be unlocked.

In this case, before this PR a single task would do the different sub-tasks sequentially:

```
---preparatory_work----fetch_#2-----fetch_#5---fetch_#12----final_work----|
```

and afterwards:

```
---preparatory_work--|
                     --fetch_#2----|
                     -fetch_#5-|
                     --fetch_#12--|
                                  --final_work----|
```

Here the sub-tasks are split (but with dependencies on each other) and handled by DuckDB's own TaskExecutor. Note that the same logical thread might still end up doing all the work (say, if the system is already busy), but splitting the work into proper tasks is at the very least more scheduling-friendly.

Note that for the call-sites with a pattern like:

```cpp
auto res = transport.Prefetch(file_size - prefetch_size, prefetch_size);
res.ExecuteTasksSynchronously();
```

the tasks are still executed as sub-tasks (and sequentially). Only when the tasks (packaged as an AsyncResult) are returned up to the PhysicalTableScan are they scheduled there and then.

Main item that could use an extra pair of reviewer eyes: lifetime management of the referenced objects, especially in case of query cancellation.

```diff
 auto total_compressed_size = GetGroupCompressedSize(state);
 if (total_compressed_size > 0) {
-	trans.Prefetch(GetGroupOffset(state), total_row_group_span);
+	auto res = trans.Prefetch(GetGroupOffset(state), total_row_group_span);
```
@Tishj (Contributor) commented Nov 27, 2025 (edited)

This isn't resetting `result`, which isn't preserving the existing behavior.
Actually, `result` is already reset at the top of the function, so it's not necessary in the other 2 places: lines 1297 and 1302.

@Tishj (Contributor) commented Nov 27, 2025 (edited)

This is the point of interest of the PR, right?
Since we're not using `ExecuteTasksSynchronously` here.

The `ReadIntoBufferAsyncTask` is referencing the `ReadHead`, which is owned by the `ThriftFileTransport` class.
That class has a method to clear the read heads:

```cpp
void ClearPrefetch() {
	ra_buffer.read_heads.clear();
	ra_buffer.merge_set.clear();
}
```

If that's used, we'll have a dangling reference, so that needs to be looked into.

The `CachingFileHandle &` is owned by the `ParquetReaderScanState` (more on this below).

But from an even wider perspective, I'm not sure how we ensure that these mentioned dependencies are enforced. If this task dies, and the global/local state with it, everything in the `ReadIntoBufferAsyncTask` is a dangling pointer to freed memory. How are we ensuring that's never the case?

^ If I remember correctly, we're blocking this task, and only once the other tasks have finished is this one unblocked, right?
I think that should work: `ClearPrefetch` is only called by this Scan method, so it should never run until we're finished, if we are in charge of unblocking this and nothing else will ever cause a wake-up.

Though I think it would be safer if we had an "outstanding prefetches" variable, so even if the task were to erroneously wake up, we could detect it and throw.

@Tishj (Contributor) commented Nov 27, 2025 (edited)

The file transport seems to be owned by `ParquetReaderScanState`:

```cpp
duckdb_base_std::unique_ptr<duckdb_apache::thrift::protocol::TProtocol> thrift_file_proto;
```

which is part of the local state for the table function:

```cpp
struct ParquetReadLocalState : public LocalTableFunctionState {
	ParquetReaderScanState scan_state;
};
```

So we need to make sure that this local state always outlives the created `ReadIntoBufferAsyncTask` tasks, or we need to wrap things in a shared ptr.

@Tishj (Contributor) commented:

I don't think we can guarantee that tasks are cancelled in a specific order if an interrupt happens.
So if the parquet read is cancelled before the prefetch tasks are cancelled, we'll be causing a heap-use-after-free.

@Tishj (Contributor) commented Nov 27, 2025 (edited)

This is problematic, for example:

```cpp
void Executor::CancelTasks() {
	task.reset();
	{
		lock_guard<mutex> elock(executor_lock);
		// mark the query as cancelled so tasks will early-out
		cancelled = true;
		// destroy all pipelines, events and states
		for (auto &rec_cte_ref : recursive_ctes) {
			auto &rec_cte = rec_cte_ref.get().Cast<PhysicalRecursiveCTE>();
			rec_cte.recursive_meta_pipeline.reset();
		}
		pipelines.clear();
		root_pipelines.clear();
		to_be_rescheduled_tasks.clear();
		events.clear();
	}
	// Take all pending tasks and execute them until they cancel
	while (executor_tasks > 0) {
		WorkOnTasks();
	}
}
```

The blocked tasks are cleared first (`to_be_rescheduled_tasks`),
the `PipelineTask` owns the `PipelineExecutor`,
and the `PipelineExecutor` owns the `unique_ptr<LocalSourceState> local_source_state;`.

@carlopi (Contributor, Author) commented:

Thanks for the feedback!

@carlopi marked this pull request as draft on November 27, 2025, 13:11

Reviewers

@Tishj left review comments


2 participants: @carlopi, @Tishj
