
Commit a96f4b4

Author: Sachin Maheshwari
Committed: Jul 22, 2020

direct repush to kafka from consumer

1 parent: 1494463

File tree

2 files changed (+19, -14 lines)

src/consumer.js

Lines changed: 17 additions & 13 deletions

@@ -10,6 +10,8 @@ const healthcheck = require('topcoder-healthcheck-dropin');
 const auditTrail = require('./services/auditTrail');
 const kafkaOptions = config.get('KAFKA')
 const postMessage = require('./services/posttoslack')
+const kafkaService = require('./services/pushToDirectKafka')
+
 //const isSslEnabled = kafkaOptions.SSL && kafkaOptions.SSL.cert && kafkaOptions.SSL.key

 const options = {
@@ -50,25 +52,25 @@ async function dataHandler(messageSet, topic, partition) {
     let message
     let ifxstatus = 0
     try {
-      // let ifxstatus = 0
+      // let ifxstatus = 0
       let cs_payloadseqid;
       message = JSON.parse(m.message.value)
       //logger.debug(`Consumer Received from kafka :${JSON.stringify(message)}`)
       if (message.payload.payloadseqid) cs_payloadseqid = message.payload.payloadseqid;
       logger.debug(`consumer : ${message.payload.payloadseqid} ${message.payload.table} ${message.payload.Uniquecolumn} ${message.payload.operation} ${message.timestamp} `);
       //await updateInformix(message)
       ifxstatus = await updateInformix(message)
-      // if (ifxstatus === 0 && `${message.payload.operation}` === 'INSERT') {
-      // logger.debug(`operation : ${message.payload.operation}`)
-      // logger.debug(`Consumer :informixt status for ${message.payload.table} ${message.payload.payloadseqid} : ${ifxstatus} - Retrying`)
-      // auditTrail([cs_payloadseqid, cs_processId, message.payload.table, message.payload.Uniquecolumn,
-      // message.payload.operation, "push-to-kafka", retryvar, "", "", JSON.stringify(message), new Date(), message.topic], 'consumer')
+      // if (ifxstatus === 0 && `${message.payload.operation}` === 'INSERT') {
+      // logger.debug(`operation : ${message.payload.operation}`)
+      // logger.debug(`Consumer :informixt status for ${message.payload.table} ${message.payload.payloadseqid} : ${ifxstatus} - Retrying`)
+      // auditTrail([cs_payloadseqid, cs_processId, message.payload.table, message.payload.Uniquecolumn,
+      // message.payload.operation, "push-to-kafka", retryvar, "", "", JSON.stringify(message), new Date(), message.topic], 'consumer')
       // await retrypushtokakfa(message, topic, m, partition)
       //} else {
-      logger.debug(`Consumer :informix status for ${message.payload.table} ${message.payload.payloadseqid} : ${ifxstatus}`)
-      if (message.payload['retryCount']) retryvar = message.payload.retryCount;
-      await auditTrail([cs_payloadseqid, cs_processId, message.payload.table, message.payload.Uniquecolumn,
-        message.payload.operation, "Informix-updated", retryvar, "", "", JSON.stringify(message), new Date(), message.topic], 'consumer')
+      logger.debug(`Consumer :informix status for ${message.payload.table} ${message.payload.payloadseqid} : ${ifxstatus}`)
+      if (message.payload['retryCount']) retryvar = message.payload.retryCount;
+      await auditTrail([cs_payloadseqid, cs_processId, message.payload.table, message.payload.Uniquecolumn,
+        message.payload.operation, "Informix-updated", retryvar, "", "", JSON.stringify(message), new Date(), message.topic], 'consumer')
       //}
     } catch (err) {
       logger.debug(`Consumer:ifx return status error for ${message.payload.table} ${message.payload.payloadseqid} : ${ifxstatus}`)
@@ -99,16 +101,18 @@ async function retrypushtokakfa(message, topic, m, partition) {
   if (message.payload.retryCount >= config.KAFKA.maxRetry) {
     logger.debug('Reached at max retry counter, sending it to error queue: ', config.KAFKA.errorTopic);
     logger.debug(`error-sync: consumer max-retry-limit reached`)
-    await callposttoslack(`error-sync: postgres-ifx-processor : consumer max-retry-limit reached: "${message.payload.table}": payloadseqid : "${cs_payloadseqid}"`)
+    //await callposttoslack(`error-sync: postgres-ifx-processor : consumer max-retry-limit reached: "${message.payload.table}": payloadseqid : "${cs_payloadseqid}"`)
     let notifiyMessage = Object.assign({}, message, { topic: config.KAFKA.errorTopic })
     notifiyMessage.payload['recipients'] = config.KAFKA.recipients
     logger.debug('pushing following message on kafka error alert queue:')
     //retry push to error topic kafka again
-    await pushToKafka(notifiyMessage)
+    //await pushToKafka(notifiyMessage)
+    await kafkaService.pushToKafka(notifiyMessage)
     return
   }
   message.payload['retryCount'] = message.payload.retryCount + 1;
-  await pushToKafka(message)
+  //await pushToKafka(message)
+  kafkaService.pushToKafka(message)
   var errmsg9 = `consumer : Retry for Kafka push : retrycount : "${message.payload.retryCount}" : "${cs_payloadseqid}"`
   logger.debug(errmsg9)
 }

src/services/updateInformix.js

Lines changed: 2 additions & 1 deletion

@@ -4,7 +4,7 @@ const logger = require('../common/logger')
 
 String.prototype.escapeSpecialChars = function () {
   return this.replace(/\n/g, "\\n")
-    .replace(/\r/g, "\\r");
+    .replace(/\r/g, "\\r");
 };
 
 async function updateInformix(payload) {
@@ -45,6 +45,7 @@ async function updateInformix(payload) {
   t0.forEach((name, index) => t0[index] = `${name.escapeSpecialChars()}`);
   //logger.debug(`Param values : ${t0}`);
   let temp1 = "[" + `${t0}` + "]"
+  logger.debug(`preparing json before parsing: ${temp1}`)
   let finalparam = JSON.parse(temp1)
 
   /*console.log(`Typeof finalparam : ${typeof(finalparam)}`)
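
The added debug line records the exact string handed to JSON.parse, and escapeSpecialChars exists because a raw newline or carriage return inside a parameter value would make that parse throw. A standalone illustration of the same escape-then-parse step, using invented values that are already wrapped in double quotes as the parse step requires:

// Standalone illustration of the escape-then-parse step above; the sample values are invented
String.prototype.escapeSpecialChars = function () {
  return this.replace(/\n/g, "\\n")
    .replace(/\r/g, "\\r");
};

// Two hypothetical parameter values, already double-quoted; the first contains a raw newline
const t0 = ['"first\nvalue"', '"second"']
t0.forEach((name, index) => t0[index] = `${name.escapeSpecialChars()}`)

const temp1 = "[" + `${t0}` + "]"
console.log(`preparing json before parsing: ${temp1}`)   // ["first\nvalue","second"]

// With the escaping in place this parses cleanly; without it JSON.parse would
// throw a SyntaxError about an unescaped control character in the string
const finalparam = JSON.parse(temp1)
console.log(finalparam)                                  // [ 'first\nvalue', 'second' ]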
