diff --git a/.circleci/config.yml b/.circleci/config.yml
index f87288b..7bc897d 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -102,7 +102,7 @@ workflows:
context : org-global
filters:
branches:
- only: [dev, 'hotfix/V5-API-Standards', 'v5-upgrade', 'feature/platform-filtering']
+ only: [dev, 'hotfix/V5-API-Standards', 'v5-upgrade', 'feature/bulk-notification']
- "build-prod":
context : org-global
filters:
diff --git a/config/default.js b/config/default.js
index bbcc8c5..904b068 100644
--- a/config/default.js
+++ b/config/default.js
@@ -58,7 +58,7 @@ module.exports = {
{
id: 0, /** challengeid or projectid */
name: '', /** challenge name */
- group: 'Challenge',
+ group: 'challenge',
title: 'Challenge specification is modified.',
},
},
@@ -75,7 +75,7 @@ module.exports = {
{
id: 0, /** challengeid or projectid */
name: '', /** challenge name */
- group: 'Challenge',
+ group: 'challenge',
title: 'Challenge checkpoint review.',
},
},
@@ -92,12 +92,16 @@ module.exports = {
{
id: 0, /** challengeid or projectid */
name: '', /** challenge name */
- group: 'Submission',
+ group: 'submission',
title: 'A new submission is uploaded.',
},
},
},
],
+ 'admin.notification.broadcast' : [{
+ handleBulkNotification: {}
+ }
+ ]
//'notifications.community.challenge.created': ['handleChallengeCreated'],
//'notifications.community.challenge.phasewarning': ['handleChallengePhaseWarning'],
},
@@ -108,4 +112,5 @@ module.exports = {
ENABLE_DEV_MODE: process.env.ENABLE_DEV_MODE ? Boolean(process.env.ENABLE_DEV_MODE) : true,
DEV_MODE_EMAIL: process.env.DEV_MODE_EMAIL,
DEFAULT_REPLY_EMAIL: process.env.DEFAULT_REPLY_EMAIL,
+ ENABLE_HOOK_BULK_NOTIFICATION : process.env.ENABLE_HOOK_BULK_NOTIFICATION || false,
};
diff --git a/emails/src/partials/project-team.html b/emails/src/partials/project-team.html
index 5c3671d..e8802ae 100644
--- a/emails/src/partials/project-team.html
+++ b/emails/src/partials/project-team.html
@@ -58,11 +58,7 @@
{{userFullName}} joined the project
{{/if}}
{{#if [connect.notification.project.member.invite.created]}}
- {{#if [isSSO]}}
- Hi {{userFullName}}, you are invited to join the project {{projectName}}. Please use the link below to sign in and join the project.
- {{else}}
- Hi {{userFullName}}, you are invited to join the project {{projectName}}. Please click on the button ("View project on Connect") below to join.
- {{/if}}
+ Hi {{userFullName}}, you are invited to join the project {{projectName}}. Please click on the button ("Join Project") below to join.
{{/if}}
{{#if [connect.notification.project.member.invite.requested]}}
You are requested to add {{userFullName}} as a copilot
diff --git a/src/common/broadcastAPIHelper.js b/src/common/broadcastAPIHelper.js
new file mode 100644
index 0000000..b196cf4
--- /dev/null
+++ b/src/common/broadcastAPIHelper.js
@@ -0,0 +1,217 @@
+/**
+ * Broadcast: API calling helper
+ */
+
+const _ = require('lodash')
+const config = require('config')
+const request = require('superagent')
+const logger = require('./logger')
+const m2mAuth = require('tc-core-library-js').auth.m2m;
+const m2m = m2mAuth(config);
+
+const logPrefix = "BroadcastAPI: "
+
+/**
+ * Helper Function - get m2m token
+ */
+async function getM2MToken() {
+ return m2m.getMachineToken(config.AUTH0_CLIENT_ID, config.AUTH0_CLIENT_SECRET)
+}
+
+/**
+ * Helper Function - get member profile
+ * @param {Integer} userId
+ */
+async function getMemberInfo(userId) {
+ const url = config.TC_API_V3_BASE_URL +
+ "/members/_search/?" +
+ `query=userId%3A${userId}` +
+ `&limit=1`
+ return new Promise(async function (resolve, reject) {
+ let memberInfo = []
+ logger.info(`calling member api ${url} `)
+ try {
+ const res = await request.get(url)
+ if (!_.get(res, 'body.result.success')) {
+ reject(new Error(`BCA Memeber API: Failed to get member detail for user id ${userId}`))
+ }
+ memberInfo = _.get(res, 'body.result.content')
+ logger.info(`BCA Memeber API: Feteched ${memberInfo.length} record(s) from member api`)
+ resolve(memberInfo)
+ } catch (err) {
+ reject(new Error(`BCA Memeber API: Failed to get member ` +
+ `api detail for user id ${userId}, ${err}`))
+ }
+
+ })
+}
+
+/**
+ * Helper Function - get user group
+ * @param {Integer} userId
+ */
+async function getUserGroup(userId) {
+ try {
+ const machineToken = await getM2MToken()
+ if (machineToken.length <= 0) {
+ return (new Error(`BCA Group API: fecthing m2m token failed for ${userId}`))
+ }
+ let nextPage
+ let res
+ let url
+ let page = 1
+ let groupInfo = []
+ const perPage = 100
+ do {
+ url = config.TC_API_V5_BASE_URL +
+ `/groups/?memberId=${userId}&membershipType=user` +
+ `&page=${page}&perPage=${perPage}`
+ res = await callApi(url, machineToken)
+ let resStatus = _.get(res, 'res.statusCode')
+ if (resStatus != 200) {
+ throw new Error(`BCA Group API: Failed for user id ${userId},` +
+ ` response status ${resStatus}`)
+ }
+ let data = _.get(res, 'body')
+ groupInfo = groupInfo.concat(data)
+ nextPage = _.get(res, 'header.x-next-page')
+ page = nextPage
+ } while (nextPage)
+ logger.info(`BCA Group API: Feteched ${groupInfo.length} record(s) from group api`)
+ return groupInfo
+ } catch (e) {
+ logger.error(`BCA: Error calling group api : ${e}`)
+ throw new Error(`getUserGroup() : ${e}`)
+ }
+}
+
+/**
+ *
+ * @param {String} url
+ * @param {String} machineToken
+ */
+async function callApi(url, machineToken) {
+ try {
+ logger.info(`calling api url ${url}`)
+ return request.get(url).set('Authorization', `Bearer ${machineToken}`)
+ } catch (e) {
+ logger.error(`Error in calling URL ${url}, ${e}`)
+ throw new Error(`callApi() : ${e}`)
+ }
+}
+
+/**
+ * Helper function - check Skills and Tracks condition
+ */
+async function checkUserSkillsAndTracks(userId, bulkMessage) {
+ try {
+ const skills = _.get(bulkMessage, 'recipients.skills')
+ const tracks = _.get(bulkMessage, 'recipients.tracks')
+ const m = await getMemberInfo(userId)
+ let skillMatch, trackMatch = false // default
+ if (skills && skills.length > 0) {
+ const ms = _.get(m[0], "skills") // get member skills
+ const memberSkills = []
+ skillMatch = false
+ _.map(ms, (o) => {
+ memberSkills.push(_.get(o, 'name').toLowerCase())
+ })
+ _.map(skills, (s) => {
+ if (_.indexOf(memberSkills, s.toLowerCase()) >= 0) {
+ skillMatch = true
+ logger.info(`BroadcastMessageId: ${bulkMessage.id},` +
+ ` '${s}' skill matached for user id ${userId}`)
+ }
+ })
+ } else {
+ skillMatch = true // no condition, means allow for all
+ }
+
+ //
+ if (tracks.length > 0) {
+ trackMatch = false
+ const uDevChallenges = _.get(m[0], "stats[0].DEVELOP.challenges")
+ const uDesignChallenges = _.get(m[0], "stats[0].DESIGN.challenges")
+ const uDSChallenges = _.get(m[0], "stats[0].DATA_SCIENCE.challenges")
+ _.map(tracks, (t) => {
+ /**
+ * checking if user participated in specific challenges
+ */
+ let key = t.toLowerCase()
+ if (key === "develop") {
+ trackMatch = uDevChallenges > 0 ? true : trackMatch
+ } else if (key === "design") {
+ trackMatch = uDesignChallenges > 0 ? true : trackMatch
+ } else if (key === "data_science") {
+ trackMatch = uDSChallenges > 0 ? true : trackMatch
+ }
+ })
+ } else {
+ trackMatch = true // no condition, means allow for all
+ }
+ const flag = (skillMatch && trackMatch) ? true : false
+ return flag
+ } catch (e) {
+ throw new Error(`checkUserSkillsAndTracks() : ${e}`)
+ }
+}
+
+/**
+ * Helper function - check group condition
+ */
+async function checkUserGroup(userId, bulkMessage) {
+ try {
+ const groups = _.get(bulkMessage, 'recipients.groups')
+ let flag = false // default
+ const userGroupInfo = await getUserGroup(userId)
+ if (groups.length > 0) {
+ _.map(userGroupInfo, (o) => {
+ // particular group only condition
+ flag = (_.indexOf(groups, _.get(o, "name")) >= 0) ? true : flag
+ })
+ } else { // no group condition means its for `public` no private group
+ flag = true // default allow for all
+ _.map(userGroupInfo, (o) => {
+ // not allow if user is part of any private group
+ flag = (_.get(o, "privateGroup")) ? false : flag
+ })
+ logger.info(`public group condition for userId ${userId}` +
+ ` and BC messageId ${bulkMessage.id}, the result is: ${flag}`)
+ }
+ return flag
+ } catch (e) {
+ throw new Error(`checkUserGroup(): ${e}`)
+ }
+}
+
+/**
+ * Main Function - check if broadcast message is for current user or not
+ *
+ * @param {Integer} userId
+ * @param {Object} bulkMessage
+ */
+async function checkBroadcastMessageForUser(userId, bulkMessage) {
+ return new Promise(function (resolve, reject) {
+ Promise.all([
+ checkUserSkillsAndTracks(userId, bulkMessage),
+ checkUserGroup(userId, bulkMessage),
+ ]).then((results) => {
+ let flag = true // TODO need to be sure about default value
+ _.map(results, (r) => {
+ flag = !r ? false : flag // TODO recheck condition
+ })
+ logger.info(`BCA: messageId: ${bulkMessage.id} Final recipient` +
+ ` condition result is: ${flag} for userId ${userId}`)
+ resolve({
+ record: bulkMessage,
+ result: flag
+ })
+ }).catch((err) => {
+ reject(`${logPrefix} got issue in checking recipient condition. ${err}`)
+ })
+ }) // promise end
+}
+
+module.exports = {
+ checkBroadcastMessageForUser,
+}
\ No newline at end of file
diff --git a/src/hooks/hookBulkMessage.js b/src/hooks/hookBulkMessage.js
new file mode 100644
index 0000000..cacbe44
--- /dev/null
+++ b/src/hooks/hookBulkMessage.js
@@ -0,0 +1,158 @@
+/**
+ * Hook to insert broadcast notification into database for a user.
+ */
+
+'use strict'
+
+const _ = require('lodash')
+const logger = require('../common/logger')
+const models = require('../models')
+const api = require('../common/broadcastAPIHelper')
+
+const logPrefix = "BulkNotificationHook: "
+
+/**
+ * CREATE NEW TABLES IF NOT EXISTS
+ */
+models.BulkMessages.sync().then((t) => {
+ models.BulkMessageUserRefs.sync()
+})
+
+/**
+ * Main function
+ * @param {Integer} userId
+ */
+async function checkBulkMessageForUser(userId) {
+ return new Promise(function (resolve, reject) {
+ models.BulkMessages.count().then(function (tBulkMessages) {
+ if (tBulkMessages > 0) {
+ // the condition can help to optimize the execution
+ models.BulkMessageUserRefs.count({
+ where: {
+ user_id: userId
+ }
+ }).then(async function (tUserRefs) {
+ let result = true
+ if (tUserRefs < tBulkMessages) {
+ logger.info(`${logPrefix} Need to sync broadcast message for current user ${userId}`)
+ result = await syncBulkMessageForUser(userId)
+ }
+ resolve(result) // resolve here
+ }).catch((e) => {
+ reject(`${logPrefix} Failed to check total userRefs condition. Error: ${e}`)
+ })
+ } else {
+ resolve(true)
+ }
+ }).catch((e) => {
+ logger.error(`${logPrefix} Failed to check total broadcast message condition. Error: `, e)
+ reject(e)
+ })
+ })
+}
+
+/**
+ * Helper function
+ * @param {Integer} userId
+ */
+async function syncBulkMessageForUser(userId) {
+
+ return new Promise(function (resolve, reject) {
+ /**
+ * Check if all bulk mesaages processed for current user or not
+ */
+ let q = "SELECT a.* FROM bulk_messages AS a " +
+ " LEFT OUTER JOIN (SELECT id as refid, bulk_message_id " +
+ " FROM bulk_message_user_refs AS bmur WHERE bmur.user_id=$1)" +
+ " AS b ON a.id=b.bulk_message_id WHERE b.refid IS NULL"
+ models.sequelize.query(q, { bind: [userId] })
+ .then(function (res) {
+ Promise.all(res[0].map((r) => isBroadCastMessageForUser(userId, r)))
+ .then((results) => {
+ Promise.all(results.map((o) => {
+ if (o.result) {
+ return createNotificationForUser(userId, o.record)
+ } else {
+ return insertUserRefs(userId, o.record.id, null)
+ }
+ })).then((results) => {
+ resolve(results)
+ }).catch((e) => {
+ reject(e)
+ })
+ }).catch((e) => {
+ reject(e)
+ })
+ }).catch((e) => {
+ reject(`${logPrefix} Failed to check bulk message condition: error - ${e}`)
+ })
+ })
+}
+
+/**
+ * Helper function
+ * Check if current user in broadcast recipent group
+ * @param {Integer} userId
+ * @param {Object} bulkMessage
+ */
+async function isBroadCastMessageForUser(userId, bulkMessage) {
+ return api.checkBroadcastMessageForUser(userId, bulkMessage)
+}
+
+/**
+ * Helper function - Insert in bulkMessage user reference table
+ *
+ * @param {Integer} userId
+ * @param {Integer} bulkMessageId
+ * @param {Integer} notificationId
+ */
+async function insertUserRefs(userId, bulkMessageId, notificationId) {
+ try {
+ const r = await models.BulkMessageUserRefs.create({
+ bulk_message_id: bulkMessageId,
+ user_id: userId,
+ notification_id: notificationId,
+ })
+ logger.info(`${logPrefix} Inserted userRef record for bulk message id ${r.id} for current user ${userId}`)
+ return r
+ } catch (e) {
+ logger.error(`${logPrefix} Failed to insert userRef record for user: ${userId}, error: ${e}`)
+ throw new Error(`insertUserRefs() : ${e}`)
+ }
+}
+
+/**
+ * Helper function
+ * @param {Integer} userId
+ * @param {Object} bulkMessage
+ */
+async function createNotificationForUser(userId, bulkMessage) {
+ try {
+ const n = await models.Notification.create({
+ userId: userId,
+ type: bulkMessage.type,
+ contents: {
+ id: bulkMessage.id, /** broadcast message id */
+ message: bulkMessage.message, /** broadcast message */
+ group: 'broadcast',
+ title: 'Broadcast Message',
+ },
+ read: false,
+ seen: false,
+ version: null,
+ })
+ logger.info(`${logPrefix} Inserted notification record ${n.id} for current user ${userId}`)
+ // TODO need to be in transaction so that rollback will be possible
+ const result = await insertUserRefs(userId, bulkMessage.id, n.id)
+ return result
+ } catch (e) {
+ logger.error(`${logPrefix} insert broadcast notification error: ${e} `)
+ throw new Error(`createNotificationForUser() : ${e}`)
+ }
+}
+
+
+// Exports
+module.exports = {
+ checkBulkMessageForUser,
+};
\ No newline at end of file
diff --git a/src/hooks/index.js b/src/hooks/index.js
new file mode 100644
index 0000000..5dcc1a6
--- /dev/null
+++ b/src/hooks/index.js
@@ -0,0 +1,17 @@
+/**
+ * Copyright (C) 2020 TopCoder Inc., All Rights Reserved.
+ */
+
+/**
+ * Hook implementation
+ *
+ * @author TCSCODER
+ * @version 1.0
+ */
+
+const hookBulkMessage = require("./hookBulkMessage")
+
+
+module.exports = {
+ hookBulkMessage,
+};
diff --git a/src/models/BulkMessageUserRefs.js b/src/models/BulkMessageUserRefs.js
new file mode 100644
index 0000000..7f3baf3
--- /dev/null
+++ b/src/models/BulkMessageUserRefs.js
@@ -0,0 +1,34 @@
+/**
+ * Copyright (C) 2020 TopCoder Inc., All Rights Reserved.
+ */
+
+/**
+ * The Bulk Message User Reference schema
+ *
+ * @author TCSCODER
+ * @version 1.0
+ */
+
+
+module.exports = (sequelize, DataTypes) => sequelize.define('bulk_message_user_refs', {
+ id: { type: DataTypes.BIGINT, primaryKey: true, autoIncrement: true },
+ bulk_message_id: {
+ type: DataTypes.BIGINT,
+ allowNull: false,
+ references: {
+ model: 'bulk_messages',
+ key: 'id'
+ }
+ },
+ notification_id: {
+ type: DataTypes.BIGINT,
+ allowNull: true,
+ references: {
+ model: 'Notifications',
+ key: 'id'
+ }
+ },
+ user_id: { type: DataTypes.BIGINT, allowNull: false }
+}, {});
+
+ // sequelize will generate and manage createdAt, updatedAt fields
diff --git a/src/models/BulkMessages.js b/src/models/BulkMessages.js
new file mode 100644
index 0000000..2bdbc6b
--- /dev/null
+++ b/src/models/BulkMessages.js
@@ -0,0 +1,20 @@
+/**
+ * Copyright (C) 2020 TopCoder Inc., All Rights Reserved.
+ */
+
+/**
+ * The Bulk Message Store schema
+ *
+ * @author TCSCODER
+ * @version 1.0
+ */
+
+
+module.exports = (sequelize, DataTypes) => sequelize.define('bulk_messages', {
+ id: { type: DataTypes.BIGINT, primaryKey: true, autoIncrement: true },
+ type: { type: DataTypes.STRING, allowNull: false },
+ message: { type: DataTypes.TEXT, allowNull: false },
+ recipients: { type: DataTypes.JSONB, allowNull: false },
+}, {});
+
+ // sequelize will generate and manage createdAt, updatedAt fields
diff --git a/src/models/index.js b/src/models/index.js
index e6ef09e..d18f68a 100644
--- a/src/models/index.js
+++ b/src/models/index.js
@@ -16,11 +16,17 @@ const Notification = require('./Notification')(sequelize, DataTypes);
const NotificationSetting = require('./NotificationSetting')(sequelize, DataTypes);
const ServiceSettings = require('./ServiceSettings')(sequelize, DataTypes);
const ScheduledEvents = require('./ScheduledEvents')(sequelize, DataTypes);
+const BulkMessages = require('./BulkMessages')(sequelize, DataTypes);
+const BulkMessageUserRefs = require('./BulkMessageUserRefs')(sequelize, DataTypes);
+
module.exports = {
Notification,
NotificationSetting,
ServiceSettings,
ScheduledEvents,
+ BulkMessages,
+ BulkMessageUserRefs,
+ sequelize,
init: () => sequelize.sync(),
};
diff --git a/src/processors/broadcast/bulkNotificationHandler.js b/src/processors/broadcast/bulkNotificationHandler.js
new file mode 100644
index 0000000..93d1e04
--- /dev/null
+++ b/src/processors/broadcast/bulkNotificationHandler.js
@@ -0,0 +1,52 @@
+/**
+ * Bulk notification handler.
+ */
+const joi = require('joi')
+const co = require('co')
+const models = require('../../models')
+const logger = require('../../common/logger')
+
+/**
+ * Handle Kafka JSON message of broadcast.
+ *
+ * @param {Object} message the Kafka JSON message
+ * @param {Object} ruleSets
+ *
+ * @return {Promise} promise resolved to notifications
+ */
+const handle = (message, ruleSets) => co(function* () {
+ try {
+ const bm = yield models.BulkMessages.create({
+ type: message.topic,
+ message: message.payload.message,
+ recipients: message.payload.recipients,
+ })
+ logger.info("Broadcast message recieved and inserted in db with id:", bm.id)
+ } catch (e) {
+ logger.error(`Broadcast processor failed in db operation. Error: ${e}`)
+ }
+ return [] // this point of time, send empty notification object
+});
+
+/**
+ * validate kafka payload
+ */
+handle.schema = {
+ message: joi.object().keys({
+ topic: joi.string().required(),
+ originator: joi.string().required(),
+ timestamp: joi.date().required(),
+ 'mime-type': joi.string().required(),
+ payload: joi.object().keys({
+ message: joi.string().required(),
+ recipients: joi.object().required(),
+ }),
+ }).required(),
+ ruleSets: joi.object(),
+};
+
+module.exports = {
+ handle,
+};
+
+logger.buildService(module.exports);
\ No newline at end of file
diff --git a/src/processors/index.js b/src/processors/index.js
index 70d8c6a..a0243be 100644
--- a/src/processors/index.js
+++ b/src/processors/index.js
@@ -8,6 +8,7 @@ const ChallengePhaseWarningHandler = require('./challenge/ChallengePhaseWarningH
const ChallengeHandler = require('./challenge/ChallengeHandler');
const AutoPilotHandler = require('./challenge/AutoPilotHandler');
const SubmissionHandler = require('./challenge/SubmissionHandler');
+const BulkNotificationHandler = require('./broadcast/bulkNotificationHandler');
// Exports
module.exports = {
@@ -16,4 +17,5 @@ module.exports = {
handleChallenge: ChallengeHandler.handle,
handleAutoPilot: AutoPilotHandler.handle,
handleSubmission: SubmissionHandler.handle,
+ handleBulkNotification: BulkNotificationHandler.handle,
};
diff --git a/src/services/NotificationService.js b/src/services/NotificationService.js
index cb92781..6fa81f8 100644
--- a/src/services/NotificationService.js
+++ b/src/services/NotificationService.js
@@ -9,6 +9,8 @@ const Joi = require('joi');
const errors = require('../common/errors');
const logger = require('../common/logger');
const models = require('../models');
+const config = require('config');
+const hooks = require('../hooks');
const DEFAULT_LIMIT = 10;
@@ -202,11 +204,19 @@ function* listNotifications(query, userId) {
break;
}
+ if (config.ENABLE_HOOK_BULK_NOTIFICATION) {
+ try {
+ yield hooks.hookBulkMessage.checkBulkMessageForUser(userId)
+ } catch (e) {
+ logger.error(`Error in calling bulk notification hook: ${e}`)
+ }
+ }
+
if (_.keys(notificationSettings).length > 0) {
// only filter out notifications types which were explicitly set to 'no' - so we return notification by default
const notifications = _.keys(notificationSettings).filter((notificationType) =>
- !notificationSettings[notificationType] &&
- !notificationSettings[notificationType].web &&
+ notificationSettings[notificationType] &&
+ notificationSettings[notificationType].web &&
notificationSettings[notificationType].web.enabled === 'no'
);
filter.where.type = Object.assign(filter.where.type || {}, { $notIn: notifications });
diff --git a/test/checkHooks.js b/test/checkHooks.js
new file mode 100644
index 0000000..6371283
--- /dev/null
+++ b/test/checkHooks.js
@@ -0,0 +1,3 @@
+const bulkhook = require("../src/hooks/hookBulkMessage")
+
+bulkhook.checkBulkMessageForUser(123)
\ No newline at end of file