Parquet reader: allow Prefetch to be scheduled in parallel (via TaskExecutor) #19942
Conversation
```diff
 auto total_compressed_size = GetGroupCompressedSize(state);
 if (total_compressed_size > 0) {
-	trans.Prefetch(GetGroupOffset(state), total_row_group_span);
+	auto res = trans.Prefetch(GetGroupOffset(state), total_row_group_span);
```
This isn't resetting `result`, which means it isn't preserving the existing behavior.
Actually, `result` is already reset at the top of the function, so resetting it isn't necessary in the other two places: lines 1297 and 1302.
This is the point of interest of the PR, right? Since we're not using `ExecuteTasksSynchronously` here.
The `ReadIntoBufferAsyncTask` references the `ReadHead`, which is owned by the `ThriftFileTransport` class.
That class has a method to clear the read heads:
```cpp
void ClearPrefetch() {
	ra_buffer.read_heads.clear();
	ra_buffer.merge_set.clear();
}
```
If that's used, we'll have a dangling reference, so that needs to be looked into.
The `CachingFileHandle &` is owned by the `ParquetReaderScanState` (more on this below).
But from an even wider perspective, I'm not sure how we ensure that these lifetime dependencies are enforced. If this task dies, and the global/local state with it, everything in the `ReadIntoBufferAsyncTask` is a dangling pointer to freed memory. How are we ensuring that's never the case?
^ If I remember correctly, we're blocking this task, and only once the other tasks have finished is this one unblocked, right?
I think that should work: `ClearPrefetch` is only called by this Scan method, so it should never run until we're finished, given that we are in charge of unblocking this and nothing else will ever cause a wake-up.
Though I think it would be safer if we had an "outstanding prefetches" variable, so that even if the task were to erroneously wake up, we could detect it and throw.
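A minimal sketch of that guard, assuming a hypothetical `outstanding_prefetches` counter; the names, placement, and scheduling hooks below are all invented for illustration, not taken from the PR:
```cpp
#include <atomic>
#include <cstddef>
#include <stdexcept>

// Hypothetical guard: count prefetch tasks still in flight, and refuse to
// clear the read heads while any task might still reference them.
class PrefetchGuard {
public:
	void OnPrefetchScheduled() {
		outstanding_prefetches.fetch_add(1, std::memory_order_relaxed);
	}
	void OnPrefetchFinished() {
		outstanding_prefetches.fetch_sub(1, std::memory_order_release);
	}
	void AssertNoOutstandingPrefetches() const {
		// Detects an erroneous wake-up: ClearPrefetch would otherwise turn the
		// ReadHead references held by ReadIntoBufferAsyncTask into dangling ones.
		if (outstanding_prefetches.load(std::memory_order_acquire) != 0) {
			throw std::runtime_error("ClearPrefetch called with outstanding prefetch tasks");
		}
	}

private:
	std::atomic<std::size_t> outstanding_prefetches {0};
};
```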
The file transport seems to be owned by `ParquetReaderScanState`:
```cpp
duckdb_base_std::unique_ptr<duckdb_apache::thrift::protocol::TProtocol> thrift_file_proto;
```
which is part of the local state for the table function:
```cpp
struct ParquetReadLocalState : public LocalTableFunctionState {
	ParquetReaderScanState scan_state;
};
```
So we need to make sure that this local state will always outlive the created `ReadIntoBufferAsyncTask` tasks, or we need to wrap things in a `shared_ptr`.
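For illustration, a minimal sketch of the `shared_ptr` option, where `PrefetchContext` and all member names are hypothetical: the task copies the `shared_ptr`, so whatever it dereferences stays alive even if the local table-function state is torn down first.
```cpp
#include <cstdint>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical bundle of everything the async task dereferences; shared
// ownership keeps it alive for as long as any task still holds a reference.
struct PrefetchContext {
	std::vector<uint8_t> read_head_buffer; // stand-in for the ReadHead storage
};

class ReadIntoBufferAsyncTaskSketch {
public:
	explicit ReadIntoBufferAsyncTaskSketch(std::shared_ptr<PrefetchContext> context_p)
	    : context(std::move(context_p)) {
	}
	void ExecuteTask() {
		// Safe even if ParquetReadLocalState was already destroyed: the task
		// holds its own reference, so the buffer cannot be freed under it.
		auto &buffer = context->read_head_buffer;
		(void)buffer; // ... perform the actual read into the buffer ...
	}

private:
	std::shared_ptr<PrefetchContext> context;
};
```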
I don't think we can guarantee that tasks are cancelled in a specific order if an interrupt happens.
So if the Parquet read is cancelled before the prefetch tasks are cancelled, we'll be causing a heap-use-after-free.
This is problematic, for example:
```cpp
void Executor::CancelTasks() {
	task.reset();
	{
		lock_guard<mutex> elock(executor_lock);
		// mark the query as cancelled so tasks will early-out
		cancelled = true;
		// destroy all pipelines, events and states
		for (auto &rec_cte_ref : recursive_ctes) {
			auto &rec_cte = rec_cte_ref.get().Cast<PhysicalRecursiveCTE>();
			rec_cte.recursive_meta_pipeline.reset();
		}
		pipelines.clear();
		root_pipelines.clear();
		to_be_rescheduled_tasks.clear();
		events.clear();
	}
	// Take all pending tasks and execute them until they cancel
	while (executor_tasks > 0) {
		WorkOnTasks();
	}
}
```
The blocked tasks are cleared first (`to_be_rescheduled_tasks`).
The `PipelineTask` owns the `PipelineExecutor`,
and the `PipelineExecutor` owns the `unique_ptr<LocalSourceState> local_source_state;`.
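To make the ownership chain concrete, a simplified sketch; the type bodies below are placeholders, not DuckDB's actual definitions:
```cpp
#include <memory>

// Placeholder stand-ins for the real DuckDB types; only the ownership
// chain is the point here.
struct LocalSourceState {
	virtual ~LocalSourceState() = default;
	// in the real code this (indirectly) holds ParquetReadLocalState, whose
	// ParquetReaderScanState owns thrift_file_proto / ThriftFileTransport
};
struct PipelineExecutor {
	std::unique_ptr<LocalSourceState> local_source_state;
};
struct PipelineTask {
	std::unique_ptr<PipelineExecutor> pipeline_executor;
};

// During CancelTasks(), to_be_rescheduled_tasks.clear() destroys the blocked
// PipelineTask, which destroys the PipelineExecutor and its LocalSourceState.
// A ReadIntoBufferAsyncTask still holding raw references into that state is
// now dangling; when the executor later runs it so it can "cancel", it reads
// freed memory: the heap-use-after-free described above.
```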
Thanks for the feedback!
Incremental step on top of the infrastructure added previously in #19422 and partially in #19566.
This allows some codepaths of `ParquetReader` to create a task for each range to be prefetched and have those executed in parallel, with the main task restarting afterward.
This is most visible when doing a query like the hypothetical one sketched below, where one needs to fetch only certain columns that are spaced apart AND row-group-level parallelism can't be unlocked.
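The PR description's actual query isn't reproduced here; a hypothetical query of that shape (all file and column names invented) would be:
```sql
-- Hypothetical: a wide Parquet file with a single large row group, so
-- row-group parallelism can't help; the two projected columns sit far
-- apart in the file, leaving two disjoint ranges to prefetch.
SELECT col_0002, col_0950
FROM read_parquet('wide_single_row_group.parquet');
```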
In this case, before this PR, a single task would execute the different sub-tasks sequentially; after this PR, the sub-tasks are split (but with dependencies on each other) and handled by DuckDB's own TaskExecutor. Note that the same logical thread might still end up being scheduled to do all the work (say, if the system is already busy), but splitting the sub-tasks into proper tasks is at the very least more scheduler-friendly.
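As an illustration of that split, here is a sketch built on DuckDB's `TaskExecutor` utility. The task class, its body, and the helper function are invented, and the `BaseExecutorTask`/`ScheduleTask`/`WorkOnTasks` interface is recalled from the codebase rather than taken from this PR, so treat the signatures as assumptions:
```cpp
#include "duckdb/parallel/task_executor.hpp"

#include <utility>
#include <vector>

namespace duckdb {

// Hypothetical task: one per byte range to prefetch.
class PrefetchRangeTask : public BaseExecutorTask {
public:
	PrefetchRangeTask(TaskExecutor &executor, idx_t offset_p, idx_t size_p)
	    : BaseExecutorTask(executor), offset(offset_p), size(size_p) {
	}
	void ExecuteTask() override {
		// ... read [offset, offset + size) into the prefetch buffer ...
	}

private:
	idx_t offset;
	idx_t size;
};

// Hypothetical helper: schedule one task per range, then let the calling
// thread work on (and wait for) them instead of idling.
static void PrefetchRangesInParallel(ClientContext &context, const vector<std::pair<idx_t, idx_t>> &ranges) {
	TaskExecutor executor(context);
	for (auto &range : ranges) {
		executor.ScheduleTask(make_uniq<PrefetchRangeTask>(executor, range.first, range.second));
	}
	executor.WorkOnTasks();
}

} // namespace duckdb
```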
Note that for the call-sites with a pattern like:
```cpp
auto res = transport.Prefetch(file_size - prefetch_size, prefetch_size);
res.ExecuteTasksSynchronously();
```
That means tasks are still executed as sub-tasks (and sequentially at that). Only when the tasks (packaged as an `AsyncResult`) are returned up to the `PhysicalTableScan` are they scheduled there and then.
The main item that could use an extra pair of reviewer eyes: lifetime management of the referenced objects, especially in the case of query cancellation.