Skip to content

Commit 5865d47

Browse files
author
Sachin Maheshwari
committed
refactoring code
1 parent a045399 commit 5865d47

File tree

3 files changed

+50
-39
lines changed

3 files changed

+50
-39
lines changed

.circleci/config.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ workflows:
103103
branches:
104104
only:
105105
- dev-test-pg
106+
- dev-test-pg-rf
106107
- "build-prod":
107108
context : org-global
108109
filters:

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
"lint:fix": "standard --env mocha --fix",
99
"producer": "node ./src/producer.js",
1010
"consumer": "node ./src/consumer.js",
11-
"producer_dd": "node ./src/producer_dd.js",
11+
"producer_dd": "node ./src/producer.js failover",
1212
"start": "npm run producer & npm run producer_dd & npm run consumer"
1313
},
1414
"author": "Topcoder",

src/producer.js

Lines changed: 48 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -5,78 +5,88 @@ const config = require('config')
55
const pg = require('pg')
66
const logger = require('./common/logger')
77
const pushToKafka = require('./services/pushToKafka')
8+
const pushToDynamoDb = require('./services/pushToDynamoDb')
89
const pgOptions = config.get('POSTGRES')
910
const pgConnectionString = `postgresql://${pgOptions.user}:${pgOptions.password}@${pgOptions.host}:${pgOptions.port}/${pgOptions.database}`
1011
const pgClient = new pg.Client(pgConnectionString)
1112
const auditTrail = require('./services/auditTrail');
1213
const express = require('express')
1314
const app = express()
1415
const port = 3000
15-
//console.log(`pgConnectionString value = ${pgConnectionString}`)
16+
const isFailover = process.argv[2] != undefined ? (process.argv[2] === 'failover' ? true : false) : false
1617
var pl_processid;
17-
var pl_randonseq = 'err-'+(new Date()).getTime().toString(36) + Math.random().toString(36).slice(2);
18-
async function setupPgClient () {
19-
try {
20-
//console.log(`${pgOptions.triggerFunctions}`);
18+
//var pl_randonseq = 'err-' + (new Date()).getTime().toString(36) + Math.random().toString(36).slice(2)
19+
async function setupPgClient() {
20+
try {
2121
await pgClient.connect()
2222
for (const triggerFunction of pgOptions.triggerFunctions) {
2323
await pgClient.query(`LISTEN ${triggerFunction}`)
2424
}
2525
pgClient.on('notification', async (message) => {
26-
//const payload = JSON.parse(message.payload);
27-
pl_processid = message.processId;
28-
//console.log(message);
29-
try
30-
{
26+
// need to take care if empty message coming
27+
pl_processid = message.processId
28+
try {
3129
const payload = JSON.parse(message.payload)
32-
var pl_seqid = payload.payload.payloadseqid
33-
var pl_topic = payload.topic
34-
var pl_table = payload.payload.table
35-
var pl_uniquecolumn = payload.payload.Uniquecolumn
36-
var pl_operation = payload.payload.operation
37-
var pl_timestamp = payload.timestamp
38-
var pl_payload = JSON.stringify(payload.payload)
39-
const validTopicAndOriginator = (pgOptions.triggerTopics.includes(payload.topic)) && (pgOptions.triggerOriginators.includes(payload.originator)) // Check if valid topic and originator
30+
const validTopicAndOriginator = (pgOptions.triggerTopics.includes(payload.topic)) && (pgOptions.triggerOriginators.includes(payload.originator)) // Check if valid topic and originator
4031
if (validTopicAndOriginator) {
41-
logger.debug(`producer : ${pl_seqid} ${pl_processid} ${pl_table} ${pl_uniquecolumn} ${pl_operation} ${payload.timestamp}`);
42-
await pushToKafka(payload)
43-
} else {
32+
if (isFailover) {
33+
await pushToDynamoDb(payload)
34+
} else {
35+
await pushToKafka(payload)
36+
audit(payload)
37+
}
38+
} else {
4439
logger.debug('Ignoring message with incorrect topic or originator')
45-
40+
// push to slack - alertIt("slack message")
4641
}
47-
await auditTrail([pl_seqid,pl_processid,pl_table,pl_uniquecolumn,pl_operation,"push-to-kafka","","","",pl_payload,pl_timestamp,pl_topic],'producer')
48-
} catch (error) {
42+
} catch (error) {
4943
logger.error('Could not parse message payload')
50-
logger.debug(`error-sync: producer parse message : "${error.message}"`)
51-
await auditTrail([pl_randonseq,1111,'pl_table','pl_uniquecolumn','pl_operation',"error-producer","","",error.message,'pl_payload',new Date(),'pl_topic'],'producer')
52-
logger.logFullError(error)
44+
logger.debug(`error-sync: producer parse message : "${error.message}"`)
45+
//await auditTrail([pl_randonseq, 1111, 'pl_table', 'pl_uniquecolumn', 'pl_operation', "error-producer", "", "", error.message, 'pl_payload', new Date(), 'pl_topic'], 'producer')
46+
logger.logFullError(error)
47+
// push to slack - alertIt("slack message"
5348
}
5449
})
5550
logger.info('pg-ifx-sync producer: Listening to notifications')
5651
} catch (err) {
57-
logger.debug(`error-sync: producer postgres-setup 1 :"${err.message}"`)
52+
logger.debug(`error-sync: producer postgres-setup 1 :"${err.message}"`)
5853
logger.error('Could not setup postgres client')
5954
logger.logFullError(err)
6055

6156
terminate()
6257
}
6358
}
59+
6460
const terminate = () => process.exit()
65-
async function run () {
66-
try {
67-
await setupPgClient()
68-
}
69-
catch(err)
70-
{
71-
logger.debug(`Could not setup postgres client`)
72-
logger.debug(`error-sync: producer postgres-setup 0 :"${err.message}"`)
73-
}
61+
62+
async function run() {
63+
try {
64+
await setupPgClient()
65+
}
66+
catch (err) {
67+
logger.debug(`Could not setup postgres client`)
68+
logger.debug(`error-sync: producer postgres-setup 0 :"${err.message}"`)
69+
terminate()
70+
}
7471
}
7572

73+
// execute
7674
run()
7775

76+
async function audit() {
77+
var pl_seqid = payload.payload.payloadseqid
78+
var pl_topic = payload.topic
79+
var pl_table = payload.payload.table
80+
var pl_uniquecolumn = payload.payload.Uniquecolumn
81+
var pl_operation = payload.payload.operation
82+
var pl_timestamp = payload.timestamp
83+
var pl_payload = JSON.stringify(payload.payload)
84+
logger.debug(`producer : ${pl_seqid} ${pl_processid} ${pl_table} ${pl_uniquecolumn} ${pl_operation} ${payload.timestamp}`);
85+
auditTrail([pl_seqid, pl_processid, pl_table, pl_uniquecolumn, pl_operation, "push-to-kafka", "", "", "", pl_payload, pl_timestamp, pl_topic], 'producer')
86+
}
87+
7888
app.get('/health', (req, res) => {
79-
res.send('health ok')
89+
res.send('health ok')
8090
})
8191
app.listen(port, () => console.log(`app listening on port ${port}!`))
8292

0 commit comments

Comments
 (0)