diff --git a/app.js b/app.js index f60a0565..56521d9a 100644 --- a/app.js +++ b/app.js @@ -10,6 +10,7 @@ const cors = require('cors') const HttpStatus = require('http-status-codes') const interceptor = require('express-interceptor') const logger = require('./src/common/logger') +const eventHandlers = require('./src/eventHandlers') // setup express app const app = express() @@ -91,6 +92,7 @@ app.use((err, req, res, next) => { const server = app.listen(app.get('port'), () => { logger.info({ component: 'app', message: `Express server listening on port ${app.get('port')}` }) + eventHandlers.init() }) if (process.env.NODE_ENV === 'test') { diff --git a/docs/swagger.yaml b/docs/swagger.yaml index 086a3ed8..42ca3ee6 100644 --- a/docs/swagger.yaml +++ b/docs/swagger.yaml @@ -575,8 +575,8 @@ paths: required: false schema: type: string - enum: ['open', 'selected', 'shortlist', 'rejected'] - description: The user id. + enum: ['open', 'selected', 'shortlist', 'rejected', 'cancelled'] + description: The job candidate status. responses: '200': description: OK @@ -1776,8 +1776,8 @@ components: description: "The user id." status: type: string - enum: ['open', 'selected', 'shortlist', 'rejected'] - description: "The job status." + enum: ['open', 'selected', 'shortlist', 'rejected', 'cancelled'] + description: "The job candidate status." createdAt: type: string format: date-time @@ -1812,7 +1812,7 @@ components: properties: status: type: string - enum: ['open', 'selected', 'shortlist', 'rejected'] + enum: ['open', 'selected', 'shortlist', 'rejected', 'cancelled'] JobPatchRequestBody: properties: status: @@ -2253,8 +2253,8 @@ components: description: 'The link for the resume that can be downloaded' status: type: string - enum: ['open', 'selected', 'shortlist', 'rejected'] - description: "The job status." + enum: ['open', 'selected', 'shortlist', 'rejected', 'cancelled'] + description: "The job candidate status." skills: type: array items: diff --git a/src/bootstrap.js b/src/bootstrap.js index a35212e0..c051b045 100644 --- a/src/bootstrap.js +++ b/src/bootstrap.js @@ -8,7 +8,7 @@ Joi.perPage = () => Joi.number().integer().min(1).default(20) Joi.rateType = () => Joi.string().valid('hourly', 'daily', 'weekly', 'monthly') Joi.jobStatus = () => Joi.string().valid('sourcing', 'in-review', 'assigned', 'closed', 'cancelled') Joi.workload = () => Joi.string().valid('full-time', 'fractional') -Joi.jobCandidateStatus = () => Joi.string().valid('open', 'selected', 'shortlist', 'rejected') +Joi.jobCandidateStatus = () => Joi.string().valid('open', 'selected', 'shortlist', 'rejected', 'cancelled') function buildServices (dir) { const files = fs.readdirSync(dir) diff --git a/src/common/eventDispatcher.js b/src/common/eventDispatcher.js new file mode 100644 index 00000000..495b8788 --- /dev/null +++ b/src/common/eventDispatcher.js @@ -0,0 +1,33 @@ +/* + * Implement an event dispatcher that handles events synchronously. + */ + +const handlers = [] + +/** + * Handle event. + * + * @param {String} topic the topic name + * @param {Object} payload the message payload + * @returns {undefined} + */ +async function handleEvent (topic, payload) { + for (const handler of handlers) { + await handler.handleEvent(topic, payload) + } +} + +/** + * Register to the dispatcher. + * + * @param {Object} handler the handler containing the `handleEvent` function + * @returns {undefined} + */ +function register (handler) { + handlers.push(handler) +} + +module.exports = { + handleEvent, + register +} diff --git a/src/common/helper.js b/src/common/helper.js index 07bd2e37..21f67532 100644 --- a/src/common/helper.js +++ b/src/common/helper.js @@ -12,6 +12,7 @@ const elasticsearch = require('@elastic/elasticsearch') const errors = require('../common/errors') const logger = require('./logger') const models = require('../models') +const eventDispatcher = require('./eventDispatcher') const busApi = require('@topcoder-platform/topcoder-bus-api-wrapper') const localLogger = { @@ -281,8 +282,9 @@ async function getUserId (userId) { * Send Kafka event message * @params {String} topic the topic name * @params {Object} payload the payload + * @params {Object} options the extra options to control the function */ -async function postEvent (topic, payload) { +async function postEvent (topic, payload, options = {}) { logger.debug({ component: 'helper', context: 'postEvent', message: `Posting event to Kafka topic ${topic}, ${JSON.stringify(payload)}` }) const client = getBusApiClient() const message = { @@ -293,6 +295,7 @@ async function postEvent (topic, payload) { payload } await client.postEvent(message) + await eventDispatcher.handleEvent(topic, { value: payload, options }) } /** @@ -603,6 +606,15 @@ async function ensureUserById (userId) { } } +/** + * Generate M2M auth user. + * + * @returns {Object} the M2M auth user + */ +function getAuditM2Muser () { + return { isMachine: true, userId: config.m2m.M2M_AUDIT_USER_ID, handle: config.m2m.M2M_AUDIT_HANDLE } +} + /** * Function to check whether a user is a member of a project * by first retrieving the project detail via /v5/projects/:projectId and @@ -653,5 +665,6 @@ module.exports = { getUserSkill, ensureJobById, ensureUserById, + getAuditM2Muser, checkIsMemberOfProject } diff --git a/src/eventHandlers/JobEventHandler.js b/src/eventHandlers/JobEventHandler.js new file mode 100644 index 00000000..28c5f39d --- /dev/null +++ b/src/eventHandlers/JobEventHandler.js @@ -0,0 +1,94 @@ +/* + * Handle events for Job. + */ + +const { Op } = require('sequelize') +const models = require('../models') +const logger = require('../common/logger') +const helper = require('../common/helper') +const JobCandidateService = require('../services/JobCandidateService') +const ResourceBookingService = require('../services/ResourceBookingService') + +/** + * Cancel all related resource bookings and all related candidates when a job is cancelled. + * + * @param {Object} payload the event payload + * @returns {undefined} + */ +async function cancelJob (payload) { + if (payload.value.status === payload.options.oldValue.status) { + logger.debug({ + component: 'JobEventHandler', + context: 'cancelJob', + message: 'status not changed' + }) + return + } + if (payload.value.status !== 'cancelled') { + logger.debug({ + component: 'JobEventHandler', + context: 'cancelJob', + message: `not interested job - status: ${payload.value.status}` + }) + return + } + // pull data from db instead of directly extract data from the payload + // since the payload may not contain all fields when it is from partically update operation. + const job = await models.Job.findById(payload.value.id) + const candidates = await models.JobCandidate.findAll({ + where: { + jobId: job.id, + status: { + [Op.not]: 'cancelled' + }, + deletedAt: null + } + }) + const resourceBookings = await models.ResourceBooking.findAll({ + where: { + jobId: job.id, + status: { + [Op.not]: 'cancelled' + }, + deletedAt: null + } + }) + await Promise.all([ + ...candidates.map(candidate => JobCandidateService.partiallyUpdateJobCandidate( + helper.getAuditM2Muser(), + candidate.id, + { status: 'cancelled' } + ).then(result => { + logger.info({ + component: 'JobEventHandler', + context: 'cancelJob', + message: `id: ${result.id} candidate got cancelled.` + }) + })), + ...resourceBookings.map(resource => ResourceBookingService.partiallyUpdateResourceBooking( + helper.getAuditM2Muser(), + resource.id, + { status: 'cancelled' } + ).then(result => { + logger.info({ + component: 'JobEventHandler', + context: 'cancelJob', + message: `id: ${result.id} resource booking got cancelled.` + }) + })) + ]) +} + +/** + * Process job update event. + * + * @param {Object} payload the event payload + * @returns {undefined} + */ +async function processUpdate (payload) { + await cancelJob(payload) +} + +module.exports = { + processUpdate +} diff --git a/src/eventHandlers/ResourceBookingEventHandler.js b/src/eventHandlers/ResourceBookingEventHandler.js new file mode 100644 index 00000000..78cf211e --- /dev/null +++ b/src/eventHandlers/ResourceBookingEventHandler.js @@ -0,0 +1,141 @@ +/* + * Handle events for ResourceBooking. + */ + +const { Op } = require('sequelize') +const models = require('../models') +const logger = require('../common/logger') +const helper = require('../common/helper') +const JobService = require('../services/JobService') +const JobCandidateService = require('../services/JobCandidateService') + +/** + * When ResourceBooking's status is changed to `assigned` + * the corresponding JobCandidate record (with the same userId and jobId) + * should be updated with status `selected` + * + * @param {Object} payload the event payload + * @returns {undefined} + */ +async function selectJobCandidate (payload) { + if (payload.value.status === payload.options.oldValue.status) { + logger.debug({ + component: 'ResourceBookingEventHandler', + context: 'selectJobCandidate', + message: 'status not changed' + }) + return + } + if (payload.value.status !== 'assigned') { + logger.debug({ + component: 'ResourceBookingEventHandler', + context: 'selectJobCandidate', + message: `not interested resource booking - status: ${payload.value.status}` + }) + return + } + const resourceBooking = await models.ResourceBooking.findById(payload.value.id) + if (!resourceBooking.jobId) { + logger.debug({ + component: 'ResourceBookingEventHandler', + context: 'selectJobCandidate', + message: `id: ${resourceBooking.id} resource booking without jobId - ignored` + }) + return + } + const candidates = await models.JobCandidate.findAll({ + where: { + jobId: resourceBooking.jobId, + userId: resourceBooking.userId, + status: { + [Op.not]: 'selected' + }, + deletedAt: null + } + }) + await Promise.all(candidates.map(candidate => JobCandidateService.partiallyUpdateJobCandidate( + helper.getAuditM2Muser(), + candidate.id, + { status: 'selected' } + ).then(result => { + logger.info({ + component: 'ResourceBookingEventHandler', + context: 'selectJobCandidate', + message: `id: ${result.id} candidate got selected.` + }) + }))) +} + +/** + * Update the status of the Job to assigned when it positions requirement is fullfilled. + * + * @param {Object} payload the event payload + * @returns {undefined} + */ +async function assignJob (payload) { + if (payload.value.status === payload.options.oldValue.status) { + logger.debug({ + component: 'ResourceBookingEventHandler', + context: 'assignJob', + message: 'status not changed' + }) + return + } + if (payload.value.status !== 'assigned') { + logger.debug({ + component: 'ResourceBookingEventHandler', + context: 'assignJob', + message: `not interested resource booking - status: ${payload.value.status}` + }) + return + } + const resourceBooking = await models.ResourceBooking.findById(payload.value.id) + if (!resourceBooking.jobId) { + logger.debug({ + component: 'ResourceBookingEventHandler', + context: 'assignJob', + message: `id: ${resourceBooking.id} resource booking without jobId - ignored` + }) + return + } + const job = await models.Job.findById(resourceBooking.jobId) + if (job.status === 'assigned') { + logger.debug({ + component: 'ResourceBookingEventHandler', + context: 'assignJob', + message: `job with projectId ${job.projectId} is already assigned` + }) + return + } + const resourceBookings = await models.ResourceBooking.findAll({ + where: { + jobId: job.id, + status: 'assigned', + deletedAt: null + } + }) + logger.debug({ + component: 'ResourceBookingEventHandler', + context: 'assignJob', + message: `the number of assigned resource bookings is ${resourceBookings.length} - the numPositions of the job is ${job.numPositions}` + }) + if (job.numPositions === resourceBookings.length) { + await JobService.partiallyUpdateJob(helper.getAuditM2Muser(), job.id, { status: 'assigned' }) + logger.info({ component: 'ResourceBookingEventHandler', context: 'assignJob', message: `job ${job.id} is assigned` }) + } +} + +/** + * Process resource booking update event. + * + * @param {Object} payload the event payload + * @returns {undefined} + */ +async function processUpdate (payload) { + await selectJobCandidate(payload) + await assignJob(payload) +} + +module.exports = { + processUpdate +} diff --git a/src/eventHandlers/index.js b/src/eventHandlers/index.js new file mode 100644 index 00000000..40e44864 --- /dev/null +++ b/src/eventHandlers/index.js @@ -0,0 +1,53 @@ +/* + * The entry of event handlers. + */ + +const config = require('config') +const eventDispatcher = require('../common/eventDispatcher') +const JobEventHandler = require('./JobEventHandler') +const ResourceBookingEventHandler = require('./ResourceBookingEventHandler') +const logger = require('../common/logger') + +const TopicOperationMapping = { + [config.TAAS_JOB_UPDATE_TOPIC]: JobEventHandler.processUpdate, + [config.TAAS_RESOURCE_BOOKING_UPDATE_TOPIC]: ResourceBookingEventHandler.processUpdate +} + +/** + * Handle event. + * + * @param {String} topic the topic name + * @param {Object} payload the message payload + * @returns {undefined} + */ +async function handleEvent (topic, payload) { + if (!TopicOperationMapping[topic]) { + logger.debug({ component: 'eventHanders', context: 'handleEvent', message: `not interested event - topic: ${topic}` }) + return + } + logger.info({ component: 'eventHanders', context: 'handleEvent', message: `event handling - topic: ${topic}` }) + logger.debug({ component: 'eventHanders', context: 'handleEvent', message: `handling event - topic: ${topic} - payload: ${JSON.stringify(payload)}` }) + try { + await TopicOperationMapping[topic](payload) + } catch (err) { + logger.error({ component: 'eventHanders', context: 'handleEvent', message: 'failed to handle event' }) + // throw error so that it can be handled by the app + throw err + } + logger.info({ component: 'eventHanders', context: 'handleEvent', message: `event successfully handled - topic: ${topic}` }) +} + +/** + * Attach the handlers to the event dispatcher. + * + * @returns {undefined} + */ +function init () { + eventDispatcher.register({ + handleEvent + }) +} + +module.exports = { + init +} diff --git a/src/services/JobService.js b/src/services/JobService.js index fd1e22f4..bc2c27d7 100644 --- a/src/services/JobService.js +++ b/src/services/JobService.js @@ -187,6 +187,7 @@ async function updateJob (currentUser, id, data) { await _validateSkills(data.skills) } let job = await Job.findById(id) + const oldValue = job.toJSON() const ubhanUserId = await helper.getUserId(currentUser.userId) if (!currentUser.hasManagePermission && !currentUser.isMachine) { // Check whether user can update the job. @@ -201,7 +202,7 @@ async function updateJob (currentUser, id, data) { data.updatedBy = ubhanUserId await job.update(data) - await helper.postEvent(config.TAAS_JOB_UPDATE_TOPIC, { id, ...data }) + await helper.postEvent(config.TAAS_JOB_UPDATE_TOPIC, { id, ...data }, { oldValue: oldValue }) job = await Job.findById(id, true) job.dataValues.candidates = _.map(job.dataValues.candidates, (c) => helper.clearObject(c.dataValues)) return helper.clearObject(job.dataValues) diff --git a/src/services/ResourceBookingService.js b/src/services/ResourceBookingService.js index 21a9bfd4..19d9d0f7 100644 --- a/src/services/ResourceBookingService.js +++ b/src/services/ResourceBookingService.js @@ -12,7 +12,6 @@ const helper = require('../common/helper') const logger = require('../common/logger') const errors = require('../common/errors') const models = require('../models') -const JobCandidateService = require('./JobCandidateService') const ResourceBooking = models.ResourceBooking const esClient = helper.getESClient() @@ -141,38 +140,13 @@ async function updateResourceBooking (currentUser, id, data) { } const resourceBooking = await ResourceBooking.findById(id) - const isDiffStatus = resourceBooking.status !== data.status + const oldValue = resourceBooking.toJSON() + data.updatedAt = new Date() data.updatedBy = await helper.getUserId(currentUser.userId) - const updatedResourceBooking = await resourceBooking.update(data) - await helper.postEvent(config.TAAS_RESOURCE_BOOKING_UPDATE_TOPIC, { id, ...data }) - // When we are updating the status of ResourceBooking to `assigned` - // the corresponding JobCandidate record (with the same userId and jobId) - // should be updated with the status `selected` - if (isDiffStatus && data.status === 'assigned') { - const candidates = await models.JobCandidate.findAll({ - where: { - jobId: updatedResourceBooking.jobId, - userId: updatedResourceBooking.userId, - status: { - [Op.not]: 'selected' - }, - deletedAt: null - } - }) - await Promise.all(candidates.map(candidate => JobCandidateService.partiallyUpdateJobCandidate( - currentUser, - candidate.id, - { status: 'selected' } - ).then(result => { - logger.debug({ - component: 'ResourceBookingService', - context: 'updatedResourceBooking', - message: `id: ${result.id} candidate got selected.` - }) - }))) - } + await resourceBooking.update(data) + await helper.postEvent(config.TAAS_RESOURCE_BOOKING_UPDATE_TOPIC, { id, ...data }, { oldValue: oldValue }) const result = helper.clearObject(_.assign(resourceBooking.dataValues, data)) return result }