Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

src: implement DataQueue and non-memory resident Blob#45258

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.

Already on GitHub?Sign in to your account

Closed
jasnell wants to merge3 commits intonodejs:mainfromjasnell:dataqueue
Closed
Show file tree
Hide file tree
Changes fromall commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletionsdoc/api/fs.md
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -3324,6 +3324,45 @@ a colon, Node.js will open a file system stream, as described by
Functions based on `fs.open()` exhibit this behavior as well:
`fs.writeFile()`, `fs.readFile()`, etc.

### `fs.openAsBlob(path[, options])`

<!-- YAML
added: REPLACEME
-->

> Stability: 1 - Experimental

* `path` {string|Buffer|URL}
* `options` {Object}
* `type` {string} An optional mime type for the blob.
* Return: {Promise} containing {Blob}

Returns a {Blob} whose data is backed by the given file.

The file must not be modified after the {Blob} is created. Any modifications
will cause reading the {Blob} data to fail with a `DOMException`.
error. Synchronous stat operations on the file when the `Blob` is created, and
before each read in order to detect whether the file data has been modified
on disk.

```mjs
import { openAsBlob } from 'node:fs';

const blob = await openAsBlob('the.file.txt');
const ab = await blob.arrayBuffer();
blob.stream();
```

```cjs
const { openAsBlob } = require('node:fs');

(async () => {
const blob = await openAsBlob('the.file.txt');
const ab = await blob.arrayBuffer();
blob.stream();
})();
```

### `fs.opendir(path[, options], callback)`

<!-- YAML
Expand Down
19 changes: 19 additions & 0 deletionslib/fs.js
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -32,6 +32,7 @@ const {
ObjectDefineProperties,
ObjectDefineProperty,
Promise,
PromiseResolve,
ReflectApply,
SafeMap,
SafeSet,
Expand DownExpand Up@@ -62,6 +63,9 @@ const { isArrayBufferView } = require('internal/util/types');
// it's re-initialized after deserialization.

const binding = internalBinding('fs');

const { createBlobFromFilePath } = require('internal/blob');

const { Buffer } = require('buffer');
const {
aggregateTwoErrors,
Expand DownExpand Up@@ -586,6 +590,20 @@ function openSync(path, flags, mode) {
return result;
}

/**
* @param {string | Buffer | URL } path
* @returns {Promise<Blob>}
*/
function openAsBlob(path, options = kEmptyObject) {
validateObject(options, 'options');
const type = options.type || '';
validateString(type, 'options.type');
// The underlying implementation here returns the Blob synchronously for now.
// To give ourselves flexibility to maybe return the Blob asynchronously,
// this API returns a Promise.
return PromiseResolve(createBlobFromFilePath(getValidatedPath(path), { type }));
}

/**
* Reads file from the specified `fd` (file descriptor).
* @param {number} fd
Expand DownExpand Up@@ -3022,6 +3040,7 @@ module.exports = fs = {
mkdtempSync,
open,
openSync,
openAsBlob,
readdir,
readdirSync,
read,
Expand Down
158 changes: 102 additions & 56 deletionslib/internal/blob.js
View file
Open in desktop
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
'use strict';

const {
ArrayBuffer,
ArrayFrom,
MathMax,
MathMin,
ObjectDefineProperties,
ObjectDefineProperty,
PromiseResolve,
PromiseReject,
SafePromisePrototypeFinally,
PromiseResolve,
ReflectConstruct,
RegExpPrototypeExec,
RegExpPrototypeSymbolReplace,
Expand All@@ -22,7 +22,8 @@ const {

const {
createBlob: _createBlob,
FixedSizeBlobCopyJob,
createBlobFromFilePath: _createBlobFromFilePath,
concat,
getDataObject,
} = internalBinding('blob');

Expand All@@ -48,32 +49,31 @@ const {
customInspectSymbol: kInspect,
kEmptyObject,
kEnumerableProperty,
lazyDOMException,
} = require('internal/util');
const { inspect } = require('internal/util/inspect');

const {
AbortError,
codes: {
ERR_INVALID_ARG_TYPE,
ERR_INVALID_ARG_VALUE,
ERR_INVALID_THIS,
ERR_BUFFER_TOO_LARGE,
}
},
} = require('internal/errors');

const {
isUint32,
validateDictionary,
} = require('internal/validators');

const {
CountQueuingStrategy,
} = require('internal/webstreams/queuingstrategies');

const kHandle = Symbol('kHandle');
const kState = Symbol('kState');
const kIndex = Symbol('kIndex');
const kType = Symbol('kType');
const kLength = Symbol('kLength');
const kArrayBufferPromise = Symbol('kArrayBufferPromise');

const kMaxChunkSize = 65536;

const disallowedTypeCharacters = /[^\u{0020}-\u{007E}]/u;

Expand DownExpand Up@@ -266,40 +266,35 @@ class Blob {
if (!isBlob(this))
return PromiseReject(new ERR_INVALID_THIS('Blob'));

// If there's already a promise in flight for the content,
// reuse it, but only while it's in flight. After the cached
// promise resolves it will be cleared, allowing it to be
// garbage collected as soon as possible.
if (this[kArrayBufferPromise])
return this[kArrayBufferPromise];

const job = new FixedSizeBlobCopyJob(this[kHandle]);

const ret = job.run();

// If the job returns a value immediately, the ArrayBuffer
// was generated synchronously and should just be returned
// directly.
if (ret !== undefined)
return PromiseResolve(ret);
if (this.size === 0) {
return PromiseResolve(new ArrayBuffer(0));
}

const {
promise,
resolve,
reject,
} = createDeferredPromise();

job.ondone = (err, ab) => {
if (err !== undefined)
return reject(new AbortError(undefined, { cause: err }));
resolve(ab);
const { promise, resolve, reject } = createDeferredPromise();
const reader = this[kHandle].getReader();
const buffers = [];
const readNext = () => {
reader.pull((status, buffer) => {
if (status === 0) {
// EOS, concat & resolve
// buffer should be undefined here
resolve(concat(buffers));
return;
} else if (status < 0) {
// The read could fail for many different reasons when reading
// from a non-memory resident blob part (e.g. file-backed blob).
// The error details the system error code.
const error = lazyDOMException('The blob could not be read', 'NotReadableError');
reject(error);
return;
}
if (buffer !== undefined)
buffers.push(buffer);
readNext();
});
};
this[kArrayBufferPromise] =
SafePromisePrototypeFinally(
promise,
() => this[kArrayBufferPromise] = undefined);

return this[kArrayBufferPromise];
readNext();
return promise;
}

/**
Expand All@@ -321,24 +316,63 @@ class Blob {
if (!isBlob(this))
throw new ERR_INVALID_THIS('Blob');

const self = this;
if (this.size === 0) {
return new lazyReadableStream({
start(c) { c.close(); }
});
}

const reader = this[kHandle].getReader();
return new lazyReadableStream({
async start() {
this[kState] = await self.arrayBuffer();
this[kIndex] = 0;
start(c) {
// There really should only be one read at a time so using an
// array here is purely defensive.
this.pendingPulls = [];
},

pull(controller) {
if (this[kState].byteLength - this[kIndex] <= kMaxChunkSize) {
controller.enqueue(new Uint8Array(this[kState], this[kIndex]));
controller.close();
this[kState] = undefined;
} else {
controller.enqueue(new Uint8Array(this[kState], this[kIndex], kMaxChunkSize));
this[kIndex] += kMaxChunkSize;
pull(c) {
const { promise, resolve, reject } = createDeferredPromise();
this.pendingPulls.push({ resolve, reject });
reader.pull((status, buffer) => {
// If pendingPulls is empty here, the stream had to have
// been canceled, and we don't really care about the result.
// we can simply exit.
if (this.pendingPulls.length === 0) {
return;
}
const pending = this.pendingPulls.shift();
if (status === 0) {
// EOS
c.close();
pending.resolve();
return;
} else if (status < 0) {
// The read could fail for many different reasons when reading
// from a non-memory resident blob part (e.g. file-backed blob).
// The error details the system error code.
const error = lazyDOMException('The blob could not be read', 'NotReadableError');

c.error(error);
pending.reject(error);
return;
}
if (buffer !== undefined) {
c.enqueue(new Uint8Array(buffer));
}
pending.resolve();
});
return promise;
},
cancel(reason) {
// Reject any currently pending pulls here.
for (const pending of this.pendingPulls) {
pending.reject(reason);
}
this.pendingPulls = [];
}
});
// We set the highWaterMark to 0 because we do not want the stream to
// start reading immediately on creation. We want it to wait until read
// is called.
}, new CountQueuingStrategy({ highWaterMark: 0 }));
}
}

Expand DownExpand Up@@ -406,10 +440,22 @@ function resolveObjectURL(url) {
}
}

// TODO(@jasnell): Now that the File class exists, we might consider having
// this return a `File` instead of a `Blob`.
function createBlobFromFilePath(path, options) {
const maybeBlob = _createBlobFromFilePath(path);
if (maybeBlob === undefined) {
return lazyDOMException('The blob could not be read', 'NotReadableError');
}
const { 0: blob, 1: length } = maybeBlob;
return createBlob(blob, length, options?.type);
}

module.exports = {
Blob,
ClonedBlob,
createBlob,
createBlobFromFilePath,
isBlob,
kHandle,
resolveObjectURL,
Expand Down
3 changes: 3 additions & 0 deletionsnode.gyp
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -477,6 +477,7 @@
'src/cleanup_queue.cc',
'src/connect_wrap.cc',
'src/connection_wrap.cc',
'src/dataqueue/queue.cc',
'src/debug_utils.cc',
'src/env.cc',
'src/fs_event_wrap.cc',
Expand DownExpand Up@@ -580,6 +581,7 @@
'src/cleanup_queue-inl.h',
'src/connect_wrap.h',
'src/connection_wrap.h',
'src/dataqueue/queue.h',
'src/debug_utils.h',
'src/debug_utils-inl.h',
'src/env_properties.h',
Expand DownExpand Up@@ -991,6 +993,7 @@
'test/cctest/test_sockaddr.cc',
'test/cctest/test_traced_value.cc',
'test/cctest/test_util.cc',
'test/cctest/test_dataqueue.cc',
],

'conditions': [
Expand Down
Loading

[8]ページ先頭

©2009-2025 Movatter.jp