src, lib: fixup lint and format issues for DataQueue/Blob

Co-authored-by: flakey5 <73616808+flakey5@users.noreply.github.com>
PR-URL: https://github.com/nodejs/node/pull/45258
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
This commit is contained in:
James M Snell
2022-12-17 13:58:26 -08:00
parent 950cec4c26
commit 71fb06fd64
18 changed files with 1261 additions and 1137 deletions

View File

@@ -202,15 +202,6 @@ When operating on file handles, the mode cannot be changed from what it was set
to with [`fsPromises.open()`][]. Therefore, this is equivalent to
[`filehandle.writeFile()`][].
#### `filehandle.blob()`
<!-- YAML
added: REPLACEME
-->
> Stability: 1 - Experimental
Returns a {Blob} whose data is backed by this file.
#### `filehandle.chmod(mode)`
<!-- YAML
@@ -3333,6 +3324,45 @@ a colon, Node.js will open a file system stream, as described by
Functions based on `fs.open()` exhibit this behavior as well:
`fs.writeFile()`, `fs.readFile()`, etc.
### `fs.openAsBlob(path[, options])`
<!-- YAML
added: REPLACEME
-->
> Stability: 1 - Experimental
* `path` {string|Buffer|URL}
* `options` {Object}
* `type` {string} An optional mime type for the blob.
* Return: {Promise} containing {Blob}
Returns a {Blob} whose data is backed by the given file.
The file must not be modified after the {Blob} is created. Any modifications
will cause reading the {Blob} data to fail with a `DOMException`.
error. Synchronous stat operations on the file when the `Blob` is created, and
before each read in order to detect whether the file data has been modified
on disk.
```mjs
import { openAsBlob } from 'node:fs';
const blob = await openAsBlob('the.file.txt');
const ab = await blob.arrayBuffer();
blob.stream();
```
```cjs
const { openAsBlob } = require('node:fs');
(async () => {
const blob = await openAsBlob('the.file.txt');
const ab = await blob.arrayBuffer();
blob.stream();
})();
```
### `fs.opendir(path[, options], callback)`
<!-- YAML

View File

@@ -32,6 +32,7 @@ const {
ObjectDefineProperties,
ObjectDefineProperty,
Promise,
PromiseResolve,
ReflectApply,
SafeMap,
SafeSet,
@@ -62,6 +63,9 @@ const { isArrayBufferView } = require('internal/util/types');
// it's re-initialized after deserialization.
const binding = internalBinding('fs');
const { createBlobFromFilePath } = require('internal/blob');
const { Buffer } = require('buffer');
const {
aggregateTwoErrors,
@@ -586,6 +590,20 @@ function openSync(path, flags, mode) {
return result;
}
/**
* @param {string | Buffer | URL } path
* @returns {Promise<Blob>}
*/
function openAsBlob(path, options = kEmptyObject) {
validateObject(options, 'options');
const type = options.type || '';
validateString(type, 'options.type');
// The underlying implementation here returns the Blob synchronously for now.
// To give ourselves flexibility to maybe return the Blob asynchronously,
// this API returns a Promise.
return PromiseResolve(createBlobFromFilePath(getValidatedPath(path), { type }));
}
/**
* Reads file from the specified `fd` (file descriptor).
* @param {number} fd
@@ -3022,6 +3040,7 @@ module.exports = fs = {
mkdtempSync,
open,
openSync,
openAsBlob,
readdir,
readdirSync,
read,

View File

@@ -1,6 +1,7 @@
'use strict';
const {
ArrayBuffer,
ArrayFrom,
MathMax,
MathMin,
@@ -21,7 +22,7 @@ const {
const {
createBlob: _createBlob,
createBlobFromFileHandle: _createBlobFromFileHandle,
createBlobFromFilePath: _createBlobFromFilePath,
concat,
getDataObject,
} = internalBinding('blob');
@@ -48,6 +49,7 @@ const {
customInspectSymbol: kInspect,
kEmptyObject,
kEnumerableProperty,
lazyDOMException,
} = require('internal/util');
const { inspect } = require('internal/util/inspect');
@@ -58,7 +60,6 @@ const {
ERR_INVALID_THIS,
ERR_BUFFER_TOO_LARGE,
},
errnoException,
} = require('internal/errors');
const {
@@ -66,6 +67,10 @@ const {
validateDictionary,
} = require('internal/validators');
const {
CountQueuingStrategy,
} = require('internal/webstreams/queuingstrategies');
const kHandle = Symbol('kHandle');
const kType = Symbol('kType');
const kLength = Symbol('kLength');
@@ -265,16 +270,23 @@ class Blob {
return PromiseResolve(new ArrayBuffer(0));
}
const { promise, resolve } = createDeferredPromise();
const { promise, resolve, reject } = createDeferredPromise();
const reader = this[kHandle].getReader();
const buffers = [];
const readNext = () => {
reader.pull((status, buffer) => {
if (status === -1) {
if (status === 0) {
// EOS, concat & resolve
// buffer should be undefined here
resolve(concat(buffers));
return;
} else if (status < 0) {
// The read could fail for many different reasons when reading
// from a non-memory resident blob part (e.g. file-backed blob).
// The error details the system error code.
const error = lazyDOMException('The blob could not be read', 'NotReadableError');
reject(error);
return;
}
if (buffer !== undefined)
buffers.push(buffer);
@@ -319,7 +331,7 @@ class Blob {
},
pull(c) {
const { promise, resolve, reject } = createDeferredPromise();
this.pendingPulls.push({resolve, reject});
this.pendingPulls.push({ resolve, reject });
reader.pull((status, buffer) => {
// If pendingPulls is empty here, the stream had to have
// been canceled, and we don't really care about the result.
@@ -328,18 +340,24 @@ class Blob {
return;
}
const pending = this.pendingPulls.shift();
if (status === -1 || (status === 0 && buffer === undefined)) {
if (status === 0) {
// EOS
c.close();
pending.resolve();
return;
} else if (status < 0) {
const error = errnoException(status, 'read');
// The read could fail for many different reasons when reading
// from a non-memory resident blob part (e.g. file-backed blob).
// The error details the system error code.
const error = lazyDOMException('The blob could not be read', 'NotReadableError');
c.error(error);
pending.reject(error);
return;
}
c.enqueue(new Uint8Array(buffer));
if (buffer !== undefined) {
c.enqueue(new Uint8Array(buffer));
}
pending.resolve();
});
return promise;
@@ -422,16 +440,22 @@ function resolveObjectURL(url) {
}
}
function createBlobFromFileHandle(handle) {
const [blob, length] = _createBlobFromFileHandle(handle);
return createBlob(blob, length);
// TODO(@jasnell): Now that the File class exists, we might consider having
// this return a `File` instead of a `Blob`.
function createBlobFromFilePath(path, options) {
const maybeBlob = _createBlobFromFilePath(path);
if (maybeBlob === undefined) {
return lazyDOMException('The blob could not be read', 'NotReadableError');
}
const { 0: blob, 1: length } = maybeBlob;
return createBlob(blob, length, options?.type);
}
module.exports = {
Blob,
ClonedBlob,
createBlob,
createBlobFromFileHandle,
createBlobFromFilePath,
isBlob,
kHandle,
resolveObjectURL,

View File

@@ -25,8 +25,6 @@ const {
S_IFREG
} = constants;
const { createBlobFromFileHandle } = require('internal/blob');
const binding = internalBinding('fs');
const { Buffer } = require('buffer');
@@ -312,14 +310,6 @@ class FileHandle extends EventEmitterMixin(JSTransferable) {
return new WriteStream(undefined, { ...options, fd: this });
}
/**
* @typedef {import('../blob').Blob} Blob
* @returns {Blob}
*/
blob() {
return createBlobFromFileHandle(this[kHandle]);
}
[kTransfer]() {
if (this[kClosePromise] || this[kRefs] > 1) {
throw lazyDOMException('Cannot transfer FileHandle while in use',

View File

@@ -31,49 +31,49 @@
namespace node {
#define NODE_ASYNC_NON_CRYPTO_PROVIDER_TYPES(V) \
V(NONE) \
V(DIRHANDLE) \
V(DNSCHANNEL) \
V(ELDHISTOGRAM) \
V(FILEHANDLE) \
V(FILEHANDLECLOSEREQ) \
V(BLOBREADER) \
V(FSEVENTWRAP) \
V(FSREQCALLBACK) \
V(FSREQPROMISE) \
V(GETADDRINFOREQWRAP) \
V(GETNAMEINFOREQWRAP) \
V(HEAPSNAPSHOT) \
V(HTTP2SESSION) \
V(HTTP2STREAM) \
V(HTTP2PING) \
V(HTTP2SETTINGS) \
V(HTTPINCOMINGMESSAGE) \
V(HTTPCLIENTREQUEST) \
V(JSSTREAM) \
V(JSUDPWRAP) \
V(MESSAGEPORT) \
V(PIPECONNECTWRAP) \
V(PIPESERVERWRAP) \
V(PIPEWRAP) \
V(PROCESSWRAP) \
V(PROMISE) \
V(QUERYWRAP) \
V(SHUTDOWNWRAP) \
V(SIGNALWRAP) \
V(STATWATCHER) \
V(STREAMPIPE) \
V(TCPCONNECTWRAP) \
V(TCPSERVERWRAP) \
V(TCPWRAP) \
V(TTYWRAP) \
V(UDPSENDWRAP) \
V(UDPWRAP) \
V(SIGINTWATCHDOG) \
V(WORKER) \
V(WORKERHEAPSNAPSHOT) \
V(WRITEWRAP) \
#define NODE_ASYNC_NON_CRYPTO_PROVIDER_TYPES(V) \
V(NONE) \
V(DIRHANDLE) \
V(DNSCHANNEL) \
V(ELDHISTOGRAM) \
V(FILEHANDLE) \
V(FILEHANDLECLOSEREQ) \
V(BLOBREADER) \
V(FSEVENTWRAP) \
V(FSREQCALLBACK) \
V(FSREQPROMISE) \
V(GETADDRINFOREQWRAP) \
V(GETNAMEINFOREQWRAP) \
V(HEAPSNAPSHOT) \
V(HTTP2SESSION) \
V(HTTP2STREAM) \
V(HTTP2PING) \
V(HTTP2SETTINGS) \
V(HTTPINCOMINGMESSAGE) \
V(HTTPCLIENTREQUEST) \
V(JSSTREAM) \
V(JSUDPWRAP) \
V(MESSAGEPORT) \
V(PIPECONNECTWRAP) \
V(PIPESERVERWRAP) \
V(PIPEWRAP) \
V(PROCESSWRAP) \
V(PROMISE) \
V(QUERYWRAP) \
V(SHUTDOWNWRAP) \
V(SIGNALWRAP) \
V(STATWATCHER) \
V(STREAMPIPE) \
V(TCPCONNECTWRAP) \
V(TCPSERVERWRAP) \
V(TCPWRAP) \
V(TTYWRAP) \
V(UDPSENDWRAP) \
V(UDPWRAP) \
V(SIGINTWATCHDOG) \
V(WORKER) \
V(WORKERHEAPSNAPSHOT) \
V(WRITEWRAP) \
V(ZLIB)
#if HAVE_OPENSSL

File diff suppressed because it is too large Load Diff

View File

@@ -3,15 +3,16 @@
#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
#include <base_object.h>
#include <memory_tracker.h>
#include <node.h>
#include <node_bob.h>
#include <node_file.h>
#include <memory_tracker.h>
#include <stream_base.h>
#include <v8.h>
#include <uv.h>
#include <v8.h>
#include <memory>
#include <optional>
#include <vector>
namespace node {
@@ -101,17 +102,18 @@ namespace node {
// To read from a DataQueue, we use the node::bob::Source API
// (see src/node_bob.h).
//
// std::unique_ptr<DataQueue::Reader> reader = data_queue->getReader();
// std::shared_ptr<DataQueue::Reader> reader = data_queue->get_reader();
//
// reader->Pull(
// [](int status, const DataQueue::Vec* vecs, size_t count, Done done) {
// [](int status, const DataQueue::Vec* vecs,
// uint64_t count, Done done) {
// // status is one of node::bob::Status
// // vecs is zero or more data buffers containing the read data
// // count is the number of vecs
// // done is a callback to be invoked when done processing the data
// }, options, nullptr, 0, 16);
//
// Keep calling Pull() until status is equal to node::bob::Status::STATUS_END.
// Keep calling Pull() until status is equal to node::bob::Status::STATUS_EOS.
//
// For idempotent DataQueues, any number of readers can be created and
// pull concurrently from the same DataQueue. The DataQueue can be read
@@ -126,15 +128,14 @@ class DataQueue : public MemoryRetainer {
public:
struct Vec {
uint8_t* base;
size_t len;
uint64_t len;
};
// A DataQueue::Reader consumes the DataQueue. If the data queue is
// idempotent, multiple Readers can be attached to the DataQueue at
// any given time, all guaranteed to yield the same result when the
// data is read. Otherwise, only a single Reader can be attached.
class Reader : public MemoryRetainer,
public bob::Source<Vec> {
class Reader : public MemoryRetainer, public bob::Source<Vec> {
public:
using Next = bob::Next<Vec>;
using Done = bob::Done;
@@ -150,26 +151,25 @@ class DataQueue : public MemoryRetainer {
// offset is omitted, the slice extends to the end of the
// data.
//
// Creating a slice is only possible if isIdempotent() returns
// Creating a slice is only possible if is_idempotent() returns
// true. This is because consuming either the original entry or
// the new entry would change the state of the other in non-
// deterministic ways. When isIdempotent() returns false, slice()
// deterministic ways. When is_idempotent() returns false, slice()
// must return a nulled unique_ptr.
//
// Creating a slice is also only possible if the size of the
// entry is known. If size() returns v8::Nothing<size_t>, slice()
// entry is known. If size() returns std::nullopt, slice()
// must return a nulled unique_ptr.
virtual std::unique_ptr<Entry> slice(
size_t start,
v8::Maybe<size_t> end = v8::Nothing<size_t>()) = 0;
uint64_t start, std::optional<uint64_t> end = std::nullopt) = 0;
// Returns the number of bytes represented by this Entry if it is
// known. Certain types of entries, such as those backed by streams
// might not know the size in advance and therefore cannot provide
// a value. In such cases, size() must return v8::Nothing<size_t>.
// a value. In such cases, size() must return v8::Nothing<uint64_t>.
//
// If the entry is idempotent, a size should always be available.
virtual v8::Maybe<size_t> size() const = 0;
virtual std::optional<uint64_t> size() const = 0;
// When true, multiple reads on the object must produce the exact
// same data or the reads will fail. Some sources of entry data,
@@ -177,7 +177,7 @@ class DataQueue : public MemoryRetainer {
// and therefore must not claim to be. If an entry claims to be
// idempotent and cannot preserve that quality, subsequent reads
// must fail with an error when a variance is detected.
virtual bool isIdempotent() const = 0;
virtual bool is_idempotent() const = 0;
};
// Creates an idempotent DataQueue with a pre-established collection
@@ -190,7 +190,7 @@ class DataQueue : public MemoryRetainer {
// mutated and updated such that multiple reads are not guaranteed
// to produce the same result. The entries added can be of any type.
static std::shared_ptr<DataQueue> Create(
v8::Maybe<size_t> capped = v8::Nothing<size_t>());
std::optional<uint64_t> capped = std::nullopt);
// Creates an idempotent Entry from a v8::ArrayBufferView. To help
// ensure idempotency, the underlying ArrayBuffer is detached from
@@ -207,26 +207,26 @@ class DataQueue : public MemoryRetainer {
// is not detachable, nullptr will be returned.
static std::unique_ptr<Entry> CreateInMemoryEntryFromBackingStore(
std::shared_ptr<v8::BackingStore> store,
size_t offset,
size_t length);
uint64_t offset,
uint64_t length);
static std::unique_ptr<Entry> CreateDataQueueEntry(
std::shared_ptr<DataQueue> data_queue);
static std::unique_ptr<Entry> CreateFdEntry(
BaseObjectPtr<fs::FileHandle> handle);
static std::unique_ptr<Entry> CreateFdEntry(Environment* env,
v8::Local<v8::Value> path);
// Creates a Reader for the given queue. If the queue is idempotent,
// any number of readers can be created, all of which are guaranteed
// to provide the same data. Otherwise, only a single reader is
// permitted.
virtual std::unique_ptr<Reader> getReader() = 0;
virtual std::shared_ptr<Reader> get_reader() = 0;
// Append a single new entry to the queue. Appending is only allowed
// when isIdempotent() is false. v8::Nothing<bool>() will be returned
// if isIdempotent() is true. v8::Just(false) will be returned if the
// when is_idempotent() is false. std::nullopt will be returned
// if is_idempotent() is true. std::optional(false) will be returned if the
// data queue is not idempotent but the entry otherwise cannot be added.
virtual v8::Maybe<bool> append(std::unique_ptr<Entry> entry) = 0;
virtual std::optional<bool> append(std::unique_ptr<Entry> entry) = 0;
// Caps the size of this DataQueue preventing additional entries to
// be added if those cause the size to extend beyond the specified
@@ -239,12 +239,12 @@ class DataQueue : public MemoryRetainer {
// If the size of the data queue is not known, the limit will be
// ignored and no additional entries will be allowed at all.
//
// If isIdempotent is true capping is unnecessary because the data
// If is_idempotent is true capping is unnecessary because the data
// queue cannot be appended to. In that case, cap() is a non-op.
//
// If the data queue has already been capped, cap can be called
// again with a smaller size.
virtual void cap(size_t limit = 0) = 0;
virtual void cap(uint64_t limit = 0) = 0;
// Returns a new DataQueue that is a view over this queues data
// from the start offset to the ending offset. If the end offset
@@ -252,39 +252,38 @@ class DataQueue : public MemoryRetainer {
//
// The slice will coverage a range from start up to, but excluding, end.
//
// Creating a slice is only possible if isIdempotent() returns
// Creating a slice is only possible if is_idempotent() returns
// true. This is because consuming either the original DataQueue or
// the new queue would change the state of the other in non-
// deterministic ways. When isIdempotent() returns false, slice()
// deterministic ways. When is_idempotent() returns false, slice()
// must return a nulled unique_ptr.
//
// Creating a slice is also only possible if the size of the
// DataQueue is known. If size() returns v8::Nothing<size_t>, slice()
// DataQueue is known. If size() returns std::nullopt, slice()
// must return a null unique_ptr.
virtual std::shared_ptr<DataQueue> slice(
size_t start,
v8::Maybe<size_t> end = v8::Nothing<size_t>()) = 0;
uint64_t start, std::optional<uint64_t> end = std::nullopt) = 0;
// The size of DataQueue is the total size of all of its member entries.
// If any of the entries is not able to specify a size, the DataQueue
// will also be incapable of doing so, in which case size() must return
// v8::Nothing<size_t>.
virtual v8::Maybe<size_t> size() const = 0;
// std::nullopt.
virtual std::optional<uint64_t> size() const = 0;
// A DataQueue is idempotent only if all of its member entries are
// idempotent.
virtual bool isIdempotent() const = 0;
virtual bool is_idempotent() const = 0;
// True only if cap is called or the data queue is a limited to a
// fixed size.
virtual bool isCapped() const = 0;
virtual bool is_capped() const = 0;
// If the data queue has been capped, and the size of the data queue
// is known, maybeCapRemaining will return the number of additional
// bytes the data queue can receive before reaching the cap limit.
// If the size of the queue cannot be known, or the cap has not
// been set, maybeCapRemaining() will return v8::Nothing<size_t>.
virtual v8::Maybe<size_t> maybeCapRemaining() const = 0;
// been set, maybeCapRemaining() will return std::nullopt.
virtual std::optional<uint64_t> maybeCapRemaining() const = 0;
static void Initialize(Environment* env, v8::Local<v8::Object> target);
static void RegisterExternalReferences(ExternalReferenceRegistry* registry);

View File

@@ -371,7 +371,6 @@
V(worker_heap_snapshot_taker_template, v8::ObjectTemplate) \
V(x509_constructor_template, v8::FunctionTemplate)
#define PER_REALM_STRONG_PERSISTENT_VALUES(V) \
V(async_hooks_after_function, v8::Function) \
V(async_hooks_before_function, v8::Function) \

View File

@@ -1,7 +1,6 @@
#include "node_blob.h"
#include "async_wrap-inl.h"
#include "base_object-inl.h"
#include "base_object.h"
#include "env-inl.h"
#include "memory_tracker-inl.h"
#include "node_bob-inl.h"
@@ -19,7 +18,6 @@ using v8::ArrayBuffer;
using v8::ArrayBufferView;
using v8::BackingStore;
using v8::Context;
using v8::EscapableHandleScope;
using v8::Function;
using v8::FunctionCallbackInfo;
using v8::FunctionTemplate;
@@ -28,8 +26,6 @@ using v8::HandleScope;
using v8::Int32;
using v8::Isolate;
using v8::Local;
using v8::MaybeLocal;
using v8::Number;
using v8::Object;
using v8::String;
using v8::Uint32;
@@ -54,55 +50,58 @@ void Concat(const FunctionCallbackInfo<Value>& args) {
std::vector<View> views;
size_t total = 0;
const auto doConcat = [&](View* view, size_t size) {
std::shared_ptr<BackingStore> store =
ArrayBuffer::NewBackingStore(env->isolate(), total);
uint8_t* ptr = static_cast<uint8_t*>(store->Data());
for (size_t n = 0; n < size; n++) {
uint8_t* from = static_cast<uint8_t*>(view[n].store->Data()) + view[n].offset;
std::copy(from, from + view[n].length, ptr);
ptr += view[n].length;
}
return ArrayBuffer::New(env->isolate(), store);
};
for (uint32_t n = 0; n < array->Length(); n++) {
Local<Value> val;
if (!array->Get(env->context(), n).ToLocal(&val))
return;
if (!array->Get(env->context(), n).ToLocal(&val)) return;
if (val->IsArrayBuffer()) {
auto ab = val.As<ArrayBuffer>();
views.push_back(View { ab->GetBackingStore(), ab->ByteLength(), 0 });
views.push_back(View{ab->GetBackingStore(), ab->ByteLength(), 0});
total += ab->ByteLength();
} else {
CHECK(val->IsArrayBufferView());
auto view = val.As<ArrayBufferView>();
views.push_back(View {
view->Buffer()->GetBackingStore(),
view->ByteLength(),
view->ByteOffset()
});
views.push_back(View{view->Buffer()->GetBackingStore(),
view->ByteLength(),
view->ByteOffset()});
total += view->ByteLength();
}
}
args.GetReturnValue().Set(doConcat(views.data(), views.size()));
std::shared_ptr<BackingStore> store =
ArrayBuffer::NewBackingStore(env->isolate(), total);
uint8_t* ptr = static_cast<uint8_t*>(store->Data());
for (size_t n = 0; n < views.size(); n++) {
uint8_t* from =
static_cast<uint8_t*>(views[n].store->Data()) + views[n].offset;
std::copy(from, from + views[n].length, ptr);
ptr += views[n].length;
}
args.GetReturnValue().Set(ArrayBuffer::New(env->isolate(), std::move(store)));
}
void BlobFromFileHandle(const FunctionCallbackInfo<Value>& args) {
void BlobFromFilePath(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
fs::FileHandle* fileHandle;
ASSIGN_OR_RETURN_UNWRAP(&fileHandle, args[0]);
auto entry = DataQueue::CreateFdEntry(env, args[0]);
if (entry == nullptr) {
return THROW_ERR_INVALID_ARG_VALUE(env, "Unabled to open file as blob");
}
std::vector<std::unique_ptr<DataQueue::Entry>> entries;
entries.push_back(DataQueue::CreateFdEntry(BaseObjectPtr<fs::FileHandle>(fileHandle)));
entries.push_back(std::move(entry));
auto blob = Blob::Create(env, DataQueue::CreateIdempotent(std::move(entries)));
auto blob =
Blob::Create(env, DataQueue::CreateIdempotent(std::move(entries)));
auto array = Array::New(env->isolate(), 2);
USE(array->Set(env->context(), 0, blob->object()));
USE(array->Set(env->context(), 1, Uint32::NewFromUnsigned(env->isolate(), blob->length())));
if (blob) {
auto array = Array::New(env->isolate(), 2);
USE(array->Set(env->context(), 0, blob->object()));
USE(array->Set(env->context(),
1,
Uint32::NewFromUnsigned(env->isolate(), blob->length())));
if (blob) args.GetReturnValue().Set(array);
args.GetReturnValue().Set(array);
}
}
} // namespace
@@ -122,7 +121,7 @@ void Blob::Initialize(
SetMethod(context, target, "getDataObject", GetDataObject);
SetMethod(context, target, "revokeDataObject", RevokeDataObject);
SetMethod(context, target, "concat", Concat);
SetMethod(context, target, "createBlobFromFileHandle", BlobFromFileHandle);
SetMethod(context, target, "createBlobFromFilePath", BlobFromFilePath);
}
Local<FunctionTemplate> Blob::GetConstructorTemplate(Environment* env) {
@@ -146,9 +145,8 @@ bool Blob::HasInstance(Environment* env, v8::Local<v8::Value> object) {
return GetConstructorTemplate(env)->HasInstance(object);
}
BaseObjectPtr<Blob> Blob::Create(
Environment* env,
std::shared_ptr<DataQueue> data_queue) {
BaseObjectPtr<Blob> Blob::Create(Environment* env,
std::shared_ptr<DataQueue> data_queue) {
HandleScope scope(env->isolate());
Local<Function> ctor;
@@ -175,10 +173,9 @@ void Blob::New(const FunctionCallbackInfo<Value>& args) {
return;
}
const auto entryFromArrayBuffer = [env](
v8::Local<v8::ArrayBuffer> buf,
size_t byte_length,
size_t byte_offset = 0) {
const auto entryFromArrayBuffer = [env](v8::Local<v8::ArrayBuffer> buf,
size_t byte_length,
size_t byte_offset = 0) {
if (buf->IsDetachable()) {
std::shared_ptr<BackingStore> store = buf->GetBackingStore();
USE(buf->Detach(Local<Value>()));
@@ -210,9 +207,7 @@ void Blob::New(const FunctionCallbackInfo<Value>& args) {
} else if (entry->IsArrayBufferView()) {
Local<ArrayBufferView> view = entry.As<ArrayBufferView>();
entries[i] = entryFromArrayBuffer(
view->Buffer(),
view->ByteLength(),
view->ByteOffset());
view->Buffer(), view->ByteLength(), view->ByteOffset());
} else if (Blob::HasInstance(env, entry)) {
Blob* blob;
ASSIGN_OR_RETURN_UNWRAP(&blob, entry);
@@ -251,33 +246,31 @@ void Blob::ToSlice(const FunctionCallbackInfo<Value>& args) {
}
void Blob::MemoryInfo(MemoryTracker* tracker) const {
tracker->TrackField("data_queue_", data_queue_);
tracker->TrackField("data_queue_", data_queue_, "std::shared_ptr<DataQueue>");
}
BaseObjectPtr<Blob> Blob::Slice(Environment* env, size_t start, size_t end) {
return Create(env, this->data_queue_->slice(start, v8::Just(end)));
return Create(env,
this->data_queue_->slice(start, static_cast<uint64_t>(end)));
}
Blob::Blob(
Environment* env,
v8::Local<v8::Object> obj,
std::shared_ptr<DataQueue> data_queue)
: BaseObject(env, obj),
data_queue_(data_queue) {
Blob::Blob(Environment* env,
v8::Local<v8::Object> obj,
std::shared_ptr<DataQueue> data_queue)
: BaseObject(env, obj), data_queue_(data_queue) {
MakeWeak();
}
Blob::Reader::Reader(
Environment* env,
v8::Local<v8::Object> obj,
BaseObjectPtr<Blob> strong_ptr)
Blob::Reader::Reader(Environment* env,
v8::Local<v8::Object> obj,
BaseObjectPtr<Blob> strong_ptr)
: AsyncWrap(env, obj, AsyncWrap::PROVIDER_BLOBREADER),
inner_(strong_ptr->data_queue_->getReader()),
inner_(strong_ptr->data_queue_->get_reader()),
strong_ptr_(std::move(strong_ptr)) {
MakeWeak();
}
bool Blob::Reader::HasInstance(Environment *env, v8::Local<v8::Value> value) {
bool Blob::Reader::HasInstance(Environment* env, v8::Local<v8::Value> value) {
return GetConstructorTemplate(env)->HasInstance(value);
}
@@ -289,27 +282,27 @@ Local<FunctionTemplate> Blob::Reader::GetConstructorTemplate(Environment* env) {
tmpl->InstanceTemplate()->SetInternalFieldCount(
BaseObject::kInternalFieldCount);
tmpl->Inherit(BaseObject::GetConstructorTemplate(env));
tmpl->SetClassName(
FIXED_ONE_BYTE_STRING(env->isolate(), "BlobReader"));
tmpl->SetClassName(FIXED_ONE_BYTE_STRING(env->isolate(), "BlobReader"));
SetProtoMethod(env->isolate(), tmpl, "pull", Pull);
env->set_blob_reader_constructor_template(tmpl);
}
return tmpl;
}
BaseObjectPtr<Blob::Reader> Blob::Reader::Create(
Environment* env,
BaseObjectPtr<Blob> blob) {
BaseObjectPtr<Blob::Reader> Blob::Reader::Create(Environment* env,
BaseObjectPtr<Blob> blob) {
Local<Object> obj;
if (!GetConstructorTemplate(env)->InstanceTemplate()
->NewInstance(env->context()).ToLocal(&obj)) {
if (!GetConstructorTemplate(env)
->InstanceTemplate()
->NewInstance(env->context())
.ToLocal(&obj)) {
return BaseObjectPtr<Blob::Reader>();
}
return MakeBaseObject<Blob::Reader>(env, obj, std::move(blob));
}
void Blob::Reader::Pull(const FunctionCallbackInfo<Value> &args) {
void Blob::Reader::Pull(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
Blob::Reader* reader;
ASSIGN_OR_RETURN_UNWRAP(&reader, args.Holder());
@@ -329,16 +322,18 @@ void Blob::Reader::Pull(const FunctionCallbackInfo<Value> &args) {
Global<Function> callback;
Environment* env;
};
// TODO(@jasnell): A unique_ptr is likely better here but making this a unique
// pointer that is passed into the lambda causes the std::move(next) below to
// complain about std::function needing to be copy-constructible.
Impl* impl = new Impl();
impl->reader = BaseObjectPtr<Blob::Reader>(reader);
impl->callback.Reset(env->isolate(), fn);
impl->env = env;
auto next = [impl](
int status,
const DataQueue::Vec* vecs,
size_t count,
bob::Done doneCb) mutable {
auto next = [impl](int status,
const DataQueue::Vec* vecs,
size_t count,
bob::Done doneCb) mutable {
auto dropMe = std::unique_ptr<Impl>(impl);
Environment* env = impl->env;
HandleScope handleScope(env->isolate());
@@ -360,17 +355,15 @@ void Blob::Reader::Pull(const FunctionCallbackInfo<Value> &args) {
}
// Since we copied the data buffers, signal that we're done with them.
std::move(doneCb)(0);
Local<Value> argv[2] = {
Uint32::New(env->isolate(), status),
ArrayBuffer::New(env->isolate(), store)
};
Local<Value> argv[2] = {Uint32::New(env->isolate(), status),
ArrayBuffer::New(env->isolate(), store)};
impl->reader->MakeCallback(fn, arraysize(argv), argv);
return;
}
Local<Value> argv[2] = {
Int32::New(env->isolate(), status),
Undefined(env->isolate()),
Int32::New(env->isolate(), status),
Undefined(env->isolate()),
};
impl->reader->MakeCallback(fn, arraysize(argv), argv);
};
@@ -473,7 +466,7 @@ void Blob::GetDataObject(const v8::FunctionCallbackInfo<v8::Value>& args) {
void BlobBindingData::StoredDataObject::MemoryInfo(
MemoryTracker* tracker) const {
tracker->TrackField("blob", blob);
tracker->TrackField("blob", blob, "BaseObjectPtr<Blob>");
}
BlobBindingData::StoredDataObject::StoredDataObject(
@@ -490,7 +483,9 @@ BlobBindingData::BlobBindingData(Environment* env, Local<Object> wrap)
}
void BlobBindingData::MemoryInfo(MemoryTracker* tracker) const {
tracker->TrackField("data_objects_", data_objects_);
tracker->TrackField("data_objects_",
data_objects_,
"std::unordered_map<std::string, StoredDataObject>");
}
void BlobBindingData::store_data_object(
@@ -551,7 +546,7 @@ void Blob::RegisterExternalReferences(ExternalReferenceRegistry* registry) {
registry->Register(Blob::RevokeDataObject);
registry->Register(Blob::Reader::Pull);
registry->Register(Concat);
registry->Register(BlobFromFileHandle);
registry->Register(BlobFromFilePath);
}
} // namespace node

View File

@@ -42,7 +42,8 @@ class Blob : public BaseObject {
static v8::Local<v8::FunctionTemplate> GetConstructorTemplate(
Environment* env);
static BaseObjectPtr<Blob> Create(Environment* env, std::shared_ptr<DataQueue> data_queue);
static BaseObjectPtr<Blob> Create(Environment* env,
std::shared_ptr<DataQueue> data_queue);
static bool HasInstance(Environment* env, v8::Local<v8::Value> object);
@@ -52,12 +53,12 @@ class Blob : public BaseObject {
BaseObjectPtr<Blob> Slice(Environment* env, size_t start, size_t end);
inline size_t length() const { return this->data_queue_->size().ToChecked(); }
inline size_t length() const { return this->data_queue_->size().value(); }
class BlobTransferData : public worker::TransferData {
public:
explicit BlobTransferData(std::shared_ptr<DataQueue> data_queue)
: data_queue(data_queue) {}
: data_queue(data_queue) {}
BaseObjectPtr<BaseObject> Deserialize(
Environment* env,
@@ -69,30 +70,28 @@ class Blob : public BaseObject {
SET_NO_MEMORY_INFO()
private:
std::shared_ptr<DataQueue> data_queue;
std::shared_ptr<DataQueue> data_queue;
};
class Reader final : public AsyncWrap {
public:
public:
static bool HasInstance(Environment* env, v8::Local<v8::Value> value);
static v8::Local<v8::FunctionTemplate> GetConstructorTemplate(
Environment* env);
static BaseObjectPtr<Reader> Create(
Environment* env,
BaseObjectPtr<Blob> blob);
static BaseObjectPtr<Reader> Create(Environment* env,
BaseObjectPtr<Blob> blob);
static void Pull(const v8::FunctionCallbackInfo<v8::Value>& args);
explicit Reader(
Environment* env,
v8::Local<v8::Object> obj,
BaseObjectPtr<Blob> strong_ptr);
explicit Reader(Environment* env,
v8::Local<v8::Object> obj,
BaseObjectPtr<Blob> strong_ptr);
SET_NO_MEMORY_INFO()
SET_MEMORY_INFO_NAME(Blob::Reader)
SET_SELF_SIZE(Reader)
private:
std::unique_ptr<DataQueue::Reader> inner_;
private:
std::shared_ptr<DataQueue::Reader> inner_;
BaseObjectPtr<Blob> strong_ptr_;
bool eos_ = false;
};
@@ -100,10 +99,9 @@ class Blob : public BaseObject {
BaseObject::TransferMode GetTransferMode() const override;
std::unique_ptr<worker::TransferData> CloneForMessaging() const override;
Blob(
Environment* env,
v8::Local<v8::Object> obj,
std::shared_ptr<DataQueue> data_queue);
Blob(Environment* env,
v8::Local<v8::Object> obj,
std::shared_ptr<DataQueue> data_queue);
DataQueue& getDataQueue() const { return *data_queue_; }

View File

@@ -25,8 +25,7 @@ int SourceImpl<T>::Pull(
status = DoPull(std::move(next), options, data, count, max_count_hint);
if (status == bob::Status::STATUS_END)
eos_ = true;
if (status == bob::Status::STATUS_EOS) eos_ = true;
return status;
}

View File

@@ -10,17 +10,8 @@ constexpr size_t kMaxCountHint = 16;
// Negative status codes indicate error conditions.
enum Status : int {
// Indicates that there was an error while pulling.
// Should be treated similar to STATUS_EOS
STATUS_FAILED = -2,
// Indicates that an attempt was made to pull after end.
STATUS_EOS = -1,
// Indicates the end of the stream. No additional
// data will be available and the consumer should stop
// pulling.
STATUS_END = 0,
STATUS_EOS = 0,
// Indicates that there is additional data available
// and the consumer may continue to pull.

View File

@@ -242,14 +242,20 @@ FileHandle::FileHandle(BindingData* binding_data,
}
FileHandle* FileHandle::New(BindingData* binding_data,
int fd, Local<Object> obj) {
int fd,
Local<Object> obj,
std::optional<int64_t> maybeOffset,
std::optional<int64_t> maybeLength) {
Environment* env = binding_data->env();
if (obj.IsEmpty() && !env->fd_constructor_template()
->NewInstance(env->context())
.ToLocal(&obj)) {
return nullptr;
}
return new FileHandle(binding_data, obj, fd);
auto handle = new FileHandle(binding_data, obj, fd);
if (maybeOffset.has_value()) handle->read_offset_ = maybeOffset.value();
if (maybeLength.has_value()) handle->read_length_ = maybeLength.value();
return handle;
}
void FileHandle::New(const FunctionCallbackInfo<Value>& args) {
@@ -258,13 +264,18 @@ void FileHandle::New(const FunctionCallbackInfo<Value>& args) {
CHECK(args.IsConstructCall());
CHECK(args[0]->IsInt32());
FileHandle* handle =
FileHandle::New(binding_data, args[0].As<Int32>()->Value(), args.This());
if (handle == nullptr) return;
std::optional<int64_t> maybeOffset = std::nullopt;
std::optional<int64_t> maybeLength = std::nullopt;
if (args[1]->IsNumber())
handle->read_offset_ = args[1]->IntegerValue(env->context()).FromJust();
maybeOffset = args[1]->IntegerValue(env->context()).FromJust();
if (args[2]->IsNumber())
handle->read_length_ = args[2]->IntegerValue(env->context()).FromJust();
maybeLength = args[2]->IntegerValue(env->context()).FromJust();
FileHandle::New(binding_data,
args[0].As<Int32>()->Value(),
args.This(),
maybeOffset,
maybeLength);
}
FileHandle::~FileHandle() {
@@ -503,10 +514,15 @@ void FileHandle::Close(const FunctionCallbackInfo<Value>& args) {
void FileHandle::ReleaseFD(const FunctionCallbackInfo<Value>& args) {
FileHandle* fd;
ASSIGN_OR_RETURN_UNWRAP(&fd, args.Holder());
// Just act as if this FileHandle has been closed.
fd->AfterClose();
fd->Release();
}
int FileHandle::Release() {
int fd = GetFD();
// Just pretend that Close was called and we're all done.
AfterClose();
return fd;
}
void FileHandle::AfterClose() {
closing_ = false;

View File

@@ -3,6 +3,7 @@
#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
#include <optional>
#include "aliased_buffer.h"
#include "node_messaging.h"
#include "node_snapshotable.h"
@@ -302,13 +303,17 @@ class FileHandle final : public AsyncWrap, public StreamBase {
static FileHandle* New(BindingData* binding_data,
int fd,
v8::Local<v8::Object> obj = v8::Local<v8::Object>());
v8::Local<v8::Object> obj = v8::Local<v8::Object>(),
std::optional<int64_t> maybeOffset = std::nullopt,
std::optional<int64_t> maybeLength = std::nullopt);
~FileHandle() override;
static void New(const v8::FunctionCallbackInfo<v8::Value>& args);
int GetFD() override { return fd_; }
int Release();
// Will asynchronously close the FD and return a Promise that will
// be resolved once closing is complete.
static void Close(const v8::FunctionCallbackInfo<v8::Value>& args);

View File

@@ -1,8 +1,7 @@
#include "dataqueue/queue.h"
#include "node_bob-inl.h"
#include "node_bob.h"
#include "util-inl.h"
#include "gtest/gtest.h"
#include <dataqueue/queue.h>
#include <gtest/gtest.h>
#include <node_bob-inl.h>
#include <util-inl.h>
#include <v8.h>
#include <memory>
#include <vector>
@@ -10,68 +9,70 @@
using node::DataQueue;
using v8::ArrayBuffer;
using v8::BackingStore;
using v8::Just;
TEST(DataQueue, InMemoryEntry) {
char buffer[] = "hello world";
size_t len = strlen(buffer);
std::shared_ptr<BackingStore> store =
ArrayBuffer::NewBackingStore(
&buffer, len, [](void*, size_t, void*) {}, nullptr);
std::shared_ptr<BackingStore> store = ArrayBuffer::NewBackingStore(
&buffer, len, [](void*, size_t, void*) {}, nullptr);
// We can create an InMemoryEntry from a v8::BackingStore.
std::unique_ptr<DataQueue::Entry> entry =
DataQueue::CreateInMemoryEntryFromBackingStore(store, 0, len);
// The entry is idempotent.
CHECK(entry->isIdempotent());
CHECK(entry->is_idempotent());
// The size is known.
CHECK_EQ(entry->size().ToChecked(), len);
CHECK_EQ(entry->size().value(), len);
// We can slice it.
// slice: "llo world"
std::unique_ptr<DataQueue::Entry> slice1 = entry->slice(2);
// The slice is idempotent.
CHECK(slice1->isIdempotent());
CHECK(slice1->is_idempotent());
// The slice size is known.
CHECK_EQ(slice1->size().ToChecked(), len - 2);
CHECK_EQ(slice1->size().value(), len - 2);
// We can slice the slice with a length.
// slice: "o w"
std::unique_ptr<DataQueue::Entry> slice2 = slice1->slice(2, Just(5UL));
uint64_t end = 5;
std::unique_ptr<DataQueue::Entry> slice2 = slice1->slice(2, end);
// That slice is idempotent.
CHECK(slice2->isIdempotent());
CHECK(slice2->is_idempotent());
// That slice size is known.
CHECK_EQ(slice2->size().ToChecked(), 3);
CHECK_EQ(slice2->size().value(), 3);
// The slice end can extend beyond the actual size and will be adjusted.
// slice: "orld"
std::unique_ptr<DataQueue::Entry> slice3 = slice1->slice(5, Just(100UL));
end = 100;
std::unique_ptr<DataQueue::Entry> slice3 = slice1->slice(5, end);
CHECK_NOT_NULL(slice3);
// The slice size is known.
CHECK_EQ(slice3->size().ToChecked(), 4);
CHECK_EQ(slice3->size().value(), 4);
// If the slice start is greater than the length, we get a zero length slice.
std::unique_ptr<DataQueue::Entry> slice4 = entry->slice(100);
CHECK_NOT_NULL(slice4);
CHECK_EQ(slice4->size().ToChecked(), 0);
CHECK_EQ(slice4->size().value(), 0);
// If the slice end is less than the start, we get a zero length slice.
std::unique_ptr<DataQueue::Entry> slice5 = entry->slice(2, Just(1UL));
end = 1;
std::unique_ptr<DataQueue::Entry> slice5 = entry->slice(2, end);
CHECK_NOT_NULL(slice5);
CHECK_EQ(slice5->size().ToChecked(), 0);
CHECK_EQ(slice5->size().value(), 0);
// If the slice end equal to the start, we get a zero length slice.
std::unique_ptr<DataQueue::Entry> slice6 = entry->slice(2, Just(2UL));
end = 2;
std::unique_ptr<DataQueue::Entry> slice6 = entry->slice(2, end);
CHECK_NOT_NULL(slice6);
CHECK_EQ(slice6->size().ToChecked(), 0);
CHECK_EQ(slice6->size().value(), 0);
// The shared_ptr for the BackingStore should show only 5 uses because
// the zero-length slices do not maintain a reference to it.
@@ -86,57 +87,58 @@ TEST(DataQueue, IdempotentDataQueue) {
size_t len2 = strlen(buffer2);
size_t len3 = strlen(buffer3);
std::shared_ptr<BackingStore> store1 =
ArrayBuffer::NewBackingStore(
&buffer1, len1, [](void*, size_t, void*) {}, nullptr);
std::shared_ptr<BackingStore> store1 = ArrayBuffer::NewBackingStore(
&buffer1, len1, [](void*, size_t, void*) {}, nullptr);
std::shared_ptr<BackingStore> store2 =
ArrayBuffer::NewBackingStore(
&buffer2, len2, [](void*, size_t, void*) {}, nullptr);
std::shared_ptr<BackingStore> store2 = ArrayBuffer::NewBackingStore(
&buffer2, len2, [](void*, size_t, void*) {}, nullptr);
std::vector<std::unique_ptr<DataQueue::Entry>> list;
list.push_back(DataQueue::CreateInMemoryEntryFromBackingStore(store1, 0, len1));
list.push_back(DataQueue::CreateInMemoryEntryFromBackingStore(store2, 0, len2));
list.push_back(
DataQueue::CreateInMemoryEntryFromBackingStore(store1, 0, len1));
list.push_back(
DataQueue::CreateInMemoryEntryFromBackingStore(store2, 0, len2));
// We can create an idempotent DataQueue from a list of entries.
std::shared_ptr<DataQueue> data_queue = DataQueue::CreateIdempotent(std::move(list));
std::shared_ptr<DataQueue> data_queue =
DataQueue::CreateIdempotent(std::move(list));
CHECK_NOT_NULL(data_queue);
// The data_queue is idempotent.
CHECK(data_queue->isIdempotent());
CHECK(data_queue->is_idempotent());
// The data_queue is capped.
CHECK(data_queue->isCapped());
CHECK(data_queue->is_capped());
// maybeCapRemaining() returns zero.
CHECK_EQ(data_queue->maybeCapRemaining().ToChecked(), 0);
CHECK_EQ(data_queue->maybeCapRemaining().value(), 0);
// Calling cap() is a nonop but doesn't crash or error.
data_queue->cap();
data_queue->cap(100);
// maybeCapRemaining() still returns zero.
CHECK_EQ(data_queue->maybeCapRemaining().ToChecked(), 0);
CHECK_EQ(data_queue->maybeCapRemaining().value(), 0);
// The size is known to be the sum of the in memory-entries.
CHECK_EQ(data_queue->size().ToChecked(), len1 + len2);
CHECK_EQ(data_queue->size().value(), len1 + len2);
std::shared_ptr<BackingStore> store3 =
ArrayBuffer::NewBackingStore(
&buffer3, len3, [](void*, size_t, void*) {}, nullptr);
std::shared_ptr<BackingStore> store3 = ArrayBuffer::NewBackingStore(
&buffer3, len3, [](void*, size_t, void*) {}, nullptr);
// Trying to append a new entry does not crash, but returns v8::Nothing.
CHECK(data_queue->append(
DataQueue::CreateInMemoryEntryFromBackingStore(store3, 0, len3))
.IsNothing());
// Trying to append a new entry does not crash, but returns std::nullopt.
CHECK(!data_queue
->append(DataQueue::CreateInMemoryEntryFromBackingStore(
store3, 0, len3))
.has_value());
// The size has not changed after the append.
CHECK_EQ(data_queue->size().ToChecked(), len1 + len2);
CHECK_EQ(data_queue->size().value(), len1 + len2);
// We can acquire multiple readers from the data_queue.
std::unique_ptr<DataQueue::Reader> reader1 = data_queue->getReader();
std::unique_ptr<DataQueue::Reader> reader2 = data_queue->getReader();
std::shared_ptr<DataQueue::Reader> reader1 = data_queue->get_reader();
std::shared_ptr<DataQueue::Reader> reader2 = data_queue->get_reader();
CHECK_NOT_NULL(reader1);
CHECK_NOT_NULL(reader2);
@@ -149,13 +151,17 @@ TEST(DataQueue, IdempotentDataQueue) {
// The first read produces buffer1
int status = reader->Pull(
[&](int status, const DataQueue::Vec* vecs, size_t count, auto done) {
waitingForPull = false;
CHECK_EQ(status, node::bob::STATUS_CONTINUE);
CHECK_EQ(count, 1);
CHECK_EQ(vecs[0].len, len1);
CHECK_EQ(memcmp(vecs[0].base, buffer1, len1), 0);
std::move(done)(0);
}, node::bob::OPTIONS_SYNC, nullptr, 0, node::bob::kMaxCountHint);
waitingForPull = false;
CHECK_EQ(status, node::bob::STATUS_CONTINUE);
CHECK_EQ(count, 1);
CHECK_EQ(vecs[0].len, len1);
CHECK_EQ(memcmp(vecs[0].base, buffer1, len1), 0);
std::move(done)(0);
},
node::bob::OPTIONS_SYNC,
nullptr,
0,
node::bob::kMaxCountHint);
CHECK(!waitingForPull);
CHECK_EQ(status, node::bob::STATUS_CONTINUE);
@@ -164,29 +170,49 @@ TEST(DataQueue, IdempotentDataQueue) {
// InMemoryEntry instances, reads will be fully synchronous here.
waitingForPull = true;
// The second read produces buffer2, and should be the end.
// The second read should have status CONTINUE but no buffer.
status = reader->Pull(
[&](int status, const DataQueue::Vec* vecs, size_t count, auto done) {
waitingForPull = false;
CHECK_EQ(status, node::bob::STATUS_END);
CHECK_EQ(count, 1);
CHECK_EQ(vecs[0].len, len2);
CHECK_EQ(memcmp(vecs[0].base, buffer2, len2), 0);
std::move(done)(0);
}, node::bob::OPTIONS_SYNC, nullptr, 0, node::bob::kMaxCountHint);
waitingForPull = false;
CHECK_EQ(status, node::bob::STATUS_CONTINUE);
CHECK_EQ(count, 0);
},
node::bob::OPTIONS_SYNC,
nullptr,
0,
node::bob::kMaxCountHint);
CHECK(!waitingForPull);
CHECK_EQ(status, node::bob::STATUS_END);
CHECK_EQ(status, node::bob::STATUS_CONTINUE);
// The third read produces EOS
// The third read produces buffer2, and should be the end.
status = reader->Pull(
[&](int status, const DataQueue::Vec* vecs, size_t count, auto done) {
waitingForPull = false;
CHECK_EQ(status, node::bob::STATUS_EOS);
CHECK_EQ(count, 0);
CHECK_NULL(vecs);
std::move(done)(0);
}, node::bob::OPTIONS_SYNC, nullptr, 0, node::bob::kMaxCountHint);
waitingForPull = false;
CHECK_EQ(status, node::bob::STATUS_CONTINUE);
CHECK_EQ(count, 1);
CHECK_EQ(vecs[0].len, len2);
CHECK_EQ(memcmp(vecs[0].base, buffer2, len2), 0);
std::move(done)(0);
},
node::bob::OPTIONS_SYNC,
nullptr,
0,
node::bob::kMaxCountHint);
CHECK(!waitingForPull);
CHECK_EQ(status, node::bob::STATUS_CONTINUE);
status = reader->Pull(
[&](int status, const DataQueue::Vec* vecs, size_t count, auto done) {
waitingForPull = false;
CHECK_EQ(status, node::bob::STATUS_EOS);
CHECK_EQ(count, 0);
},
node::bob::OPTIONS_SYNC,
nullptr,
0,
node::bob::kMaxCountHint);
CHECK(!waitingForPull);
CHECK_EQ(status, node::bob::STATUS_EOS);
@@ -202,13 +228,13 @@ TEST(DataQueue, IdempotentDataQueue) {
CHECK_NOT_NULL(slice1);
// The slice is idempotent.
CHECK(slice1->isIdempotent());
CHECK(slice1->is_idempotent());
// And capped.
CHECK(slice1->isCapped());
CHECK(slice1->is_capped());
// The size is two-bytes less than the original.
CHECK_EQ(slice1->size().ToChecked(), data_queue->size().ToChecked() - 2);
CHECK_EQ(slice1->size().value(), data_queue->size().value() - 2);
const auto testSlice = [&](auto& reader) {
// We can read the expected data from reader. Because the entries are
@@ -218,13 +244,17 @@ TEST(DataQueue, IdempotentDataQueue) {
// The first read produces a slice of buffer1
int status = reader->Pull(
[&](int status, const DataQueue::Vec* vecs, size_t count, auto done) {
waitingForPull = false;
CHECK_EQ(status, node::bob::STATUS_CONTINUE);
CHECK_EQ(count, 1);
CHECK_EQ(vecs[0].len, len1 - 2);
CHECK_EQ(memcmp(vecs[0].base, buffer1 + 2, len1 - 2), 0);
std::move(done)(0);
}, node::bob::OPTIONS_SYNC, nullptr, 0, node::bob::kMaxCountHint);
waitingForPull = false;
CHECK_EQ(status, node::bob::STATUS_CONTINUE);
CHECK_EQ(count, 1);
CHECK_EQ(vecs[0].len, len1 - 2);
CHECK_EQ(memcmp(vecs[0].base, buffer1 + 2, len1 - 2), 0);
std::move(done)(0);
},
node::bob::OPTIONS_SYNC,
nullptr,
0,
node::bob::kMaxCountHint);
CHECK(!waitingForPull);
CHECK_EQ(status, node::bob::STATUS_CONTINUE);
@@ -233,43 +263,67 @@ TEST(DataQueue, IdempotentDataQueue) {
// InMemoryEntry instances, reads will be fully synchronous here.
waitingForPull = true;
// The second read should have status CONTINUE but no buffer.
status = reader->Pull(
[&](int status, const DataQueue::Vec* vecs, size_t count, auto done) {
waitingForPull = false;
CHECK_EQ(status, node::bob::STATUS_CONTINUE);
CHECK_EQ(count, 0);
},
node::bob::OPTIONS_SYNC,
nullptr,
0,
node::bob::kMaxCountHint);
CHECK(!waitingForPull);
CHECK_EQ(status, node::bob::STATUS_CONTINUE);
// The second read produces buffer2, and should be the end.
status = reader->Pull(
[&](int status, const DataQueue::Vec* vecs, size_t count, auto done) {
waitingForPull = false;
CHECK_EQ(status, node::bob::STATUS_END);
CHECK_EQ(count, 1);
CHECK_EQ(vecs[0].len, len2);
CHECK_EQ(memcmp(vecs[0].base, buffer2, len2), 0);
std::move(done)(0);
}, node::bob::OPTIONS_SYNC, nullptr, 0, node::bob::kMaxCountHint);
waitingForPull = false;
CHECK_EQ(status, node::bob::STATUS_CONTINUE);
CHECK_EQ(count, 1);
CHECK_EQ(vecs[0].len, len2);
CHECK_EQ(memcmp(vecs[0].base, buffer2, len2), 0);
std::move(done)(0);
},
node::bob::OPTIONS_SYNC,
nullptr,
0,
node::bob::kMaxCountHint);
CHECK(!waitingForPull);
CHECK_EQ(status, node::bob::STATUS_END);
CHECK_EQ(status, node::bob::STATUS_CONTINUE);
// The third read produces EOS
status = reader->Pull(
[&](int status, const DataQueue::Vec* vecs, size_t count, auto done) {
waitingForPull = false;
CHECK_EQ(status, node::bob::STATUS_EOS);
CHECK_EQ(count, 0);
CHECK_NULL(vecs);
std::move(done)(0);
}, node::bob::OPTIONS_SYNC, nullptr, 0, node::bob::kMaxCountHint);
waitingForPull = false;
CHECK_EQ(status, node::bob::STATUS_EOS);
CHECK_EQ(count, 0);
CHECK_NULL(vecs);
std::move(done)(0);
},
node::bob::OPTIONS_SYNC,
nullptr,
0,
node::bob::kMaxCountHint);
CHECK(!waitingForPull);
CHECK_EQ(status, node::bob::STATUS_EOS);
};
// We can read the expected slice data.
std::unique_ptr<DataQueue::Reader> reader3 = slice1->getReader();
std::shared_ptr<DataQueue::Reader> reader3 = slice1->get_reader();
testSlice(reader3);
// We can slice correctly across boundaries.
std::shared_ptr<DataQueue> slice2 = data_queue->slice(5, Just(20UL));
uint64_t end = 20;
std::shared_ptr<DataQueue> slice2 = data_queue->slice(5, end);
// The size is known.
CHECK_EQ(slice2->size().ToChecked(), 15);
CHECK_EQ(slice2->size().value(), 15);
const auto testSlice2 = [&](auto& reader) {
// We can read the expected data from reader. Because the entries are
@@ -279,14 +333,18 @@ TEST(DataQueue, IdempotentDataQueue) {
// The first read produces a slice of buffer1
int status = reader->Pull(
[&](int status, const DataQueue::Vec* vecs, size_t count, auto done) {
waitingForPull = false;
waitingForPull = false;
CHECK_EQ(status, node::bob::STATUS_CONTINUE);
CHECK_EQ(count, 1);
CHECK_EQ(vecs[0].len, len1 - 5);
CHECK_EQ(memcmp(vecs[0].base, buffer1 + 5, len1 - 5), 0);
std::move(done)(0);
}, node::bob::OPTIONS_SYNC, nullptr, 0, node::bob::kMaxCountHint);
CHECK_EQ(status, node::bob::STATUS_CONTINUE);
CHECK_EQ(count, 1);
CHECK_EQ(vecs[0].len, len1 - 5);
CHECK_EQ(memcmp(vecs[0].base, buffer1 + 5, len1 - 5), 0);
std::move(done)(0);
},
node::bob::OPTIONS_SYNC,
nullptr,
0,
node::bob::kMaxCountHint);
CHECK(!waitingForPull);
CHECK_EQ(status, node::bob::STATUS_CONTINUE);
@@ -295,36 +353,58 @@ TEST(DataQueue, IdempotentDataQueue) {
// InMemoryEntry instances, reads will be fully synchronous here.
waitingForPull = true;
// The second read produces buffer2, and should be the end.
status = reader->Pull(
[&](int status, const DataQueue::Vec* vecs, size_t count, auto done) {
waitingForPull = false;
CHECK_EQ(status, node::bob::STATUS_END);
CHECK_EQ(count, 1);
CHECK_EQ(vecs[0].len, len2 - 7);
CHECK_EQ(memcmp(vecs[0].base, buffer2, len2 - 7), 0);
std::move(done)(0);
}, node::bob::OPTIONS_SYNC, nullptr, 0, node::bob::kMaxCountHint);
waitingForPull = false;
CHECK_EQ(status, node::bob::STATUS_CONTINUE);
CHECK_EQ(count, 0);
},
node::bob::OPTIONS_SYNC,
nullptr,
0,
node::bob::kMaxCountHint);
CHECK(!waitingForPull);
CHECK_EQ(status, node::bob::STATUS_END);
CHECK_EQ(status, node::bob::STATUS_CONTINUE);
// The third read produces EOS
// The next read produces buffer2, and should be the end.
status = reader->Pull(
[&](int status, const DataQueue::Vec* vecs, size_t count, auto done) {
waitingForPull = false;
CHECK_EQ(status, node::bob::STATUS_EOS);
CHECK_EQ(count, 0);
CHECK_NULL(vecs);
std::move(done)(0);
}, node::bob::OPTIONS_SYNC, nullptr, 0, node::bob::kMaxCountHint);
waitingForPull = false;
CHECK_EQ(status, node::bob::STATUS_CONTINUE);
CHECK_EQ(count, 1);
CHECK_EQ(vecs[0].len, len2 - 7);
CHECK_EQ(memcmp(vecs[0].base, buffer2, len2 - 7), 0);
std::move(done)(0);
},
node::bob::OPTIONS_SYNC,
nullptr,
0,
node::bob::kMaxCountHint);
CHECK(!waitingForPull);
CHECK_EQ(status, node::bob::STATUS_CONTINUE);
// The next read produces EOS
status = reader->Pull(
[&](int status, const DataQueue::Vec* vecs, size_t count, auto done) {
waitingForPull = false;
CHECK_EQ(status, node::bob::STATUS_EOS);
CHECK_EQ(count, 0);
CHECK_NULL(vecs);
std::move(done)(0);
},
node::bob::OPTIONS_SYNC,
nullptr,
0,
node::bob::kMaxCountHint);
CHECK(!waitingForPull);
CHECK_EQ(status, node::bob::STATUS_EOS);
};
// We can read the expected slice data.
std::unique_ptr<DataQueue::Reader> reader4 = slice2->getReader();
std::shared_ptr<DataQueue::Reader> reader4 = slice2->get_reader();
testSlice2(reader4);
}
@@ -336,56 +416,58 @@ TEST(DataQueue, NonIdempotentDataQueue) {
size_t len2 = strlen(buffer2);
size_t len3 = strlen(buffer3);
std::shared_ptr<BackingStore> store1 =
ArrayBuffer::NewBackingStore(
&buffer1, len1, [](void*, size_t, void*) {}, nullptr);
std::shared_ptr<BackingStore> store1 = ArrayBuffer::NewBackingStore(
&buffer1, len1, [](void*, size_t, void*) {}, nullptr);
std::shared_ptr<BackingStore> store2 =
ArrayBuffer::NewBackingStore(
&buffer2, len2, [](void*, size_t, void*) {}, nullptr);
std::shared_ptr<BackingStore> store2 = ArrayBuffer::NewBackingStore(
&buffer2, len2, [](void*, size_t, void*) {}, nullptr);
std::shared_ptr<BackingStore> store3 =
ArrayBuffer::NewBackingStore(
&buffer3, len3, [](void*, size_t, void*) {}, nullptr);
std::shared_ptr<BackingStore> store3 = ArrayBuffer::NewBackingStore(
&buffer3, len3, [](void*, size_t, void*) {}, nullptr);
// We can create an non-idempotent DataQueue from a list of entries.
std::shared_ptr<DataQueue> data_queue = DataQueue::Create();
CHECK(!data_queue->isIdempotent());
CHECK_EQ(data_queue->size().ToChecked(), 0);
CHECK(!data_queue->is_idempotent());
CHECK_EQ(data_queue->size().value(), 0);
data_queue->append(DataQueue::CreateInMemoryEntryFromBackingStore(store1, 0, len1));
CHECK_EQ(data_queue->size().ToChecked(), len1);
data_queue->append(
DataQueue::CreateInMemoryEntryFromBackingStore(store1, 0, len1));
CHECK_EQ(data_queue->size().value(), len1);
data_queue->append(DataQueue::CreateInMemoryEntryFromBackingStore(store2, 0, len2));
CHECK_EQ(data_queue->size().ToChecked(), len1 + len2);
data_queue->append(
DataQueue::CreateInMemoryEntryFromBackingStore(store2, 0, len2));
CHECK_EQ(data_queue->size().value(), len1 + len2);
CHECK(!data_queue->isCapped());
CHECK(data_queue->maybeCapRemaining().IsNothing());
CHECK(!data_queue->is_capped());
CHECK(!data_queue->maybeCapRemaining().has_value());
data_queue->cap(100);
CHECK(data_queue->isCapped());
CHECK_EQ(data_queue->maybeCapRemaining().ToChecked(), 100 - (len1 + len2));
CHECK(data_queue->is_capped());
CHECK_EQ(data_queue->maybeCapRemaining().value(), 100 - (len1 + len2));
data_queue->cap(101);
CHECK(data_queue->isCapped());
CHECK_EQ(data_queue->maybeCapRemaining().ToChecked(), 100 - (len1 + len2));
CHECK(data_queue->is_capped());
CHECK_EQ(data_queue->maybeCapRemaining().value(), 100 - (len1 + len2));
data_queue->cap();
CHECK(data_queue->isCapped());
CHECK_EQ(data_queue->maybeCapRemaining().ToChecked(), 0);
CHECK(data_queue->is_capped());
CHECK_EQ(data_queue->maybeCapRemaining().value(), 0);
// We can't add any more because the data queue is capped.
CHECK_EQ(data_queue->append(
DataQueue::CreateInMemoryEntryFromBackingStore(store3, 0, len3)).FromJust(), false);
CHECK_EQ(data_queue
->append(DataQueue::CreateInMemoryEntryFromBackingStore(
store3, 0, len3))
.value(),
false);
// We cannot slice a non-idempotent data queue
std::shared_ptr<DataQueue> slice1 = data_queue->slice(2);
CHECK_NULL(slice1);
// We can acquire only a single reader for a non-idempotent data queue
std::unique_ptr<DataQueue::Reader> reader1 = data_queue->getReader();
std::unique_ptr<DataQueue::Reader> reader2 = data_queue->getReader();
std::shared_ptr<DataQueue::Reader> reader1 = data_queue->get_reader();
std::shared_ptr<DataQueue::Reader> reader2 = data_queue->get_reader();
CHECK_NOT_NULL(reader1);
CHECK_NULL(reader2);
@@ -398,13 +480,17 @@ TEST(DataQueue, NonIdempotentDataQueue) {
// The first read produces buffer1
int status = reader->Pull(
[&](int status, const DataQueue::Vec* vecs, size_t count, auto done) {
waitingForPull = false;
CHECK_EQ(status, node::bob::STATUS_CONTINUE);
CHECK_EQ(count, 1);
CHECK_EQ(vecs[0].len, len1);
CHECK_EQ(memcmp(vecs[0].base, buffer1, len1), 0);
std::move(done)(0);
}, node::bob::OPTIONS_SYNC, nullptr, 0, node::bob::kMaxCountHint);
waitingForPull = false;
CHECK_EQ(status, node::bob::STATUS_CONTINUE);
CHECK_EQ(count, 1);
CHECK_EQ(vecs[0].len, len1);
CHECK_EQ(memcmp(vecs[0].base, buffer1, len1), 0);
std::move(done)(0);
},
node::bob::OPTIONS_SYNC,
nullptr,
0,
node::bob::kMaxCountHint);
CHECK(!waitingForPull);
CHECK_EQ(status, node::bob::STATUS_CONTINUE);
@@ -413,29 +499,51 @@ TEST(DataQueue, NonIdempotentDataQueue) {
// InMemoryEntry instances, reads will be fully synchronous here.
waitingForPull = true;
// The second read produces buffer2, and should be the end.
status = reader->Pull(
[&](int status, const DataQueue::Vec* vecs, size_t count, auto done) {
waitingForPull = false;
CHECK_EQ(status, node::bob::STATUS_END);
CHECK_EQ(count, 1);
CHECK_EQ(vecs[0].len, len2);
CHECK_EQ(memcmp(vecs[0].base, buffer2, len2), 0);
std::move(done)(0);
}, node::bob::OPTIONS_SYNC, nullptr, 0, node::bob::kMaxCountHint);
waitingForPull = false;
CHECK_EQ(status, node::bob::STATUS_CONTINUE);
CHECK_EQ(count, 0);
},
node::bob::OPTIONS_SYNC,
nullptr,
0,
node::bob::kMaxCountHint);
CHECK(!waitingForPull);
CHECK_EQ(status, node::bob::STATUS_END);
CHECK_EQ(status, node::bob::STATUS_CONTINUE);
// The third read produces EOS
// The next read produces buffer2, and should be the end.
status = reader->Pull(
[&](int status, const DataQueue::Vec* vecs, size_t count, auto done) {
waitingForPull = false;
CHECK_EQ(status, node::bob::STATUS_EOS);
CHECK_EQ(count, 0);
CHECK_NULL(vecs);
std::move(done)(0);
}, node::bob::OPTIONS_SYNC, nullptr, 0, node::bob::kMaxCountHint);
waitingForPull = false;
CHECK_EQ(status, node::bob::STATUS_CONTINUE);
CHECK_EQ(count, 1);
CHECK_EQ(vecs[0].len, len2);
CHECK_EQ(memcmp(vecs[0].base, buffer2, len2), 0);
std::move(done)(0);
},
node::bob::OPTIONS_SYNC,
nullptr,
0,
node::bob::kMaxCountHint);
CHECK(!waitingForPull);
CHECK_EQ(status, node::bob::STATUS_CONTINUE);
// The next read produces EOS
status = reader->Pull(
[&](int status, const DataQueue::Vec* vecs, size_t count, auto done) {
waitingForPull = false;
CHECK_EQ(status, node::bob::STATUS_EOS);
CHECK_EQ(count, 0);
CHECK_NULL(vecs);
std::move(done)(0);
},
node::bob::OPTIONS_SYNC,
nullptr,
0,
node::bob::kMaxCountHint);
CHECK(!waitingForPull);
CHECK_EQ(status, node::bob::STATUS_EOS);
@@ -445,7 +553,7 @@ TEST(DataQueue, NonIdempotentDataQueue) {
testRead(reader1);
// We still cannot acquire another reader.
std::unique_ptr<DataQueue::Reader> reader3 = data_queue->getReader();
std::shared_ptr<DataQueue::Reader> reader3 = data_queue->get_reader();
CHECK_NULL(reader3);
CHECK_NOT_NULL(data_queue);
@@ -457,20 +565,21 @@ TEST(DataQueue, DataQueueEntry) {
size_t len1 = strlen(buffer1);
size_t len2 = strlen(buffer2);
std::shared_ptr<BackingStore> store1 =
ArrayBuffer::NewBackingStore(
&buffer1, len1, [](void*, size_t, void*) {}, nullptr);
std::shared_ptr<BackingStore> store1 = ArrayBuffer::NewBackingStore(
&buffer1, len1, [](void*, size_t, void*) {}, nullptr);
std::shared_ptr<BackingStore> store2 =
ArrayBuffer::NewBackingStore(
&buffer2, len2, [](void*, size_t, void*) {}, nullptr);
std::shared_ptr<BackingStore> store2 = ArrayBuffer::NewBackingStore(
&buffer2, len2, [](void*, size_t, void*) {}, nullptr);
std::vector<std::unique_ptr<DataQueue::Entry>> list;
list.push_back(DataQueue::CreateInMemoryEntryFromBackingStore(store1, 0, len1));
list.push_back(DataQueue::CreateInMemoryEntryFromBackingStore(store2, 0, len2));
list.push_back(
DataQueue::CreateInMemoryEntryFromBackingStore(store1, 0, len1));
list.push_back(
DataQueue::CreateInMemoryEntryFromBackingStore(store2, 0, len2));
// We can create an idempotent DataQueue from a list of entries.
std::shared_ptr<DataQueue> data_queue = DataQueue::CreateIdempotent(std::move(list));
std::shared_ptr<DataQueue> data_queue =
DataQueue::CreateIdempotent(std::move(list));
CHECK_NOT_NULL(data_queue);
@@ -479,37 +588,41 @@ TEST(DataQueue, DataQueueEntry) {
DataQueue::CreateDataQueueEntry(data_queue);
// The entry should be idempotent since the data queue is idempotent.
CHECK(entry->isIdempotent());
CHECK(entry->is_idempotent());
// The entry size should match the data queue size.
CHECK_EQ(entry->size().ToChecked(), data_queue->size().ToChecked());
CHECK_EQ(entry->size().value(), data_queue->size().value());
// We can slice it since it is idempotent.
std::unique_ptr<DataQueue::Entry> slice = entry->slice(5, Just(20UL));
uint64_t end = 20;
std::unique_ptr<DataQueue::Entry> slice = entry->slice(5, end);
// The slice has the expected length.
CHECK_EQ(slice->size().ToChecked(), 15);
CHECK_EQ(slice->size().value(), 15);
// We can add it to another data queue, even if the new one is not
// idempotent.
std::shared_ptr<DataQueue> data_queue2 = DataQueue::Create();
CHECK(data_queue2->append(std::move(slice)).IsJust());
CHECK(data_queue2->append(std::move(slice)).value());
// Our original data queue should have a use count of 2.
CHECK_EQ(data_queue.use_count(), 2);
std::unique_ptr<DataQueue::Reader> reader = data_queue2->getReader();
std::shared_ptr<DataQueue::Reader> reader = data_queue2->get_reader();
bool pullIsPending = true;
int status = reader->Pull(
[&](int status, const DataQueue::Vec* vecs, size_t count, auto done) {
pullIsPending = false;
CHECK_EQ(count, 1);
CHECK_EQ(memcmp(vecs[0].base, buffer1 + 5, len1 - 5), 0);
CHECK_EQ(status, node::bob::STATUS_CONTINUE);
}, node::bob::OPTIONS_SYNC, nullptr, 0);
pullIsPending = false;
CHECK_EQ(count, 1);
CHECK_EQ(memcmp(vecs[0].base, buffer1 + 5, len1 - 5), 0);
CHECK_EQ(status, node::bob::STATUS_CONTINUE);
},
node::bob::OPTIONS_SYNC,
nullptr,
0);
// All of the actual entries are in-memory entries so reads should be sync.
CHECK(!pullIsPending);
@@ -517,26 +630,29 @@ TEST(DataQueue, DataQueueEntry) {
// Read to completion...
while (status != node::bob::STATUS_EOS) {
status = reader->Pull([&](auto, auto, auto, auto) {},
node::bob::OPTIONS_SYNC, nullptr, 0);
status = reader->Pull(
[&](auto, auto, auto, auto) {}, node::bob::OPTIONS_SYNC, nullptr, 0);
}
// Because the original data queue is idempotent, we can still read from it,
// even though we have already consumed the non-idempotent data queue that
// contained it.
std::unique_ptr<DataQueue::Reader> reader2 = data_queue->getReader();
std::shared_ptr<DataQueue::Reader> reader2 = data_queue->get_reader();
CHECK_NOT_NULL(reader2);
pullIsPending = true;
status = reader2->Pull(
[&](int status, const DataQueue::Vec* vecs, size_t count, auto done) {
pullIsPending = false;
CHECK_EQ(count, 1);
CHECK_EQ(memcmp(vecs[0].base, buffer1, len1), 0);
CHECK_EQ(status, node::bob::STATUS_CONTINUE);
}, node::bob::OPTIONS_SYNC, nullptr, 0);
pullIsPending = false;
CHECK_EQ(count, 1);
CHECK_EQ(memcmp(vecs[0].base, buffer1, len1), 0);
CHECK_EQ(status, node::bob::STATUS_CONTINUE);
},
node::bob::OPTIONS_SYNC,
nullptr,
0);
// All of the actual entries are in-memory entries so reads should be sync.
CHECK(!pullIsPending);

View File

@@ -1,25 +1,34 @@
'use strict';
const common = require('../common');
const { strictEqual } = require('assert');
const { open } = require('fs/promises');
const { TextEncoder } = require('util');
const fs = require('fs');
const {
strictEqual,
rejects,
} = require('assert');
const { TextDecoder } = require('util');
const {
writeFileSync,
openAsBlob,
} = require('fs');
const {
unlink
} = require('fs/promises');
const path = require('path');
const { Blob } = require('buffer');
const tmpdir = require('../common/tmpdir');
const testfile = path.join(tmpdir.path, 'test.txt');
const testfile = path.join(tmpdir.path, 'test-file-backed-blob.txt');
const testfile2 = path.join(tmpdir.path, 'test-file-backed-blob2.txt');
tmpdir.refresh();
const data = `${'a'.repeat(1000)}${'b'.repeat(2000)}`;
fs.writeFileSync(testfile, data);
writeFileSync(testfile, data);
writeFileSync(testfile2, data.repeat(100));
(async () => {
const fh = await open(testfile);
const blob = fh.blob();
const blob = await openAsBlob(testfile);
const ab = await blob.arrayBuffer();
const dec = new TextDecoder();
@@ -27,21 +36,46 @@ fs.writeFileSync(testfile, data);
strictEqual(dec.decode(new Uint8Array(ab)), data);
strictEqual(await blob.text(), data);
// Can be read multiple times
let stream = blob.stream();
let check = '';
for await (const chunk of stream)
check = dec.decode(chunk);
strictEqual(check, data);
// Can be combined with other Blob's and read
const combined = new Blob(['hello', blob, 'world']);
const ab2 = await combined.arrayBuffer();
strictEqual(dec.decode(ab2.slice(0, 5)), 'hello');
strictEqual(dec.decode(ab2.slice(5, -5)), data);
strictEqual(dec.decode(ab2.slice(-5)), 'world');
// If the file is modified tho, the stream errors.
fs.writeFileSync(testfile, data + 'abc');
writeFileSync(testfile, data + 'abc');
stream = blob.stream();
try {
for await (const chunk of stream) {}
} catch (err) {
strictEqual(err.message, 'read EINVAL');
strictEqual(err.code, 'EINVAL');
}
const read = async () => {
// eslint-disable-next-line no-unused-vars, no-empty
for await (const _ of stream) {}
};
await rejects(read(), { name: 'NotReadableError' });
await unlink(testfile);
})().then(common.mustCall());
(async () => {
const blob = await openAsBlob(testfile2);
const stream = blob.stream();
const read = async () => {
// eslint-disable-next-line no-unused-vars
for await (const _ of stream) {
writeFileSync(testfile2, data + 'abc');
}
};
await rejects(read(), { name: 'NotReadableError' });
await unlink(testfile2);
})().then(common.mustCall());

View File

@@ -9,83 +9,86 @@ const common = require('../common');
const assert = require('assert');
const expectedModules = new Set([
'Internal Binding async_wrap',
'Internal Binding blob',
'Internal Binding buffer',
'Internal Binding builtins',
'Internal Binding config',
'Internal Binding constants',
'Internal Binding contextify',
'Internal Binding credentials',
'Internal Binding errors',
'Internal Binding fs',
'Internal Binding messaging',
'Internal Binding mksnapshot',
'Internal Binding module_wrap',
'Internal Binding options',
'Internal Binding performance',
'Internal Binding process_methods',
'Internal Binding string_decoder',
'Internal Binding symbols',
'Internal Binding task_queue',
'Internal Binding timers',
'Internal Binding trace_events',
'Internal Binding types',
'Internal Binding url',
'Internal Binding util',
'Internal Binding wasm_web_api',
'Internal Binding worker',
'NativeModule async_hooks',
'NativeModule buffer',
'NativeModule events',
'NativeModule fs',
'NativeModule internal/assert',
'NativeModule internal/errors',
'Internal Binding config',
'Internal Binding timers',
'Internal Binding async_wrap',
'Internal Binding task_queue',
'Internal Binding symbols',
'NativeModule internal/async_hooks',
'Internal Binding constants',
'Internal Binding types',
'NativeModule internal/util',
'NativeModule internal/util/types',
'NativeModule internal/validators',
'NativeModule internal/linkedlist',
'NativeModule internal/priority_queue',
'NativeModule internal/assert',
'NativeModule internal/util/inspect',
'NativeModule internal/util/debuglog',
'NativeModule internal/timers',
'NativeModule events',
'Internal Binding buffer',
'Internal Binding string_decoder',
'NativeModule internal/buffer',
'NativeModule buffer',
'Internal Binding messaging',
'NativeModule internal/worker/js_transferable',
'Internal Binding process_methods',
'NativeModule internal/process/per_thread',
'Internal Binding credentials',
'NativeModule internal/process/promises',
'NativeModule internal/fixed_queue',
'NativeModule async_hooks',
'NativeModule internal/process/task_queues',
'NativeModule timers',
'Internal Binding trace_events',
'NativeModule internal/constants',
'NativeModule path',
'NativeModule internal/process/execution',
'NativeModule internal/process/warning',
'NativeModule internal/console/constructor',
'NativeModule internal/console/global',
'NativeModule internal/constants',
'NativeModule internal/dns/utils',
'NativeModule internal/errors',
'NativeModule internal/querystring',
'NativeModule querystring',
'Internal Binding url',
'Internal Binding blob',
'NativeModule internal/url',
'NativeModule util',
'Internal Binding performance',
'NativeModule internal/perf/utils',
'NativeModule internal/event_target',
'NativeModule internal/fixed_queue',
'Internal Binding mksnapshot',
'NativeModule internal/v8/startup_snapshot',
'NativeModule internal/process/signal',
'Internal Binding fs',
'NativeModule internal/encoding',
'NativeModule internal/webstreams/util',
'NativeModule internal/webstreams/queuingstrategies',
'NativeModule internal/blob',
'NativeModule internal/fs/utils',
'NativeModule fs',
'NativeModule internal/idna',
'NativeModule internal/linkedlist',
'NativeModule internal/modules/cjs/loader',
'NativeModule internal/modules/esm/utils',
'Internal Binding options',
'NativeModule internal/options',
'NativeModule internal/source_map/source_map_cache',
'Internal Binding contextify',
'NativeModule internal/vm',
'NativeModule internal/modules/helpers',
'NativeModule internal/modules/package_json_reader',
'Internal Binding module_wrap',
'NativeModule internal/modules/cjs/loader',
'NativeModule internal/vm/module',
'NativeModule internal/modules/esm/utils',
'Internal Binding wasm_web_api',
'Internal Binding worker',
'NativeModule internal/modules/run_main',
'NativeModule internal/net',
'NativeModule internal/options',
'NativeModule internal/perf/utils',
'NativeModule internal/priority_queue',
'NativeModule internal/process/execution',
'NativeModule internal/process/per_thread',
'NativeModule internal/dns/utils',
'NativeModule internal/process/pre_execution',
'NativeModule internal/process/promises',
'NativeModule internal/process/signal',
'NativeModule internal/process/task_queues',
'NativeModule internal/process/warning',
'NativeModule internal/querystring',
'NativeModule internal/source_map/source_map_cache',
'NativeModule internal/timers',
'NativeModule internal/url',
'NativeModule internal/util',
'NativeModule internal/util/debuglog',
'NativeModule internal/util/inspect',
'NativeModule internal/util/types',
'NativeModule internal/v8/startup_snapshot',
'NativeModule internal/validators',
'NativeModule internal/vm',
'NativeModule internal/vm/module',
'NativeModule internal/worker/js_transferable',
'NativeModule path',
'NativeModule querystring',
'NativeModule timers',
'NativeModule url',
'NativeModule util',
]);
if (!common.isMainThread) {
@@ -127,6 +130,7 @@ if (common.isWindows) {
if (common.hasIntl) {
expectedModules.add('Internal Binding icu');
expectedModules.add('NativeModule url');
} else {
expectedModules.add('NativeModule url');
}

View File

@@ -5,20 +5,18 @@ const cp = require('child_process');
const fs = require('fs');
const path = require('path');
const tmpdir = require('../common/tmpdir');
const { scheduler } = require('timers/promises');
if (!common.hasCrypto)
common.skip('missing crypto');
const { hkdf } = require('crypto');
const { deflate } = require('zlib');
const { Blob } = require('buffer');
if (process.env.isChild === '1') {
hkdf('sha512', 'key', 'salt', 'info', 64, () => {});
deflate('hello', () => {});
// Make async call
const blob = new Blob(['h'.repeat(4096 * 2)]);
blob.arrayBuffer();
scheduler.wait(10);
return;
}
@@ -46,7 +44,6 @@ const traces = JSON.parse(data.toString()).traceEvents;
assert(traces.length > 0);
let blobCount = 0;
let zlibCount = 0;
let cryptoCount = 0;
@@ -55,9 +52,7 @@ traces.forEach((item) => {
'node,node.threadpoolwork,node.threadpoolwork.sync',
'node,node.threadpoolwork,node.threadpoolwork.async',
].includes(item.cat)) {
if (item.name === 'blob') {
blobCount++;
} else if (item.name === 'zlib') {
if (item.name === 'zlib') {
zlibCount++;
} else if (item.name === 'crypto') {
cryptoCount++;
@@ -65,7 +60,6 @@ traces.forEach((item) => {
}
});
// There are three types, each type has two async events and sync events at least
assert.ok(blobCount >= 4);
// There are two types, each type has two async events and sync events at least
assert.ok(zlibCount >= 4);
assert.ok(cryptoCount >= 4);