diff --git a/connect/connectNotificationServer.js b/connect/connectNotificationServer.js index 2bb28c2..41184bb 100644 --- a/connect/connectNotificationServer.js +++ b/connect/connectNotificationServer.js @@ -86,27 +86,25 @@ const getNotificationsForMentionedUser = (logger, eventConfig, content) => { // only one per userHandle notifications = _.uniqBy(notifications, 'userHandle'); - return new Promise((resolve, reject) => { // eslint-disable-line no-unused-vars - const handles = _.map(notifications, 'userHandle'); - if (handles.length > 0) { - service.getUsersByHandle(handles).then((users) => { - _.forEach(notifications, (notification) => { - const mentionedUser = _.find(users, { handle: notification.userHandle }); - notification.userId = mentionedUser ? mentionedUser.userId.toString() : notification.userHandle; - }); - resolve(notifications); - }).catch((error) => { - if (logger) { - logger.error(error); - logger.info('Unable to send notification to mentioned user') - } - //resolves with empty notification which essentially means we are unable to send notification to mentioned user - resolve([]); + const handles = _.map(notifications, 'userHandle'); + if (handles.length > 0) { + return service.getUsersByHandle(handles).then((users) => { + _.forEach(notifications, (notification) => { + const mentionedUser = _.find(users, { handle: notification.userHandle }); + notification.userId = mentionedUser ? mentionedUser.userId.toString() : notification.userHandle; }); - } else { - resolve([]); - } - }); + return Promise.resolve(notifications); + }).catch((error) => { + if (logger) { + logger.error(error); + logger.info('Unable to send notification to mentioned user') + } + //resolves with empty notification which essentially means we are unable to send notification to mentioned user + return Promise.resolve([]); + }); + } else { + return Promise.resolve([]); + } }; /** @@ -151,7 +149,7 @@ const getProjectMembersNotifications = (eventConfig, project) => { return Promise.resolve([]); } - return new Promise((resolve) => { + return Promise.promisify((callback) => { let notifications = []; const projectMembers = _.get(project, 'members', []); @@ -186,8 +184,8 @@ const getProjectMembersNotifications = (eventConfig, project) => { // only one per userId notifications = _.uniqBy(notifications, 'userId'); - resolve(notifications); - }); + callback(null, notifications); + })(); }; /** @@ -415,10 +413,10 @@ const handler = (topic, message, logger, callback) => { } // get project details - service.getProject(projectId).then(project => { + return service.getProject(projectId).then(project => { let allNotifications = []; - Promise.all([ + return Promise.all([ // the order in this list defines the priority of notification for the SAME user // upper in this list - higher priority // NOTE: always add all handles here, they have to check by themselves: @@ -506,5 +504,12 @@ if (config.ENABLE_EMAILS) { // notificationServer.logger.error('Notification server errored out'); // }); + +process.on('unhandledRejection', (reason, promise) => { + console.log('Unhandled Rejection at:', promise, 'reason:', reason); + // aborts the process to let the HA of the container to restart the task + process.abort(); +}); + // if no need to init database, then directly start the server: notificationServer.startKafkaConsumers(); diff --git a/src/app.js b/src/app.js index ccdbbb3..5c7944d 100644 --- a/src/app.js +++ b/src/app.js @@ -75,20 +75,37 @@ function startKafkaConsumer(handlers, notificationServiceHandlers) { }); }); + var latestSubscriptions = null; + const check = function () { - logger.debug('Checking Health...') ; + logger.debug("Checking health"); if (!consumer.client.initialBrokers && !consumer.client.initialBrokers.length) { logger.debug('Found unhealthy Kafka Brokers...'); return false; } let connected = true; + let currentSubscriptions = consumer.subscriptions; + for(var sIdx in currentSubscriptions) { + // current subscription + let sub = currentSubscriptions[sIdx]; + // previous subscription + let prevSub = latestSubscriptions ? latestSubscriptions[sIdx] : null; + // levarage the `paused` field (https://github.com/oleksiyk/kafka/blob/master/lib/base_consumer.js#L66) to + // determine if there was a possibility of an unhandled exception. If we find paused status for the same + // topic in two consecutive health checks, we assume it was stuck because of unhandled error + if (prevSub && prevSub.paused && sub.paused) { + logger.error(`Found subscription for ${sIdx} in paused state for consecutive health checks`); + return false; + } + } + // stores the latest subscription status in global variable + latestSubscriptions = consumer.subscriptions; consumer.client.initialBrokers.forEach(conn => { - logger.debug(`url ${conn.server()} - connected=${conn.connected}`); - connected = conn.connected & connected; + logger.debug(`url ${conn.server()} - connected=${conn.connected}`) + connected = conn.connected & connected }); - logger.debug('Found all Kafka Brokers healthy...'); - return connected; - }; + return connected + } consumer .init()