diff --git a/.circleci/config.yml b/.circleci/config.yml index f87288b..7bc897d 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -102,7 +102,7 @@ workflows: context : org-global filters: branches: - only: [dev, 'hotfix/V5-API-Standards', 'v5-upgrade', 'feature/platform-filtering'] + only: [dev, 'hotfix/V5-API-Standards', 'v5-upgrade', 'feature/bulk-notification'] - "build-prod": context : org-global filters: diff --git a/config/default.js b/config/default.js index bbcc8c5..904b068 100644 --- a/config/default.js +++ b/config/default.js @@ -58,7 +58,7 @@ module.exports = { { id: 0, /** challengeid or projectid */ name: '', /** challenge name */ - group: 'Challenge', + group: 'challenge', title: 'Challenge specification is modified.', }, }, @@ -75,7 +75,7 @@ module.exports = { { id: 0, /** challengeid or projectid */ name: '', /** challenge name */ - group: 'Challenge', + group: 'challenge', title: 'Challenge checkpoint review.', }, }, @@ -92,12 +92,16 @@ module.exports = { { id: 0, /** challengeid or projectid */ name: '', /** challenge name */ - group: 'Submission', + group: 'submission', title: 'A new submission is uploaded.', }, }, }, ], + 'admin.notification.broadcast' : [{ + handleBulkNotification: {} + } + ] //'notifications.community.challenge.created': ['handleChallengeCreated'], //'notifications.community.challenge.phasewarning': ['handleChallengePhaseWarning'], }, @@ -108,4 +112,5 @@ module.exports = { ENABLE_DEV_MODE: process.env.ENABLE_DEV_MODE ? Boolean(process.env.ENABLE_DEV_MODE) : true, DEV_MODE_EMAIL: process.env.DEV_MODE_EMAIL, DEFAULT_REPLY_EMAIL: process.env.DEFAULT_REPLY_EMAIL, + ENABLE_HOOK_BULK_NOTIFICATION : process.env.ENABLE_HOOK_BULK_NOTIFICATION || false, }; diff --git a/emails/src/partials/project-team.html b/emails/src/partials/project-team.html index 5c3671d..e8802ae 100644 --- a/emails/src/partials/project-team.html +++ b/emails/src/partials/project-team.html @@ -58,11 +58,7 @@ {{userFullName}} joined the project {{/if}} {{#if [connect.notification.project.member.invite.created]}} - {{#if [isSSO]}} - Hi {{userFullName}}, you are invited to join the project {{projectName}}. Please use the link below to sign in and join the project. - {{else}} - Hi {{userFullName}}, you are invited to join the project {{projectName}}. Please click on the button ("View project on Connect") below to join. - {{/if}} + Hi {{userFullName}}, you are invited to join the project {{projectName}}. Please click on the button ("Join Project") below to join. {{/if}} {{#if [connect.notification.project.member.invite.requested]}} You are requested to add {{userFullName}} as a copilot diff --git a/src/common/broadcastAPIHelper.js b/src/common/broadcastAPIHelper.js new file mode 100644 index 0000000..b196cf4 --- /dev/null +++ b/src/common/broadcastAPIHelper.js @@ -0,0 +1,217 @@ +/** + * Broadcast: API calling helper + */ + +const _ = require('lodash') +const config = require('config') +const request = require('superagent') +const logger = require('./logger') +const m2mAuth = require('tc-core-library-js').auth.m2m; +const m2m = m2mAuth(config); + +const logPrefix = "BroadcastAPI: " + +/** + * Helper Function - get m2m token + */ +async function getM2MToken() { + return m2m.getMachineToken(config.AUTH0_CLIENT_ID, config.AUTH0_CLIENT_SECRET) +} + +/** + * Helper Function - get member profile + * @param {Integer} userId + */ +async function getMemberInfo(userId) { + const url = config.TC_API_V3_BASE_URL + + "/members/_search/?" + + `query=userId%3A${userId}` + + `&limit=1` + return new Promise(async function (resolve, reject) { + let memberInfo = [] + logger.info(`calling member api ${url} `) + try { + const res = await request.get(url) + if (!_.get(res, 'body.result.success')) { + reject(new Error(`BCA Memeber API: Failed to get member detail for user id ${userId}`)) + } + memberInfo = _.get(res, 'body.result.content') + logger.info(`BCA Memeber API: Feteched ${memberInfo.length} record(s) from member api`) + resolve(memberInfo) + } catch (err) { + reject(new Error(`BCA Memeber API: Failed to get member ` + + `api detail for user id ${userId}, ${err}`)) + } + + }) +} + +/** + * Helper Function - get user group + * @param {Integer} userId + */ +async function getUserGroup(userId) { + try { + const machineToken = await getM2MToken() + if (machineToken.length <= 0) { + return (new Error(`BCA Group API: fecthing m2m token failed for ${userId}`)) + } + let nextPage + let res + let url + let page = 1 + let groupInfo = [] + const perPage = 100 + do { + url = config.TC_API_V5_BASE_URL + + `/groups/?memberId=${userId}&membershipType=user` + + `&page=${page}&perPage=${perPage}` + res = await callApi(url, machineToken) + let resStatus = _.get(res, 'res.statusCode') + if (resStatus != 200) { + throw new Error(`BCA Group API: Failed for user id ${userId},` + + ` response status ${resStatus}`) + } + let data = _.get(res, 'body') + groupInfo = groupInfo.concat(data) + nextPage = _.get(res, 'header.x-next-page') + page = nextPage + } while (nextPage) + logger.info(`BCA Group API: Feteched ${groupInfo.length} record(s) from group api`) + return groupInfo + } catch (e) { + logger.error(`BCA: Error calling group api : ${e}`) + throw new Error(`getUserGroup() : ${e}`) + } +} + +/** + * + * @param {String} url + * @param {String} machineToken + */ +async function callApi(url, machineToken) { + try { + logger.info(`calling api url ${url}`) + return request.get(url).set('Authorization', `Bearer ${machineToken}`) + } catch (e) { + logger.error(`Error in calling URL ${url}, ${e}`) + throw new Error(`callApi() : ${e}`) + } +} + +/** + * Helper function - check Skills and Tracks condition + */ +async function checkUserSkillsAndTracks(userId, bulkMessage) { + try { + const skills = _.get(bulkMessage, 'recipients.skills') + const tracks = _.get(bulkMessage, 'recipients.tracks') + const m = await getMemberInfo(userId) + let skillMatch, trackMatch = false // default + if (skills && skills.length > 0) { + const ms = _.get(m[0], "skills") // get member skills + const memberSkills = [] + skillMatch = false + _.map(ms, (o) => { + memberSkills.push(_.get(o, 'name').toLowerCase()) + }) + _.map(skills, (s) => { + if (_.indexOf(memberSkills, s.toLowerCase()) >= 0) { + skillMatch = true + logger.info(`BroadcastMessageId: ${bulkMessage.id},` + + ` '${s}' skill matached for user id ${userId}`) + } + }) + } else { + skillMatch = true // no condition, means allow for all + } + + // + if (tracks.length > 0) { + trackMatch = false + const uDevChallenges = _.get(m[0], "stats[0].DEVELOP.challenges") + const uDesignChallenges = _.get(m[0], "stats[0].DESIGN.challenges") + const uDSChallenges = _.get(m[0], "stats[0].DATA_SCIENCE.challenges") + _.map(tracks, (t) => { + /** + * checking if user participated in specific challenges + */ + let key = t.toLowerCase() + if (key === "develop") { + trackMatch = uDevChallenges > 0 ? true : trackMatch + } else if (key === "design") { + trackMatch = uDesignChallenges > 0 ? true : trackMatch + } else if (key === "data_science") { + trackMatch = uDSChallenges > 0 ? true : trackMatch + } + }) + } else { + trackMatch = true // no condition, means allow for all + } + const flag = (skillMatch && trackMatch) ? true : false + return flag + } catch (e) { + throw new Error(`checkUserSkillsAndTracks() : ${e}`) + } +} + +/** + * Helper function - check group condition + */ +async function checkUserGroup(userId, bulkMessage) { + try { + const groups = _.get(bulkMessage, 'recipients.groups') + let flag = false // default + const userGroupInfo = await getUserGroup(userId) + if (groups.length > 0) { + _.map(userGroupInfo, (o) => { + // particular group only condition + flag = (_.indexOf(groups, _.get(o, "name")) >= 0) ? true : flag + }) + } else { // no group condition means its for `public` no private group + flag = true // default allow for all + _.map(userGroupInfo, (o) => { + // not allow if user is part of any private group + flag = (_.get(o, "privateGroup")) ? false : flag + }) + logger.info(`public group condition for userId ${userId}` + + ` and BC messageId ${bulkMessage.id}, the result is: ${flag}`) + } + return flag + } catch (e) { + throw new Error(`checkUserGroup(): ${e}`) + } +} + +/** + * Main Function - check if broadcast message is for current user or not + * + * @param {Integer} userId + * @param {Object} bulkMessage + */ +async function checkBroadcastMessageForUser(userId, bulkMessage) { + return new Promise(function (resolve, reject) { + Promise.all([ + checkUserSkillsAndTracks(userId, bulkMessage), + checkUserGroup(userId, bulkMessage), + ]).then((results) => { + let flag = true // TODO need to be sure about default value + _.map(results, (r) => { + flag = !r ? false : flag // TODO recheck condition + }) + logger.info(`BCA: messageId: ${bulkMessage.id} Final recipient` + + ` condition result is: ${flag} for userId ${userId}`) + resolve({ + record: bulkMessage, + result: flag + }) + }).catch((err) => { + reject(`${logPrefix} got issue in checking recipient condition. ${err}`) + }) + }) // promise end +} + +module.exports = { + checkBroadcastMessageForUser, +} \ No newline at end of file diff --git a/src/hooks/hookBulkMessage.js b/src/hooks/hookBulkMessage.js new file mode 100644 index 0000000..cacbe44 --- /dev/null +++ b/src/hooks/hookBulkMessage.js @@ -0,0 +1,158 @@ +/** + * Hook to insert broadcast notification into database for a user. + */ + +'use strict' + +const _ = require('lodash') +const logger = require('../common/logger') +const models = require('../models') +const api = require('../common/broadcastAPIHelper') + +const logPrefix = "BulkNotificationHook: " + +/** + * CREATE NEW TABLES IF NOT EXISTS + */ +models.BulkMessages.sync().then((t) => { + models.BulkMessageUserRefs.sync() +}) + +/** + * Main function + * @param {Integer} userId + */ +async function checkBulkMessageForUser(userId) { + return new Promise(function (resolve, reject) { + models.BulkMessages.count().then(function (tBulkMessages) { + if (tBulkMessages > 0) { + // the condition can help to optimize the execution + models.BulkMessageUserRefs.count({ + where: { + user_id: userId + } + }).then(async function (tUserRefs) { + let result = true + if (tUserRefs < tBulkMessages) { + logger.info(`${logPrefix} Need to sync broadcast message for current user ${userId}`) + result = await syncBulkMessageForUser(userId) + } + resolve(result) // resolve here + }).catch((e) => { + reject(`${logPrefix} Failed to check total userRefs condition. Error: ${e}`) + }) + } else { + resolve(true) + } + }).catch((e) => { + logger.error(`${logPrefix} Failed to check total broadcast message condition. Error: `, e) + reject(e) + }) + }) +} + +/** + * Helper function + * @param {Integer} userId + */ +async function syncBulkMessageForUser(userId) { + + return new Promise(function (resolve, reject) { + /** + * Check if all bulk mesaages processed for current user or not + */ + let q = "SELECT a.* FROM bulk_messages AS a " + + " LEFT OUTER JOIN (SELECT id as refid, bulk_message_id " + + " FROM bulk_message_user_refs AS bmur WHERE bmur.user_id=$1)" + + " AS b ON a.id=b.bulk_message_id WHERE b.refid IS NULL" + models.sequelize.query(q, { bind: [userId] }) + .then(function (res) { + Promise.all(res[0].map((r) => isBroadCastMessageForUser(userId, r))) + .then((results) => { + Promise.all(results.map((o) => { + if (o.result) { + return createNotificationForUser(userId, o.record) + } else { + return insertUserRefs(userId, o.record.id, null) + } + })).then((results) => { + resolve(results) + }).catch((e) => { + reject(e) + }) + }).catch((e) => { + reject(e) + }) + }).catch((e) => { + reject(`${logPrefix} Failed to check bulk message condition: error - ${e}`) + }) + }) +} + +/** + * Helper function + * Check if current user in broadcast recipent group + * @param {Integer} userId + * @param {Object} bulkMessage + */ +async function isBroadCastMessageForUser(userId, bulkMessage) { + return api.checkBroadcastMessageForUser(userId, bulkMessage) +} + +/** + * Helper function - Insert in bulkMessage user reference table + * + * @param {Integer} userId + * @param {Integer} bulkMessageId + * @param {Integer} notificationId + */ +async function insertUserRefs(userId, bulkMessageId, notificationId) { + try { + const r = await models.BulkMessageUserRefs.create({ + bulk_message_id: bulkMessageId, + user_id: userId, + notification_id: notificationId, + }) + logger.info(`${logPrefix} Inserted userRef record for bulk message id ${r.id} for current user ${userId}`) + return r + } catch (e) { + logger.error(`${logPrefix} Failed to insert userRef record for user: ${userId}, error: ${e}`) + throw new Error(`insertUserRefs() : ${e}`) + } +} + +/** + * Helper function + * @param {Integer} userId + * @param {Object} bulkMessage + */ +async function createNotificationForUser(userId, bulkMessage) { + try { + const n = await models.Notification.create({ + userId: userId, + type: bulkMessage.type, + contents: { + id: bulkMessage.id, /** broadcast message id */ + message: bulkMessage.message, /** broadcast message */ + group: 'broadcast', + title: 'Broadcast Message', + }, + read: false, + seen: false, + version: null, + }) + logger.info(`${logPrefix} Inserted notification record ${n.id} for current user ${userId}`) + // TODO need to be in transaction so that rollback will be possible + const result = await insertUserRefs(userId, bulkMessage.id, n.id) + return result + } catch (e) { + logger.error(`${logPrefix} insert broadcast notification error: ${e} `) + throw new Error(`createNotificationForUser() : ${e}`) + } +} + + +// Exports +module.exports = { + checkBulkMessageForUser, +}; \ No newline at end of file diff --git a/src/hooks/index.js b/src/hooks/index.js new file mode 100644 index 0000000..5dcc1a6 --- /dev/null +++ b/src/hooks/index.js @@ -0,0 +1,17 @@ +/** + * Copyright (C) 2020 TopCoder Inc., All Rights Reserved. + */ + +/** + * Hook implementation + * + * @author TCSCODER + * @version 1.0 + */ + +const hookBulkMessage = require("./hookBulkMessage") + + +module.exports = { + hookBulkMessage, +}; diff --git a/src/models/BulkMessageUserRefs.js b/src/models/BulkMessageUserRefs.js new file mode 100644 index 0000000..7f3baf3 --- /dev/null +++ b/src/models/BulkMessageUserRefs.js @@ -0,0 +1,34 @@ +/** + * Copyright (C) 2020 TopCoder Inc., All Rights Reserved. + */ + +/** + * The Bulk Message User Reference schema + * + * @author TCSCODER + * @version 1.0 + */ + + +module.exports = (sequelize, DataTypes) => sequelize.define('bulk_message_user_refs', { + id: { type: DataTypes.BIGINT, primaryKey: true, autoIncrement: true }, + bulk_message_id: { + type: DataTypes.BIGINT, + allowNull: false, + references: { + model: 'bulk_messages', + key: 'id' + } + }, + notification_id: { + type: DataTypes.BIGINT, + allowNull: true, + references: { + model: 'Notifications', + key: 'id' + } + }, + user_id: { type: DataTypes.BIGINT, allowNull: false } +}, {}); + + // sequelize will generate and manage createdAt, updatedAt fields diff --git a/src/models/BulkMessages.js b/src/models/BulkMessages.js new file mode 100644 index 0000000..2bdbc6b --- /dev/null +++ b/src/models/BulkMessages.js @@ -0,0 +1,20 @@ +/** + * Copyright (C) 2020 TopCoder Inc., All Rights Reserved. + */ + +/** + * The Bulk Message Store schema + * + * @author TCSCODER + * @version 1.0 + */ + + +module.exports = (sequelize, DataTypes) => sequelize.define('bulk_messages', { + id: { type: DataTypes.BIGINT, primaryKey: true, autoIncrement: true }, + type: { type: DataTypes.STRING, allowNull: false }, + message: { type: DataTypes.TEXT, allowNull: false }, + recipients: { type: DataTypes.JSONB, allowNull: false }, +}, {}); + + // sequelize will generate and manage createdAt, updatedAt fields diff --git a/src/models/index.js b/src/models/index.js index e6ef09e..d18f68a 100644 --- a/src/models/index.js +++ b/src/models/index.js @@ -16,11 +16,17 @@ const Notification = require('./Notification')(sequelize, DataTypes); const NotificationSetting = require('./NotificationSetting')(sequelize, DataTypes); const ServiceSettings = require('./ServiceSettings')(sequelize, DataTypes); const ScheduledEvents = require('./ScheduledEvents')(sequelize, DataTypes); +const BulkMessages = require('./BulkMessages')(sequelize, DataTypes); +const BulkMessageUserRefs = require('./BulkMessageUserRefs')(sequelize, DataTypes); + module.exports = { Notification, NotificationSetting, ServiceSettings, ScheduledEvents, + BulkMessages, + BulkMessageUserRefs, + sequelize, init: () => sequelize.sync(), }; diff --git a/src/processors/broadcast/bulkNotificationHandler.js b/src/processors/broadcast/bulkNotificationHandler.js new file mode 100644 index 0000000..93d1e04 --- /dev/null +++ b/src/processors/broadcast/bulkNotificationHandler.js @@ -0,0 +1,52 @@ +/** + * Bulk notification handler. + */ +const joi = require('joi') +const co = require('co') +const models = require('../../models') +const logger = require('../../common/logger') + +/** + * Handle Kafka JSON message of broadcast. + * + * @param {Object} message the Kafka JSON message + * @param {Object} ruleSets + * + * @return {Promise} promise resolved to notifications + */ +const handle = (message, ruleSets) => co(function* () { + try { + const bm = yield models.BulkMessages.create({ + type: message.topic, + message: message.payload.message, + recipients: message.payload.recipients, + }) + logger.info("Broadcast message recieved and inserted in db with id:", bm.id) + } catch (e) { + logger.error(`Broadcast processor failed in db operation. Error: ${e}`) + } + return [] // this point of time, send empty notification object +}); + +/** + * validate kafka payload + */ +handle.schema = { + message: joi.object().keys({ + topic: joi.string().required(), + originator: joi.string().required(), + timestamp: joi.date().required(), + 'mime-type': joi.string().required(), + payload: joi.object().keys({ + message: joi.string().required(), + recipients: joi.object().required(), + }), + }).required(), + ruleSets: joi.object(), +}; + +module.exports = { + handle, +}; + +logger.buildService(module.exports); \ No newline at end of file diff --git a/src/processors/index.js b/src/processors/index.js index 70d8c6a..a0243be 100644 --- a/src/processors/index.js +++ b/src/processors/index.js @@ -8,6 +8,7 @@ const ChallengePhaseWarningHandler = require('./challenge/ChallengePhaseWarningH const ChallengeHandler = require('./challenge/ChallengeHandler'); const AutoPilotHandler = require('./challenge/AutoPilotHandler'); const SubmissionHandler = require('./challenge/SubmissionHandler'); +const BulkNotificationHandler = require('./broadcast/bulkNotificationHandler'); // Exports module.exports = { @@ -16,4 +17,5 @@ module.exports = { handleChallenge: ChallengeHandler.handle, handleAutoPilot: AutoPilotHandler.handle, handleSubmission: SubmissionHandler.handle, + handleBulkNotification: BulkNotificationHandler.handle, }; diff --git a/src/services/NotificationService.js b/src/services/NotificationService.js index cb92781..6fa81f8 100644 --- a/src/services/NotificationService.js +++ b/src/services/NotificationService.js @@ -9,6 +9,8 @@ const Joi = require('joi'); const errors = require('../common/errors'); const logger = require('../common/logger'); const models = require('../models'); +const config = require('config'); +const hooks = require('../hooks'); const DEFAULT_LIMIT = 10; @@ -202,11 +204,19 @@ function* listNotifications(query, userId) { break; } + if (config.ENABLE_HOOK_BULK_NOTIFICATION) { + try { + yield hooks.hookBulkMessage.checkBulkMessageForUser(userId) + } catch (e) { + logger.error(`Error in calling bulk notification hook: ${e}`) + } + } + if (_.keys(notificationSettings).length > 0) { // only filter out notifications types which were explicitly set to 'no' - so we return notification by default const notifications = _.keys(notificationSettings).filter((notificationType) => - !notificationSettings[notificationType] && - !notificationSettings[notificationType].web && + notificationSettings[notificationType] && + notificationSettings[notificationType].web && notificationSettings[notificationType].web.enabled === 'no' ); filter.where.type = Object.assign(filter.where.type || {}, { $notIn: notifications }); diff --git a/test/checkHooks.js b/test/checkHooks.js new file mode 100644 index 0000000..6371283 --- /dev/null +++ b/test/checkHooks.js @@ -0,0 +1,3 @@ +const bulkhook = require("../src/hooks/hookBulkMessage") + +bulkhook.checkBulkMessageForUser(123) \ No newline at end of file