Skip to content

Commit c5b829b

Browse files
author
root
committed
Kafka audit update
1 parent 43fd410 commit c5b829b

File tree

5 files changed

+104
-51
lines changed

5 files changed

+104
-51
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,3 +5,4 @@ env_producer.sh
55
env_consumer.sh
66
*.env
77
*.sh
8+
*.list

src/consumer.js

Lines changed: 47 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@ const logger = require('./common/logger')
77
const updateInformix = require('./services/updateInformix')
88
const pushToKafka = require('./services/pushToKafka')
99
const healthcheck = require('topcoder-healthcheck-dropin');
10+
const auditTrail = require('./services/auditTrail');
1011
const kafkaOptions = config.get('KAFKA')
11-
//const sleep = require('sleep');
1212
const isSslEnabled = kafkaOptions.SSL && kafkaOptions.SSL.cert && kafkaOptions.SSL.key
1313
const consumer = new Kafka.SimpleConsumer({
1414
connectionString: kafkaOptions.brokers_url,
@@ -21,17 +21,17 @@ const consumer = new Kafka.SimpleConsumer({
2121
})
2222

2323

24-
const check = function () {
25-
if (!consumer.client.initialBrokers && !consumer.client.initialBrokers.length) {
26-
return false;
27-
}
28-
let connected = true;
29-
consumer.client.initialBrokers.forEach(conn => {
30-
logger.debug(`url ${conn.server()} - connected=${conn.connected}`);
31-
connected = conn.connected & connected;
32-
});
33-
return connected;
34-
};
24+
/**
 * Health-check callback for topcoder-healthcheck-dropin.
 * Reports healthy only when every initial Kafka broker connection is up.
 * @returns {Boolean} true when all initial brokers are connected
 */
const check = function () {
  // Bug fix: the original used `&&`, so a missing broker list fell through
  // to the dereference below; `||` expresses the intended "missing OR empty
  // list means unhealthy" guard.
  if (!consumer.client.initialBrokers || !consumer.client.initialBrokers.length) {
    return false;
  }
  let connected = true;
  consumer.client.initialBrokers.forEach(conn => {
    logger.debug(`url ${conn.server()} - connected=${conn.connected}`);
    // Bug fix: logical AND instead of bitwise `&` (which yielded 0/1
    // instead of a boolean); truthiness is preserved for callers.
    connected = conn.connected && connected;
  });
  return connected;
};
3535

3636

3737
const terminate = () => process.exit()
@@ -41,39 +41,54 @@ const terminate = () => process.exit()
4141
* @param {String} topic The name of the message topic
4242
* @param {Number} partition The kafka partition to which messages are written
4343
*/
44-
async function dataHandler (messageSet, topic, partition) {
44+
/**
 * Processes a batch of Kafka messages: applies each payload to Informix,
 * commits the offset, and records an audit-trail row. On failure the
 * message is re-published with an incremented retry counter, and routed
 * to the error topic once `config.KAFKA.maxRetry` is reached.
 * @param {Array} messageSet Batch of Kafka messages for this partition
 * @param {String} topic The name of the message topic
 * @param {Number} partition The kafka partition to which messages are written
 */
async function dataHandler (messageSet, topic, partition) {
  for (const m of messageSet) { // Process messages sequentially
    let message
    try {
      message = JSON.parse(m.message.value)
      logger.debug('Received message from kafka:')
      logger.debug(JSON.stringify(message))
      await updateInformix(message)
      await consumer.commitOffset({ topic, partition, offset: m.offset }) // Commit offset only on success
      await auditTrail([message.payload.payloadseqid, 'scorecard_consumer', message.payload.table, message.payload.Uniquecolumn,
        message.payload.operation, 1, 0, "", message.timestamp, new Date(), ""], 'consumer')
    } catch (err) {
      logger.error('Could not process kafka message')
      try {
        await consumer.commitOffset({ topic, partition, offset: m.offset }) // Commit success as will re-publish
        // Bug fix: when JSON.parse itself failed, `message` is undefined and
        // the retry logic below threw a misleading TypeError. A message that
        // never parsed cannot be retried — skip it (offset already committed).
        if (!message || !message.payload) {
          logger.error('Message payload missing or unparseable; skipping retry/re-publish')
          continue
        }
        logger.debug('Trying to push same message after adding retryCounter')
        if (!message.payload.retryCount) {
          message.payload.retryCount = 0
          logger.debug('setting retry counter to 0 and max try count is : ', config.KAFKA.maxRetry);
        }
        if (message.payload.retryCount >= config.KAFKA.maxRetry) {
          // Exhausted retries: alert via the error topic instead of retrying.
          logger.debug('Reached max retry counter, sending it to error queue: ', config.KAFKA.errorTopic);
          const notifyMessage = Object.assign({}, message, { topic: config.KAFKA.errorTopic })
          notifyMessage.payload['recipients'] = config.KAFKA.recipients
          logger.debug('pushing following message on kafka error alert queue:')
          logger.debug(notifyMessage)
          await pushToKafka(notifyMessage)
          return
        }
        message.payload['retryCount'] = message.payload.retryCount + 1;
        await pushToKafka(message)
        logger.debug('pushed same message after adding retryCount')
      } catch (err) {
        // Deliberate best-effort: a re-publish failure is logged, not thrown,
        // so the rest of the batch is still processed.
        logger.error("Error occured in re-publishing kafka message", err)
      }
    }
  }
}
7186

7287

7388
/**
7489
* Initialize kafka consumer
7590
*/
76-
async function setupKafkaConsumer () {
91+
async function setupKafkaConsumer() {
7792
try {
7893
await consumer.init()
7994
await consumer.subscribe(kafkaOptions.topic, kafkaOptions.partition, { time: Kafka.LATEST_OFFSET }, dataHandler)
@@ -87,4 +102,3 @@ async function setupKafkaConsumer () {
87102
}
88103

89104
setupKafkaConsumer()
90-

src/producer.js

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ const pushToKafka = require('./services/pushToKafka')
88
const pgOptions = config.get('POSTGRES')
99
const pgConnectionString = `postgresql://${pgOptions.user}:${pgOptions.password}@${pgOptions.host}:${pgOptions.port}/${pgOptions.database}`
1010
const pgClient = new pg.Client(pgConnectionString)
11-
11+
const auditTrail = require('./services/auditTrail');
1212
const express = require('express')
1313
const app = express()
1414
const port = 3000
@@ -17,28 +17,26 @@ const port = 3000
1717
async function setupPgClient () {
1818
try {
1919
await pgClient.connect()
20-
// Listen to each of the trigger functions
2120
for (const triggerFunction of pgOptions.triggerFunctions) {
2221
await pgClient.query(`LISTEN ${triggerFunction}`)
2322
}
2423
pgClient.on('notification', async (message) => {
25-
console.log('Received trigger payload:')
26-
logger.debug(`Received trigger payload:`)
27-
logger.debug(message)
28-
//console.log(message)
2924
try {
3025
const payload = JSON.parse(message.payload)
31-
console.log("level 0",payload);
3226
const validTopicAndOriginator = (pgOptions.triggerTopics.includes(payload.topic)) && (pgOptions.triggerOriginators.includes(payload.originator)) // Check if valid topic and originator
3327
if (validTopicAndOriginator) {
34-
// await pushToKafka(payload)
28+
console.log(`${payload.topic} ${payload.payload.table} ${payload.payload.operation} ${payload.timestamp}`);
3529
await pushToKafka(payload)
36-
} else {
30+
} else {
3731
logger.debug('Ignoring message with incorrect topic or originator')
3832
}
39-
} catch (err) {
33+
await auditTrail([payload.payload.payloadseqid,'scorecard_producer',1,payload.topic,payload.payload.table,payload.payload.Uniquecolumn,
34+
payload.payload.operation,"",payload.timestamp,new Date(),JSON.stringify(payload.payload)],'producer')
35+
} catch (error) {
4036
logger.error('Could not parse message payload')
41-
logger.logFullError(err)
37+
await auditTrail([payload.payload.payloadseqid,'scorecard_producer',0,payload.topic,payload.payload.table,payload.payload.Uniquecolumn,
38+
payload.payload.operation,"error",payload.timestamp,new Date(),JSON.stringify(payload.payload)],'producer')
39+
logger.logFullError(error)
4240
}
4341
})
4442
logger.info('Listening to notifications')

src/services/auditTrail.js

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
const config = require('config')
2+
const pg = require('pg')
3+
const logger = require('../common/logger')
4+
5+
const pgOptions = config.get('POSTGRES')
6+
const pgConnectionString = `postgresql://${pgOptions.user}:${pgOptions.password}@${pgOptions.host}:${pgOptions.port}/${pgOptions.database}`
7+
let pgClient2
8+
9+
/**
 * Lazily creates and connects the dedicated audit-trail Postgres client,
 * storing it in the module-level `pgClient2`. A connection failure is
 * fatal: it is logged and the process exits.
 */
async function setupPgClient2 () {
  const client = new pg.Client(pgConnectionString)
  pgClient2 = client
  try {
    await client.connect()
    logger.debug('Connected to Pg Client2 Audit:')
  } catch (err) {
    logger.error('Could not setup postgres client2')
    logger.logFullError(err)
    process.exit()
  }
}
21+
22+
async function auditTrail (data,sourcetype) {
23+
if (!pgClient2) {
24+
await setupPgClient2()
25+
}
26+
if (sourcetype === 'producer'){
27+
sql = 'INSERT INTO tcs_catalog.producer_scorecard_audit(payloadseqid,origin_source,kafka_post_status,topic_name,table_name,Uniquecolumn,operationtype,errormessage,payloadtime,auditdatetime,payload) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11)'
28+
logger.debug(`--Audit Trail update producer--`)
29+
} else {
30+
sql = 'INSERT INTO tcs_catalog.consumer_scorecard_audit(payloadseqid,origin_source,table_name,Uniquecolumn,operationtype,dest_db_status, dest_retry_count,errormessage,payloadtime,auditdatetime,dest_operationquery) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11)'
31+
logger.debug(`--Audit Trail update consumer--`)
32+
}
33+
return pgClient2.query(sql, data, (err, res) => {
34+
if (err) {
35+
logger.debug(`--Audit Trail update error-- ${err.stack}`)
36+
//pgClient2.end()
37+
} else {
38+
logger.debug(`--Audit Trail update success-- `)
39+
}
40+
})
41+
}
42+
43+
44+
module.exports = auditTrail
45+

src/services/updateInformix.js

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,10 @@ const logger = require('../common/logger')
77
* @param {Object} payload The DML trigger data
88
*/
99
async function updateInformix (payload) {
10-
logger.debug('Starting to update informix with data:')
11-
logger.debug(payload)
12-
if (payload.payload.table === 'scorecard_question'){
13-
logger.debug('inside scorecard_question')
14-
sleep.sleep(2);
15-
}
10+
logger.debug('=====Starting to update informix with data:====')
1611
//const operation = payload.operation.toLowerCase()
1712
const operation = payload.payload.operation.toLowerCase()
18-
console.log("level producer1 ",operation)
13+
console.log("level 1 informix ",operation)
1914
let sql = null
2015

2116
const columns = payload.payload.data

0 commit comments

Comments (0)