Skip to content

Commit 2994830

Browse files
author
Sachin Maheshwari
committed
cleaning...
1 parent ca055bd commit 2994830

File tree

1 file changed

+11
-13
lines changed

1 file changed

+11
-13
lines changed

src/producer.js

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -14,25 +14,21 @@ const express = require('express')
1414
const app = express()
1515
const port = 3000
1616
const isFailover = process.argv[2] != undefined ? (process.argv[2] === 'failover' ? true : false) : false
17-
var pl_processid;
18-
//var pl_randonseq = 'err-' + (new Date()).getTime().toString(36) + Math.random().toString(36).slice(2)
1917
async function setupPgClient() {
2018
try {
2119
await pgClient.connect()
2220
for (const triggerFunction of pgOptions.triggerFunctions) {
2321
await pgClient.query(`LISTEN ${triggerFunction}`)
2422
}
2523
pgClient.on('notification', async (message) => {
26-
// need to take care if empty message coming
27-
pl_processid = message.processId
2824
try {
2925
const payload = JSON.parse(message.payload)
3026
const validTopicAndOriginator = (pgOptions.triggerTopics.includes(payload.topic)) && (pgOptions.triggerOriginators.includes(payload.originator)) // Check if valid topic and originator
3127
if (validTopicAndOriginator) {
3228
if (!isFailover) {
3329
logger.info('trying to push on kafka topic')
3430
await pushToKafka(payload)
35-
audit(payload)
31+
audit(message)
3632
logger.info('pushed to kafka and added for audit trail')
3733
} else {
3834
logger.info('taking backup on dynamodb for reconciliation')
@@ -70,14 +66,16 @@ async function run() {
7066
// execute
7167
run()
7268

73-
async function audit() {
74-
var pl_seqid = payload.payload.payloadseqid
75-
var pl_topic = payload.topic
76-
var pl_table = payload.payload.table
77-
var pl_uniquecolumn = payload.payload.Uniquecolumn
78-
var pl_operation = payload.payload.operation
79-
var pl_timestamp = payload.timestamp
80-
var pl_payload = JSON.stringify(payload.payload)
69+
async function audit(message) {
70+
const pl_processid = message.processId
71+
const payload = JSON.parse(message.payload)
72+
const pl_seqid = payload.payload.payloadseqid
73+
const pl_topic = payload.topic
74+
const pl_table = payload.payload.table
75+
const pl_uniquecolumn = payload.payload.Uniquecolumn
76+
const pl_operation = payload.payload.operation
77+
const pl_timestamp = payload.timestamp
78+
const pl_payload = JSON.stringify(payload.payload)
8179
logger.debug(`producer : ${pl_seqid} ${pl_processid} ${pl_table} ${pl_uniquecolumn} ${pl_operation} ${payload.timestamp}`);
8280
auditTrail([pl_seqid, pl_processid, pl_table, pl_uniquecolumn, pl_operation, "push-to-kafka", "", "", "", pl_payload, pl_timestamp, pl_topic], 'producer')
8381
}

0 commit comments

Comments
 (0)