diff --git a/config/default.js b/config/default.js index 2256a15..632c5ca 100644 --- a/config/default.js +++ b/config/default.js @@ -16,6 +16,7 @@ module.exports = { LOG_LEVEL: process.env.LOG_LEVEL || 'debug', PARTITION: process.env.PARTITION || 0, TOPIC: process.env.TOPIC || 'tc-x-events', + TOPIC_NOTIFICATION: process.env.TOPIC_NOTIFICATION || 'notifications.action.create', KAFKA_OPTIONS: { connectionString: process.env.KAFKA_URL || 'localhost:9092', groupId: process.env.KAFKA_GROUP_ID || 'topcoder-x-processor', @@ -25,6 +26,11 @@ module.exports = { passphrase: 'secret', // NOTE:* This configuration specifies the private key passphrase used while creating it. } }, + MAIL_NOTICIATION: { + type: 'tcx.mail_notification', + sendgridTemplateId: 'xxxxxx', + subject: 'Topcoder X Alert' + }, NEW_CHALLENGE_TEMPLATE: process.env.NEW_CHALLENGE_TEMPLATE || { status: 'Draft' }, diff --git a/index.js b/index.js index 3eed2a5..ee57023 100644 --- a/index.js +++ b/index.js @@ -5,7 +5,7 @@ const config = require('config'); const _ = require('lodash'); -const kafka = require('./utils/kafka'); +const kafkaConsumer = require('./utils/kafka-consumer'); const logger = require('./utils/logger'); process.on('uncaughtException', (err) => { @@ -55,4 +55,4 @@ dumpConfigs(config, 0); logger.debug('--- End of List of Configurations ---'); // run the server -kafka.run(); +kafkaConsumer.run(); diff --git a/services/GithubService.js b/services/GithubService.js index a53bc36..6f2eafe 100644 --- a/services/GithubService.js +++ b/services/GithubService.js @@ -53,7 +53,7 @@ async function _authenticate(accessToken) { }); return octokit.rest; } catch (err) { - throw errors.convertGitHubError(err, 'Failed to authenticate to Github using access token of copilot.'); + throw errors.handleGitHubError(err, 'Failed to authenticate to Github using access token of copilot.'); } } @@ -75,7 +75,7 @@ async function _removeAssignees(github, owner, repo, number, assignees) { assignees }); } catch (err) { - throw errors.convertGitHubError(err, 'Error occurred during remove assignees from issue.'); + throw errors.handleGitHubError(err, 'Error occurred during remove assignees from issue.'); } } @@ -93,6 +93,17 @@ async function _getUsernameById(id) { return user ? user.login : null; } +/** + * Get github issue url + * @param {String} repoPath the repo path + * @param {Number} number the issue number + * @returns {String} the url + * @private + */ +function _getIssueUrl(repoPath, number) { + return `https://github.com/${repoPath}/issues/${number}`; +} + /** * updates the title of github issue * @param {Object} copilot the copilot @@ -107,7 +118,7 @@ async function updateIssue(copilot, repoFullName, number, title) { try { await github.issues.update({owner, repo, issue_number: number, title}); } catch (err) { - throw errors.convertGitHubError(err, 'Error occurred during updating issue.'); + throw errors.handleGitHubError(err, 'Error occurred during updating issue.', copilot.topcoderUsername, _getIssueUrl(repoFullName, number)); } logger.debug(`Github issue title is updated for issue number ${number}`); } @@ -139,7 +150,7 @@ async function assignUser(copilot, repoFullName, number, user) { } await github.issues.addAssignees({owner, repo, issue_number: number, assignees: [user]}); } catch (err) { - throw errors.convertGitHubError(err, 'Error occurred during assigning issue user.'); + throw errors.handleGitHubError(err, 'Error occurred during assigning issue user.', copilot.topcoderUsername, _getIssueUrl(repoFullName, number)); } logger.debug(`Github issue with number ${number} is assigned to ${user}`); } @@ -184,7 +195,7 @@ async function createComment(copilot, repoFullName, number, body) { body = helper.prepareAutomatedComment(body, copilot); await github.issues.createComment({owner, repo, issue_number: number, body}); } catch (err) { - throw errors.convertGitHubError(err, 'Error occurred during creating comment on issue.'); + throw errors.handleGitHubError(err, 'Error occurred during creating comment on issue.', copilot.topcoderUsername, _getIssueUrl(repoFullName, number)); } logger.debug(`Github comment is added on issue with message: "${body}"`); } @@ -262,7 +273,7 @@ async function markIssueAsPaid(copilot, repoFullName, number, challengeUUID, exi const body = helper.prepareAutomatedComment(commentMessage, copilot); await github.issues.createComment({owner, repo, issue_number: number, body}); } catch (err) { - throw errors.convertGitHubError(err, 'Error occurred during updating issue as paid.'); + throw errors.handleGitHubError(err, 'Error occurred during updating issue as paid.', copilot.topcoderUsername, _getIssueUrl(repoFullName, number)); } logger.debug(`Github issue title is updated for as paid and fix accepted for ${number}`); } @@ -291,7 +302,7 @@ async function changeState(copilot, repoFullName, number, state) { try { await github.issues.update({owner, repo, issue_number: number, state}); } catch (err) { - throw errors.convertGitHubError(err, 'Error occurred during updating status of issue.'); + throw errors.handleGitHubError(err, 'Error occurred during updating status of issue.', copilot.topcoderUsername, _getIssueUrl(repoFullName, number)); } logger.debug(`Github issue state is updated to '${state}' for issue number ${number}`); } @@ -317,7 +328,7 @@ async function addLabels(copilot, repoFullName, number, labels) { try { await github.issues.update({owner, repo, issue_number: number, labels}); } catch (err) { - throw errors.convertGitHubError(err, 'Error occurred during adding label in issue.'); + throw errors.handleGitHubError(err, 'Error occurred during adding label in issue.', copilot.topcoderUsername, _getIssueUrl(repoFullName, number)); } logger.debug(`Github issue is updated with new labels for ${number}`); } diff --git a/services/GitlabService.js b/services/GitlabService.js index 24fa38a..542e9bb 100644 --- a/services/GitlabService.js +++ b/services/GitlabService.js @@ -37,7 +37,7 @@ async function _authenticate(accessToken) { }); return gitlab; } catch (err) { - throw errors.convertGitLabError(err, 'Failed to during authenticate to Github using access token of copilot.'); + throw errors.handleGitLabError(err, 'Failed to during authenticate to Github using access token of copilot.'); } } @@ -55,25 +55,37 @@ async function _removeAssignees(gitlab, projectId, issueId, assignees) { const oldAssignees = _.difference(issue.assignee_ids, assignees); await gitlab.projects.issues.edit(projectId, issueId, {assignee_ids: oldAssignees}); } catch (err) { - throw errors.convertGitLabError(err, 'Error occurred during remove assignees from issue.'); + throw errors.handleGitLabError(err, 'Error occurred during remove assignees from issue.'); } } +/** + * Get gitlab issue url + * @param {String} repoPath the repo path + * @param {Number} issueId the issue number + * @returns {String} the url + * @private + */ +function _getIssueUrl(repoPath, issueId) { + return `https://gitlab.com/${repoPath}/issues/${issueId}`; +} + /** * creates the comments on gitlab issue * @param {Object} copilot the copilot - * @param {Number} projectId the project id + * @param {Object} project the project object * @param {Number} issueId the issue number * @param {string} body the comment body text */ -async function createComment(copilot, projectId, issueId, body) { +async function createComment(copilot, project, issueId, body) { + const projectId = project.id; Joi.attempt({copilot, projectId, issueId, body}, createComment.schema); const gitlab = await _authenticate(copilot.accessToken); try { body = helper.prepareAutomatedComment(body, copilot); await gitlab.projects.issues.notes.create(projectId, issueId, {body}); } catch (err) { - throw errors.convertGitLabError(err, 'Error occurred during creating comment on issue.'); + throw errors.handleGitLabError(err, 'Error occurred during creating comment on issue.', copilot.topcoderUsername, _getIssueUrl(project.full_name, issueId)); } logger.debug(`Gitlab comment is added on issue with message: "${body}"`); } @@ -88,17 +100,18 @@ createComment.schema = { /** * updates the title of gitlab issue * @param {Object} copilot the copilot - * @param {Number} projectId the project id + * @param {Object} project the project object * @param {Number} issueId the issue number * @param {string} title new title */ -async function updateIssue(copilot, projectId, issueId, title) { +async function updateIssue(copilot, project, issueId, title) { + const projectId = project.id; Joi.attempt({copilot, projectId, issueId, title}, updateIssue.schema); const gitlab = await _authenticate(copilot.accessToken); try { await gitlab.projects.issues.edit(projectId, issueId, {title}); } catch (err) { - throw errors.convertGitLabError(err, 'Error occurred during updating issue.'); + throw errors.handleGitLabError(err, 'Error occurred during updating issue.', copilot.topcoderUsername, _getIssueUrl(project.full_name, issueId)); } logger.debug(`Gitlab issue title is updated for issue number ${issueId}`); } @@ -113,11 +126,12 @@ updateIssue.schema = { /** * Assigns the issue to user login * @param {Object} copilot the copilot - * @param {Number} projectId the project id + * @param {Object} project the project object * @param {Number} issueId the issue number * @param {Number} userId the user id of assignee */ -async function assignUser(copilot, projectId, issueId, userId) { +async function assignUser(copilot, project, issueId, userId) { + const projectId = project.id; Joi.attempt({copilot, projectId, issueId, userId}, assignUser.schema); const gitlab = await _authenticate(copilot.accessToken); try { @@ -128,7 +142,7 @@ async function assignUser(copilot, projectId, issueId, userId) { } await gitlab.projects.issues.edit(projectId, issueId, {assignee_ids: [userId]}); } catch (err) { - throw errors.convertGitLabError(err, 'Error occurred during assigning issue user.'); + throw errors.handleGitLabError(err, 'Error occurred during assigning issue user.', copilot.topcoderUsername, _getIssueUrl(project.full_name, issueId)); } logger.debug(`Gitlab issue with number ${issueId} is assigned to ${issueId}`); } @@ -143,11 +157,12 @@ assignUser.schema = { /** * Removes an assignee from the issue * @param {Object} copilot the copilot - * @param {Number} projectId the project id + * @param {Object} project the project object * @param {Number} issueId the issue number * @param {Number} userId the user id of assignee to remove */ -async function removeAssign(copilot, projectId, issueId, userId) { +async function removeAssign(copilot, project, issueId, userId) { + const projectId = project.id; Joi.attempt({copilot, projectId, issueId, userId}, removeAssign.schema); const gitlab = await _authenticate(copilot.accessToken); await _removeAssignees(gitlab, projectId, issueId, [userId]); @@ -195,14 +210,15 @@ getUserIdByLogin.schema = { /** * updates the gitlab issue as paid and fix accepted * @param {Object} copilot the copilot - * @param {Number} projectId the project id + * @param {Object} project the project object * @param {Number} issueId the issue number * @param {String} challengeUUID the challenge uuid * @param {Array} existLabels the issue labels * @param {String} winner the winner topcoder handle * @param {Boolean} createCopilotPayments the option to create copilot payments or not */ -async function markIssueAsPaid(copilot, projectId, issueId, challengeUUID, existLabels, winner, createCopilotPayments) { // eslint-disable-line max-params +async function markIssueAsPaid(copilot, project, issueId, challengeUUID, existLabels, winner, createCopilotPayments) { // eslint-disable-line max-params + const projectId = project.id; Joi.attempt({copilot, projectId, issueId, challengeUUID, existLabels, winner, createCopilotPayments}, markIssueAsPaid.schema); const gitlab = await _authenticate(copilot.accessToken); const labels = _(existLabels).filter((i) => i !== config.FIX_ACCEPTED_ISSUE_LABEL) @@ -222,7 +238,7 @@ async function markIssueAsPaid(copilot, projectId, issueId, challengeUUID, exist const body = helper.prepareAutomatedComment(commentMessage, copilot); await gitlab.projects.issues.notes.create(projectId, issueId, {body}); } catch (err) { - throw errors.convertGitLabError(err, 'Error occurred during updating issue as paid.'); + throw errors.handleGitLabError(err, 'Error occurred during updating issue as paid.', copilot.topcoderUsername, _getIssueUrl(project.full_name, issueId)); } logger.debug(`Gitlab issue is updated for as paid and fix accepted for ${issueId}`); } @@ -240,17 +256,18 @@ markIssueAsPaid.schema = { /** * change the state of gitlab issue * @param {Object} copilot the copilot - * @param {string} projectId the project id + * @param {Object} project the project object * @param {Number} issueId the issue issue id * @param {string} state new state */ -async function changeState(copilot, projectId, issueId, state) { +async function changeState(copilot, project, issueId, state) { + const projectId = project.id; Joi.attempt({copilot, projectId, issueId, state}, changeState.schema); const gitlab = await _authenticate(copilot.accessToken); try { await gitlab.projects.issues.edit(projectId, issueId, {state_event: state}); } catch (err) { - throw errors.convertGitLabError(err, 'Error occurred during updating status of issue.'); + throw errors.handleGitLabError(err, 'Error occurred during updating status of issue.', copilot.topcoderUsername, _getIssueUrl(project.full_name, issueId)); } logger.debug(`Gitlab issue state is updated to '${state}' for issue number ${issueId}`); } @@ -265,17 +282,18 @@ changeState.schema = { /** * updates the gitlab issue with new labels * @param {Object} copilot the copilot - * @param {string} projectId the project id + * @param {Object} project the project object * @param {Number} issueId the issue issue id * @param {Number} labels the labels */ -async function addLabels(copilot, projectId, issueId, labels) { +async function addLabels(copilot, project, issueId, labels) { + const projectId = project.id; Joi.attempt({copilot, projectId, issueId, labels}, addLabels.schema); const gitlab = await _authenticate(copilot.accessToken); try { await gitlab.projects.issues.edit(projectId, issueId, {labels: _.join(labels, ',')}); } catch (err) { - throw errors.convertGitLabError(err, 'Error occurred during adding label in issue.'); + throw errors.handleGitLabError(err, 'Error occurred during adding label in issue.', copilot.topcoderUsername, _getIssueUrl(project.full_name, issueId)); } logger.debug(`Gitlab issue is updated with new labels for ${issueId}`); } diff --git a/utils/errors.js b/utils/errors.js index 29592de..81f6750 100644 --- a/utils/errors.js +++ b/utils/errors.js @@ -12,6 +12,7 @@ const _ = require('lodash'); const constants = require('../constants'); +const notification = require('./notification'); // the error class wrapper class ProcessorError extends Error { @@ -27,12 +28,17 @@ class ProcessorError extends Error { const errors = {}; /** -* Convert github api error. +* Handle github api error. Return converted error. * @param {Error} err the github api error * @param {String} message the error message +* @param {String} copilotHandle the handle name of the copilot +* @param {String} repoPath the link to related github page * @returns {Error} converted error */ -errors.convertGitHubError = function convertGitHubError(err, message) { +errors.handleGitHubError = function handleGitHubError(err, message, copilotHandle, repoPath) { + if (err.statusCode === 401 && copilotHandle && repoPath) { // eslint-disable-line no-magic-numbers + notification.sendTokenExpiredAlert(copilotHandle, repoPath, 'Github'); + } let resMsg = `${message}. ${err.message}.`; const detail = _.get(err, 'response.body.message'); if (detail) { @@ -47,12 +53,17 @@ errors.convertGitHubError = function convertGitHubError(err, message) { }; /** - * Convert gitlab api error. + * Handle gitlab api error. Return converted error. * @param {Error} err the gitlab api error * @param {String} message the error message +* @param {String} copilotHandle the handle name of the copilot +* @param {String} repoPath the link to related gitlab page * @returns {Error} converted error */ -errors.convertGitLabError = function convertGitLabError(err, message) { +errors.handleGitLabError = function handleGitLabError(err, message, copilotHandle, repoPath) { + if (err.statusCode === 401 && copilotHandle && repoPath) { // eslint-disable-line no-magic-numbers + notification.sendTokenExpiredAlert(copilotHandle, repoPath, 'Gitlab'); + } let resMsg = `${message}. ${err.message}.`; const detail = _.get(err, 'response.body.message'); if (detail) { diff --git a/utils/git-helper.js b/utils/git-helper.js index 28791d0..cc53626 100644 --- a/utils/git-helper.js +++ b/utils/git-helper.js @@ -27,7 +27,7 @@ class GitHelper { if (event.provider === 'github') { await gitHubService.createComment(event.copilot, event.data.repository.full_name, issueNumber, comment); } else if (event.provider === 'gitlab') { - await gitlabService.createComment(event.copilot, event.data.repository.id, issueNumber, comment); + await gitlabService.createComment(event.copilot, event.data.repository, issueNumber, comment); } } @@ -41,7 +41,7 @@ class GitHelper { if (event.provider === 'github') { await gitHubService.addLabels(event.copilot, event.data.repository.full_name, issueNumber, labels); } else if (event.provider === 'gitlab') { - await gitlabService.addLabels(event.copilot, event.data.repository.id, issueNumber, labels); + await gitlabService.addLabels(event.copilot, event.data.repository, issueNumber, labels); } } @@ -54,7 +54,7 @@ class GitHelper { if (event.provider === 'github') { await gitHubService.changeState(event.copilot, event.data.repository.full_name, issue.number, 'open'); } else if (event.provider === 'gitlab') { - await gitlabService.changeState(event.copilot, event.data.repository.id, issue.number, 'reopen'); + await gitlabService.changeState(event.copilot, event.data.repository, issue.number, 'reopen'); } } @@ -84,7 +84,7 @@ class GitHelper { if (event.provider === 'github') { await gitHubService.removeAssign(event.copilot, event.data.repository.full_name, issueNumber, assigneeUsername); } else if (event.provider === 'gitlab') { - await gitlabService.removeAssign(event.copilot, event.data.repository.id, issueNumber, assigneeUserId); + await gitlabService.removeAssign(event.copilot, event.data.repository, issueNumber, assigneeUserId); } } @@ -98,7 +98,7 @@ class GitHelper { if (event.provider === 'github') { await gitHubService.updateIssue(event.copilot, event.data.repository.full_name, issueNumber, newTitle); } else if (event.provider === 'gitlab') { - await gitlabService.updateIssue(event.copilot, event.data.repository.id, issueNumber, newTitle); + await gitlabService.updateIssue(event.copilot, event.data.repository, issueNumber, newTitle); } } @@ -113,7 +113,7 @@ class GitHelper { await gitHubService.assignUser(event.copilot, event.data.repository.full_name, issueNumber, assignedUser); } else if (event.provider === 'gitlab') { const userId = await gitlabService.getUserIdByLogin(event.copilot, assignedUser); - await gitlabService.assignUser(event.copilot, event.data.repository.id, issueNumber, userId); + await gitlabService.assignUser(event.copilot, event.data.repository, issueNumber, userId); } } @@ -131,7 +131,7 @@ class GitHelper { await gitHubService.markIssueAsPaid(event.copilot, event.data.repository.full_name, issueNumber, challengeUUID, existLabels, winner, createCopilotPayments); } else if (event.provider === 'gitlab') { - await gitlabService.markIssueAsPaid(event.copilot, event.data.repository.id, issueNumber, challengeUUID, existLabels, winner, + await gitlabService.markIssueAsPaid(event.copilot, event.data.repository, issueNumber, challengeUUID, existLabels, winner, createCopilotPayments); } else if (event.provider === 'azure') { await azureService.markIssueAsPaid(event.copilot, event.data.repository.full_name, issueNumber, challengeUUID, existLabels); diff --git a/utils/kafka-consumer.js b/utils/kafka-consumer.js new file mode 100644 index 0000000..65fc9c4 --- /dev/null +++ b/utils/kafka-consumer.js @@ -0,0 +1,85 @@ +/** + * Module wrapper for consume kafka topic. + * + * @author TCSCODER + * @version 1.0 + */ +'use strict'; + +const config = require('config'); +const _ = require('lodash'); + +const healthcheck = require('topcoder-healthcheck-dropin'); +const IssueService = require('../services/IssueService'); +const CopilotPaymentService = require('../services/CopilotPaymentService'); +const logger = require('./logger'); +const kafka = require('./kafka'); + +/** + * Handle the message from kafka + * @param {Object} messageSet object to handle + */ +function messageHandler(messageSet) { + logger.debug(` topics ======= ${JSON.stringify(messageSet)}`); + messageSet.forEach((item) => { + // The event should be a JSON object + let event; + try { + const message = JSON.parse(item.message.value.toString('utf8')); + event = JSON.parse(message.payload.value); + message.payload.value = event; + logger.debug(`received message from kafka: ${JSON.stringify(_.omit(message, 'payload.value.data.issue.body'))}`); + } catch (err) { + logger.error(`"message" is not a valid JSON-formatted string: ${err.message}`); + return; + } + + if (event && _.includes(['issue.created', 'issue.updated', 'issue.closed', 'issue.recreated', + 'comment.created', 'comment.updated', 'issue.assigned', 'issue.labelUpdated', 'issue.unassigned'] + , event.event)) { + IssueService + .process(event) + .catch(logger.error); + } + if (event && _.includes(['copilotPayment.add', 'copilotPayment.update', 'copilotPayment.delete', 'copilotPayment.checkUpdates'] + , event.event)) { + CopilotPaymentService + .process(event) + .catch(logger.error); + } + }); +} + +/** + * check if there is kafka connection alive + * @returns {Boolean} true + */ +function check() { + // if (!this.consumer.client.initialBrokers && !this.consumer.client.initialBrokers.length) { + // logger.info(`Brokers Exist Check Failed ${this.consumer.client.initialBrokers} ${this.consumer.client.initialBrokers.length}`) + // return false; + // } + // let connected = true; + // this.consumer.client.initialBrokers.forEach((conn) => { + // logger.info(`Brokers Check Failed ${conn.connected}`) + // connected = conn.connected && connected; + // }); + + // return connected; + return true; +} + +/** + * run the consumer + */ +function run() { + kafka.consumer.init().then(() => { + logger.info('kafka consumer is ready'); + healthcheck.init([check]); + kafka.consumer.subscribe(config.TOPIC, {}, messageHandler); + }).catch((err) => { + logger.error(`kafka consumer is not connected. ${err.stack}`); + }); +} + +module.exports = {run}; diff --git a/utils/kafka-sender.js b/utils/kafka-sender.js new file mode 100644 index 0000000..a053456 --- /dev/null +++ b/utils/kafka-sender.js @@ -0,0 +1,61 @@ +/** + * Module wrapper for sending messages to kafka. + * + * @author TCSCODER + * @version 1.0 + */ +'use strict'; + +const config = require('config'); +const kafka = require('./kafka'); + +/** + * Send message to general topic in kafka. + * @param {String} message the message to send + * @returns {Object} Result from kafka + */ +function send(message) { + const data = JSON.stringify({ + topic: config.TOPIC, + originator: 'topcoder-x-processor', + timestamp: (new Date()).toISOString(), + 'mime-type': 'application/json', + payload: { + value: message + } + }); + return kafka.producer.send({ + topic: config.TOPIC, + message: { + value: data + } + }); +} + +/** + * Send message to notification topic in kafka. + * @param {String} notification the message to send + * @returns {Object} Result from kafka + */ +function sendNotification(notification) { + const data = JSON.stringify({ + topic: config.TOPIC_NOTIFICATION, + originator: 'topcoder-x-processor', + timestamp: (new Date()).toISOString(), + 'mime-type': 'application/json', + payload: { + notifications: [notification] + } + }); + return kafka.producer.send({ + topic: config.TOPIC_NOTIFICATION, + message: { + value: data + } + }); +} + +module.exports = { + send, + sendNotification +}; diff --git a/utils/kafka.js b/utils/kafka.js index b731664..3056fba 100644 --- a/utils/kafka.js +++ b/utils/kafka.js @@ -12,11 +12,7 @@ 'use strict'; const config = require('config'); -const _ = require('lodash'); const kafka = require('no-kafka'); -const healthcheck = require('topcoder-healthcheck-dropin'); -const IssueService = require('../services/IssueService'); -const CopilotPaymentService = require('../services/CopilotPaymentService'); const logger = require('./logger'); class Kafka { @@ -29,82 +25,7 @@ class Kafka { }).catch((err) => { logger.error(`kafka producer is not connected. ${err.stack}`); }); - this.check = this.check.bind(this); - } - - messageHandler(messageSet) { - logger.debug(` topics ======= ${JSON.stringify(messageSet)}`); - messageSet.forEach((item) => { - // The event should be a JSON object - let event; - try { - const message = JSON.parse(item.message.value.toString('utf8')); - event = JSON.parse(message.payload.value); - message.payload.value = event; - logger.debug(`received message from kafka: ${JSON.stringify(_.omit(message, 'payload.value.data.issue.body'))}`); - } catch (err) { - logger.error(`"message" is not a valid JSON-formatted string: ${err.message}`); - return; - } - - if (event && _.includes(['issue.created', 'issue.updated', 'issue.closed', 'issue.recreated', - 'comment.created', 'comment.updated', 'issue.assigned', 'issue.labelUpdated', 'issue.unassigned'] - , event.event)) { - IssueService - .process(event) - .catch(logger.error); - } - if (event && _.includes(['copilotPayment.add', 'copilotPayment.update', 'copilotPayment.delete', 'copilotPayment.checkUpdates'] - , event.event)) { - CopilotPaymentService - .process(event) - .catch(logger.error); - } - }); - } - - // check if there is kafka connection alive - check() { - // if (!this.consumer.client.initialBrokers && !this.consumer.client.initialBrokers.length) { - // logger.info(`Brokers Exist Check Failed ${this.consumer.client.initialBrokers} ${this.consumer.client.initialBrokers.length}`) - // return false; - // } - // let connected = true; - // this.consumer.client.initialBrokers.forEach((conn) => { - // logger.info(`Brokers Check Failed ${conn.connected}`) - // connected = conn.connected && connected; - // }); - - // return connected; - return true; - } - - run() { - this.consumer.init().then(() => { - logger.info('kafka consumer is ready'); - healthcheck.init([this.check]); - this.consumer.subscribe(config.TOPIC, {}, this.messageHandler); - }).catch((err) => { - logger.error(`kafka consumer is not connected. ${err.stack}`); - }); - } - - send(message) { - const data = JSON.stringify({ - topic: config.TOPIC, - originator: 'topcoder-x-processor', - timestamp: (new Date()).toISOString(), - 'mime-type': 'application/json', - payload: { - value: message - } - }); - return this.producer.send({ - topic: config.TOPIC, - message: { - value: data - } - }); + // this.check = this.check.bind(this); } } diff --git a/utils/notification.js b/utils/notification.js new file mode 100644 index 0000000..ad9a8fa --- /dev/null +++ b/utils/notification.js @@ -0,0 +1,49 @@ +/** + * This module contains the helper methods + * for sending notification action to kafka service. + * + * @author TCSCODER + * @version 1.0 + */ +'use strict'; + +const config = require('config'); + +const kafkaSender = require('./kafka-sender'); +const topcoderApiHelper = require('./topcoder-api-helper'); +const logger = require('./logger'); + +const notification = {}; + +const content = `Hi {handle}, +You made an update to ticket {link}, but Topcoder-X couldn't process it properly because your {provider} token has expired. To fix this, please login to x.topcoder.com, click your handle in the upper right and then "Settings" to refresh your token. You will need to redo the action that failed in {provider}.`; // eslint-disable-line max-len + +notification.sendTokenExpiredAlert = async function sendTokenExpiredAlert(copilotHandle, repoPath, provider) { + const copilotId = await topcoderApiHelper.getTopcoderMemberId(copilotHandle); + const notificationConfigs = config.MAIL_NOTICIATION; + logger.debug(`Sending mail notification to copilot ${copilotHandle} Repo: ${repoPath} Provider: ${provider}`); + await kafkaSender.sendNotification({ + serviceId: 'email', + type: notificationConfigs.type, + details: { + from: 'noreply@topcoder.com', + recipients: [ + { + userId: copilotId + } + ], + cc: [], + data: { + subject: notificationConfigs.subject, + body: content + .replace(/{handle}/g, copilotHandle) + .replace(/{link}/g, repoPath) + .replace(/{provider}/g, provider) + }, + sendgridTemplateId: notificationConfigs.sendgridTemplateId, + version: 'v3' + } + }); +}; + +module.exports = notification;