diff --git a/README.md b/README.md index 568e23c..5b0b374 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,7 @@ # Identity - Data Processor +Creates user in u-bahn when they sign up on Topcoder. Also updates their availability status in u-bahn based on their account activation status in Topcoder + ## Dependencies - Nodejs(v12+) diff --git a/VERIFICATION.md b/VERIFICATION.md index 975e232..010755d 100644 --- a/VERIFICATION.md +++ b/VERIFICATION.md @@ -1,41 +1,6 @@ # Verification -``` -{ - "topic":"identity.notification.create", - "originator":"u-bahn-api", - "timestamp":"2019-07-08T00:00:00.000Z", - "mime-type":"application/json", - "payload":{ - "id":"90064000", - "modifiedBy":null, - "modifiedAt":"2021-01-05T14:01:40.336Z", - "createdBy":null, - "createdAt":"2021-01-05T14:01:40.336Z", - "handle":"theuserhandle", - "email":"foo@bar.com", - "firstName":"theuserfirstname", - "lastName":"theuserlastname", - "credential":{"activationCode":"FOOBAR2","resetToken":null,"hasPassword":false}, - "profiles":null, - "status":"A", - "country":{ - "isoAlpha3Code": "IND" - }, - "regSource":"null", - "utmSource":"null", - "utmMedium":"null", - "utmCampaign":"null", - "roles":null, - "ssoLogin":false, - "active":true, - "profile":null, - "emailActive":true - } -} -``` - -Additionally, you will be entering the messages into only one topic: +You will be entering the messages into only one topic: ``` docker exec -it identity-data-processor_kafka /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic identity.notification.create @@ -45,6 +10,31 @@ docker exec -it identity-data-processor_kafka /opt/kafka/bin/kafka-console-produ 2. write message: `{"recipients":[],"notificationType":"useractivation"}` 3. Watch the app console, It will show error message. -4. write message: - `{"topic":"identity.notification.create","originator":"u-bahn-api","timestamp":"2019-07-08T00:00:00.000Z","mime-type":"application/json","payload":{"id":"90064000","modifiedBy":null,"modifiedAt":"2021-01-05T14:01:40.336Z","createdBy":null,"createdAt":"2021-01-05T14:01:40.336Z","handle":"theuserhandle","email":"foo@bar.com","firstName":"theuserfirstname","lastName":"theuserlastname","credential":{"activationCode":"FOOBAR2","resetToken":null,"hasPassword":false},"profiles":null,"status":"A","country":{"isoAlpha3Code":"IND"},"regSource":"null","utmSource":"null","utmMedium":"null","utmCampaign":"null","roles":null,"ssoLogin":false,"active":true,"profile":null,"emailActive":true}}` -5. Watch the app console, It will show message successfully handled. +4. Write message: `{"topic":"identity.notification.create","originator":"identity-service","timestamp":"2019-07-08T00:00:00.000Z","mime-type":"application/json","payload":{"id":"10000001","modifiedBy":null,"modifiedAt":null,"createdBy":null,"createdAt":null,"handle":"theuserhandle","email":"theuserhandle@gmail.com","firstName":"User","lastName":"Member","credential":{"activationCode":"ABCDEFGHIJK","resetToken":null,"hasPassword":true},"profiles":null,"status":"U","country":{"code":"040","name":"Austria","isoAlpha2Code":"AT","isoAlpha3Code":"AUT"},"regSource":null,"utmSource":null,"utmMedium":null,"utmCampaign":null,"roles":null,"ssoLogin":false,"active":false,"profile":null,"emailActive":false}}` +5. Watch the app console. It will show message successfully handled. The log should look like: + ``` + info: user: tezsachin78033 created + debug: Sleeping for 1000 ms + info: external profile: 36ed815b-3da1-49f1-a043-aaed0a4e81ad created + debug: Sleeping for 1000 ms + info: user attribute: isAvailable created + debug: Sleeping for 1000 ms + info: user attribute: company created + debug: Sleeping for 1000 ms + info: user attribute: title created + debug: Sleeping for 1000 ms + info: user attribute: location created + debug: Sleeping for 1000 ms + info: user attribute: email created + debug: EXIT handle + debug: Successfully processed message + debug: Commiting offset after processing message + ``` +6. Now, write the following message: `{"topic":"identity.notification.create","originator":"identity-service","timestamp":"2019-07-08T00:00:00.000Z","mime-type":"application/json","payload":{"id":"10000001","modifiedBy":null,"modifiedAt":"2021-04-07T15:02:18.72Z","createdBy":null,"createdAt":"2021-04-07T15:02:18.72Z","handle":"theuserhandle","email":"theuserhandle@gmail.com","firstName":"User","lastName":"Member","credential":{"activationCode":"ABCDEFGHIJK","resetToken":null,"hasPassword":true},"profiles":null,"status":"A","country":null,"regSource":null,"utmSource":null,"utmMedium":null,"utmCampaign":null,"roles":null,"ssoLogin":false,"active":true,"profile":null,"emailActive":true}}` +7. Watch the app console. It will show message successfully handled. The log should look like: + ``` + info: user attribute: isAvailable updated + debug: EXIT handle + debug: Successfully processed message + debug: Commiting offset after processing message + ``` diff --git a/src/app.js b/src/app.js index c46ba96..f522a7e 100644 --- a/src/app.js +++ b/src/app.js @@ -47,7 +47,7 @@ const dataHandler = (messageSet, topic, partition) => Promise.each(messageSet, a return } try { - await ProcessorService.processCreate(messageJSON) + await ProcessorService.handle(messageJSON) logger.debug('Successfully processed message') } catch (err) { logger.logFullError(err) diff --git a/src/common/helper.js b/src/common/helper.js index fb9fe62..bd44744 100644 --- a/src/common/helper.js +++ b/src/common/helper.js @@ -60,6 +60,30 @@ async function getTopcoderToken () { return topcoderM2M.getMachineToken(config.AUTH0_CLIENT_ID, config.AUTH0_CLIENT_SECRET) } +/** + * Retrieves the user from u-bahn using their handle + * Returns null if no user exists, the user id otherwise + * @param {String} handle The member handle + * @param {String} token The auth token + */ +async function getUserId (handle, token) { + const res = await axios.get(`${config.UBAHN_API_URL}/users`, { + headers: { + Authorization: `Bearer ${token}` + }, + params: { + handle + } + }) + const user = res.data.filter(u => u.handle === handle)[0] + + if (user) { + return user.id + } + + return null +} + /** * Create a new User * @param {Object} body @@ -178,6 +202,17 @@ async function createUserAttribute (userId, attributeId, value, token) { await axios.post(`${config.UBAHN_API_URL}/users/${userId}/attributes`, { attributeId, value }, { headers: { Authorization: `Bearer ${token}` } }) } +/** + * Create user attribute + * @param {String} userId + * @param {String} attributeId + * @param {String} value + * @param {String} token + */ +async function updateUserAttribute (userId, attributeId, value, token) { + await axios.patch(`${config.UBAHN_API_URL}/users/${userId}/attributes/${attributeId}`, { value }, { headers: { Authorization: `Bearer ${token}` } }) +} + /** * Create external profile * @param {String} userId @@ -214,5 +249,7 @@ module.exports = { createUser, createUserAttribute, createExternalProfile, - createUserSkill + createUserSkill, + getUserId, + updateUserAttribute } diff --git a/src/services/ProcessorService.js b/src/services/ProcessorService.js index 109f582..c9af6d0 100644 --- a/src/services/ProcessorService.js +++ b/src/services/ProcessorService.js @@ -7,12 +7,52 @@ const Joi = require('@hapi/joi') const logger = require('../common/logger') const helper = require('../common/helper') +const MEMBER_PROFILE_URL_PREFIX = 'https://www.topcoder.com/members/' + /** * Process identity create entity message * @param {Object} message the kafka message */ -async function processCreate (message) { +async function handle (message) { + // Check if the user already exists in u-bahn + // If yes, then proceed to only update the availability status + // If not, then proceed to create the user and other associated data in u-bahn const ubahnToken = await helper.getUbahnToken() + const userId = await helper.getUserId(message.payload.handle, ubahnToken) + + if (userId) { + await processUpdate(message, userId, ubahnToken) + } else { + await processCreate(message, ubahnToken) + } +} + +handle.schema = { + message: Joi.object().keys({ + topic: Joi.string().required(), + originator: Joi.string().required(), + timestamp: Joi.date().required(), + 'mime-type': Joi.string().required(), + payload: Joi.object().keys({ + id: Joi.string().required(), + handle: Joi.string().required(), + firstName: Joi.string().required(), + lastName: Joi.string().required(), + email: Joi.string().email().required(), + country: Joi.object().keys({ + isoAlpha3Code: Joi.string().required() + }).unknown(true).allow(null), + active: Joi.boolean() + }).required().unknown(true) + }).required().unknown(true) +} + +/** + * Create the user and associated data in u-bahn + * @param {Object} message the kafka message + * @param {String} ubahnToken the auth token + */ +async function processCreate (message, ubahnToken) { const organizationId = await helper.getOrganizationId(ubahnToken) const attributes = await helper.getAttributes(ubahnToken) const location = message.payload.country.isoAlpha3Code @@ -20,10 +60,10 @@ async function processCreate (message) { const userId = await helper.createUser(_.pick(message.payload, 'handle', 'firstName', 'lastName'), ubahnToken) logger.info(`user: ${message.payload.handle} created`) helper.sleep() - await helper.createExternalProfile(userId, { organizationId, uri: 'uri', externalId: message.payload.id, isInactive: false }, ubahnToken) + await helper.createExternalProfile(userId, { organizationId, uri: `${MEMBER_PROFILE_URL_PREFIX}${message.payload.handle}`, externalId: message.payload.id, isInactive: false }, ubahnToken) logger.info(`external profile: ${organizationId} created`) helper.sleep() - await helper.createUserAttribute(userId, _.get(attributes, 'isAvailable'), 'true', ubahnToken) + await helper.createUserAttribute(userId, _.get(attributes, 'isAvailable'), message.payload.active.toString(), ubahnToken) logger.info('user attribute: isAvailable created') helper.sleep() await helper.createUserAttribute(userId, _.get(attributes, 'company'), 'Topcoder', ubahnToken) @@ -34,28 +74,30 @@ async function processCreate (message) { helper.sleep() await helper.createUserAttribute(userId, _.get(attributes, 'location'), location, ubahnToken) logger.info('user attribute: location created') + + // Custom attribute. May or may not exist + if (_.get(attributes, 'email')) { + helper.sleep() + await helper.createUserAttribute(userId, _.get(attributes, 'email'), message.payload.email, ubahnToken) + logger.info('user attribute: email created') + } } -processCreate.schema = { - message: Joi.object().keys({ - topic: Joi.string().required(), - originator: Joi.string().required(), - timestamp: Joi.date().required(), - 'mime-type': Joi.string().required(), - payload: Joi.object().keys({ - id: Joi.string().required(), - handle: Joi.string().required(), - firstName: Joi.string().required(), - lastName: Joi.string().required(), - country: Joi.object().keys({ - isoAlpha3Code: Joi.string().required() - }).required().unknown(true) - }).required().unknown(true) - }).required().unknown(true) +/** + * Updates the user's availability status in u-bahn + * @param {Object} message the kafka message + * @param {String} userId the u-bahn user id + * @param {String} ubahnToken the auth token + */ +async function processUpdate (message, userId, ubahnToken) { + const attributes = await helper.getAttributes(ubahnToken) + + await helper.updateUserAttribute(userId, _.get(attributes, 'isAvailable'), message.payload.active.toString(), ubahnToken) + logger.info('user attribute: isAvailable updated') } module.exports = { - processCreate + handle } logger.buildService(module.exports)