From 58011f690bb80e8d10deee1c1a9cb796bcd0305c Mon Sep 17 00:00:00 2001 From: Mert Can Altin Date: Mon, 5 Jan 2026 00:56:35 +0300 Subject: [PATCH 1/4] stream: add kStreamBase marker for internal pipe optimization --- benchmark/webstreams/internal-pipe.js | 106 ++++++++++++++++ lib/internal/webstreams/adapters.js | 16 ++- lib/internal/webstreams/readablestream.js | 33 +++++ lib/internal/webstreams/util.js | 2 + .../test-whatwg-webstreams-internal-pipe.js | 115 ++++++++++++++++++ 5 files changed, 270 insertions(+), 2 deletions(-) create mode 100644 benchmark/webstreams/internal-pipe.js create mode 100644 test/parallel/test-whatwg-webstreams-internal-pipe.js diff --git a/benchmark/webstreams/internal-pipe.js b/benchmark/webstreams/internal-pipe.js new file mode 100644 index 00000000000000..f81566a931fe16 --- /dev/null +++ b/benchmark/webstreams/internal-pipe.js @@ -0,0 +1,106 @@ +'use strict'; +const common = require('../common.js'); +const fsp = require('fs/promises'); +const path = require('path'); +const os = require('os'); +const { pipeline } = require('stream/promises'); +const { + ReadableStream, + WritableStream, +} = require('node:stream/web'); + +const bench = common.createBenchmark(main, { + type: [ + 'node-streams', + 'webstream-js', + 'webstream-file-read', + ], + size: [1024, 16384, 65536], + n: [1e4, 1e5], +}); + +async function main({ type, size, n }) { + const chunk = Buffer.alloc(size, 'x'); + const totalBytes = size * n; + + switch (type) { + case 'node-streams': { + // Baseline: Node.js streams + let received = 0; + const readable = new (require('stream').Readable)({ + read() { + for (let i = 0; i < 100 && received < n; i++) { + this.push(chunk); + received++; + } + if (received >= n) this.push(null); + }, + }); + + const writable = new (require('stream').Writable)({ + write(data, enc, cb) { cb(); }, + }); + + bench.start(); + await pipeline(readable, writable); + bench.end(totalBytes); + break; + } + + case 'webstream-js': { + // Web streams with pure JS source/sink + let sent = 0; + const rs = new ReadableStream({ + pull(controller) { + if (sent++ < n) { + controller.enqueue(chunk); + } else { + controller.close(); + } + }, + }); + + const ws = new WritableStream({ + write() {}, + close() { bench.end(totalBytes); }, + }); + + bench.start(); + await rs.pipeTo(ws); + break; + } + + case 'webstream-file-read': { + // Create a temporary file with test data + const tmpDir = os.tmpdir(); + const tmpFile = path.join(tmpDir, `bench-webstream-${process.pid}.tmp`); + + // Write test data to file + const fd = await fsp.open(tmpFile, 'w'); + for (let i = 0; i < n; i++) { + await fd.write(chunk); + } + await fd.close(); + + // Read using readableWebStream + const readFd = await fsp.open(tmpFile, 'r'); + const rs = readFd.readableWebStream({ type: 'bytes' }); + + const ws = new WritableStream({ + write() {}, + close() { + bench.end(totalBytes); + // Cleanup + readFd.close().then(() => fsp.unlink(tmpFile)); + }, + }); + + bench.start(); + await rs.pipeTo(ws); + break; + } + + default: + throw new Error(`Unknown type: ${type}`); + } +} diff --git a/lib/internal/webstreams/adapters.js b/lib/internal/webstreams/adapters.js index 7650831da837f7..ae965e83cefa15 100644 --- a/lib/internal/webstreams/adapters.js +++ b/lib/internal/webstreams/adapters.js @@ -35,6 +35,10 @@ const { ByteLengthQueuingStrategy, } = require('internal/webstreams/queuingstrategies'); +const { + kStreamBase, +} = require('internal/webstreams/util'); + const { Writable, Readable, @@ -946,7 +950,7 @@ function newWritableStreamFromStreamBase(streamBase, strategy) { return promise.promise; } - return new WritableStream({ + const stream = new WritableStream({ write(chunk, controller) { current = current !== undefined ? PromisePrototypeThen( @@ -967,6 +971,10 @@ function newWritableStreamFromStreamBase(streamBase, strategy) { return promise.promise; }, }, strategy); + + stream[kStreamBase] = streamBase; + + return stream; } /** @@ -1017,7 +1025,7 @@ function newReadableStreamFromStreamBase(streamBase, strategy, options = kEmptyO } }; - return new ReadableStream({ + const stream = new ReadableStream({ start(c) { controller = c; }, pull() { @@ -1040,6 +1048,10 @@ function newReadableStreamFromStreamBase(streamBase, strategy, options = kEmptyO return promise.promise; }, }, strategy); + + stream[kStreamBase] = streamBase; + + return stream; } module.exports = { diff --git a/lib/internal/webstreams/readablestream.js b/lib/internal/webstreams/readablestream.js index f9b9e6b4fb2c3e..31932da170c831 100644 --- a/lib/internal/webstreams/readablestream.js +++ b/lib/internal/webstreams/readablestream.js @@ -45,6 +45,10 @@ const { DOMException, } = internalBinding('messaging'); +const { + StreamPipe, +} = internalBinding('stream_pipe'); + const { isArrayBufferView, isDataView, @@ -114,6 +118,7 @@ const { iteratorNext, kType, kState, + kStreamBase, } = require('internal/webstreams/util'); const { @@ -1369,6 +1374,34 @@ function readableStreamPipeTo( preventCancel, signal) { + const sourceStreamBase = source[kStreamBase]; + const destStreamBase = dest[kStreamBase]; + + if (sourceStreamBase !== undefined && + destStreamBase !== undefined && + signal === undefined && + !preventClose && + !preventAbort && + !preventCancel) { + // Use native piping + const promise = PromiseWithResolvers(); + + source[kState].disturbed = true; + + try { + const pipe = new StreamPipe(sourceStreamBase, destStreamBase); + pipe.onunpipe = () => { + promise.resolve(); + }; + pipe.start(); + } catch (error) { + return PromiseReject(error); + } + + return promise.promise; + } + + // Use JS-based piping let reader; let writer; let disposable; diff --git a/lib/internal/webstreams/util.js b/lib/internal/webstreams/util.js index 5bf016f73b7af5..9417bfd2cfe464 100644 --- a/lib/internal/webstreams/util.js +++ b/lib/internal/webstreams/util.js @@ -49,6 +49,7 @@ const { const kState = Symbol('kState'); const kType = Symbol('kType'); +const kStreamBase = Symbol('kStreamBase'); const AsyncIterator = { __proto__: AsyncIteratorPrototype, @@ -296,4 +297,5 @@ module.exports = { iteratorNext, kType, kState, + kStreamBase, }; diff --git a/test/parallel/test-whatwg-webstreams-internal-pipe.js b/test/parallel/test-whatwg-webstreams-internal-pipe.js new file mode 100644 index 00000000000000..628abc2e77d2b9 --- /dev/null +++ b/test/parallel/test-whatwg-webstreams-internal-pipe.js @@ -0,0 +1,115 @@ +// Flags: --expose-internals --no-warnings +'use strict'; + +// Tests for the internal StreamBase pipe optimization infrastructure +// described in nodejs/performance#134 +// +// Note(mertcanaltin): Full fast-path testing requires real StreamBase implementations +// (like HTTP/2 streams or TCP sockets), not JSStream mocks. +// These tests verify the marker attachment and fallback behavior. + +const common = require('../common'); + +const assert = require('assert'); + +const { + internalBinding, +} = require('internal/test/binding'); + +const { + newWritableStreamFromStreamBase, + newReadableStreamFromStreamBase, +} = require('internal/webstreams/adapters'); + +const { + kStreamBase, +} = require('internal/webstreams/util'); + +const { + JSStream, +} = internalBinding('js_stream'); + +// Test 1: kStreamBase marker is attached to ReadableStream +{ + const stream = new JSStream(); + const readable = newReadableStreamFromStreamBase(stream); + + assert.strictEqual(readable[kStreamBase], stream); + + // Cleanup + stream.emitEOF(); +} + +// Test 2: kStreamBase marker is attached to WritableStream +{ + const stream = new JSStream(); + stream.onwrite = common.mustNotCall(); + stream.onshutdown = (req) => req.oncomplete(); + + const writable = newWritableStreamFromStreamBase(stream); + + assert.strictEqual(writable[kStreamBase], stream); + + // Cleanup + writable.close(); +} + +// Test 3: Regular JS streams don't have kStreamBase +{ + const { ReadableStream, WritableStream } = require('stream/web'); + + const rs = new ReadableStream({ + pull(controller) { + controller.enqueue('chunk'); + controller.close(); + }, + }); + + const ws = new WritableStream({ + write() {}, + }); + + assert.strictEqual(rs[kStreamBase], undefined); + assert.strictEqual(ws[kStreamBase], undefined); + + // Pipe should still work (standard path) + rs.pipeTo(ws).then(common.mustCall()); +} + +// Test 4: Mixed streams (one internal, one JS) use standard path +{ + const stream = new JSStream(); + stream.onshutdown = (req) => req.oncomplete(); + const readable = newReadableStreamFromStreamBase(stream); + + const { WritableStream } = require('stream/web'); + const chunks = []; + const ws = new WritableStream({ + write(chunk) { + chunks.push(chunk); + }, + }); + + // Readable has kStreamBase, ws does not - should use standard path + assert.ok(readable[kStreamBase]); + assert.strictEqual(ws[kStreamBase], undefined); + + const pipePromise = readable.pipeTo(ws); + + stream.readBuffer(Buffer.from('hello')); + stream.emitEOF(); + + pipePromise.then(common.mustCall(() => { + assert.strictEqual(chunks.length, 1); + })); +} + +// Test 5: Verify kStreamBase is the correct symbol from util +{ + const { + kStreamBase: kStreamBase2, + } = require('internal/webstreams/util'); + + // Should be the same symbol + assert.strictEqual(kStreamBase, kStreamBase2); +} From 2828147db8ab06b0d2b5a87bee95e0990ede38a3 Mon Sep 17 00:00:00 2001 From: Mert Can Altin Date: Mon, 5 Jan 2026 00:58:45 +0300 Subject: [PATCH 2/4] update comments --- test/parallel/test-whatwg-webstreams-internal-pipe.js | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/test/parallel/test-whatwg-webstreams-internal-pipe.js b/test/parallel/test-whatwg-webstreams-internal-pipe.js index 628abc2e77d2b9..6267d3caace7eb 100644 --- a/test/parallel/test-whatwg-webstreams-internal-pipe.js +++ b/test/parallel/test-whatwg-webstreams-internal-pipe.js @@ -29,7 +29,7 @@ const { JSStream, } = internalBinding('js_stream'); -// Test 1: kStreamBase marker is attached to ReadableStream +// kStreamBase marker is attached to ReadableStream { const stream = new JSStream(); const readable = newReadableStreamFromStreamBase(stream); @@ -40,7 +40,7 @@ const { stream.emitEOF(); } -// Test 2: kStreamBase marker is attached to WritableStream +// kStreamBase marker is attached to WritableStream { const stream = new JSStream(); stream.onwrite = common.mustNotCall(); @@ -54,7 +54,7 @@ const { writable.close(); } -// Test 3: Regular JS streams don't have kStreamBase +// Regular JS streams don't have kStreamBase { const { ReadableStream, WritableStream } = require('stream/web'); @@ -76,7 +76,7 @@ const { rs.pipeTo(ws).then(common.mustCall()); } -// Test 4: Mixed streams (one internal, one JS) use standard path +// Mixed streams (one internal, one JS) use standard path { const stream = new JSStream(); stream.onshutdown = (req) => req.oncomplete(); @@ -104,7 +104,7 @@ const { })); } -// Test 5: Verify kStreamBase is the correct symbol from util +// Verify kStreamBase is the correct symbol from util { const { kStreamBase: kStreamBase2, From 7fdc21f8d77897bbb66f292f7cd2df55099c4844 Mon Sep 17 00:00:00 2001 From: Mert Can Altin Date: Tue, 6 Jan 2026 22:17:32 +0300 Subject: [PATCH 3/4] stream: optimize internal pipe handling with optional chaining --- lib/internal/webstreams/readablestream.js | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/internal/webstreams/readablestream.js b/lib/internal/webstreams/readablestream.js index 31932da170c831..47c0e9830f70b3 100644 --- a/lib/internal/webstreams/readablestream.js +++ b/lib/internal/webstreams/readablestream.js @@ -1374,8 +1374,8 @@ function readableStreamPipeTo( preventCancel, signal) { - const sourceStreamBase = source[kStreamBase]; - const destStreamBase = dest[kStreamBase]; + const sourceStreamBase = source?.[kStreamBase]; + const destStreamBase = dest?.[kStreamBase]; if (sourceStreamBase !== undefined && destStreamBase !== undefined && @@ -1395,7 +1395,7 @@ function readableStreamPipeTo( }; pipe.start(); } catch (error) { - return PromiseReject(error); + promise.reject(error); } return promise.promise; From 0ef5be97f77457a415eaffd5570d0688be0de210 Mon Sep 17 00:00:00 2001 From: Mert Can Altin Date: Tue, 6 Jan 2026 23:09:32 +0300 Subject: [PATCH 4/4] stream: enhance internal pipe handling with additional error management and tests --- lib/internal/webstreams/readablestream.js | 62 ++++++++++++- .../test-whatwg-webstreams-internal-pipe.js | 88 ++++++++++--------- 2 files changed, 108 insertions(+), 42 deletions(-) diff --git a/lib/internal/webstreams/readablestream.js b/lib/internal/webstreams/readablestream.js index 47c0e9830f70b3..5a31b594a87bb8 100644 --- a/lib/internal/webstreams/readablestream.js +++ b/lib/internal/webstreams/readablestream.js @@ -31,6 +31,7 @@ const { const { AbortError, + ErrnoException, codes: { ERR_ILLEGAL_CONSTRUCTOR, ERR_INVALID_ARG_TYPE, @@ -49,6 +50,13 @@ const { StreamPipe, } = internalBinding('stream_pipe'); +const { + kReadBytesOrError, + streamBaseState, +} = internalBinding('stream_wrap'); + +const { UV_EOF } = internalBinding('uv'); + const { isArrayBufferView, isDataView, @@ -130,6 +138,7 @@ const { isWritableStreamDefaultWriter, writableStreamAbort, + writableStreamClose, writableStreamCloseQueuedOrInFlight, writableStreamDefaultWriterCloseWithErrorPropagation, writableStreamDefaultWriterRelease, @@ -1383,18 +1392,67 @@ function readableStreamPipeTo( !preventClose && !preventAbort && !preventCancel) { - // Use native piping + // Native C++ StreamPipe path for internal-to-internal piping. + // Ref: https://github.com/nodejs/performance/issues/134 const promise = PromiseWithResolvers(); source[kState].disturbed = true; + let pipeError = null; + let isComplete = false; + const originalSourceOnread = sourceStreamBase.onread; + + sourceStreamBase.onread = (arrayBuffer) => { + const nread = streamBaseState[kReadBytesOrError]; + if (nread < 0 && nread !== UV_EOF) { + pipeError = new ErrnoException(nread, 'read'); + } + if (originalSourceOnread) { + return originalSourceOnread(arrayBuffer); + } + }; + + function finalize(error) { + if (isComplete) return; + isComplete = true; + sourceStreamBase.onread = originalSourceOnread; + + if (error) { + if (source[kState].state === 'readable') { + readableStreamError(source, error); + } + if (dest[kState].state === 'writable') { + writableStreamAbort(dest, error); + } + promise.reject(error); + } else { + if (source[kState].state === 'readable') { + readableStreamClose(source); + } + if (dest[kState].state === 'writable' && + !writableStreamCloseQueuedOrInFlight(dest)) { + PromisePrototypeThen( + writableStreamClose(dest), + () => promise.resolve(), + (err) => promise.reject(err), + ); + } else { + promise.resolve(); + } + } + } + try { const pipe = new StreamPipe(sourceStreamBase, destStreamBase); pipe.onunpipe = () => { - promise.resolve(); + if (pipeError) { + finalize(pipeError); + } }; + pipe.oncomplete = () => finalize(pipeError); pipe.start(); } catch (error) { + sourceStreamBase.onread = originalSourceOnread; promise.reject(error); } diff --git a/test/parallel/test-whatwg-webstreams-internal-pipe.js b/test/parallel/test-whatwg-webstreams-internal-pipe.js index 6267d3caace7eb..d115474c4ea1f8 100644 --- a/test/parallel/test-whatwg-webstreams-internal-pipe.js +++ b/test/parallel/test-whatwg-webstreams-internal-pipe.js @@ -1,42 +1,25 @@ // Flags: --expose-internals --no-warnings 'use strict'; -// Tests for the internal StreamBase pipe optimization infrastructure -// described in nodejs/performance#134 -// -// Note(mertcanaltin): Full fast-path testing requires real StreamBase implementations -// (like HTTP/2 streams or TCP sockets), not JSStream mocks. -// These tests verify the marker attachment and fallback behavior. +// Tests for internal StreamBase pipe optimization (nodejs/performance#134) const common = require('../common'); - const assert = require('assert'); - -const { - internalBinding, -} = require('internal/test/binding'); +const { internalBinding } = require('internal/test/binding'); const { newWritableStreamFromStreamBase, newReadableStreamFromStreamBase, } = require('internal/webstreams/adapters'); -const { - kStreamBase, -} = require('internal/webstreams/util'); - -const { - JSStream, -} = internalBinding('js_stream'); +const { kStreamBase } = require('internal/webstreams/util'); +const { JSStream } = internalBinding('js_stream'); // kStreamBase marker is attached to ReadableStream { const stream = new JSStream(); const readable = newReadableStreamFromStreamBase(stream); - assert.strictEqual(readable[kStreamBase], stream); - - // Cleanup stream.emitEOF(); } @@ -47,10 +30,7 @@ const { stream.onshutdown = (req) => req.oncomplete(); const writable = newWritableStreamFromStreamBase(stream); - assert.strictEqual(writable[kStreamBase], stream); - - // Cleanup writable.close(); } @@ -65,18 +45,15 @@ const { }, }); - const ws = new WritableStream({ - write() {}, - }); + const ws = new WritableStream({ write() {} }); assert.strictEqual(rs[kStreamBase], undefined); assert.strictEqual(ws[kStreamBase], undefined); - // Pipe should still work (standard path) rs.pipeTo(ws).then(common.mustCall()); } -// Mixed streams (one internal, one JS) use standard path +// Mixed streams use standard JS path { const stream = new JSStream(); stream.onshutdown = (req) => req.oncomplete(); @@ -85,17 +62,13 @@ const { const { WritableStream } = require('stream/web'); const chunks = []; const ws = new WritableStream({ - write(chunk) { - chunks.push(chunk); - }, + write(chunk) { chunks.push(chunk); }, }); - // Readable has kStreamBase, ws does not - should use standard path assert.ok(readable[kStreamBase]); assert.strictEqual(ws[kStreamBase], undefined); const pipePromise = readable.pipeTo(ws); - stream.readBuffer(Buffer.from('hello')); stream.emitEOF(); @@ -104,12 +77,47 @@ const { })); } -// Verify kStreamBase is the correct symbol from util +// Verify kStreamBase symbol identity { - const { - kStreamBase: kStreamBase2, - } = require('internal/webstreams/util'); - - // Should be the same symbol + const { kStreamBase: kStreamBase2 } = require('internal/webstreams/util'); assert.strictEqual(kStreamBase, kStreamBase2); } + +// FileHandle.readableWebStream() uses async reads, not StreamBase +{ + const fs = require('fs/promises'); + const path = require('path'); + const os = require('os'); + + async function testFileStreamPipe() { + const tmpDir = os.tmpdir(); + const testFile = path.join(tmpDir, `test-webstream-pipe-${process.pid}.txt`); + const testData = 'Hello, WebStreams pipe!'; + + await fs.writeFile(testFile, testData); + + try { + const fileHandle = await fs.open(testFile, 'r'); + const readable = fileHandle.readableWebStream(); + + assert.strictEqual(readable[kStreamBase], undefined); + + const chunks = []; + const writable = new (require('stream/web').WritableStream)({ + write(chunk) { chunks.push(chunk); }, + }); + + await readable.pipeTo(writable); + await fileHandle.close(); + + const result = Buffer.concat(chunks.map((c) => + (c instanceof Uint8Array ? Buffer.from(c) : Buffer.from(c)), + )).toString(); + assert.strictEqual(result, testData); + } finally { + await fs.unlink(testFile).catch(() => {}); + } + } + + testFileStreamPipe().then(common.mustCall()); +}