Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit 9698e9e

Browse files
authored
Merge pull request #8812 from lnkuiper/parquet_too_many_files
Fix "Too many open files" issues
2 parents 6eddf2c + 4773ddb, commit 9698e9e

File tree

2 files changed

+33
-21
lines changed

2 files changed

+33
-21
lines changed

‎extension/parquet/parquet_extension.cpp‎

Lines changed: 23 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -21,6 +21,8 @@
2121
#include"duckdb/common/field_writer.hpp"
2222
#include"duckdb/common/file_system.hpp"
2323
#include"duckdb/common/multi_file_reader.hpp"
24+
#include"duckdb/common/serializer/format_deserializer.hpp"
25+
#include"duckdb/common/serializer/format_serializer.hpp"
2426
#include"duckdb/common/types/chunk_collection.hpp"
2527
#include"duckdb/function/copy_function.hpp"
2628
#include"duckdb/function/table_function.hpp"
@@ -35,8 +37,6 @@
3537
#include"duckdb/planner/operator/logical_get.hpp"
3638
#include"duckdb/storage/statistics/base_statistics.hpp"
3739
#include"duckdb/storage/table/row_group.hpp"
38-
#include"duckdb/common/serializer/format_serializer.hpp"
39-
#include"duckdb/common/serializer/format_deserializer.hpp"
4040

4141
#endif
4242

@@ -78,6 +78,8 @@ struct ParquetReadLocalState : public LocalTableFunctionState {
7878
DataChunk all_columns;
7979
};
8080

81+
enumclassParquetFileState :uint8_t { UNOPENED, OPENING, OPEN, CLOSED };
82+
8183
structParquetReadGlobalState :publicGlobalTableFunctionState {
8284
mutex lock;
8385

@@ -86,7 +88,7 @@ struct ParquetReadGlobalState : public GlobalTableFunctionState {
8688
//! Currently opened readers
8789
vector<shared_ptr<ParquetReader>> readers;
8890
//! Flag to indicate a file is being opened
89-
vector<bool> file_opening;
91+
vector<ParquetFileState> file_states;
9092
//! Mutexes to wait for a file that is currently being opened
9193
unique_ptr<mutex[]> file_mutexes;
9294
//! Signal to other threads that a file failed to open, letting every thread abort.
@@ -359,14 +361,16 @@ class ParquetScanFunction {
359361
auto &bind_data = input.bind_data->CastNoConst<ParquetReadBindData>();
360362
auto result = make_uniq<ParquetReadGlobalState>();
361363

362-
result->file_opening = vector<bool>(bind_data.files.size(),false);
364+
result->file_states = vector<ParquetFileState>(bind_data.files.size(),ParquetFileState::UNOPENED);
363365
result->file_mutexes = unique_ptr<mutex[]>(new mutex[bind_data.files.size()]);
364366
if (bind_data.files.empty()) {
365367
result->initial_reader =nullptr;
366368
}else {
367369
result->readers =std::move(bind_data.union_readers);
368370
if (result->readers.size() != bind_data.files.size()) {
369371
result->readers = vector<shared_ptr<ParquetReader>>(bind_data.files.size(),nullptr);
372+
}else {
373+
std::fill(result->file_states.begin(), result->file_states.end(), ParquetFileState::OPEN);
370374
}
371375
if (bind_data.initial_reader) {
372376
result->initial_reader =std::move(bind_data.initial_reader);
@@ -378,6 +382,7 @@ class ParquetScanFunction {
378382
make_shared<ParquetReader>(context, bind_data.files[0], bind_data.parquet_options);
379383
result->readers[0] = result->initial_reader;
380384
}
385+
result->file_states[0] = ParquetFileState::OPEN;
381386
}
382387
for (auto &reader : result->readers) {
383388
if (!reader) {
@@ -511,7 +516,7 @@ class ParquetScanFunction {
511516

512517
D_ASSERT(parallel_state.initial_reader);
513518

514-
if (parallel_state.readers[parallel_state.file_index]) {
519+
if (parallel_state.file_states[parallel_state.file_index] == ParquetFileState::OPEN) {
515520
if (parallel_state.row_group_index <
516521
parallel_state.readers[parallel_state.file_index]->NumRowGroups()) {
517522
// The current reader has rowgroups left to be scanned
@@ -523,12 +528,14 @@ class ParquetScanFunction {
523528
parallel_state.row_group_index++;
524529
returntrue;
525530
}else {
531+
// Close current file
532+
parallel_state.file_states[parallel_state.file_index] = ParquetFileState::CLOSED;
533+
parallel_state.readers[parallel_state.file_index] =nullptr;
534+
526535
// Set state to the next file
527536
parallel_state.file_index++;
528537
parallel_state.row_group_index =0;
529538

530-
parallel_state.readers[parallel_state.file_index -1] =nullptr;
531-
532539
if (parallel_state.file_index >= bind_data.files.size()) {
533540
returnfalse;
534541
}
@@ -541,8 +548,7 @@ class ParquetScanFunction {
541548
}
542549

543550
// Check if the current file is being opened, in that case we need to wait for it.
544-
if (!parallel_state.readers[parallel_state.file_index] &&
545-
parallel_state.file_opening[parallel_state.file_index]) {
551+
if (parallel_state.file_states[parallel_state.file_index] == ParquetFileState::OPENING) {
546552
WaitForFile(parallel_state.file_index, parallel_state, parallel_lock);
547553
}
548554
}
@@ -573,7 +579,8 @@ class ParquetScanFunction {
573579
// - the thread opening the file has failed
574580
// - the file was somehow scanned till the end while we were waiting
575581
if (parallel_state.file_index >= parallel_state.readers.size() ||
576-
parallel_state.readers[parallel_state.file_index] || parallel_state.error_opening_file) {
582+
parallel_state.file_states[parallel_state.file_index] != ParquetFileState::OPENING ||
583+
parallel_state.error_opening_file) {
577584
return;
578585
}
579586
}
@@ -583,10 +590,12 @@ class ParquetScanFunction {
583590
staticboolTryOpenNextFile(ClientContext &context,const ParquetReadBindData &bind_data,
584591
ParquetReadLocalState &scan_data, ParquetReadGlobalState &parallel_state,
585592
unique_lock<mutex> &parallel_lock) {
586-
for (idx_t i = parallel_state.file_index; i < bind_data.files.size(); i++) {
587-
if (!parallel_state.readers[i] && parallel_state.file_opening[i] ==false) {
593+
constauto num_threads =TaskScheduler::GetScheduler(context).NumberOfThreads();
594+
constauto file_index_limit = MinValue<idx_t>(parallel_state.file_index + num_threads, bind_data.files.size());
595+
for (idx_t i = parallel_state.file_index; i < file_index_limit; i++) {
596+
if (parallel_state.file_states[i] == ParquetFileState::UNOPENED) {
588597
string file = bind_data.files[i];
589-
parallel_state.file_opening[i] =true;
598+
parallel_state.file_states[i] =ParquetFileState::OPENING;
590599
auto pq_options = parallel_state.initial_reader->parquet_options;
591600

592601
// Now we switch which lock we are holding, instead of locking the global state, we grab the lock on
@@ -611,6 +620,7 @@ class ParquetScanFunction {
611620
// Now re-lock the state and add the reader
612621
parallel_lock.lock();
613622
parallel_state.readers[i] = reader;
623+
parallel_state.file_states[i] = ParquetFileState::OPEN;
614624

615625
returntrue;
616626
}

‎src/storage/standard_buffer_manager.cpp‎

Lines changed: 10 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -3,11 +3,11 @@
33
#include"duckdb/common/allocator.hpp"
44
#include"duckdb/common/exception.hpp"
55
#include"duckdb/common/set.hpp"
6-
#include"duckdb/storage/in_memory_block_manager.hpp"
7-
#include"duckdb/storage/storage_manager.hpp"
86
#include"duckdb/main/attached_database.hpp"
97
#include"duckdb/main/database.hpp"
108
#include"duckdb/storage/buffer/buffer_pool.hpp"
9+
#include"duckdb/storage/in_memory_block_manager.hpp"
10+
#include"duckdb/storage/storage_manager.hpp"
1111

1212
namespaceduckdb {
1313

@@ -329,12 +329,13 @@ struct BlockIndexManager {
329329
};
330330

331331
classTemporaryFileHandle {
332-
constexprstaticidx_tMAX_ALLOWED_INDEX =4000;
332+
constexprstaticidx_tMAX_ALLOWED_INDEX_BASE =4000;
333333

334334
public:
335-
TemporaryFileHandle(DatabaseInstance &db,const string &temp_directory,idx_t index)
336-
: db(db), file_index(index), path(FileSystem::GetFileSystem(db).JoinPath(
337-
temp_directory,"duckdb_temp_storage-" + to_string(index) +".tmp")) {
335+
TemporaryFileHandle(idx_t temp_file_count, DatabaseInstance &db,const string &temp_directory,idx_t index)
336+
: max_allowed_index((1 << temp_file_count) * MAX_ALLOWED_INDEX_BASE), db(db), file_index(index),
337+
path(FileSystem::GetFileSystem(db).JoinPath(temp_directory,
338+
"duckdb_temp_storage-" + to_string(index) + ".tmp")) {
338339
}
339340

340341
public:
@@ -348,7 +349,7 @@ class TemporaryFileHandle {
348349
public:
349350
TemporaryFileIndexTryGetBlockIndex() {
350351
TemporaryFileLocklock(file_lock);
351-
if (index_manager.GetMaxIndex() >=MAX_ALLOWED_INDEX && index_manager.HasFreeBlocks()) {
352+
if (index_manager.GetMaxIndex() >=max_allowed_index && index_manager.HasFreeBlocks()) {
352353
// file is at capacity
353354
returnTemporaryFileIndex();
354355
}
@@ -426,6 +427,7 @@ class TemporaryFileHandle {
426427
}
427428

428429
private:
430+
constidx_t max_allowed_index;
429431
DatabaseInstance &db;
430432
unique_ptr<FileHandle> handle;
431433
idx_t file_index;
@@ -467,7 +469,7 @@ class TemporaryFileManager {
467469
if (!handle) {
468470
// no existing handle to write to; we need to create & open a new file
469471
auto new_file_index = index_manager.GetNewBlockIndex();
470-
auto new_file = make_uniq<TemporaryFileHandle>(db, temp_directory, new_file_index);
472+
auto new_file = make_uniq<TemporaryFileHandle>(files.size(),db, temp_directory, new_file_index);
471473
handle = new_file.get();
472474
files[new_file_index] =std::move(new_file);
473475

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp