From 2f6608a7cc9dd2754154eca7dd3a41bd6a5d7a92 Mon Sep 17 00:00:00 2001 From: eisbilir Date: Tue, 3 Oct 2023 00:04:15 +0300 Subject: [PATCH] feat: process login events --- config/default.js | 1 + package.json | 3 ++ src/app.js | 5 ++- src/services/ProcessorService.js | 54 +++++++++++++++++++++++++++++++- 4 files changed, 61 insertions(+), 2 deletions(-) diff --git a/config/default.js b/config/default.js index d07bd2c..c5a6671 100755 --- a/config/default.js +++ b/config/default.js @@ -22,6 +22,7 @@ module.exports = { // Kafka topics related to Creation and Update of User USER_CREATE_TOPIC: process.env.USER_CREATE_TOPIC || 'identity.notification.create', USER_UPDATE_TOPIC: process.env.USER_UPDATE_TOPIC || 'identity.notification.update', + USER_LOGIN_TOPIC: process.env.USER_LOGIN_TOPIC || 'member.action.login', // Kafka output topics to be consumed by member-processor-es to save member profile data in Elasticsearch USER_CREATE_OUTPUT_TOPIC: process.env.USER_CREATE_OUTPUT_TOPIC || 'member.action.profile.create', diff --git a/package.json b/package.json index 1f343b6..05cb141 100755 --- a/package.json +++ b/package.json @@ -34,5 +34,8 @@ "env": [ "mocha" ] + }, + "volta": { + "node": "8.11.3" } } diff --git a/src/app.js b/src/app.js index dea0ff1..852fce7 100755 --- a/src/app.js +++ b/src/app.js @@ -68,6 +68,9 @@ const dataHandler = async (messageSet, topic, partition) => { logger.info('Ignore message.') } break + case config.USER_LOGIN_TOPIC: + await ProcessorService.processUserLogin(messageJSON, producer) + break default: throw new Error(`Invalid topic: ${topic}`) } @@ -103,7 +106,7 @@ producer logger.info('Starting kafka consumer') return consumer .init([{ - subscriptions: [config.USER_CREATE_TOPIC, config.USER_UPDATE_TOPIC], + subscriptions: [config.USER_CREATE_TOPIC, config.USER_UPDATE_TOPIC, config.USER_LOGIN_TOPIC], handler: dataHandler }]) }) diff --git a/src/services/ProcessorService.js b/src/services/ProcessorService.js index 9b21100..099eda9 100755 --- a/src/services/ProcessorService.js +++ b/src/services/ProcessorService.js @@ -199,9 +199,61 @@ async function processUpdateUser (message, producer) { processUpdateUser.schema = processCreateUser.schema +/** + * Process the User login event + * @param {Object} message the Kafka message in JSON format + * @param {Object} producer the Kafka producer + */ +async function processUserLogin (message, producer) { + const member = message.payload + const record = { + TableName: config.AMAZON_AWS_DYNAMODB_MEMBER_PROFILE_TABLE, + Key: { + userId: member.userId + }, + UpdateExpression: `set lastLoginDate = :lastLoginDate`, + ExpressionAttributeValues: { + ':lastLoginDate': member.lastLoginDate, + }, + } + if (member.loginCount) { + record['UpdateExpression'] = record['UpdateExpression'] + `, loginCount = :loginCount` + record['ExpressionAttributeValues'][':loginCount'] = member.loginCount + } + await helper.updateRecord(record) + logger.info('DynamoDB record is updated successfully.') + + // send output message to Kafka + const outputMessage = { + topic: config.USER_UPDATE_OUTPUT_TOPIC, + originator: config.OUTPUT_MESSAGE_ORIGINATOR, + timestamp: new Date().toISOString(), + 'mime-type': 'application/json', + payload: member + } + await producer.send({ topic: outputMessage.topic, message: { value: JSON.stringify(outputMessage) } }) + logger.info(`Member profile update message is successfully sent to Kafka topic ${outputMessage.topic}`) +} + +processUserLogin.schema = { + message: joi.object().keys({ + topic: joi.string().required(), + originator: joi.string().required(), + timestamp: joi.date().required(), + 'mime-type': joi.string().required(), + payload: joi.object().keys({ + userId: joi.number().required(), + loginCount: joi.number(), + lastLoginDate: joi.date().raw().required(), + }).unknown(true).required() + }).required(), + producer: joi.object().required() +} + module.exports = { processCreateUser, - processUpdateUser + processUpdateUser, + processUserLogin } logger.buildService(module.exports)