Skip to content

[re-created]implement local event handling #85

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
2 changes: 2 additions & 0 deletions app.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const cors = require('cors')
const HttpStatus = require('http-status-codes')
const interceptor = require('express-interceptor')
const logger = require('./src/common/logger')
const eventHandlers = require('./src/eventHandlers')

// setup express app
const app = express()
Expand Down Expand Up @@ -91,6 +92,7 @@ app.use((err, req, res, next) => {

const server = app.listen(app.get('port'), () => {
logger.info({ component: 'app', message: `Express server listening on port ${app.get('port')}` })
eventHandlers.init()
})

if (process.env.NODE_ENV === 'test') {
Expand Down
14 changes: 7 additions & 7 deletions docs/swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -575,8 +575,8 @@ paths:
required: false
schema:
type: string
enum: ['open', 'selected', 'shortlist', 'rejected']
description: The user id.
enum: ['open', 'selected', 'shortlist', 'rejected', 'cancelled']
description: The job candidate status.
responses:
'200':
description: OK
Expand Down Expand Up @@ -1776,8 +1776,8 @@ components:
description: "The user id."
status:
type: string
enum: ['open', 'selected', 'shortlist', 'rejected']
description: "The job status."
enum: ['open', 'selected', 'shortlist', 'rejected', 'cancelled']
description: "The job candidate status."
createdAt:
type: string
format: date-time
Expand Down Expand Up @@ -1812,7 +1812,7 @@ components:
properties:
status:
type: string
enum: ['open', 'selected', 'shortlist', 'rejected']
enum: ['open', 'selected', 'shortlist', 'rejected', 'cancelled']
JobPatchRequestBody:
properties:
status:
Expand Down Expand Up @@ -2253,8 +2253,8 @@ components:
description: 'The link for the resume that can be downloaded'
status:
type: string
enum: ['open', 'selected', 'shortlist', 'rejected']
description: "The job status."
enum: ['open', 'selected', 'shortlist', 'rejected', 'cancelled']
description: "The job candidate status."
skills:
type: array
items:
Expand Down
2 changes: 1 addition & 1 deletion src/bootstrap.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ Joi.perPage = () => Joi.number().integer().min(1).default(20)
Joi.rateType = () => Joi.string().valid('hourly', 'daily', 'weekly', 'monthly')
Joi.jobStatus = () => Joi.string().valid('sourcing', 'in-review', 'assigned', 'closed', 'cancelled')
Joi.workload = () => Joi.string().valid('full-time', 'fractional')
Joi.jobCandidateStatus = () => Joi.string().valid('open', 'selected', 'shortlist', 'rejected')
Joi.jobCandidateStatus = () => Joi.string().valid('open', 'selected', 'shortlist', 'rejected', 'cancelled')

function buildServices (dir) {
const files = fs.readdirSync(dir)
Expand Down
33 changes: 33 additions & 0 deletions src/common/eventDispatcher.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Implement an event dispatcher that handles events synchronously.
*/

const handlers = []

/**
* Handle event.
*
* @param {String} topic the topic name
* @param {Object} payload the message payload
* @returns {undefined}
*/
async function handleEvent (topic, payload) {
for (const handler of handlers) {
await handler.handleEvent(topic, payload)
}
}

/**
* Register to the dispatcher.
*
* @param {Object} handler the handler containing the `handleEvent` function
* @returns {undefined}
*/
function register (handler) {
handlers.push(handler)
}

module.exports = {
handleEvent,
register
}
17 changes: 15 additions & 2 deletions src/common/helper.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ const elasticsearch = require('@elastic/elasticsearch')
const errors = require('../common/errors')
const logger = require('./logger')
const models = require('../models')
const eventDispatcher = require('./eventDispatcher')
const busApi = require('@topcoder-platform/topcoder-bus-api-wrapper')

const localLogger = {
Expand Down Expand Up @@ -281,8 +282,9 @@ async function getUserId (userId) {
* Send Kafka event message
* @params {String} topic the topic name
* @params {Object} payload the payload
* @params {Object} options the extra options to control the function
*/
async function postEvent (topic, payload) {
async function postEvent (topic, payload, options = {}) {
logger.debug({ component: 'helper', context: 'postEvent', message: `Posting event to Kafka topic ${topic}, ${JSON.stringify(payload)}` })
const client = getBusApiClient()
const message = {
Expand All @@ -293,6 +295,7 @@ async function postEvent (topic, payload) {
payload
}
await client.postEvent(message)
await eventDispatcher.handleEvent(topic, { value: payload, options })
}

/**
Expand Down Expand Up @@ -603,6 +606,15 @@ async function ensureUserById (userId) {
}
}

/**
* Generate M2M auth user.
*
* @returns {Object} the M2M auth user
*/
function getAuditM2Muser () {
return { isMachine: true, userId: config.m2m.M2M_AUDIT_USER_ID, handle: config.m2m.M2M_AUDIT_HANDLE }
}

module.exports = {
checkIfExists,
autoWrapExpress,
Expand All @@ -629,5 +641,6 @@ module.exports = {
getSkillById,
getUserSkill,
ensureJobById,
ensureUserById
ensureUserById,
getAuditM2Muser
}
94 changes: 94 additions & 0 deletions src/eventHandlers/JobEventHandler.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Handle events for Job.
*/

const { Op } = require('sequelize')
const models = require('../models')
const logger = require('../common/logger')
const helper = require('../common/helper')
const JobCandidateService = require('../services/JobCandidateService')
const ResourceBookingService = require('../services/ResourceBookingService')

/**
* Cancel all related resource bookings and all related candidates when a job is cancelled.
*
* @param {Object} payload the event payload
* @returns {undefined}
*/
async function cancelJob (payload) {
if (payload.value.status === payload.options.oldValue.status) {
logger.debug({
component: 'JobEventHandler',
context: 'cancelJob',
message: 'status not changed'
})
return
}
if (payload.value.status !== 'cancelled') {
logger.debug({
component: 'JobEventHandler',
context: 'cancelJob',
message: `not interested job - status: ${payload.value.status}`
})
return
}
// pull data from db instead of directly extract data from the payload
// since the payload may not contain all fields when it is from partically update operation.
const job = await models.Job.findById(payload.value.id)
const candidates = await models.JobCandidate.findAll({
where: {
jobId: job.id,
status: {
[Op.not]: 'cancelled'
},
deletedAt: null
}
})
const resourceBookings = await models.ResourceBooking.findAll({
where: {
projectId: job.projectId,
status: {
[Op.not]: 'cancelled'
},
deletedAt: null
}
})
await Promise.all([
...candidates.map(candidate => JobCandidateService.partiallyUpdateJobCandidate(
helper.getAuditM2Muser(),
candidate.id,
{ status: 'cancelled' }
).then(result => {
logger.info({
component: 'JobEventHandler',
context: 'cancelJob',
message: `id: ${result.id} candidate got cancelled.`
})
})),
...resourceBookings.map(resource => ResourceBookingService.partiallyUpdateResourceBooking(
helper.getAuditM2Muser(),
resource.id,
{ status: 'cancelled' }
).then(result => {
logger.info({
component: 'JobEventHandler',
context: 'cancelJob',
message: `id: ${result.id} resource booking got cancelled.`
})
}))
])
}

/**
* Process job update event.
*
* @param {Object} payload the event payload
* @returns {undefined}
*/
async function processUpdate (payload) {
await cancelJob(payload)
}

module.exports = {
processUpdate
}
117 changes: 117 additions & 0 deletions src/eventHandlers/ResourceBookingEventHandler.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Handle events for ResourceBooking.
*/

const { Op } = require('sequelize')
const models = require('../models')
const logger = require('../common/logger')
const helper = require('../common/helper')
const JobService = require('../services/JobService')
const JobCandidateService = require('../services/JobCandidateService')

/**
* When ResourceBooking's status is changed to `assigned`
* the corresponding JobCandidate record (with the same userId and jobId)
* should be updated with status `selected`
*
* @param {String} jobId the job id
* @param {String} userId the user id
* @returns {undefined}
*/
async function selectJobCandidate (jobId, userId) {
const candidates = await models.JobCandidate.findAll({
where: {
jobId,
userId,
status: {
[Op.not]: 'selected'
},
deletedAt: null
}
})
await Promise.all(candidates.map(candidate => JobCandidateService.partiallyUpdateJobCandidate(
helper.getAuditM2Muser(),
candidate.id,
{ status: 'selected' }
).then(result => {
logger.info({
component: 'ResourceBookingEventHandler',
context: 'selectJobCandidate',
message: `id: ${result.id} candidate got selected.`
})
})))
}

/**
* Update the status of the Job to assigned when it positions requirement is fullfilled.
*
* @param {String} jobId the job id
* @returns {undefined}
*/
async function assignJob (jobId) {
const job = await models.Job.findById(jobId)
if (job.status === 'assigned') {
logger.debug({
component: 'ResourceBookingEventHandler',
context: 'assignJob',
message: `job with projectId ${job.projectId} is already assigned`
})
return
}
const resourceBookings = await models.ResourceBooking.findAll({
where: {
jobId: job.id,
status: 'assigned',
deletedAt: null
}
})
logger.debug({
component: 'ResourceBookingEventHandler',
context: 'assignJob',
message: `the number of assigned resource bookings is ${resourceBookings.length} - the numPositions of the job is ${job.numPositions}`
})
if (job.numPositions === resourceBookings.length) {
await JobService.partiallyUpdateJob(helper.getAuditM2Muser(), job.id, { status: 'assigned' })
logger.info({ component: 'ResourceBookingEventHandler', context: 'assignJob', message: `job ${job.id} is assigned` })
}
}

/**
* Process resource booking update event.
*
* @param {Object} payload the event payload
* @returns {undefined}
*/
async function processUpdate (payload) {
if (payload.value.status === payload.options.oldValue.status) {
logger.debug({
component: 'ResourceBookingEventHandler',
context: 'processUpdate',
message: 'status not changed'
})
return
}
if (payload.value.status !== 'assigned') {
logger.debug({
component: 'ResourceBookingEventHandler',
context: 'processUpdate',
message: `not interested resource booking - status: ${payload.value.status}`
})
return
}
const resourceBooking = await models.ResourceBooking.findById(payload.value.id)
if (!resourceBooking.jobId) {
logger.debug({
component: 'ResourceBookingEventHandler',
context: 'processUpdate',
message: `id: ${resourceBooking.id} resource booking without jobId - ignored`
})
return
}
await selectJobCandidate(resourceBooking.jobId, resourceBooking.userId)
await assignJob(resourceBooking.jobId)
}

module.exports = {
processUpdate
}
Loading