Skip to content
This repository was archived by the owner on Mar 13, 2025. It is now read-only.

Commit 1d3ab46

Browse files
update processor to handle challenge updates
1 parent 0b9dbaa commit 1d3ab46

File tree

12 files changed

+298
-42
lines changed

12 files changed

+298
-42
lines changed

.circleci/config.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,4 +79,4 @@ workflows:
7979
context : org-global
8080
filters:
8181
branches:
82-
only: master
82+
only: master

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
node_modules
2-
32
.env
3+
coverage
4+
.nyc_output

build.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,4 +20,4 @@ fi
2020
if [ "$UPDATE_CACHE" == 1 ]
2121
then
2222
docker cp app:/$APP_NAME/node_modules .
23-
fi
23+
fi

config/default.js

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,9 @@ module.exports = {
1414
// Kafka group id
1515
KAFKA_GROUP_ID: process.env.KAFKA_GROUP_ID || 'autopilot-processor',
1616
NOTIFICATION_CREATE_TOPIC: process.env.NOTIFICATION_CREATE_TOPIC || 'challenge.notification.create',
17+
NOTIFICATION_UPDATE_TOPIC: process.env.NOTIFICATION_UPDATE_TOPIC || 'challenge.notification.update',
1718

18-
SCHEDULE_API_URL: process.env.SCHEDULE_API_URL,
19+
SCHEDULE_API_URL: process.env.SCHEDULE_API_URL || 'https://api.topcoder-dev.com/v5/schedules',
1920
CHALLENGE_API_URL: process.env.CHALLENGE_API_URL || 'https://api.topcoder-dev.com/v5/challenges',
2021

2122
AUTH0_URL: process.env.AUTH0_URL,

docker-kafka/docker-compose.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
version: '3'
1+
version: "3"
22
services:
33
zookeeper:
44
image: wurstmeister/zookeeper
@@ -12,5 +12,5 @@ services:
1212
- "9092:9092"
1313
environment:
1414
KAFKA_ADVERTISED_HOST_NAME: localhost
15-
KAFKA_CREATE_TOPICS: "challenge.notification.create:1:1"
15+
KAFKA_CREATE_TOPICS: "challenge.notification.create:1:1,challenge.notification.update:1:1"
1616
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

src/app.js

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,7 @@ const consumer = new Kafka.GroupConsumer(helper.getKafkaOptions())
2020
*/
2121
const dataHandler = (messageSet, topic, partition) => Promise.each(messageSet, async (m) => {
2222
const message = m.message.value.toString('utf8')
23-
logger.info(`Handle Kafka event message; Topic: ${topic}; Partition: ${partition}; Offset: ${
24-
m.offset}; Message: ${message}.`)
23+
logger.info(`Handle Kafka event message; Topic: ${topic}; Partition: ${partition}; Offset: ${m.offset}; Message: ${message}.`)
2524
let messageJSON
2625
try {
2726
messageJSON = JSON.parse(message)
@@ -43,8 +42,11 @@ const dataHandler = (messageSet, topic, partition) => Promise.each(messageSet, a
4342
}
4443

4544
try {
46-
await ProcessorService.processCreate(messageJSON)
47-
45+
if (topic === config.NOTIFICATION_CREATE_TOPIC) {
46+
await ProcessorService.processCreate(messageJSON)
47+
} else {
48+
await ProcessorService.processUpdate(messageJSON)
49+
}
4850
logger.debug('Successfully processed message')
4951
} catch (err) {
5052
logger.logFullError(err)
@@ -68,7 +70,7 @@ const check = () => {
6870
return connected
6971
}
7072

71-
const topics = [config.NOTIFICATION_CREATE_TOPIC]
73+
const topics = [config.NOTIFICATION_CREATE_TOPIC, config.NOTIFICATION_UPDATE_TOPIC]
7274

7375
logger.info('Starting kafka consumer')
7476
consumer

src/common/helper.js

Lines changed: 82 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ function getErrorWithStatus (message, statusCode) {
5353
/**
5454
* Get challenge by id
5555
* @param challengeId the challenge id
56+
* @returns {object} challenge
5657
*/
5758
async function getChallenge (challengeId) {
5859
const url = `${config.CHALLENGE_API_URL}/${challengeId}`
@@ -76,30 +77,67 @@ async function getChallenge (challengeId) {
7677
}
7778
}
7879

80+
/**
81+
* Get challenge by id from Schedule Api
82+
* @param challengeId the challenge id
83+
* @returns {Array} array of events
84+
*/
85+
async function getEventsFromScheduleApi (challengeId) {
86+
const url = `${config.SCHEDULE_API_URL}?externalId=${challengeId}`
87+
88+
logger.debug(`request GET ${url}`)
89+
try {
90+
const res = await axios.get(url)
91+
return res.data
92+
} catch (err) {
93+
logger.error(err.message)
94+
95+
if (err.response) {
96+
if (err.response.status === 404) {
97+
logger.error(`The Challenge with the id: ${challengeId} not exist`)
98+
throw getErrorWithStatus('[resource_not_found_exception]', 404)
99+
}
100+
}
101+
throw Error(`get ${url} failed`)
102+
}
103+
}
104+
79105
/**
80106
* Create events from challenge object
81107
* @param challenge the challenge object
82108
*/
83109
function getEventsFromPhases (challenge) {
84110
const events = []
85-
// for each phase, create 2 events for the scheduledStartDate and scheduledEndDate respectively
111+
const dateBasedEvents = {}
112+
113+
86114
for (const phase of challenge.phases) {
87-
const event = {
88-
phaseId: phase.phaseId,
89-
challengeId: challenge.id
115+
if (!dateBasedEvents[phase.scheduledStartDate]) {
116+
dateBasedEvents[phase.scheduledStartDate] = []
90117
}
91-
92-
events.push({
93-
...event,
94-
status: 'starting',
95-
scheduleTime: phase.scheduledStartDate
118+
if (!dateBasedEvents[phase.scheduledEndDate]) {
119+
dateBasedEvents[phase.scheduledEndDate] = []
120+
}
121+
dateBasedEvents[phase.scheduledStartDate].push({
122+
phaseId: phase.phaseId,
123+
isOpen: true
96124
})
97-
events.push({
98-
...event,
99-
status: 'closing',
100-
scheduleTime: phase.scheduledEndDate
125+
dateBasedEvents[phase.scheduledEndDate].push({
126+
phaseId: phase.phaseId,
127+
isOpen: false
101128
})
102129
}
130+
131+
_.each(dateBasedEvents, (eventData, scheduleTime) => {
132+
events.push({
133+
externalId: challenge.id,
134+
scheduleTime,
135+
payload: {
136+
phases: eventData
137+
}
138+
})
139+
})
140+
103141
return events
104142
}
105143

@@ -115,19 +153,15 @@ async function createEventsInExecutor (events) {
115153
for (const event of events) {
116154
// schedule executor api payload
117155
const executorPayload = {
118-
url: `${config.CHALLENGE_API_URL}/${event.challengeId}`,
156+
url: `${config.CHALLENGE_API_URL}/${event.externalId}`,
157+
externalId: event.externalId,
119158
method: 'patch',
120159
scheduleTime: event.scheduleTime,
121160
headers: {
122161
'content-type': 'application/json',
123162
Authorization: `Bearer ${token}`
124163
},
125-
payload: JSON.stringify({
126-
phases: [{
127-
phaseId: event.phaseId,
128-
isOpen: event.status === 'starting'
129-
}]
130-
})
164+
payload: JSON.stringify(event.payload)
131165
}
132166

133167
// call executor api
@@ -140,10 +174,37 @@ async function createEventsInExecutor (events) {
140174
}
141175
}
142176

177+
/**
178+
* Delete events in executor app
179+
* @param events the events array
180+
*/
181+
async function deleteEventsInExecutor (events) {
182+
const url = config.SCHEDULE_API_URL
183+
try {
184+
for (const event of events) {
185+
// schedule executor api payload
186+
const executorPayload = {
187+
id: event.id
188+
}
189+
190+
// call executor api
191+
logger.debug(`request DELETE ${url}`)
192+
await axios.delete(`${url}`, { data: executorPayload })
193+
}
194+
} catch (err) {
195+
logger.error(err.message)
196+
throw err
197+
}
198+
}
199+
143200
module.exports = {
144201
getKafkaOptions,
145202
getTopcoderM2Mtoken,
146203
getChallenge,
204+
getEventsFromScheduleApi,
147205
getEventsFromPhases,
148-
createEventsInExecutor
206+
compareEvents,
207+
updateChallenge,
208+
createEventsInExecutor,
209+
deleteEventsInExecutor
149210
}

src/services/ProcessorService.js

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ Joi.id = () => Joi.string().uuid().required()
1212
* @param {Object} message the kafka message
1313
* @returns {Promise}
1414
*/
15-
async function processCreate (message) {
15+
async function processCreate(message) {
1616
// get challenge
1717
const challenge = await helper.getChallenge(message.payload.id)
1818
// create events
@@ -23,6 +23,22 @@ async function processCreate (message) {
2323
logger.info(`processing of the record completed, id: ${message.payload.id}`)
2424
}
2525

26+
/**
27+
* Process update entity message
28+
* @param {Object} message the kafka message
29+
* @returns {Promise}
30+
*/
31+
async function processUpdate(message) {
32+
const sourceChallenge = await helper.getChallenge(message.payload.id)
33+
const newEvents = helper.getEventsFromPhases(sourceChallenge)
34+
const oldEvents = await helper.getEventsFromScheduleApi(message.payload.id)
35+
logger.info(`Deleting existing events for challenge ${message.payload.id}`)
36+
await helper.deleteEventsInExecutor(oldEvents)
37+
logger.info(`Creating events for challenge ${message.payload.id}`)
38+
await helper.createEventsInExecutor(newEvents)
39+
logger.info(`processing of the record completed, id: ${message.payload.id}`)
40+
}
41+
2642
processCreate.schema = {
2743
message: Joi.object().keys({
2844
topic: Joi.string().required(),
@@ -34,9 +50,21 @@ processCreate.schema = {
3450
}).required().unknown(true)
3551
}).required()
3652
}
53+
processUpdate.schema = {
54+
message: Joi.object().keys({
55+
topic: Joi.string().required(),
56+
originator: Joi.string().required(),
57+
timestamp: Joi.date().required(),
58+
'mime-type': Joi.string().required(),
59+
payload: Joi.object().keys({
60+
id: Joi.id()
61+
}).required().unknown(true)
62+
}).required()
63+
}
3764

3865
module.exports = {
39-
processCreate
66+
processCreate,
67+
processUpdate
4068
}
4169

4270
logger.buildService(module.exports)

0 commit comments

Comments
 (0)