Skip to content

Feature/shapeup4 cqrs update #527

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 23 additions & 2 deletions src/eventHandlers/WorkPeriodPaymentEventHandler.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,11 @@ const logger = require('../common/logger')
const helper = require('../common/helper')
const { ActiveWorkPeriodPaymentStatuses } = require('../../app-constants')
const WorkPeriod = models.WorkPeriod
const {
processUpdate: processUpdateEs
} = require('../esProcessors/WorkPeriodProcessor')

const sequelize = models.sequelize
/**
* When a WorkPeriodPayment is updated or created, the workPeriod related to
* that WorkPeriodPayment should be updated also.
Expand Down Expand Up @@ -39,8 +43,25 @@ async function updateWorkPeriod (payload) {
})
return
}
const updated = await workPeriodModel.update(data)
await helper.postEvent(config.TAAS_WORK_PERIOD_UPDATE_TOPIC, _.omit(updated.toJSON(), 'payments'), { oldValue: workPeriod, key: `resourceBooking.id:${workPeriod.resourceBookingId}` })

const key = `resourceBooking.id:${workPeriod.resourceBookingId}`
let entity
try {
await sequelize.transaction(async (t) => {
const updated = await workPeriodModel.update(data, { transaction: t })
entity = updated.toJSON()

entity = _.omit(entity, ['payments'])
await processUpdateEs({ ...entity, key })
})
} catch (e) {
if (entity) {
helper.postErrorEvent(config.TAAS_ERROR_TOPIC, entity, 'workperiod.update')
}
throw e
}
await helper.postEvent(config.TAAS_WORK_PERIOD_UPDATE_TOPIC, entity, { oldValue: workPeriod, key })

logger.debug({
component: 'WorkPeriodPaymentEventHandler',
context: 'updateWorkPeriod',
Expand Down
1 change: 1 addition & 0 deletions src/services/JobService.js
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ async function createJob (currentUser, job, onTeamCreating) {
}
throw e
}

await helper.postEvent(config.TAAS_JOB_CREATE_TOPIC, entity, { onTeamCreating })
return entity
}
Expand Down
69 changes: 57 additions & 12 deletions src/services/PaymentSchedulerService.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,16 @@ const _ = require('lodash')
const config = require('config')
const moment = require('moment')
const models = require('../models')
const { getMemberDetailsByHandle, getChallenge, getChallengeResource, sleep, postEvent } = require('../common/helper')
const { getMemberDetailsByHandle, getChallenge, getChallengeResource, sleep, postEvent, postErrorEvent } = require('../common/helper')
const logger = require('../common/logger')
const { createChallenge, addResourceToChallenge, activateChallenge, closeChallenge } = require('./PaymentService')
const { ChallengeStatus, PaymentSchedulerStatus, PaymentProcessingSwitch } = require('../../app-constants')

const {
processUpdate
} = require('../esProcessors/WorkPeriodPaymentProcessor')

const sequelize = models.sequelize
const WorkPeriodPayment = models.WorkPeriodPayment
const WorkPeriod = models.WorkPeriod
const PaymentScheduler = models.PaymentScheduler
Expand Down Expand Up @@ -88,9 +93,22 @@ async function processPayment (workPeriodPayment) {
}
} else {
const oldValue = workPeriodPayment.toJSON()
const updated = await workPeriodPayment.update({ status: 'in-progress' })
// Update the modified status to es
await postEvent(config.TAAS_WORK_PERIOD_PAYMENT_UPDATE_TOPIC, updated.toJSON(), { oldValue, key: `workPeriodPayment.billingAccountId:${updated.billingAccountId}` })
let entity
let key
try {
await sequelize.transaction(async (t) => {
const updated = await workPeriodPayment.update({ status: 'in-progress' }, { transaction: t })
key = `workPeriodPayment.billingAccountId:${updated.billingAccountId}`
entity = updated.toJSON()
await processUpdate({ ...entity, key })
})
} catch (e) {
if (entity) {
postErrorEvent(config.TAAS_ERROR_TOPIC, entity, 'workperiodpayment.update')
}
throw e
}
await postEvent(config.TAAS_WORK_PERIOD_PAYMENT_UPDATE_TOPIC, entity, { oldValue: oldValue, key })
}
// Check whether the number of processed records per minute exceeds the specified number, if it exceeds, wait for the next minute before processing
await checkWait(PaymentSchedulerStatus.START_PROCESS)
Expand All @@ -112,11 +130,24 @@ async function processPayment (workPeriodPayment) {
}

const oldValue = workPeriodPayment.toJSON()
// 5. update wp and save it should only update already existent Work Period Payment record with created "challengeId" and "status=completed".
const updated = await workPeriodPayment.update({ challengeId: paymentScheduler.challengeId, status: 'completed' })
// Update the modified status to es
await postEvent(config.TAAS_WORK_PERIOD_PAYMENT_UPDATE_TOPIC, updated.toJSON(), { oldValue, key: `workPeriodPayment.billingAccountId:${updated.billingAccountId}` })

let key
let entity
try {
await sequelize.transaction(async (t) => {
// 5. update wp and save it should only update already existent Work Period Payment record with created "challengeId" and "status=completed".
const updated = await workPeriodPayment.update({ challengeId: paymentScheduler.challengeId, status: 'completed' }, { transaction: t })
entity = updated.toJSON()
await processUpdate({ ...entity, key })
})
} catch (e) {
if (entity) {
postErrorEvent(config.TAAS_ERROR_TOPIC, entity, 'workperiodpayment.update')
}
throw e
}
// Update the modified status to es
await postEvent(config.TAAS_WORK_PERIOD_PAYMENT_UPDATE_TOPIC, entity, { oldValue: oldValue, key })
await paymentScheduler.update({ step: PaymentSchedulerStatus.CLOSE_CHALLENGE, userId: paymentScheduler.userId, status: 'completed' })

localLogger.info(`Processed workPeriodPayment ${workPeriodPayment.id} successfully`, 'processPayment')
Expand All @@ -125,10 +156,24 @@ async function processPayment (workPeriodPayment) {
logger.logFullError(err, { component: 'PaymentSchedulerService', context: 'processPayment' })
const statusDetails = { errorMessage: extractErrorMessage(err), errorCode: _.get(err, 'status', -1), retry: _.get(err, 'retry', -1), step: _.get(err, 'step'), challengeId: paymentScheduler ? paymentScheduler.challengeId : null }
const oldValue = workPeriodPayment.toJSON()
// If payment processing failed Work Periods Payment "status" should be changed to "failed" and populate "statusDetails" field with error details in JSON format.
const updated = await workPeriodPayment.update({ statusDetails, status: 'failed' })
// Update the modified status to es
await postEvent(config.TAAS_WORK_PERIOD_PAYMENT_UPDATE_TOPIC, updated.toJSON(), { oldValue, key: `workPeriodPayment.billingAccountId:${updated.billingAccountId}` })

let entity
let key
try {
await sequelize.transaction(async (t) => {
// If payment processing failed Work Periods Payment "status" should be changed to "failed" and populate "statusDetails" field with error details in JSON format.
const updated = await workPeriodPayment.update({ statusDetails, status: 'failed' }, { transaction: t })
key = `workPeriodPayment.billingAccountId:${updated.billingAccountId}`
entity = updated.toJSON()
await processUpdate({ ...entity, key })
})
} catch (e) {
if (entity) {
postErrorEvent(config.TAAS_ERROR_TOPIC, entity, 'workperiodpayment.update')
}
throw e
}
await postEvent(config.TAAS_WORK_PERIOD_PAYMENT_UPDATE_TOPIC, entity, { oldValue: oldValue, key })

if (paymentScheduler) {
await paymentScheduler.update({ step: _.get(err, 'step'), userId: paymentScheduler.userId, status: 'failed' })
Expand Down