Skip to content

Duplicate all events to post to aggregate topic #116

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 19, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config/default.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ module.exports = {
},
BUSAPI_URL: process.env.BUSAPI_URL || 'https://api.topcoder-dev.com/v5',
KAFKA_ERROR_TOPIC: process.env.KAFKA_ERROR_TOPIC || 'error.notification',
KAFKA_AGGREGATE_TOPIC: process.env.KAFKA_AGGREGATE_TOPIC,
CHALLENGEAPI_URL: process.env.CHALLENGEAPI_URL || 'https://api.topcoder-dev.com/v4/challenges',
AUTH0_URL: process.env.AUTH0_URL, // Auth0 credentials for Submission Service
AUTH0_AUDIENCE: process.env.AUTH0_AUDIENCE || 'https://www.topcoder.com',
Expand Down
20 changes: 18 additions & 2 deletions src/common/helper.js
Original file line number Diff line number Diff line change
Expand Up @@ -517,17 +517,33 @@ function * downloadFile (fileURL) {
return downloadedFile.Body
}

/**
* Wrapper function to post to bus api. Ensures that every event posted to bus api
* is duplicated and posted to bus api again, but to a different "aggregate" topic
* @param {Object} payload Data that needs to be posted to the bus api
*/
function * postToBusApi (payload) {
const busApiClient = getBusApiClient()

yield busApiClient.postEvent(payload)

// Post to aggregate topic
payload['topic'] = config.get('KAFKA_AGGREGATE_TOPIC')

yield busApiClient.postEvent(payload)
}

module.exports = {
wrapExpress,
autoWrapExpress,
getEsClient,
getBusApiClient,
fetchFromES,
camelize,
setPaginationHeaders,
getSubmissionPhaseId,
checkCreateAccess,
checkGetAccess,
checkReviewGetAccess,
downloadFile
downloadFile,
postToBusApi
}
7 changes: 3 additions & 4 deletions src/services/ReviewService.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ const {
const HelperService = require('./HelperService')
const SubmissionService = require('./SubmissionService')

const busApiClient = helper.getBusApiClient()
const table = 'Review'

/**
Expand Down Expand Up @@ -135,7 +134,7 @@ function * createReview (authUser, entity) {
}

// Post to Bus API using Client
yield busApiClient.postEvent(reqBody)
yield helper.postToBusApi(reqBody)

// Inserting records in DynamoDB doesn't return any response
// Hence returning the same entity to be in compliance with Swagger
Expand Down Expand Up @@ -217,7 +216,7 @@ function * _updateReview (authUser, reviewId, entity) {
}

// Post to Bus API using Client
yield busApiClient.postEvent(reqBody)
yield helper.postToBusApi(reqBody)

// Updating records in DynamoDB doesn't return any response
// Hence returning the response which will be in compliance with Swagger
Expand Down Expand Up @@ -310,7 +309,7 @@ function * deleteReview (reviewId) {
}

// Post to Bus API using Client
yield busApiClient.postEvent(reqBody)
yield helper.postToBusApi(reqBody)
}

deleteReview.schema = {
Expand Down
7 changes: 3 additions & 4 deletions src/services/ReviewSummationService.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ const helper = require('../common/helper')
const { originator, mimeType, events } = require('../../constants').busApiMeta
const HelperService = require('./HelperService')

const busApiClient = helper.getBusApiClient()
const table = 'ReviewSummation'

/**
Expand Down Expand Up @@ -112,7 +111,7 @@ function * createReviewSummation (authUser, entity) {
}

// Post to Bus API using Client
yield busApiClient.postEvent(reqBody)
yield helper.postToBusApi(reqBody)

// Inserting records in DynamoDB doesn't return any response
// Hence returning the same entity to be in compliance with Swagger
Expand Down Expand Up @@ -212,7 +211,7 @@ function * _updateReviewSummation (authUser, reviewSummationId, entity) {
}

// Post to Bus API using Client
yield busApiClient.postEvent(reqBody)
yield helper.postToBusApi(reqBody)

// Updating records in DynamoDB doesn't return any response
// Hence returning the response which will be in compliance with Swagger
Expand Down Expand Up @@ -302,7 +301,7 @@ function * deleteReviewSummation (reviewSummationId) {
}

// Post to Bus API using Client
yield busApiClient.postEvent(reqBody)
yield helper.postToBusApi(reqBody)
}

deleteReviewSummation.schema = {
Expand Down
7 changes: 3 additions & 4 deletions src/services/ReviewTypeService.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ const dbhelper = require('../common/dbhelper')
const helper = require('../common/helper')
const { originator, mimeType, events } = require('../../constants').busApiMeta

const busApiClient = helper.getBusApiClient()
const table = 'ReviewType'

/**
Expand Down Expand Up @@ -93,7 +92,7 @@ function * createReviewType (entity) {
}

// Post to Bus API using Client
yield busApiClient.postEvent(reqBody)
yield helper.postToBusApi(reqBody)

// Inserting records in DynamoDB doesn't return any response
// Hence returning the same entity to be in compliance with Swagger
Expand Down Expand Up @@ -157,7 +156,7 @@ function * _updateReviewType (reviewTypeId, entity) {
}

// Post to Bus API using Client
yield busApiClient.postEvent(reqBody)
yield helper.postToBusApi(reqBody)

// Updating records in DynamoDB doesn't return any response
// Hence returning the response which will be in compliance with Swagger
Expand Down Expand Up @@ -235,7 +234,7 @@ function * deleteReviewType (reviewTypeId) {
}

// Post to Bus API using Client
yield busApiClient.postEvent(reqBody)
yield helper.postToBusApi(reqBody)
}

deleteReviewType.schema = {
Expand Down
7 changes: 3 additions & 4 deletions src/services/SubmissionService.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ const { submissionIndex } = require('../../constants')
const s3 = new AWS.S3()
const logger = require('winston')

const busApiClient = helper.getBusApiClient()
const table = 'Submission'

/*
Expand Down Expand Up @@ -308,7 +307,7 @@ function * createSubmission (authUser, files, entity) {
logger.info('Prepared submission create event payload to pass to THE bus')

// Post to Bus API using Client
yield busApiClient.postEvent(reqBody)
yield helper.postToBusApi(reqBody)

// Inserting records in DynamoDB doesn't return any response
// Hence returning the entity which is in compliance with Swagger
Expand Down Expand Up @@ -411,7 +410,7 @@ function * _updateSubmission (authUser, submissionId, entity) {
}

// Post to Bus API using Client
yield busApiClient.postEvent(reqBody)
yield helper.postToBusApi(reqBody)

// Updating records in DynamoDB doesn't return any response
// Hence returning the response which will be in compliance with Swagger
Expand Down Expand Up @@ -503,7 +502,7 @@ function * deleteSubmission (submissionId) {
}

// Post to Bus API using Client
yield busApiClient.postEvent(reqBody)
yield helper.postToBusApi(reqBody)
}

deleteSubmission.schema = {
Expand Down