Skip to content

Commit b694eb6

Browse files
Merge pull request #28 from topcoder-platform/dev
initialing direct kafka producer
2 parents 140ac5c + 54cd8d5 commit b694eb6

File tree

1 file changed

+6
-8
lines changed

1 file changed

+6
-8
lines changed

src/consumer.js

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ const config = require('config')
55
const Kafka = require('no-kafka')
66
const logger = require('./common/logger')
77
const updateInformix = require('./services/updateInformix')
8-
const pushToKafka = require('./services/pushToKafka')
98
const healthcheck = require('topcoder-healthcheck-dropin');
109
const auditTrail = require('./services/auditTrail');
1110
const kafkaOptions = config.get('KAFKA')
@@ -60,13 +59,6 @@ async function dataHandler(messageSet, topic, partition) {
6059
logger.debug(`consumer : ${message.payload.payloadseqid} ${message.payload.table} ${message.payload.Uniquecolumn} ${message.payload.operation} ${message.timestamp} `);
6160
//await updateInformix(message)
6261
ifxstatus = await updateInformix(message)
63-
// if (ifxstatus === 0 && `${message.payload.operation}` === 'INSERT') {
64-
// logger.debug(`operation : ${message.payload.operation}`)
65-
// logger.debug(`Consumer :informixt status for ${message.payload.table} ${message.payload.payloadseqid} : ${ifxstatus} - Retrying`)
66-
// auditTrail([cs_payloadseqid, cs_processId, message.payload.table, message.payload.Uniquecolumn,
67-
// message.payload.operation, "push-to-kafka", retryvar, "", "", JSON.stringify(message), new Date(), message.topic], 'consumer')
68-
// await retrypushtokakfa(message, topic, m, partition)
69-
//} else {
7062
logger.debug(`Consumer :informix status for ${message.payload.table} ${message.payload.payloadseqid} : ${ifxstatus}`)
7163
if (message.payload['retryCount']) retryvar = message.payload.retryCount;
7264
await auditTrail([cs_payloadseqid, cs_processId, message.payload.table, message.payload.Uniquecolumn,
@@ -163,10 +155,16 @@ async function setupKafkaConsumer() {
163155
await consumer.init(strategies)
164156
logger.info('Initialized kafka consumer')
165157
healthcheck.init([check])
158+
kafkaService.init().catch(async (e) => {
159+
logger.error(`Kafka producer intialization error: "${e}"`)
160+
await callposttoslack(`error-sync: postgres-ifx-processor : consumer Kafka producer intialization : ${e}`)
161+
terminate()
162+
})
166163
} catch (err) {
167164
logger.error('Could not setup kafka consumer')
168165
logger.logFullError(err)
169166
logger.debug(`error-sync: consumer kafka-setup "${err.message}"`)
167+
await callposttoslack(`error-sync: postgres-ifx-processor : consumer kafka-setup : ${err.message}`)
170168
terminate()
171169
}
172170
}

0 commit comments

Comments
 (0)