Skip to content

Commit 96b4bec

Browse files
author
nkumar
committed
slack integration
Committer: nkumar
1 parent 2a96987 commit 96b4bec

File tree

4 files changed

+130
-25
lines changed

4 files changed

+130
-25
lines changed

config/default.js

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,11 @@ module.exports = {
3838
errorTopic: process.env.ERROR_TOPIC || 'db.scorecardtable.error',
3939
recipients: ['[email protected]'] // Kafka partitions to use
4040
},
41+
SLACK: {
42+
URL: process.env.SLACKURL || 'us-east-1',
43+
SLACKCHANNEL: process.env.SLACKCHANNEL || 'ifxpg-migrator',
44+
SLACKNOTIFY: process.env.SLACKNOTIFY || 'false'
45+
},
4146

4247
AUTH0_URL: process.env.AUTH0_URL ,
4348
AUTH0_AUDIENCE: process.env.AUTH0_AUDIENCE ,

src/consumer.js

Lines changed: 46 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ const pushToKafka = require('./services/pushToKafka')
99
const healthcheck = require('topcoder-healthcheck-dropin');
1010
const auditTrail = require('./services/auditTrail');
1111
const kafkaOptions = config.get('KAFKA')
12+
const postMessage = require('./services/posttoslack')
1213
const isSslEnabled = kafkaOptions.SSL && kafkaOptions.SSL.cert && kafkaOptions.SSL.key
1314
const consumer = new Kafka.SimpleConsumer({
1415
connectionString: kafkaOptions.brokers_url,
@@ -20,7 +21,6 @@ const consumer = new Kafka.SimpleConsumer({
2021
})
2122
})
2223

23-
2424
const check = function () {
2525
if (!consumer.client.initialBrokers && !consumer.client.initialBrokers.length) {
2626
return false;
@@ -41,7 +41,7 @@ const terminate = () => process.exit()
4141
* @param {String} topic The name of the message topic
4242
* @param {Number} partition The kafka partition to which messages are written
4343
*/
44-
//let message;
44+
var retryvar="";
4545
//let cs_payloadseqid;
4646
async function dataHandler(messageSet, topic, partition) {
4747
let cs_payloadseqid
@@ -54,10 +54,13 @@ let cs_payloadseqid
5454
logger.debug(`consumer : ${message.payload.payloadseqid} ${message.payload.table} ${message.payload.Uniquecolumn} ${message.payload.operation} ${message.timestamp} `);
5555
await updateInformix(message)
5656
await consumer.commitOffset({ topic, partition, offset: m.offset }) // Commit offset only on success
57+
if (message.payload['retryCount']) retryvar = message.payload.retryCount;
5758
auditTrail([cs_payloadseqid,cs_processId,message.payload.table,message.payload.Uniquecolumn,
58-
message.payload.operation,"Informix-updated","","","",message.payload.data, message.timestamp,message.topic],'consumer')
59+
message.payload.operation,"Informix-updated",retryvar,"","",message.payload.data, message.timestamp,message.topic],'consumer')
5960
} catch (err) {
60-
logger.error(`Could not process kafka message or informix DB error: "${err.message}"`)
61+
const errmsg2 = `Could not process kafka message or informix DB error: "${err.message}"`
62+
logger.error(errmsg2)
63+
//await callposttoslack(errmsg2)
6164
//logger.logFullError(err)
6265
logger.debug(`error-sync: consumer "${err.message}"`)
6366
if (!cs_payloadseqid){
@@ -67,40 +70,70 @@ let cs_payloadseqid
6770
await auditTrail([cs_payloadseqid,3333,'message.payload.table','message.payload.Uniquecolumn',
6871
'message.payload.operation',"Error-Consumer","",err.message,"",'message.payload.data',new Date(),'message.topic'],'consumer')
6972
try {
70-
var retryvar
71-
if (message.payload['retryCount']) retryvar = message.payload.retryCount;
73+
//var retryvar
74+
if (message.payload['retryCount']) retryvar = message.payload.retryCount;
7275
await consumer.commitOffset({ topic, partition, offset: m.offset }) // Commit success as will re-publish
7376
await auditTrail([cs_payloadseqid,3333,'message.payload.table','message.payload.Uniquecolumn',
7477
'message.payload.operation',"Informix-Updated1",retryvar,"","",'message.payload.data',new Date(),'message.topic'],'consumer')
75-
logger.debug(`Trying to push same message after adding retryCounter`)
78+
//await callposttoslack(`Retry for Kafka push : retrycount : "${retryvar}"`)
79+
logger.debug(`Trying to push same message after adding retryCounter`)
7680
if (!message.payload.retryCount) {
7781
message.payload.retryCount = 0
7882
logger.debug('setting retry counter to 0 and max try count is : ', config.KAFKA.maxRetry);
7983
}
8084
if (message.payload.retryCount >= config.KAFKA.maxRetry) {
8185
logger.debug('Recached at max retry counter, sending it to error queue: ', config.KAFKA.errorTopic);
82-
logger.debug(`error-sync: consumer max-retry-limit reached`)
83-
let notifiyMessage = Object.assign({}, message, { topic: config.KAFKA.errorTopic })
86+
logger.debug(`error-sync: consumer max-retry-limit reached`)
87+
// push to slack - alertIt("slack message"
88+
await callposttoslack(`error-sync: postgres-ifx-processor : consumer max-retry-limit reached: "${message.payload.table}": payloadseqid : "${cs_payloadseqid}"`)
89+
let notifiyMessage = Object.assign({}, message, { topic: config.KAFKA.errorTopic })
8490
notifiyMessage.payload['recipients'] = config.KAFKA.recipients
8591
logger.debug('pushing following message on kafka error alert queue:')
8692
//logger.debug(notifiyMessage)
87-
await pushToKafka(notifiyMessage)
93+
await pushToKafka(notifiyMessage)
8894
return
8995
}
9096
message.payload['retryCount'] = message.payload.retryCount + 1;
9197
await pushToKafka(message)
92-
logger.debug(` After kafka push Retry Count "${message.payload.retryCount}"`)
98+
var errmsg9 = `Retry for Kafka push : retrycount : "${message.payload.retryCount}" : "${cs_payloadseqid}"`
99+
logger.debug(errmsg9)
100+
//await callposttoslack(errmsg9)
93101
} catch (err) {
94102

95103
await auditTrail([cs_payloadseqid,cs_processId,message.payload.table,message.payload.Uniquecolumn,
96104
message.payload.operation,"Error-republishing",message.payload['retryCount'],err.message,"",message.payload.data, message.timestamp,message.topic],'consumer')
97-
logger.error("Error occured in re-publishing kafka message", err)
98-
logger.debug(`error-sync: consumer re-publishing "${err.message}"`)
105+
const errmsg1 = `postgres-ifx-processor: consumer : Error-republishing: "${err.message}"`
106+
logger.error(errmsg1)
107+
logger.debug(`error-sync: consumer re-publishing "${err.message}"`)
108+
// push to slack - alertIt("slack message"
109+
await callposttoslack(errmsg1)
99110
}
100111
}
101112
}
102113
}
103114

115+
async function callposttoslack(slackmessage) {
116+
if(config.SLACK.SLACKNOTIFY === 'true') {
117+
return new Promise(function (resolve, reject) {
118+
postMessage(slackmessage, (response) => {
119+
console.log(`respnse : ${response}`)
120+
if (response.statusCode < 400) {
121+
logger.debug('Message posted successfully');
122+
//callback(null);
123+
} else if (response.statusCode < 500) {
124+
const errmsg1 =`Slack Error: posting message to Slack API: ${response.statusCode} - ${response.statusMessage}`
125+
logger.debug(`error-sync: ${errmsg1}`)
126+
}
127+
else {
128+
logger.debug(`Server error when processing message: ${response.statusCode} - ${response.statusMessage}`);
129+
//callback(`Server error when processing message: ${response.statusCode} - ${response.statusMessage}`);
130+
}
131+
resolve("done")
132+
});
133+
}) //end
134+
}
135+
136+
}
104137

105138
/**
106139
* Initialize kafka consumer

src/producer.js

Lines changed: 37 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,30 +7,35 @@ const logger = require('./common/logger')
77
const pushToKafka = require('./services/pushToKafka')
88
const pushToDynamoDb = require('./services/pushToDynamoDb')
99
const pgOptions = config.get('POSTGRES')
10+
const postMessage = require('./services/posttoslack')
1011
const pgConnectionString = `postgresql://${pgOptions.user}:${pgOptions.password}@${pgOptions.host}:${pgOptions.port}/${pgOptions.database}`
1112
const pgClient = new pg.Client(pgConnectionString)
1213
const auditTrail = require('./services/auditTrail');
1314
const express = require('express')
1415
const app = express()
1516
const port = 3000
1617
const isFailover = process.argv[2] != undefined ? (process.argv[2] === 'failover' ? true : false) : false
18+
1719
async function setupPgClient() {
18-
try {
20+
var payloadcopy
21+
try {
1922
await pgClient.connect()
2023
for (const triggerFunction of pgOptions.triggerFunctions) {
2124
await pgClient.query(`LISTEN ${triggerFunction}`)
2225
}
2326
pgClient.on('notification', async (message) => {
2427
try {
28+
payloadcopy = ""
2529
const payload = JSON.parse(message.payload)
30+
payloadcopy = message
2631
const validTopicAndOriginator = (pgOptions.triggerTopics.includes(payload.topic)) && (pgOptions.triggerOriginators.includes(payload.originator)) // Check if valid topic and originator
2732
if (validTopicAndOriginator) {
2833
if (!isFailover) {
29-
logger.info('trying to push on kafka topic')
34+
//logger.info('trying to push on kafka topic')
3035
await pushToKafka(payload)
31-
logger.info('pushed to kafka and added for audit trail')
36+
logger.info('Push to kafka and added for audit trail')
3237
} else {
33-
logger.info('taking backup on dynamodb for reconciliation')
38+
logger.info('Push to dynamodb for reconciliation')
3439
await pushToDynamoDb(payload)
3540
}
3641
audit(message)
@@ -41,16 +46,20 @@ async function setupPgClient() {
4146
} catch (error) {
4247
logger.error('Could not parse message payload')
4348
logger.debug(`error-sync: producer parse message : "${error.message}"`)
44-
logger.logFullError(error)
49+
const errmsg1 = `postgres-ifx-processor: producer or dd : Error Parse or payload : "${error.message}" \n payload : "${payloadcopy.payload}"`
50+
logger.logFullError(error)
4551
audit(error)
4652
// push to slack - alertIt("slack message"
53+
await callposttoslack(errmsg1)
4754
}
4855
})
4956
logger.info('pg-ifx-sync producer: Listening to pg-trigger channel.')
5057
} catch (err) {
51-
logger.error('Error in setting up postgres client: ', err.message)
58+
const errmsg = `postgres-ifx-processor: producer or dd : Error in setting up postgres client: "${err.message}"`
59+
logger.error(errmsg)
5260
logger.logFullError(err)
5361
// push to slack - alertIt("slack message")
62+
await callposttoslack(errmsg)
5463
terminate()
5564
}
5665
}
@@ -62,6 +71,28 @@ async function run() {
6271
await setupPgClient()
6372
}
6473

74+
async function callposttoslack(slackmessage) {
75+
if(config.SLACK.SLACKNOTIFY === 'true') {
76+
return new Promise(function (resolve, reject) {
77+
postMessage(slackmessage, (response) => {
78+
console.log(`respnse : ${response}`)
79+
if (response.statusCode < 400) {
80+
logger.debug('Message posted successfully');
81+
//callback(null);
82+
} else if (response.statusCode < 500) {
83+
const errmsg1 =`Slack Error: posting message to Slack API: ${response.statusCode} - ${response.statusMessage}`
84+
logger.debug(`error-sync: ${errmsg1}`)
85+
}
86+
else {
87+
logger.debug(`Server error when processing message: ${response.statusCode} - ${response.statusMessage}`);
88+
//callback(`Server error when processing message: ${response.statusCode} - ${response.statusMessage}`);
89+
}
90+
resolve("done")
91+
});
92+
}) //end
93+
}
94+
95+
}
6596
// execute
6697
run()
6798

@@ -91,12 +122,6 @@ async function audit(message) {
91122
}
92123
}
93124

94-
function alertIt(message) {
95-
/**
96-
// call slack
97-
*/
98-
}
99-
100125
app.get('/health', (req, res) => {
101126
res.send('health ok')
102127
})

src/services/posttoslack.js

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
const config = require('config');
2+
const zlib = require('zlib');
3+
const url = require('url');
4+
const https = require('https');
5+
hookUrl = config.SLACK.URL
6+
slackChannel = config.SLACK.SLACKCHANNEL
7+
async function postMessage(message, callback) {
8+
var slackMessage = {
9+
channel: `${slackChannel}`,
10+
text: `${message}`,
11+
}
12+
const body = JSON.stringify(slackMessage);
13+
const options = url.parse(hookUrl);
14+
options.method = 'POST';
15+
options.headers = {
16+
'Content-Type': 'application/json',
17+
'Content-Length': Buffer.byteLength(body),
18+
};
19+
return new Promise(function (resolve, reject) {
20+
const postReq = https.request(options, (res) => {
21+
const chunks = [];
22+
res.setEncoding('utf8');
23+
res.on('data', (chunk) => chunks.push(chunk));
24+
res.on('end', () => {
25+
if (callback) {
26+
callback({
27+
body: chunks.join(''),
28+
statusCode: res.statusCode,
29+
statusMessage: res.statusMessage,
30+
});
31+
}
32+
resolve(res)
33+
});
34+
// return res;
35+
});
36+
37+
postReq.write(body);
38+
postReq.end();
39+
})
40+
}
41+
42+
module.exports = postMessage

0 commit comments

Comments
 (0)