Skip to content

Commit a70ff81

Browse files
author
Daniel García Aubert
committed
Merge tag '1.42.3' into cdb
1.42.3 - 2016-11-07 ------------------- Announcements: * Raise payload limit for batch-queries to 16kb.
2 parents ad3428e + 7d0c69e commit a70ff81

File tree

12 files changed

+628
-276
lines changed

12 files changed

+628
-276
lines changed

NEWS.md

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,31 @@
1+
1.42.3 - 2016-11-07
2+
-------------------
3+
4+
Announcements:
5+
* Raise payload limit for batch-queries to 16kb.
6+
7+
8+
1.42.2 - 2016-11-07
9+
-------------------
10+
11+
Bug fixes:
12+
* Improve error handling while registering jobs to be tracked.
13+
14+
15+
1.42.1 - 2016-11-03
16+
-------------------
17+
18+
Bug fixes:
19+
* Avoid using the SCAN command to find work-in-progress queues.
20+
21+
22+
1.42.0 - 2016-11-02
23+
-------------------
24+
25+
Announcements:
26+
* Adds endpoint to check running batch queries
27+
28+
129
1.41.0 - 2016-10-21
230
-------------------
331

app/controllers/job_controller.js

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ var authenticatedMiddleware = require('../middlewares/authenticated-request');
88
var handleException = require('../utils/error_handler');
99

1010
var ONE_KILOBYTE_IN_BYTES = 1024;
11-
var MAX_LIMIT_QUERY_SIZE_IN_KB = 8;
11+
var MAX_LIMIT_QUERY_SIZE_IN_KB = 16;
1212
var MAX_LIMIT_QUERY_SIZE_IN_BYTES = MAX_LIMIT_QUERY_SIZE_IN_KB * ONE_KILOBYTE_IN_BYTES;
1313

1414
function getMaxSizeErrorMessage(sql) {
@@ -48,6 +48,11 @@ JobController.prototype.route = function (app) {
4848
bodyPayloadSizeMiddleware, userMiddleware, authenticatedMiddleware(this.userDatabaseService),
4949
this.createJob.bind(this)
5050
);
51+
app.get(
52+
global.settings.base_url + '/sql/job/wip',
53+
userMiddleware, authenticatedMiddleware(this.userDatabaseService),
54+
this.listWorkInProgressJobs.bind(this)
55+
);
5156
app.get(
5257
global.settings.base_url + '/sql/job/:job_id',
5358
userMiddleware, authenticatedMiddleware(this.userDatabaseService),
@@ -82,6 +87,36 @@ JobController.prototype.createJob = function (req, res) {
8287
this.jobService.create(data, jobResponse(req, res, this.statsdClient, 'create', 201));
8388
};
8489

90+
/**
 * Request handler that responds with the work-in-progress jobs reported by
 * the job service.
 *
 * On failure it increments the 'sqlapi.job.error' counter and delegates the
 * response to handleException. On success it sets the DB host and profiler
 * headers, increments 'sqlapi.job.success', logs the action (except when
 * NODE_ENV is 'test') and sends the list with HTTP status 200.
 *
 * @param {Object} req - request; reads req.context and req.profiler
 * @param {Object} res - response used to write headers, status and body
 */
JobController.prototype.listWorkInProgressJobs = function (req, res) {
    var controller = this;

    function onJobsListed(err, list) {
        if (err) {
            controller.statsdClient.increment('sqlapi.job.error');
            return handleException(err, res);
        }

        res.header('X-Served-By-DB-Host', req.context.userDatabase.host);

        // The profiler must be closed before it is serialized into the header
        req.profiler.done('list');
        req.profiler.end();
        req.profiler.sendStats();

        res.header('X-SQLAPI-Profiler', req.profiler.toJSONString());
        controller.statsdClient.increment('sqlapi.job.success');

        var isTestEnvironment = process.env.NODE_ENV === 'test';
        if (!isTestEnvironment) {
            var logEntry = {
                type: 'sql_api_batch_job',
                username: req.context.user,
                action: 'list'
            };
            console.info(JSON.stringify(logEntry));
        }

        res.status(200).send(list);
    }

    controller.jobService.listWorkInProgressJobs(onJobsListed);
};
119+
85120
function jobResponse(req, res, statsdClient, action, status) {
86121
return function handler(err, job) {
87122
status = status || 200;

batch/batch.js

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ function Batch(name, jobSubscriber, jobQueue, jobRunner, jobService, jobPublishe
1717
this.jobService = jobService;
1818
this.jobPublisher = jobPublisher;
1919
this.logger = logger;
20-
this.hostScheduler = new HostScheduler(name, { run: this.processJob.bind(this) }, redisPool);
20+
this.hostScheduler = new HostScheduler(this.name, { run: this.processJob.bind(this) }, redisPool);
2121

2222
// map: user => jobId. Will be used for draining jobs.
2323
this.workInProgressJobs = {};
@@ -63,10 +63,7 @@ Batch.prototype.processJob = function (user, callback) {
6363
return callback(null, EMPTY_QUEUE);
6464
}
6565

66-
self.setWorkInProgressJob(user, jobId);
67-
self.jobRunner.run(jobId, function (err, job) {
68-
self.clearWorkInProgressJob(user);
69-
66+
self._processWorkInProgressJob(user, jobId, function (err, job) {
7067
if (err) {
7168
debug(err);
7269
if (err.name === 'JobNotRunnable') {
@@ -87,6 +84,26 @@ Batch.prototype.processJob = function (user, callback) {
8784
});
8885
};
8986

87+
/**
 * Runs a job while keeping it registered as work-in-progress for its user.
 *
 * Failures while registering or clearing the work-in-progress entry are only
 * logged through debug; they never prevent the job from running nor replace
 * the runner's own result.
 *
 * @param {String} user - owner of the job
 * @param {String} jobId - id of the job to run
 * @param {Function} callback - function (err, job) with the job runner's result
 */
Batch.prototype._processWorkInProgressJob = function (user, jobId, callback) {
    var batch = this;

    function onJobRegistered(registerErr) {
        if (registerErr) {
            debug(new Error('Could not add job to work-in-progress list. Reason: ' + registerErr.message));
        }

        batch.jobRunner.run(jobId, onJobFinished);
    }

    function onJobFinished(runErr, job) {
        batch.clearWorkInProgressJob(user, jobId, function onJobCleared(clearErr) {
            if (clearErr) {
                debug(new Error('Could not clear job from work-in-progress list. Reason: ' + clearErr.message));
            }

            // Always hand back the runner's outcome, not the bookkeeping one
            return callback(runErr, job);
        });
    }

    batch.setWorkInProgressJob(user, jobId, onJobRegistered);
};
106+
90107
Batch.prototype.drain = function (callback) {
91108
var self = this;
92109
var workingUsers = this.getWorkInProgressUsers();
@@ -138,16 +155,18 @@ Batch.prototype.stop = function (callback) {
138155

139156
/* Work in progress jobs */
140157

141-
Batch.prototype.setWorkInProgressJob = function(user, jobId) {
158+
/**
 * Records jobId as the job in progress for user in the in-memory map and
 * forwards the registration to the job service.
 *
 * @param {String} user
 * @param {String} jobId
 * @param {Function} callback - passed through to jobService.addWorkInProgressJob
 */
Batch.prototype.setWorkInProgressJob = function(user, jobId, callback) {
    var jobsByUser = this.workInProgressJobs;
    jobsByUser[user] = jobId;

    this.jobService.addWorkInProgressJob(user, jobId, callback);
};
144162

145163
/**
 * Looks up the job currently recorded as in progress for user.
 *
 * @param {String} user
 * @returns {String|undefined} the recorded job id, or undefined when there is none
 */
Batch.prototype.getWorkInProgressJob = function(user) {
    var jobsByUser = this.workInProgressJobs;
    return jobsByUser[user];
};
148166

149-
Batch.prototype.clearWorkInProgressJob = function(user) {
167+
/**
 * Forgets the in-memory work-in-progress entry for user and forwards the
 * removal of jobId to the job service.
 *
 * @param {String} user
 * @param {String} jobId
 * @param {Function} callback - passed through to jobService.clearWorkInProgressJob
 */
Batch.prototype.clearWorkInProgressJob = function(user, jobId, callback) {
    var jobsByUser = this.workInProgressJobs;
    delete jobsByUser[user];

    this.jobService.clearWorkInProgressJob(user, jobId, callback);
};
152171

153172
Batch.prototype.getWorkInProgressUsers = function() {

batch/job_backend.js

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,15 @@
33
var REDIS_PREFIX = 'batch:jobs:';
44
var REDIS_DB = 5;
55
var JobStatus = require('./job_status');
6+
var queue = require('queue-async');
7+
var debug = require('./util/debug')('job-backend');
68

79
function JobBackend(metadataBackend, jobQueue) {
810
this.metadataBackend = metadataBackend;
911
this.jobQueue = jobQueue;
1012
this.maxNumberOfQueuedJobs = global.settings.batch_max_queued_jobs || 64;
1113
this.inSecondsJobTTLAfterFinished = global.settings.finished_jobs_ttl_in_seconds || 2 * 3600; // 2 hours
14+
this.hostname = global.settings.api_hostname || 'batch';
1215
}
1316

1417
function toRedisParams(job) {
@@ -164,6 +167,102 @@ JobBackend.prototype.save = function (job, callback) {
164167
});
165168
};
166169

170+
var WORK_IN_PROGRESS_JOB = {
171+
DB: 5,
172+
PREFIX_USER: 'batch:wip:user:',
173+
USER_INDEX_KEY: 'batch:wip:users'
174+
};
175+
176+
/**
 * Registers a job as work-in-progress: adds the user to the users index set
 * and appends the job id to the user's list, both sent in a single
 * redisMultiCmd call.
 *
 * @param {String} user
 * @param {String} jobId
 * @param {Function} callback - passed through to metadataBackend.redisMultiCmd
 */
JobBackend.prototype.addWorkInProgressJob = function (user, jobId, callback) {
    var wipListKey = WORK_IN_PROGRESS_JOB.PREFIX_USER + user;

    debug('add job %s to user %s (%s)', jobId, user, wipListKey);

    var commands = [
        ['SADD', WORK_IN_PROGRESS_JOB.USER_INDEX_KEY, user],
        ['RPUSH', wipListKey, jobId]
    ];

    this.metadataBackend.redisMultiCmd(WORK_IN_PROGRESS_JOB.DB, commands, callback);
};
184+
185+
/**
 * Removes a job from the user's work-in-progress list and, only when that
 * list ends up empty, removes the user from the work-in-progress users index.
 *
 * @param {String} user - owner of the job
 * @param {String} jobId - id of the job to remove
 * @param {Function} callback - function (err); called without arguments on success
 */
JobBackend.prototype.clearWorkInProgressJob = function (user, jobId, callback) {
    var self = this;
    var DB = WORK_IN_PROGRESS_JOB.DB;
    var userWIPKey = WORK_IN_PROGRESS_JOB.PREFIX_USER + user;

    // LREM with count 0 removes every occurrence of jobId from the list
    self.metadataBackend.redisCmd(DB, 'LREM', [userWIPKey, 0, jobId], function (lremErr) {
        if (lremErr) {
            return callback(lremErr);
        }

        self.metadataBackend.redisCmd(DB, 'LRANGE', [userWIPKey, 0, -1], function (lrangeErr, workInProgressJobs) {
            if (lrangeErr) {
                return callback(lrangeErr);
            }

            debug('user %s has work in progress jobs %j', user, workInProgressJobs);

            // The user must stay in the index while it still has jobs in
            // progress, otherwise those jobs would not be listed anymore.
            if (workInProgressJobs.length > 0) {
                return callback();
            }

            debug('delete user %s from index', user);

            var sremParams = [WORK_IN_PROGRESS_JOB.USER_INDEX_KEY, user];
            self.metadataBackend.redisCmd(DB, 'SREM', sremParams, function (sremErr) {
                if (sremErr) {
                    return callback(sremErr);
                }

                return callback();
            });
        });
    });
};
221+
222+
/**
 * Fetches the whole work-in-progress list stored for a user (LRANGE 0 -1).
 *
 * @param {String} user
 * @param {Function} callback - passed through to metadataBackend.redisCmd
 */
JobBackend.prototype.listWorkInProgressJobByUser = function (user, callback) {
    var wipListKey = WORK_IN_PROGRESS_JOB.PREFIX_USER + user;
    var wholeListRange = [wipListKey, 0, -1];

    this.metadataBackend.redisCmd(WORK_IN_PROGRESS_JOB.DB, 'LRANGE', wholeListRange, callback);
};
227+
228+
/**
 * Lists every work-in-progress job grouped by user.
 *
 * Reads the users index set and then fetches each user's list through
 * listWorkInProgressJobByUser, using a queue created with queue(4).
 *
 * @param {Function} callback - function (err, jobsByUser) where jobsByUser maps
 *        each indexed user to the result of listWorkInProgressJobByUser; it is
 *        an empty object when the index has no users
 */
JobBackend.prototype.listWorkInProgressJobs = function (callback) {
    var self = this;
    var DB = WORK_IN_PROGRESS_JOB.DB;

    var params = [WORK_IN_PROGRESS_JOB.USER_INDEX_KEY];
    this.metadataBackend.redisCmd(DB, 'SMEMBERS', params, function (err, workInProgressUsers) {
        if (err) {
            return callback(err);
        }

        // Check the number of users explicitly: comparing the array itself
        // against a number coerces it to a string first, which wrongly treats
        // a single user named '0' as "no users".
        if (!workInProgressUsers || workInProgressUsers.length < 1) {
            return callback(null, {});
        }

        debug('found %j work in progress users', workInProgressUsers);

        var usersQueue = queue(4);

        workInProgressUsers.forEach(function (user) {
            usersQueue.defer(self.listWorkInProgressJobByUser.bind(self), user);
        });

        usersQueue.awaitAll(function (err, userWorkInProgressJobs) {
            if (err) {
                return callback(err);
            }

            // Results are indexed in the same order the tasks were deferred
            var workInProgressJobs = workInProgressUsers.reduce(function (users, user, index) {
                users[user] = userWorkInProgressJobs[index];
                debug('found %j work in progress jobs for user %s', userWorkInProgressJobs[index], user);
                return users;
            }, {});

            callback(null, workInProgressJobs);
        });
    });
};
265+
167266
JobBackend.prototype.setTTL = function (job, callback) {
168267
var self = this;
169268
var redisKey = REDIS_PREFIX + job.job_id;

batch/job_service.js

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,3 +122,15 @@ JobService.prototype.drain = function (job_id, callback) {
122122
});
123123
});
124124
};
125+
126+
/**
 * Delegates the registration of a work-in-progress job to the job backend.
 *
 * @param {String} user
 * @param {String} jobId
 * @param {Function} callback - passed through to the backend
 */
JobService.prototype.addWorkInProgressJob = function (user, jobId, callback) {
    var backend = this.jobBackend;
    backend.addWorkInProgressJob(user, jobId, callback);
};
129+
130+
/**
 * Delegates the removal of a work-in-progress job to the job backend.
 *
 * @param {String} user
 * @param {String} jobId
 * @param {Function} callback - passed through to the backend
 */
JobService.prototype.clearWorkInProgressJob = function (user, jobId, callback) {
    var backend = this.jobBackend;
    backend.clearWorkInProgressJob(user, jobId, callback);
};
133+
134+
/**
 * Delegates the listing of work-in-progress jobs to the job backend.
 *
 * @param {Function} callback - passed through to the backend
 */
JobService.prototype.listWorkInProgressJobs = function (callback) {
    var backend = this.jobBackend;
    backend.listWorkInProgressJobs(callback);
};

doc/batch_queries.md

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
A Batch Query enables you to request queries with long-running CPU processing times. Typically, these kinds of requests raise timeout errors when using the SQL API. In order to avoid timeouts, you can use Batch Queries to [create](#create-a-job), [read](#read-a-job) and [cancel](#cancel-a-job) queries. You can also run a [chained batch query](#chaining-batch-queries) to chain several SQL queries into one job. A Batch Query schedules the incoming jobs and allows you to request the job status for each query.
44

5-
_Batch Queries are not intended to be used for large query payloads that contain over 8192 characters (8kb). For instance, if you are inserting a large number of rows into your table, you still need to use the [Import API](https://carto.com/docs/carto-engine/import-api/) or [SQL API](https://carto.com/docs/carto-engine/sql-api/) for this type of data management. Batch Queries are specific to queries and CPU usage._
5+
_Batch Queries are not intended to be used for large query payloads that contain over 16384 characters (16kb). For instance, if you are inserting a large number of rows into your table, you still need to use the [Import API](https://carto.com/docs/carto-engine/import-api/) or [SQL API](https://carto.com/docs/carto-engine/sql-api/) for this type of data management. Batch Queries are specific to queries and CPU usage._
66

77
**Note:** In order to use Batch Queries, you **must** be [authenticated](https://carto.com/docs/carto-engine/sql-api/authentication/) using API keys.
88

@@ -486,7 +486,6 @@ For best practices, follow these recommended usage notes when using Batch Querie
486486
487487
- Batch Queries are not intended for large query payloads (e.g., inserting thousands of rows); use the [Import API](https://carto.com/docs/carto-engine/import-api/) for this type of data management.
488488
489-
- There is a limit of 8kb per job. The following error message appears if your job exceeds this size:
490-
491-
`Your payload is too large. Max size allowed is 8192 (8kb)`
489+
- There is a limit of 16kb per job. The following error message appears if your job exceeds this size:
492490
491+
`Your payload is too large. Max size allowed is 16384 (16kb)`

0 commit comments

Comments
 (0)