From 71bca54d47849775e454c2f4f7ec7d086fdb45d8 Mon Sep 17 00:00:00 2001 From: Sachin Maheshwari Date: Mon, 17 Feb 2020 15:28:26 +0530 Subject: [PATCH 01/11] initial bulk message developemnt. --- config/default.js | 7 +- src/hooks/hookBulkMessage.js | 137 ++++++++++++++++++++++++++++ src/hooks/index.js | 17 ++++ src/models/BulkMessageUserRefs.js | 34 +++++++ src/models/BulkMessages.js | 21 +++++ src/models/index.js | 6 ++ src/services/NotificationService.js | 6 ++ test/checkHooks.js | 3 + 8 files changed, 228 insertions(+), 3 deletions(-) create mode 100644 src/hooks/hookBulkMessage.js create mode 100644 src/hooks/index.js create mode 100644 src/models/BulkMessageUserRefs.js create mode 100644 src/models/BulkMessages.js create mode 100644 test/checkHooks.js diff --git a/config/default.js b/config/default.js index bbcc8c5..c1252b5 100644 --- a/config/default.js +++ b/config/default.js @@ -58,7 +58,7 @@ module.exports = { { id: 0, /** challengeid or projectid */ name: '', /** challenge name */ - group: 'Challenge', + group: 'challenge', title: 'Challenge specification is modified.', }, }, @@ -75,7 +75,7 @@ module.exports = { { id: 0, /** challengeid or projectid */ name: '', /** challenge name */ - group: 'Challenge', + group: 'challenge', title: 'Challenge checkpoint review.', }, }, @@ -92,7 +92,7 @@ module.exports = { { id: 0, /** challengeid or projectid */ name: '', /** challenge name */ - group: 'Submission', + group: 'submission', title: 'A new submission is uploaded.', }, }, @@ -108,4 +108,5 @@ module.exports = { ENABLE_DEV_MODE: process.env.ENABLE_DEV_MODE ? Boolean(process.env.ENABLE_DEV_MODE) : true, DEV_MODE_EMAIL: process.env.DEV_MODE_EMAIL, DEFAULT_REPLY_EMAIL: process.env.DEFAULT_REPLY_EMAIL, + ENABLE_HOOK_BULK_NOTIFICATION : process.env.ENABLE_HOOK_BULK_NOTIFICATION || false, }; diff --git a/src/hooks/hookBulkMessage.js b/src/hooks/hookBulkMessage.js new file mode 100644 index 0000000..e7278f3 --- /dev/null +++ b/src/hooks/hookBulkMessage.js @@ -0,0 +1,137 @@ +/** + * Hook to insert broadcast notification into database for a user. + */ + +'use strict' + +const _ = require('lodash') +//const Joi = require('joi') +//const errors = require('../common/errors') +const logger = require('../common/logger') +const models = require('../models') +const logPrefix = "BulkNotificationHook: " + +models.BulkMessages.sync() +models.BulkMessageUserRefs.sync() + +/** + * Main function + * @param {Integer} userId + */ +function checkBulkMessageForUser(userId) { + models.BulkMessages.count().then(function (tBulkMessages) { + if (tBulkMessages > 0) { + // the condition can help to optimize the execution + models.BulkMessageUserRefs.count({ + where: { + user_id: userId + } + }).then(function (tUserRefs) { + if (tUserRefs < tBulkMessages) { + logger.info(`${logPrefix} Need to sync broadcast message for current user ${userId}`) + syncBulkMessageForUser(userId) + } + }).catch((e) => { + logger.error(`${logPrefix} Failed to check total userRefs condition. Error: `, e) + }) + } + }).catch((e) => { + logger.error(`${logPrefix} Failed to check total broadcast message condition. Error: `, e) + }) +} + +/** + * Helper function + * @param {Integer} userId + */ +function syncBulkMessageForUser(userId) { + + /** + * Check if all bulk mesaages processed for current user or not + */ + let q = "SELECT a.* FROM bulk_messages AS a " + + " LEFT OUTER JOIN (SELECT id as refid, bulk_message_id " + + " FROM bulk_message_user_refs AS bmur WHERE bmur.user_id=$1)" + + " AS b ON a.id=b.bulk_message_id WHERE b.refid IS NULL" + models.sequelize.query(q, { bind: [userId] }) + .then(function (res) { + _.map(res[0], async (r) => { + logger.info(`${logPrefix} need to process for bulk message id: `, r.id) + // call function to check if current user in reciepent group + // insert row in userRef table + if (isBroadCastMessageForUser(userId, r)) { + // current user in reciepent group + createNotificationForUser(userId, r) + } else { + /** + * Insert row in userRef with notification-id null value + * It means - broadcast message in not for current user + */ + insertUserRefs(userId, r.id, null) + } + }) + }).catch((e) => { + logger.error(`${logPrefix} Failed to check bulk message condition: `, err) + }) +} + +/** + * Helper function + * Check if current user in broadcast recipent group + * @param {Integer} userId + * @param {Object} bulkMessage + */ +function isBroadCastMessageForUser(userId, bulkMessage) { + // TODO + return true; +} + +/** + * Helper function + * @param {Integer} userId + * @param {Integer} bulkMessageId + * @param {Integer} notificationId + */ +function insertUserRefs(userId, bulkMessageId, notificationId) { + models.BulkMessageUserRefs.create({ + bulk_message_id: bulkMessageId, + user_id: userId, + notification_id: notificationId, + }).then((b) => { + logger.info(`${logPrefix} Inserted userRef record ${b.id} for current user ${userId}`) + }).catch((e) => { + logger.error(`${logPrefix} Failed to insert userRef record for user: ${userId}, error: `, e) + }) +} + +/** + * Helper function + * @param {Integer} userId + * @param {Object} bulkMessage + */ +function createNotificationForUser(userId, bulkMessage) { + models.Notification.create({ + userId: userId, + type: bulkMessage.type, + contents: { + id: bulkMessage.id, /** broadcast message id */ + name: bulkMessage.contents, /** broadcast message */ + group: 'broadcast', + title: 'Broadcast Message', + }, + read: false, + seen: false, + version: null, + }).then((n) => { + logger.info(`${logPrefix} Inserted notification record ${n.id} for current user ${userId}`) + insertUserRefs(userId, bulkMessage.id, n.id) + }).catch((err) => { + logger.error(`${logPrefix} Error in inserting broadcast message `, err) + }) +} + + +// Exports +module.exports = { + checkBulkMessageForUser, +}; \ No newline at end of file diff --git a/src/hooks/index.js b/src/hooks/index.js new file mode 100644 index 0000000..5dcc1a6 --- /dev/null +++ b/src/hooks/index.js @@ -0,0 +1,17 @@ +/** + * Copyright (C) 2020 TopCoder Inc., All Rights Reserved. + */ + +/** + * Hook implementation + * + * @author TCSCODER + * @version 1.0 + */ + +const hookBulkMessage = require("./hookBulkMessage") + + +module.exports = { + hookBulkMessage, +}; diff --git a/src/models/BulkMessageUserRefs.js b/src/models/BulkMessageUserRefs.js new file mode 100644 index 0000000..7f3baf3 --- /dev/null +++ b/src/models/BulkMessageUserRefs.js @@ -0,0 +1,34 @@ +/** + * Copyright (C) 2020 TopCoder Inc., All Rights Reserved. + */ + +/** + * The Bulk Message User Reference schema + * + * @author TCSCODER + * @version 1.0 + */ + + +module.exports = (sequelize, DataTypes) => sequelize.define('bulk_message_user_refs', { + id: { type: DataTypes.BIGINT, primaryKey: true, autoIncrement: true }, + bulk_message_id: { + type: DataTypes.BIGINT, + allowNull: false, + references: { + model: 'bulk_messages', + key: 'id' + } + }, + notification_id: { + type: DataTypes.BIGINT, + allowNull: true, + references: { + model: 'Notifications', + key: 'id' + } + }, + user_id: { type: DataTypes.BIGINT, allowNull: false } +}, {}); + + // sequelize will generate and manage createdAt, updatedAt fields diff --git a/src/models/BulkMessages.js b/src/models/BulkMessages.js new file mode 100644 index 0000000..1479b2c --- /dev/null +++ b/src/models/BulkMessages.js @@ -0,0 +1,21 @@ +/** + * Copyright (C) 2020 TopCoder Inc., All Rights Reserved. + */ + +/** + * The Bulk Message Store schema + * + * @author TCSCODER + * @version 1.0 + */ + + +module.exports = (sequelize, DataTypes) => sequelize.define('bulk_messages', { + id: { type: DataTypes.BIGINT, primaryKey: true, autoIncrement: true }, + type: { type: DataTypes.STRING, allowNull: false }, + contents: { type: DataTypes.JSONB, allowNull: false }, + recipient_group: { type: DataTypes.STRING, allowNull: false } + }, {}); + + // sequelize will generate and manage createdAt, updatedAt fields + \ No newline at end of file diff --git a/src/models/index.js b/src/models/index.js index e6ef09e..d18f68a 100644 --- a/src/models/index.js +++ b/src/models/index.js @@ -16,11 +16,17 @@ const Notification = require('./Notification')(sequelize, DataTypes); const NotificationSetting = require('./NotificationSetting')(sequelize, DataTypes); const ServiceSettings = require('./ServiceSettings')(sequelize, DataTypes); const ScheduledEvents = require('./ScheduledEvents')(sequelize, DataTypes); +const BulkMessages = require('./BulkMessages')(sequelize, DataTypes); +const BulkMessageUserRefs = require('./BulkMessageUserRefs')(sequelize, DataTypes); + module.exports = { Notification, NotificationSetting, ServiceSettings, ScheduledEvents, + BulkMessages, + BulkMessageUserRefs, + sequelize, init: () => sequelize.sync(), }; diff --git a/src/services/NotificationService.js b/src/services/NotificationService.js index cb92781..cf8c9cb 100644 --- a/src/services/NotificationService.js +++ b/src/services/NotificationService.js @@ -9,6 +9,8 @@ const Joi = require('joi'); const errors = require('../common/errors'); const logger = require('../common/logger'); const models = require('../models'); +const config = require('config'); +const hooks = require('../hooks'); const DEFAULT_LIMIT = 10; @@ -202,6 +204,10 @@ function* listNotifications(query, userId) { break; } + if (config.ENABLE_HOOK_BULK_NOTIFICATION){ + hooks.hookBulkMessage.checkBulkMessageForUser(userId) + } + if (_.keys(notificationSettings).length > 0) { // only filter out notifications types which were explicitly set to 'no' - so we return notification by default const notifications = _.keys(notificationSettings).filter((notificationType) => diff --git a/test/checkHooks.js b/test/checkHooks.js new file mode 100644 index 0000000..6371283 --- /dev/null +++ b/test/checkHooks.js @@ -0,0 +1,3 @@ +const bulkhook = require("../src/hooks/hookBulkMessage") + +bulkhook.checkBulkMessageForUser(123) \ No newline at end of file From 6e80a00beba6315aec9259df52bfd64036d785d5 Mon Sep 17 00:00:00 2001 From: Sachin Maheshwari Date: Mon, 17 Feb 2020 18:29:27 +0530 Subject: [PATCH 02/11] adding broadcast processor --- config/default.js | 4 +++ src/hooks/hookBulkMessage.js | 10 ++++-- src/models/BulkMessages.js | 5 +-- .../broadcast/bulkNotificationHandler.js | 35 +++++++++++++++++++ src/processors/index.js | 2 ++ 5 files changed, 51 insertions(+), 5 deletions(-) create mode 100644 src/processors/broadcast/bulkNotificationHandler.js diff --git a/config/default.js b/config/default.js index c1252b5..904b068 100644 --- a/config/default.js +++ b/config/default.js @@ -98,6 +98,10 @@ module.exports = { }, }, ], + 'admin.notification.broadcast' : [{ + handleBulkNotification: {} + } + ] //'notifications.community.challenge.created': ['handleChallengeCreated'], //'notifications.community.challenge.phasewarning': ['handleChallengePhaseWarning'], }, diff --git a/src/hooks/hookBulkMessage.js b/src/hooks/hookBulkMessage.js index e7278f3..16d8c41 100644 --- a/src/hooks/hookBulkMessage.js +++ b/src/hooks/hookBulkMessage.js @@ -11,8 +11,12 @@ const logger = require('../common/logger') const models = require('../models') const logPrefix = "BulkNotificationHook: " -models.BulkMessages.sync() -models.BulkMessageUserRefs.sync() +/** + * CREATE NEW TABLES IF NOT EXISTS + */ +models.BulkMessages.sync().then((t)=> { + models.BulkMessageUserRefs.sync() +}) /** * Main function @@ -115,7 +119,7 @@ function createNotificationForUser(userId, bulkMessage) { type: bulkMessage.type, contents: { id: bulkMessage.id, /** broadcast message id */ - name: bulkMessage.contents, /** broadcast message */ + message: bulkMessage.message, /** broadcast message */ group: 'broadcast', title: 'Broadcast Message', }, diff --git a/src/models/BulkMessages.js b/src/models/BulkMessages.js index 1479b2c..5026f6e 100644 --- a/src/models/BulkMessages.js +++ b/src/models/BulkMessages.js @@ -13,8 +13,9 @@ module.exports = (sequelize, DataTypes) => sequelize.define('bulk_messages', { id: { type: DataTypes.BIGINT, primaryKey: true, autoIncrement: true }, type: { type: DataTypes.STRING, allowNull: false }, - contents: { type: DataTypes.JSONB, allowNull: false }, - recipient_group: { type: DataTypes.STRING, allowNull: false } + message: { type: DataTypes.TEXT, allowNull: false }, + recipients: { type: DataTypes.JSONB, allowNull: false }, + rules: {type: DataTypes.JSONB, allowNull: true} }, {}); // sequelize will generate and manage createdAt, updatedAt fields diff --git a/src/processors/broadcast/bulkNotificationHandler.js b/src/processors/broadcast/bulkNotificationHandler.js new file mode 100644 index 0000000..11aa557 --- /dev/null +++ b/src/processors/broadcast/bulkNotificationHandler.js @@ -0,0 +1,35 @@ +/** + * Bulk notification handler. + */ +const co = require('co'); +const models = require('../../models'); +const logger = require('../../common/logger') + +/** + * Handle Kafka JSON message of broadcast. + * + * @param {Object} message the Kafka JSON message + * @param {Object} ruleSets + * + * @return {Promise} promise resolved to notifications + */ +const handle = (message, ruleSets) => co(function* () { + return new Promise(function(resolve, reject){ + models.BulkMessages.create({ + type: message.topic, + message: message.payload.message, + recipients: message.payload.recipients, + rules: message.payload.rules || null, + }).then((bm) => { + logger.info("Broadcast message recieved and inserted in db with id:", bm.id) + resolve([]) // no notification need to insert at this point + }).catch((e) => { + logger.error("Broadcast processor failed in db operation. Error: ", e) + reject(e) + }) + }) +}); + +module.exports = { + handle, +}; \ No newline at end of file diff --git a/src/processors/index.js b/src/processors/index.js index 70d8c6a..a0243be 100644 --- a/src/processors/index.js +++ b/src/processors/index.js @@ -8,6 +8,7 @@ const ChallengePhaseWarningHandler = require('./challenge/ChallengePhaseWarningH const ChallengeHandler = require('./challenge/ChallengeHandler'); const AutoPilotHandler = require('./challenge/AutoPilotHandler'); const SubmissionHandler = require('./challenge/SubmissionHandler'); +const BulkNotificationHandler = require('./broadcast/bulkNotificationHandler'); // Exports module.exports = { @@ -16,4 +17,5 @@ module.exports = { handleChallenge: ChallengeHandler.handle, handleAutoPilot: AutoPilotHandler.handle, handleSubmission: SubmissionHandler.handle, + handleBulkNotification: BulkNotificationHandler.handle, }; From 9d3ed96fcda4a268ed0976ce52a444833176e6e2 Mon Sep 17 00:00:00 2001 From: Sachin Maheshwari Date: Tue, 18 Feb 2020 16:53:54 +0530 Subject: [PATCH 03/11] adding functions to decide recipient condition. --- src/common/broadcastAPIHelper.js | 86 +++++++++++++++++++ src/hooks/hookBulkMessage.js | 123 +++++++++++++++------------- src/services/NotificationService.js | 6 +- 3 files changed, 157 insertions(+), 58 deletions(-) create mode 100644 src/common/broadcastAPIHelper.js diff --git a/src/common/broadcastAPIHelper.js b/src/common/broadcastAPIHelper.js new file mode 100644 index 0000000..167c34c --- /dev/null +++ b/src/common/broadcastAPIHelper.js @@ -0,0 +1,86 @@ +/** + * + */ + +const _ = require('lodash') +const config = require('config') +const request = require('superagent') +const logger = require('./logger') +const m2mAuth = require('tc-core-library-js').auth.m2m; +const m2m = m2mAuth(config); + +const logPrefix = "BroadcastAPI: " + +async function getM2MToken() { + return m2m.getMachineToken(config.AUTH0_CLIENT_ID, config.AUTH0_CLIENT_SECRET) +} + +async function getMemberInfo(userId) { + const url = config.TC_API_V3_BASE_URL + + `/members/_search/?fields=userId%2Cskills&query=userId%3A${userId}&limit=1` + return new Promise(function (resolve, reject) { + let memberInfo = [] + logger.info(`calling member api ${url} `) + request + .get(url).then((res) => { + if (!_.get(res, 'body.result.success')) { + reject(new Error(`Failed to get member api detail for user id ${userId}`)) + } + memberInfo = _.get(res, 'body.result.content') + logger.info(`Feteched ${memberInfo.length} record(s) from member api`) + resolve(memberInfo) + }) + .catch((err) => { + reject(new Error(`Failed to get member api detail for user id ${userId}, ${err}`)) + }) + + }) + // Need clean-up + /*const m2m = await getM2MToken().catch((err) => { + logger.error(`${logPrefix} Failed to get m2m token`) + return new Promise(function(res, rej) { + rej(err) + }) + }) + logger.info(`${logPrefix} Fetched m2m token sucessfully. Token length is: `, m2m.length) + */ + //return request.get(url) +} + +async function checkBroadcastMessageForUser(userId, bulkMessage) { + return new Promise(function (resolve, reject) { + const skills = _.get(bulkMessage, 'recipients.skills') + if (skills && skills.length > 0) { + try { + getMemberInfo(userId).then((m) => { + let flag = false + logger.info(`${logPrefix} got member info.`) + const ms = _.get(m[0], "skills") + const memberSkills = [] + _.map(ms, (o) => { + memberSkills.push(_.get(o, 'name').toLowerCase()) + }) + logger.info(`${logPrefix} user id have following skills`, memberSkills) + _.map(skills, (s) => { + if (_.indexOf(memberSkills, s.toLowerCase()) >= 0) { + flag = true; + logger.info(`${logPrefix} '${s}' skill matached for user id ${userId}`) + } + }) + resolve(flag) + }).catch((err) => { + reject(err) + }) + } catch (err) { + reject(new Error(`${logPrefix} issue at skill condition check, ${err.message}`)) + } + } else { + resolve(true) // no condition on recipient, so for all + } + }) // promise end + +} + +module.exports = { + checkBroadcastMessageForUser, +} \ No newline at end of file diff --git a/src/hooks/hookBulkMessage.js b/src/hooks/hookBulkMessage.js index 16d8c41..067b400 100644 --- a/src/hooks/hookBulkMessage.js +++ b/src/hooks/hookBulkMessage.js @@ -9,12 +9,14 @@ const _ = require('lodash') //const errors = require('../common/errors') const logger = require('../common/logger') const models = require('../models') +const api = require('../common/broadcastAPIHelper') + const logPrefix = "BulkNotificationHook: " /** * CREATE NEW TABLES IF NOT EXISTS */ -models.BulkMessages.sync().then((t)=> { +models.BulkMessages.sync().then((t) => { models.BulkMessageUserRefs.sync() }) @@ -22,25 +24,34 @@ models.BulkMessages.sync().then((t)=> { * Main function * @param {Integer} userId */ -function checkBulkMessageForUser(userId) { - models.BulkMessages.count().then(function (tBulkMessages) { - if (tBulkMessages > 0) { - // the condition can help to optimize the execution - models.BulkMessageUserRefs.count({ - where: { - user_id: userId - } - }).then(function (tUserRefs) { - if (tUserRefs < tBulkMessages) { - logger.info(`${logPrefix} Need to sync broadcast message for current user ${userId}`) - syncBulkMessageForUser(userId) - } - }).catch((e) => { - logger.error(`${logPrefix} Failed to check total userRefs condition. Error: `, e) - }) - } - }).catch((e) => { - logger.error(`${logPrefix} Failed to check total broadcast message condition. Error: `, e) +async function checkBulkMessageForUser(userId) { + return new Promise(function (resolve, reject) { + models.BulkMessages.count().then(function (tBulkMessages) { + if (tBulkMessages > 0) { + // the condition can help to optimize the execution + models.BulkMessageUserRefs.count({ + where: { + user_id: userId + } + }).then(async function (tUserRefs) { + if (tUserRefs < tBulkMessages) { + logger.info(`${logPrefix} Need to sync broadcast message for current user ${userId}`) + syncBulkMessageForUser(userId).catch((e) => { + reject(e) + }) + } + resolve(true) // resolve here + }).catch((e) => { + logger.error(`${logPrefix} Failed to check total userRefs condition. Error: `, e) + reject(e) + }) + } else { + resolve(true) + } + }).catch((e) => { + logger.error(`${logPrefix} Failed to check total broadcast message condition. Error: `, e) + reject(e) + }) }) } @@ -48,35 +59,36 @@ function checkBulkMessageForUser(userId) { * Helper function * @param {Integer} userId */ -function syncBulkMessageForUser(userId) { +async function syncBulkMessageForUser(userId) { - /** - * Check if all bulk mesaages processed for current user or not - */ - let q = "SELECT a.* FROM bulk_messages AS a " + - " LEFT OUTER JOIN (SELECT id as refid, bulk_message_id " + - " FROM bulk_message_user_refs AS bmur WHERE bmur.user_id=$1)" + - " AS b ON a.id=b.bulk_message_id WHERE b.refid IS NULL" - models.sequelize.query(q, { bind: [userId] }) - .then(function (res) { - _.map(res[0], async (r) => { - logger.info(`${logPrefix} need to process for bulk message id: `, r.id) - // call function to check if current user in reciepent group - // insert row in userRef table - if (isBroadCastMessageForUser(userId, r)) { - // current user in reciepent group - createNotificationForUser(userId, r) - } else { - /** - * Insert row in userRef with notification-id null value - * It means - broadcast message in not for current user - */ - insertUserRefs(userId, r.id, null) - } + return new Promise(function (resolve, reject) { + /** + * Check if all bulk mesaages processed for current user or not + */ + let q = "SELECT a.* FROM bulk_messages AS a " + + " LEFT OUTER JOIN (SELECT id as refid, bulk_message_id " + + " FROM bulk_message_user_refs AS bmur WHERE bmur.user_id=$1)" + + " AS b ON a.id=b.bulk_message_id WHERE b.refid IS NULL" + models.sequelize.query(q, { bind: [userId] }) + .then(function (res) { + _.map(res[0], (r) => { + logger.info(`${logPrefix} need to process for bulk message id: `, r.id) + isBroadCastMessageForUser(userId, r).then((result) => { + if (result) { + createNotificationForUser(userId, r) + } else { + insertUserRefs(userId, r.id, null) + } + }).catch((err) => { + logger.error("failed in checking recipient group condition, Error:", err) + }) + }) + resolve(true) + }).catch((e) => { + logger.error(`${logPrefix} Failed to check bulk message condition: `, e) + reject(e) }) - }).catch((e) => { - logger.error(`${logPrefix} Failed to check bulk message condition: `, err) - }) + }) } /** @@ -85,9 +97,8 @@ function syncBulkMessageForUser(userId) { * @param {Integer} userId * @param {Object} bulkMessage */ -function isBroadCastMessageForUser(userId, bulkMessage) { - // TODO - return true; +async function isBroadCastMessageForUser(userId, bulkMessage) { + return api.checkBroadcastMessageForUser(userId, bulkMessage) } /** @@ -96,8 +107,8 @@ function isBroadCastMessageForUser(userId, bulkMessage) { * @param {Integer} bulkMessageId * @param {Integer} notificationId */ -function insertUserRefs(userId, bulkMessageId, notificationId) { - models.BulkMessageUserRefs.create({ +async function insertUserRefs(userId, bulkMessageId, notificationId) { + await models.BulkMessageUserRefs.create({ bulk_message_id: bulkMessageId, user_id: userId, notification_id: notificationId, @@ -113,8 +124,8 @@ function insertUserRefs(userId, bulkMessageId, notificationId) { * @param {Integer} userId * @param {Object} bulkMessage */ -function createNotificationForUser(userId, bulkMessage) { - models.Notification.create({ +async function createNotificationForUser(userId, bulkMessage) { + await models.Notification.create({ userId: userId, type: bulkMessage.type, contents: { @@ -126,9 +137,9 @@ function createNotificationForUser(userId, bulkMessage) { read: false, seen: false, version: null, - }).then((n) => { + }).then(async (n) => { logger.info(`${logPrefix} Inserted notification record ${n.id} for current user ${userId}`) - insertUserRefs(userId, bulkMessage.id, n.id) + await insertUserRefs(userId, bulkMessage.id, n.id) }).catch((err) => { logger.error(`${logPrefix} Error in inserting broadcast message `, err) }) diff --git a/src/services/NotificationService.js b/src/services/NotificationService.js index cf8c9cb..906bb3d 100644 --- a/src/services/NotificationService.js +++ b/src/services/NotificationService.js @@ -204,8 +204,10 @@ function* listNotifications(query, userId) { break; } - if (config.ENABLE_HOOK_BULK_NOTIFICATION){ - hooks.hookBulkMessage.checkBulkMessageForUser(userId) + if (config.ENABLE_HOOK_BULK_NOTIFICATION) { + yield hooks.hookBulkMessage.checkBulkMessageForUser(userId).catch((e) => { + logger.info(`Issue in calling bulk notification hook.`, e) + }) } if (_.keys(notificationSettings).length > 0) { From d30fbefaeae4684d5e263d008cd8dfda4edf373b Mon Sep 17 00:00:00 2001 From: Sachin Maheshwari Date: Tue, 18 Feb 2020 19:11:26 +0530 Subject: [PATCH 04/11] sync calls for testing --- src/common/broadcastAPIHelper.js | 6 +- src/hooks/hookBulkMessage.js | 95 +++++++++++++++-------------- src/services/NotificationService.js | 2 +- 3 files changed, 56 insertions(+), 47 deletions(-) diff --git a/src/common/broadcastAPIHelper.js b/src/common/broadcastAPIHelper.js index 167c34c..8debbca 100644 --- a/src/common/broadcastAPIHelper.js +++ b/src/common/broadcastAPIHelper.js @@ -50,6 +50,7 @@ async function getMemberInfo(userId) { async function checkBroadcastMessageForUser(userId, bulkMessage) { return new Promise(function (resolve, reject) { const skills = _.get(bulkMessage, 'recipients.skills') + logger.info(`Got skills in DB...`, skills) if (skills && skills.length > 0) { try { getMemberInfo(userId).then((m) => { @@ -67,7 +68,10 @@ async function checkBroadcastMessageForUser(userId, bulkMessage) { logger.info(`${logPrefix} '${s}' skill matached for user id ${userId}`) } }) - resolve(flag) + resolve({ + record: bulkMessage, + result: flag + }) }).catch((err) => { reject(err) }) diff --git a/src/hooks/hookBulkMessage.js b/src/hooks/hookBulkMessage.js index 067b400..9eb59f3 100644 --- a/src/hooks/hookBulkMessage.js +++ b/src/hooks/hookBulkMessage.js @@ -34,16 +34,14 @@ async function checkBulkMessageForUser(userId) { user_id: userId } }).then(async function (tUserRefs) { + let result = true if (tUserRefs < tBulkMessages) { logger.info(`${logPrefix} Need to sync broadcast message for current user ${userId}`) - syncBulkMessageForUser(userId).catch((e) => { - reject(e) - }) + result = await syncBulkMessageForUser(userId) } - resolve(true) // resolve here + resolve(result) // resolve here }).catch((e) => { - logger.error(`${logPrefix} Failed to check total userRefs condition. Error: `, e) - reject(e) + reject(`${logPrefix} Failed to check total userRefs condition. Error: ${e}`) }) } else { resolve(true) @@ -71,22 +69,24 @@ async function syncBulkMessageForUser(userId) { " AS b ON a.id=b.bulk_message_id WHERE b.refid IS NULL" models.sequelize.query(q, { bind: [userId] }) .then(function (res) { - _.map(res[0], (r) => { - logger.info(`${logPrefix} need to process for bulk message id: `, r.id) - isBroadCastMessageForUser(userId, r).then((result) => { - if (result) { - createNotificationForUser(userId, r) - } else { - insertUserRefs(userId, r.id, null) - } - }).catch((err) => { - logger.error("failed in checking recipient group condition, Error:", err) + Promise.all(res[0].map((r) => isBroadCastMessageForUser(userId, r))) + .then((results) => { + Promise.all(results.map((o) => { + if (o.result) { + return createNotificationForUser(userId, o.record) + } else { + return insertUserRefs(userId, o.record.id, null) + } + })).then((results) => { + resolve(results) + }).catch((e) => { + reject(e) + }) + }).catch((e) => { + reject(e) }) - }) - resolve(true) }).catch((e) => { - logger.error(`${logPrefix} Failed to check bulk message condition: `, e) - reject(e) + reject(`${logPrefix} Failed to check bulk message condition: error - ${e}`) }) }) } @@ -108,14 +108,16 @@ async function isBroadCastMessageForUser(userId, bulkMessage) { * @param {Integer} notificationId */ async function insertUserRefs(userId, bulkMessageId, notificationId) { - await models.BulkMessageUserRefs.create({ - bulk_message_id: bulkMessageId, - user_id: userId, - notification_id: notificationId, - }).then((b) => { - logger.info(`${logPrefix} Inserted userRef record ${b.id} for current user ${userId}`) - }).catch((e) => { - logger.error(`${logPrefix} Failed to insert userRef record for user: ${userId}, error: `, e) + return new Promise(function (resolve, reject) { + models.BulkMessageUserRefs.create({ + bulk_message_id: bulkMessageId, + user_id: userId, + notification_id: notificationId, + }).then((b) => { + resolve(`${logPrefix} Inserted userRef record ${b.id} for current user ${userId}`) + }).catch((e) => { + reject(`${logPrefix} Failed to insert userRef record for user: ${userId}, error: ${e}`) + }) }) } @@ -125,23 +127,26 @@ async function insertUserRefs(userId, bulkMessageId, notificationId) { * @param {Object} bulkMessage */ async function createNotificationForUser(userId, bulkMessage) { - await models.Notification.create({ - userId: userId, - type: bulkMessage.type, - contents: { - id: bulkMessage.id, /** broadcast message id */ - message: bulkMessage.message, /** broadcast message */ - group: 'broadcast', - title: 'Broadcast Message', - }, - read: false, - seen: false, - version: null, - }).then(async (n) => { - logger.info(`${logPrefix} Inserted notification record ${n.id} for current user ${userId}`) - await insertUserRefs(userId, bulkMessage.id, n.id) - }).catch((err) => { - logger.error(`${logPrefix} Error in inserting broadcast message `, err) + return new Promise(function (resolve, reject) { + models.Notification.create({ + userId: userId, + type: bulkMessage.type, + contents: { + id: bulkMessage.id, /** broadcast message id */ + message: bulkMessage.message, /** broadcast message */ + group: 'broadcast', + title: 'Broadcast Message', + }, + read: false, + seen: false, + version: null, + }).then(async (n) => { + logger.info(`${logPrefix} Inserted notification record ${n.id} for current user ${userId}`) + const result = await insertUserRefs(userId, bulkMessage.id, n.id) + resolve(result) + }).catch((err) => { + reject(`${logPrefix} Error in inserting broadcast message: ${err} `) + }) }) } diff --git a/src/services/NotificationService.js b/src/services/NotificationService.js index 906bb3d..4456c3a 100644 --- a/src/services/NotificationService.js +++ b/src/services/NotificationService.js @@ -206,7 +206,7 @@ function* listNotifications(query, userId) { if (config.ENABLE_HOOK_BULK_NOTIFICATION) { yield hooks.hookBulkMessage.checkBulkMessageForUser(userId).catch((e) => { - logger.info(`Issue in calling bulk notification hook.`, e) + logger.error(`Issue in calling bulk notification hook.`, e) }) } From 918f970e5bb1ae2402b39c22b8b6f52ab1c62d9a Mon Sep 17 00:00:00 2001 From: Sachin Maheshwari Date: Wed, 19 Feb 2020 20:12:39 +0530 Subject: [PATCH 05/11] adding logic for user group checking. --- src/common/broadcastAPIHelper.js | 168 +++++++++++++++++++--------- src/services/NotificationService.js | 8 +- 2 files changed, 120 insertions(+), 56 deletions(-) diff --git a/src/common/broadcastAPIHelper.js b/src/common/broadcastAPIHelper.js index 8debbca..d8c11e4 100644 --- a/src/common/broadcastAPIHelper.js +++ b/src/common/broadcastAPIHelper.js @@ -15,74 +15,136 @@ async function getM2MToken() { return m2m.getMachineToken(config.AUTH0_CLIENT_ID, config.AUTH0_CLIENT_SECRET) } +/** + * Helper Function - get member profile + * @param {Integer} userId + */ async function getMemberInfo(userId) { const url = config.TC_API_V3_BASE_URL + - `/members/_search/?fields=userId%2Cskills&query=userId%3A${userId}&limit=1` - return new Promise(function (resolve, reject) { + "/members/_search/?" + + "fields=userId%2Cskills" + + `&query=userId%3A${userId}` + + `&limit=1` + return new Promise(async function (resolve, reject) { let memberInfo = [] logger.info(`calling member api ${url} `) - request - .get(url).then((res) => { - if (!_.get(res, 'body.result.success')) { - reject(new Error(`Failed to get member api detail for user id ${userId}`)) - } - memberInfo = _.get(res, 'body.result.content') - logger.info(`Feteched ${memberInfo.length} record(s) from member api`) - resolve(memberInfo) - }) - .catch((err) => { - reject(new Error(`Failed to get member api detail for user id ${userId}, ${err}`)) - }) + try { + const res = await request.get(url) + if (!_.get(res, 'body.result.success')) { + reject(new Error(`BCA Memeber API: Failed to get member detail for user id ${userId}`)) + } + memberInfo = _.get(res, 'body.result.content') + logger.info(`BCA Memeber API: Feteched ${memberInfo.length} record(s) from member api`) + resolve(memberInfo) + } catch (err) { + reject(new Error(`BCA Memeber API: Failed to get member api detail for user id ${userId}, ${err}`)) + } }) - // Need clean-up - /*const m2m = await getM2MToken().catch((err) => { - logger.error(`${logPrefix} Failed to get m2m token`) - return new Promise(function(res, rej) { - rej(err) - }) +} + +/** + * Helper Function - get user group + * @param {Integer} userId + */ +async function getUserGroup(userId) { + //TODO need to take care of pagination + const url = config.TC_API_V5_BASE_URL + + `/groups/?memberId=${userId}` + + "&membershipType=user&page=1" + let groupInfo = [] + return new Promise(async (resolve, reject) => { + try { + const machineToken = await getM2MToken() + //logger.info(`BCA Group API: got m2m token of length ${machineToken.length}`) + const res = await request.get(url).set('Authorization', `Bearer ${machineToken}`); + if (_.get(res, 'res.statusCode') != 200) { + reject(new Error(`BCA Group API: Failed to get group detail for user id ${userId}`)) + } + groupInfo = _.get(res, 'body') + logger.info(`BCA Group API: Feteched ${groupInfo.length} record(s) from group api`) + resolve(groupInfo) + } catch (e) { + reject(`Calling group api ${e}`) + } }) - logger.info(`${logPrefix} Fetched m2m token sucessfully. Token length is: `, m2m.length) - */ - //return request.get(url) } async function checkBroadcastMessageForUser(userId, bulkMessage) { return new Promise(function (resolve, reject) { - const skills = _.get(bulkMessage, 'recipients.skills') - logger.info(`Got skills in DB...`, skills) - if (skills && skills.length > 0) { - try { - getMemberInfo(userId).then((m) => { - let flag = false - logger.info(`${logPrefix} got member info.`) - const ms = _.get(m[0], "skills") - const memberSkills = [] - _.map(ms, (o) => { - memberSkills.push(_.get(o, 'name').toLowerCase()) - }) - logger.info(`${logPrefix} user id have following skills`, memberSkills) - _.map(skills, (s) => { - if (_.indexOf(memberSkills, s.toLowerCase()) >= 0) { - flag = true; - logger.info(`${logPrefix} '${s}' skill matached for user id ${userId}`) - } - }) - resolve({ - record: bulkMessage, - result: flag - }) - }).catch((err) => { - reject(err) + Promise.all([ + checkUserSkill(userId, bulkMessage), + checkUserGroup(userId, bulkMessage), + ]).then((results) => { + let flag = true // TODO need to be sure about default value + _.map(results, (r) => { + flag = !r ? false : flag // TODO recheck condition + }) + logger.info(`Final condition result is: ${flag}`) + resolve({ + record: bulkMessage, + result: flag + }) + }).catch((err) => { + reject(`${logPrefix} got issue in checking recipient condition. ${err}`) + }) + }) // promise end +} + +/** + * Helper function - check Skill condition + */ +async function checkUserSkill(userId, bulkMessage) { + return new Promise(async function (resolve, reject) { + try { + const skills = _.get(bulkMessage, 'recipients.skills') + let flag = true // allow for all + if (skills && skills.length > 0) { + const m = await getMemberInfo(userId) + const ms = _.get(m[0], "skills") // get member skills + const memberSkills = [] + flag = false + _.map(ms, (o) => { + memberSkills.push(_.get(o, 'name').toLowerCase()) + }) + _.map(skills, (s) => { + if (_.indexOf(memberSkills, s.toLowerCase()) >= 0) { + flag = true + logger.info(`BroadcastMessageId: ${bulkMessage.id}, '${s}' skill matached for user id ${userId}`) + } }) - } catch (err) { - reject(new Error(`${logPrefix} issue at skill condition check, ${err.message}`)) } - } else { - resolve(true) // no condition on recipient, so for all + resolve(flag) + } catch (e) { + reject(e) } - }) // promise end + }) // promise end +} +/** + * Helper function - check group condition + */ +async function checkUserGroup(userId, bulkMessage) { + return new Promise(async function (resolve, reject) { + try { + const groups = _.get(bulkMessage, 'recipients.groups') + let flag = true // TODO + if (groups.length > 0) { + flag = false + const groupInfo = await getUserGroup(userId) + _.map(groupInfo, (o) => { + if (_.indexOf(groups, "public") >= 0) { + flag = (_.get(o, "privateGroup")) ? false : flag + } else { + flag = (_.indexOf(groups, _.get(o, "name")) >= 0) ? true : flag + } + }) + } + resolve(flag) + } catch (e) { + reject(e) + } + }) } module.exports = { diff --git a/src/services/NotificationService.js b/src/services/NotificationService.js index 4456c3a..fcae5a2 100644 --- a/src/services/NotificationService.js +++ b/src/services/NotificationService.js @@ -205,9 +205,11 @@ function* listNotifications(query, userId) { } if (config.ENABLE_HOOK_BULK_NOTIFICATION) { - yield hooks.hookBulkMessage.checkBulkMessageForUser(userId).catch((e) => { - logger.error(`Issue in calling bulk notification hook.`, e) - }) + try { + yield hooks.hookBulkMessage.checkBulkMessageForUser(userId) + } catch (e) { + logger.error(`Error in calling bulk notification hook: ${e}`) + } } if (_.keys(notificationSettings).length > 0) { From 5c6eaad50f1a6e5f40955b5b9d9366718c5980cf Mon Sep 17 00:00:00 2001 From: Sachin Maheshwari Date: Thu, 20 Feb 2020 15:33:10 +0530 Subject: [PATCH 06/11] deploying on dev --- .circleci/config.yml | 2 +- src/common/broadcastAPIHelper.js | 51 +++++++++++-------- src/hooks/hookBulkMessage.js | 39 +++++++------- src/models/BulkMessages.js | 14 +++-- .../broadcast/bulkNotificationHandler.js | 51 ++++++++++++------- 5 files changed, 92 insertions(+), 65 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index f87288b..7bc897d 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -102,7 +102,7 @@ workflows: context : org-global filters: branches: - only: [dev, 'hotfix/V5-API-Standards', 'v5-upgrade', 'feature/platform-filtering'] + only: [dev, 'hotfix/V5-API-Standards', 'v5-upgrade', 'feature/bulk-notification'] - "build-prod": context : org-global filters: diff --git a/src/common/broadcastAPIHelper.js b/src/common/broadcastAPIHelper.js index d8c11e4..3cda1e2 100644 --- a/src/common/broadcastAPIHelper.js +++ b/src/common/broadcastAPIHelper.js @@ -11,6 +11,9 @@ const m2m = m2mAuth(config); const logPrefix = "BroadcastAPI: " +/** + * Helper Function - get m2m token + */ async function getM2MToken() { return m2m.getMachineToken(config.AUTH0_CLIENT_ID, config.AUTH0_CLIENT_SECRET) } @@ -70,27 +73,6 @@ async function getUserGroup(userId) { }) } -async function checkBroadcastMessageForUser(userId, bulkMessage) { - return new Promise(function (resolve, reject) { - Promise.all([ - checkUserSkill(userId, bulkMessage), - checkUserGroup(userId, bulkMessage), - ]).then((results) => { - let flag = true // TODO need to be sure about default value - _.map(results, (r) => { - flag = !r ? false : flag // TODO recheck condition - }) - logger.info(`Final condition result is: ${flag}`) - resolve({ - record: bulkMessage, - result: flag - }) - }).catch((err) => { - reject(`${logPrefix} got issue in checking recipient condition. ${err}`) - }) - }) // promise end -} - /** * Helper function - check Skill condition */ @@ -147,6 +129,33 @@ async function checkUserGroup(userId, bulkMessage) { }) } +/** + * Main Function - check if broadcast message is for current user or not + * + * @param {Integer} userId + * @param {Object} bulkMessage + */ +async function checkBroadcastMessageForUser(userId, bulkMessage) { + return new Promise(function (resolve, reject) { + Promise.all([ + checkUserSkill(userId, bulkMessage), + checkUserGroup(userId, bulkMessage), + ]).then((results) => { + let flag = true // TODO need to be sure about default value + _.map(results, (r) => { + flag = !r ? false : flag // TODO recheck condition + }) + logger.info(`Final condition result is: ${flag}`) + resolve({ + record: bulkMessage, + result: flag + }) + }).catch((err) => { + reject(`${logPrefix} got issue in checking recipient condition. ${err}`) + }) + }) // promise end +} + module.exports = { checkBroadcastMessageForUser, } \ No newline at end of file diff --git a/src/hooks/hookBulkMessage.js b/src/hooks/hookBulkMessage.js index 9eb59f3..1d0c266 100644 --- a/src/hooks/hookBulkMessage.js +++ b/src/hooks/hookBulkMessage.js @@ -37,7 +37,7 @@ async function checkBulkMessageForUser(userId) { let result = true if (tUserRefs < tBulkMessages) { logger.info(`${logPrefix} Need to sync broadcast message for current user ${userId}`) - result = await syncBulkMessageForUser(userId) + result = await syncBulkMessageForUser(userId) } resolve(result) // resolve here }).catch((e) => { @@ -102,23 +102,25 @@ async function isBroadCastMessageForUser(userId, bulkMessage) { } /** - * Helper function + * Helper function - Insert in bulkMessage user reference table + * * @param {Integer} userId * @param {Integer} bulkMessageId * @param {Integer} notificationId */ async function insertUserRefs(userId, bulkMessageId, notificationId) { - return new Promise(function (resolve, reject) { - models.BulkMessageUserRefs.create({ + try { + const r = await models.BulkMessageUserRefs.create({ bulk_message_id: bulkMessageId, user_id: userId, notification_id: notificationId, - }).then((b) => { - resolve(`${logPrefix} Inserted userRef record ${b.id} for current user ${userId}`) - }).catch((e) => { - reject(`${logPrefix} Failed to insert userRef record for user: ${userId}, error: ${e}`) }) - }) + logger.info(`${logPrefix} Inserted userRef record for bulk message id ${r.id} for current user ${userId}`) + return r + } catch (e) { + logger.error(`${logPrefix} Failed to insert userRef record for user: ${userId}, error: ${e}`) + return e + } } /** @@ -127,8 +129,8 @@ async function insertUserRefs(userId, bulkMessageId, notificationId) { * @param {Object} bulkMessage */ async function createNotificationForUser(userId, bulkMessage) { - return new Promise(function (resolve, reject) { - models.Notification.create({ + try { + const n = await models.Notification.create({ userId: userId, type: bulkMessage.type, contents: { @@ -140,14 +142,15 @@ async function createNotificationForUser(userId, bulkMessage) { read: false, seen: false, version: null, - }).then(async (n) => { - logger.info(`${logPrefix} Inserted notification record ${n.id} for current user ${userId}`) - const result = await insertUserRefs(userId, bulkMessage.id, n.id) - resolve(result) - }).catch((err) => { - reject(`${logPrefix} Error in inserting broadcast message: ${err} `) }) - }) + logger.info(`${logPrefix} Inserted notification record ${n.id} for current user ${userId}`) + // TODO need to be in transaction so that rollback will be possible + const result = await insertUserRefs(userId, bulkMessage.id, n.id) + return result + } catch (e) { + logger.error(`${logPrefix} Error in inserting broadcast message: ${err} `) + return e + } } diff --git a/src/models/BulkMessages.js b/src/models/BulkMessages.js index 5026f6e..2bdbc6b 100644 --- a/src/models/BulkMessages.js +++ b/src/models/BulkMessages.js @@ -11,12 +11,10 @@ module.exports = (sequelize, DataTypes) => sequelize.define('bulk_messages', { - id: { type: DataTypes.BIGINT, primaryKey: true, autoIncrement: true }, - type: { type: DataTypes.STRING, allowNull: false }, - message: { type: DataTypes.TEXT, allowNull: false }, - recipients: { type: DataTypes.JSONB, allowNull: false }, - rules: {type: DataTypes.JSONB, allowNull: true} - }, {}); - + id: { type: DataTypes.BIGINT, primaryKey: true, autoIncrement: true }, + type: { type: DataTypes.STRING, allowNull: false }, + message: { type: DataTypes.TEXT, allowNull: false }, + recipients: { type: DataTypes.JSONB, allowNull: false }, +}, {}); + // sequelize will generate and manage createdAt, updatedAt fields - \ No newline at end of file diff --git a/src/processors/broadcast/bulkNotificationHandler.js b/src/processors/broadcast/bulkNotificationHandler.js index 11aa557..93d1e04 100644 --- a/src/processors/broadcast/bulkNotificationHandler.js +++ b/src/processors/broadcast/bulkNotificationHandler.js @@ -1,8 +1,9 @@ /** * Bulk notification handler. */ -const co = require('co'); -const models = require('../../models'); +const joi = require('joi') +const co = require('co') +const models = require('../../models') const logger = require('../../common/logger') /** @@ -14,22 +15,38 @@ const logger = require('../../common/logger') * @return {Promise} promise resolved to notifications */ const handle = (message, ruleSets) => co(function* () { - return new Promise(function(resolve, reject){ - models.BulkMessages.create({ - type: message.topic, - message: message.payload.message, - recipients: message.payload.recipients, - rules: message.payload.rules || null, - }).then((bm) => { - logger.info("Broadcast message recieved and inserted in db with id:", bm.id) - resolve([]) // no notification need to insert at this point - }).catch((e) => { - logger.error("Broadcast processor failed in db operation. Error: ", e) - reject(e) - }) - }) + try { + const bm = yield models.BulkMessages.create({ + type: message.topic, + message: message.payload.message, + recipients: message.payload.recipients, + }) + logger.info("Broadcast message recieved and inserted in db with id:", bm.id) + } catch (e) { + logger.error(`Broadcast processor failed in db operation. Error: ${e}`) + } + return [] // this point of time, send empty notification object }); +/** + * validate kafka payload + */ +handle.schema = { + message: joi.object().keys({ + topic: joi.string().required(), + originator: joi.string().required(), + timestamp: joi.date().required(), + 'mime-type': joi.string().required(), + payload: joi.object().keys({ + message: joi.string().required(), + recipients: joi.object().required(), + }), + }).required(), + ruleSets: joi.object(), +}; + module.exports = { handle, -}; \ No newline at end of file +}; + +logger.buildService(module.exports); \ No newline at end of file From 457a97ee30b8eaa3e4b627427726a117dd5203da Mon Sep 17 00:00:00 2001 From: Sachin Maheshwari Date: Thu, 20 Feb 2020 17:01:37 +0530 Subject: [PATCH 07/11] correcting logic for public group --- src/common/broadcastAPIHelper.js | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/src/common/broadcastAPIHelper.js b/src/common/broadcastAPIHelper.js index 3cda1e2..9669921 100644 --- a/src/common/broadcastAPIHelper.js +++ b/src/common/broadcastAPIHelper.js @@ -110,16 +110,18 @@ async function checkUserGroup(userId, bulkMessage) { return new Promise(async function (resolve, reject) { try { const groups = _.get(bulkMessage, 'recipients.groups') - let flag = true // TODO + let flag = false // default + const userGroupInfo = await getUserGroup(userId) if (groups.length > 0) { - flag = false - const groupInfo = await getUserGroup(userId) - _.map(groupInfo, (o) => { - if (_.indexOf(groups, "public") >= 0) { - flag = (_.get(o, "privateGroup")) ? false : flag - } else { - flag = (_.indexOf(groups, _.get(o, "name")) >= 0) ? true : flag - } + _.map(userGroupInfo, (o) => { + // particular group only condition + flag = (_.indexOf(groups, _.get(o, "name")) >= 0) ? true : flag + }) + } else { // no group condition means its for `public` no private group + flag = true // default allow for all + _.map(userGroupInfo, (o) => { + // not allow if user is part of any private group + flag = (_.get(o, "privateGroup")) ? false : flag }) } resolve(flag) From a036f9fde1e37c90d491413cf38800a0c1494b77 Mon Sep 17 00:00:00 2001 From: Sachin Maheshwari Date: Thu, 20 Feb 2020 17:06:13 +0530 Subject: [PATCH 08/11] prining log --- src/common/broadcastAPIHelper.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/common/broadcastAPIHelper.js b/src/common/broadcastAPIHelper.js index 9669921..1f20b58 100644 --- a/src/common/broadcastAPIHelper.js +++ b/src/common/broadcastAPIHelper.js @@ -147,7 +147,7 @@ async function checkBroadcastMessageForUser(userId, bulkMessage) { _.map(results, (r) => { flag = !r ? false : flag // TODO recheck condition }) - logger.info(`Final condition result is: ${flag}`) + logger.info(`BCA: Final recepient condition result is: ${flag} for userId ${userId}`) resolve({ record: bulkMessage, result: flag From 62cf1a1f30764439b8f5290e5c37b7093bd845a8 Mon Sep 17 00:00:00 2001 From: Sachin Maheshwari Date: Fri, 21 Feb 2020 17:59:20 +0530 Subject: [PATCH 09/11] changes in group api calling.. --- src/common/broadcastAPIHelper.js | 78 ++++++++++++++++++++++---------- 1 file changed, 55 insertions(+), 23 deletions(-) diff --git a/src/common/broadcastAPIHelper.js b/src/common/broadcastAPIHelper.js index 1f20b58..89e7f80 100644 --- a/src/common/broadcastAPIHelper.js +++ b/src/common/broadcastAPIHelper.js @@ -1,5 +1,5 @@ /** - * + * Broadcast: API calling helper */ const _ = require('lodash') @@ -40,7 +40,8 @@ async function getMemberInfo(userId) { logger.info(`BCA Memeber API: Feteched ${memberInfo.length} record(s) from member api`) resolve(memberInfo) } catch (err) { - reject(new Error(`BCA Memeber API: Failed to get member api detail for user id ${userId}, ${err}`)) + reject(new Error(`BCA Memeber API: Failed to get member ` + + `api detail for user id ${userId}, ${err}`)) } }) @@ -51,26 +52,53 @@ async function getMemberInfo(userId) { * @param {Integer} userId */ async function getUserGroup(userId) { - //TODO need to take care of pagination - const url = config.TC_API_V5_BASE_URL + - `/groups/?memberId=${userId}` + - "&membershipType=user&page=1" - let groupInfo = [] - return new Promise(async (resolve, reject) => { - try { - const machineToken = await getM2MToken() - //logger.info(`BCA Group API: got m2m token of length ${machineToken.length}`) - const res = await request.get(url).set('Authorization', `Bearer ${machineToken}`); - if (_.get(res, 'res.statusCode') != 200) { - reject(new Error(`BCA Group API: Failed to get group detail for user id ${userId}`)) - } - groupInfo = _.get(res, 'body') - logger.info(`BCA Group API: Feteched ${groupInfo.length} record(s) from group api`) - resolve(groupInfo) - } catch (e) { - reject(`Calling group api ${e}`) + try { + const machineToken = await getM2MToken() + if (machineToken.length <= 0) { + return (new Error(`BCA Group API: fecthing m2m token failed for ${userId}`)) } - }) + let nextPage + let res + let url + let page = 1 + let groupInfo = [] + const perPage = 100 + do { + url = config.TC_API_V5_BASE_URL + + `/groups/?memberId=${userId}&membershipType=user` + + `&page=${page}&perPage=${perPage}` + res = await callApi(url, machineToken) + let resStatus = _.get(res, 'res.statusCode') + if (resStatus != 200) { + throw new Error(`BCA Group API: Failed for user id ${userId},` + + ` response status ${resStatus}`) + } + let data = _.get(res, 'body') + groupInfo = groupInfo.concat(data) + nextPage = _.get(res, 'header.x-next-page') + page = nextPage + } while (nextPage) + logger.info(`BCA Group API: Feteched ${groupInfo.length} record(s) from group api`) + return groupInfo + } catch (e) { + logger.error(`BCA: Error calling group api : ${e}`) + throw new Error(`getUserGroup() : ${e}`) + } +} + +/** + * + * @param {String} url + * @param {String} machineToken + */ +async function callApi(url, machineToken) { + try { + logger.info(`calling api url ${url}`) + return request.get(url).set('Authorization', `Bearer ${machineToken}`) + } catch (e) { + logger.error(`Error in calling URL ${url}, ${e}`) + throw new Error(`callApi() : ${e}`) + } } /** @@ -92,7 +120,8 @@ async function checkUserSkill(userId, bulkMessage) { _.map(skills, (s) => { if (_.indexOf(memberSkills, s.toLowerCase()) >= 0) { flag = true - logger.info(`BroadcastMessageId: ${bulkMessage.id}, '${s}' skill matached for user id ${userId}`) + logger.info(`BroadcastMessageId: ${bulkMessage.id},` + + ` '${s}' skill matached for user id ${userId}`) } }) } @@ -123,6 +152,8 @@ async function checkUserGroup(userId, bulkMessage) { // not allow if user is part of any private group flag = (_.get(o, "privateGroup")) ? false : flag }) + logger.info(`public group condition for userId ${userId}` + + ` and BC messageId ${bulkMessage.id}, the result is: ${flag}`) } resolve(flag) } catch (e) { @@ -147,7 +178,8 @@ async function checkBroadcastMessageForUser(userId, bulkMessage) { _.map(results, (r) => { flag = !r ? false : flag // TODO recheck condition }) - logger.info(`BCA: Final recepient condition result is: ${flag} for userId ${userId}`) + logger.info(`BCA: messageId: ${bulkMessage.id} Final recipient` + + ` condition result is: ${flag} for userId ${userId}`) resolve({ record: bulkMessage, result: flag From 83490b3b82aa54c4c66ece04de2abfd14f4abfa0 Mon Sep 17 00:00:00 2001 From: Sachin Maheshwari Date: Mon, 24 Feb 2020 19:09:23 +0530 Subject: [PATCH 10/11] adding track conditions --- src/common/broadcastAPIHelper.js | 125 ++++++++++++++++++------------- 1 file changed, 73 insertions(+), 52 deletions(-) diff --git a/src/common/broadcastAPIHelper.js b/src/common/broadcastAPIHelper.js index 89e7f80..18d759b 100644 --- a/src/common/broadcastAPIHelper.js +++ b/src/common/broadcastAPIHelper.js @@ -25,8 +25,7 @@ async function getM2MToken() { async function getMemberInfo(userId) { const url = config.TC_API_V3_BASE_URL + "/members/_search/?" + - "fields=userId%2Cskills" + - `&query=userId%3A${userId}` + + `query=userId%3A${userId}` + `&limit=1` return new Promise(async function (resolve, reject) { let memberInfo = [] @@ -102,64 +101,86 @@ async function callApi(url, machineToken) { } /** - * Helper function - check Skill condition + * Helper function - check Skills and Tracks condition */ -async function checkUserSkill(userId, bulkMessage) { - return new Promise(async function (resolve, reject) { - try { - const skills = _.get(bulkMessage, 'recipients.skills') - let flag = true // allow for all - if (skills && skills.length > 0) { - const m = await getMemberInfo(userId) - const ms = _.get(m[0], "skills") // get member skills - const memberSkills = [] - flag = false - _.map(ms, (o) => { - memberSkills.push(_.get(o, 'name').toLowerCase()) - }) - _.map(skills, (s) => { - if (_.indexOf(memberSkills, s.toLowerCase()) >= 0) { - flag = true - logger.info(`BroadcastMessageId: ${bulkMessage.id},` + - ` '${s}' skill matached for user id ${userId}`) - } - }) - } - resolve(flag) - } catch (e) { - reject(e) +async function checkUserSkillsAndTracks(userId, bulkMessage) { + try { + const skills = _.get(bulkMessage, 'recipients.skills') + const tracks = _.get(bulkMessage, 'recipients.tracks') + const m = await getMemberInfo(userId) + let skillMatch, trackMatch = false // default + if (skills && skills.length > 0) { + const ms = _.get(m[0], "skills") // get member skills + const memberSkills = [] + skillMatch = false + _.map(ms, (o) => { + memberSkills.push(_.get(o, 'name').toLowerCase()) + }) + _.map(skills, (s) => { + if (_.indexOf(memberSkills, s.toLowerCase()) >= 0) { + skillMatch = true + logger.info(`BroadcastMessageId: ${bulkMessage.id},` + + ` '${s}' skill matached for user id ${userId}`) + } + }) + } else { + skillMatch = true // no condition, means allow for all } - }) // promise end + + // + if (tracks.length > 0) { + trackMatch = false + const uDevChallenges = _.get(m[0], "stats[0].DEVELOP.challenges1") + const uDesignChallenges = _.get(m[0], "stats[0].DEVELOP.challenges") + const uDSChallenges = _.get(m[0], "stats[0].DEVELOP.challenges") + _.map(tracks, (t) => { + /** + * checking if user participated in specific challenges + */ + if (t.equalsIgnoreCase("DEVELOP")) { + trackMatch = uDevChallenges > 0 ? true : trackMatch + } else if (t.equalsIgnoreCase("DESIGN")) { + trackMatch = uDesignChallenges > 0 ? true : trackMatch + } else if (t.equalsIgnoreCase("DATA_SCIENCE")) { + trackMatch = uDSChallenges > 0 ? true : trackMatch + } + }) + } else { + trackMatch = true // no condition, means allow for all + } + const flag = (skillMatch && trackMatch) ? true : false + return flag + } catch (e) { + throw new Error(`checkUserSkillsAndTracks() : ${e}`) + } } /** * Helper function - check group condition */ async function checkUserGroup(userId, bulkMessage) { - return new Promise(async function (resolve, reject) { - try { - const groups = _.get(bulkMessage, 'recipients.groups') - let flag = false // default - const userGroupInfo = await getUserGroup(userId) - if (groups.length > 0) { - _.map(userGroupInfo, (o) => { - // particular group only condition - flag = (_.indexOf(groups, _.get(o, "name")) >= 0) ? true : flag - }) - } else { // no group condition means its for `public` no private group - flag = true // default allow for all - _.map(userGroupInfo, (o) => { - // not allow if user is part of any private group - flag = (_.get(o, "privateGroup")) ? false : flag - }) - logger.info(`public group condition for userId ${userId}` + - ` and BC messageId ${bulkMessage.id}, the result is: ${flag}`) - } - resolve(flag) - } catch (e) { - reject(e) + try { + const groups = _.get(bulkMessage, 'recipients.groups') + let flag = false // default + const userGroupInfo = await getUserGroup(userId) + if (groups.length > 0) { + _.map(userGroupInfo, (o) => { + // particular group only condition + flag = (_.indexOf(groups, _.get(o, "name")) >= 0) ? true : flag + }) + } else { // no group condition means its for `public` no private group + flag = true // default allow for all + _.map(userGroupInfo, (o) => { + // not allow if user is part of any private group + flag = (_.get(o, "privateGroup")) ? false : flag + }) + logger.info(`public group condition for userId ${userId}` + + ` and BC messageId ${bulkMessage.id}, the result is: ${flag}`) } - }) + return flag + } catch (e) { + throw new Error(`checkUserGroup(): ${e}`) + } } /** @@ -171,7 +192,7 @@ async function checkUserGroup(userId, bulkMessage) { async function checkBroadcastMessageForUser(userId, bulkMessage) { return new Promise(function (resolve, reject) { Promise.all([ - checkUserSkill(userId, bulkMessage), + checkUserSkillsAndTracks(userId, bulkMessage), checkUserGroup(userId, bulkMessage), ]).then((results) => { let flag = true // TODO need to be sure about default value From 7810afce454c8227f8e8cd39d4de127caf3424fc Mon Sep 17 00:00:00 2001 From: Sachin Maheshwari Date: Wed, 26 Feb 2020 12:27:53 +0530 Subject: [PATCH 11/11] fixing typo --- src/common/broadcastAPIHelper.js | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/src/common/broadcastAPIHelper.js b/src/common/broadcastAPIHelper.js index 18d759b..b196cf4 100644 --- a/src/common/broadcastAPIHelper.js +++ b/src/common/broadcastAPIHelper.js @@ -130,18 +130,19 @@ async function checkUserSkillsAndTracks(userId, bulkMessage) { // if (tracks.length > 0) { trackMatch = false - const uDevChallenges = _.get(m[0], "stats[0].DEVELOP.challenges1") - const uDesignChallenges = _.get(m[0], "stats[0].DEVELOP.challenges") - const uDSChallenges = _.get(m[0], "stats[0].DEVELOP.challenges") + const uDevChallenges = _.get(m[0], "stats[0].DEVELOP.challenges") + const uDesignChallenges = _.get(m[0], "stats[0].DESIGN.challenges") + const uDSChallenges = _.get(m[0], "stats[0].DATA_SCIENCE.challenges") _.map(tracks, (t) => { /** * checking if user participated in specific challenges */ - if (t.equalsIgnoreCase("DEVELOP")) { + let key = t.toLowerCase() + if (key === "develop") { trackMatch = uDevChallenges > 0 ? true : trackMatch - } else if (t.equalsIgnoreCase("DESIGN")) { + } else if (key === "design") { trackMatch = uDesignChallenges > 0 ? true : trackMatch - } else if (t.equalsIgnoreCase("DATA_SCIENCE")) { + } else if (key === "data_science") { trackMatch = uDSChallenges > 0 ? true : trackMatch } })