Skip to content

Commit 7563868

Browse files
committed
Re-insert into the tree if there was a user in done state that gets a new task
1 parent 62cfe97 commit 7563868

File tree

2 files changed

+59
-24
lines changed

2 files changed

+59
-24
lines changed

batch/scheduler/scheduler.js

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ Scheduler.prototype.add = function(user) {
5050
if (taskEntity) {
5151
if (taskEntity.status === STATUS.DONE) {
5252
taskEntity.status = STATUS.PENDING;
53+
this.tasksTree.insert(taskEntity);
54+
this.emit('add');
5355
}
5456

5557
return true;
@@ -147,6 +149,10 @@ Scheduler.prototype.acquire = function(callback) {
147149
return null;
148150
}
149151

152+
if (candidate) {
153+
self.emit('acquired', candidate.user);
154+
}
155+
150156
return callback(null, candidate);
151157
});
152158
};

test/integration/batch/scheduler.js

Lines changed: 53 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
11
'use strict';
22

33
require('../../helper');
4+
var debug = require('../../../batch/util/debug')('scheduler-test');
45
var assert = require('../../support/assert');
56
var Scheduler = require('../../../batch/scheduler/scheduler');
67
var FixedCapacity = require('../../../batch/scheduler/capacity/fixed');
78

89
describe('scheduler', function() {
910

11+
var USER_FINISHED = true;
12+
1013
var USER_A = 'userA';
1114
var USER_B = 'userB';
1215
var USER_C = 'userC';
@@ -24,6 +27,27 @@ describe('scheduler', function() {
2427
}.bind(this), 50);
2528
};
2629

30+
function ManualTaskRunner() {
31+
this.userTasks = {};
32+
}
33+
34+
ManualTaskRunner.prototype.run = function(user, callback) {
35+
if (!this.userTasks.hasOwnProperty(user)) {
36+
this.userTasks[user] = [];
37+
}
38+
this.userTasks[user].push(callback);
39+
};
40+
41+
ManualTaskRunner.prototype.dispatch = function(user, isDone) {
42+
if (this.userTasks.hasOwnProperty(user)) {
43+
var cb = this.userTasks[user].shift();
44+
if (cb) {
45+
return cb(null, isDone);
46+
}
47+
}
48+
};
49+
50+
2751
// simulate one by one or infinity capacity
2852
var capacities = [new FixedCapacity(1), new FixedCapacity(2), new FixedCapacity(Infinity)];
2953

@@ -54,42 +78,47 @@ describe('scheduler', function() {
5478
scheduler.schedule();
5579
});
5680

57-
it('regression #2', function (done) {
58-
var taskRunner = new TaskRunner({
59-
userA: 2,
60-
userB: 2,
61-
userC: 2,
62-
userD: 1
63-
});
81+
it('regression #2: it should restart task after it was done but got re-scheduled', function (done) {
82+
var taskRunner = new ManualTaskRunner();
6483
var scheduler = new Scheduler(capacity, taskRunner);
84+
debug('Adding users A and B');
6585
scheduler.add(USER_A);
6686
scheduler.add(USER_B);
6787

68-
scheduler.on('done', function() {
69-
var results = taskRunner.results;
88+
var acquiredUsers = [];
7089

71-
assert.equal(results.length, 7);
90+
scheduler.on('done', function() {
91+
debug('Users %j', acquiredUsers);
92+
assert.equal(acquiredUsers[0], USER_A);
93+
assert.equal(acquiredUsers[1], USER_B);
94+
assert.equal(acquiredUsers[2], USER_A);
95+
assert.equal(acquiredUsers[3], USER_B);
7296

73-
assert.equal(results[0], USER_A);
74-
assert.equal(results[1], USER_B);
75-
assert.equal(results[2], USER_C);
76-
assert.equal(results[3], 'userD');
77-
assert.equal(results[4], USER_A);
78-
assert.equal(results[5], USER_B);
79-
assert.equal(results[6], USER_C);
97+
assert.equal(acquiredUsers.length, 4);
8098

8199
return done();
82100
});
83101

84-
setTimeout(function() {
85-
scheduler.add(USER_C);
86-
}, 10);
87-
88-
setTimeout(function() {
89-
scheduler.add('userD');
90-
}, 20);
102+
scheduler.on('acquired', function(user) {
103+
debug('Acquired user %s', user);
104+
acquiredUsers.push(user);
105+
});
91106

92107
scheduler.schedule();
108+
109+
debug('User A will be mark as DONE');
110+
taskRunner.dispatch(USER_A, USER_FINISHED);
111+
112+
debug('User B should be running');
113+
debug('User A submit a new task');
114+
scheduler.add(USER_A);
115+
116+
debug('User B will get another task to run');
117+
taskRunner.dispatch(USER_B);
118+
119+
debug('User A should start working on this new task');
120+
taskRunner.dispatch(USER_A, USER_FINISHED);
121+
taskRunner.dispatch(USER_B, USER_FINISHED);
93122
});
94123

95124
it('should run tasks', function (done) {

0 commit comments

Comments
 (0)