Skip to content

Commit ff01caa

Browse files
authored
Merge pull request #53 from topcoder-platform/dev
Process user login event
2 parents 912a45c + ecb3146 commit ff01caa

File tree

4 files changed

+61
-2
lines changed

4 files changed

+61
-2
lines changed

config/default.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ module.exports = {
2222
// Kafka topics related to Creation and Update of User
2323
USER_CREATE_TOPIC: process.env.USER_CREATE_TOPIC || 'identity.notification.create',
2424
USER_UPDATE_TOPIC: process.env.USER_UPDATE_TOPIC || 'identity.notification.update',
25+
USER_LOGIN_TOPIC: process.env.USER_LOGIN_TOPIC || 'member.action.login',
2526

2627
// Kafka output topics to be consumed by member-processor-es to save member profile data in Elasticsearch
2728
USER_CREATE_OUTPUT_TOPIC: process.env.USER_CREATE_OUTPUT_TOPIC || 'member.action.profile.create',

package.json

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,5 +34,8 @@
3434
"env": [
3535
"mocha"
3636
]
37+
},
38+
"volta": {
39+
"node": "8.11.3"
3740
}
3841
}

src/app.js

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,9 @@ const dataHandler = async (messageSet, topic, partition) => {
6868
logger.info('Ignore message.')
6969
}
7070
break
71+
case config.USER_LOGIN_TOPIC:
72+
await ProcessorService.processUserLogin(messageJSON, producer)
73+
break
7174
default:
7275
throw new Error(`Invalid topic: ${topic}`)
7376
}
@@ -103,7 +106,7 @@ producer
103106
logger.info('Starting kafka consumer')
104107
return consumer
105108
.init([{
106-
subscriptions: [config.USER_CREATE_TOPIC, config.USER_UPDATE_TOPIC],
109+
subscriptions: [config.USER_CREATE_TOPIC, config.USER_UPDATE_TOPIC, config.USER_LOGIN_TOPIC],
107110
handler: dataHandler
108111
}])
109112
})

src/services/ProcessorService.js

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -199,9 +199,61 @@ async function processUpdateUser (message, producer) {
199199

200200
processUpdateUser.schema = processCreateUser.schema
201201

202+
/**
203+
* Process the User login event
204+
* @param {Object} message the Kafka message in JSON format
205+
* @param {Object} producer the Kafka producer
206+
*/
207+
async function processUserLogin (message, producer) {
208+
const member = message.payload
209+
const record = {
210+
TableName: config.AMAZON_AWS_DYNAMODB_MEMBER_PROFILE_TABLE,
211+
Key: {
212+
userId: member.userId
213+
},
214+
UpdateExpression: `set lastLoginDate = :lastLoginDate`,
215+
ExpressionAttributeValues: {
216+
':lastLoginDate': member.lastLoginDate,
217+
},
218+
}
219+
if (member.loginCount) {
220+
record['UpdateExpression'] = record['UpdateExpression'] + `, loginCount = :loginCount`
221+
record['ExpressionAttributeValues'][':loginCount'] = member.loginCount
222+
}
223+
await helper.updateRecord(record)
224+
logger.info('DynamoDB record is updated successfully.')
225+
226+
// send output message to Kafka
227+
const outputMessage = {
228+
topic: config.USER_UPDATE_OUTPUT_TOPIC,
229+
originator: config.OUTPUT_MESSAGE_ORIGINATOR,
230+
timestamp: new Date().toISOString(),
231+
'mime-type': 'application/json',
232+
payload: member
233+
}
234+
await producer.send({ topic: outputMessage.topic, message: { value: JSON.stringify(outputMessage) } })
235+
logger.info(`Member profile update message is successfully sent to Kafka topic ${outputMessage.topic}`)
236+
}
237+
238+
processUserLogin.schema = {
239+
message: joi.object().keys({
240+
topic: joi.string().required(),
241+
originator: joi.string().required(),
242+
timestamp: joi.date().required(),
243+
'mime-type': joi.string().required(),
244+
payload: joi.object().keys({
245+
userId: joi.number().required(),
246+
loginCount: joi.number(),
247+
lastLoginDate: joi.date().raw().required(),
248+
}).unknown(true).required()
249+
}).required(),
250+
producer: joi.object().required()
251+
}
252+
202253
module.exports = {
203254
processCreateUser,
204-
processUpdateUser
255+
processUpdateUser,
256+
processUserLogin
205257
}
206258

207259
logger.buildService(module.exports)

0 commit comments

Comments
 (0)