diff --git a/.circleci/config.yml b/.circleci/config.yml
index 2f6e7fe..8b2a88c 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -57,7 +57,12 @@ jobs:
if [ -e dev-tc-notifications-deployvar.json ]; then sudo rm -vf dev-tc-notifications-deployvar.json; fi
./buildenv.sh -e DEV -b dev-tc-notifications-consumers-deployvar
source buildenvvar
- ./master_deploy.sh -d ECS -e DEV -t latest -s dev-global-appvar,dev-tc-notifications-appvar -i tc-notifications
+ ./master_deploy.sh -d ECS -e DEV -t latest -s dev-global-appvar,dev-tc-notifications-appvar -i tc-notifications
+ echo "Running Masterscript - deploy tc-notifications-general-processor service"
+ if [ -e dev-tc-notifications-consumers-deployvar.json ]; then sudo rm -vf dev-tc-notifications-consumers-deployvar.json; fi
+ ./buildenv.sh -e DEV -b dev-tc-notifications-general-processor-deployvar
+ source buildenvvar
+ ./master_deploy.sh -d ECS -e DEV -t latest -s dev-global-appvar,dev-tc-notifications-appvar -i tc-notifications
"build-prod":
<<: *defaults
@@ -81,7 +86,12 @@ jobs:
if [ -e prod-tc-notifications-deployvar.json ]; then sudo rm -vf prod-tc-notifications-deployvar.json; fi
./buildenv.sh -e PROD -b prod-tc-notifications-consumers-deployvar
source buildenvvar
- ./master_deploy.sh -d ECS -e PROD -t latest -s prod-global-appvar,prod-tc-notifications-appvar -i tc-notifications
+ ./master_deploy.sh -d ECS -e PROD -t latest -s prod-global-appvar,prod-tc-notifications-appvar -i tc-notifications
+ echo "Running Masterscript - prod deploy tc-notifications-general-processor service"
+ if [ -e prod-tc-notifications-consumers-deployvar.json ]; then sudo rm -vf prod-tc-notifications-consumers-deployvar.json; fi
+ ./buildenv.sh -e PROD -b prod-tc-notifications-general-processor-deployvar
+ source buildenvvar
+ ./master_deploy.sh -d ECS -e PROD -t latest -s prod-global-appvar,prod-tc-notifications-appvar -i tc-notifications
workflows:
version: 2
@@ -92,7 +102,7 @@ workflows:
context : org-global
filters:
branches:
- only: ['dev']
+ only: [dev, 'feature/general-purpose-notifications-usage']
- "build-prod":
context : org-global
filters:
diff --git a/Consumer-Verification.md b/Consumer-Verification.md
new file mode 100644
index 0000000..232fa6e
--- /dev/null
+++ b/Consumer-Verification.md
@@ -0,0 +1,221 @@
+# TOPCODER NOTIFICATIONS - CONSUMER VERIFICATION
+
+## Local Kafka setup
+
+- `http://kafka.apache.org/quickstart` contains details to set up and manage a Kafka server;
+ the steps below set up a Kafka server on Mac; on Windows, use the corresponding bat commands in bin/windows instead
+- download kafka at `https://www.apache.org/dyn/closer.cgi?path=/kafka/1.1.0/kafka_2.11-1.1.0.tgz`
+- extract the downloaded tgz file
+- go to the extracted directory kafka_2.11-1.1.0
+- start the ZooKeeper server:
+ `bin/zookeeper-server-start.sh config/zookeeper.properties`
+- use another terminal, go to the same directory, and start the Kafka server:
+ `bin/kafka-server-start.sh config/server.properties`
+- note that the ZooKeeper server is at localhost:2181, and the Kafka server is at localhost:9092
+- use another terminal, go to the same directory, and create the topics:
+```
+bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic notifications.community.challenge.created
+bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic notifications.community.challenge.phasewarning
+```
+
+- verify that the topics are created:
+```
+bin/kafka-topics.sh --list --zookeeper localhost:2181
+```
+ it should 
list out the created topics
+
+- run the producer and then write some messages into the console to send to the `notifications.community.challenge.created` topic:
+```
+bin/kafka-console-producer.sh --broker-list localhost:9092 --topic notifications.community.challenge.created
+```
+- In the console, write some messages, one message per line:
+E.g.
+```
+{ "topic": "notifications.community.challenge.created", "originator": "tc-direct", "timestamp": "2018-02-16T00:00:00", "mime-type": "application/json", "payload": { "challengeId": 30054674, "challengeTitle": "test", "challengeUrl": "http://www.topcoder.com/123", "userId": 8547899, "initiatorUserId": 123, "skills": ["dotnet", "xcode"] } }
+```
+
+- optionally, use another terminal, go to the same directory, and start a consumer to view the messages:
+```
+bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic notifications.community.challenge.created --from-beginning
+```
+
+
+## Local deployment
+
+- start local Kafka, start local PostgreSQL db, create an empty database `notification`
+- set the config params below via environment variables (use `set` instead of `export` on Windows);
+ alternatively, you may set them via `config/default.js`; modify the DATABASE_URL according to your db setup:
+```
+export LOG_LEVEL=debug
+export DATABASE_URL=postgres://postgres:123456@localhost:5432/notification
+export KAFKA_URL=localhost:9092
+export KAFKA_GROUP_ID=tc-notifications
+export ENV=test
+export DEV_MODE_EMAIL=testing@topcoder.com
+export DEFAULT_REPLY_EMAIL=no-reply@topcoder.com
+```
+
+- optionally, override the TC API base URLs to point to mock APIs; this is not needed if mock APIs are not used:
+```
+export TC_API_V3_BASE_URL=http://localhost:4000/v3
+export TC_API_V4_BASE_URL=http://localhost:4000/v4
+export TC_API_V5_BASE_URL=http://localhost:4000/v5
+```
+
+- set M2M config params:
+```
+export AUTH0_CLIENT_ID=dummy
+export AUTH0_CLIENT_SECRET=dummy
+export AUTH0_URL=dummy
+export AUTH0_AUDIENCE=dummy
+```
+
+- install dependencies `npm i`
+- run code lint check `npm run lint`
+- fix some lint errors `npm run lint:fix`
+- create db tables if not present: `node test/init-db` (needed only for local testing; in production the tables are already present)
+- start notification consumer `npm run startConsumer`
+
+
+## Verification
+
+- Run Kafka console producer to write message to topic `notifications.community.challenge.created`:
+
+```
+bin/kafka-console-producer.sh --broker-list localhost:9092 --topic notifications.community.challenge.created
+```
+
+- Write message of challenge created:
+
+```
+{ "topic": "notifications.community.challenge.created", "originator": "tc-direct", "timestamp": "2018-02-16T00:00:00", "mime-type": "application/json", "payload": { "challengeId": 30054674, "challengeTitle": "test", "challengeUrl": "http://www.topcoder.com/123", "userId": 8547899, "initiatorUserId": 123, "skills": ["dotnet", "xcode"] } }
+```
+
+- You will see logging in the app console:
+
+```
+info: Run handler handleChallengeCreated
+...
+verbose: Searched users: ...
+...
+info: Successfully sent notifications.action.email.connect.project.notifications.generic event with body ... to bus api
+...
+error: Failed to send email to user id: 5, handle: handle5
+... 
+info: Saved 8 notifications for users: 1, 2, 3, 4, 5, 6, 7, 8 +info: Handler handleChallengeCreated was run successfully +``` + + +- Run Kafka console producer to write message to topic `notifications.community.challenge.phasewarning`: + +``` +bin/kafka-console-producer.sh --broker-list localhost:9092 --topic notifications.community.challenge.phasewarning +``` + +- Write message of challenge phase warning: + +``` +{ "topic": "notifications.community.challenge.phasewarning", "originator": "tc-autopilot", "timestamp": "2018-02-16T00:00:00", "mime-type": "application/json", "payload": { "challengeId": 30054674, "challengeTitle": "test", "challengeUrl": "http://www.topcoder.com/123", "phase": "Submission", "remainingTime": 12345, "userId": 8547899, "initiatorUserId": 123 } } +``` + +- You will see logging in the app console: + +``` +info: Run handler handleChallengePhaseWarning +... +verbose: Searched users: ... +... +info: Successfully sent notifications.action.email.connect.project.notifications.generic event with body ... to bus api +... +error: Failed to send email to user id: 5, handle: handle5 +... +info: Saved 8 notifications for users: 1, 2, 3, 4, 5, 6, 7, 8 +info: Handler handleChallengePhaseWarning was run successfully +``` + + +- Write message of challenge retrieved with error: + +``` +{ "topic": "notifications.community.challenge.phasewarning", "originator": "tc-autopilot", "timestamp": "2018-02-16T00:00:00", "mime-type": "application/json", "payload": { "challengeId": 1111, "challengeTitle": "test", "challengeUrl": "http://www.topcoder.com/123", "phase": "Submission", "remainingTime": 12345, "userId": 8547899, "initiatorUserId": 123 } } +``` + +- You will see logging in the app console: + +``` +info: Run handler handleChallengePhaseWarning +... +error: Handler handleChallengePhaseWarning failed +... +error: { Error: Internal Server Error ... +``` + + +- Write message of challenge which is not found: + +``` +{ "topic": "notifications.community.challenge.phasewarning", "originator": "tc-autopilot", "timestamp": "2018-02-16T00:00:00", "mime-type": "application/json", "payload": { "challengeId": 2222, "challengeTitle": "test", "challengeUrl": "http://www.topcoder.com/123", "phase": "Submission", "remainingTime": 12345, "userId": 8547899, "initiatorUserId": 123 } } +``` + +- You will see logging in the app console: + +``` +info: Run handler handleChallengePhaseWarning +... +error: Handler handleChallengePhaseWarning failed +... +error: { Error: Not Found ... +``` + + +- Write message of challenge of id 3333: + +``` +{ "topic": "notifications.community.challenge.phasewarning", "originator": "tc-autopilot", "timestamp": "2018-02-16T00:00:00", "mime-type": "application/json", "payload": { "challengeId": 3333, "challengeTitle": "test", "challengeUrl": "http://www.topcoder.com/123", "phase": "Submission", "remainingTime": 12345, "userId": 8547899, "initiatorUserId": 123 } } +``` + +- You will see logging in the app console: + +``` +info: Run handler handleChallengePhaseWarning +... +error: Handler handleChallengePhaseWarning failed +... +error: { Error: Internal Server Error ... +... { message: 'there is some error' } ... +... Error: cannot GET /v3/members/_search?query=handle:%22handle1%22%20OR%20handle:%22handle2%22%20OR%20handle:%22handle3%22&offset=0&limit=5&fields=userId,email,handle,firstName,lastName,photoURL,status (500) ... +... 
+``` + + +- You may write some invalid messages like below: + +``` +{ "topic": "notifications.community.challenge.phasewarning", "originator": "tc-autopilot", "timestamp": "invalid", "mime-type": "application/json", "payload": { "challengeId": 30054674, "challengeTitle": "test", "challengeUrl": "http://www.topcoder.com/123", "phase": "Submission", "remainingTime": 12345, "userId": 8547899, "initiatorUserId": 123 } } +``` + +``` +{ "topic": "notifications.community.challenge.phasewarning", "originator": "tc-autopilot", "timestamp": "2018-02-16T00:00:00", "mime-type": "application/json", "payload": { "challengeTitle": "test", "challengeUrl": "http://www.topcoder.com/123", "phase": "Submission", "remainingTime": 12345, "userId": 8547899, "initiatorUserId": 123 } } +``` + +``` +{ [ xyz +``` + +- You will see error logging in the app console. + +- Use some PostgreSQL client to connect to the database, e.g. you may use the PostgreSQL's built-in client psql to connect to the database: `psql -U postgres` + +- connect to database: `\c notification` + +- select notifications: `select * from "Notifications";` + +- you will see notification records: + +``` + 1 | 23154497 | notifications.community.challenge.created | {"skills": ["dotnet", "xcode"], "userId": 8547899, "challengeId": 30054522, "challengeUrl": "http://www.topcoder.com/123", "challengeTitle": "test", "initiatorUserId": 123} | f | f | | 2019-04-01 19:49:08.232+08 | 2019-04-01 19:49:08.232+08 + 2 | 294446 | notifications.community.challenge.created | {"skills": ["dotnet", "xcode"], "userId": 8547899, "challengeId": 30054522, "challengeUrl": "http://www.topcoder.com/123", "challengeTitle": "test", "initiatorUserId": 123} | f | f | | 2019-04-01 19:49:08.232+08 | 2019-04-01 19:49:08.232+08 + ... +``` + diff --git a/README.md b/README.md index 0ebacbc..808a611 100644 --- a/README.md +++ b/README.md @@ -20,7 +20,7 @@ tc-notifications (as a standard nodejs app) provides generic framework around no 5. Either add deployment for this new notification consumer/processor in existing deployment script (if you want to host the processor as separate service in the same ECS cluster) or write a new script if you want to keep the deployment separate. ## Dependencies -- nodejs https://nodejs.org/en/ (v6+) +- nodejs https://nodejs.org/en/ (v6+, if newer version of node is used, e.g. 
v10, then it needs to install extra lib `npm i natives@1.1.6` to support the gulp build) - Heroku Toolbelt https://toolbelt.heroku.com - git - PostgreSQL 9.5 @@ -50,6 +50,8 @@ The following parameters can be set in config files or in env variables: if not provided, then SSL connection is not used, direct insecure connection is used; if provided, it can be either path to private key file or private key content - **Topcoder API** + - `TC_API_V3_BASE_URL`: the TopCoder API V3 base URL + - `TC_API_V4_BASE_URL`: the TopCoder API V4 base URL - `TC_API_V5_BASE_URL`: the TopCoder API V5 base URL - **Notifications API** - `API_CONTEXT_PATH`: path to serve API on @@ -59,6 +61,16 @@ The following parameters can be set in config files or in env variables: - `TOKEN_CACHE_TIME`: time period of the cached token - `AUTH0_CLIENT_ID`: auth0 client id - `AUTH0_CLIENT_SECRET`: auth0 client secret + - `AUTH0_PROXY_SERVER_URL`: auth0 proxy server URL +- **Consumer handlers** + - `KAFKA_CONSUMER_HANDLERS`: mapping from consumer topic to handlers +- **Email notification** + - `ENV`: used to construct email category + - `ENABLE_EMAILS`: whether to enable email notifications + - `ENABLE_DEV_MODE`: whether to enable dev mode + - `DEV_MODE_EMAIL`: recipient email used in dev mode + - `DEFAULT_REPLY_EMAIL`: default reply email + ### Connect notification server Configuration for the connect notification server is at `connect/config.js`. diff --git a/config/default.js b/config/default.js index 1a1b895..dbbc549 100644 --- a/config/default.js +++ b/config/default.js @@ -28,7 +28,9 @@ module.exports = { KAFKA_CLIENT_CERT_KEY: process.env.KAFKA_CLIENT_CERT_KEY ? process.env.KAFKA_CLIENT_CERT_KEY.replace('\\n', '\n') : null, - TC_API_V5_BASE_URL: process.env.TC_API_V5_BASE_URL || 'https://api.topcoder-dev.com/v5', + TC_API_V3_BASE_URL: process.env.TC_API_V3_BASE_URL || '', + TC_API_V4_BASE_URL: process.env.TC_API_V4_BASE_URL || '', + TC_API_V5_BASE_URL: process.env.TC_API_V5_BASE_URL || '', API_CONTEXT_PATH: process.env.API_CONTEXT_PATH || '/v5/notifications', // Configuration for generating machine to machine auth0 token. @@ -40,5 +42,19 @@ module.exports = { TOKEN_CACHE_TIME: process.env.TOKEN_CACHE_TIME || 86400000, AUTH0_CLIENT_ID: process.env.AUTH0_CLIENT_ID, AUTH0_CLIENT_SECRET: process.env.AUTH0_CLIENT_SECRET, - AUTH0_PROXY_SERVER_URL: process.env.AUTH0_PROXY_SERVER_URL || '', + AUTH0_PROXY_SERVER_URL: process.env.AUTH0_PROXY_SERVER_URL, + + KAFKA_CONSUMER_RULESETS: { + // key is Kafka topic name, value is array of ruleset which have key as handler function name defined in src/processors/index.js + 'challenge.notification.events' : [{handleChallenge : {type:'UPDATE_DRAFT_CHALLENGE', roles: ["Submitter" /** Competitor */, "Copilot", "Reviewer"]}}], + //'notifications.community.challenge.created': ['handleChallengeCreated'], + //'notifications.community.challenge.phasewarning': ['handleChallengePhaseWarning'], + }, + + // email notification service related variables + ENV: process.env.ENV, + ENABLE_EMAILS: process.env.ENABLE_EMAILS ? Boolean(process.env.ENABLE_EMAILS) : false, + ENABLE_DEV_MODE: process.env.ENABLE_DEV_MODE ? 
Boolean(process.env.ENABLE_DEV_MODE) : true, + DEV_MODE_EMAIL: process.env.DEV_MODE_EMAIL, + DEFAULT_REPLY_EMAIL: process.env.DEFAULT_REPLY_EMAIL, }; diff --git a/constants.js b/constants.js new file mode 100644 index 0000000..317bf3a --- /dev/null +++ b/constants.js @@ -0,0 +1,13 @@ +module.exports = { + // set to a small value in order to test pagination functionalities, set to larger value in production + SEARCH_USERS_PAGE_SIZE: 5, + + SETTINGS_EMAIL_SERVICE_ID: 'email', + ACTIVE_USER_STATUSES: ['ACTIVE'], + + BUS_API_EVENT: { + EMAIL: { + GENERAL: 'notifications.action.email.connect.project.notifications.generic', + }, + }, +}; diff --git a/consumer.js b/consumer.js new file mode 100644 index 0000000..04eb79a --- /dev/null +++ b/consumer.js @@ -0,0 +1,143 @@ +/** + * Kafka consumer + */ +'use strict'; + +const config = require('config'); +const _ = require('lodash'); +const Kafka = require('no-kafka'); +const co = require('co'); +global.Promise = require('bluebird'); +const healthcheck = require('topcoder-healthcheck-dropin') + +const logger = require('./src/common/logger'); +const models = require('./src/models'); +const processors = require('./src/processors'); + + +/** + * Start Kafka consumer + */ +function startKafkaConsumer() { + const options = { groupId: config.KAFKA_GROUP_ID, connectionString: config.KAFKA_URL }; + if (config.KAFKA_CLIENT_CERT && config.KAFKA_CLIENT_CERT_KEY) { + options.ssl = { cert: config.KAFKA_CLIENT_CERT, key: config.KAFKA_CLIENT_CERT_KEY }; + } + const consumer = new Kafka.GroupConsumer(options); + + // data handler + const messageHandler = (messageSet, topic, partition) => Promise.each(messageSet, (m) => { + const message = m.message.value.toString('utf8'); + logger.info(`Handle Kafka event message; Topic: ${topic}; Partition: ${partition}; Offset: ${ + m.offset}; Message: ${message}.`); + + let messageJSON; + try { + messageJSON = JSON.parse(message); + } catch (e) { + logger.error('Invalid message JSON.'); + logger.logFullError(e); + // commit the message and ignore it + consumer.commitOffset({ topic, partition, offset: m.offset }); + return; + } + + if (messageJSON.topic !== topic) { + logger.error(`The message topic ${messageJSON.topic} doesn't match the Kafka topic ${topic}.`); + // commit the message and ignore it + consumer.commitOffset({ topic, partition, offset: m.offset }); + return; + } + + // get rule sets for the topic + const ruleSets = config.KAFKA_CONSUMER_RULESETS[topic]; + + // TODO for NULL handler + if (!ruleSets || ruleSets.length === 0) { + logger.error(`No handler configured for Kafka topic ${topic}.`); + // commit the message and ignore it + consumer.commitOffset({ topic, partition, offset: m.offset }); + return; + } + + return co(function* () { + // run each handler + for (let i = 0; i < ruleSets.length; i += 1) { + const rule = ruleSets[i] + const handlerFuncArr = _.keys(rule) + const handlerFuncName = _.get(handlerFuncArr, "0") + + try { + const handler = processors[handlerFuncName] + const handlerRuleSets = rule[handlerFuncName] + if (!handler) { + logger.error(`Handler ${handlerFuncName} is not defined`); + continue; + } + logger.info(`Run handler ${handlerFuncName}`); + // run handler to get notifications + const notifications = yield handler(messageJSON, handlerRuleSets); + if (notifications && notifications.length > 0) { + // save notifications in bulk to improve performance + logger.info(`Going to insert ${notifications.length} notifications in database.`) + yield models.Notification.bulkCreate(_.map(notifications, (n) 
=> ({ + userId: n.userId, + type: n.type || topic, + contents: n.contents || messageJSON.payload || {}, + read: false, + seen: false, + version: n.version || null, + }))) + // logging + logger.info(`Saved ${notifications.length} notifications`) + /* logger.info(` for users: ${ + _.map(notifications, (n) => n.userId).join(', ') + }`); */ + } + logger.info(`Handler ${handlerFuncName} executed successfully`); + } catch (e) { + // log and ignore error, so that it won't block rest handlers + logger.error(`Handler ${handlerFuncName} failed`); + logger.logFullError(e); + } + } + }) + // commit offset + .then(() => consumer.commitOffset({ topic, partition, offset: m.offset })) + .catch((err) => { + logger.error('Kafka handler failed'); + logger.logFullError(err); + }); + }); + + const check = function () { + if (!consumer.client.initialBrokers && !consumer.client.initialBrokers.length) { + return false + } + let connected = true + consumer.client.initialBrokers.forEach(conn => { + logger.debug(`url ${conn.server()} - connected=${conn.connected}`) + connected = conn.connected & connected + }) + return connected + } + + // Start kafka consumer + logger.info('Starting kafka consumer'); + consumer + .init([{ + // subscribe topics + subscriptions: _.keys(config.KAFKA_CONSUMER_RULESETS), + handler: messageHandler, + }]) + .then(() => { + logger.info('Kafka consumer initialized successfully'); + healthcheck.init([check]) + }) + .catch((err) => { + logger.error('Kafka consumer failed'); + logger.logFullError(err); + }); +} + +startKafkaConsumer(); diff --git a/deploy.sh b/deploy.sh index 05cb923..8645f2a 100755 --- a/deploy.sh +++ b/deploy.sh @@ -31,11 +31,13 @@ AWS_REPOSITORY=$(eval "echo \$${ENV}_AWS_REPOSITORY") AWS_ECS_CLUSTER=$(eval "echo \$${ENV}_AWS_ECS_CLUSTER") AWS_ECS_SERVICE_API=$(eval "echo \$${ENV}_AWS_ECS_SERVICE") AWS_ECS_SERVICE_CONSUMERS=$(eval "echo \$${ENV}_AWS_ECS_SERVICE_CONSUMERS") +AWS_ECS_SERVICE_GEN_CONSUMERS=$(eval "echo \$${ENV}_AWS_ECS_SERVICE_GEN_CONSUMERS") KAFKA_CLIENT_CERT=$(eval "echo \$${ENV}_KAFKA_CLIENT_CERT") KAFKA_CLIENT_CERT_KEY=$(eval "echo \$${ENV}_KAFKA_CLIENT_CERT_KEY") KAFKA_GROUP_ID=$(eval "echo \$${ENV}_KAFKA_GROUP_ID") KAFKA_URL=$(eval "echo \$${ENV}_KAFKA_URL") +KAFKA_CONSUMER_TOPICS=$(eval "echo \$${ENV}_KAFKA_CONSUMER_TOPICS") AUTHSECRET=$(eval "echo \$${ENV}_AUTHSECRET") VALID_ISSUERS=$(eval "echo \$${ENV}_VALID_ISSUERS") TC_API_BASE_URL=$(eval "echo \$${ENV}_TC_API_BASE_URL") @@ -248,6 +250,10 @@ make_task_def(){ { "name": "AUTH0_PROXY_SERVER_URL", "value": "%s" + }, + { + "name": "KAFKA_CONSUMER_TOPICS", + "value": "%s" } ], "portMappings": [ @@ -268,7 +274,7 @@ make_task_def(){ } ]' - task_def=$(printf "$task_template" $1 $AWS_ACCOUNT_ID $AWS_REGION $AWS_REPOSITORY $TAG $2 $3 $4 $ENV "$KAFKA_CLIENT_CERT" "$KAFKA_CLIENT_CERT_KEY" $KAFKA_GROUP_ID $KAFKA_URL $DATABASE_URL $AUTHSECRET $TC_API_BASE_URL $TC_API_V3_BASE_URL $TC_API_V4_BASE_URL $TC_API_V5_BASE_URL $MESSAGE_API_BASE_URL $CONNECT_URL $ENABLE_EMAILS $MENTION_EMAIL $REPLY_EMAIL_PREFIX $REPLY_EMAIL_DOMAIN $REPLY_EMAIL_FROM $DEFAULT_REPLY_EMAIL $ENABLE_DEV_MODE $DEV_MODE_EMAIL $LOG_LEVEL $VALID_ISSUERS $PORT "$API_CONTEXT_PATH" "$AUTH0_URL" "$AUTH0_AUDIENCE" $AUTH0_CLIENT_ID "$AUTH0_CLIENT_SECRET" $TOKEN_CACHE_TIME "$AUTH0_PROXY_SERVER_URL" $AWS_ECS_CLUSTER $AWS_REGION $AWS_ECS_CLUSTER $ENV) + task_def=$(printf "$task_template" $1 $AWS_ACCOUNT_ID $AWS_REGION $AWS_REPOSITORY $TAG $2 $3 $4 $ENV "$KAFKA_CLIENT_CERT" "$KAFKA_CLIENT_CERT_KEY" $KAFKA_GROUP_ID $KAFKA_URL $DATABASE_URL $AUTHSECRET 
$TC_API_BASE_URL $TC_API_V3_BASE_URL $TC_API_V4_BASE_URL $TC_API_V5_BASE_URL $MESSAGE_API_BASE_URL $CONNECT_URL $ENABLE_EMAILS $MENTION_EMAIL $REPLY_EMAIL_PREFIX $REPLY_EMAIL_DOMAIN $REPLY_EMAIL_FROM $DEFAULT_REPLY_EMAIL $ENABLE_DEV_MODE $DEV_MODE_EMAIL $LOG_LEVEL $VALID_ISSUERS $PORT "$API_CONTEXT_PATH" "$AUTH0_URL" "$AUTH0_AUDIENCE" $AUTH0_CLIENT_ID "$AUTH0_CLIENT_SECRET" $TOKEN_CACHE_TIME "$AUTH0_PROXY_SERVER_URL" $KAFKA_CONSUMER_TOPICS $AWS_ECS_CLUSTER $AWS_REGION $AWS_ECS_CLUSTER $ENV) } register_definition() { @@ -307,6 +313,10 @@ deploy_cluster $AWS_ECS_SERVICE_API "npm" "run" "startAPI" deploy_cluster $AWS_ECS_SERVICE_CONSUMERS "npm" "run" "start" +deploy_cluster $AWS_ECS_SERVICE_GEN_CONSUMERS "npm" "run" "startConsumer" + check_service_status $AWS_ECS_SERVICE_API check_service_status $AWS_ECS_SERVICE_CONSUMERS + +check_service_status $AWS_ECS_SERVICE_GEN_CONSUMERS diff --git a/package-lock.json b/package-lock.json index 31df2b6..17a40e0 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1124,6 +1124,7 @@ "requires": { "anymatch": "1.3.2", "async-each": "1.0.1", + "fsevents": "1.2.7", "glob-parent": "2.0.0", "inherits": "2.0.3", "is-binary-path": "1.0.1", @@ -3192,6 +3193,535 @@ "integrity": "sha1-FQStJSMVjKpA20onh8sBQRmU6k8=", "dev": true }, + "fsevents": { + "version": "1.2.7", + "resolved": "https://registry.npmjs.org/fsevents/-/fsevents-1.2.7.tgz", + "integrity": "sha512-Pxm6sI2MeBD7RdD12RYsqaP0nMiwx8eZBXCa6z2L+mRHm2DYrOYwihmhjpkdjUHwQhslWQjRpEgNq4XvBmaAuw==", + "dev": true, + "optional": true, + "requires": { + "nan": "2.10.0", + "node-pre-gyp": "0.10.3" + }, + "dependencies": { + "abbrev": { + "version": "1.1.1", + "bundled": true, + "dev": true, + "optional": true + }, + "ansi-regex": { + "version": "2.1.1", + "bundled": true, + "dev": true + }, + "aproba": { + "version": "1.2.0", + "bundled": true, + "dev": true, + "optional": true + }, + "are-we-there-yet": { + "version": "1.1.5", + "bundled": true, + "dev": true, + "optional": true, + "requires": { + "delegates": "1.0.0", + "readable-stream": "2.3.6" + } + }, + "balanced-match": { + "version": "1.0.0", + "bundled": true, + "dev": true + }, + "brace-expansion": { + "version": "1.1.11", + "bundled": true, + "dev": true, + "requires": { + "balanced-match": "1.0.0", + "concat-map": "0.0.1" + } + }, + "chownr": { + "version": "1.1.1", + "bundled": true, + "dev": true, + "optional": true + }, + "code-point-at": { + "version": "1.1.0", + "bundled": true, + "dev": true + }, + "concat-map": { + "version": "0.0.1", + "bundled": true, + "dev": true + }, + "console-control-strings": { + "version": "1.1.0", + "bundled": true, + "dev": true + }, + "core-util-is": { + "version": "1.0.2", + "bundled": true, + "dev": true, + "optional": true + }, + "debug": { + "version": "2.6.9", + "bundled": true, + "dev": true, + "optional": true, + "requires": { + "ms": "2.0.0" + } + }, + "deep-extend": { + "version": "0.6.0", + "bundled": true, + "dev": true, + "optional": true + }, + "delegates": { + "version": "1.0.0", + "bundled": true, + "dev": true, + "optional": true + }, + "detect-libc": { + "version": "1.0.3", + "bundled": true, + "dev": true, + "optional": true + }, + "fs-minipass": { + "version": "1.2.5", + "bundled": true, + "dev": true, + "optional": true, + "requires": { + "minipass": "2.3.5" + } + }, + "fs.realpath": { + "version": "1.0.0", + "bundled": true, + "dev": true, + "optional": true + }, + "gauge": { + "version": "2.7.4", + "bundled": true, + "dev": true, + "optional": true, + "requires": { + "aproba": "1.2.0", + 
"console-control-strings": "1.1.0", + "has-unicode": "2.0.1", + "object-assign": "4.1.1", + "signal-exit": "3.0.2", + "string-width": "1.0.2", + "strip-ansi": "3.0.1", + "wide-align": "1.1.3" + } + }, + "glob": { + "version": "7.1.3", + "bundled": true, + "dev": true, + "optional": true, + "requires": { + "fs.realpath": "1.0.0", + "inflight": "1.0.6", + "inherits": "2.0.3", + "minimatch": "3.0.4", + "once": "1.4.0", + "path-is-absolute": "1.0.1" + } + }, + "has-unicode": { + "version": "2.0.1", + "bundled": true, + "dev": true, + "optional": true + }, + "iconv-lite": { + "version": "0.4.24", + "bundled": true, + "dev": true, + "optional": true, + "requires": { + "safer-buffer": "2.1.2" + } + }, + "ignore-walk": { + "version": "3.0.1", + "bundled": true, + "dev": true, + "optional": true, + "requires": { + "minimatch": "3.0.4" + } + }, + "inflight": { + "version": "1.0.6", + "bundled": true, + "dev": true, + "optional": true, + "requires": { + "once": "1.4.0", + "wrappy": "1.0.2" + } + }, + "inherits": { + "version": "2.0.3", + "bundled": true, + "dev": true + }, + "ini": { + "version": "1.3.5", + "bundled": true, + "dev": true, + "optional": true + }, + "is-fullwidth-code-point": { + "version": "1.0.0", + "bundled": true, + "dev": true, + "requires": { + "number-is-nan": "1.0.1" + } + }, + "isarray": { + "version": "1.0.0", + "bundled": true, + "dev": true, + "optional": true + }, + "minimatch": { + "version": "3.0.4", + "bundled": true, + "dev": true, + "requires": { + "brace-expansion": "1.1.11" + } + }, + "minimist": { + "version": "0.0.8", + "bundled": true, + "dev": true + }, + "minipass": { + "version": "2.3.5", + "bundled": true, + "dev": true, + "requires": { + "safe-buffer": "5.1.2", + "yallist": "3.0.3" + } + }, + "minizlib": { + "version": "1.2.1", + "bundled": true, + "dev": true, + "optional": true, + "requires": { + "minipass": "2.3.5" + } + }, + "mkdirp": { + "version": "0.5.1", + "bundled": true, + "dev": true, + "requires": { + "minimist": "0.0.8" + } + }, + "ms": { + "version": "2.0.0", + "bundled": true, + "dev": true, + "optional": true + }, + "needle": { + "version": "2.2.4", + "bundled": true, + "dev": true, + "optional": true, + "requires": { + "debug": "2.6.9", + "iconv-lite": "0.4.24", + "sax": "1.2.4" + } + }, + "node-pre-gyp": { + "version": "0.10.3", + "bundled": true, + "dev": true, + "optional": true, + "requires": { + "detect-libc": "1.0.3", + "mkdirp": "0.5.1", + "needle": "2.2.4", + "nopt": "4.0.1", + "npm-packlist": "1.2.0", + "npmlog": "4.1.2", + "rc": "1.2.8", + "rimraf": "2.6.3", + "semver": "5.6.0", + "tar": "4.4.8" + } + }, + "nopt": { + "version": "4.0.1", + "bundled": true, + "dev": true, + "optional": true, + "requires": { + "abbrev": "1.1.1", + "osenv": "0.1.5" + } + }, + "npm-bundled": { + "version": "1.0.5", + "bundled": true, + "dev": true, + "optional": true + }, + "npm-packlist": { + "version": "1.2.0", + "bundled": true, + "dev": true, + "optional": true, + "requires": { + "ignore-walk": "3.0.1", + "npm-bundled": "1.0.5" + } + }, + "npmlog": { + "version": "4.1.2", + "bundled": true, + "dev": true, + "optional": true, + "requires": { + "are-we-there-yet": "1.1.5", + "console-control-strings": "1.1.0", + "gauge": "2.7.4", + "set-blocking": "2.0.0" + } + }, + "number-is-nan": { + "version": "1.0.1", + "bundled": true, + "dev": true + }, + "object-assign": { + "version": "4.1.1", + "bundled": true, + "dev": true, + "optional": true + }, + "once": { + "version": "1.4.0", + "bundled": true, + "dev": true, + "requires": { + "wrappy": "1.0.2" + } + 
}, + "os-homedir": { + "version": "1.0.2", + "bundled": true, + "dev": true, + "optional": true + }, + "os-tmpdir": { + "version": "1.0.2", + "bundled": true, + "dev": true, + "optional": true + }, + "osenv": { + "version": "0.1.5", + "bundled": true, + "dev": true, + "optional": true, + "requires": { + "os-homedir": "1.0.2", + "os-tmpdir": "1.0.2" + } + }, + "path-is-absolute": { + "version": "1.0.1", + "bundled": true, + "dev": true, + "optional": true + }, + "process-nextick-args": { + "version": "2.0.0", + "bundled": true, + "dev": true, + "optional": true + }, + "rc": { + "version": "1.2.8", + "bundled": true, + "dev": true, + "optional": true, + "requires": { + "deep-extend": "0.6.0", + "ini": "1.3.5", + "minimist": "1.2.0", + "strip-json-comments": "2.0.1" + }, + "dependencies": { + "minimist": { + "version": "1.2.0", + "bundled": true, + "dev": true, + "optional": true + } + } + }, + "readable-stream": { + "version": "2.3.6", + "bundled": true, + "dev": true, + "optional": true, + "requires": { + "core-util-is": "1.0.2", + "inherits": "2.0.3", + "isarray": "1.0.0", + "process-nextick-args": "2.0.0", + "safe-buffer": "5.1.2", + "string_decoder": "1.1.1", + "util-deprecate": "1.0.2" + } + }, + "rimraf": { + "version": "2.6.3", + "bundled": true, + "dev": true, + "optional": true, + "requires": { + "glob": "7.1.3" + } + }, + "safe-buffer": { + "version": "5.1.2", + "bundled": true, + "dev": true + }, + "safer-buffer": { + "version": "2.1.2", + "bundled": true, + "dev": true, + "optional": true + }, + "sax": { + "version": "1.2.4", + "bundled": true, + "dev": true, + "optional": true + }, + "semver": { + "version": "5.6.0", + "bundled": true, + "dev": true, + "optional": true + }, + "set-blocking": { + "version": "2.0.0", + "bundled": true, + "dev": true, + "optional": true + }, + "signal-exit": { + "version": "3.0.2", + "bundled": true, + "dev": true, + "optional": true + }, + "string-width": { + "version": "1.0.2", + "bundled": true, + "dev": true, + "requires": { + "code-point-at": "1.1.0", + "is-fullwidth-code-point": "1.0.0", + "strip-ansi": "3.0.1" + } + }, + "string_decoder": { + "version": "1.1.1", + "bundled": true, + "dev": true, + "optional": true, + "requires": { + "safe-buffer": "5.1.2" + } + }, + "strip-ansi": { + "version": "3.0.1", + "bundled": true, + "dev": true, + "requires": { + "ansi-regex": "2.1.1" + } + }, + "strip-json-comments": { + "version": "2.0.1", + "bundled": true, + "dev": true, + "optional": true + }, + "tar": { + "version": "4.4.8", + "bundled": true, + "dev": true, + "optional": true, + "requires": { + "chownr": "1.1.1", + "fs-minipass": "1.2.5", + "minipass": "2.3.5", + "minizlib": "1.2.1", + "mkdirp": "0.5.1", + "safe-buffer": "5.1.2", + "yallist": "3.0.3" + } + }, + "util-deprecate": { + "version": "1.0.2", + "bundled": true, + "dev": true, + "optional": true + }, + "wide-align": { + "version": "1.1.3", + "bundled": true, + "dev": true, + "optional": true, + "requires": { + "string-width": "1.0.2" + } + }, + "wrappy": { + "version": "1.0.2", + "bundled": true, + "dev": true + }, + "yallist": { + "version": "3.0.3", + "bundled": true, + "dev": true + } + } + }, "fstream": { "version": "1.0.11", "resolved": "https://registry.npmjs.org/fstream/-/fstream-1.0.11.tgz", @@ -8954,7 +9484,7 @@ "auth0-js": "9.6.1", "axios": "0.12.0", "bunyan": "1.8.12", - "jsonwebtoken": "8.5.0", + "jsonwebtoken": "8.5.1", "jwks-rsa": "1.4.0", "le_node": "1.7.1", "lodash": "4.17.10", @@ -8963,9 +9493,9 @@ }, "dependencies": { "ajv": { - "version": "6.9.1", - "resolved": 
"https://registry.npmjs.org/ajv/-/ajv-6.9.1.tgz", - "integrity": "sha512-XDN92U311aINL77ieWHmqCcNlwjoP5cHXDxIxbf2MaPYuCXOHS7gHH8jktxeK5omgd52XbSTX6a4Piwd1pQmzA==", + "version": "6.10.0", + "resolved": "https://registry.npmjs.org/ajv/-/ajv-6.10.0.tgz", + "integrity": "sha512-nffhOpkymDECQyR0mnsUtoCE8RlX38G0rYP+wgLWFyZuUyuuojSSvi/+euOiQBIn63whYwYVIIH1TvE3tu4OEg==", "requires": { "fast-deep-equal": "2.0.1", "fast-json-stable-stringify": "2.0.0", @@ -8993,7 +9523,7 @@ "resolved": "https://registry.npmjs.org/har-validator/-/har-validator-5.1.3.tgz", "integrity": "sha512-sNvOCzEQNr/qrvJgc3UG/kD4QtlHycrzwS+6mfTrrSq97BvaYcPZZI1ZSqGSPR73Cxn4LKTD4PttRwfU7jWq5g==", "requires": { - "ajv": "6.9.1", + "ajv": "6.10.0", "har-schema": "2.0.0" } }, @@ -9003,11 +9533,11 @@ "integrity": "sha512-xbbCH5dCYU5T8LcEhhuh7HJ88HXuW3qsI3Y0zOZFKfZEHcpWiHU/Jxzk629Brsab/mMiHQti9wMP+845RPe3Vg==" }, "jsonwebtoken": { - "version": "8.5.0", - "resolved": "https://registry.npmjs.org/jsonwebtoken/-/jsonwebtoken-8.5.0.tgz", - "integrity": "sha512-IqEycp0znWHNA11TpYi77bVgyBO/pGESDh7Ajhas+u0ttkGkKYIIAjniL4Bw5+oVejVF+SYkaI7XKfwCCyeTuA==", + "version": "8.5.1", + "resolved": "https://registry.npmjs.org/jsonwebtoken/-/jsonwebtoken-8.5.1.tgz", + "integrity": "sha512-XjwVfRS6jTMsqYs0EsuJ4LGxXV14zQybNd4L2r0UvbVnSF9Af8x7p5MzbJ90Ioz/9TI41/hTCvznF/loiSzn8w==", "requires": { - "jws": "3.2.1", + "jws": "3.2.2", "lodash.includes": "4.3.0", "lodash.isboolean": "3.0.3", "lodash.isinteger": "4.0.4", @@ -9016,13 +9546,13 @@ "lodash.isstring": "4.0.1", "lodash.once": "4.1.1", "ms": "2.1.1", - "semver": "5.6.0" + "semver": "5.7.0" } }, "jwa": { - "version": "1.3.0", - "resolved": "https://registry.npmjs.org/jwa/-/jwa-1.3.0.tgz", - "integrity": "sha512-SxObIyzv9a6MYuZYaSN6DhSm9j3+qkokwvCB0/OTSV5ylPq1wUQiygZQcHT5Qlux0I5kmISx3J86TxKhuefItg==", + "version": "1.4.1", + "resolved": "https://registry.npmjs.org/jwa/-/jwa-1.4.1.tgz", + "integrity": "sha512-qiLX/xhEEFKUAJ6FiBMbes3w9ATzyk5W7Hvzpa/SLYdxNtng+gcurvrI7TbACjIXlsJyr05/S1oUhZrc63evQA==", "requires": { "buffer-equal-constant-time": "1.0.1", "ecdsa-sig-formatter": "1.0.11", @@ -9043,11 +9573,11 @@ } }, "jws": { - "version": "3.2.1", - "resolved": "https://registry.npmjs.org/jws/-/jws-3.2.1.tgz", - "integrity": "sha512-bGA2omSrFUkd72dhh05bIAN832znP4wOU3lfuXtRBuGTbsmNmDXMQg28f0Vsxaxgk4myF5YkKQpz6qeRpMgX9g==", + "version": "3.2.2", + "resolved": "https://registry.npmjs.org/jws/-/jws-3.2.2.tgz", + "integrity": "sha512-YHlZCB6lMTllWDtSPHz/ZXTsi8S00usEV6v1tjq8tOUZzw7DpSDWVXjXDre6ed1w/pd495ODpHZYSdkRTsa0HA==", "requires": { - "jwa": "1.3.0", + "jwa": "1.4.1", "safe-buffer": "5.1.1" } }, @@ -9114,9 +9644,9 @@ } }, "semver": { - "version": "5.6.0", - "resolved": "https://registry.npmjs.org/semver/-/semver-5.6.0.tgz", - "integrity": "sha512-RS9R6R35NYgQn++fkDWaOmqGoj4Ek9gGs+DPxNUZKuwE183xjJroKvyo1IzVFeXvUrvmALy6FWD5xrdJT25gMg==" + "version": "5.7.0", + "resolved": "https://registry.npmjs.org/semver/-/semver-5.7.0.tgz", + "integrity": "sha512-Ya52jSX2u7QKghxeoFGpLwCtGlt7j0oY9DYb5apt9nPlJ42ID+ulTXESnt/qAQcoSERyZ5sl3LDIOw0nAn/5DA==" }, "tough-cookie": { "version": "2.4.3", diff --git a/package.json b/package.json index ab609c5..7e802da 100644 --- a/package.json +++ b/package.json @@ -6,6 +6,7 @@ "scripts": { "start": "node connect/connectNotificationServer", "startAPI": "node index-api", + "startConsumer": "node consumer", "lint": "eslint *.js src config test connect || true", "lint:fix": "eslint *.js --fix src config test connect || true", "postinstall": "npm run build", diff --git a/src/bootstrap.js 
b/src/bootstrap.js index 8812739..9f7b6fa 100644 --- a/src/bootstrap.js +++ b/src/bootstrap.js @@ -5,6 +5,3 @@ 'use strict'; global.Promise = require('bluebird'); -const logger = require('./common/logger'); - -logger.buildService(require('./services/NotificationService')); diff --git a/src/common/tcApiHelper.js b/src/common/tcApiHelper.js new file mode 100644 index 0000000..f666a35 --- /dev/null +++ b/src/common/tcApiHelper.js @@ -0,0 +1,295 @@ +/** + * Contains generic helper methods for TC API + */ +const _ = require('lodash'); +const config = require('config'); +const request = require('superagent'); +const m2mAuth = require('tc-core-library-js').auth.m2m; +const m2m = m2mAuth(config); +const constants = require('../../constants'); +const NotificationService = require('../services/NotificationService'); +const logger = require('./logger'); + +/** + * Get M2M token. + * @returns {String} the M2M token + */ +function* getM2MToken() { + return yield m2m.getMachineToken(config.AUTH0_CLIENT_ID, config.AUTH0_CLIENT_SECRET); +} + +/** + * Search users by query string. + * @param {String} query the query string + * @returns {Array} the matched users + */ +function* searchUsersByQuery(query) { + const token = yield getM2MToken(); + let users = []; + // there may be multiple pages, search all pages + let offset = 0; + const limit = constants.SEARCH_USERS_PAGE_SIZE; + // set initial total to 1 so that at least one search is done, + // it will be updated from search result + let total = 1; + while (offset < total) { + const res = yield request + .get(`${ + config.TC_API_V3_BASE_URL + }/members/_search?query=${ + query + }&offset=${ + offset + }&limit=${ + limit + }&fields=userId,email,handle,firstName,lastName,photoURL,status`) + .set('Authorization', `Bearer ${token}`); + if (!_.get(res, 'body.result.success')) { + throw new Error(`Failed to search users by query: ${query}`); + } + const records = _.get(res, 'body.result.content') || []; + // add users + users = users.concat(records); + + total = _.get(res, 'body.result.metadata.totalCount') || 0; + offset += limit; + } + + logger.verbose(`Searched users: ${JSON.stringify(users, null, 4)}`); + return users; +} + +/** + * Get users by skills. + * @param {Array} skills the skills + * @returns {Array} the matched users + */ +function* getUsersBySkills(skills) { + if (!skills || skills.length === 0) { + return []; + } + // use 'OR' to link the skill matches + const query = _.map(skills, (skill) => 'profiletrait.skills.name%3D"' + skill.trim() + '"').join(' OR '); + return yield searchUsersByQuery(query); +} + +/** + * Get users by handles. + * @param {Array} handles the user handles + * @returns {Array} the matched users + */ +function* getUsersByHandles(handles) { + if (!handles || handles.length === 0) { + return []; + } + // use 'OR' to link the handle matches + const query = _.map(handles, (h) => 'handle:"' + h.trim() + '"').join(' OR '); + return yield searchUsersByQuery(query); +} + +/** + * Send message to bus. + * @param {Object} data the data to send + */ +function* sendMessageToBus(data) { + const token = yield getM2MToken(); + yield request + .post(`${config.TC_API_V5_BASE_URL}/bus/events`) + .set('Content-Type', 'application/json') + .set('Authorization', `Bearer ${token}`) + .send(data) + .catch((err) => { + const errorDetails = _.get(err, 'message'); + throw new Error( + 'Failed to post event to bus.' + + (errorDetails ? ' Server response: ' + errorDetails : '') + ); + }); +} + +/** + * Notify user via email. 
+ * @param {Object} user the user + * @param {Object} message the Kafka message JSON + */ +function* notifyUserViaEmail(user, message) { + const notificationType = message.topic; + const eventType = constants.BUS_API_EVENT.EMAIL.GENERAL; + + const settings = yield NotificationService.getSettings(user.userId); + + // if email notification is explicitly disabled for current notification type do nothing + // by default we treat all notification types enabled + if (settings.notifications[notificationType] + && settings.notifications[notificationType][constants.SETTINGS_EMAIL_SERVICE_ID] + && settings.notifications[notificationType][constants.SETTINGS_EMAIL_SERVICE_ID].enabled === 'no' + ) { + logger.verbose(`Notification '${notificationType}' won't be sent by '${constants.SETTINGS_EMAIL_SERVICE_ID}'` + + ` service to the userId '${user.userId}' due to his notification settings.`); + return; + } + + const userStatus = user.status; + // don't send email notification for inactive users, ideally we should not have generated + // notifications for inactive users, however, for now handling it here as safe gaurd + if (userStatus && constants.ACTIVE_USER_STATUSES.indexOf(userStatus) < 0) { + logger.error('Notification generated for inactive user, ignoring'); + return; + } + + let userEmail; + if (config.ENABLE_DEV_MODE) { + userEmail = config.DEV_MODE_EMAIL; + } else { + userEmail = user.email; + if (!userEmail) { + logger.error(`Email not received for user: ${user.userId}`); + return; + } + } + const recipients = [userEmail]; + + const categories = [`${config.ENV}:${eventType}`.toLowerCase()]; + + const eventMessage = { + data: { + name: user.firstName + ' ' + user.lastName, + handle: user.handle, + date: (new Date(message.timestamp)).toISOString(), + user, + message, + type: notificationType, + }, + recipients, + version: 'v3', + from: { + name: user.handle, + email: config.DEFAULT_REPLY_EMAIL, + }, + categories, + }; + eventMessage.data[eventMessage.data.type] = true; + + // send email message to bus + yield sendMessageToBus({ + topic: eventType, + originator: 'tc-notifications', + timestamp: (new Date()).toISOString(), + 'mime-type': 'application/json', + payload: eventMessage, + }); + logger.info(`Successfully sent ${eventType} event with body ${JSON.stringify(eventMessage, null, 4)} to bus api`); +} + +/** + * Get challenge details + * @param {Number} challengeId the challenge id + * @returns {Object} the challenge details + */ +function* getChallenge(challengeId) { + // this is public API, M2M token is not needed + const res = yield request.get(`${config.TC_API_V4_BASE_URL}/challenges/${challengeId}`); + if (!_.get(res, 'body.result.success')) { + throw new Error(`Failed to get challenge by id ${challengeId}`); + } + return _.get(res, 'body.result.content'); +} + +/** + * Notify users of message. 
+ * @param {Array} users the users + * @param {Object} message the Kafka message + * @returns {Array} the notifications + */ +function* notifyUsersOfMessage(users, message) { + if (!users || users.length === 0) { + logger.info('No users to notify message.'); + return []; + } + + const notifications = []; + // handle each user + for (let i = 0; i < users.length; i += 1) { + const user = users[i]; + // construct notification, rest fields are set in consumer.js + notifications.push({ userId: user.userId }); + + /* TODO Sachin disabled this code + if (config.ENABLE_EMAILS) { + // notify user by email, ignore error in order not to block rest processing + try { + yield notifyUserViaEmail(user, message); + } catch (e) { + logger.error(`Failed to send email to user id: ${user.userId}, handle: ${user.handle}`); + logger.logFullError(e); + } + } */ + + } + logger.info(`Total ${notifications.length} users would be notified.`) + return notifications; +} + +/** + * Fetch Challenge usersInfo from challenge id. + * @param {String} challengeId infomix challenge id + * @returns {Array} the associated user's detail object + */ +function* getUsersInfoFromChallenge(challengeId) { + const token = yield getM2MToken() + let usersInfo = [] + const url = `${config.TC_API_V4_BASE_URL}/challenges/${challengeId}/resources` + logger.info(`calling challenge api ${url} `) + const res = yield request + .get(url) + .set('Authorization', `Bearer ${token}`) + .catch((err) => { + const errorDetails = _.get(err, 'message'); + throw new Error( + `Error in call challenge api by id ${challengeId}` + + (errorDetails ? ' Server response: ' + errorDetails : '') + ) + }) + if (!_.get(res, 'body.result.success')) { + throw new Error(`Failed to get challenge by id ${challengeId}`); + } + usersInfo = _.get(res, 'body.result.content'); + logger.info(`Feteched ${usersInfo.length} records from challenge api`) + return usersInfo; +} + +/** + * Filter associated challenge's user based on criteria + * @param {Array} usersInfo user object array + * @param {Array} filterCriteria on roles + * + * @returns {Array} of user object + */ +function filterChallengeUsers(usersInfo, filterCriteria = []) { + let users = [] + let totaleRoles = [] + _.map(usersInfo, (user) => { + let userId = _.get(user, 'properties.External Reference ID') + let role = _.get(user, 'role') + totaleRoles[role] = 1 + if (filterCriteria.length > 0 && _.indexOf(filterCriteria, role) >= 0) { + users.push({ userId: userId }) + } else if (filterCriteria.length == 0) { + users.push({ userId: userId }) + } + }) + logger.info(`Total roles availables in this challenge are: ${_.keys(totaleRoles).join(',')}`) + return users +} + +module.exports = { + getM2MToken, + getUsersBySkills, + getUsersByHandles, + sendMessageToBus, + notifyUserViaEmail, + getChallenge, + notifyUsersOfMessage, + getUsersInfoFromChallenge, + filterChallengeUsers, +}; diff --git a/src/processors/challenge/ChallengeCreatedHandler.js b/src/processors/challenge/ChallengeCreatedHandler.js new file mode 100644 index 0000000..40a7d72 --- /dev/null +++ b/src/processors/challenge/ChallengeCreatedHandler.js @@ -0,0 +1,20 @@ +/** + * Challenge created handler. + */ +const co = require('co'); +const service = require('../../services/ChallengeCreatedHandlerService'); + +/** + * Handle Kafka JSON message of challenge created. 
+ * + * @param {Object} message the Kafka JSON message + * + * @return {Promise} promise resolved to notifications + */ +const handle = (message) => co(function* () { + return yield service.handle(message); +}); + +module.exports = { + handle, +}; diff --git a/src/processors/challenge/ChallengeHandler.js b/src/processors/challenge/ChallengeHandler.js new file mode 100644 index 0000000..39e60ec --- /dev/null +++ b/src/processors/challenge/ChallengeHandler.js @@ -0,0 +1,21 @@ +/** + * Challenge general handler. + */ +const co = require('co'); +const service = require('../../services/ChallengeService'); + +/** + * Handle Kafka JSON message of challenge created. + * + * @param {Object} message the Kafka JSON message + * @param {Object} ruleSets + * + * @return {Promise} promise resolved to notifications + */ +const handle = (message, ruleSets) => co(function* () { + return yield service.handle(message, ruleSets); +}); + +module.exports = { + handle, +}; diff --git a/src/processors/challenge/ChallengePhaseWarningHandler.js b/src/processors/challenge/ChallengePhaseWarningHandler.js new file mode 100644 index 0000000..647731e --- /dev/null +++ b/src/processors/challenge/ChallengePhaseWarningHandler.js @@ -0,0 +1,20 @@ +/** + * Challenge phase warning handler. + */ +const co = require('co'); +const service = require('../../services/ChallengePhaseWarningHandlerService'); + +/** + * Handle Kafka JSON message of challenge phase warning. + * + * @param {Object} message the Kafka JSON message + * + * @return {Promise} promise resolved to notifications + */ +const handle = (message) => co(function* () { + return yield service.handle(message); +}); + +module.exports = { + handle, +}; diff --git a/src/processors/index.js b/src/processors/index.js new file mode 100644 index 0000000..3fbc148 --- /dev/null +++ b/src/processors/index.js @@ -0,0 +1,15 @@ +/** + * This is entry point of the Kafka consumer processors. + */ +'use strict'; + +const ChallengeCreatedHandler = require('./challenge/ChallengeCreatedHandler'); +const ChallengePhaseWarningHandler = require('./challenge/ChallengePhaseWarningHandler'); +const ChallengeHandler = require('./challenge/ChallengeHandler'); + +// Exports +module.exports = { + handleChallengeCreated: ChallengeCreatedHandler.handle, + handleChallengePhaseWarning: ChallengePhaseWarningHandler.handle, + handleChallenge: ChallengeHandler.handle, +}; diff --git a/src/services/ChallengeCreatedHandlerService.js b/src/services/ChallengeCreatedHandlerService.js new file mode 100644 index 0000000..1f1b39e --- /dev/null +++ b/src/services/ChallengeCreatedHandlerService.js @@ -0,0 +1,45 @@ +/** + * Challenge created handler service. 
+ */ + +'use strict'; + +const joi = require('joi'); +const logger = require('../common/logger'); +const tcApiHelper = require('../common/tcApiHelper'); + +/** + * Handle challenge created message + * @param {Object} message the Kafka message + * @returns {Array} the notifications + */ +function* handle(message) { + // get users by skills + const users = yield tcApiHelper.getUsersBySkills(message.payload.skills); + // notify users of message + return yield tcApiHelper.notifyUsersOfMessage(users, message); +} + +handle.schema = { + message: joi.object().keys({ + topic: joi.string().required(), + originator: joi.string().required(), + timestamp: joi.date().required(), + 'mime-type': joi.string().required(), + payload: joi.object().keys({ + challengeId: joi.number().integer().min(1).required(), + challengeTitle: joi.string().required(), + challengeUrl: joi.string().uri().required(), + userId: joi.number().integer().min(1), + initiatorUserId: joi.number().integer().min(1), + skills: joi.array().items(joi.string()), + }).unknown(true).required(), + }).required(), +}; + +// Exports +module.exports = { + handle, +}; + +logger.buildService(module.exports); diff --git a/src/services/ChallengePhaseWarningHandlerService.js b/src/services/ChallengePhaseWarningHandlerService.js new file mode 100644 index 0000000..bea0f03 --- /dev/null +++ b/src/services/ChallengePhaseWarningHandlerService.js @@ -0,0 +1,51 @@ +/** + * Challenge phase warning handler service. + */ + +'use strict'; + +const _ = require('lodash'); +const joi = require('joi'); +const logger = require('../common/logger'); +const tcApiHelper = require('../common/tcApiHelper'); + +/** + * Handle challenge phase warning message + * @param {Object} message the Kafka message + * @returns {Array} the notifications + */ +function* handle(message) { + // get challenge details + const challenge = yield tcApiHelper.getChallenge(message.payload.challengeId); + // get registrants handles + const handles = _.map(challenge.registrants || [], (r) => r.handle); + // get users by handles + const users = yield tcApiHelper.getUsersByHandles(handles); + // notify users of message + return yield tcApiHelper.notifyUsersOfMessage(users, message); +} + +handle.schema = { + message: joi.object().keys({ + topic: joi.string().required(), + originator: joi.string().required(), + timestamp: joi.date().required(), + 'mime-type': joi.string().required(), + payload: joi.object().keys({ + challengeId: joi.number().integer().min(1).required(), + challengeTitle: joi.string().required(), + challengeUrl: joi.string().uri().required(), + phase: joi.string().required(), + remainingTime: joi.number(), + userId: joi.number().integer().min(1), + initiatorUserId: joi.number().integer().min(1), + }).unknown(true).required(), + }).required(), +}; + +// Exports +module.exports = { + handle, +}; + +logger.buildService(module.exports); diff --git a/src/services/ChallengeService.js b/src/services/ChallengeService.js new file mode 100644 index 0000000..7435b78 --- /dev/null +++ b/src/services/ChallengeService.js @@ -0,0 +1,54 @@ +/** + * Challenge general handler service. 
+ */ + +'use strict'; + +const joi = require('joi'); +const _ = require('lodash') +const logger = require('../common/logger'); +const tcApiHelper = require('../common/tcApiHelper'); + +/** + * Handle challenge message + * @param {Object} message the Kafka message + * @param {Object} ruleSets + * @returns {Array} the notifications + */ +function* handle(message, ruleSets) { + + if (message.payload.type === _.get(ruleSets, "type")) { + const challengeId = message.payload.data.id + const usersInfo = yield tcApiHelper.getUsersInfoFromChallenge(challengeId) + const filerOnRoles = _.get(ruleSets, "roles") + const users = tcApiHelper.filterChallengeUsers(usersInfo, filerOnRoles) + logger.info(`Successfully filetered ${users.length} users on rulesets ${JSON.stringify(filerOnRoles)} `) + // notify users of message + return yield tcApiHelper.notifyUsersOfMessage(users, message); + } + return {} +} + +handle.schema = { + message: joi.object().keys({ + topic: joi.string().required(), + originator: joi.string().required(), + timestamp: joi.date().required(), + 'mime-type': joi.string().required(), + payload: joi.object().keys({ + data: joi.object().keys({ + id: joi.number().integer().min(1) + }).unknown(true).required(), + type: joi.string().required(), + userId: joi.number().integer().min(1) + }).unknown(true).required(), + }).required(), + ruleSets: joi.object() +} + +// Exports +module.exports = { + handle, +} + +logger.buildService(module.exports); diff --git a/src/services/NotificationService.js b/src/services/NotificationService.js index 52772fd..027116d 100644 --- a/src/services/NotificationService.js +++ b/src/services/NotificationService.js @@ -7,6 +7,7 @@ const _ = require('lodash'); const Joi = require('joi'); const errors = require('../common/errors'); +const logger = require('../common/logger'); const models = require('../models'); const DEFAULT_LIMIT = 10; @@ -308,3 +309,5 @@ module.exports = { getSettings, updateSettings, }; + +logger.buildService(module.exports); diff --git a/test/init-db.js b/test/init-db.js new file mode 100644 index 0000000..1cbe392 --- /dev/null +++ b/test/init-db.js @@ -0,0 +1,20 @@ +/** + * Initialize database tables. Create tables if not present. + */ +'use strict'; + +global.Promise = require('bluebird'); +const models = require('../src/models'); +const logger = require('../src/common/logger'); + +logger.info('Initialize database tables...'); + +models.init() + .then(() => { + logger.info('Initialize database tables - COMPLETED'); + process.exit(); + }) + .catch((err) => { + logger.logFullError(err); + process.exit(1); + });
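Note on verifying the new general processor: the `KAFKA_CONSUMER_RULESETS` entry added in `config/default.js` routes the `challenge.notification.events` topic to the `handleChallenge` processor, which the Consumer-Verification.md steps above do not yet cover. A minimal local check could look like the sketch below; the payload shape follows the joi schema in `src/services/ChallengeService.js` and the `UPDATE_DRAFT_CHALLENGE` type and roles configured in `config/default.js`, while the `originator` value and the challenge id are illustrative placeholders only.

```
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic challenge.notification.events
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic challenge.notification.events
```

```
{ "topic": "challenge.notification.events", "originator": "tc-autopilot", "timestamp": "2018-02-16T00:00:00", "mime-type": "application/json", "payload": { "type": "UPDATE_DRAFT_CHALLENGE", "data": { "id": 30054674 }, "userId": 8547899 } }
```

With `npm run startConsumer` running, the consumer should log `Run handler handleChallenge`, fetch resources from the v4 challenge API, filter the users by the configured roles (Submitter, Copilot, Reviewer), and bulk-insert one row per matched user into the `Notifications` table.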