Skip to content

Commit 66a1c33

Browse files
committed
Simplify listener subscription logic
Always remove pending listeners on acquire call. Always register add and release listeners on acquire.
1 parent 58deb49 commit 66a1c33

File tree

2 files changed

+54
-23
lines changed

2 files changed

+54
-23
lines changed

batch/scheduler/scheduler.js

Lines changed: 14 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,9 @@ Scheduler.prototype.schedule = function() {
109109
};
110110

111111
Scheduler.prototype.acquire = function(callback) {
112+
this.removeAllListeners('add');
113+
this.removeAllListeners('release');
114+
112115
if (this.tasks.every(is(STATUS.DONE))) {
113116
return callback(null, null);
114117
}
@@ -119,39 +122,29 @@ Scheduler.prototype.acquire = function(callback) {
119122
return callback(err);
120123
}
121124

122-
function addListener() {
123-
self.removeListener('release', releaseListener);
125+
debug('Trying to acquire task');
126+
var running = self.tasks.filter(is(STATUS.RUNNING));
127+
debug('[capacity=%d, running=%d] candidates=%j', capacity, running.length, self.tasks);
128+
129+
self.once('add', function() {
124130
debug('Got a new task');
125131
self.acquire(callback);
126-
}
127-
128-
function releaseListener() {
129-
self.removeListener('add', addListener);
132+
});
133+
self.once('release', function() {
130134
debug('Slot was released');
131135
self.acquire(callback);
132-
}
133-
134-
debug('Trying to acquire task');
135-
136-
var allRunning = self.tasks.every(is(STATUS.RUNNING));
137-
var running = self.tasks.filter(is(STATUS.RUNNING));
138-
debug('[capacity=%d, running=%d, all=%s] candidates=%j', capacity, running.length, allRunning, self.tasks);
139-
140-
if (allRunning && running.length < capacity) {
141-
debug('Waiting for tasks');
142-
self.once('add', addListener);
143-
}
136+
});
144137

145138
if (running.length >= capacity) {
146-
debug('Waiting for slot');
147-
return self.once('release', releaseListener);
139+
debug('Not enough capacity');
140+
return null;
148141
}
149142

150143
var isRunningAny = self.tasks.some(is(STATUS.RUNNING));
151144
var candidate = self.tasksTree.min();
152145
if (isRunningAny && candidate === null) {
153146
debug('Waiting for last task to finish');
154-
return self.once('release', releaseListener);
147+
return null;
155148
}
156149

157150
return callback(null, candidate);

test/integration/batch/scheduler.js

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,11 @@ describe('scheduler', function() {
2121
};
2222

2323
// simulate one by one or infinity capacity
24-
var capacities = [new FixedCapacity(1), new FixedCapacity(Infinity)];
24+
var capacities = [new FixedCapacity(1), new FixedCapacity(2), new FixedCapacity(Infinity)];
2525

2626
capacities.forEach(function(capacity) {
2727

28-
it('regression', function (done) {
28+
it('regression #1', function (done) {
2929
var taskRunner = new TaskRunner({
3030
userA: 2,
3131
userB: 2
@@ -50,6 +50,44 @@ describe('scheduler', function() {
5050
scheduler.schedule();
5151
});
5252

53+
it('regression #2', function (done) {
54+
var taskRunner = new TaskRunner({
55+
userA: 2,
56+
userB: 2,
57+
userC: 2,
58+
userD: 1
59+
});
60+
var scheduler = new Scheduler(capacity, taskRunner);
61+
scheduler.add('userA');
62+
scheduler.add('userB');
63+
64+
scheduler.on('done', function() {
65+
var results = taskRunner.results;
66+
67+
assert.equal(results.length, 7);
68+
69+
assert.equal(results[0], 'userA');
70+
assert.equal(results[1], 'userB');
71+
assert.equal(results[2], 'userC');
72+
assert.equal(results[3], 'userD');
73+
assert.equal(results[4], 'userA');
74+
assert.equal(results[5], 'userB');
75+
assert.equal(results[6], 'userC');
76+
77+
return done();
78+
});
79+
80+
setTimeout(function() {
81+
scheduler.add('userC');
82+
}, 10);
83+
84+
setTimeout(function() {
85+
scheduler.add('userD');
86+
}, 20);
87+
88+
scheduler.schedule();
89+
});
90+
5391
it('should run tasks', function (done) {
5492
var taskRunner = new TaskRunner({
5593
userA: 1

0 commit comments

Comments
 (0)