From b573c79fb40d4ad27dec64d119f310310da67750 Mon Sep 17 00:00:00 2001 From: Mithun Kamath Date: Wed, 19 Jun 2019 17:24:12 +0530 Subject: [PATCH] #115 - Duplicate all events to post to aggregate topic --- config/default.js | 1 + src/common/helper.js | 20 ++++++++++++++++++-- src/services/ReviewService.js | 7 +++---- src/services/ReviewSummationService.js | 7 +++---- src/services/ReviewTypeService.js | 7 +++---- src/services/SubmissionService.js | 7 +++---- 6 files changed, 31 insertions(+), 18 deletions(-) diff --git a/config/default.js b/config/default.js index e8037556..b2e2bb05 100755 --- a/config/default.js +++ b/config/default.js @@ -20,6 +20,7 @@ module.exports = { }, BUSAPI_URL: process.env.BUSAPI_URL || 'https://api.topcoder-dev.com/v5', KAFKA_ERROR_TOPIC: process.env.KAFKA_ERROR_TOPIC || 'error.notification', + KAFKA_AGGREGATE_TOPIC: process.env.KAFKA_AGGREGATE_TOPIC, CHALLENGEAPI_URL: process.env.CHALLENGEAPI_URL || 'https://api.topcoder-dev.com/v4/challenges', AUTH0_URL: process.env.AUTH0_URL, // Auth0 credentials for Submission Service AUTH0_AUDIENCE: process.env.AUTH0_AUDIENCE || 'https://www.topcoder.com', diff --git a/src/common/helper.js b/src/common/helper.js index c9ad788c..31f72448 100755 --- a/src/common/helper.js +++ b/src/common/helper.js @@ -517,11 +517,26 @@ function * downloadFile (fileURL) { return downloadedFile.Body } +/** + * Wrapper function to post to bus api. Ensures that every event posted to bus api + * is duplicated and posted to bus api again, but to a different "aggregate" topic + * @param {Object} payload Data that needs to be posted to the bus api + */ +function * postToBusApi (payload) { + const busApiClient = getBusApiClient() + + yield busApiClient.postEvent(payload) + + // Post to aggregate topic + payload['topic'] = config.get('KAFKA_AGGREGATE_TOPIC') + + yield busApiClient.postEvent(payload) +} + module.exports = { wrapExpress, autoWrapExpress, getEsClient, - getBusApiClient, fetchFromES, camelize, setPaginationHeaders, @@ -529,5 +544,6 @@ module.exports = { checkCreateAccess, checkGetAccess, checkReviewGetAccess, - downloadFile + downloadFile, + postToBusApi } diff --git a/src/services/ReviewService.js b/src/services/ReviewService.js index 2a3f47d8..41d535da 100644 --- a/src/services/ReviewService.js +++ b/src/services/ReviewService.js @@ -16,7 +16,6 @@ const { const HelperService = require('./HelperService') const SubmissionService = require('./SubmissionService') -const busApiClient = helper.getBusApiClient() const table = 'Review' /** @@ -135,7 +134,7 @@ function * createReview (authUser, entity) { } // Post to Bus API using Client - yield busApiClient.postEvent(reqBody) + yield helper.postToBusApi(reqBody) // Inserting records in DynamoDB doesn't return any response // Hence returning the same entity to be in compliance with Swagger @@ -217,7 +216,7 @@ function * _updateReview (authUser, reviewId, entity) { } // Post to Bus API using Client - yield busApiClient.postEvent(reqBody) + yield helper.postToBusApi(reqBody) // Updating records in DynamoDB doesn't return any response // Hence returning the response which will be in compliance with Swagger @@ -310,7 +309,7 @@ function * deleteReview (reviewId) { } // Post to Bus API using Client - yield busApiClient.postEvent(reqBody) + yield helper.postToBusApi(reqBody) } deleteReview.schema = { diff --git a/src/services/ReviewSummationService.js b/src/services/ReviewSummationService.js index 54ff3162..b6a6d028 100644 --- a/src/services/ReviewSummationService.js +++ b/src/services/ReviewSummationService.js @@ -11,7 +11,6 @@ const helper = require('../common/helper') const { originator, mimeType, events } = require('../../constants').busApiMeta const HelperService = require('./HelperService') -const busApiClient = helper.getBusApiClient() const table = 'ReviewSummation' /** @@ -112,7 +111,7 @@ function * createReviewSummation (authUser, entity) { } // Post to Bus API using Client - yield busApiClient.postEvent(reqBody) + yield helper.postToBusApi(reqBody) // Inserting records in DynamoDB doesn't return any response // Hence returning the same entity to be in compliance with Swagger @@ -212,7 +211,7 @@ function * _updateReviewSummation (authUser, reviewSummationId, entity) { } // Post to Bus API using Client - yield busApiClient.postEvent(reqBody) + yield helper.postToBusApi(reqBody) // Updating records in DynamoDB doesn't return any response // Hence returning the response which will be in compliance with Swagger @@ -302,7 +301,7 @@ function * deleteReviewSummation (reviewSummationId) { } // Post to Bus API using Client - yield busApiClient.postEvent(reqBody) + yield helper.postToBusApi(reqBody) } deleteReviewSummation.schema = { diff --git a/src/services/ReviewTypeService.js b/src/services/ReviewTypeService.js index 93945b06..63d67b0c 100755 --- a/src/services/ReviewTypeService.js +++ b/src/services/ReviewTypeService.js @@ -10,7 +10,6 @@ const dbhelper = require('../common/dbhelper') const helper = require('../common/helper') const { originator, mimeType, events } = require('../../constants').busApiMeta -const busApiClient = helper.getBusApiClient() const table = 'ReviewType' /** @@ -93,7 +92,7 @@ function * createReviewType (entity) { } // Post to Bus API using Client - yield busApiClient.postEvent(reqBody) + yield helper.postToBusApi(reqBody) // Inserting records in DynamoDB doesn't return any response // Hence returning the same entity to be in compliance with Swagger @@ -157,7 +156,7 @@ function * _updateReviewType (reviewTypeId, entity) { } // Post to Bus API using Client - yield busApiClient.postEvent(reqBody) + yield helper.postToBusApi(reqBody) // Updating records in DynamoDB doesn't return any response // Hence returning the response which will be in compliance with Swagger @@ -235,7 +234,7 @@ function * deleteReviewType (reviewTypeId) { } // Post to Bus API using Client - yield busApiClient.postEvent(reqBody) + yield helper.postToBusApi(reqBody) } deleteReviewType.schema = { diff --git a/src/services/SubmissionService.js b/src/services/SubmissionService.js index cca7114c..f739161d 100755 --- a/src/services/SubmissionService.js +++ b/src/services/SubmissionService.js @@ -16,7 +16,6 @@ const { submissionIndex } = require('../../constants') const s3 = new AWS.S3() const logger = require('winston') -const busApiClient = helper.getBusApiClient() const table = 'Submission' /* @@ -308,7 +307,7 @@ function * createSubmission (authUser, files, entity) { logger.info('Prepared submission create event payload to pass to THE bus') // Post to Bus API using Client - yield busApiClient.postEvent(reqBody) + yield helper.postToBusApi(reqBody) // Inserting records in DynamoDB doesn't return any response // Hence returning the entity which is in compliance with Swagger @@ -411,7 +410,7 @@ function * _updateSubmission (authUser, submissionId, entity) { } // Post to Bus API using Client - yield busApiClient.postEvent(reqBody) + yield helper.postToBusApi(reqBody) // Updating records in DynamoDB doesn't return any response // Hence returning the response which will be in compliance with Swagger @@ -503,7 +502,7 @@ function * deleteSubmission (submissionId) { } // Post to Bus API using Client - yield busApiClient.postEvent(reqBody) + yield helper.postToBusApi(reqBody) } deleteSubmission.schema = {