Skip to content

implement local event handling #79

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions app-routes.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,7 @@ module.exports = (app) => {
if (!req.authUser.scopes || !helper.checkIfExists(def.scopes, req.authUser.scopes)) {
next(new errors.ForbiddenError('You are not allowed to perform this action!'))
} else {
req.authUser.userId = config.m2m.M2M_AUDIT_USER_ID
req.authUser.handle = config.m2m.M2M_AUDIT_HANDLE
req.authUser = helper.authUserAsM2M()
next()
}
} else {
Expand Down
2 changes: 2 additions & 0 deletions app.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const cors = require('cors')
const HttpStatus = require('http-status-codes')
const interceptor = require('express-interceptor')
const logger = require('./src/common/logger')
const eventHandlers = require('./src/eventHandlers')

// setup express app
const app = express()
Expand Down Expand Up @@ -91,6 +92,7 @@ app.use((err, req, res, next) => {

const server = app.listen(app.get('port'), () => {
logger.info({ component: 'app', message: `Express server listening on port ${app.get('port')}` })
eventHandlers.init()
})

if (process.env.NODE_ENV === 'test') {
Expand Down
14 changes: 7 additions & 7 deletions docs/swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -575,8 +575,8 @@ paths:
required: false
schema:
type: string
enum: ['open', 'selected', 'shortlist', 'rejected']
description: The user id.
enum: ['open', 'selected', 'shortlist', 'rejected', 'cancelled']
description: The job candidate status.
responses:
'200':
description: OK
Expand Down Expand Up @@ -1686,8 +1686,8 @@ components:
description: "The user id."
status:
type: string
enum: ['open', 'selected', 'shortlist', 'rejected']
description: "The job status."
enum: ['open', 'selected', 'shortlist', 'rejected', 'cancelled']
description: "The job candidate status."
createdAt:
type: string
format: date-time
Expand Down Expand Up @@ -1722,7 +1722,7 @@ components:
properties:
status:
type: string
enum: ['open', 'selected', 'shortlist', 'rejected']
enum: ['open', 'selected', 'shortlist', 'rejected', 'cancelled']
JobPatchRequestBody:
properties:
status:
Expand Down Expand Up @@ -2130,8 +2130,8 @@ components:
description: 'The link for the resume that can be downloaded'
status:
type: string
enum: ['open', 'selected', 'shortlist', 'rejected']
description: "The job status."
enum: ['open', 'selected', 'shortlist', 'rejected', 'cancelled']
description: "The job candidate status."
skills:
type: array
items:
Expand Down
33 changes: 33 additions & 0 deletions src/common/eventDispatcher.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Implement an event dispatcher that handles events synchronously.
*/

const handlers = []

/**
* Handle event.
*
* @param {String} topic the topic name
* @param {Object} payload the message payload
* @returns {undefined}
*/
async function handleEvent (topic, payload) {
for (const handler of handlers) {
await handler.handleEvent(topic, payload)
}
}

/**
* Register to the dispatcher.
*
* @param {Object} handler the handler containing the `handleEvent` function
* @returns {undefined}
*/
function register (handler) {
handlers.push(handler)
}

module.exports = {
handleEvent,
register
}
14 changes: 13 additions & 1 deletion src/common/helper.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ const elasticsearch = require('@elastic/elasticsearch')
const errors = require('../common/errors')
const logger = require('./logger')
const models = require('../models')
const eventDispatcher = require('./eventDispatcher')
const busApi = require('@topcoder-platform/topcoder-bus-api-wrapper')

const localLogger = {
Expand Down Expand Up @@ -313,6 +314,7 @@ async function postEvent (topic, payload) {
payload
}
await client.postEvent(message)
await eventDispatcher.handleEvent(topic, payload)
}

/**
Expand Down Expand Up @@ -589,6 +591,15 @@ async function ensureUserById (userId) {
}
}

/**
* Generate M2M auth user.
*
* @returns {Object} the M2M auth user
*/
function authUserAsM2M () {
return { isMachine: true, userId: config.m2m.M2M_AUDIT_USER_ID, handle: config.m2m.M2M_AUDIT_HANDLE }
}

module.exports = {
checkIfExists,
autoWrapExpress,
Expand All @@ -615,5 +626,6 @@ module.exports = {
getSkillById,
getUserSkill,
ensureJobById,
ensureUserById
ensureUserById,
authUserAsM2M
}
83 changes: 83 additions & 0 deletions src/eventHandlers/JobEventHandler.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Handle events for Job.
*/

const { Op } = require('sequelize')
const models = require('../models')
const logger = require('../common/logger')
const helper = require('../common/helper')
const JobCandidateService = require('../services/JobCandidateService')
const ResourceBookingService = require('../services/ResourceBookingService')

/**
* Cancel all related resource bookings and all related candidates when a job is cancelled.
*
* @param {Object} payload the event payload
* @returns {undefined}
*/
async function cancelJob (payload) {
if (payload.status !== 'cancelled') {
logger.info({
component: 'JobEventHandler',
context: 'cancelJob',
message: `not interested job - status: ${payload.status}`
})
return
}
const candidates = await models.JobCandidate.findAll({
where: {
jobId: payload.id,
status: {
[Op.not]: 'cancelled'
},
deletedAt: null
}
})
const resourceBookings = await models.ResourceBooking.findAll({
where: {
projectId: payload.projectId,
status: {
[Op.not]: 'cancelled'
},
deletedAt: null
}
})
await Promise.all([
...candidates.map(candidate => JobCandidateService.partiallyUpdateJobCandidate(
helper.authUserAsM2M(),
candidate.id,
{ status: 'cancelled' }
).then(result => {
logger.info({
component: 'JobEventHandler',
context: 'cancelJob',
message: `id: ${result.id} candidate got cancelled.`
})
})),
...resourceBookings.map(resource => ResourceBookingService.partiallyUpdateResourceBooking(
helper.authUserAsM2M(),
resource.id,
{ status: 'cancelled' }
).then(result => {
logger.info({
component: 'JobEventHandler',
context: 'cancelJob',
message: `id: ${result.id} resource booking got cancelled.`
})
}))
])
}

/**
* Process job update event.
*
* @param {Object} payload the event payload
* @returns {undefined}
*/
async function processUpdate (payload) {
await cancelJob(payload)
}

module.exports = {
processUpdate
}
111 changes: 111 additions & 0 deletions src/eventHandlers/ResourceBookingEventHandler.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Handle events for ResourceBooking.
*/

const { Op } = require('sequelize')
const models = require('../models')
const logger = require('../common/logger')
const helper = require('../common/helper')
const JobService = require('../services/JobService')
const JobCandidateService = require('../services/JobCandidateService')

/**
* When ResourceBooking's status is changed to `assigned`
* the corresponding JobCandidate record (with the same userId and jobId)
* should be updated with status `selected`
*
* @param {Object} payload the event payload
* @returns {undefined}
*/
async function selectJobCandidate (payload) {
if (payload.status !== 'assigned') {
logger.info({
component: 'ResourceBookingEventHandler',
context: 'selectJobCandidate',
message: `not interested resource booking - status: ${payload.status}`
})
return
}
const candidates = await models.JobCandidate.findAll({
where: {
jobId: payload.jobId,
userId: payload.userId,
status: {
[Op.not]: 'selected'
},
deletedAt: null
}
})
await Promise.all(candidates.map(candidate => JobCandidateService.partiallyUpdateJobCandidate(
helper.authUserAsM2M(),
candidate.id,
{ status: 'selected' }
).then(result => {
logger.info({
component: 'ResourceBookingEventHandler',
context: 'selectJobCandidate',
message: `id: ${result.id} candidate got selected.`
})
})))
}

/**
* Update the status of the Job to assigned when it positions requirement is fullfilled.
*
* @param {Object} payload the event payload
* @returns {undefined}
*/
async function assignJob (payload) {
if (payload.status !== 'assigned') {
logger.info({
component: 'ResourceBookingEventHandler',
context: 'assignJob',
message: `not interested resource booking - status: ${payload.status}`
})
return
}
const job = await models.Job.findOne({
where: {
projectId: payload.projectId,
deletedAt: null
}
})
if (job.status === 'assigned') {
logger.info({
component: 'ResourceBookingEventHandler',
context: 'assignJob',
message: `job with projectId ${payload.projectId} is already assigned`
})
return
}
const resourceBookings = await models.ResourceBooking.findAll({
where: {
status: 'assigned',
deletedAt: null
}
})
logger.debug({
component: 'ResourceBookingEventHandler',
context: 'assignJob',
message: `the number of assigned resource bookings is ${resourceBookings.length} - the numPositions of the job is ${job.numPositions}`
})
if (job.numPositions === resourceBookings.length) {
await JobService.partiallyUpdateJob(helper.authUserAsM2M(), job.id, { status: 'assigned' })
logger.info({ component: 'ResourceBookingEventHandler', context: 'assignJob', message: `job with projectId ${payload.projectId} is assigned` })
}
}

/**
* Process resource booking update event.
*
* @param {Object} payload the event payload
* @returns {undefined}
*/
async function processUpdate (payload) {
await selectJobCandidate(payload)
await assignJob(payload)
}

module.exports = {
processUpdate
}
52 changes: 52 additions & 0 deletions src/eventHandlers/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* The entry of event handlers.
*/

const config = require('config')
const eventDispatcher = require('../common/eventDispatcher')
const JobEventHandler = require('./JobEventHandler')
const ResourceBookingEventHandler = require('./ResourceBookingEventHandler')
const logger = require('../common/logger')

const TopicOperationMapping = {
[config.TAAS_JOB_UPDATE_TOPIC]: JobEventHandler.processUpdate,
[config.TAAS_RESOURCE_BOOKING_UPDATE_TOPIC]: ResourceBookingEventHandler.processUpdate
}

/**
* Handle event.
*
* @param {String} topic the topic name
* @param {Object} payload the message payload
* @returns {undefined}
*/
async function handleEvent (topic, payload) {
if (!TopicOperationMapping[topic]) {
logger.info({ component: 'eventHanders', context: 'handleEvent', message: `not interested event - topic: ${topic}` })
return
}
logger.debug({ component: 'eventHanders', context: 'handleEvent', message: `handling event - topic: ${topic} - payload: ${JSON.stringify(payload)}` })
try {
await TopicOperationMapping[topic](payload)
} catch (err) {
logger.error({ component: 'eventHanders', context: 'handleEvent', message: 'failed to handle event' })
// throw error so that it can be handled by the app
throw err
}
logger.info({ component: 'eventHanders', context: 'handleEvent', message: 'event successfully handled' })
}

/**
* Attach the handlers to the event dispatcher.
*
* @returns {undefined}
*/
function init () {
eventDispatcher.register({
handleEvent
})
}

module.exports = {
init
}
Loading