diff --git a/src/esProcessors/InterviewProcessor.js b/src/esProcessors/InterviewProcessor.js new file mode 100644 index 00000000..9334f04c --- /dev/null +++ b/src/esProcessors/InterviewProcessor.js @@ -0,0 +1,127 @@ +/** + * Interview Processor + */ + +const _ = require('lodash') +const helper = require('../common/helper') +const config = require('config') + +const esClient = helper.getESClient() + +/** + * Updates jobCandidate via a painless script + * + * @param {String} jobCandidateId job candidate id + * @param {String} script script definition + */ +async function updateJobCandidateViaScript (jobCandidateId, script) { + await esClient.update({ + index: config.get('esConfig.ES_INDEX_JOB_CANDIDATE'), + id: jobCandidateId, + body: { script }, + refresh: 'wait_for' + }) +} + +/** + * Process request interview entity. + * Creates an interview record under jobCandidate. + * + * @param {Object} interview interview object + */ +async function processRequestInterview (interview) { + // add interview in collection if there's already an existing collection + // or initiate a new one with this interview + const script = { + source: ` + ctx._source.containsKey("interviews") + ? ctx._source.interviews.add(params.interview) + : ctx._source.interviews = [params.interview] + `, + params: { interview } + } + await updateJobCandidateViaScript(interview.jobCandidateId, script) +} + +/** + * Process update interview entity + * Updates the interview record under jobCandidate. + * + * @param {Object} interview interview object + */ +async function processUpdateInterview (interview) { + // if there's an interview with this id, + // update it + const script = { + source: ` + if (ctx._source.containsKey("interviews")) { + def target = ctx._source.interviews.find(i -> i.id == params.interview.id); + if (target != null) { + for (prop in params.interview.entrySet()) { + target[prop.getKey()] = prop.getValue() + } + } + } + `, + params: { interview } + } + await updateJobCandidateViaScript(interview.jobCandidateId, script) +} + +/** + * Process bulk (partially) update interviews entity. + * Currently supports status, updatedAt and updatedBy fields. + * Update Joi schema to allow more fields. + * (implementation should already handle new fields - just updating Joi schema should be enough) + * + * payload format: + * { + * "jobCandidateId": { + * "interviewId": { ...fields }, + * "interviewId2": { ...fields }, + * ... + * }, + * "jobCandidateId2": { // like above... }, + * ... + * } + * + * @param {Object} jobCandidates job candidates + */ +async function processBulkUpdateInterviews (jobCandidates) { + // script to update & params + const script = { + source: ` + def completedInterviews = params.jobCandidates[ctx._id]; + for (interview in completedInterviews.entrySet()) { + def interviewId = interview.getKey(); + def affectedFields = interview.getValue(); + def target = ctx._source.interviews.find(i -> i.id == interviewId); + if (target != null) { + for (field in affectedFields.entrySet()) { + target[field.getKey()] = field.getValue(); + } + } + } + `, + params: { jobCandidates } + } + // update interviews + await esClient.updateByQuery({ + index: config.get('esConfig.ES_INDEX_JOB_CANDIDATE'), + body: { + script, + query: { + ids: { + values: _.keys(jobCandidates) + } + } + }, + refresh: true + }) +} + +module.exports = { + processRequestInterview, + processUpdateInterview, + processBulkUpdateInterviews +} diff --git a/src/esProcessors/JobCandidateProcessor.js b/src/esProcessors/JobCandidateProcessor.js new file mode 100644 index 00000000..8e82869b --- /dev/null +++ b/src/esProcessors/JobCandidateProcessor.js @@ -0,0 +1,54 @@ +/** + * Jobcandidate Processor + */ + +const config = require('config') +const helper = require('../common/helper') + +const esClient = helper.getESClient() + +/** + * Process create entity + * @param {Object} entity entity object + */ +async function processCreate (entity) { + await esClient.create({ + index: config.get('esConfig.ES_INDEX_JOB_CANDIDATE'), + id: entity.id, + body: entity, + refresh: 'wait_for' + }) +} + +/** + * Process update entity + * @param {Object} entity entity object + */ +async function processUpdate (entity) { + await esClient.update({ + index: config.get('esConfig.ES_INDEX_JOB_CANDIDATE'), + id: entity.id, + body: { + doc: entity + }, + refresh: 'wait_for' + }) +} + +/** + * Process delete entity + * @param {Object} entity entity object + */ +async function processDelete (entity) { + await esClient.delete({ + index: config.get('esConfig.ES_INDEX_JOB_CANDIDATE'), + id: entity.id, + refresh: 'wait_for' + }) +} + +module.exports = { + processCreate, + processUpdate, + processDelete +} diff --git a/src/esProcessors/JobProcessor.js b/src/esProcessors/JobProcessor.js new file mode 100644 index 00000000..00db929c --- /dev/null +++ b/src/esProcessors/JobProcessor.js @@ -0,0 +1,54 @@ +/** + * Job Processor + */ + +const helper = require('../common/helper') +const config = require('config') + +const esClient = helper.getESClient() + +/** + * Process create entity + * @param {Object} entity entity object + */ +async function processCreate (entity) { + await esClient.create({ + index: config.get('esConfig.ES_INDEX_JOB'), + id: entity.id, + body: entity, + refresh: 'wait_for' + }) +} + +/** + * Process update entity + * @param {Object} entity entity object + */ +async function processUpdate (entity) { + await esClient.update({ + index: config.get('esConfig.ES_INDEX_JOB'), + id: entity.id, + body: { + doc: entity + }, + refresh: 'wait_for' + }) +} + +/** + * Process delete entity + * @param {Object} entity entity object + */ +async function processDelete (entity) { + await esClient.delete({ + index: config.get('esConfig.ES_INDEX_JOB'), + id: entity.id, + refresh: 'wait_for' + }) +} + +module.exports = { + processCreate, + processUpdate, + processDelete +} diff --git a/src/esProcessors/RoleProcessor.js b/src/esProcessors/RoleProcessor.js new file mode 100644 index 00000000..22f819bf --- /dev/null +++ b/src/esProcessors/RoleProcessor.js @@ -0,0 +1,54 @@ +/** + * Role Processor + */ + +const helper = require('../common/helper') +const config = require('config') + +const esClient = helper.getESClient() + +/** + * Process create entity + * @param {Object} entity entity object + */ +async function processCreate (entity) { + await esClient.create({ + index: config.get('esConfig.ES_INDEX_ROLE'), + id: entity.id, + body: entity, + refresh: 'wait_for' + }) +} + +/** + * Process update entity + * @param {Object} entity entity object + */ +async function processUpdate (entity) { + await esClient.update({ + index: config.get('esConfig.ES_INDEX_ROLE'), + id: entity.id, + body: { + doc: entity + }, + refresh: 'wait_for' + }) +} + +/** + * Process delete entity + * @param {Object} entity entity object + */ +async function processDelete (entity) { + await esClient.delete({ + index: config.get('esConfig.ES_INDEX_ROLE'), + id: entity.id, + refresh: 'wait_for' + }) +} + +module.exports = { + processCreate, + processUpdate, + processDelete +} diff --git a/src/services/InterviewService.js b/src/services/InterviewService.js index 3ddb1a3d..4ed04c13 100644 --- a/src/services/InterviewService.js +++ b/src/services/InterviewService.js @@ -13,7 +13,15 @@ const helper = require('../common/helper') const logger = require('../common/logger') const errors = require('../common/errors') const models = require('../models') - +const { + processRequestInterview, + processUpdateInterview, + processBulkUpdateInterviews +} = require('../esProcessors/InterviewProcessor') +const { + processUpdate: jobCandidateProcessUpdate +} = require('../esProcessors/JobCandidateProcessor') +const sequelize = models.sequelize const Interview = models.Interview const esClient = helper.getESClient() @@ -245,25 +253,35 @@ async function requestInterview (currentUser, jobCandidateId, interview) { return (foundGuestMember !== undefined) ? `${foundGuestMember.firstName} ${foundGuestMember.lastName}` : guestEmail.split('@')[0] }) + let entity + let jobCandidateEntity try { - // create the interview - const created = await Interview.create(interview) - await helper.postEvent(config.TAAS_INTERVIEW_REQUEST_TOPIC, created.toJSON()) - // update jobCandidate.status to Interview - const [, affectedRows] = await models.JobCandidate.update( - { status: 'interview' }, - { where: { id: created.jobCandidateId }, returning: true } - ) - const updatedJobCandidate = _.omit(_.get(affectedRows, '0.dataValues'), 'deletedAt') - await helper.postEvent(config.TAAS_JOB_CANDIDATE_UPDATE_TOPIC, updatedJobCandidate) - // return created interview - return created.dataValues + await sequelize.transaction(async (t) => { + // create the interview + const created = await Interview.create(interview, { transaction: t }) + entity = created.toJSON() + await processRequestInterview(entity) + // update jobCandidate.status to Interview + const [, affectedRows] = await models.JobCandidate.update( + { status: 'interview' }, + { where: { id: created.jobCandidateId }, returning: true, transaction: t } + ) + jobCandidateEntity = _.omit(_.get(affectedRows, '0.dataValues'), 'deletedAt') + await jobCandidateProcessUpdate(jobCandidateEntity) + }) } catch (err) { + if (entity) { + helper.postErrorEvent(config.TAAS_ERROR_TOPIC, entity, 'interview.request') + } // gracefully handle if one of the common sequelize errors handleSequelizeError(err, jobCandidateId) // if reaches here, it's not one of the common errors handled in `handleSequelizeError` throw err } + await helper.postEvent(config.TAAS_INTERVIEW_REQUEST_TOPIC, entity) + await helper.postEvent(config.TAAS_JOB_CANDIDATE_UPDATE_TOPIC, jobCandidateEntity) + // return created interview + return entity } requestInterview.schema = Joi.object().keys({ @@ -301,16 +319,24 @@ async function partiallyUpdateInterview (currentUser, interview, data) { } data.updatedBy = await helper.getUserId(currentUser.userId) + let entity try { - const updated = await interview.update(data) - await helper.postEvent(config.TAAS_INTERVIEW_UPDATE_TOPIC, updated.toJSON(), { oldValue: interview.toJSON() }) - return updated.dataValues + await sequelize.transaction(async (t) => { + const updated = await interview.update(data, { transaction: t }) + entity = updated.toJSON() + await processUpdateInterview(entity) + }) } catch (err) { + if (entity) { + helper.postErrorEvent(config.TAAS_ERROR_TOPIC, entity, 'interview.update') + } // gracefully handle if one of the common sequelize errors handleSequelizeError(err, interview.jobCandidateId) // if reaches here, it's not one of the common errors handled in `handleSequelizeError` throw err } + await helper.postEvent(config.TAAS_INTERVIEW_UPDATE_TOPIC, entity, { oldValue: interview.toJSON() }) + return entity } /** @@ -571,38 +597,58 @@ searchInterviews.schema = Joi.object().keys({ async function updateCompletedInterviews () { logger.info({ component: 'InterviewService', context: 'updateCompletedInterviews', message: 'Running the scheduled job...' }) const oneHourAgo = new Date(Date.now() - 60 * 60 * 1000) - const [affectedCount, updatedRows] = await Interview.update( - // '00000000-0000-0000-0000-000000000000' - to indicate it's updated by the system job - { status: InterviewConstants.Status.Completed, updatedBy: '00000000-0000-0000-0000-000000000000' }, - { - where: { - status: [InterviewConstants.Status.Scheduled, InterviewConstants.Status.Rescheduled], - startTimestamp: { - [Op.lte]: oneHourAgo + + let entity + let affectedCount + try { + await sequelize.transaction(async (t) => { + const updated = await Interview.update( + // '00000000-0000-0000-0000-000000000000' - to indicate it's updated by the system job + { status: InterviewConstants.Status.Completed, updatedBy: '00000000-0000-0000-0000-000000000000' }, + { + where: { + status: [InterviewConstants.Status.Scheduled, InterviewConstants.Status.Rescheduled], + startTimestamp: { + [Op.lte]: oneHourAgo + } + }, + returning: true, + transaction: t } - }, - returning: true - } - ) - - // post event if there are affected/updated interviews - if (affectedCount > 0) { - // payload format: - // { - // jobCandidateId: { interviewId: { affectedFields }, interviewId2: { affectedFields }, ... }, - // jobCandidateId2: { interviewId: { affectedFields }, interviewId2: { affectedFields }, ... }, - // ... - // } - const bulkUpdatePayload = {} - // construct payload - _.forEach(updatedRows, row => { - const interview = row.toJSON() - const affectedFields = _.pick(interview, ['status', 'updatedBy', 'updatedAt']) - _.set(bulkUpdatePayload, [interview.jobCandidateId, interview.id], affectedFields) + ) + let updatedRows + [affectedCount, updatedRows] = updated + + // post event if there are affected/updated interviews + if (affectedCount > 0) { + // payload format: + // { + // jobCandidateId: { interviewId: { affectedFields }, interviewId2: { affectedFields }, ... }, + // jobCandidateId2: { interviewId: { affectedFields }, interviewId2: { affectedFields }, ... }, + // ... + // } + const bulkUpdatePayload = {} + // construct payload + _.forEach(updatedRows, row => { + const interview = row.toJSON() + const affectedFields = _.pick(interview, ['status', 'updatedBy', 'updatedAt']) + _.set(bulkUpdatePayload, [interview.jobCandidateId, interview.id], affectedFields) + }) + entity = bulkUpdatePayload + await processBulkUpdateInterviews(bulkUpdatePayload) + } }) + } catch (e) { + if (entity) { + helper.postErrorEvent(config.TAAS_ERROR_TOPIC, entity, 'interview.bulkupdate') + } + throw e + } + if (affectedCount) { // post event - await helper.postEvent(config.TAAS_INTERVIEW_BULK_UPDATE_TOPIC, bulkUpdatePayload) + await helper.postEvent(config.TAAS_INTERVIEW_BULK_UPDATE_TOPIC, entity) } + logger.info({ component: 'InterviewService', context: 'updateCompletedInterviews', message: `Completed running. Updated ${affectedCount} interviews.` }) } diff --git a/src/services/JobCandidateService.js b/src/services/JobCandidateService.js index 8bdd2b60..9e41f5a6 100644 --- a/src/services/JobCandidateService.js +++ b/src/services/JobCandidateService.js @@ -14,7 +14,13 @@ const logger = require('../common/logger') const errors = require('../common/errors') const models = require('../models') const JobService = require('./JobService') +const { + processCreate, + processUpdate, + processDelete +} = require('../esProcessors/JobCandidateProcessor') +const sequelize = models.sequelize const JobCandidate = models.JobCandidate const esClient = helper.getESClient() @@ -118,9 +124,21 @@ async function createJobCandidate (currentUser, jobCandidate) { jobCandidate.id = uuid() jobCandidate.createdBy = await helper.getUserId(currentUser.userId) - const created = await JobCandidate.create(jobCandidate) - await helper.postEvent(config.TAAS_JOB_CANDIDATE_CREATE_TOPIC, created.toJSON()) - return created.dataValues + let entity + try { + await sequelize.transaction(async (t) => { + const created = await JobCandidate.create(jobCandidate, { transaction: t }) + entity = created.toJSON() + await processCreate(entity) + }) + } catch (e) { + if (entity) { + helper.postErrorEvent(config.TAAS_ERROR_TOPIC, entity, 'jobcandidate.create') + } + throw e + } + await helper.postEvent(config.TAAS_JOB_CANDIDATE_CREATE_TOPIC, entity) + return entity } createJobCandidate.schema = Joi.object().keys({ @@ -155,8 +173,20 @@ async function updateJobCandidate (currentUser, id, data) { data.updatedBy = userId - const updated = await jobCandidate.update(data) - await helper.postEvent(config.TAAS_JOB_CANDIDATE_UPDATE_TOPIC, updated.toJSON(), { oldValue: oldValue }) + let entity + try { + await sequelize.transaction(async (t) => { + const updated = await jobCandidate.update(data, { transaction: t }) + entity = updated.toJSON() + await processUpdate(entity) + }) + } catch (e) { + if (entity) { + helper.postErrorEvent(config.TAAS_ERROR_TOPIC, entity, 'jobcandidate.update') + } + throw e + } + await helper.postEvent(config.TAAS_JOB_CANDIDATE_UPDATE_TOPIC, entity, { oldValue: oldValue }) const result = _.assign(jobCandidate.dataValues, data) return result } @@ -225,7 +255,15 @@ async function deleteJobCandidate (currentUser, id) { } const jobCandidate = await JobCandidate.findById(id) - await jobCandidate.destroy() + try { + await sequelize.transaction(async (t) => { + await jobCandidate.destroy({ transaction: t }) + await processDelete({ id }) + }) + } catch (e) { + helper.postErrorEvent(config.TAAS_ERROR_TOPIC, { id }, 'jobcandidate.delete') + throw e + } await helper.postEvent(config.TAAS_JOB_CANDIDATE_DELETE_TOPIC, { id }) } diff --git a/src/services/JobService.js b/src/services/JobService.js index 4a183783..9ec50c3b 100644 --- a/src/services/JobService.js +++ b/src/services/JobService.js @@ -12,7 +12,13 @@ const helper = require('../common/helper') const logger = require('../common/logger') const errors = require('../common/errors') const models = require('../models') +const { + processCreate, + processUpdate, + processDelete +} = require('../esProcessors/JobProcessor') +const sequelize = models.sequelize const Job = models.Job const esClient = helper.getESClient() @@ -182,9 +188,21 @@ async function createJob (currentUser, job) { job.id = uuid() job.createdBy = await helper.getUserId(currentUser.userId) - const created = await Job.create(job) - await helper.postEvent(config.TAAS_JOB_CREATE_TOPIC, created.toJSON()) - return created.toJSON() + let entity + try { + await sequelize.transaction(async (t) => { + const created = await Job.create(job, { transaction: t }) + entity = created.toJSON() + await processCreate(entity) + }) + } catch (e) { + if (entity) { + helper.postErrorEvent(config.TAAS_ERROR_TOPIC, entity, 'job.create') + } + throw e + } + await helper.postEvent(config.TAAS_JOB_CREATE_TOPIC, entity) + return entity } createJob.schema = Joi.object() @@ -252,8 +270,20 @@ async function updateJob (currentUser, id, data) { data.updatedBy = ubahnUserId - const updated = await job.update(data) - await helper.postEvent(config.TAAS_JOB_UPDATE_TOPIC, updated.toJSON(), { oldValue: oldValue }) + let entity + try { + await sequelize.transaction(async (t) => { + const updated = await job.update(data, { transaction: t }) + entity = updated.toJSON() + await processUpdate(entity) + }) + } catch (e) { + if (entity) { + helper.postErrorEvent(config.TAAS_ERROR_TOPIC, entity, 'job.update') + } + throw e + } + await helper.postEvent(config.TAAS_JOB_UPDATE_TOPIC, entity, { oldValue: oldValue }) job = await Job.findById(id, true) job.dataValues.candidates = _.map(job.dataValues.candidates, (c) => c.dataValues) return job.dataValues @@ -350,7 +380,15 @@ async function deleteJob (currentUser, id) { } const job = await Job.findById(id) - await job.destroy() + try { + await sequelize.transaction(async (t) => { + await job.destroy({ transaction: t }) + await processDelete({ id }) + }) + } catch (e) { + helper.postErrorEvent(config.TAAS_ERROR_TOPIC, { id }, 'job.delete') + throw e + } await helper.postEvent(config.TAAS_JOB_DELETE_TOPIC, { id }) } diff --git a/src/services/RoleService.js b/src/services/RoleService.js index ba128170..b5d68673 100644 --- a/src/services/RoleService.js +++ b/src/services/RoleService.js @@ -11,7 +11,13 @@ const helper = require('../common/helper') const logger = require('../common/logger') const errors = require('../common/errors') const models = require('../models') +const { + processCreate, + processUpdate, + processDelete +} = require('../esProcessors/RoleProcessor') +const sequelize = models.sequelize const Role = models.Role const esClient = helper.getESClient() @@ -118,10 +124,21 @@ async function createRole (currentUser, role) { role.id = uuid.v4() role.createdBy = await helper.getUserId(currentUser.userId) - const created = await Role.create(role) - - await helper.postEvent(config.TAAS_ROLE_CREATE_TOPIC, created.toJSON()) - return created.toJSON() + let entity + try { + await sequelize.transaction(async (t) => { + const created = await Role.create(role, { transaction: t }) + entity = created.toJSON() + await processCreate(entity) + }) + } catch (e) { + if (entity) { + helper.postErrorEvent(config.TAAS_ERROR_TOPIC, entity, 'role.create') + } + throw e + } + await helper.postEvent(config.TAAS_ROLE_CREATE_TOPIC, entity) + return entity } createRole.schema = Joi.object().keys({ @@ -175,10 +192,22 @@ async function updateRole (currentUser, id, data) { } data.updatedBy = await helper.getUserId(currentUser.userId) - const updated = await role.update(data) - await helper.postEvent(config.TAAS_ROLE_UPDATE_TOPIC, updated.toJSON(), { oldValue: oldValue }) - return updated.toJSON() + let entity + try { + await sequelize.transaction(async (t) => { + const updated = await role.update(data, { transaction: t }) + entity = updated.toJSON() + await processUpdate(entity) + }) + } catch (e) { + if (entity) { + helper.postErrorEvent(config.TAAS_ERROR_TOPIC, entity, 'role.update') + } + throw e + } + await helper.postEvent(config.TAAS_RESOURCE_BOOKING_UPDATE_TOPIC, entity, { oldValue: oldValue }) + return entity } updateRole.schema = Joi.object().keys({ @@ -220,7 +249,16 @@ async function deleteRole (currentUser, id) { await _checkUserPermissionForWriteDeleteRole(currentUser) const role = await Role.findById(id) - await role.destroy() + + try { + await sequelize.transaction(async (t) => { + await role.destroy({ transaction: t }) + await processDelete({ id }) + }) + } catch (e) { + helper.postErrorEvent(config.TAAS_ERROR_TOPIC, { id }, 'role.delete') + throw e + } await helper.postEvent(config.TAAS_ROLE_DELETE_TOPIC, { id }) }