Skip to content

Commit 1afd376

Browse files
author
Sylvestre
authored
MySQL2 to handle MongoDB connector BI (#67)
* mysql2 to handle mongo * LONGLONG as numbers * mysql2 for database-proxy * use single connection for database-proxy and set ssl * clean up * adjusted types for test to pass * route for mongosql * typo
1 parent 50c5c94 commit 1afd376

File tree

6 files changed

+299
-89
lines changed

6 files changed

+299
-89
lines changed

lib/mysql.js

Lines changed: 210 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -1,47 +1,189 @@
1-
import {json} from "micro";
2-
import {createPool} from "mysql";
3-
import connectionConfig from "mysql/lib/ConnectionConfig.js";
4-
import types from "mysql/lib/protocol/constants/types.js";
51
import JSONStream from "JSONStream";
2+
import {json} from "micro";
3+
import mysql, {createConnection} from "mysql2";
4+
import {failedCheck} from "./errors.js";
65

7-
const {parseUrl} = connectionConfig;
6+
const {Types, ConnectionConfig} = mysql;
87

9-
export default (url) => {
10-
const pool = createPool(parseUrl(url));
8+
export async function query(req, res, pool) {
9+
const {sql, params} = await json(req);
10+
const keepAlive = setInterval(() => res.write("\n"), 25e3);
1111

12-
return async function query(req, res) {
13-
const {sql, params} = await json(req);
12+
let fields;
13+
let rowCount = 0;
14+
let bytes = 0;
15+
try {
16+
await new Promise((resolve, reject) => {
17+
const stream = pool
18+
.query({sql, timeout: 240e3}, params)
19+
.once("fields", (f) => {
20+
res.write(`{"schema":${JSON.stringify(schema((fields = f)))}`);
21+
})
22+
.stream()
23+
.on("end", resolve)
24+
.on("error", (error) => {
25+
if (!stream.destroyed) stream.destroy();
26+
reject(error);
27+
})
28+
.once("readable", () => clearInterval(keepAlive))
29+
.pipe(JSONStream.stringify(`,"data":[`, ",", "]}"))
30+
.on("data", (chunk) => {
31+
bytes += chunk.length;
32+
rowCount++;
33+
if (rowCount && rowCount % 2e3 === 0)
34+
req.log({
35+
progress: {
36+
rows: rowCount,
37+
fields: fields.length,
38+
bytes,
39+
done: false,
40+
},
41+
});
42+
});
43+
stream.pipe(res, {end: false});
44+
});
45+
} catch (error) {
46+
if (!error.statusCode) error.statusCode = 400;
47+
throw error;
48+
} finally {
49+
clearInterval(keepAlive);
50+
}
1451

15-
let fields;
52+
req.log({
53+
progress: {
54+
rows: rowCount,
55+
fields: fields ? fields.length : 0,
56+
bytes,
57+
done: true,
58+
},
59+
});
60+
61+
res.end();
62+
}
63+
64+
export async function queryStream(req, res, pool) {
65+
const {sql, params} = await json(req);
66+
res.setHeader("Content-Type", "text/plain");
67+
const keepAlive = setInterval(() => res.write("\n"), 25e3);
68+
69+
let fields;
70+
let rowCount = 0;
71+
let bytes = 0;
72+
73+
try {
1674
await new Promise((resolve, reject) => {
1775
const stream = pool
18-
.query({sql, timeout: 30e3}, params)
19-
.on("fields", (f) => (fields = f))
76+
.query({sql, timeout: 240e3}, params)
77+
.once("fields", (f) => {
78+
res.write(JSON.stringify(schema((fields = f))));
79+
res.write("\n");
80+
})
2081
.stream()
2182
.on("end", resolve)
2283
.on("error", (error) => {
23-
stream.destroy();
84+
if (!stream.destroyed) stream.destroy();
2485
reject(error);
2586
})
26-
.pipe(JSONStream.stringify(`{"data":[`, ",", "]"));
87+
.once("readable", () => clearInterval(keepAlive))
88+
.pipe(JSONStream.stringify("", "\n", "\n"))
89+
.on("data", (chunk) => {
90+
bytes += chunk.length;
91+
rowCount++;
92+
if (rowCount % 2e3 === 0)
93+
req.log({
94+
progress: {
95+
rows: rowCount,
96+
fields: fields.length,
97+
bytes,
98+
done: false,
99+
},
100+
});
101+
});
27102
stream.pipe(res, {end: false});
28103
});
104+
} catch (error) {
105+
if (!error.statusCode) error.statusCode = 400;
106+
throw error;
107+
} finally {
108+
clearInterval(keepAlive);
109+
}
110+
111+
req.log({
112+
progress: {
113+
rows: rowCount,
114+
fields: fields ? fields.length : 0,
115+
bytes,
116+
done: true,
117+
},
118+
});
119+
120+
res.end();
121+
}
122+
123+
const READ_ONLY = new Set(["SELECT", "SHOW DATABASES", "SHOW VIEW", "USAGE"]);
124+
export async function check(req, res, pool) {
125+
const rows = await new Promise((resolve, reject) => {
126+
pool.query("SHOW GRANTS FOR CURRENT_USER", (error, results) => {
127+
error ? reject(failedCheck(error.message)) : resolve(results);
128+
});
129+
});
130+
const grants = [].concat(
131+
...rows.map((grant) =>
132+
Object.values(grant)[0]
133+
.match(/^GRANT (.+) ON/)[1]
134+
.split(", ")
135+
)
136+
);
137+
const permissive = grants.filter((g) => !READ_ONLY.has(g));
138+
if (permissive.length)
139+
throw failedCheck(
140+
`User has too permissive grants: ${permissive.join(", ")}`
141+
);
142+
143+
return {ok: true};
144+
}
29145

30-
const schema = {
31-
type: "array",
32-
items: {
33-
type: "object",
34-
properties: fields.reduce(
35-
(schema, {name, type, charsetNr}) => (
36-
(schema[name] = dataTypeSchema({type, charsetNr})), schema
37-
),
38-
{}
146+
export default (url) => async (req, res) => {
147+
const config = ConnectionConfig.parseUrl(url);
148+
149+
// Unless specified as a property of the url connection string, ssl is used with the default.
150+
// See https://dev.mysql.com/doc/connector-j/8.0/en/connector-j-connp-props-security.html#cj-conn-prop_sslMode
151+
if (config.sslMode !== "DISABLED") {
152+
config.ssl = {};
153+
}
154+
155+
// the mysql2.createConnection method is not happy if we pass any extra properties not recognized by it.
156+
delete config.sslMode;
157+
158+
const connection = createConnection({
159+
...config,
160+
decimalNumbers: true,
161+
});
162+
163+
if (req.method === "POST") {
164+
if (req.url === "/query") return query(req, res, connection);
165+
if (req.url === "/query-stream") return queryStream(req, res, connection);
166+
if (req.url === "/check") return check(req, res, connection);
167+
}
168+
169+
throw notFound();
170+
};
171+
172+
function schema(fields) {
173+
return {
174+
type: "array",
175+
items: {
176+
type: "object",
177+
properties: fields.reduce(
178+
(schema, {name, type, characterSet}) => (
179+
(schema[name] = dataTypeSchema({type, charsetNr: characterSet})),
180+
schema
39181
),
40-
},
41-
};
42-
res.end(`,"schema":${JSON.stringify(schema)}}`);
182+
{}
183+
),
184+
},
43185
};
44-
};
186+
}
45187

46188
// https://github.com/mysqljs/mysql/blob/5569e02ad72789f4b396d9a901f0390fe11b5b4e/lib/protocol/constants/types.js
47189
// https://github.com/mysqljs/mysql/blob/5569e02ad72789f4b396d9a901f0390fe11b5b4e/lib/protocol/packets/RowDataPacket.js#L53
@@ -52,46 +194,53 @@ const boolean = ["null", "boolean"],
52194
string = ["null", "string"];
53195
function dataTypeSchema({type, charsetNr}) {
54196
switch (type) {
55-
case types.BIT:
197+
case Types.BIT:
56198
return {type: boolean};
57-
case types.TINY:
58-
case types.SHORT:
59-
case types.LONG:
60-
return {type: integer};
61-
case types.INT24:
62-
case types.YEAR:
63-
case types.FLOAT:
64-
case types.DOUBLE:
65-
case types.DECIMAL:
66-
case types.NEWDECIMAL:
67-
return {type: number};
68-
case types.TIMESTAMP:
69-
case types.DATE:
70-
case types.DATETIME:
71-
case types.NEWDATE:
72-
case types.TIMESTAMP2:
73-
case types.DATETIME2:
74-
case types.TIME2:
199+
case Types.TINY:
200+
return {type: integer, tiny: true};
201+
case Types.SHORT:
202+
return {type: integer, short: true};
203+
case Types.LONG:
204+
return {type: integer, long: true};
205+
case Types.INT24:
206+
return {type: number, int24: true};
207+
case Types.YEAR:
208+
return {type: number, year: true};
209+
case Types.FLOAT:
210+
return {type: number, float: true};
211+
case Types.DOUBLE:
212+
return {type: number, double: true};
213+
case Types.DECIMAL:
214+
return {type: number, decimal: true};
215+
case Types.NEWDECIMAL:
216+
return {type: number, newdecimal: true};
217+
case Types.TIMESTAMP:
218+
case Types.DATE:
219+
case Types.DATETIME:
220+
case Types.NEWDATE:
221+
case Types.TIMESTAMP2:
222+
case Types.DATETIME2:
223+
case Types.TIME2:
75224
return {type: string, date: true};
76-
case types.LONGLONG: // TODO
225+
case Types.LONGLONG:
77226
return {type: string, bigint: true};
78-
case types.TINY_BLOB:
79-
case types.MEDIUM_BLOB:
80-
case types.LONG_BLOB:
81-
case types.BLOB:
82-
case types.VAR_STRING:
83-
case types.VARCHAR:
84-
case types.STRING:
227+
case Types.TINY_BLOB:
228+
case Types.MEDIUM_BLOB:
229+
case Types.LONG_BLOB:
230+
case Types.BLOB:
231+
case Types.VAR_STRING:
232+
case Types.VARCHAR:
233+
case Types.STRING:
85234
return charsetNr === 63 // binary
86235
? {type: object, buffer: true}
87236
: {type: string};
88-
case types.JSON:
89-
return {type: object};
90-
case types.TIME: // TODO
91-
case types.ENUM: // TODO
92-
case types.SET: // TODO
93-
case types.GEOMETRY: // TODO
94-
case types.NULL: // TODO
237+
case Types.JSON:
238+
return {type: object, json: true};
239+
case Types.TIME: // TODO
240+
case Types.ENUM: // TODO
241+
case Types.SET: // TODO
242+
case Types.GEOMETRY: // TODO
243+
case Types.NULL: // TODO
95244
default:
96245
return {type: string};
97246
}

lib/server.js

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,15 @@ import https from "https";
66
import {readFileSync} from "fs";
77
import {run} from "micro";
88
import {createHmac, timingSafeEqual} from "crypto";
9-
import serializeErrors from "./serialize-errors.js";
9+
import serializeErrors from "../middleware/serialize-errors.js";
1010
import {notFound, unauthorized, exit} from "./errors.js";
1111
import mysql from "./mysql.js";
1212
import postgres from "./postgres.js";
1313
import snowflake from "./snowflake.js";
1414
import mssql from "./mssql.js";
1515
import oracle from "./oracle.js";
1616
import databricks from "./databricks.js";
17+
import logger from "../middleware/logger.js";
1718

1819
export async function server(config, argv) {
1920
const development = process.env.NODE_ENV === "development";
@@ -34,7 +35,7 @@ export async function server(config, argv) {
3435
} = config;
3536

3637
const handler =
37-
type === "mysql"
38+
type === "mysql" || type === "mongosql"
3839
? mysql(url)
3940
: type === "postgres"
4041
? postgres(url)
@@ -63,11 +64,11 @@ export async function server(config, argv) {
6364
const sslcert = readFileSync(argv.sslcert);
6465
const sslkey = readFileSync(argv.sslkey);
6566
server = https.createServer({cert: sslcert, key: sslkey}, (req, res) =>
66-
run(req, res, serializeErrors(index))
67+
run(req, res, logger(serializeErrors(index)))
6768
);
6869
} else {
6970
server = http.createServer((req, res) =>
70-
run(req, res, serializeErrors(index))
71+
run(req, res, logger(serializeErrors(index)))
7172
);
7273
}
7374

middleware/logger.js

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
// Recursively flatten key names separated by dots.
2+
function* entries(data, prefix = []) {
3+
if (
4+
data instanceof Object &&
5+
Object.getPrototypeOf(data) === Object.prototype
6+
) {
7+
for (const [key, value] of Object.entries(data))
8+
yield* entries(value, prefix.concat(key));
9+
} else {
10+
yield [prefix.join("."), data];
11+
}
12+
}
13+
14+
export default (handler) => (req, res) => {
15+
req.log = function log(data) {
16+
const requestId = req.headers["x-request-id"];
17+
const parts = requestId ? [`http.request_id=${requestId}`] : [];
18+
for (const [key, value] of entries(data)) parts.push(`${key}=${value}`);
19+
console.log(parts.join(" "));
20+
};
21+
return handler(req, res);
22+
};
File renamed without changes.

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
"ajv": "^8.11.0",
2424
"micro": "^9.3.4",
2525
"mssql": "^9.0.1",
26-
"mysql": "^2.17.1",
26+
"mysql2": "^3.0.1",
2727
"open": "^6.3.0",
2828
"pg": "^8.7.1",
2929
"pg-query-stream": "^4.2.1",

0 commit comments

Comments
 (0)