Skip to content

Commit e8e8cf9

Browse files
authored
Merge pull request #116 from topcoder-platform/Issue_115
Duplicate all events to post to aggregate topic
2 parents d287907 + b573c79 commit e8e8cf9

File tree

6 files changed

+31
-18
lines changed

6 files changed

+31
-18
lines changed

config/default.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ module.exports = {
2020
},
2121
BUSAPI_URL: process.env.BUSAPI_URL || 'https://api.topcoder-dev.com/v5',
2222
KAFKA_ERROR_TOPIC: process.env.KAFKA_ERROR_TOPIC || 'error.notification',
23+
KAFKA_AGGREGATE_TOPIC: process.env.KAFKA_AGGREGATE_TOPIC,
2324
CHALLENGEAPI_URL: process.env.CHALLENGEAPI_URL || 'https://api.topcoder-dev.com/v4/challenges',
2425
AUTH0_URL: process.env.AUTH0_URL, // Auth0 credentials for Submission Service
2526
AUTH0_AUDIENCE: process.env.AUTH0_AUDIENCE || 'https://www.topcoder.com',

src/common/helper.js

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -517,17 +517,33 @@ function * downloadFile (fileURL) {
517517
return downloadedFile.Body
518518
}
519519

520+
/**
521+
* Wrapper function to post to bus api. Ensures that every event posted to bus api
522+
* is duplicated and posted to bus api again, but to a different "aggregate" topic
523+
* @param {Object} payload Data that needs to be posted to the bus api
524+
*/
525+
function * postToBusApi (payload) {
526+
const busApiClient = getBusApiClient()
527+
528+
yield busApiClient.postEvent(payload)
529+
530+
// Post to aggregate topic
531+
payload['topic'] = config.get('KAFKA_AGGREGATE_TOPIC')
532+
533+
yield busApiClient.postEvent(payload)
534+
}
535+
520536
module.exports = {
521537
wrapExpress,
522538
autoWrapExpress,
523539
getEsClient,
524-
getBusApiClient,
525540
fetchFromES,
526541
camelize,
527542
setPaginationHeaders,
528543
getSubmissionPhaseId,
529544
checkCreateAccess,
530545
checkGetAccess,
531546
checkReviewGetAccess,
532-
downloadFile
547+
downloadFile,
548+
postToBusApi
533549
}

src/services/ReviewService.js

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ const {
1616
const HelperService = require('./HelperService')
1717
const SubmissionService = require('./SubmissionService')
1818

19-
const busApiClient = helper.getBusApiClient()
2019
const table = 'Review'
2120

2221
/**
@@ -135,7 +134,7 @@ function * createReview (authUser, entity) {
135134
}
136135

137136
// Post to Bus API using Client
138-
yield busApiClient.postEvent(reqBody)
137+
yield helper.postToBusApi(reqBody)
139138

140139
// Inserting records in DynamoDB doesn't return any response
141140
// Hence returning the same entity to be in compliance with Swagger
@@ -217,7 +216,7 @@ function * _updateReview (authUser, reviewId, entity) {
217216
}
218217

219218
// Post to Bus API using Client
220-
yield busApiClient.postEvent(reqBody)
219+
yield helper.postToBusApi(reqBody)
221220

222221
// Updating records in DynamoDB doesn't return any response
223222
// Hence returning the response which will be in compliance with Swagger
@@ -310,7 +309,7 @@ function * deleteReview (reviewId) {
310309
}
311310

312311
// Post to Bus API using Client
313-
yield busApiClient.postEvent(reqBody)
312+
yield helper.postToBusApi(reqBody)
314313
}
315314

316315
deleteReview.schema = {

src/services/ReviewSummationService.js

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ const helper = require('../common/helper')
1111
const { originator, mimeType, events } = require('../../constants').busApiMeta
1212
const HelperService = require('./HelperService')
1313

14-
const busApiClient = helper.getBusApiClient()
1514
const table = 'ReviewSummation'
1615

1716
/**
@@ -112,7 +111,7 @@ function * createReviewSummation (authUser, entity) {
112111
}
113112

114113
// Post to Bus API using Client
115-
yield busApiClient.postEvent(reqBody)
114+
yield helper.postToBusApi(reqBody)
116115

117116
// Inserting records in DynamoDB doesn't return any response
118117
// Hence returning the same entity to be in compliance with Swagger
@@ -212,7 +211,7 @@ function * _updateReviewSummation (authUser, reviewSummationId, entity) {
212211
}
213212

214213
// Post to Bus API using Client
215-
yield busApiClient.postEvent(reqBody)
214+
yield helper.postToBusApi(reqBody)
216215

217216
// Updating records in DynamoDB doesn't return any response
218217
// Hence returning the response which will be in compliance with Swagger
@@ -302,7 +301,7 @@ function * deleteReviewSummation (reviewSummationId) {
302301
}
303302

304303
// Post to Bus API using Client
305-
yield busApiClient.postEvent(reqBody)
304+
yield helper.postToBusApi(reqBody)
306305
}
307306

308307
deleteReviewSummation.schema = {

src/services/ReviewTypeService.js

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ const dbhelper = require('../common/dbhelper')
1010
const helper = require('../common/helper')
1111
const { originator, mimeType, events } = require('../../constants').busApiMeta
1212

13-
const busApiClient = helper.getBusApiClient()
1413
const table = 'ReviewType'
1514

1615
/**
@@ -93,7 +92,7 @@ function * createReviewType (entity) {
9392
}
9493

9594
// Post to Bus API using Client
96-
yield busApiClient.postEvent(reqBody)
95+
yield helper.postToBusApi(reqBody)
9796

9897
// Inserting records in DynamoDB doesn't return any response
9998
// Hence returning the same entity to be in compliance with Swagger
@@ -157,7 +156,7 @@ function * _updateReviewType (reviewTypeId, entity) {
157156
}
158157

159158
// Post to Bus API using Client
160-
yield busApiClient.postEvent(reqBody)
159+
yield helper.postToBusApi(reqBody)
161160

162161
// Updating records in DynamoDB doesn't return any response
163162
// Hence returning the response which will be in compliance with Swagger
@@ -235,7 +234,7 @@ function * deleteReviewType (reviewTypeId) {
235234
}
236235

237236
// Post to Bus API using Client
238-
yield busApiClient.postEvent(reqBody)
237+
yield helper.postToBusApi(reqBody)
239238
}
240239

241240
deleteReviewType.schema = {

src/services/SubmissionService.js

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ const { submissionIndex } = require('../../constants')
1616
const s3 = new AWS.S3()
1717
const logger = require('winston')
1818

19-
const busApiClient = helper.getBusApiClient()
2019
const table = 'Submission'
2120

2221
/*
@@ -308,7 +307,7 @@ function * createSubmission (authUser, files, entity) {
308307
logger.info('Prepared submission create event payload to pass to THE bus')
309308

310309
// Post to Bus API using Client
311-
yield busApiClient.postEvent(reqBody)
310+
yield helper.postToBusApi(reqBody)
312311

313312
// Inserting records in DynamoDB doesn't return any response
314313
// Hence returning the entity which is in compliance with Swagger
@@ -411,7 +410,7 @@ function * _updateSubmission (authUser, submissionId, entity) {
411410
}
412411

413412
// Post to Bus API using Client
414-
yield busApiClient.postEvent(reqBody)
413+
yield helper.postToBusApi(reqBody)
415414

416415
// Updating records in DynamoDB doesn't return any response
417416
// Hence returning the response which will be in compliance with Swagger
@@ -503,7 +502,7 @@ function * deleteSubmission (submissionId) {
503502
}
504503

505504
// Post to Bus API using Client
506-
yield busApiClient.postEvent(reqBody)
505+
yield helper.postToBusApi(reqBody)
507506
}
508507

509508
deleteSubmission.schema = {

0 commit comments

Comments
 (0)