Skip to content

Commit e0117e7

Browse files
committed
streams: add whatwg stream interop
1 parent d2e32a1 commit e0117e7

File tree

6 files changed

+151
-0
lines changed

6 files changed

+151
-0
lines changed

doc/api/stream.md

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -341,6 +341,23 @@ reader.pipe(writer);
341341
reader.unpipe(writer);
342342
```
343343

344+
##### writable.acquireStandardStream()
345+
<!-- YAML
346+
added: REPLACEME
347+
-->
348+
349+
> Stability: 1 - Experimental
350+
351+
* Returns: {WritableStream} to feed this stream.
352+
353+
```js
354+
const fs = require('fs');
355+
356+
const stream = fs.createWriteStream('file').acquireStandardStream();
357+
const writer = stream.getWriter();
358+
writer.write('hi!');
359+
```
360+
344361
##### writable.cork()
345362
<!-- YAML
346363
added: v0.11.2
@@ -827,6 +844,22 @@ If both `'readable'` and [`'data'`][] are used at the same time, `'readable'`
827844
takes precedence in controlling the flow, i.e. `'data'` will be emitted
828845
only when [`stream.read()`][stream-read] is called.
829846

847+
##### readable.acquireStandardStream()
848+
<!-- YAML
849+
added: REPLACEME
850+
-->
851+
852+
> Stability: 1 - Experimental
853+
854+
* Returns: {ReadableStream} to fully consume the stream.
855+
856+
```js
857+
const fs = require('fs');
858+
859+
const stream = fs.createReadStream('file').acquireStandardStream();
860+
const reader = stream.getReader();
861+
```
862+
830863
##### readable.destroy([error])
831864
<!-- YAML
832865
added: v8.0.0
@@ -1266,6 +1299,26 @@ Examples of `Duplex` streams include:
12661299
* [zlib streams][zlib]
12671300
* [crypto streams][crypto]
12681301

1302+
##### duplex.acquireStandardStream
1303+
<!-- YAML
1304+
added: REPLACEME
1305+
-->
1306+
1307+
> Stability: 1 - Experimental
1308+
1309+
* Returns {Object}
1310+
* `readable` {ReadableStream}
1311+
* `writable` {WritableStream}
1312+
1313+
Creates a WHATWG stream pair to represent this duplex stream.
1314+
1315+
```js
1316+
const stream = getDuplexSomehow();
1317+
const { readable, writable } = stream.acquireStandardStream();
1318+
readable.getReader();
1319+
writable.getWriter();
1320+
```
1321+
12691322
#### Class: stream.Transform
12701323
<!-- YAML
12711324
added: v0.9.4
@@ -2200,6 +2253,15 @@ by [`stream._transform()`][stream-_transform]. The `'end'` event is emitted
22002253
after all data has been output, which occurs after the callback in
22012254
[`transform._flush()`][stream-_flush] has been called.
22022255

2256+
##### transform.acquireStandardStream()
2257+
<!-- YAML
2258+
added: REPLACEME
2259+
-->
2260+
2261+
> Stability: 1 - Experimental
2262+
2263+
* Returns: {TransformStream}
2264+
22032265
#### transform.\_flush(callback)
22042266

22052267
* `callback` {Function} A callback function (optionally with an error

lib/_stream_duplex.js

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ module.exports = Duplex;
3131
const util = require('util');
3232
const Readable = require('_stream_readable');
3333
const Writable = require('_stream_writable');
34+
const { emitExperimentalWarning } = require('internal/util');
3435

3536
util.inherits(Duplex, Readable);
3637

@@ -66,6 +67,13 @@ function Duplex(options) {
6667
}
6768
}
6869

70+
Duplex.prototype.acquireStandardStream = function() {
71+
emitExperimentalWarning('Duplex.acquireStandardStream');
72+
const readable = Readable.prototype.acquireStandardStream.call(this);
73+
const writable = Writable.prototype.acquireStandardStream.call(this);
74+
return { readable, writable };
75+
};
76+
6977
Object.defineProperty(Duplex.prototype, 'writableHighWaterMark', {
7078
// making it explicit this property is not enumerable
7179
// because otherwise some prototype manipulation in

lib/_stream_readable.js

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -988,6 +988,29 @@ Readable.prototype[Symbol.asyncIterator] = function() {
988988
return new ReadableAsyncIterator(this);
989989
};
990990

991+
Readable.prototype.acquireStandardStream = function() {
992+
emitExperimentalWarning('Readable.acquireStandardStream');
993+
return new ReadableStream({
994+
start: (controller) => {
995+
this.pause();
996+
this.on('data', (chunk) => {
997+
controller.enqueue(chunk);
998+
this.pause();
999+
});
1000+
this.once('end', () => controller.close());
1001+
this.once('error', (e) => controller.error(e));
1002+
},
1003+
pull: () => {
1004+
this.resume();
1005+
},
1006+
cancel: () => {
1007+
this.destroy();
1008+
},
1009+
}, {
1010+
highWaterMark: this.readableHighWaterMark,
1011+
});
1012+
};
1013+
9911014
Object.defineProperty(Readable.prototype, 'readableHighWaterMark', {
9921015
// making it explicit this property is not enumerable
9931016
// because otherwise some prototype manipulation in

lib/_stream_transform.js

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ const {
7272
} = require('internal/errors').codes;
7373
const Duplex = require('_stream_duplex');
7474
const util = require('util');
75+
const { emitExperimentalWarning } = require('internal/util');
7576
util.inherits(Transform, Duplex);
7677

7778

@@ -202,6 +203,32 @@ Transform.prototype._destroy = function(err, cb) {
202203
});
203204
};
204205

206+
Transform.prototype.acquireStandardStream = function() {
207+
emitExperimentalWarning('Transform.acquireStandardStream');
208+
return new TransformStream({
209+
start: (controller) => {
210+
this.on('data', (chunk) => {
211+
controller.enqueue(chunk);
212+
});
213+
this.once('end', () => controller.close());
214+
this.once('error', (e) => controller.error(e));
215+
},
216+
transform: (chunk) => {
217+
return new Promise((resolve) => {
218+
const underHighWaterMark = this.write(chunk);
219+
if (!underHighWaterMark) {
220+
this.once('drain', resolve);
221+
} else {
222+
resolve();
223+
}
224+
});
225+
},
226+
flush: () => {
227+
this.end();
228+
},
229+
});
230+
};
231+
205232

206233
function done(stream, er, data) {
207234
if (er)

lib/_stream_writable.js

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -591,6 +591,33 @@ Writable.prototype.end = function(chunk, encoding, cb) {
591591
return this;
592592
};
593593

594+
Writable.prototype.acquireStandardStream = function() {
595+
internalUtil.emitExperimentalWarning('Writable.acquireStandardStream');
596+
return new WritableStream({
597+
start: (controller) => {
598+
this.once('error', (e) => controller.error(e));
599+
},
600+
write: (chunk) => {
601+
return new Promise((resolve) => {
602+
const underHighWaterMark = this.write(chunk);
603+
if (!underHighWaterMark) {
604+
this.once('drain', resolve);
605+
} else {
606+
resolve();
607+
}
608+
});
609+
},
610+
close: (controller) => {
611+
this.end();
612+
},
613+
abort: (reason) => {
614+
this.destroy(reason);
615+
},
616+
}, {
617+
highWaterMark: this.writableHighWaterMark,
618+
});
619+
};
620+
594621
Object.defineProperty(Writable.prototype, 'writableLength', {
595622
// making it explicit this property is not enumerable
596623
// because otherwise some prototype manipulation in

tools/doc/type-parser.js

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,10 @@ const customTypesMap = {
101101

102102
'readline.Interface': 'readline.html#readline_class_interface',
103103

104+
'ReadableStream': 'https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream',
105+
'WritableStream': 'https://developer.mozilla.org/en-US/docs/Web/API/WritableStream',
106+
'TransformStream': 'https://streams.spec.whatwg.org/#ts-class',
107+
104108
'Stream': 'stream.html#stream_stream',
105109
'stream.Duplex': 'stream.html#stream_class_stream_duplex',
106110
'stream.Readable': 'stream.html#stream_class_stream_readable',

0 commit comments

Comments
 (0)