3
3
var util = require ( 'util' ) ;
4
4
var EventEmitter = require ( 'events' ) . EventEmitter ;
5
5
var debug = require ( './util/debug' ) ( 'batch' ) ;
6
- var forever = require ( './util/forever' ) ;
7
6
var queue = require ( 'queue-async' ) ;
8
- var Locker = require ( './leader/locker ' ) ;
7
+ var HostScheduler = require ( './scheduler/host-scheduler ' ) ;
9
8
10
- function Batch ( name , jobSubscriber , jobQueue , jobRunner , jobService , jobPublisher , redisConfig , logger ) {
9
+ var EMPTY_QUEUE = true ;
10
+
11
+ function Batch ( name , jobSubscriber , jobQueue , jobRunner , jobService , jobPublisher , redisPool , logger ) {
11
12
EventEmitter . call ( this ) ;
12
13
this . name = name || 'batch' ;
13
14
this . jobSubscriber = jobSubscriber ;
@@ -16,10 +17,10 @@ function Batch(name, jobSubscriber, jobQueue, jobRunner, jobService, jobPublishe
16
17
this . jobService = jobService ;
17
18
this . jobPublisher = jobPublisher ;
18
19
this . logger = logger ;
19
- this . locker = Locker . create ( 'redis-distlock' , { redisConfig : redisConfig } ) ;
20
+ this . hostScheduler = new HostScheduler ( name , { run : this . processJob . bind ( this ) } , redisPool ) ;
20
21
21
- // map: host => jobId
22
- this . workingQueues = { } ;
22
+ // map: user => jobId. Will be used for draining jobs.
23
+ this . workInProgressJobs = { } ;
23
24
}
24
25
util . inherits ( Batch , EventEmitter ) ;
25
26
@@ -29,33 +30,16 @@ Batch.prototype.start = function () {
29
30
var self = this ;
30
31
31
32
this . jobSubscriber . subscribe (
32
- function onJobHandler ( host ) {
33
- if ( self . isProcessingHost ( host ) ) {
34
- return debug ( '%s is already processing host=%s' , self . name , host ) ;
35
- }
36
-
37
- // do forever, it does not throw a stack overflow
38
- forever (
39
- function ( next ) {
40
- self . locker . lock ( host , function ( err ) {
41
- // we didn't get the lock for the host
42
- if ( err ) {
43
- debug ( 'Could not lock host=%s from %s. Reason: %s' , host , self . name , err . message ) ;
44
- return next ( err ) ;
45
- }
46
- debug ( 'Locked host=%s from %s' , host , self . name ) ;
47
- self . processNextJob ( host , next ) ;
48
- } ) ;
49
- } ,
50
- function ( err ) {
51
- if ( err ) {
52
- debug ( err . name === 'EmptyQueue' ? err . message : err ) ;
53
- }
54
-
55
- self . finishedProcessingHost ( host ) ;
56
- self . locker . unlock ( host , debug ) ;
33
+ function onJobHandler ( user , host ) {
34
+ debug ( '[%s] onJobHandler(%s, %s)' , self . name , user , host ) ;
35
+ self . hostScheduler . add ( host , user , function ( err ) {
36
+ if ( err ) {
37
+ return debug (
38
+ 'Could not schedule host=%s user=%s from %s. Reason: %s' ,
39
+ host , self . name , user , err . message
40
+ ) ;
57
41
}
58
- ) ;
42
+ } ) ;
59
43
} ,
60
44
function onJobSubscriberReady ( err ) {
61
45
if ( err ) {
@@ -67,50 +51,49 @@ Batch.prototype.start = function () {
67
51
) ;
68
52
} ;
69
53
70
- Batch . prototype . processNextJob = function ( host , callback ) {
54
+ Batch . prototype . processJob = function ( user , callback ) {
71
55
var self = this ;
72
- self . jobQueue . dequeue ( host , function ( err , jobId ) {
56
+ self . jobQueue . dequeue ( user , function ( err , jobId ) {
73
57
if ( err ) {
74
- return callback ( err ) ;
58
+ return callback ( new Error ( 'Could not get job from "' + user + '". Reason: ' + err . message ) , ! EMPTY_QUEUE ) ;
75
59
}
76
60
77
61
if ( ! jobId ) {
78
- var emptyQueueError = new Error ( 'Queue ' + host + ' is empty' ) ;
79
- emptyQueueError . name = 'EmptyQueue' ;
80
- return callback ( emptyQueueError ) ;
62
+ debug ( 'Queue empty user=%s' , user ) ;
63
+ return callback ( null , EMPTY_QUEUE ) ;
81
64
}
82
65
83
- self . setProcessingJobId ( host , jobId ) ;
84
-
66
+ self . setWorkInProgressJob ( user , jobId ) ;
85
67
self . jobRunner . run ( jobId , function ( err , job ) {
86
- self . setProcessingJobId ( host , null ) ;
68
+ self . clearWorkInProgressJob ( user ) ;
87
69
88
70
if ( err ) {
89
71
debug ( err ) ;
90
72
if ( err . name === 'JobNotRunnable' ) {
91
- return callback ( ) ;
73
+ return callback ( null , ! EMPTY_QUEUE ) ;
92
74
}
93
- return callback ( err ) ;
75
+ return callback ( err , ! EMPTY_QUEUE ) ;
94
76
}
95
77
96
- debug ( 'Job[%s] status=%s in host=%s (failed_reason=%s)' , jobId , job . data . status , host , job . failed_reason ) ;
78
+ debug (
79
+ '[%s] Job=%s status=%s user=%s (failed_reason=%s)' ,
80
+ self . name , jobId , job . data . status , user , job . failed_reason
81
+ ) ;
97
82
98
83
self . logger . log ( job ) ;
99
84
100
- self . emit ( 'job:' + job . data . status , jobId ) ;
101
-
102
- callback ( ) ;
85
+ return callback ( null , ! EMPTY_QUEUE ) ;
103
86
} ) ;
104
87
} ) ;
105
88
} ;
106
89
107
90
Batch . prototype . drain = function ( callback ) {
108
91
var self = this ;
109
- var workingHosts = this . getWorkingHosts ( ) ;
110
- var batchQueues = queue ( workingHosts . length ) ;
92
+ var workingUsers = this . getWorkInProgressUsers ( ) ;
93
+ var batchQueues = queue ( workingUsers . length ) ;
111
94
112
- workingHosts . forEach ( function ( host ) {
113
- batchQueues . defer ( self . _drainJob . bind ( self ) , host ) ;
95
+ workingUsers . forEach ( function ( user ) {
96
+ batchQueues . defer ( self . _drainJob . bind ( self ) , user ) ;
114
97
} ) ;
115
98
116
99
batchQueues . awaitAll ( function ( err ) {
@@ -124,9 +107,9 @@ Batch.prototype.drain = function (callback) {
124
107
} ) ;
125
108
} ;
126
109
127
- Batch . prototype . _drainJob = function ( host , callback ) {
110
+ Batch . prototype . _drainJob = function ( user , callback ) {
128
111
var self = this ;
129
- var job_id = this . getProcessingJobId ( host ) ;
112
+ var job_id = this . getWorkInProgressJob ( user ) ;
130
113
131
114
if ( ! job_id ) {
132
115
return process . nextTick ( function ( ) {
@@ -143,7 +126,7 @@ Batch.prototype._drainJob = function (host, callback) {
143
126
return callback ( err ) ;
144
127
}
145
128
146
- self . jobQueue . enqueueFirst ( job_id , host , callback ) ;
129
+ self . jobQueue . enqueueFirst ( user , job_id , callback ) ;
147
130
} ) ;
148
131
} ;
149
132
@@ -152,22 +135,21 @@ Batch.prototype.stop = function (callback) {
152
135
this . jobSubscriber . unsubscribe ( callback ) ;
153
136
} ;
154
137
155
- Batch . prototype . isProcessingHost = function ( host ) {
156
- return this . workingQueues . hasOwnProperty ( host ) ;
157
- } ;
158
138
159
- Batch . prototype . getWorkingHosts = function ( ) {
160
- return Object . keys ( this . workingQueues ) ;
139
+ /* Work in progress jobs */
140
+
141
+ Batch . prototype . setWorkInProgressJob = function ( user , jobId ) {
142
+ this . workInProgressJobs [ user ] = jobId ;
161
143
} ;
162
144
163
- Batch . prototype . setProcessingJobId = function ( host , jobId ) {
164
- this . workingQueues [ host ] = jobId ;
145
+ Batch . prototype . getWorkInProgressJob = function ( user ) {
146
+ return this . workInProgressJobs [ user ] ;
165
147
} ;
166
148
167
- Batch . prototype . getProcessingJobId = function ( host ) {
168
- return this . workingQueues [ host ] ;
149
+ Batch . prototype . clearWorkInProgressJob = function ( user ) {
150
+ delete this . workInProgressJobs [ user ] ;
169
151
} ;
170
152
171
- Batch . prototype . finishedProcessingHost = function ( host ) {
172
- delete this . workingQueues [ host ] ;
153
+ Batch . prototype . getWorkInProgressUsers = function ( ) {
154
+ return Object . keys ( this . workInProgressJobs ) ;
173
155
} ;
0 commit comments