Skip to content

Commit 9fced9a

Browse files
committed
Add solutions for 6th chapter
1 parent b8c2f2e commit 9fced9a

File tree

12 files changed

+389
-1
lines changed

12 files changed

+389
-1
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
node_modules
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/*
2+
6.1 Data compression efficiency: Write a command-line script that takes a
3+
file as input and compresses it using the different algorithms available in the
4+
zlib module (Brotli, Deflate, Gzip). You want to produce a summary table
5+
that compares the algorithm's compression time and compression efficiency
6+
on the given file. Hint: This could be a good use case for the fork pattern, but
7+
remember that we made some important performance considerations when
8+
we discussed it earlier in this chapter.
9+
*/
10+
11+
import { PassThrough, pipeline } from 'stream';
12+
import { createReadStream, createWriteStream } from 'fs';
13+
import { createBrotliCompress, createGzip, createDeflate } from 'zlib';
14+
import path from 'path';
15+
import { hrtime } from 'process';
16+
17+
/**
 * Minimal stopwatch built on `process.hrtime.bigint()`.
 *
 * `start` and `end` are arrow-function fields, so they stay bound to the
 * instance even when passed around as bare callbacks.
 */
class Profiler {
  /**
   * @param {string} label - Name printed in front of the measured duration.
   */
  constructor(label) {
    this.label = label;
  }

  /** Records the starting timestamp. */
  start = () => {
    this.startTime = hrtime.bigint();
  };

  /**
   * Logs the elapsed time in whole milliseconds and returns the raw difference.
   *
   * @returns {bigint} Difference between the current and the starting timestamp.
   * @throws {Error} If `start()` has not been called yet.
   */
  end = () => {
    // Without this guard the subtraction below fails with an opaque
    // "Cannot mix BigInt and other types" TypeError.
    if (typeof this.startTime !== 'bigint') {
      throw new Error(`Profiler "${this.label}": end() called before start()`);
    }
    const diff = hrtime.bigint() - this.startTime;
    // BigInt division truncates, so the printed value is rounded down.
    console.log(`${this.label} took ${diff / 1000000n} ms`);
    return diff;
  };
}
31+
32+
// File to compress, taken from the first command-line argument.
const sourceFilePath = process.argv[2];
if (!sourceFilePath) {
  // path.dirname() throws a TypeError on a non-string argument, so fail
  // early with a message that tells the user what is missing.
  console.error(`Usage: node ${path.basename(process.argv[1] ?? 'script.js')} <file-to-compress>`);
  process.exit(1);
}
// Directory that contains the source file.
const destinationPath = path.dirname(sourceFilePath);
34+
35+
// Supported algorithm names mapped to [report label, zlib stream factory].
const COMPRESSORS = new Map([
  ['gzip', ['Gzip', createGzip]],
  ['brotli', ['Brotli', createBrotliCompress]],
  ['deflate', ['Deflate', createDeflate]],
]);

/**
 * Compresses the source file with the given algorithm, writes the result into
 * the destination directory, and logs the elapsed time and compressed size.
 *
 * @param {string} algorithm - One of 'gzip', 'brotli' or 'deflate'.
 * @returns {Promise<{label: string, elapsed: bigint, size: number}>} Settles
 *   when the whole pipeline has finished; rejects with the pipeline error.
 * @throws {Error} Synchronously, when the algorithm name is not supported.
 */
const compress = (algorithm) => {
  const compressor = COMPRESSORS.get(algorithm);
  if (!compressor) {
    // Error() does not append extra arguments to the message, so the
    // offending name has to be interpolated explicitly.
    throw new Error(`Invalid algorithm: ${algorithm}`);
  }
  const [label, createCompressStream] = compressor;
  const profiler = new Profiler(label);

  const readStream = createReadStream(sourceFilePath);
  const compressStream = createCompressStream();
  // NOTE(review): the output name is fixed to "book<algorithm>.gz" for every
  // algorithm, including the non-gzip ones — kept as is to preserve behavior.
  const writeStream = createWriteStream(path.join(destinationPath, `book${algorithm}.gz`));

  // The PassThrough sits after the compressor only to count compressed bytes.
  let size = 0;
  const passThrough = new PassThrough();
  passThrough.on('data', (chunk) => {
    size += chunk.length;
  });

  return new Promise((resolve, reject) => {
    profiler.start();
    pipeline(readStream, compressStream, passThrough, writeStream, (err) => {
      if (err) {
        reject(err);
        return;
      }
      console.log('====================================');
      const elapsed = profiler.end();
      console.log(`Total byte size is ${size}`);
      resolve({ label, elapsed, size });
    });
  });
};

// Each algorithm must be measured in isolation: when the compressions run at
// the same time they compete for the same resources, and the slowest one
// inflates the timings of the others. Run them strictly one after another.
for (const algorithm of ['gzip', 'brotli', 'deflate']) {
  await compress(algorithm);
}
74+
75+
/* Sample output (recorded with all three compressions running concurrently)
76+
====================================
77+
Deflate took 288 ms
78+
Total byte size is 4540894
79+
====================================
80+
Gzip took 295 ms
81+
Total byte size is 4540906
82+
====================================
83+
Brotli took 12788 ms
84+
Total byte size is 4208085
85+
*/
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
import { Transform } from 'stream';
2+
3+
/**
 * Object-mode Transform that sums the numeric `value` field of the incoming
 * rows, grouped by the row property named in `options.param`.
 *
 * Nothing is emitted while rows arrive; when the input ends a single array of
 * `[paramValue, total]` pairs is pushed downstream.
 */
export class CountCrimesByParam extends Transform {
  /**
   * @param {object} [options] - Transform options plus `param`.
   * @param {string} [options.param] - Name of the row property to group by.
   */
  constructor(options = {}) {
    // Copy instead of assigning onto `options`, so the caller's object is untouched.
    super({ ...options, objectMode: true });
    this.param = options.param;
    // Prototype-less dictionary: keys such as "constructor" or "toString"
    // coming from the data must not resolve to inherited members.
    this.crimesByParam = Object.create(null);
  }

  _transform(row, enc, cb) {
    if (!row || row.year === 'year') {
      cb(); // skip header
      return;
    }
    try {
      const amount = Number(row.value);
      // Ignore non-numeric values; adding NaN would poison the running total.
      if (Number.isFinite(amount)) {
        const paramVal = row[this.param];
        this.crimesByParam[paramVal] = (this.crimesByParam[paramVal] ?? 0) + amount;
      }
    } catch (e) {
      cb(e);
      return;
    }
    // Called outside the try block so a throwing callback is never invoked twice.
    cb();
  }

  _flush(cb) {
    try {
      this.push(Object.entries(this.crimesByParam));
    } catch (e) {
      cb(e);
      return;
    }
    cb();
  }
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
import { Transform } from 'stream';
2+
3+
/**
 * Object-mode Transform that renders an iterable of `[key, value]` entries as
 * text: a blank line, the label, then one `key: value` line per entry.
 */
export class ResultToString extends Transform {
  /**
   * @param {object} [options] - Transform options plus `label`.
   * @param {string} [options.label] - Heading printed above the entries.
   */
  constructor(options = {}) {
    // Copy instead of assigning onto `options`, so the caller's object is untouched.
    super({ ...options, objectMode: true });
    this.label = options.label ?? '';
  }

  _transform(arr, enc, cb) {
    let text;
    try {
      const lines = [];
      for (const entry of arr) {
        lines.push(`${entry[0]}: ${entry[1]}\n`);
      }
      text = `\n${this.label}\n${lines.join('')}`;
    } catch (e) {
      // Report malformed chunks through the stream instead of throwing
      // synchronously, matching the other transforms of this exercise.
      cb(e);
      return;
    }
    this.push(text);
    cb();
  }
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
import { Transform } from 'stream';
2+
3+
/**
 * Object-mode Transform that orders an array of `[key, number]` entries by
 * their numeric part and optionally keeps only the first `limit` of them.
 */
export class SortAndLimit extends Transform {
  /**
   * @param {object} [options] - Transform options plus the fields below.
   * @param {'asc'|'desc'} [options.sortDirection='desc'] - Any value other
   *   than 'desc' sorts ascending.
   * @param {number} [options.limit] - Maximum number of entries to emit; a
   *   missing or falsy limit emits every entry.
   */
  constructor(options = {}) {
    // Copy instead of assigning onto `options`, so the caller's object is untouched.
    super({ ...options, objectMode: true });

    this.sortDirection = options.sortDirection || 'desc';
    this.limit = options.limit;
  }

  _transform(arr, enc, cb) {
    let result;
    try {
      const descending = this.sortDirection === 'desc';
      // Sort a copy: Array.prototype.sort works in place and would reorder
      // the array owned by the upstream stage.
      const sorted = [...arr].sort((a, b) => (descending ? b[1] - a[1] : a[1] - b[1]));
      result = this.limit ? sorted.slice(0, this.limit) : sorted;
    } catch (e) {
      cb(e);
      return;
    }
    this.push(result);
    cb();
  }
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
import { Transform } from 'stream';
2+
3+
/**
 * Object-mode Transform that sums `value` per `borough` and `major_category`
 * and, once the input ends, emits one `["<borough>, <category>", total]`
 * entry per borough for the category with the highest total.
 */
export class TopCategoryPerArea extends Transform {
  /**
   * @param {object} [options] - Transform options.
   */
  constructor(options = {}) {
    // Copy instead of assigning onto `options`, so the caller's object is untouched.
    super({ ...options, objectMode: true });
    // Prototype-less dictionaries: a borough or category named like an
    // Object.prototype member ("constructor", "toString", ...) must not
    // resolve to — or overwrite — an inherited property.
    this.countCategoriesByArea = Object.create(null);
  }

  _transform(row, enc, cb) {
    if (!row || row.year === 'year') {
      cb(); // skip header
      return;
    }
    try {
      const amount = Number(row.value);
      // Ignore non-numeric values; adding NaN would poison the running total.
      if (Number.isFinite(amount)) {
        let area = this.countCategoriesByArea[row.borough];
        if (!area) {
          area = Object.create(null);
          this.countCategoriesByArea[row.borough] = area;
        }
        area[row.major_category] = (area[row.major_category] ?? 0) + amount;
      }
    } catch (e) {
      cb(e);
      return;
    }
    // Called outside the try block so a throwing callback is never invoked twice.
    cb();
  }

  _flush(cb) {
    try {
      const res = Object.entries(this.countCategoriesByArea).map(([borough, totals]) => {
        // An area is only created together with its first category, so
        // `entries` is never empty. Strict `>` keeps the earliest category on ties.
        const entries = Object.entries(totals);
        const [category, total] = entries.reduce(
          (best, entry) => (entry[1] > best[1] ? entry : best),
          entries[0],
        );
        return [`${borough}, ${category}`, total];
      });
      this.push(res);
    } catch (e) {
      cb(e);
      return;
    }
    cb();
  }
}
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
6.2 Stream data processing: On Kaggle, you can find a lot of interesting data
3+
sets, such as the London Crime Data ( nodejsdp.link/london-crime ). You can
4+
download the data in CSV format and build a stream processing script that
5+
analyzes the data and tries to answer the following questions:
6+
• Did the number of crimes go up or down over the years?
7+
• What are the most dangerous areas of London?
8+
• What is the most common crime per area?
9+
• What is the least common crime?
10+
*/
11+
12+
import { createReadStream } from 'fs';
13+
import { parse } from 'csv-parse'
14+
import { pipeline } from 'stream';
15+
import { CountCrimesByParam } from './CountCrimesByParam.js';
16+
import { ResultToString } from './ResultToString.js';
17+
import { SortAndLimit } from './SortAndLimit.js';
18+
import { TopCategoryPerArea } from './TopCategoryPerArea.js';
19+
20+
21+
// CSV file to analyse: the first command-line argument when given, otherwise
// the path the script was originally hard-coded to.
const fileDest = process.argv[2] ?? '/home/baiaman_dev/Documents/short.csv';

// Logs a pipeline failure; does nothing when the pipeline finished cleanly.
const cb = (err) => {
  if (err) console.log('Error occurred in pipeline', err);
};

/**
 * Streams the CSV file through the given stages and prints the text they
 * produce.
 *
 * Every call creates its own file reader and CSV parser: a stream instance
 * cannot be reused across several pipelines. The output is written with
 * `process.stdout.write` from a function destination instead of handing
 * `process.stdout` to `pipeline`, so standard output is never ended and stays
 * usable for the next report.
 *
 * @param {...import('stream').Transform} stages - Aggregation stages; the
 *   last one must emit strings.
 * @returns {Promise<void>} Settles when the whole pipeline has finished.
 */
const runReport = (...stages) =>
  new Promise((resolve, reject) => {
    pipeline(
      createReadStream(fileDest),
      parse({ skip_records_with_error: true, columns: true }),
      ...stages,
      async (source) => {
        for await (const text of source) {
          process.stdout.write(text);
        }
      },
      (err) => (err ? reject(err) : resolve()),
    );
  });

// Each factory builds fresh stage instances for one question.
const reports = [
  // Did the number of crimes go up or down over the years?
  () => [
    new CountCrimesByParam({ param: 'year' }),
    new ResultToString({ label: 'Number of crimes by year' }),
  ],
  // What are the most dangerous areas of London?
  () => [
    new CountCrimesByParam({ param: 'borough' }),
    new SortAndLimit({ sortDirection: 'desc', limit: 20 }),
    new ResultToString({ label: 'Most dangerous areas in London' }),
  ],
  // What is the least common crime?
  () => [
    new CountCrimesByParam({ param: 'major_category' }),
    new SortAndLimit({ sortDirection: 'asc', limit: 20 }),
    new ResultToString({ label: 'Top least common crimes' }),
  ],
  // What is the most common crime per area?
  () => [
    new TopCategoryPerArea(),
    new ResultToString({ label: 'Most common crime per area' }),
  ],
];

// Run the reports one after another so their output appears in a fixed
// order; a failing report is logged and does not prevent the remaining ones.
for (const buildStages of reports) {
  try {
    await runReport(...buildStages());
  } catch (err) {
    cb(err);
  }
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
import { createReadStream } from 'fs';
2+
import net from 'net';
3+
import { pipeline } from 'stream';
4+
import path from 'path';
5+
6+
// File to send: first command-line argument, or the original default path.
const filePath = process.argv[2] || '/home/baiaman_dev/Documents/spider.jpg';
const filename = path.basename(filePath);

const client = net.createConnection({ port: 3000 }, () => {
  // This callback is registered as the 'connect' listener, so a second
  // explicit 'connect' handler is not needed.
  console.log('Connection listener');

  // NOTE(review): the file name is sent as a bare first write with no length
  // prefix or delimiter; the receiving side has to treat its first chunk as
  // the name, which TCP does not guarantee. Left unchanged because both ends
  // must change together.
  client.write(filename);

  // pipeline() ends the socket once the file has been read completely and
  // destroys both streams on failure, so no explicit client.end() is needed.
  pipeline(createReadStream(filePath), client, (err) => {
    if (err) console.log(err);
  });
});

// Without an 'error' listener a refused or reset connection is thrown as an
// uncaught exception and kills the process with a stack trace.
client.on('error', (err) => {
  console.log('Connection error', err);
});
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
import { createWriteStream, write } from 'fs';
import net from 'net';
import { basename, dirname, join } from 'path';
import { fileURLToPath } from 'url';
5+
6+
const __dirname = dirname(fileURLToPath(import.meta.url));

const server = net.createServer((socket) => {
  // Destination file of THIS connection. Keeping it per socket (instead of in
  // a module-level variable) stops simultaneous uploads from writing into
  // each other's files.
  let writeStream = null;

  socket.once('data', (data) => {
    // first chunk is filename
    // ideally fixed byte length should be allocated
    // The name comes from the network: keep only its last path segment so a
    // value such as "../../etc/passwd" cannot escape this directory.
    const file = basename(data.toString());
    if (!file || file === '.' || file === '..') {
      console.log('Error', new Error(`Invalid file name: ${JSON.stringify(file)}`));
      socket.destroy();
      return;
    }

    writeStream = createWriteStream(join(__dirname, file));
    writeStream.on('error', (e) => {
      console.log('Error', e);
      socket.destroy();
    });
    console.log('Started accepting file', file);

    // pipe() forwards every following chunk, honours backpressure, and ends
    // the file stream when the client finishes, so buffered data is flushed
    // before the file is closed.
    socket.pipe(writeStream);
  });

  socket.on('close', () => {
    // Only an interrupted transfer leaves the file stream un-ended; release
    // it explicitly in that case. After a clean finish pipe() has already
    // called end(), and destroying here could drop data not yet flushed.
    if (writeStream && !writeStream.writableEnded) {
      writeStream.destroy();
    }
    console.log('closed');
  });

  socket.on('error', (e) => {
    console.log('Error', e);
  });
});

server.listen(3000);
177 KB
Loading

package-lock.json

Lines changed: 59 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)