Skip to content
This repository was archived by the owner on Mar 13, 2025. It is now read-only.

Issue 7 #8

Merged
merged 1 commit into from
Apr 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# Identity - Data Processor

Creates user in u-bahn when they sign up on Topcoder. Also updates their availability status in u-bahn based on their account activation status in Topcoder

## Dependencies

- Nodejs(v12+)
Expand Down
68 changes: 29 additions & 39 deletions VERIFICATION.md
Original file line number Diff line number Diff line change
@@ -1,41 +1,6 @@
# Verification

```
{
"topic":"identity.notification.create",
"originator":"u-bahn-api",
"timestamp":"2019-07-08T00:00:00.000Z",
"mime-type":"application/json",
"payload":{
"id":"90064000",
"modifiedBy":null,
"modifiedAt":"2021-01-05T14:01:40.336Z",
"createdBy":null,
"createdAt":"2021-01-05T14:01:40.336Z",
"handle":"theuserhandle",
"email":"[email protected]",
"firstName":"theuserfirstname",
"lastName":"theuserlastname",
"credential":{"activationCode":"FOOBAR2","resetToken":null,"hasPassword":false},
"profiles":null,
"status":"A",
"country":{
"isoAlpha3Code": "IND"
},
"regSource":"null",
"utmSource":"null",
"utmMedium":"null",
"utmCampaign":"null",
"roles":null,
"ssoLogin":false,
"active":true,
"profile":null,
"emailActive":true
}
}
```

Additionally, you will be entering the messages into only one topic:
You will be entering the messages into only one topic:

```
docker exec -it identity-data-processor_kafka /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic identity.notification.create
Expand All @@ -45,6 +10,31 @@ docker exec -it identity-data-processor_kafka /opt/kafka/bin/kafka-console-produ
2. write message:
`{"recipients":[],"notificationType":"useractivation"}`
3. Watch the app console, It will show error message.
4. write message:
`{"topic":"identity.notification.create","originator":"u-bahn-api","timestamp":"2019-07-08T00:00:00.000Z","mime-type":"application/json","payload":{"id":"90064000","modifiedBy":null,"modifiedAt":"2021-01-05T14:01:40.336Z","createdBy":null,"createdAt":"2021-01-05T14:01:40.336Z","handle":"theuserhandle","email":"[email protected]","firstName":"theuserfirstname","lastName":"theuserlastname","credential":{"activationCode":"FOOBAR2","resetToken":null,"hasPassword":false},"profiles":null,"status":"A","country":{"isoAlpha3Code":"IND"},"regSource":"null","utmSource":"null","utmMedium":"null","utmCampaign":"null","roles":null,"ssoLogin":false,"active":true,"profile":null,"emailActive":true}}`
5. Watch the app console, It will show message successfully handled.
4. Write message: `{"topic":"identity.notification.create","originator":"identity-service","timestamp":"2019-07-08T00:00:00.000Z","mime-type":"application/json","payload":{"id":"10000001","modifiedBy":null,"modifiedAt":null,"createdBy":null,"createdAt":null,"handle":"theuserhandle","email":"[email protected]","firstName":"User","lastName":"Member","credential":{"activationCode":"ABCDEFGHIJK","resetToken":null,"hasPassword":true},"profiles":null,"status":"U","country":{"code":"040","name":"Austria","isoAlpha2Code":"AT","isoAlpha3Code":"AUT"},"regSource":null,"utmSource":null,"utmMedium":null,"utmCampaign":null,"roles":null,"ssoLogin":false,"active":false,"profile":null,"emailActive":false}}`
5. Watch the app console. It will show message successfully handled. The log should look like:
```
info: user: tezsachin78033 created
debug: Sleeping for 1000 ms
info: external profile: 36ed815b-3da1-49f1-a043-aaed0a4e81ad created
debug: Sleeping for 1000 ms
info: user attribute: isAvailable created
debug: Sleeping for 1000 ms
info: user attribute: company created
debug: Sleeping for 1000 ms
info: user attribute: title created
debug: Sleeping for 1000 ms
info: user attribute: location created
debug: Sleeping for 1000 ms
info: user attribute: email created
debug: EXIT handle
debug: Successfully processed message
debug: Commiting offset after processing message
```
6. Now, write the following message: `{"topic":"identity.notification.create","originator":"identity-service","timestamp":"2019-07-08T00:00:00.000Z","mime-type":"application/json","payload":{"id":"10000001","modifiedBy":null,"modifiedAt":"2021-04-07T15:02:18.72Z","createdBy":null,"createdAt":"2021-04-07T15:02:18.72Z","handle":"theuserhandle","email":"[email protected]","firstName":"User","lastName":"Member","credential":{"activationCode":"ABCDEFGHIJK","resetToken":null,"hasPassword":true},"profiles":null,"status":"A","country":null,"regSource":null,"utmSource":null,"utmMedium":null,"utmCampaign":null,"roles":null,"ssoLogin":false,"active":true,"profile":null,"emailActive":true}}`
7. Watch the app console. It will show message successfully handled. The log should look like:
```
info: user attribute: isAvailable updated
debug: EXIT handle
debug: Successfully processed message
debug: Commiting offset after processing message
```
2 changes: 1 addition & 1 deletion src/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ const dataHandler = (messageSet, topic, partition) => Promise.each(messageSet, a
return
}
try {
await ProcessorService.processCreate(messageJSON)
await ProcessorService.handle(messageJSON)
logger.debug('Successfully processed message')
} catch (err) {
logger.logFullError(err)
Expand Down
39 changes: 38 additions & 1 deletion src/common/helper.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,30 @@ async function getTopcoderToken () {
return topcoderM2M.getMachineToken(config.AUTH0_CLIENT_ID, config.AUTH0_CLIENT_SECRET)
}

/**
* Retrieves the user from u-bahn using their handle
* Returns null if no user exists, the user id otherwise
* @param {String} handle The member handle
* @param {String} token The auth token
*/
async function getUserId (handle, token) {
const res = await axios.get(`${config.UBAHN_API_URL}/users`, {
headers: {
Authorization: `Bearer ${token}`
},
params: {
handle
}
})
const user = res.data.filter(u => u.handle === handle)[0]

if (user) {
return user.id
}

return null
}

/**
* Create a new User
* @param {Object} body
Expand Down Expand Up @@ -178,6 +202,17 @@ async function createUserAttribute (userId, attributeId, value, token) {
await axios.post(`${config.UBAHN_API_URL}/users/${userId}/attributes`, { attributeId, value }, { headers: { Authorization: `Bearer ${token}` } })
}

/**
* Create user attribute
* @param {String} userId
* @param {String} attributeId
* @param {String} value
* @param {String} token
*/
async function updateUserAttribute (userId, attributeId, value, token) {
await axios.patch(`${config.UBAHN_API_URL}/users/${userId}/attributes/${attributeId}`, { value }, { headers: { Authorization: `Bearer ${token}` } })
}

/**
* Create external profile
* @param {String} userId
Expand Down Expand Up @@ -214,5 +249,7 @@ module.exports = {
createUser,
createUserAttribute,
createExternalProfile,
createUserSkill
createUserSkill,
getUserId,
updateUserAttribute
}
82 changes: 62 additions & 20 deletions src/services/ProcessorService.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,63 @@ const Joi = require('@hapi/joi')
const logger = require('../common/logger')
const helper = require('../common/helper')

const MEMBER_PROFILE_URL_PREFIX = 'https://www.topcoder.com/members/'

/**
* Process identity create entity message
* @param {Object} message the kafka message
*/
async function processCreate (message) {
async function handle (message) {
// Check if the user already exists in u-bahn
// If yes, then proceed to only update the availability status
// If not, then proceed to create the user and other associated data in u-bahn
const ubahnToken = await helper.getUbahnToken()
const userId = await helper.getUserId(message.payload.handle, ubahnToken)

if (userId) {
await processUpdate(message, userId, ubahnToken)
} else {
await processCreate(message, ubahnToken)
}
}

handle.schema = {
message: Joi.object().keys({
topic: Joi.string().required(),
originator: Joi.string().required(),
timestamp: Joi.date().required(),
'mime-type': Joi.string().required(),
payload: Joi.object().keys({
id: Joi.string().required(),
handle: Joi.string().required(),
firstName: Joi.string().required(),
lastName: Joi.string().required(),
email: Joi.string().email().required(),
country: Joi.object().keys({
isoAlpha3Code: Joi.string().required()
}).unknown(true).allow(null),
active: Joi.boolean()
}).required().unknown(true)
}).required().unknown(true)
}

/**
* Create the user and associated data in u-bahn
* @param {Object} message the kafka message
* @param {String} ubahnToken the auth token
*/
async function processCreate (message, ubahnToken) {
const organizationId = await helper.getOrganizationId(ubahnToken)
const attributes = await helper.getAttributes(ubahnToken)
const location = message.payload.country.isoAlpha3Code

const userId = await helper.createUser(_.pick(message.payload, 'handle', 'firstName', 'lastName'), ubahnToken)
logger.info(`user: ${message.payload.handle} created`)
helper.sleep()
await helper.createExternalProfile(userId, { organizationId, uri: 'uri', externalId: message.payload.id, isInactive: false }, ubahnToken)
await helper.createExternalProfile(userId, { organizationId, uri: `${MEMBER_PROFILE_URL_PREFIX}${message.payload.handle}`, externalId: message.payload.id, isInactive: false }, ubahnToken)
logger.info(`external profile: ${organizationId} created`)
helper.sleep()
await helper.createUserAttribute(userId, _.get(attributes, 'isAvailable'), 'true', ubahnToken)
await helper.createUserAttribute(userId, _.get(attributes, 'isAvailable'), message.payload.active.toString(), ubahnToken)
logger.info('user attribute: isAvailable created')
helper.sleep()
await helper.createUserAttribute(userId, _.get(attributes, 'company'), 'Topcoder', ubahnToken)
Expand All @@ -34,28 +74,30 @@ async function processCreate (message) {
helper.sleep()
await helper.createUserAttribute(userId, _.get(attributes, 'location'), location, ubahnToken)
logger.info('user attribute: location created')

// Custom attribute. May or may not exist
if (_.get(attributes, 'email')) {
helper.sleep()
await helper.createUserAttribute(userId, _.get(attributes, 'email'), message.payload.email, ubahnToken)
logger.info('user attribute: email created')
}
}

processCreate.schema = {
message: Joi.object().keys({
topic: Joi.string().required(),
originator: Joi.string().required(),
timestamp: Joi.date().required(),
'mime-type': Joi.string().required(),
payload: Joi.object().keys({
id: Joi.string().required(),
handle: Joi.string().required(),
firstName: Joi.string().required(),
lastName: Joi.string().required(),
country: Joi.object().keys({
isoAlpha3Code: Joi.string().required()
}).required().unknown(true)
}).required().unknown(true)
}).required().unknown(true)
/**
* Updates the user's availability status in u-bahn
* @param {Object} message the kafka message
* @param {String} userId the u-bahn user id
* @param {String} ubahnToken the auth token
*/
async function processUpdate (message, userId, ubahnToken) {
const attributes = await helper.getAttributes(ubahnToken)

await helper.updateUserAttribute(userId, _.get(attributes, 'isAvailable'), message.payload.active.toString(), ubahnToken)
logger.info('user attribute: isAvailable updated')
}

module.exports = {
processCreate
handle
}

logger.buildService(module.exports)