Skip to content

Commit bc9f18c

Browse files
author
vikasrohit
authored
Merge pull request #128 from topcoder-platform/feature/test_health_check
Feature/test health check
2 parents 3ccf1c6 + 7213c7d commit bc9f18c

File tree

2 files changed

+53
-31
lines changed

2 files changed

+53
-31
lines changed

connect/connectNotificationServer.js

Lines changed: 30 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -86,27 +86,25 @@ const getNotificationsForMentionedUser = (logger, eventConfig, content) => {
8686
// only one per userHandle
8787
notifications = _.uniqBy(notifications, 'userHandle');
8888

89-
return new Promise((resolve, reject) => { // eslint-disable-line no-unused-vars
90-
const handles = _.map(notifications, 'userHandle');
91-
if (handles.length > 0) {
92-
service.getUsersByHandle(handles).then((users) => {
93-
_.forEach(notifications, (notification) => {
94-
const mentionedUser = _.find(users, { handle: notification.userHandle });
95-
notification.userId = mentionedUser ? mentionedUser.userId.toString() : notification.userHandle;
96-
});
97-
resolve(notifications);
98-
}).catch((error) => {
99-
if (logger) {
100-
logger.error(error);
101-
logger.info('Unable to send notification to mentioned user')
102-
}
103-
//resolves with empty notification which essentially means we are unable to send notification to mentioned user
104-
resolve([]);
89+
const handles = _.map(notifications, 'userHandle');
90+
if (handles.length > 0) {
91+
return service.getUsersByHandle(handles).then((users) => {
92+
_.forEach(notifications, (notification) => {
93+
const mentionedUser = _.find(users, { handle: notification.userHandle });
94+
notification.userId = mentionedUser ? mentionedUser.userId.toString() : notification.userHandle;
10595
});
106-
} else {
107-
resolve([]);
108-
}
109-
});
96+
return Promise.resolve(notifications);
97+
}).catch((error) => {
98+
if (logger) {
99+
logger.error(error);
100+
logger.info('Unable to send notification to mentioned user')
101+
}
102+
//resolves with empty notification which essentially means we are unable to send notification to mentioned user
103+
return Promise.resolve([]);
104+
});
105+
} else {
106+
return Promise.resolve([]);
107+
}
110108
};
111109

112110
/**
@@ -151,7 +149,7 @@ const getProjectMembersNotifications = (eventConfig, project) => {
151149
return Promise.resolve([]);
152150
}
153151

154-
return new Promise((resolve) => {
152+
return Promise.promisify((callback) => {
155153
let notifications = [];
156154
const projectMembers = _.get(project, 'members', []);
157155

@@ -186,8 +184,8 @@ const getProjectMembersNotifications = (eventConfig, project) => {
186184
// only one per userId
187185
notifications = _.uniqBy(notifications, 'userId');
188186

189-
resolve(notifications);
190-
});
187+
callback(null, notifications);
188+
})();
191189
};
192190

193191
/**
@@ -415,10 +413,10 @@ const handler = (topic, message, logger, callback) => {
415413
}
416414

417415
// get project details
418-
service.getProject(projectId).then(project => {
416+
return service.getProject(projectId).then(project => {
419417
let allNotifications = [];
420418

421-
Promise.all([
419+
return Promise.all([
422420
// the order in this list defines the priority of notification for the SAME user
423421
// upper in this list - higher priority
424422
// NOTE: always add all handles here, they have to check by themselves:
@@ -506,5 +504,12 @@ if (config.ENABLE_EMAILS) {
506504
// notificationServer.logger.error('Notification server errored out');
507505
// });
508506

507+
508+
process.on('unhandledRejection', (reason, promise) => {
509+
console.log('Unhandled Rejection at:', promise, 'reason:', reason);
510+
// aborts the process to let the HA of the container to restart the task
511+
process.abort();
512+
});
513+
509514
// if no need to init database, then directly start the server:
510515
notificationServer.startKafkaConsumers();

src/app.js

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -75,20 +75,37 @@ function startKafkaConsumer(handlers, notificationServiceHandlers) {
7575
});
7676
});
7777

78+
var latestSubscriptions = null;
79+
7880
const check = function () {
79-
logger.debug('Checking Health...') ;
81+
logger.debug("Checking health");
8082
if (!consumer.client.initialBrokers && !consumer.client.initialBrokers.length) {
8183
logger.debug('Found unhealthy Kafka Brokers...');
8284
return false;
8385
}
8486
let connected = true;
87+
let currentSubscriptions = consumer.subscriptions;
88+
for(var sIdx in currentSubscriptions) {
89+
// current subscription
90+
let sub = currentSubscriptions[sIdx];
91+
// previous subscription
92+
let prevSub = latestSubscriptions ? latestSubscriptions[sIdx] : null;
93+
// levarage the `paused` field (https://github.com/oleksiyk/kafka/blob/master/lib/base_consumer.js#L66) to
94+
// determine if there was a possibility of an unhandled exception. If we find paused status for the same
95+
// topic in two consecutive health checks, we assume it was stuck because of unhandled error
96+
if (prevSub && prevSub.paused && sub.paused) {
97+
logger.error(`Found subscription for ${sIdx} in paused state for consecutive health checks`);
98+
return false;
99+
}
100+
}
101+
// stores the latest subscription status in global variable
102+
latestSubscriptions = consumer.subscriptions;
85103
consumer.client.initialBrokers.forEach(conn => {
86-
logger.debug(`url ${conn.server()} - connected=${conn.connected}`);
87-
connected = conn.connected & connected;
104+
logger.debug(`url ${conn.server()} - connected=${conn.connected}`)
105+
connected = conn.connected & connected
88106
});
89-
logger.debug('Found all Kafka Brokers healthy...');
90-
return connected;
91-
};
107+
return connected
108+
}
92109

93110
consumer
94111
.init()

0 commit comments

Comments
 (0)