From c208671c64c7f86de89e35f1d02243f4bf778398 Mon Sep 17 00:00:00 2001 From: imcaizheng Date: Tue, 29 Dec 2020 01:46:17 +0800 Subject: [PATCH 1/6] implement event handling --- app.js | 2 + docs/swagger.yaml | 14 +-- src/bootstrap.js | 2 +- src/common/eventDispatcher.js | 33 ++++++ src/common/helper.js | 14 ++- src/eventHandlers/JobEventHandler.js | 86 ++++++++++++++ .../ResourceBookingEventHandler.js | 105 ++++++++++++++++++ src/eventHandlers/index.js | 52 +++++++++ src/services/ResourceBookingService.js | 33 +----- 9 files changed, 303 insertions(+), 38 deletions(-) create mode 100644 src/common/eventDispatcher.js create mode 100644 src/eventHandlers/JobEventHandler.js create mode 100644 src/eventHandlers/ResourceBookingEventHandler.js create mode 100644 src/eventHandlers/index.js diff --git a/app.js b/app.js index f60a0565..56521d9a 100644 --- a/app.js +++ b/app.js @@ -10,6 +10,7 @@ const cors = require('cors') const HttpStatus = require('http-status-codes') const interceptor = require('express-interceptor') const logger = require('./src/common/logger') +const eventHandlers = require('./src/eventHandlers') // setup express app const app = express() @@ -91,6 +92,7 @@ app.use((err, req, res, next) => { const server = app.listen(app.get('port'), () => { logger.info({ component: 'app', message: `Express server listening on port ${app.get('port')}` }) + eventHandlers.init() }) if (process.env.NODE_ENV === 'test') { diff --git a/docs/swagger.yaml b/docs/swagger.yaml index e5d52f0a..ea2b5b88 100644 --- a/docs/swagger.yaml +++ b/docs/swagger.yaml @@ -575,8 +575,8 @@ paths: required: false schema: type: string - enum: ['open', 'selected', 'shortlist', 'rejected'] - description: The user id. + enum: ['open', 'selected', 'shortlist', 'rejected', 'cancelled'] + description: The job candidate status. responses: '200': description: OK @@ -1686,8 +1686,8 @@ components: description: "The user id." status: type: string - enum: ['open', 'selected', 'shortlist', 'rejected'] - description: "The job status." + enum: ['open', 'selected', 'shortlist', 'rejected', 'cancelled'] + description: "The job candidate status." createdAt: type: string format: date-time @@ -1722,7 +1722,7 @@ components: properties: status: type: string - enum: ['open', 'selected', 'shortlist', 'rejected'] + enum: ['open', 'selected', 'shortlist', 'rejected', 'cancelled'] JobPatchRequestBody: properties: status: @@ -2130,8 +2130,8 @@ components: description: 'The link for the resume that can be downloaded' status: type: string - enum: ['open', 'selected', 'shortlist', 'rejected'] - description: "The job status." + enum: ['open', 'selected', 'shortlist', 'rejected', 'cancelled'] + description: "The job candidate status." skills: type: array items: diff --git a/src/bootstrap.js b/src/bootstrap.js index a35212e0..c051b045 100644 --- a/src/bootstrap.js +++ b/src/bootstrap.js @@ -8,7 +8,7 @@ Joi.perPage = () => Joi.number().integer().min(1).default(20) Joi.rateType = () => Joi.string().valid('hourly', 'daily', 'weekly', 'monthly') Joi.jobStatus = () => Joi.string().valid('sourcing', 'in-review', 'assigned', 'closed', 'cancelled') Joi.workload = () => Joi.string().valid('full-time', 'fractional') -Joi.jobCandidateStatus = () => Joi.string().valid('open', 'selected', 'shortlist', 'rejected') +Joi.jobCandidateStatus = () => Joi.string().valid('open', 'selected', 'shortlist', 'rejected', 'cancelled') function buildServices (dir) { const files = fs.readdirSync(dir) diff --git a/src/common/eventDispatcher.js b/src/common/eventDispatcher.js new file mode 100644 index 00000000..495b8788 --- /dev/null +++ b/src/common/eventDispatcher.js @@ -0,0 +1,33 @@ +/* + * Implement an event dispatcher that handles events synchronously. + */ + +const handlers = [] + +/** + * Handle event. + * + * @param {String} topic the topic name + * @param {Object} payload the message payload + * @returns {undefined} + */ +async function handleEvent (topic, payload) { + for (const handler of handlers) { + await handler.handleEvent(topic, payload) + } +} + +/** + * Register to the dispatcher. + * + * @param {Object} handler the handler containing the `handleEvent` function + * @returns {undefined} + */ +function register (handler) { + handlers.push(handler) +} + +module.exports = { + handleEvent, + register +} diff --git a/src/common/helper.js b/src/common/helper.js index 715d9981..1eb6776b 100644 --- a/src/common/helper.js +++ b/src/common/helper.js @@ -12,6 +12,7 @@ const elasticsearch = require('@elastic/elasticsearch') const errors = require('../common/errors') const logger = require('./logger') const models = require('../models') +const eventDispatcher = require('./eventDispatcher') const busApi = require('@topcoder-platform/topcoder-bus-api-wrapper') const localLogger = { @@ -313,6 +314,7 @@ async function postEvent (topic, payload) { payload } await client.postEvent(message) + await eventDispatcher.handleEvent(topic, payload) } /** @@ -589,6 +591,15 @@ async function ensureUserById (userId) { } } +/** + * Generate M2M auth user. + * + * @returns {Object} the M2M auth user + */ +function getAuditM2Muser () { + return { isMachine: true, userId: config.m2m.M2M_AUDIT_USER_ID, handle: config.m2m.M2M_AUDIT_HANDLE } +} + module.exports = { checkIfExists, autoWrapExpress, @@ -615,5 +626,6 @@ module.exports = { getSkillById, getUserSkill, ensureJobById, - ensureUserById + ensureUserById, + getAuditM2Muser } diff --git a/src/eventHandlers/JobEventHandler.js b/src/eventHandlers/JobEventHandler.js new file mode 100644 index 00000000..715f58a5 --- /dev/null +++ b/src/eventHandlers/JobEventHandler.js @@ -0,0 +1,86 @@ +/* + * Handle events for Job. + */ + +const { Op } = require('sequelize') +const models = require('../models') +const logger = require('../common/logger') +const helper = require('../common/helper') +const JobCandidateService = require('../services/JobCandidateService') +const ResourceBookingService = require('../services/ResourceBookingService') + +/** + * Cancel all related resource bookings and all related candidates when a job is cancelled. + * + * @param {Object} payload the event payload + * @returns {undefined} + */ +async function cancelJob (payload) { + if (payload.status !== 'cancelled') { + logger.info({ + component: 'JobEventHandler', + context: 'cancelJob', + message: `not interested job - status: ${payload.status}` + }) + return + } + // pull data from db instead of directly extract data from the payload + // since the payload may not contain all fields when it is from partically update operation. + const job = await models.Job.findById(payload.id) + const candidates = await models.JobCandidate.findAll({ + where: { + jobId: job.id, + status: { + [Op.not]: 'cancelled' + }, + deletedAt: null + } + }) + const resourceBookings = await models.ResourceBooking.findAll({ + where: { + projectId: job.projectId, + status: { + [Op.not]: 'cancelled' + }, + deletedAt: null + } + }) + await Promise.all([ + ...candidates.map(candidate => JobCandidateService.partiallyUpdateJobCandidate( + helper.getAuditM2Muser(), + candidate.id, + { status: 'cancelled' } + ).then(result => { + logger.info({ + component: 'JobEventHandler', + context: 'cancelJob', + message: `id: ${result.id} candidate got cancelled.` + }) + })), + ...resourceBookings.map(resource => ResourceBookingService.partiallyUpdateResourceBooking( + helper.getAuditM2Muser(), + resource.id, + { status: 'cancelled' } + ).then(result => { + logger.info({ + component: 'JobEventHandler', + context: 'cancelJob', + message: `id: ${result.id} resource booking got cancelled.` + }) + })) + ]) +} + +/** + * Process job update event. + * + * @param {Object} payload the event payload + * @returns {undefined} + */ +async function processUpdate (payload) { + await cancelJob(payload) +} + +module.exports = { + processUpdate +} diff --git a/src/eventHandlers/ResourceBookingEventHandler.js b/src/eventHandlers/ResourceBookingEventHandler.js new file mode 100644 index 00000000..12d96573 --- /dev/null +++ b/src/eventHandlers/ResourceBookingEventHandler.js @@ -0,0 +1,105 @@ +/* + * Handle events for ResourceBooking. + */ + +const { Op } = require('sequelize') +const models = require('../models') +const logger = require('../common/logger') +const helper = require('../common/helper') +const JobService = require('../services/JobService') +const JobCandidateService = require('../services/JobCandidateService') + +/** + * When ResourceBooking's status is changed to `assigned` + * the corresponding JobCandidate record (with the same userId and jobId) + * should be updated with status `selected` + * + * @param {String} jobId the job id + * @param {String} userId the user id + * @returns {undefined} + */ +async function selectJobCandidate (jobId, userId) { + const candidates = await models.JobCandidate.findAll({ + where: { + jobId, + userId, + status: { + [Op.not]: 'selected' + }, + deletedAt: null + } + }) + await Promise.all(candidates.map(candidate => JobCandidateService.partiallyUpdateJobCandidate( + helper.getAuditM2Muser(), + candidate.id, + { status: 'selected' } + ).then(result => { + logger.info({ + component: 'ResourceBookingEventHandler', + context: 'selectJobCandidate', + message: `id: ${result.id} candidate got selected.` + }) + }))) +} + +/** + * Update the status of the Job to assigned when it positions requirement is fullfilled. + * + * @param {Object} job the job data + * @returns {undefined} + */ +async function assignJob (job) { + if (job.status === 'assigned') { + logger.info({ + component: 'ResourceBookingEventHandler', + context: 'assignJob', + message: `job with projectId ${job.projectId} is already assigned` + }) + return + } + const resourceBookings = await models.ResourceBooking.findAll({ + where: { + status: 'assigned', + deletedAt: null + } + }) + logger.debug({ + component: 'ResourceBookingEventHandler', + context: 'assignJob', + message: `the number of assigned resource bookings is ${resourceBookings.length} - the numPositions of the job is ${job.numPositions}` + }) + if (job.numPositions === resourceBookings.length) { + await JobService.partiallyUpdateJob(helper.getAuditM2Muser(), job.id, { status: 'assigned' }) + logger.info({ component: 'ResourceBookingEventHandler', context: 'assignJob', message: `job with projectId ${job.projectId} is assigned` }) + } +} + +/** + * Process resource booking update event. + * + * @param {Object} payload the event payload + * @returns {undefined} + */ +async function processUpdate (payload) { + if (payload.status !== 'assigned') { + logger.info({ + component: 'ResourceBookingEventHandler', + context: 'selectJobCandidate', + message: `not interested resource booking - status: ${payload.status}` + }) + return + } + const resourceBooking = await models.ResourceBooking.findById(payload.id) + const job = await models.Job.findOne({ + where: { + projectId: resourceBooking.projectId, + deletedAt: null + } + }) + await selectJobCandidate(job.id, resourceBooking.userId) + await assignJob(job) +} + +module.exports = { + processUpdate +} diff --git a/src/eventHandlers/index.js b/src/eventHandlers/index.js new file mode 100644 index 00000000..e9835d03 --- /dev/null +++ b/src/eventHandlers/index.js @@ -0,0 +1,52 @@ +/* + * The entry of event handlers. + */ + +const config = require('config') +const eventDispatcher = require('../common/eventDispatcher') +const JobEventHandler = require('./JobEventHandler') +const ResourceBookingEventHandler = require('./ResourceBookingEventHandler') +const logger = require('../common/logger') + +const TopicOperationMapping = { + [config.TAAS_JOB_UPDATE_TOPIC]: JobEventHandler.processUpdate, + [config.TAAS_RESOURCE_BOOKING_UPDATE_TOPIC]: ResourceBookingEventHandler.processUpdate +} + +/** + * Handle event. + * + * @param {String} topic the topic name + * @param {Object} payload the message payload + * @returns {undefined} + */ +async function handleEvent (topic, payload) { + if (!TopicOperationMapping[topic]) { + logger.info({ component: 'eventHanders', context: 'handleEvent', message: `not interested event - topic: ${topic}` }) + return + } + logger.debug({ component: 'eventHanders', context: 'handleEvent', message: `handling event - topic: ${topic} - payload: ${JSON.stringify(payload)}` }) + try { + await TopicOperationMapping[topic](payload) + } catch (err) { + logger.error({ component: 'eventHanders', context: 'handleEvent', message: 'failed to handle event' }) + // throw error so that it can be handled by the app + throw err + } + logger.info({ component: 'eventHanders', context: 'handleEvent', message: 'event successfully handled' }) +} + +/** + * Attach the handlers to the event dispatcher. + * + * @returns {undefined} + */ +function init () { + eventDispatcher.register({ + handleEvent + }) +} + +module.exports = { + init +} diff --git a/src/services/ResourceBookingService.js b/src/services/ResourceBookingService.js index 6cc739aa..accf7bd2 100644 --- a/src/services/ResourceBookingService.js +++ b/src/services/ResourceBookingService.js @@ -11,7 +11,6 @@ const helper = require('../common/helper') const logger = require('../common/logger') const errors = require('../common/errors') const models = require('../models') -const JobCandidateService = require('./JobCandidateService') const ResourceBooking = models.ResourceBooking const esClient = helper.getESClient() @@ -118,6 +117,7 @@ createResourceBooking.schema = Joi.object().keys({ async function updateResourceBooking (currentUser, id, data) { const resourceBooking = await ResourceBooking.findById(id) const isDiffStatus = resourceBooking.status !== data.status + if (!currentUser.isBookingManager && !currentUser.isMachine) { const connect = await helper.isConnectMember(resourceBooking.dataValues.projectId, currentUser.jwtToken) if (!connect) { @@ -127,34 +127,9 @@ async function updateResourceBooking (currentUser, id, data) { data.updatedAt = new Date() data.updatedBy = await helper.getUserId(currentUser.userId) - const updatedResourceBooking = await resourceBooking.update(data) - await helper.postEvent(config.TAAS_RESOURCE_BOOKING_UPDATE_TOPIC, { id, ...data }) - // When we are updating the status of ResourceBooking to `assigned` - // the corresponding JobCandidate record (with the same userId and jobId) - // should be updated with the status `selected` - if (isDiffStatus && data.status === 'assigned') { - const candidates = await models.JobCandidate.findAll({ - where: { - jobId: updatedResourceBooking.jobId, - userId: updatedResourceBooking.userId, - status: { - [Op.not]: 'selected' - }, - deletedAt: null - } - }) - await Promise.all(candidates.map(candidate => JobCandidateService.partiallyUpdateJobCandidate( - currentUser, - candidate.id, - { status: 'selected' } - ).then(result => { - logger.debug({ - component: 'ResourceBookingService', - context: 'updatedResourceBooking', - message: `id: ${result.id} candidate got selected.` - }) - }))) - } + await resourceBooking.update(data) + const eventPayload = { id, ...(isDiffStatus ? data : _.omit(data, ['status'])) } // send data with status only if status is changed + await helper.postEvent(config.TAAS_RESOURCE_BOOKING_UPDATE_TOPIC, eventPayload) const result = helper.clearObject(_.assign(resourceBooking.dataValues, data)) return result } From b64ac863bef02dc579f1d42c5a37af5083e00370 Mon Sep 17 00:00:00 2001 From: imcaizheng Date: Thu, 31 Dec 2020 15:33:18 +0800 Subject: [PATCH 2/6] fix inconsistence by interating all jobs found filtering by projectId --- src/eventHandlers/ResourceBookingEventHandler.js | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/eventHandlers/ResourceBookingEventHandler.js b/src/eventHandlers/ResourceBookingEventHandler.js index 12d96573..04eda6b3 100644 --- a/src/eventHandlers/ResourceBookingEventHandler.js +++ b/src/eventHandlers/ResourceBookingEventHandler.js @@ -84,20 +84,22 @@ async function processUpdate (payload) { if (payload.status !== 'assigned') { logger.info({ component: 'ResourceBookingEventHandler', - context: 'selectJobCandidate', + context: 'processUpdate', message: `not interested resource booking - status: ${payload.status}` }) return } const resourceBooking = await models.ResourceBooking.findById(payload.id) - const job = await models.Job.findOne({ + const jobs = await models.Job.findAll({ where: { projectId: resourceBooking.projectId, deletedAt: null } }) - await selectJobCandidate(job.id, resourceBooking.userId) - await assignJob(job) + for (const job of jobs) { + await selectJobCandidate(job.id, resourceBooking.userId) + await assignJob(job) + } } module.exports = { From 1041679899cc4ed113865edfcaf91d0a621c94ae Mon Sep 17 00:00:00 2001 From: imcaizheng Date: Thu, 31 Dec 2020 16:14:32 +0800 Subject: [PATCH 3/6] enhancement: encapsulate old value to the event payload as well --- src/common/helper.js | 5 +++-- src/eventHandlers/JobEventHandler.js | 14 +++++++++++--- src/eventHandlers/ResourceBookingEventHandler.js | 14 +++++++++++--- src/services/JobService.js | 3 ++- src/services/ResourceBookingService.js | 5 ++--- 5 files changed, 29 insertions(+), 12 deletions(-) diff --git a/src/common/helper.js b/src/common/helper.js index 7b06d140..25cfb609 100644 --- a/src/common/helper.js +++ b/src/common/helper.js @@ -282,8 +282,9 @@ async function getUserId (userId) { * Send Kafka event message * @params {String} topic the topic name * @params {Object} payload the payload + * @params {Object} options the extra options to control the function */ -async function postEvent (topic, payload) { +async function postEvent (topic, payload, options = {}) { logger.debug({ component: 'helper', context: 'postEvent', message: `Posting event to Kafka topic ${topic}, ${JSON.stringify(payload)}` }) const client = getBusApiClient() const message = { @@ -294,7 +295,7 @@ async function postEvent (topic, payload) { payload } await client.postEvent(message) - await eventDispatcher.handleEvent(topic, payload) + await eventDispatcher.handleEvent(topic, { value: payload, options }) } /** diff --git a/src/eventHandlers/JobEventHandler.js b/src/eventHandlers/JobEventHandler.js index 715f58a5..cf5be168 100644 --- a/src/eventHandlers/JobEventHandler.js +++ b/src/eventHandlers/JobEventHandler.js @@ -16,17 +16,25 @@ const ResourceBookingService = require('../services/ResourceBookingService') * @returns {undefined} */ async function cancelJob (payload) { - if (payload.status !== 'cancelled') { + if (payload.value.status === payload.options.oldValue.status) { + logger.debug({ + component: 'JobEventHandler', + context: 'cancelJob', + message: 'status not changed' + }) + return + } + if (payload.value.status !== 'cancelled') { logger.info({ component: 'JobEventHandler', context: 'cancelJob', - message: `not interested job - status: ${payload.status}` + message: `not interested job - status: ${payload.value.status}` }) return } // pull data from db instead of directly extract data from the payload // since the payload may not contain all fields when it is from partically update operation. - const job = await models.Job.findById(payload.id) + const job = await models.Job.findById(payload.value.id) const candidates = await models.JobCandidate.findAll({ where: { jobId: job.id, diff --git a/src/eventHandlers/ResourceBookingEventHandler.js b/src/eventHandlers/ResourceBookingEventHandler.js index 04eda6b3..d036a1cf 100644 --- a/src/eventHandlers/ResourceBookingEventHandler.js +++ b/src/eventHandlers/ResourceBookingEventHandler.js @@ -81,15 +81,23 @@ async function assignJob (job) { * @returns {undefined} */ async function processUpdate (payload) { - if (payload.status !== 'assigned') { + if (payload.value.status === payload.options.oldValue.status) { + logger.debug({ + component: 'ResourceBookingEventHandler', + context: 'processUpdate', + message: 'status not changed' + }) + return + } + if (payload.value.status !== 'assigned') { logger.info({ component: 'ResourceBookingEventHandler', context: 'processUpdate', - message: `not interested resource booking - status: ${payload.status}` + message: `not interested resource booking - status: ${payload.value.status}` }) return } - const resourceBooking = await models.ResourceBooking.findById(payload.id) + const resourceBooking = await models.ResourceBooking.findById(payload.value.id) const jobs = await models.Job.findAll({ where: { projectId: resourceBooking.projectId, diff --git a/src/services/JobService.js b/src/services/JobService.js index bc131ee0..617bd149 100644 --- a/src/services/JobService.js +++ b/src/services/JobService.js @@ -192,6 +192,7 @@ async function updateJob (currentUser, id, data) { await _validateSkills(data.skills) } let job = await Job.findById(id) + const oldValue = job.toJSON() const ubhanUserId = await helper.getUserId(currentUser.userId) if (!currentUser.hasManagePermission && !currentUser.isMachine) { if (currentUser.isConnectManager) { @@ -209,7 +210,7 @@ async function updateJob (currentUser, id, data) { data.updatedBy = ubhanUserId await job.update(data) - await helper.postEvent(config.TAAS_JOB_UPDATE_TOPIC, { id, ...data }) + await helper.postEvent(config.TAAS_JOB_UPDATE_TOPIC, { id, ...data }, { oldValue: oldValue }) job = await Job.findById(id, true) job.dataValues.candidates = _.map(job.dataValues.candidates, (c) => helper.clearObject(c.dataValues)) return helper.clearObject(job.dataValues) diff --git a/src/services/ResourceBookingService.js b/src/services/ResourceBookingService.js index 6e769016..d10c238f 100644 --- a/src/services/ResourceBookingService.js +++ b/src/services/ResourceBookingService.js @@ -142,14 +142,13 @@ async function updateResourceBooking (currentUser, id, data) { } const resourceBooking = await ResourceBooking.findById(id) - const isDiffStatus = resourceBooking.status !== data.status + const oldValue = resourceBooking.toJSON() data.updatedAt = new Date() data.updatedBy = await helper.getUserId(currentUser.userId) await resourceBooking.update(data) - const eventPayload = { id, ...(isDiffStatus ? data : _.omit(data, ['status'])) } // send data with status only if status is changed - await helper.postEvent(config.TAAS_RESOURCE_BOOKING_UPDATE_TOPIC, eventPayload) + await helper.postEvent(config.TAAS_RESOURCE_BOOKING_UPDATE_TOPIC, { id, ...data }, { oldValue: oldValue }) const result = helper.clearObject(_.assign(resourceBooking.dataValues, data)) return result } From 0c093c475bc7f5728b6fd993092dcdc0b1771e48 Mon Sep 17 00:00:00 2001 From: imcaizheng Date: Mon, 4 Jan 2021 03:13:23 +0800 Subject: [PATCH 4/6] fix: use jobId instead of projectId to find job --- src/eventHandlers/JobEventHandler.js | 2 +- .../ResourceBookingEventHandler.js | 30 ++++++++++--------- src/eventHandlers/index.js | 5 ++-- 3 files changed, 20 insertions(+), 17 deletions(-) diff --git a/src/eventHandlers/JobEventHandler.js b/src/eventHandlers/JobEventHandler.js index cf5be168..0e85f278 100644 --- a/src/eventHandlers/JobEventHandler.js +++ b/src/eventHandlers/JobEventHandler.js @@ -25,7 +25,7 @@ async function cancelJob (payload) { return } if (payload.value.status !== 'cancelled') { - logger.info({ + logger.debug({ component: 'JobEventHandler', context: 'cancelJob', message: `not interested job - status: ${payload.value.status}` diff --git a/src/eventHandlers/ResourceBookingEventHandler.js b/src/eventHandlers/ResourceBookingEventHandler.js index d036a1cf..19296aee 100644 --- a/src/eventHandlers/ResourceBookingEventHandler.js +++ b/src/eventHandlers/ResourceBookingEventHandler.js @@ -45,12 +45,13 @@ async function selectJobCandidate (jobId, userId) { /** * Update the status of the Job to assigned when it positions requirement is fullfilled. * - * @param {Object} job the job data + * @param {String} jobId the job id * @returns {undefined} */ -async function assignJob (job) { +async function assignJob (jobId) { + const job = await models.Job.findById(jobId) if (job.status === 'assigned') { - logger.info({ + logger.debug({ component: 'ResourceBookingEventHandler', context: 'assignJob', message: `job with projectId ${job.projectId} is already assigned` @@ -59,6 +60,7 @@ async function assignJob (job) { } const resourceBookings = await models.ResourceBooking.findAll({ where: { + jobId: job.id, status: 'assigned', deletedAt: null } @@ -70,7 +72,7 @@ async function assignJob (job) { }) if (job.numPositions === resourceBookings.length) { await JobService.partiallyUpdateJob(helper.getAuditM2Muser(), job.id, { status: 'assigned' }) - logger.info({ component: 'ResourceBookingEventHandler', context: 'assignJob', message: `job with projectId ${job.projectId} is assigned` }) + logger.info({ component: 'ResourceBookingEventHandler', context: 'assignJob', message: `job ${job.id} is assigned` }) } } @@ -90,7 +92,7 @@ async function processUpdate (payload) { return } if (payload.value.status !== 'assigned') { - logger.info({ + logger.debug({ component: 'ResourceBookingEventHandler', context: 'processUpdate', message: `not interested resource booking - status: ${payload.value.status}` @@ -98,16 +100,16 @@ async function processUpdate (payload) { return } const resourceBooking = await models.ResourceBooking.findById(payload.value.id) - const jobs = await models.Job.findAll({ - where: { - projectId: resourceBooking.projectId, - deletedAt: null - } - }) - for (const job of jobs) { - await selectJobCandidate(job.id, resourceBooking.userId) - await assignJob(job) + if (!resourceBooking.jobId) { + logger.debug({ + component: 'ResourceBookingEventHandler', + context: 'processUpdate', + message: `id: ${resourceBooking.id} resource booking without jobId - ignored` + }) + return } + await selectJobCandidate(resourceBooking.jobId, resourceBooking.userId) + await assignJob(resourceBooking.jobId) } module.exports = { diff --git a/src/eventHandlers/index.js b/src/eventHandlers/index.js index e9835d03..40e44864 100644 --- a/src/eventHandlers/index.js +++ b/src/eventHandlers/index.js @@ -22,9 +22,10 @@ const TopicOperationMapping = { */ async function handleEvent (topic, payload) { if (!TopicOperationMapping[topic]) { - logger.info({ component: 'eventHanders', context: 'handleEvent', message: `not interested event - topic: ${topic}` }) + logger.debug({ component: 'eventHanders', context: 'handleEvent', message: `not interested event - topic: ${topic}` }) return } + logger.info({ component: 'eventHanders', context: 'handleEvent', message: `event handling - topic: ${topic}` }) logger.debug({ component: 'eventHanders', context: 'handleEvent', message: `handling event - topic: ${topic} - payload: ${JSON.stringify(payload)}` }) try { await TopicOperationMapping[topic](payload) @@ -33,7 +34,7 @@ async function handleEvent (topic, payload) { // throw error so that it can be handled by the app throw err } - logger.info({ component: 'eventHanders', context: 'handleEvent', message: 'event successfully handled' }) + logger.info({ component: 'eventHanders', context: 'handleEvent', message: `event successfully handled - topic: ${topic}` }) } /** From 6d8dfff94ed79da4960c02a5b67206f16147d267 Mon Sep 17 00:00:00 2001 From: imcaizheng Date: Mon, 4 Jan 2021 20:17:49 +0800 Subject: [PATCH 5/6] decouple functions assignJob and selectJobCandidate to increase maintainability --- src/common/helper.js | 2 +- .../ResourceBookingEventHandler.js | 94 ++++++++++++------- 2 files changed, 60 insertions(+), 36 deletions(-) diff --git a/src/common/helper.js b/src/common/helper.js index 9fc62e81..21f67532 100644 --- a/src/common/helper.js +++ b/src/common/helper.js @@ -667,4 +667,4 @@ module.exports = { ensureUserById, getAuditM2Muser, checkIsMemberOfProject -} \ No newline at end of file +} diff --git a/src/eventHandlers/ResourceBookingEventHandler.js b/src/eventHandlers/ResourceBookingEventHandler.js index 19296aee..78cf211e 100644 --- a/src/eventHandlers/ResourceBookingEventHandler.js +++ b/src/eventHandlers/ResourceBookingEventHandler.js @@ -14,15 +14,39 @@ const JobCandidateService = require('../services/JobCandidateService') * the corresponding JobCandidate record (with the same userId and jobId) * should be updated with status `selected` * - * @param {String} jobId the job id - * @param {String} userId the user id + * @param {Object} payload the event payload * @returns {undefined} */ -async function selectJobCandidate (jobId, userId) { +async function selectJobCandidate (payload) { + if (payload.value.status === payload.options.oldValue.status) { + logger.debug({ + component: 'ResourceBookingEventHandler', + context: 'selectJobCandidate', + message: 'status not changed' + }) + return + } + if (payload.value.status !== 'assigned') { + logger.debug({ + component: 'ResourceBookingEventHandler', + context: 'selectJobCandidate', + message: `not interested resource booking - status: ${payload.value.status}` + }) + return + } + const resourceBooking = await models.ResourceBooking.findById(payload.value.id) + if (!resourceBooking.jobId) { + logger.debug({ + component: 'ResourceBookingEventHandler', + context: 'selectJobCandidate', + message: `id: ${resourceBooking.id} resource booking without jobId - ignored` + }) + return + } const candidates = await models.JobCandidate.findAll({ where: { - jobId, - userId, + jobId: resourceBooking.jobId, + userId: resourceBooking.userId, status: { [Op.not]: 'selected' }, @@ -45,11 +69,36 @@ async function selectJobCandidate (jobId, userId) { /** * Update the status of the Job to assigned when it positions requirement is fullfilled. * - * @param {String} jobId the job id + * @param {Object} payload the event payload * @returns {undefined} */ -async function assignJob (jobId) { - const job = await models.Job.findById(jobId) +async function assignJob (payload) { + if (payload.value.status === payload.options.oldValue.status) { + logger.debug({ + component: 'ResourceBookingEventHandler', + context: 'assignJob', + message: 'status not changed' + }) + return + } + if (payload.value.status !== 'assigned') { + logger.debug({ + component: 'ResourceBookingEventHandler', + context: 'assignJob', + message: `not interested resource booking - status: ${payload.value.status}` + }) + return + } + const resourceBooking = await models.ResourceBooking.findById(payload.value.id) + if (!resourceBooking.jobId) { + logger.debug({ + component: 'ResourceBookingEventHandler', + context: 'assignJob', + message: `id: ${resourceBooking.id} resource booking without jobId - ignored` + }) + return + } + const job = await models.Job.findById(resourceBooking.jobId) if (job.status === 'assigned') { logger.debug({ component: 'ResourceBookingEventHandler', @@ -83,33 +132,8 @@ async function assignJob (jobId) { * @returns {undefined} */ async function processUpdate (payload) { - if (payload.value.status === payload.options.oldValue.status) { - logger.debug({ - component: 'ResourceBookingEventHandler', - context: 'processUpdate', - message: 'status not changed' - }) - return - } - if (payload.value.status !== 'assigned') { - logger.debug({ - component: 'ResourceBookingEventHandler', - context: 'processUpdate', - message: `not interested resource booking - status: ${payload.value.status}` - }) - return - } - const resourceBooking = await models.ResourceBooking.findById(payload.value.id) - if (!resourceBooking.jobId) { - logger.debug({ - component: 'ResourceBookingEventHandler', - context: 'processUpdate', - message: `id: ${resourceBooking.id} resource booking without jobId - ignored` - }) - return - } - await selectJobCandidate(resourceBooking.jobId, resourceBooking.userId) - await assignJob(resourceBooking.jobId) + await selectJobCandidate(payload) + await assignJob(payload) } module.exports = { From 2d140883f1663ea8af4629fd45cea3281dc19cff Mon Sep 17 00:00:00 2001 From: imcaizheng Date: Mon, 4 Jan 2021 20:53:40 +0800 Subject: [PATCH 6/6] find resourceBooking by jobId when cancelJob --- src/eventHandlers/JobEventHandler.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/eventHandlers/JobEventHandler.js b/src/eventHandlers/JobEventHandler.js index 0e85f278..28c5f39d 100644 --- a/src/eventHandlers/JobEventHandler.js +++ b/src/eventHandlers/JobEventHandler.js @@ -46,7 +46,7 @@ async function cancelJob (payload) { }) const resourceBookings = await models.ResourceBooking.findAll({ where: { - projectId: job.projectId, + jobId: job.id, status: { [Op.not]: 'cancelled' },