Skip to content
This repository was archived by the owner on Mar 13, 2025. It is now read-only.

Commit 574b311

Browse files
#21 - Listen on aggregate submission topic events
1 parent f25b1e4 commit 574b311

11 files changed

+21
-15
lines changed

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ The following parameters can be set in config files or in env variables:
2020
if provided, it can be either path to private key file or private key content
2121
- KAFKA_GROUP_ID: consumer group id; default value: 'scorecard-processor'
2222
- REVIEW_TOPIC : Review topic, default value is 'submission.notification.score'
23-
- CREATE_SUBMISSION_TOPIC : create submission topic, default value is 'submission.notification.create'
23+
- AGGREGATE_SUBMISSION_TOPIC : aggregate submission topic, default value is 'submission.notification.aggregate'
2424
- SUBMISSION_API_URL: submission api url, default is 'https://api.topcoder-dev.com/v5'
2525
- CHALLENGE_API_URL: challenge API URL, default is 'https://api.topcoder-dev.com/v4/challenges'
2626
- SCORECARD_API_URL: scorecard API URL, default is 'http://localhost:4000/scorecards'
@@ -53,7 +53,7 @@ Configuration for the tests is at `config/test.js`, only add such new configurat
5353
- note that the zookeeper server is at localhost:2181, and Kafka server is at localhost:9092
5454
- use another terminal, go to same directory, create the needed topics:
5555
`bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic submission.notification.score`
56-
`bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic submission.notification.create`
56+
`bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic submission.notification.aggregate`
5757
`bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic avscan.action.scan`
5858
`bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic or.action.review`
5959
- verify that the topics are created:

Verification.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,10 @@ curl --request POST --url https://topcoder-dev.auth0.com/oauth/token --header 'c
3737
`bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic avscan.action.scan --from-beginning`
3838
- start kafka-console-consumer to listen to topic `or.action.review`:
3939
`bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic or.action.review --from-beginning`
40-
- start kafka-console-producer to write messages to `submission.notification.create` topic:
41-
`bin/kafka-console-producer.sh --broker-list localhost:9092 --topic submission.notification.create`
40+
- start kafka-console-producer to write messages to `submission.notification.aggregate` topic:
41+
`bin/kafka-console-producer.sh --broker-list localhost:9092 --topic submission.notification.aggregate`
4242
- write message:
43-
`{ "topic": "submission.notification.create", "originator": "or-app", "timestamp": "2019-02-25T00:00:00", "mime-type": "application/json", "payload": { "resource": "submission", "id": "104366f8-f46b-45db-a971-11bc69e6c8ff", "type": "Contest Submission", "url": "https://s3.amazonaws.com/topcoder-dev-submissions-dmz/30054740-8547899-SUBMISSION_ZIP-1554188341581.zip", "memberId": 8547899, "challengeId": 30049360, "created": "2019-04-02T06:59:29.785Z", "updated": "2019-04-02T06:59:29.785Z", "createdBy": "TonyJ", "updatedBy": "TonyJ", "submissionPhaseId": 764644, "fileType": "zip", "isFileSubmission": false } }`
43+
`{ "topic": "submission.notification.aggregate", "originator": "or-app", "timestamp": "2019-02-25T00:00:00", "mime-type": "application/json", "payload": { "originalTopic": "submission.notification.create", "resource": "submission", "id": "104366f8-f46b-45db-a971-11bc69e6c8ff", "type": "Contest Submission", "url": "https://s3.amazonaws.com/topcoder-dev-submissions-dmz/30054740-8547899-SUBMISSION_ZIP-1554188341581.zip", "memberId": 8547899, "challengeId": 30049360, "created": "2019-04-02T06:59:29.785Z", "updated": "2019-04-02T06:59:29.785Z", "createdBy": "TonyJ", "updatedBy": "TonyJ", "submissionPhaseId": 764644, "fileType": "zip", "isFileSubmission": false } }`
4444
- watch the app console, it should show logging of processing the message:
4545
```
4646
debug: Get M2M token

config/default.js

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ module.exports = {
1818
REVIEW_TOPIC: process.env.REVIEW_TOPIC || 'submission.notification.score',
1919
// Kafka topic related to create submission
2020
CREATE_SUBMISSION_TOPIC: process.env.CREATE_SUBMISSION_TOPIC || 'submission.notification.create',
21+
// Kafka topic related to all events related to submission
22+
AGGREGATE_SUBMISSION_TOPIC: process.env.AGGREGATE_SUBMISSION_TOPIC || 'submission.notification.aggregate',
2123

2224
SUBMISSION_API_URL: process.env.SUBMISSION_API_URL || 'https://api.topcoder-dev.com/v5',
2325

src/app.js

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,8 @@ const dataHandler = (messageSet, topic, partition) => Promise.each(messageSet, a
4646
return (async () => {
4747
if (topic === config.REVIEW_TOPIC) {
4848
await ReviewProcessorService.processReview(messageJSON)
49-
} else if (topic === config.CREATE_SUBMISSION_TOPIC) {
49+
} else if (topic === config.AGGREGATE_SUBMISSION_TOPIC &&
50+
messageJSON.payload.originalTopic === config.CREATE_SUBMISSION_TOPIC) {
5051
await SubmissionProcessorService.processSubmission(messageJSON)
5152
} else {
5253
throw new Error(`Invalid topic: ${topic}`)
@@ -76,7 +77,7 @@ function check () {
7677
}
7778

7879
if (consumer) {
79-
const topics = [config.REVIEW_TOPIC, config.CREATE_SUBMISSION_TOPIC]
80+
const topics = [config.REVIEW_TOPIC, config.AGGREGATE_SUBMISSION_TOPIC]
8081

8182
consumer
8283
.init([{

src/services/SubmissionProcessorService.js

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,8 @@ function generateScoreSystemMessage (message, scoreSystem) {
6262
}
6363
if (scoreSystem.topic === 'or.action.review') {
6464
const payload = message.payload
65-
payload.eventType = message.topic === config.CREATE_SUBMISSION_TOPIC ? 'CREATE' : 'UPDATE'
65+
payload.eventType = 'CREATE'
66+
delete payload.originalTopic
6667
return {
6768
topic: scoreSystem.topic,
6869
originator: 'tc-scorecard-processor',
@@ -164,6 +165,7 @@ processSubmission.schema = {
164165
timestamp: joi.date().required(),
165166
'mime-type': joi.string().required(),
166167
payload: joi.object().keys({
168+
originalTopic: joi.string().valid(config.CREATE_SUBMISSION_TOPIC).required(),
167169
resource: joi.string().valid('submission').required(),
168170
id: joi.string().guid().required(),
169171
type: joi.string().valid('Contest Submission').required(),

test/common/testData.js

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ const testTopics = {
5454
}
5555
}
5656
},
57-
'submission.notification.create': {
57+
'submission.notification.aggregate': {
5858
operation: 'create',
5959
requiredFields: ['topic', 'originator', 'timestamp', 'mime-type',
6060
'payload.resource', 'payload.id', 'payload.url',
@@ -65,11 +65,12 @@ const testTopics = {
6565
dateFields: ['timestamp'],
6666
booleanFields: [],
6767
testMessage: {
68-
topic: 'submission.notification.create',
68+
topic: 'submission.notification.aggregate',
6969
originator: 'or-app',
7070
timestamp: '2019-02-25T00:00:00',
7171
'mime-type': 'application/json',
7272
payload: {
73+
originalTopic: 'submission.notification.create',
7374
resource: 'submission',
7475
id: '104366f8-f46b-45db-a971-11bc69e6c8ff',
7576
type: 'Contest Submission',

test/common/testHelper.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,7 @@ async function consumeMessages () {
203203
// consume and commit all not processed messages
204204
const consumer = new Kafka.GroupConsumer(helper.getKafkaOptions())
205205
await consumer.init([{
206-
subscriptions: [config.REVIEW_TOPIC, config.CREATE_SUBMISSION_TOPIC,
206+
subscriptions: [config.REVIEW_TOPIC, config.AGGREGATE_SUBMISSION_TOPIC,
207207
testData.avScanTopic, testData.reviewActionTopic],
208208
handler: (messageSet, topic, partition) => Promise.each(messageSet,
209209
(m) => consumer.commitOffset({ topic, partition, offset: m.offset }))

test/e2e/review.processor.test.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ describe('Topcoder - Scorecard Review Processor E2E Test', () => {
8383
testHelper.assertErrorMessage('Invalid or not supported eventType: INVALID_TYPE')
8484
})
8585

86-
for (const op of ['create', 'update']) {
86+
for (const op of ['create']) {
8787
let { requiredFields, integerFields, stringFields, testMessage } = testTopics[op]
8888

8989
it('test invalid parameters, field submissionId incorrect', async () => {

test/e2e/submission.processor.test.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ describe('Topcoder - Scorecard Submission Processor E2E Test', () => {
2626
testHelper.clearInterceptedLogging()
2727
})
2828

29-
for (const topic of [config.CREATE_SUBMISSION_TOPIC]) {
29+
for (const topic of [config.AGGREGATE_SUBMISSION_TOPIC]) {
3030
const { operation, requiredFields, integerFields, stringFields, dateFields,
3131
booleanFields, testMessage } = testTopics[topic]
3232

test/unit/review.processor.test.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ describe('Topcoder - Scorecard Review Processor Unit Test', () => {
7979
}
8080
})
8181

82-
for (const op of ['create', 'update']) {
82+
for (const op of ['create']) {
8383
let { requiredFields, integerFields, stringFields, testMessage } = testTopics[op]
8484

8585
it('test invalid parameters, field submissionId incorrect', async () => {

test/unit/submission.processor.test.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ describe('Topcoder - Scorecard Submission Processor Unit Test', () => {
2424
testHelper.clearInterceptedLogging()
2525
})
2626

27-
for (const topic of [config.CREATE_SUBMISSION_TOPIC]) {
27+
for (const topic of [config.AGGREGATE_SUBMISSION_TOPIC]) {
2828
const { operation, requiredFields, integerFields, stringFields, dateFields,
2929
booleanFields, testMessage } = testTopics[topic]
3030

0 commit comments

Comments
 (0)