Skip to content
This repository was archived by the owner on Jan 23, 2025. It is now read-only.

Commit b54f029

Browse files
Merge pull request #14 from topcoder-platform/develop
Fix payment processor
2 parents 2ed2aa0 + ca5a833 commit b54f029

File tree

5 files changed

+99
-45
lines changed

5 files changed

+99
-45
lines changed

config/default.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ module.exports = {
4343
PAYMENT_STATUS_ID: process.env.PAYMENT_STATUS_ID || 55, // on hold
4444
MODIFICATION_RATIONALE_ID: process.env.MODIFICATION_RATIONALE_ID || 1,
4545
WINNER_PAYMENT_TYPE_ID: process.env.WINNER_PAYMENT_TYPE_ID || 72,
46+
CHECKPOINT_WINNER_PAYMENT_TYPE_ID: process.env.CHECKPOINT_WINNER_PAYMENT_TYPE_ID || 64,
4647
COPILOT_PAYMENT_TYPE_ID: process.env.COPILOT_PAYMENT_TYPE_ID || 74,
4748

4849
V5_PAYMENT_DETAIL_STATUS_REASON_ID: process.env.V5_PAYMENT_DETAIL_STATUS_REASON_ID || 500,

src/app.js

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -44,22 +44,25 @@ const dataHandler = (messageSet, topic, partition) => Promise.each(messageSet, a
4444
return
4545
}
4646

47-
// Currently only process payments for challenges with `legacy.pureV5Task: true`
48-
if (!_.get(messageJSON.payload, 'legacy.pureV5Task', false)) {
49-
logger.info(`Challenge Legacy Object ${JSON.stringify(_.get(messageJSON.payload, 'legacy'))} does not have legacy.pureV5Task: true`)
47+
// Currently only process payments for challenges with `legacy.pureV5Task: true` or `legacy.pureV5: true`
48+
if (!_.get(messageJSON.payload, 'legacy.pureV5Task', false) && !_.get(messageJSON.payload, 'legacy.pureV5', false)) {
49+
logger.info(`Challenge Legacy Object ${JSON.stringify(_.get(messageJSON.payload, 'legacy'))} does not have legacy.pureV5Task: true or legacy.pureV5: true`)
5050
await consumer.commitOffset({ topic, partition, offset: m.offset })
5151
return
5252
}
5353

54-
if (_.toUpper(_.get(messageJSON.payload, 'type')) !== 'TASK' || _.toUpper(_.get(messageJSON.payload, 'status')) !== 'COMPLETED') {
55-
logger.info(`The message type ${_.get(messageJSON.payload, 'type')}, status ${_.get(messageJSON.payload, 'status')} doesn't match {type: 'Task', status: 'Completed'}.`)
54+
if (_.toUpper(_.get(messageJSON.payload, 'status')) !== 'COMPLETED') {
55+
logger.info(`The message type ${_.get(messageJSON.payload, 'type')}, status ${_.get(messageJSON.payload, 'status')} doesn't match {status: 'Completed'}.`)
5656

5757
// commit the message and ignore it
5858
await consumer.commitOffset({ topic, partition, offset: m.offset })
5959
return
6060
}
6161

6262
try {
63+
// delay for a random amount of time between 5-20 sec
64+
// to minimize the chance of having two processes doing the same at the same time
65+
await helper.delay(helper.getRandomInt(5 * 1000, 20 * 1000))
6366
await processorService.processUpdate(messageJSON)
6467
logger.debug('Successfully processed message')
6568
} catch (err) {

src/common/helper.js

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,29 @@ async function getCopilotId (challengeId) {
146146
return _.get(_.head(res.body), 'memberId')
147147
}
148148

149+
/**
150+
* Creates a delay promise
151+
* @param {Number} ms the milliseconds to delay
152+
* @returns a promise
153+
*/
154+
function delay (ms) {
155+
return new Promise(resolve => {
156+
setTimeout(resolve, ms)
157+
})
158+
}
159+
160+
/**
161+
* Generate a random integer
162+
* @param {Number} min the min
163+
* @param {Number} max the max
164+
* @returns the random integer
165+
*/
166+
function getRandomInt(min, max) {
167+
min = Math.ceil(min)
168+
max = Math.floor(max)
169+
return Math.floor(Math.random() * (max - min + 1)) + min
170+
}
171+
149172
module.exports = {
150173
getInformixConnection,
151174
getKafkaOptions,
@@ -154,5 +177,7 @@ module.exports = {
154177
putRequest,
155178
postRequest,
156179
getUserId,
157-
getCopilotId
180+
getCopilotId,
181+
delay,
182+
getRandomInt
158183
}

src/services/paymentService.js

Lines changed: 31 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -46,14 +46,18 @@ const INSERT_PAYMENT_STATUS_REASON_XREF = 'INSERT INTO payment_detail_status_rea
4646
* @param {String} sql the sql
4747
* @return {Object} Informix statement
4848
*/
49-
async function prepare (connection, sql) {
49+
async function prepare(connection, sql) {
5050
logger.debug(`Preparing SQL ${sql}`)
5151
const stmt = await connection.prepareAsync(sql)
5252
return Promise.promisifyAll(stmt)
5353
}
5454

55-
async function paymentExists(payment) {
56-
const connection = await helper.getInformixConnection()
55+
async function paymentExists(payment, connection) {
56+
let isNewConn = false
57+
if (!connection) {
58+
connection = await helper.getInformixConnection()
59+
isNewConn = true
60+
}
5761
try {
5862
const query = util.format(QUERY_PAYMENT, payment.memberId, payment.v5ChallengeId, payment.typeId)
5963
logger.debug(`Checking if paymentExists - ${query}`)
@@ -62,31 +66,39 @@ async function paymentExists(payment) {
6266
logger.error(`Error in 'paymentExists' ${e}`)
6367
throw e
6468
} finally {
65-
await connection.closeAsync()
69+
if (isNewConn) await connection.closeAsync()
6670
}
6771
}
6872

6973
/**
7074
* Create payment and save it to db
7175
* @param {Object} payment the payment info
7276
*/
73-
async function createPayment (payment) {
74-
logger.debug(`Creating Payment -> v5ChallengeID: ${payment.v5ChallengeId} - obj: ${JSON.stringify(payment)}`)
75-
const connection = await helper.getInformixConnection()
76-
const paymentDetailId = await paymentDetailIdGen.getNextId()
77-
const paymentId = await paymentIdGen.getNextId()
77+
async function createPayment(payment) {
7878
try {
79+
const connection = await helper.getInformixConnection()
7980
await connection.beginTransactionAsync()
80-
const insertDetail = await prepare(connection, INSERT_PAYMENT_DETAIL)
81-
await insertDetail.executeAsync([paymentDetailId, payment.amount, payment.amount, payment.statusId, payment.modificationRationaleId, payment.desc, payment.typeId, payment.methodId, payment.projectId, payment.charityInd, payment.amount, payment.installmentNumber, payment.createUser, payment.v5ChallengeId])
82-
const insertPayment = await prepare(connection, INSERT_PAYMENT)
83-
await insertPayment.executeAsync([paymentId, payment.memberId, paymentDetailId])
84-
const insertDetailXref = await prepare(connection, INSERT_PAYMENT_DETAIL_XREF)
85-
await insertDetailXref.executeAsync([paymentId, paymentDetailId])
86-
const insertStatusXref = await prepare(connection, INSERT_PAYMENT_STATUS_REASON_XREF)
87-
await insertStatusXref.executeAsync([paymentDetailId, config.V5_PAYMENT_DETAIL_STATUS_REASON_ID])
88-
logger.info(`Payment ${paymentId} with detail ${paymentDetailId} has been inserted`)
89-
await connection.commitTransactionAsync()
81+
82+
const existing = await paymentExists(payment, connection)
83+
logger.debug(`Payment Exists Response: ${JSON.stringify(existing)}`)
84+
if (!existing || existing.length === 0) {
85+
const paymentDetailId = await paymentDetailIdGen.getNextId()
86+
const paymentId = await paymentIdGen.getNextId()
87+
const insertDetail = await prepare(connection, INSERT_PAYMENT_DETAIL)
88+
await insertDetail.executeAsync([paymentDetailId, payment.amount, payment.amount, payment.statusId, payment.modificationRationaleId, payment.desc, payment.typeId, payment.methodId, payment.projectId, payment.charityInd, payment.amount, payment.installmentNumber, payment.createUser, payment.
89+
])
90+
const insertPayment = await prepare(connection, INSERT_PAYMENT)
91+
await insertPayment.executeAsync([paymentId, payment.memberId, paymentDetailId])
92+
const insertDetailXref = await prepare(connection, INSERT_PAYMENT_DETAIL_XREF)
93+
await insertDetailXref.executeAsync([paymentId, paymentDetailId])
94+
const insertStatusXref = await prepare(connection, INSERT_PAYMENT_STATUS_REASON_XREF)
95+
await insertStatusXref.executeAsync([paymentDetailId, config.V5_PAYMENT_DETAIL_STATUS_REASON_ID])
96+
logger.info(`Payment ${paymentId} with detail ${paymentDetailId} has been inserted`)
97+
await connection.commitTransactionAsync()
98+
} else {
99+
logger.error(`Payment Exists for ${payment.v5ChallengeId}, skipping - ${JSON.stringify(existing)}`)
100+
await connection.commitTransactionAsync()
101+
}
90102
} catch (e) {
91103
logger.error(`Error in 'createPayment' ${e}, rolling back transaction`)
92104
await connection.rollbackTransactionAsync()

src/services/processorService.js

Lines changed: 33 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,11 @@ async function processUpdate(message) {
4040
// add winner payment
4141
try {
4242
const winnerPrizes = _.get(_.find(message.payload.prizeSets, ['type', 'placement']), 'prizes', [])
43+
const checkpointPrizes = _.get(_.find(message.payload.prizeSets, ['type', 'checkpoint']), 'prizes', [])
4344
// const winnerPaymentDesc = _.get(_.find(message.payload.prizeSets, ['type', 'placement']), 'description', '')
44-
const winnerMembers = _.sortBy(_.get(message.payload, 'winners', []), ['placement'])
45+
//` w => w.type === 'placement' || _.isUndefined(w.type)` is used here to support challenges where the type is not set (old data or other tracks that only have placements)
46+
const winnerMembers = _.sortBy(_.filter(_.get(message.payload, 'winners', []), w => w.type === 'placement' || _.isUndefined(w.type)), ['placement'])
47+
const checkpointWinnerMembers = _.sortBy(_.filter(_.get(message.payload, 'winners', []), w => w.type === 'checkpoint'), ['placement'])
4548
if (_.isEmpty(winnerPrizes)) {
4649
logger.warn(`For challenge ${v5ChallengeId}, no winner payment avaiable`)
4750
} else if (winnerPrizes.length !== winnerMembers.length) {
@@ -52,23 +55,38 @@ async function processUpdate(message) {
5255
const payment = _.assign({
5356
memberId: winnerMembers[i - 1].userId,
5457
amount: winnerPrizes[i - 1].value,
55-
desc: `Task - ${message.payload.name} - ${i} Place`,
58+
desc: `Payment - ${message.payload.name} - ${i} Place`,
5659
typeId: config.WINNER_PAYMENT_TYPE_ID
5760
}, basePayment)
5861

59-
const paymentExists = await paymentService.paymentExists(payment)
60-
logger.debug(`Payment Exists Response: ${JSON.stringify(paymentExists)}`)
61-
if(!paymentExists || paymentExists.length === 0) {
62-
await paymentService.createPayment(payment)
63-
} else {
64-
logger.error(`Payment Exists for ${v5ChallengeId}, skipping - ${JSON.stringify(paymentExists)}`)
65-
}
62+
await paymentService.createPayment(payment)
6663
}
6764
} catch (error) {
6865
logger.error(`For challenge ${v5ChallengeId}, add winner payments error: ${error}`)
6966
}
7067
}
7168

69+
if (_.isEmpty(checkpointPrizes)) {
70+
logger.warn(`For challenge ${v5ChallengeId}, no checkpoint winner payment avaiable`)
71+
} else if (checkpointPrizes.length !== checkpointWinnerMembers.length) {
72+
logger.error(`For challenge ${v5ChallengeId}, there is ${checkpointPrizes.length} user prizes but ${checkpointWinnerMembers.length} winners`)
73+
} else {
74+
try {
75+
for (let i = 1; i <= checkpointPrizes.length; i++) {
76+
const payment = _.assign({
77+
memberId: checkpointWinnerMembers[i - 1].userId,
78+
amount: checkpointPrizes[i - 1].value,
79+
desc: `Checkpoint payment - ${message.payload.name} - ${i} Place`,
80+
typeId: config.CHECKPOINT_WINNER_PAYMENT_TYPE_ID
81+
}, basePayment)
82+
83+
await paymentService.createPayment(payment)
84+
}
85+
} catch (error) {
86+
logger.error(`For challenge ${v5ChallengeId}, add checkpoint winner payments error: ${error}`)
87+
}
88+
}
89+
7290
// add copilot payment
7391
const copilotId = await helper.getCopilotId(message.payload.id)
7492
const copilotAmount = _.get(_.head(_.get(_.find(message.payload.prizeSets, ['type', 'copilot']), 'prizes', [])), 'value')
@@ -83,16 +101,10 @@ async function processUpdate(message) {
83101
const copilotPayment = _.assign({
84102
memberId: copilotId,
85103
amount: copilotAmount,
86-
desc: (copilotPaymentDesc ? copilotPaymentDesc : `Task - ${message.payload.name} - Copilot`),
104+
desc: (copilotPaymentDesc ? copilotPaymentDesc : `${message.payload.name} - Copilot`),
87105
typeId: config.COPILOT_PAYMENT_TYPE_ID
88106
}, basePayment)
89-
const paymentExists = await paymentService.paymentExists(copilotPayment)
90-
logger.debug(`Copilot Payment Exists Response: ${JSON.stringify(paymentExists)}`)
91-
if(!paymentExists || paymentExists.length === 0) {
92-
await paymentService.createPayment(copilotPayment)
93-
} else {
94-
logger.error(`Copilot Payment Exists for ${v5ChallengeId}, skipping - ${JSON.stringify(paymentExists)}`)
95-
}
107+
await paymentService.createPayment(copilotPayment)
96108
} catch (error) {
97109
logger.error(`For challenge ${v5ChallengeId}, add copilot payments error: ${error}`)
98110
}
@@ -113,18 +125,19 @@ processUpdate.schema = {
113125
legacyId: Joi.number().integer().positive(),
114126
task: Joi.object().keys({
115127
memberId: Joi.string().allow(null)
116-
}).unknown(true).required(),
128+
}).unknown(true),
117129
name: Joi.string().required(),
118130
prizeSets: Joi.array().items(Joi.object().keys({
119-
type: Joi.string().valid('copilot', 'placement').required(),
131+
type: Joi.string().valid('copilot', 'placement', 'checkpoint').required(),
120132
prizes: Joi.array().items(Joi.object().keys({
121133
value: Joi.number().positive().required()
122134
}).unknown(true))
123135
}).unknown(true)).min(1),
124136
winners: Joi.array().items(Joi.object({
125137
userId: Joi.number().integer().positive().required(),
126138
handle: Joi.string(),
127-
placement: Joi.number().integer().positive().required()
139+
placement: Joi.number().integer().positive().required(),
140+
type: Joi.string().valid(['placement', 'checkpoint'])
128141
}).unknown(true)),
129142
type: Joi.string().required(),
130143
status: Joi.string().required(),

0 commit comments

Comments
 (0)