Skip to content

Commit 1213de8

Browse files
uasanwdavidw
andauthored
feat(csv-parse): implement TransformStream (#445)
* feat(csv-parse): implement TransformStream * feat(csv-parse): add test api.web_stream and fix controller.terminate * feat(csv-parse): added errors handle to TransformStream * test(csv-parse): error thrown by webstream * build(csv-parse): exclude web_stream ts test from ci in node 14 * fix(csv-parse): satisfy test and re-habilitate node stream import --------- Co-authored-by: David Worms <[email protected]>
1 parent 6ecf900 commit 1213de8

File tree

5 files changed

+110
-27
lines changed

5 files changed

+110
-27
lines changed
+11
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
2+
import { Options } from './index.js';
3+
4+
declare function parse(options?: Options): TransformStream;
5+
// export default parse;
6+
export { parse };
7+
8+
export {
9+
CastingContext, CastingFunction, CastingDateFunction,
10+
ColumnOption, Options, Info, CsvErrorCode, CsvError
11+
} from './index.js';

packages/csv-parse/lib/stream.d.ts

+11
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
2+
import { Options } from './index.js';
3+
4+
declare function parse(options?: Options): TransformStream;
5+
// export default parse;
6+
export { parse };
7+
8+
export {
9+
CastingContext, CastingFunction, CastingDateFunction,
10+
ColumnOption, Options, Info, CsvErrorCode, CsvError
11+
} from './index.js';

packages/csv-parse/lib/stream.js

+32-26
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,40 @@
1-
import { TransformStream } from "node:stream/web";
1+
import { TransformStream, CountQueuingStrategy } from "node:stream/web";
22
import { transform } from "./api/index.js";
33

44
const parse = (opts) => {
55
const api = transform(opts);
6-
return new TransformStream({
7-
async transform(chunk, controller) {
8-
api.parse(
9-
chunk,
10-
false,
11-
(record) => {
12-
controller.enqueue(record);
13-
},
14-
() => {
15-
controller.close();
16-
},
17-
);
18-
},
19-
async flush(controller) {
20-
api.parse(
21-
undefined,
22-
true,
23-
(record) => {
24-
controller.enqueue(record);
25-
},
26-
() => {
27-
controller.close();
28-
},
29-
);
6+
let controller;
7+
const enqueue = (record) => {
8+
controller.enqueue(record);
9+
};
10+
const terminate = () => {
11+
controller.terminate();
12+
};
13+
return new TransformStream(
14+
{
15+
start(ctr) {
16+
controller = ctr;
17+
},
18+
transform(chunk) {
19+
const error = api.parse(chunk, false, enqueue, terminate);
20+
21+
if (error) {
22+
controller.error(error);
23+
throw error;
24+
}
25+
},
26+
flush() {
27+
const error = api.parse(undefined, true, enqueue, terminate);
28+
29+
if (error) {
30+
controller.error(error);
31+
throw error;
32+
}
33+
},
3034
},
31-
});
35+
new CountQueuingStrategy({ highWaterMark: 1024 }),
36+
new CountQueuingStrategy({ highWaterMark: 1024 }),
37+
);
3238
};
3339

3440
export { parse };

packages/csv-parse/package.json

+11-1
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,16 @@
5252
"default": "./dist/cjs/sync.cjs"
5353
}
5454
},
55+
"./stream": {
56+
"import": {
57+
"types": "./lib/stream.d.ts",
58+
"default": "./lib/stream.js"
59+
},
60+
"require": {
61+
"types": "./dist/cjs/stream.d.cts",
62+
"default": "./dist/cjs/stream.cjs"
63+
}
64+
},
5565
"./browser/esm": {
5666
"types": "./dist/esm/index.d.ts",
5767
"default": "./dist/esm/index.js"
@@ -123,7 +133,7 @@
123133
"preversion": "npm run build && git add dist",
124134
"pretest": "npm run build",
125135
"test": "mocha 'test/**/*.{coffee,ts}'",
126-
"test:legacy": "mocha --ignore test/api.web_stream.coffee --ignore test/api.stream.finished.coffee --ignore test/api.stream.iterator.coffee --loader=./test/loaders/legacy/all.js 'test/**/*.{coffee,ts}'"
136+
"test:legacy": "mocha --ignore test/api.web_stream.coffee --ignore test/api.web_stream.ts --ignore test/api.stream.finished.coffee --ignore test/api.stream.iterator.coffee --loader=./test/loaders/legacy/all.js 'test/**/*.{coffee,ts}'"
127137
},
128138
"type": "module",
129139
"types": "dist/esm/index.d.ts",
+45
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
import 'should'
2+
import {parse as parseStream} from '../lib/stream.js'
3+
import { CsvError } from '../lib/index.js'
4+
5+
describe('API Web Stream', () => {
6+
7+
describe('stream/web/TransformStream', () => {
8+
9+
it('simple parse', async () => {
10+
const stream = parseStream();
11+
const writer = stream.writable.getWriter();
12+
const reader = stream.readable.getReader();
13+
await writer.write(Buffer.from("A,B,C\nD,E,F"));
14+
await writer.close();
15+
await reader.read().should.finally.eql({
16+
done: false,
17+
value: ['A', 'B', 'C'],
18+
});
19+
await reader.read().should.finally.eql({
20+
done: false,
21+
value: ['D', 'E', 'F'],
22+
});
23+
await reader.read().should.finally.eql({
24+
done: true,
25+
value: undefined,
26+
});
27+
})
28+
29+
it("cat error parse", async function () {
30+
const stream = parseStream();
31+
const writer = stream.writable.getWriter();
32+
try {
33+
await writer.write(Buffer.from("A,B,C\nD,E"));
34+
await writer.close();
35+
throw Error("Shall not be called");
36+
} catch (err) {
37+
if (!(err instanceof CsvError)) {
38+
throw Error("Invalid error type");
39+
}
40+
err.code.should.eql("CSV_RECORD_INCONSISTENT_FIELDS_LENGTH");
41+
}
42+
});
43+
44+
})
45+
})

0 commit comments

Comments
 (0)