@@ -1691,7 +1691,8 @@ added: v17.4.0
1691
1691
1692
1692
> Stability: 1 - Experimental
1693
1693
1694
- * ` fn ` {Function|AsyncFunction} a function to map over every item in the stream.
1694
+ * ` fn ` {Function|AsyncFunction} a function to map over every chunk in the
1695
+ stream.
1695
1696
* ` data ` {any} a chunk of data from the stream.
1696
1697
* ` options ` {Object}
1697
1698
* ` signal ` {AbortSignal} aborted if the stream is destroyed allowing to
@@ -1704,16 +1705,16 @@ added: v17.4.0
1704
1705
* Returns: {Readable} a stream mapped with the function ` fn ` .
1705
1706
1706
1707
This method allows mapping over the stream. The ` fn ` function will be called
1707
- for every item in the stream. If the ` fn ` function returns a promise - that
1708
+ for every chunk in the stream. If the ` fn ` function returns a promise - that
1708
1709
promise will be ` await ` ed before being passed to the result stream.
1709
1710
1710
1711
``` mjs
1711
1712
import { Readable } from ' stream' ;
1712
1713
import { Resolver } from ' dns/promises' ;
1713
1714
1714
1715
// With a synchronous mapper.
1715
- for await (const item of Readable .from ([1 , 2 , 3 , 4 ]).map ((x ) => x * 2 )) {
1716
- console .log (item ); // 2, 4, 6, 8
1716
+ for await (const chunk of Readable .from ([1 , 2 , 3 , 4 ]).map ((x ) => x * 2 )) {
1717
+ console .log (chunk ); // 2, 4, 6, 8
1717
1718
}
1718
1719
// With an asynchronous mapper, making at most 2 queries at a time.
1719
1720
const resolver = new Resolver ();
@@ -1735,7 +1736,7 @@ added: v17.4.0
1735
1736
1736
1737
> Stability: 1 - Experimental
1737
1738
1738
- * ` fn ` {Function|AsyncFunction} a function to filter items from stream.
1739
+ * ` fn ` {Function|AsyncFunction} a function to filter chunks from the stream.
1739
1740
* ` data ` {any} a chunk of data from the stream.
1740
1741
* ` options ` {Object}
1741
1742
* ` signal ` {AbortSignal} aborted if the stream is destroyed allowing to
@@ -1747,8 +1748,8 @@ added: v17.4.0
1747
1748
aborted.
1748
1749
* Returns: {Readable} a stream filtered with the predicate ` fn ` .
1749
1750
1750
- This method allows filtering the stream. For each item in the stream the ` fn `
1751
- function will be called and if it returns a truthy value, the item will be
1751
+ This method allows filtering the stream. For each chunk in the stream the ` fn `
1752
+ function will be called and if it returns a truthy value, the chunk will be
1752
1753
passed to the result stream. If the ` fn ` function returns a promise - that
1753
1754
promise will be ` await ` ed.
1754
1755
@@ -1757,8 +1758,8 @@ import { Readable } from 'stream';
1757
1758
import { Resolver } from ' dns/promises' ;
1758
1759
1759
1760
// With a synchronous predicate.
1760
- for await (const item of Readable .from ([1 , 2 , 3 , 4 ]).filter ((x ) => x > 2 )) {
1761
- console .log (item ); // 3, 4
1761
+ for await (const chunk of Readable .from ([1 , 2 , 3 , 4 ]).filter ((x ) => x > 2 )) {
1762
+ console .log (chunk ); // 3, 4
1762
1763
}
1763
1764
// With an asynchronous predicate, making at most 2 queries at a time.
1764
1765
const resolver = new Resolver ();
@@ -1784,7 +1785,7 @@ added: REPLACEME
1784
1785
1785
1786
> Stability: 1 - Experimental
1786
1787
1787
- * ` fn ` {Function|AsyncFunction} a function to call on each item of the stream.
1788
+ * ` fn ` {Function|AsyncFunction} a function to call on each chunk of the stream.
1788
1789
* ` data ` {any} a chunk of data from the stream.
1789
1790
* ` options ` {Object}
1790
1791
* ` signal ` {AbortSignal} aborted if the stream is destroyed allowing to
@@ -1796,12 +1797,12 @@ added: REPLACEME
1796
1797
aborted.
1797
1798
* Returns: {Promise} a promise for when the stream has finished.
1798
1799
1799
- This method allows iterating a stream. For each item in the stream the
1800
+ This method allows iterating a stream. For each chunk in the stream the
1800
1801
` fn ` function will be called. If the ` fn ` function returns a promise - that
1801
1802
promise will be ` await ` ed.
1802
1803
1803
1804
This method is different from ` for await...of ` loops in that it can optionally
1804
- process items concurrently. In addition, a ` forEach ` iteration can only be
1805
+ process chunks concurrently. In addition, a ` forEach ` iteration can only be
1805
1806
stopped by having passed a ` signal ` option and aborting the related
1806
1807
` AbortController ` while ` for await...of ` can be stopped with ` break ` or
1807
1808
` return ` . In either case the stream will be destroyed.
@@ -1815,8 +1816,8 @@ import { Readable } from 'stream';
1815
1816
import { Resolver } from ' dns/promises' ;
1816
1817
1817
1818
// With a synchronous predicate.
1818
- for await (const item of Readable .from ([1 , 2 , 3 , 4 ]).filter ((x ) => x > 2 )) {
1819
- console .log (item ); // 3, 4
1819
+ for await (const chunk of Readable .from ([1 , 2 , 3 , 4 ]).filter ((x ) => x > 2 )) {
1820
+ console .log (chunk ); // 3, 4
1820
1821
}
1821
1822
// With an asynchronous predicate, making at most 2 queries at a time.
1822
1823
const resolver = new Resolver ();
@@ -1881,7 +1882,7 @@ added: REPLACEME
1881
1882
1882
1883
> Stability: 1 - Experimental
1883
1884
1884
- * ` fn ` {Function|AsyncFunction} a function to call on each item of the stream.
1885
+ * ` fn ` {Function|AsyncFunction} a function to call on each chunk of the stream.
1885
1886
* ` data ` {any} a chunk of data from the stream.
1886
1887
* ` options ` {Object}
1887
1888
* ` signal ` {AbortSignal} aborted if the stream is destroyed allowing to
@@ -1922,6 +1923,56 @@ console.log(anyBigFile); // `true` if any file in the list is bigger than 1MB
1922
1923
console .log (' done' ); // Stream has finished
1923
1924
```
1924
1925
1926
+ ### ` readable.find(fn[, options]) `
1927
+
1928
+ <!-- YAML
1929
+ added: REPLACEME
1930
+ -->
1931
+
1932
+ > Stability: 1 - Experimental
1933
+
1934
+ * ` fn ` {Function|AsyncFunction} a function to call on each chunk of the stream.
1935
+ * ` data ` {any} a chunk of data from the stream.
1936
+ * ` options ` {Object}
1937
+ * ` signal ` {AbortSignal} aborted if the stream is destroyed allowing to
1938
+ abort the ` fn ` call early.
1939
+ * ` options ` {Object}
1940
+ * ` concurrency ` {number} the maximum concurrent invocation of ` fn ` to call
1941
+ on the stream at once. ** Default:** ` 1 ` .
1942
+ * ` signal ` {AbortSignal} allows destroying the stream if the signal is
1943
+ aborted.
1944
+ * Returns: {Promise} a promise evaluating to the first chunk for which ` fn `
1945
+ evaluated with a truthy value, or ` undefined ` if no element was found.
1946
+
1947
+ This method is similar to ` Array.prototype.find ` and calls ` fn ` on each chunk
1948
+ in the stream to find a chunk with a truthy value for ` fn ` . Once an ` fn ` call's
1949
+ awaited return value is truthy, the stream is destroyed and the promise is
1950
+ fulfilled with value for which ` fn ` returned a truthy value. If all of the
1951
+ ` fn ` calls on the chunks return a falsy value, the promise is fulfilled with
1952
+ ` undefined ` .
1953
+
1954
+ ``` mjs
1955
+ import { Readable } from ' stream' ;
1956
+ import { stat } from ' fs/promises' ;
1957
+
1958
+ // With a synchronous predicate.
1959
+ await Readable .from ([1 , 2 , 3 , 4 ]).find ((x ) => x > 2 ); // 3
1960
+ await Readable .from ([1 , 2 , 3 , 4 ]).find ((x ) => x > 0 ); // 1
1961
+ await Readable .from ([1 , 2 , 3 , 4 ]).find ((x ) => x > 10 ); // undefined
1962
+
1963
+ // With an asynchronous predicate, making at most 2 file checks at a time.
1964
+ const foundBigFile = await Readable .from ([
1965
+ ' file1' ,
1966
+ ' file2' ,
1967
+ ' file3' ,
1968
+ ]).find (async (fileName ) => {
1969
+ const stats = await stat (fileName);
1970
+ return stat .size > 1024 * 1024 ;
1971
+ }, { concurrency: 2 });
1972
+ console .log (foundBigFile); // File name of large file, if any file in the list is bigger than 1MB
1973
+ console .log (' done' ); // Stream has finished
1974
+ ```
1975
+
1925
1976
### ` readable.every(fn[, options]) `
1926
1977
1927
1978
<!-- YAML
@@ -1930,7 +1981,7 @@ added: REPLACEME
1930
1981
1931
1982
> Stability: 1 - Experimental
1932
1983
1933
- * ` fn ` {Function|AsyncFunction} a function to call on each item of the stream.
1984
+ * ` fn ` {Function|AsyncFunction} a function to call on each chunk of the stream.
1934
1985
* ` data ` {any} a chunk of data from the stream.
1935
1986
* ` options ` {Object}
1936
1987
* ` signal ` {AbortSignal} aborted if the stream is destroyed allowing to
@@ -1980,7 +2031,7 @@ added: REPLACEME
1980
2031
> Stability: 1 - Experimental
1981
2032
1982
2033
* ` fn ` {Function|AsyncGeneratorFunction|AsyncFunction} a function to map over
1983
- every item in the stream.
2034
+ every chunk in the stream.
1984
2035
* ` data ` {any} a chunk of data from the stream.
1985
2036
* ` options ` {Object}
1986
2037
* ` signal ` {AbortSignal} aborted if the stream is destroyed allowing to
@@ -2004,8 +2055,8 @@ import { Readable } from 'stream';
2004
2055
import { createReadStream } from ' fs' ;
2005
2056
2006
2057
// With a synchronous mapper.
2007
- for await (const item of Readable .from ([1 , 2 , 3 , 4 ]).flatMap ((x ) => [x, x])) {
2008
- console .log (item ); // 1, 1, 2, 2, 3, 3, 4, 4
2058
+ for await (const chunk of Readable .from ([1 , 2 , 3 , 4 ]).flatMap ((x ) => [x, x])) {
2059
+ console .log (chunk ); // 1, 1, 2, 2, 3, 3, 4, 4
2009
2060
}
2010
2061
// With an asynchronous mapper, combine the contents of 4 files
2011
2062
const concatResult = Readable .from ([
0 commit comments