Skip to content
This repository was archived by the owner on Mar 13, 2025. It is now read-only.

Commit c5621e1

Browse files
author
sachin-maheshwari
authored
Merge pull request #8 from topcoder-platform/Issue_7
Issue 7
2 parents e5005be + 7da71b5 commit c5621e1

File tree

5 files changed

+132
-61
lines changed

5 files changed

+132
-61
lines changed

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
# Identity - Data Processor
22

3+
Creates user in u-bahn when they sign up on Topcoder. Also updates their availability status in u-bahn based on their account activation status in Topcoder
4+
35
## Dependencies
46

57
- Nodejs(v12+)

VERIFICATION.md

Lines changed: 29 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,41 +1,6 @@
11
# Verification
22

3-
```
4-
{
5-
"topic":"identity.notification.create",
6-
"originator":"u-bahn-api",
7-
"timestamp":"2019-07-08T00:00:00.000Z",
8-
"mime-type":"application/json",
9-
"payload":{
10-
"id":"90064000",
11-
"modifiedBy":null,
12-
"modifiedAt":"2021-01-05T14:01:40.336Z",
13-
"createdBy":null,
14-
"createdAt":"2021-01-05T14:01:40.336Z",
15-
"handle":"theuserhandle",
16-
"email":"[email protected]",
17-
"firstName":"theuserfirstname",
18-
"lastName":"theuserlastname",
19-
"credential":{"activationCode":"FOOBAR2","resetToken":null,"hasPassword":false},
20-
"profiles":null,
21-
"status":"A",
22-
"country":{
23-
"isoAlpha3Code": "IND"
24-
},
25-
"regSource":"null",
26-
"utmSource":"null",
27-
"utmMedium":"null",
28-
"utmCampaign":"null",
29-
"roles":null,
30-
"ssoLogin":false,
31-
"active":true,
32-
"profile":null,
33-
"emailActive":true
34-
}
35-
}
36-
```
37-
38-
Additionally, you will be entering the messages into only one topic:
3+
You will be entering the messages into only one topic:
394

405
```
416
docker exec -it identity-data-processor_kafka /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic identity.notification.create
@@ -45,6 +10,31 @@ docker exec -it identity-data-processor_kafka /opt/kafka/bin/kafka-console-produ
4510
2. write message:
4611
`{"recipients":[],"notificationType":"useractivation"}`
4712
3. Watch the app console, It will show error message.
48-
4. write message:
49-
`{"topic":"identity.notification.create","originator":"u-bahn-api","timestamp":"2019-07-08T00:00:00.000Z","mime-type":"application/json","payload":{"id":"90064000","modifiedBy":null,"modifiedAt":"2021-01-05T14:01:40.336Z","createdBy":null,"createdAt":"2021-01-05T14:01:40.336Z","handle":"theuserhandle","email":"[email protected]","firstName":"theuserfirstname","lastName":"theuserlastname","credential":{"activationCode":"FOOBAR2","resetToken":null,"hasPassword":false},"profiles":null,"status":"A","country":{"isoAlpha3Code":"IND"},"regSource":"null","utmSource":"null","utmMedium":"null","utmCampaign":"null","roles":null,"ssoLogin":false,"active":true,"profile":null,"emailActive":true}}`
50-
5. Watch the app console, It will show message successfully handled.
13+
4. Write message: `{"topic":"identity.notification.create","originator":"identity-service","timestamp":"2019-07-08T00:00:00.000Z","mime-type":"application/json","payload":{"id":"10000001","modifiedBy":null,"modifiedAt":null,"createdBy":null,"createdAt":null,"handle":"theuserhandle","email":"[email protected]","firstName":"User","lastName":"Member","credential":{"activationCode":"ABCDEFGHIJK","resetToken":null,"hasPassword":true},"profiles":null,"status":"U","country":{"code":"040","name":"Austria","isoAlpha2Code":"AT","isoAlpha3Code":"AUT"},"regSource":null,"utmSource":null,"utmMedium":null,"utmCampaign":null,"roles":null,"ssoLogin":false,"active":false,"profile":null,"emailActive":false}}`
14+
5. Watch the app console. It will show message successfully handled. The log should look like:
15+
```
16+
info: user: tezsachin78033 created
17+
debug: Sleeping for 1000 ms
18+
info: external profile: 36ed815b-3da1-49f1-a043-aaed0a4e81ad created
19+
debug: Sleeping for 1000 ms
20+
info: user attribute: isAvailable created
21+
debug: Sleeping for 1000 ms
22+
info: user attribute: company created
23+
debug: Sleeping for 1000 ms
24+
info: user attribute: title created
25+
debug: Sleeping for 1000 ms
26+
info: user attribute: location created
27+
debug: Sleeping for 1000 ms
28+
info: user attribute: email created
29+
debug: EXIT handle
30+
debug: Successfully processed message
31+
debug: Commiting offset after processing message
32+
```
33+
6. Now, write the following message: `{"topic":"identity.notification.create","originator":"identity-service","timestamp":"2019-07-08T00:00:00.000Z","mime-type":"application/json","payload":{"id":"10000001","modifiedBy":null,"modifiedAt":"2021-04-07T15:02:18.72Z","createdBy":null,"createdAt":"2021-04-07T15:02:18.72Z","handle":"theuserhandle","email":"[email protected]","firstName":"User","lastName":"Member","credential":{"activationCode":"ABCDEFGHIJK","resetToken":null,"hasPassword":true},"profiles":null,"status":"A","country":null,"regSource":null,"utmSource":null,"utmMedium":null,"utmCampaign":null,"roles":null,"ssoLogin":false,"active":true,"profile":null,"emailActive":true}}`
34+
7. Watch the app console. It will show message successfully handled. The log should look like:
35+
```
36+
info: user attribute: isAvailable updated
37+
debug: EXIT handle
38+
debug: Successfully processed message
39+
debug: Commiting offset after processing message
40+
```

src/app.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ const dataHandler = (messageSet, topic, partition) => Promise.each(messageSet, a
4747
return
4848
}
4949
try {
50-
await ProcessorService.processCreate(messageJSON)
50+
await ProcessorService.handle(messageJSON)
5151
logger.debug('Successfully processed message')
5252
} catch (err) {
5353
logger.logFullError(err)

src/common/helper.js

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,30 @@ async function getTopcoderToken () {
6060
return topcoderM2M.getMachineToken(config.AUTH0_CLIENT_ID, config.AUTH0_CLIENT_SECRET)
6161
}
6262

63+
/**
64+
* Retrieves the user from u-bahn using their handle
65+
* Returns null if no user exists, the user id otherwise
66+
* @param {String} handle The member handle
67+
* @param {String} token The auth token
68+
*/
69+
async function getUserId (handle, token) {
70+
const res = await axios.get(`${config.UBAHN_API_URL}/users`, {
71+
headers: {
72+
Authorization: `Bearer ${token}`
73+
},
74+
params: {
75+
handle
76+
}
77+
})
78+
const user = res.data.filter(u => u.handle === handle)[0]
79+
80+
if (user) {
81+
return user.id
82+
}
83+
84+
return null
85+
}
86+
6387
/**
6488
* Create a new User
6589
* @param {Object} body
@@ -178,6 +202,17 @@ async function createUserAttribute (userId, attributeId, value, token) {
178202
await axios.post(`${config.UBAHN_API_URL}/users/${userId}/attributes`, { attributeId, value }, { headers: { Authorization: `Bearer ${token}` } })
179203
}
180204

205+
/**
206+
* Create user attribute
207+
* @param {String} userId
208+
* @param {String} attributeId
209+
* @param {String} value
210+
* @param {String} token
211+
*/
212+
async function updateUserAttribute (userId, attributeId, value, token) {
213+
await axios.patch(`${config.UBAHN_API_URL}/users/${userId}/attributes/${attributeId}`, { value }, { headers: { Authorization: `Bearer ${token}` } })
214+
}
215+
181216
/**
182217
* Create external profile
183218
* @param {String} userId
@@ -214,5 +249,7 @@ module.exports = {
214249
createUser,
215250
createUserAttribute,
216251
createExternalProfile,
217-
createUserSkill
252+
createUserSkill,
253+
getUserId,
254+
updateUserAttribute
218255
}

src/services/ProcessorService.js

Lines changed: 62 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -7,23 +7,63 @@ const Joi = require('@hapi/joi')
77
const logger = require('../common/logger')
88
const helper = require('../common/helper')
99

10+
const MEMBER_PROFILE_URL_PREFIX = 'https://www.topcoder.com/members/'
11+
1012
/**
1113
* Process identity create entity message
1214
* @param {Object} message the kafka message
1315
*/
14-
async function processCreate (message) {
16+
async function handle (message) {
17+
// Check if the user already exists in u-bahn
18+
// If yes, then proceed to only update the availability status
19+
// If not, then proceed to create the user and other associated data in u-bahn
1520
const ubahnToken = await helper.getUbahnToken()
21+
const userId = await helper.getUserId(message.payload.handle, ubahnToken)
22+
23+
if (userId) {
24+
await processUpdate(message, userId, ubahnToken)
25+
} else {
26+
await processCreate(message, ubahnToken)
27+
}
28+
}
29+
30+
handle.schema = {
31+
message: Joi.object().keys({
32+
topic: Joi.string().required(),
33+
originator: Joi.string().required(),
34+
timestamp: Joi.date().required(),
35+
'mime-type': Joi.string().required(),
36+
payload: Joi.object().keys({
37+
id: Joi.string().required(),
38+
handle: Joi.string().required(),
39+
firstName: Joi.string().required(),
40+
lastName: Joi.string().required(),
41+
email: Joi.string().email().required(),
42+
country: Joi.object().keys({
43+
isoAlpha3Code: Joi.string().required()
44+
}).unknown(true).allow(null),
45+
active: Joi.boolean()
46+
}).required().unknown(true)
47+
}).required().unknown(true)
48+
}
49+
50+
/**
51+
* Create the user and associated data in u-bahn
52+
* @param {Object} message the kafka message
53+
* @param {String} ubahnToken the auth token
54+
*/
55+
async function processCreate (message, ubahnToken) {
1656
const organizationId = await helper.getOrganizationId(ubahnToken)
1757
const attributes = await helper.getAttributes(ubahnToken)
1858
const location = message.payload.country.isoAlpha3Code
1959

2060
const userId = await helper.createUser(_.pick(message.payload, 'handle', 'firstName', 'lastName'), ubahnToken)
2161
logger.info(`user: ${message.payload.handle} created`)
2262
helper.sleep()
23-
await helper.createExternalProfile(userId, { organizationId, uri: 'uri', externalId: message.payload.id, isInactive: false }, ubahnToken)
63+
await helper.createExternalProfile(userId, { organizationId, uri: `${MEMBER_PROFILE_URL_PREFIX}${message.payload.handle}`, externalId: message.payload.id, isInactive: false }, ubahnToken)
2464
logger.info(`external profile: ${organizationId} created`)
2565
helper.sleep()
26-
await helper.createUserAttribute(userId, _.get(attributes, 'isAvailable'), 'true', ubahnToken)
66+
await helper.createUserAttribute(userId, _.get(attributes, 'isAvailable'), message.payload.active.toString(), ubahnToken)
2767
logger.info('user attribute: isAvailable created')
2868
helper.sleep()
2969
await helper.createUserAttribute(userId, _.get(attributes, 'company'), 'Topcoder', ubahnToken)
@@ -34,28 +74,30 @@ async function processCreate (message) {
3474
helper.sleep()
3575
await helper.createUserAttribute(userId, _.get(attributes, 'location'), location, ubahnToken)
3676
logger.info('user attribute: location created')
77+
78+
// Custom attribute. May or may not exist
79+
if (_.get(attributes, 'email')) {
80+
helper.sleep()
81+
await helper.createUserAttribute(userId, _.get(attributes, 'email'), message.payload.email, ubahnToken)
82+
logger.info('user attribute: email created')
83+
}
3784
}
3885

39-
processCreate.schema = {
40-
message: Joi.object().keys({
41-
topic: Joi.string().required(),
42-
originator: Joi.string().required(),
43-
timestamp: Joi.date().required(),
44-
'mime-type': Joi.string().required(),
45-
payload: Joi.object().keys({
46-
id: Joi.string().required(),
47-
handle: Joi.string().required(),
48-
firstName: Joi.string().required(),
49-
lastName: Joi.string().required(),
50-
country: Joi.object().keys({
51-
isoAlpha3Code: Joi.string().required()
52-
}).required().unknown(true)
53-
}).required().unknown(true)
54-
}).required().unknown(true)
86+
/**
87+
* Updates the user's availability status in u-bahn
88+
* @param {Object} message the kafka message
89+
* @param {String} userId the u-bahn user id
90+
* @param {String} ubahnToken the auth token
91+
*/
92+
async function processUpdate (message, userId, ubahnToken) {
93+
const attributes = await helper.getAttributes(ubahnToken)
94+
95+
await helper.updateUserAttribute(userId, _.get(attributes, 'isAvailable'), message.payload.active.toString(), ubahnToken)
96+
logger.info('user attribute: isAvailable updated')
5597
}
5698

5799
module.exports = {
58-
processCreate
100+
handle
59101
}
60102

61103
logger.buildService(module.exports)

0 commit comments

Comments
 (0)