Skip to content

Commit 3802ff4

Browse files
author
Sachin Maheshwari
committed
cleaning...
1 parent 2994830 commit 3802ff4

File tree

1 file changed

+17
-4
lines changed

1 file changed

+17
-4
lines changed

src/producer.js

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,19 +21,20 @@ async function setupPgClient() {
2121
await pgClient.query(`LISTEN ${triggerFunction}`)
2222
}
2323
pgClient.on('notification', async (message) => {
24+
const pl_randonseq = 'err-' + (new Date()).getTime().toString(36) + Math.random().toString(36).slice(2)
2425
try {
2526
const payload = JSON.parse(message.payload)
2627
const validTopicAndOriginator = (pgOptions.triggerTopics.includes(payload.topic)) && (pgOptions.triggerOriginators.includes(payload.originator)) // Check if valid topic and originator
2728
if (validTopicAndOriginator) {
2829
if (!isFailover) {
2930
logger.info('trying to push on kafka topic')
3031
await pushToKafka(payload)
31-
audit(message)
3232
logger.info('pushed to kafka and added for audit trail')
3333
} else {
3434
logger.info('taking backup on dynamodb for reconciliation')
3535
await pushToDynamoDb(payload)
3636
}
37+
audit(message)
3738
} else {
3839
logger.debug('Ignoring message with incorrect topic or originator')
3940
// push to slack - alertIt("slack message")
@@ -43,7 +44,7 @@ async function setupPgClient() {
4344
logger.debug(`error-sync: producer parse message : "${error.message}"`)
4445
logger.logFullError(error)
4546
if (!isFailover) {
46-
await auditTrail([pl_randonseq, 1111, 'pl_table', 'pl_uniquecolumn', 'pl_operation', "error-producer", "", "", error.message, 'pl_payload', new Date(), 'pl_topic'], 'producer')
47+
await auditTrail([pl_randonseq, 1111, "", "", "", "error-producer", "", "", error.message, "", new Date(), ""], 'producer')
4748
}
4849
// push to slack - alertIt("slack message"
4950
}
@@ -52,6 +53,7 @@ async function setupPgClient() {
5253
} catch (err) {
5354
logger.error('Error in setting up postgres client: ', err.message)
5455
logger.logFullError(err)
56+
// push to slack - alertIt("slack message")
5557
terminate()
5658
}
5759
}
@@ -70,16 +72,27 @@ async function audit(message) {
7072
const pl_processid = message.processId
7173
const payload = JSON.parse(message.payload)
7274
const pl_seqid = payload.payload.payloadseqid
73-
const pl_topic = payload.topic
75+
const pl_topic = payload.topic // TODO can move in config ?
7476
const pl_table = payload.payload.table
7577
const pl_uniquecolumn = payload.payload.Uniquecolumn
7678
const pl_operation = payload.payload.operation
7779
const pl_timestamp = payload.timestamp
7880
const pl_payload = JSON.stringify(payload.payload)
79-
logger.debug(`producer : ${pl_seqid} ${pl_processid} ${pl_table} ${pl_uniquecolumn} ${pl_operation} ${payload.timestamp}`);
81+
const logMessage = `${pl_seqid} ${pl_processid} ${pl_table} ${pl_uniquecolumn} ${pl_operation} ${payload.timestamp}`
82+
if (!isFailover) {
83+
logger.debug(`producer : ${logMessage}`);
84+
} else {
85+
logger.debug(`Producer DynamoDb : ${logMessage}`);
86+
}
8087
auditTrail([pl_seqid, pl_processid, pl_table, pl_uniquecolumn, pl_operation, "push-to-kafka", "", "", "", pl_payload, pl_timestamp, pl_topic], 'producer')
8188
}
8289

90+
function alertIt(message) {
91+
/**
92+
// call slack
93+
*/
94+
}
95+
8396
app.get('/health', (req, res) => {
8497
res.send('health ok')
8598
})

0 commit comments

Comments
 (0)