@@ -36,6 +36,7 @@ function from(Readable, iterable, opts) {
3636 throw new ERR_INVALID_ARG_TYPE ( 'iterable' , [ 'Iterable' ] , iterable ) ;
3737 }
3838
39+
3940 const readable = new Readable ( {
4041 objectMode : true ,
4142 highWaterMark : 1 ,
@@ -46,11 +47,19 @@ function from(Readable, iterable, opts) {
4647 // Flag to protect against _read
4748 // being called before last iteration completion.
4849 let reading = false ;
50+ let isAsyncValues = false ;
4951
5052 readable . _read = function ( ) {
5153 if ( ! reading ) {
5254 reading = true ;
53- next ( ) ;
55+
56+ if ( isAsync ) {
57+ nextAsync ( ) ;
58+ } else if ( isAsyncValues ) {
59+ nextSyncWithAsyncValues ( ) ;
60+ } else {
61+ nextSyncWithSyncValues ( ) ;
62+ }
5463 }
5564 } ;
5665
@@ -78,29 +87,115 @@ function from(Readable, iterable, opts) {
7887 }
7988 }
8089
81- async function next ( ) {
90+ // There are a lot of duplication here, it's done on purpose for performance
91+ // reasons - avoid await when not needed.
92+
93+ function nextSyncWithSyncValues ( ) {
94+ for ( ; ; ) {
95+ try {
96+ const { value, done } = iterator . next ( ) ;
97+
98+ if ( done ) {
99+ readable . push ( null ) ;
100+ return ;
101+ }
102+
103+ if ( value &&
104+ typeof value . then === 'function' ) {
105+ return changeToAsyncValues ( value ) ;
106+ }
107+
108+ if ( value === null ) {
109+ reading = false ;
110+ throw new ERR_STREAM_NULL_VALUES ( ) ;
111+ }
112+
113+ if ( readable . push ( value ) ) {
114+ continue ;
115+ }
116+
117+ reading = false ;
118+ } catch ( err ) {
119+ readable . destroy ( err ) ;
120+ }
121+ break ;
122+ }
123+ }
124+
125+ async function changeToAsyncValues ( value ) {
126+ isAsyncValues = true ;
127+
128+ try {
129+ const res = await value ;
130+
131+ if ( res === null ) {
132+ reading = false ;
133+ throw new ERR_STREAM_NULL_VALUES ( ) ;
134+ }
135+
136+ if ( readable . push ( res ) ) {
137+ nextSyncWithAsyncValues ( ) ;
138+ return ;
139+ }
140+
141+ reading = false ;
142+ } catch ( err ) {
143+ readable . destroy ( err ) ;
144+ }
145+ }
146+
147+ async function nextSyncWithAsyncValues ( ) {
82148 for ( ; ; ) {
83149 try {
84- const { value, done } = isAsync ?
85- await iterator . next ( ) :
86- iterator . next ( ) ;
150+ const { value, done } = iterator . next ( ) ;
87151
88152 if ( done ) {
89153 readable . push ( null ) ;
90- } else {
91- const res = ( value &&
92- typeof value . then === 'function' ) ?
93- await value :
94- value ;
95- if ( res === null ) {
96- reading = false ;
97- throw new ERR_STREAM_NULL_VALUES ( ) ;
98- } else if ( readable . push ( res ) ) {
99- continue ;
100- } else {
101- reading = false ;
102- }
154+ return ;
155+ }
156+
157+ const res = ( value &&
158+ typeof value . then === 'function' ) ?
159+ await value :
160+ value ;
161+
162+ if ( res === null ) {
163+ reading = false ;
164+ throw new ERR_STREAM_NULL_VALUES ( ) ;
103165 }
166+
167+ if ( readable . push ( res ) ) {
168+ continue ;
169+ }
170+
171+ reading = false ;
172+ } catch ( err ) {
173+ readable . destroy ( err ) ;
174+ }
175+ break ;
176+ }
177+ }
178+
179+ async function nextAsync ( ) {
180+ for ( ; ; ) {
181+ try {
182+ const { value, done } = await iterator . next ( ) ;
183+
184+ if ( done ) {
185+ readable . push ( null ) ;
186+ return ;
187+ }
188+
189+ if ( value === null ) {
190+ reading = false ;
191+ throw new ERR_STREAM_NULL_VALUES ( ) ;
192+ }
193+
194+ if ( readable . push ( value ) ) {
195+ continue ;
196+ }
197+
198+ reading = false ;
104199 } catch ( err ) {
105200 readable . destroy ( err ) ;
106201 }
0 commit comments