2121#include " duckdb/common/field_writer.hpp"
2222#include " duckdb/common/file_system.hpp"
2323#include " duckdb/common/multi_file_reader.hpp"
24+ #include " duckdb/common/serializer/format_deserializer.hpp"
25+ #include " duckdb/common/serializer/format_serializer.hpp"
2426#include " duckdb/common/types/chunk_collection.hpp"
2527#include " duckdb/function/copy_function.hpp"
2628#include " duckdb/function/table_function.hpp"
3537#include " duckdb/planner/operator/logical_get.hpp"
3638#include " duckdb/storage/statistics/base_statistics.hpp"
3739#include " duckdb/storage/table/row_group.hpp"
38- #include " duckdb/common/serializer/format_serializer.hpp"
39- #include " duckdb/common/serializer/format_deserializer.hpp"
4040
4141#endif
4242
@@ -78,6 +78,8 @@ struct ParquetReadLocalState : public LocalTableFunctionState {
7878DataChunk all_columns;
7979};
8080
81+ enum class ParquetFileState :uint8_t { UNOPENED, OPENING, OPEN, CLOSED };
82+
8183struct ParquetReadGlobalState :public GlobalTableFunctionState {
8284mutex lock;
8385
@@ -86,7 +88,7 @@ struct ParquetReadGlobalState : public GlobalTableFunctionState {
8688// ! Currently opened readers
8789vector<shared_ptr<ParquetReader>> readers;
8890// ! Flag to indicate a file is being opened
89- vector<bool > file_opening ;
91+ vector<ParquetFileState> file_states ;
9092// ! Mutexes to wait for a file that is currently being opened
9193unique_ptr<mutex[]> file_mutexes;
9294// ! Signal to other threads that a file failed to open, letting every thread abort.
@@ -359,14 +361,16 @@ class ParquetScanFunction {
359361auto &bind_data = input.bind_data ->CastNoConst <ParquetReadBindData>();
360362auto result = make_uniq<ParquetReadGlobalState>();
361363
362- result->file_opening = vector<bool >(bind_data.files .size (),false );
364+ result->file_states = vector<ParquetFileState >(bind_data.files .size (),ParquetFileState::UNOPENED );
363365result->file_mutexes = unique_ptr<mutex[]>(new mutex[bind_data.files .size ()]);
364366if (bind_data.files .empty ()) {
365367result->initial_reader =nullptr ;
366368}else {
367369result->readers =std::move (bind_data.union_readers );
368370if (result->readers .size () != bind_data.files .size ()) {
369371result->readers = vector<shared_ptr<ParquetReader>>(bind_data.files .size (),nullptr );
372+ }else {
373+ std::fill (result->file_states .begin (), result->file_states .end (), ParquetFileState::OPEN);
370374}
371375if (bind_data.initial_reader ) {
372376result->initial_reader =std::move (bind_data.initial_reader );
@@ -378,6 +382,7 @@ class ParquetScanFunction {
378382 make_shared<ParquetReader>(context, bind_data.files [0 ], bind_data.parquet_options );
379383result->readers [0 ] = result->initial_reader ;
380384}
385+ result->file_states [0 ] = ParquetFileState::OPEN;
381386}
382387for (auto &reader : result->readers ) {
383388if (!reader) {
@@ -511,7 +516,7 @@ class ParquetScanFunction {
511516
512517D_ASSERT (parallel_state.initial_reader );
513518
514- if (parallel_state.readers [parallel_state.file_index ]) {
519+ if (parallel_state.file_states [parallel_state.file_index ] == ParquetFileState::OPEN ) {
515520if (parallel_state.row_group_index <
516521 parallel_state.readers [parallel_state.file_index ]->NumRowGroups ()) {
517522// The current reader has rowgroups left to be scanned
@@ -523,12 +528,14 @@ class ParquetScanFunction {
523528parallel_state.row_group_index ++;
524529return true ;
525530}else {
531+ // Close current file
532+ parallel_state.file_states [parallel_state.file_index ] = ParquetFileState::CLOSED;
533+ parallel_state.readers [parallel_state.file_index ] =nullptr ;
534+
526535// Set state to the next file
527536parallel_state.file_index ++;
528537parallel_state.row_group_index =0 ;
529538
530- parallel_state.readers [parallel_state.file_index -1 ] =nullptr ;
531-
532539if (parallel_state.file_index >= bind_data.files .size ()) {
533540return false ;
534541}
@@ -541,8 +548,7 @@ class ParquetScanFunction {
541548}
542549
543550// Check if the current file is being opened, in that case we need to wait for it.
544- if (!parallel_state.readers [parallel_state.file_index ] &&
545- parallel_state.file_opening [parallel_state.file_index ]) {
551+ if (parallel_state.file_states [parallel_state.file_index ] == ParquetFileState::OPENING) {
546552WaitForFile (parallel_state.file_index , parallel_state, parallel_lock);
547553}
548554}
@@ -573,7 +579,8 @@ class ParquetScanFunction {
573579// - the thread opening the file has failed
574580// - the file was somehow scanned till the end while we were waiting
575581if (parallel_state.file_index >= parallel_state.readers .size () ||
576- parallel_state.readers [parallel_state.file_index ] || parallel_state.error_opening_file ) {
582+ parallel_state.file_states [parallel_state.file_index ] != ParquetFileState::OPENING ||
583+ parallel_state.error_opening_file ) {
577584return ;
578585}
579586}
@@ -583,10 +590,12 @@ class ParquetScanFunction {
583590static bool TryOpenNextFile (ClientContext &context,const ParquetReadBindData &bind_data,
584591 ParquetReadLocalState &scan_data, ParquetReadGlobalState ¶llel_state,
585592 unique_lock<mutex> ¶llel_lock) {
586- for (idx_t i = parallel_state.file_index ; i < bind_data.files .size (); i++) {
587- if (!parallel_state.readers [i] && parallel_state.file_opening [i] ==false ) {
593+ const auto num_threads =TaskScheduler::GetScheduler (context).NumberOfThreads ();
594+ const auto file_index_limit = MinValue<idx_t >(parallel_state.file_index + num_threads, bind_data.files .size ());
595+ for (idx_t i = parallel_state.file_index ; i < file_index_limit; i++) {
596+ if (parallel_state.file_states [i] == ParquetFileState::UNOPENED) {
588597string file = bind_data.files [i];
589- parallel_state.file_opening [i] =true ;
598+ parallel_state.file_states [i] =ParquetFileState::OPENING ;
590599auto pq_options = parallel_state.initial_reader ->parquet_options ;
591600
592601// Now we switch which lock we are holding, instead of locking the global state, we grab the lock on
@@ -611,6 +620,7 @@ class ParquetScanFunction {
611620// Now re-lock the state and add the reader
612621parallel_lock.lock ();
613622parallel_state.readers [i] = reader;
623+ parallel_state.file_states [i] = ParquetFileState::OPEN;
614624
615625return true ;
616626}