diff --git a/src/eventHandlers/WorkPeriodPaymentEventHandler.js b/src/eventHandlers/WorkPeriodPaymentEventHandler.js index 53ab7823..3587186b 100644 --- a/src/eventHandlers/WorkPeriodPaymentEventHandler.js +++ b/src/eventHandlers/WorkPeriodPaymentEventHandler.js @@ -9,7 +9,11 @@ const logger = require('../common/logger') const helper = require('../common/helper') const { ActiveWorkPeriodPaymentStatuses } = require('../../app-constants') const WorkPeriod = models.WorkPeriod +const { + processUpdate: processUpdateEs +} = require('../esProcessors/WorkPeriodProcessor') +const sequelize = models.sequelize /** * When a WorkPeriodPayment is updated or created, the workPeriod related to * that WorkPeriodPayment should be updated also. @@ -39,8 +43,25 @@ async function updateWorkPeriod (payload) { }) return } - const updated = await workPeriodModel.update(data) - await helper.postEvent(config.TAAS_WORK_PERIOD_UPDATE_TOPIC, _.omit(updated.toJSON(), 'payments'), { oldValue: workPeriod, key: `resourceBooking.id:${workPeriod.resourceBookingId}` }) + + const key = `resourceBooking.id:${workPeriod.resourceBookingId}` + let entity + try { + await sequelize.transaction(async (t) => { + const updated = await workPeriodModel.update(data, { transaction: t }) + entity = updated.toJSON() + + entity = _.omit(entity, ['payments']) + await processUpdateEs({ ...entity, key }) + }) + } catch (e) { + if (entity) { + helper.postErrorEvent(config.TAAS_ERROR_TOPIC, entity, 'workperiod.update') + } + throw e + } + await helper.postEvent(config.TAAS_WORK_PERIOD_UPDATE_TOPIC, entity, { oldValue: workPeriod, key }) + logger.debug({ component: 'WorkPeriodPaymentEventHandler', context: 'updateWorkPeriod', diff --git a/src/services/JobService.js b/src/services/JobService.js index d72055fa..8057c478 100644 --- a/src/services/JobService.js +++ b/src/services/JobService.js @@ -201,6 +201,7 @@ async function createJob (currentUser, job, onTeamCreating) { } throw e } + await helper.postEvent(config.TAAS_JOB_CREATE_TOPIC, entity, { onTeamCreating }) return entity } diff --git a/src/services/PaymentSchedulerService.js b/src/services/PaymentSchedulerService.js index f38eab6f..e718c295 100644 --- a/src/services/PaymentSchedulerService.js +++ b/src/services/PaymentSchedulerService.js @@ -2,11 +2,16 @@ const _ = require('lodash') const config = require('config') const moment = require('moment') const models = require('../models') -const { getMemberDetailsByHandle, getChallenge, getChallengeResource, sleep, postEvent } = require('../common/helper') +const { getMemberDetailsByHandle, getChallenge, getChallengeResource, sleep, postEvent, postErrorEvent } = require('../common/helper') const logger = require('../common/logger') const { createChallenge, addResourceToChallenge, activateChallenge, closeChallenge } = require('./PaymentService') const { ChallengeStatus, PaymentSchedulerStatus, PaymentProcessingSwitch } = require('../../app-constants') +const { + processUpdate +} = require('../esProcessors/WorkPeriodPaymentProcessor') + +const sequelize = models.sequelize const WorkPeriodPayment = models.WorkPeriodPayment const WorkPeriod = models.WorkPeriod const PaymentScheduler = models.PaymentScheduler @@ -88,9 +93,22 @@ async function processPayment (workPeriodPayment) { } } else { const oldValue = workPeriodPayment.toJSON() - const updated = await workPeriodPayment.update({ status: 'in-progress' }) - // Update the modified status to es - await postEvent(config.TAAS_WORK_PERIOD_PAYMENT_UPDATE_TOPIC, updated.toJSON(), { oldValue, key: `workPeriodPayment.billingAccountId:${updated.billingAccountId}` }) + let entity + let key + try { + await sequelize.transaction(async (t) => { + const updated = await workPeriodPayment.update({ status: 'in-progress' }, { transaction: t }) + key = `workPeriodPayment.billingAccountId:${updated.billingAccountId}` + entity = updated.toJSON() + await processUpdate({ ...entity, key }) + }) + } catch (e) { + if (entity) { + postErrorEvent(config.TAAS_ERROR_TOPIC, entity, 'workperiodpayment.update') + } + throw e + } + await postEvent(config.TAAS_WORK_PERIOD_PAYMENT_UPDATE_TOPIC, entity, { oldValue: oldValue, key }) } // Check whether the number of processed records per minute exceeds the specified number, if it exceeds, wait for the next minute before processing await checkWait(PaymentSchedulerStatus.START_PROCESS) @@ -112,11 +130,24 @@ async function processPayment (workPeriodPayment) { } const oldValue = workPeriodPayment.toJSON() - // 5. update wp and save it should only update already existent Work Period Payment record with created "challengeId" and "status=completed". - const updated = await workPeriodPayment.update({ challengeId: paymentScheduler.challengeId, status: 'completed' }) - // Update the modified status to es - await postEvent(config.TAAS_WORK_PERIOD_PAYMENT_UPDATE_TOPIC, updated.toJSON(), { oldValue, key: `workPeriodPayment.billingAccountId:${updated.billingAccountId}` }) + let key + let entity + try { + await sequelize.transaction(async (t) => { + // 5. update wp and save it should only update already existent Work Period Payment record with created "challengeId" and "status=completed". + const updated = await workPeriodPayment.update({ challengeId: paymentScheduler.challengeId, status: 'completed' }, { transaction: t }) + entity = updated.toJSON() + await processUpdate({ ...entity, key }) + }) + } catch (e) { + if (entity) { + postErrorEvent(config.TAAS_ERROR_TOPIC, entity, 'workperiodpayment.update') + } + throw e + } + // Update the modified status to es + await postEvent(config.TAAS_WORK_PERIOD_PAYMENT_UPDATE_TOPIC, entity, { oldValue: oldValue, key }) await paymentScheduler.update({ step: PaymentSchedulerStatus.CLOSE_CHALLENGE, userId: paymentScheduler.userId, status: 'completed' }) localLogger.info(`Processed workPeriodPayment ${workPeriodPayment.id} successfully`, 'processPayment') @@ -125,10 +156,24 @@ async function processPayment (workPeriodPayment) { logger.logFullError(err, { component: 'PaymentSchedulerService', context: 'processPayment' }) const statusDetails = { errorMessage: extractErrorMessage(err), errorCode: _.get(err, 'status', -1), retry: _.get(err, 'retry', -1), step: _.get(err, 'step'), challengeId: paymentScheduler ? paymentScheduler.challengeId : null } const oldValue = workPeriodPayment.toJSON() - // If payment processing failed Work Periods Payment "status" should be changed to "failed" and populate "statusDetails" field with error details in JSON format. - const updated = await workPeriodPayment.update({ statusDetails, status: 'failed' }) - // Update the modified status to es - await postEvent(config.TAAS_WORK_PERIOD_PAYMENT_UPDATE_TOPIC, updated.toJSON(), { oldValue, key: `workPeriodPayment.billingAccountId:${updated.billingAccountId}` }) + + let entity + let key + try { + await sequelize.transaction(async (t) => { + // If payment processing failed Work Periods Payment "status" should be changed to "failed" and populate "statusDetails" field with error details in JSON format. + const updated = await workPeriodPayment.update({ statusDetails, status: 'failed' }, { transaction: t }) + key = `workPeriodPayment.billingAccountId:${updated.billingAccountId}` + entity = updated.toJSON() + await processUpdate({ ...entity, key }) + }) + } catch (e) { + if (entity) { + postErrorEvent(config.TAAS_ERROR_TOPIC, entity, 'workperiodpayment.update') + } + throw e + } + await postEvent(config.TAAS_WORK_PERIOD_PAYMENT_UPDATE_TOPIC, entity, { oldValue: oldValue, key }) if (paymentScheduler) { await paymentScheduler.update({ step: _.get(err, 'step'), userId: paymentScheduler.userId, status: 'failed' })