Skip to content

Commit 13f5669

Browse files
authored
Merge pull request #85 from imcaizheng/implement-event-handlers-repost
[re-created]implement local event handling
2 parents 89ba0bb + 2d14088 commit 13f5669

10 files changed

+351
-40
lines changed

app.js

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ const cors = require('cors')
1010
const HttpStatus = require('http-status-codes')
1111
const interceptor = require('express-interceptor')
1212
const logger = require('./src/common/logger')
13+
const eventHandlers = require('./src/eventHandlers')
1314

1415
// setup express app
1516
const app = express()
@@ -91,6 +92,7 @@ app.use((err, req, res, next) => {
9192

9293
const server = app.listen(app.get('port'), () => {
9394
logger.info({ component: 'app', message: `Express server listening on port ${app.get('port')}` })
95+
eventHandlers.init()
9496
})
9597

9698
if (process.env.NODE_ENV === 'test') {

docs/swagger.yaml

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -575,8 +575,8 @@ paths:
575575
required: false
576576
schema:
577577
type: string
578-
enum: ['open', 'selected', 'shortlist', 'rejected']
579-
description: The user id.
578+
enum: ['open', 'selected', 'shortlist', 'rejected', 'cancelled']
579+
description: The job candidate status.
580580
responses:
581581
'200':
582582
description: OK
@@ -1776,8 +1776,8 @@ components:
17761776
description: "The user id."
17771777
status:
17781778
type: string
1779-
enum: ['open', 'selected', 'shortlist', 'rejected']
1780-
description: "The job status."
1779+
enum: ['open', 'selected', 'shortlist', 'rejected', 'cancelled']
1780+
description: "The job candidate status."
17811781
createdAt:
17821782
type: string
17831783
format: date-time
@@ -1812,7 +1812,7 @@ components:
18121812
properties:
18131813
status:
18141814
type: string
1815-
enum: ['open', 'selected', 'shortlist', 'rejected']
1815+
enum: ['open', 'selected', 'shortlist', 'rejected', 'cancelled']
18161816
JobPatchRequestBody:
18171817
properties:
18181818
status:
@@ -2253,8 +2253,8 @@ components:
22532253
description: 'The link for the resume that can be downloaded'
22542254
status:
22552255
type: string
2256-
enum: ['open', 'selected', 'shortlist', 'rejected']
2257-
description: "The job status."
2256+
enum: ['open', 'selected', 'shortlist', 'rejected', 'cancelled']
2257+
description: "The job candidate status."
22582258
skills:
22592259
type: array
22602260
items:

src/bootstrap.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ Joi.perPage = () => Joi.number().integer().min(1).default(20)
88
Joi.rateType = () => Joi.string().valid('hourly', 'daily', 'weekly', 'monthly')
99
Joi.jobStatus = () => Joi.string().valid('sourcing', 'in-review', 'assigned', 'closed', 'cancelled')
1010
Joi.workload = () => Joi.string().valid('full-time', 'fractional')
11-
Joi.jobCandidateStatus = () => Joi.string().valid('open', 'selected', 'shortlist', 'rejected')
11+
Joi.jobCandidateStatus = () => Joi.string().valid('open', 'selected', 'shortlist', 'rejected', 'cancelled')
1212

1313
function buildServices (dir) {
1414
const files = fs.readdirSync(dir)

src/common/eventDispatcher.js

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Implement an event dispatcher that handles events synchronously.
3+
*/
4+
5+
const handlers = []
6+
7+
/**
8+
* Handle event.
9+
*
10+
* @param {String} topic the topic name
11+
* @param {Object} payload the message payload
12+
* @returns {undefined}
13+
*/
14+
async function handleEvent (topic, payload) {
15+
for (const handler of handlers) {
16+
await handler.handleEvent(topic, payload)
17+
}
18+
}
19+
20+
/**
21+
* Register to the dispatcher.
22+
*
23+
* @param {Object} handler the handler containing the `handleEvent` function
24+
* @returns {undefined}
25+
*/
26+
function register (handler) {
27+
handlers.push(handler)
28+
}
29+
30+
module.exports = {
31+
handleEvent,
32+
register
33+
}

src/common/helper.js

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ const elasticsearch = require('@elastic/elasticsearch')
1212
const errors = require('../common/errors')
1313
const logger = require('./logger')
1414
const models = require('../models')
15+
const eventDispatcher = require('./eventDispatcher')
1516
const busApi = require('@topcoder-platform/topcoder-bus-api-wrapper')
1617

1718
const localLogger = {
@@ -281,8 +282,9 @@ async function getUserId (userId) {
281282
* Send Kafka event message
282283
* @params {String} topic the topic name
283284
* @params {Object} payload the payload
285+
* @params {Object} options the extra options to control the function
284286
*/
285-
async function postEvent (topic, payload) {
287+
async function postEvent (topic, payload, options = {}) {
286288
logger.debug({ component: 'helper', context: 'postEvent', message: `Posting event to Kafka topic ${topic}, ${JSON.stringify(payload)}` })
287289
const client = getBusApiClient()
288290
const message = {
@@ -293,6 +295,7 @@ async function postEvent (topic, payload) {
293295
payload
294296
}
295297
await client.postEvent(message)
298+
await eventDispatcher.handleEvent(topic, { value: payload, options })
296299
}
297300

298301
/**
@@ -603,6 +606,15 @@ async function ensureUserById (userId) {
603606
}
604607
}
605608

609+
/**
610+
* Generate M2M auth user.
611+
*
612+
* @returns {Object} the M2M auth user
613+
*/
614+
function getAuditM2Muser () {
615+
return { isMachine: true, userId: config.m2m.M2M_AUDIT_USER_ID, handle: config.m2m.M2M_AUDIT_HANDLE }
616+
}
617+
606618
/**
607619
* Function to check whether a user is a member of a project
608620
* by first retrieving the project detail via /v5/projects/:projectId and
@@ -653,5 +665,6 @@ module.exports = {
653665
getUserSkill,
654666
ensureJobById,
655667
ensureUserById,
668+
getAuditM2Muser,
656669
checkIsMemberOfProject
657670
}

src/eventHandlers/JobEventHandler.js

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/*
2+
* Handle events for Job.
3+
*/
4+
5+
const { Op } = require('sequelize')
6+
const models = require('../models')
7+
const logger = require('../common/logger')
8+
const helper = require('../common/helper')
9+
const JobCandidateService = require('../services/JobCandidateService')
10+
const ResourceBookingService = require('../services/ResourceBookingService')
11+
12+
/**
13+
* Cancel all related resource bookings and all related candidates when a job is cancelled.
14+
*
15+
* @param {Object} payload the event payload
16+
* @returns {undefined}
17+
*/
18+
async function cancelJob (payload) {
19+
if (payload.value.status === payload.options.oldValue.status) {
20+
logger.debug({
21+
component: 'JobEventHandler',
22+
context: 'cancelJob',
23+
message: 'status not changed'
24+
})
25+
return
26+
}
27+
if (payload.value.status !== 'cancelled') {
28+
logger.debug({
29+
component: 'JobEventHandler',
30+
context: 'cancelJob',
31+
message: `not interested job - status: ${payload.value.status}`
32+
})
33+
return
34+
}
35+
// pull data from db instead of directly extract data from the payload
36+
// since the payload may not contain all fields when it is from partically update operation.
37+
const job = await models.Job.findById(payload.value.id)
38+
const candidates = await models.JobCandidate.findAll({
39+
where: {
40+
jobId: job.id,
41+
status: {
42+
[Op.not]: 'cancelled'
43+
},
44+
deletedAt: null
45+
}
46+
})
47+
const resourceBookings = await models.ResourceBooking.findAll({
48+
where: {
49+
jobId: job.id,
50+
status: {
51+
[Op.not]: 'cancelled'
52+
},
53+
deletedAt: null
54+
}
55+
})
56+
await Promise.all([
57+
...candidates.map(candidate => JobCandidateService.partiallyUpdateJobCandidate(
58+
helper.getAuditM2Muser(),
59+
candidate.id,
60+
{ status: 'cancelled' }
61+
).then(result => {
62+
logger.info({
63+
component: 'JobEventHandler',
64+
context: 'cancelJob',
65+
message: `id: ${result.id} candidate got cancelled.`
66+
})
67+
})),
68+
...resourceBookings.map(resource => ResourceBookingService.partiallyUpdateResourceBooking(
69+
helper.getAuditM2Muser(),
70+
resource.id,
71+
{ status: 'cancelled' }
72+
).then(result => {
73+
logger.info({
74+
component: 'JobEventHandler',
75+
context: 'cancelJob',
76+
message: `id: ${result.id} resource booking got cancelled.`
77+
})
78+
}))
79+
])
80+
}
81+
82+
/**
83+
* Process job update event.
84+
*
85+
* @param {Object} payload the event payload
86+
* @returns {undefined}
87+
*/
88+
async function processUpdate (payload) {
89+
await cancelJob(payload)
90+
}
91+
92+
module.exports = {
93+
processUpdate
94+
}
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
/*
2+
* Handle events for ResourceBooking.
3+
*/
4+
5+
const { Op } = require('sequelize')
6+
const models = require('../models')
7+
const logger = require('../common/logger')
8+
const helper = require('../common/helper')
9+
const JobService = require('../services/JobService')
10+
const JobCandidateService = require('../services/JobCandidateService')
11+
12+
/**
13+
* When ResourceBooking's status is changed to `assigned`
14+
* the corresponding JobCandidate record (with the same userId and jobId)
15+
* should be updated with status `selected`
16+
*
17+
* @param {Object} payload the event payload
18+
* @returns {undefined}
19+
*/
20+
async function selectJobCandidate (payload) {
21+
if (payload.value.status === payload.options.oldValue.status) {
22+
logger.debug({
23+
component: 'ResourceBookingEventHandler',
24+
context: 'selectJobCandidate',
25+
message: 'status not changed'
26+
})
27+
return
28+
}
29+
if (payload.value.status !== 'assigned') {
30+
logger.debug({
31+
component: 'ResourceBookingEventHandler',
32+
context: 'selectJobCandidate',
33+
message: `not interested resource booking - status: ${payload.value.status}`
34+
})
35+
return
36+
}
37+
const resourceBooking = await models.ResourceBooking.findById(payload.value.id)
38+
if (!resourceBooking.jobId) {
39+
logger.debug({
40+
component: 'ResourceBookingEventHandler',
41+
context: 'selectJobCandidate',
42+
message: `id: ${resourceBooking.id} resource booking without jobId - ignored`
43+
})
44+
return
45+
}
46+
const candidates = await models.JobCandidate.findAll({
47+
where: {
48+
jobId: resourceBooking.jobId,
49+
userId: resourceBooking.userId,
50+
status: {
51+
[Op.not]: 'selected'
52+
},
53+
deletedAt: null
54+
}
55+
})
56+
await Promise.all(candidates.map(candidate => JobCandidateService.partiallyUpdateJobCandidate(
57+
helper.getAuditM2Muser(),
58+
candidate.id,
59+
{ status: 'selected' }
60+
).then(result => {
61+
logger.info({
62+
component: 'ResourceBookingEventHandler',
63+
context: 'selectJobCandidate',
64+
message: `id: ${result.id} candidate got selected.`
65+
})
66+
})))
67+
}
68+
69+
/**
70+
* Update the status of the Job to assigned when it positions requirement is fullfilled.
71+
*
72+
* @param {Object} payload the event payload
73+
* @returns {undefined}
74+
*/
75+
async function assignJob (payload) {
76+
if (payload.value.status === payload.options.oldValue.status) {
77+
logger.debug({
78+
component: 'ResourceBookingEventHandler',
79+
context: 'assignJob',
80+
message: 'status not changed'
81+
})
82+
return
83+
}
84+
if (payload.value.status !== 'assigned') {
85+
logger.debug({
86+
component: 'ResourceBookingEventHandler',
87+
context: 'assignJob',
88+
message: `not interested resource booking - status: ${payload.value.status}`
89+
})
90+
return
91+
}
92+
const resourceBooking = await models.ResourceBooking.findById(payload.value.id)
93+
if (!resourceBooking.jobId) {
94+
logger.debug({
95+
component: 'ResourceBookingEventHandler',
96+
context: 'assignJob',
97+
message: `id: ${resourceBooking.id} resource booking without jobId - ignored`
98+
})
99+
return
100+
}
101+
const job = await models.Job.findById(resourceBooking.jobId)
102+
if (job.status === 'assigned') {
103+
logger.debug({
104+
component: 'ResourceBookingEventHandler',
105+
context: 'assignJob',
106+
message: `job with projectId ${job.projectId} is already assigned`
107+
})
108+
return
109+
}
110+
const resourceBookings = await models.ResourceBooking.findAll({
111+
where: {
112+
jobId: job.id,
113+
status: 'assigned',
114+
deletedAt: null
115+
}
116+
})
117+
logger.debug({
118+
component: 'ResourceBookingEventHandler',
119+
context: 'assignJob',
120+
message: `the number of assigned resource bookings is ${resourceBookings.length} - the numPositions of the job is ${job.numPositions}`
121+
})
122+
if (job.numPositions === resourceBookings.length) {
123+
await JobService.partiallyUpdateJob(helper.getAuditM2Muser(), job.id, { status: 'assigned' })
124+
logger.info({ component: 'ResourceBookingEventHandler', context: 'assignJob', message: `job ${job.id} is assigned` })
125+
}
126+
}
127+
128+
/**
129+
* Process resource booking update event.
130+
*
131+
* @param {Object} payload the event payload
132+
* @returns {undefined}
133+
*/
134+
async function processUpdate (payload) {
135+
await selectJobCandidate(payload)
136+
await assignJob(payload)
137+
}
138+
139+
module.exports = {
140+
processUpdate
141+
}

0 commit comments

Comments
 (0)