Skip to content

Feature/test health check #128

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
May 21, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 30 additions & 25 deletions connect/connectNotificationServer.js
Original file line number Diff line number Diff line change
Expand Up @@ -86,27 +86,25 @@ const getNotificationsForMentionedUser = (logger, eventConfig, content) => {
// only one per userHandle
notifications = _.uniqBy(notifications, 'userHandle');

return new Promise((resolve, reject) => { // eslint-disable-line no-unused-vars
const handles = _.map(notifications, 'userHandle');
if (handles.length > 0) {
service.getUsersByHandle(handles).then((users) => {
_.forEach(notifications, (notification) => {
const mentionedUser = _.find(users, { handle: notification.userHandle });
notification.userId = mentionedUser ? mentionedUser.userId.toString() : notification.userHandle;
});
resolve(notifications);
}).catch((error) => {
if (logger) {
logger.error(error);
logger.info('Unable to send notification to mentioned user')
}
//resolves with empty notification which essentially means we are unable to send notification to mentioned user
resolve([]);
const handles = _.map(notifications, 'userHandle');
if (handles.length > 0) {
return service.getUsersByHandle(handles).then((users) => {
_.forEach(notifications, (notification) => {
const mentionedUser = _.find(users, { handle: notification.userHandle });
notification.userId = mentionedUser ? mentionedUser.userId.toString() : notification.userHandle;
});
} else {
resolve([]);
}
});
return Promise.resolve(notifications);
}).catch((error) => {
if (logger) {
logger.error(error);
logger.info('Unable to send notification to mentioned user')
}
//resolves with empty notification which essentially means we are unable to send notification to mentioned user
return Promise.resolve([]);
});
} else {
return Promise.resolve([]);
}
};

/**
Expand Down Expand Up @@ -151,7 +149,7 @@ const getProjectMembersNotifications = (eventConfig, project) => {
return Promise.resolve([]);
}

return new Promise((resolve) => {
return Promise.promisify((callback) => {
let notifications = [];
const projectMembers = _.get(project, 'members', []);

Expand Down Expand Up @@ -186,8 +184,8 @@ const getProjectMembersNotifications = (eventConfig, project) => {
// only one per userId
notifications = _.uniqBy(notifications, 'userId');

resolve(notifications);
});
callback(null, notifications);
})();
};

/**
Expand Down Expand Up @@ -415,10 +413,10 @@ const handler = (topic, message, logger, callback) => {
}

// get project details
service.getProject(projectId).then(project => {
return service.getProject(projectId).then(project => {
let allNotifications = [];

Promise.all([
return Promise.all([
// the order in this list defines the priority of notification for the SAME user
// upper in this list - higher priority
// NOTE: always add all handles here, they have to check by themselves:
Expand Down Expand Up @@ -506,5 +504,12 @@ if (config.ENABLE_EMAILS) {
// notificationServer.logger.error('Notification server errored out');
// });


process.on('unhandledRejection', (reason, promise) => {
console.log('Unhandled Rejection at:', promise, 'reason:', reason);
// aborts the process to let the HA of the container to restart the task
process.abort();
});

// if no need to init database, then directly start the server:
notificationServer.startKafkaConsumers();
29 changes: 23 additions & 6 deletions src/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -75,20 +75,37 @@ function startKafkaConsumer(handlers, notificationServiceHandlers) {
});
});

var latestSubscriptions = null;

const check = function () {
logger.debug('Checking Health...') ;
logger.debug("Checking health");
if (!consumer.client.initialBrokers && !consumer.client.initialBrokers.length) {
logger.debug('Found unhealthy Kafka Brokers...');
return false;
}
let connected = true;
let currentSubscriptions = consumer.subscriptions;
for(var sIdx in currentSubscriptions) {
// current subscription
let sub = currentSubscriptions[sIdx];
// previous subscription
let prevSub = latestSubscriptions ? latestSubscriptions[sIdx] : null;
// levarage the `paused` field (https://github.com/oleksiyk/kafka/blob/master/lib/base_consumer.js#L66) to
// determine if there was a possibility of an unhandled exception. If we find paused status for the same
// topic in two consecutive health checks, we assume it was stuck because of unhandled error
if (prevSub && prevSub.paused && sub.paused) {
logger.error(`Found subscription for ${sIdx} in paused state for consecutive health checks`);
return false;
}
}
// stores the latest subscription status in global variable
latestSubscriptions = consumer.subscriptions;
consumer.client.initialBrokers.forEach(conn => {
logger.debug(`url ${conn.server()} - connected=${conn.connected}`);
connected = conn.connected & connected;
logger.debug(`url ${conn.server()} - connected=${conn.connected}`)
connected = conn.connected & connected
});
logger.debug('Found all Kafka Brokers healthy...');
return connected;
};
return connected
}

consumer
.init()
Expand Down