From 5865d47f8d79fb8d3607e1b6893c3d65a3dcbc5f Mon Sep 17 00:00:00 2001 From: Sachin Maheshwari Date: Fri, 3 Jan 2020 16:36:39 +0530 Subject: [PATCH 1/6] refactoring code --- .circleci/config.yml | 1 + package.json | 2 +- src/producer.js | 86 ++++++++++++++++++++++++-------------------- 3 files changed, 50 insertions(+), 39 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index c02740d..ca3986b 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -103,6 +103,7 @@ workflows: branches: only: - dev-test-pg + - dev-test-pg-rf - "build-prod": context : org-global filters: diff --git a/package.json b/package.json index 3019890..f026db9 100644 --- a/package.json +++ b/package.json @@ -8,7 +8,7 @@ "lint:fix": "standard --env mocha --fix", "producer": "node ./src/producer.js", "consumer": "node ./src/consumer.js", - "producer_dd": "node ./src/producer_dd.js", + "producer_dd": "node ./src/producer.js failover", "start": "npm run producer & npm run producer_dd & npm run consumer" }, "author": "Topcoder", diff --git a/src/producer.js b/src/producer.js index 6a5be44..978537e 100644 --- a/src/producer.js +++ b/src/producer.js @@ -5,6 +5,7 @@ const config = require('config') const pg = require('pg') const logger = require('./common/logger') const pushToKafka = require('./services/pushToKafka') +const pushToDynamoDb = require('./services/pushToDynamoDb') const pgOptions = config.get('POSTGRES') const pgConnectionString = `postgresql://${pgOptions.user}:${pgOptions.password}@${pgOptions.host}:${pgOptions.port}/${pgOptions.database}` const pgClient = new pg.Client(pgConnectionString) @@ -12,71 +13,80 @@ const auditTrail = require('./services/auditTrail'); const express = require('express') const app = express() const port = 3000 -//console.log(`pgConnectionString value = ${pgConnectionString}`) +const isFailover = process.argv[2] != undefined ? (process.argv[2] === 'failover' ? true : false) : false var pl_processid; -var pl_randonseq = 'err-'+(new Date()).getTime().toString(36) + Math.random().toString(36).slice(2); -async function setupPgClient () { - try { -//console.log(`${pgOptions.triggerFunctions}`); +//var pl_randonseq = 'err-' + (new Date()).getTime().toString(36) + Math.random().toString(36).slice(2) +async function setupPgClient() { + try { await pgClient.connect() for (const triggerFunction of pgOptions.triggerFunctions) { await pgClient.query(`LISTEN ${triggerFunction}`) } pgClient.on('notification', async (message) => { - //const payload = JSON.parse(message.payload); - pl_processid = message.processId; - //console.log(message); - try - { + // need to take care if empty message coming + pl_processid = message.processId + try { const payload = JSON.parse(message.payload) - var pl_seqid = payload.payload.payloadseqid - var pl_topic = payload.topic - var pl_table = payload.payload.table - var pl_uniquecolumn = payload.payload.Uniquecolumn - var pl_operation = payload.payload.operation - var pl_timestamp = payload.timestamp - var pl_payload = JSON.stringify(payload.payload) - const validTopicAndOriginator = (pgOptions.triggerTopics.includes(payload.topic)) && (pgOptions.triggerOriginators.includes(payload.originator)) // Check if valid topic and originator + const validTopicAndOriginator = (pgOptions.triggerTopics.includes(payload.topic)) && (pgOptions.triggerOriginators.includes(payload.originator)) // Check if valid topic and originator if (validTopicAndOriginator) { - logger.debug(`producer : ${pl_seqid} ${pl_processid} ${pl_table} ${pl_uniquecolumn} ${pl_operation} ${payload.timestamp}`); - await pushToKafka(payload) - } else { + if (isFailover) { + await pushToDynamoDb(payload) + } else { + await pushToKafka(payload) + audit(payload) + } + } else { logger.debug('Ignoring message with incorrect topic or originator') - + // push to slack - alertIt("slack message") } -await auditTrail([pl_seqid,pl_processid,pl_table,pl_uniquecolumn,pl_operation,"push-to-kafka","","","",pl_payload,pl_timestamp,pl_topic],'producer') - } catch (error) { + } catch (error) { logger.error('Could not parse message payload') - logger.debug(`error-sync: producer parse message : "${error.message}"`) -await auditTrail([pl_randonseq,1111,'pl_table','pl_uniquecolumn','pl_operation',"error-producer","","",error.message,'pl_payload',new Date(),'pl_topic'],'producer') - logger.logFullError(error) + logger.debug(`error-sync: producer parse message : "${error.message}"`) + //await auditTrail([pl_randonseq, 1111, 'pl_table', 'pl_uniquecolumn', 'pl_operation', "error-producer", "", "", error.message, 'pl_payload', new Date(), 'pl_topic'], 'producer') + logger.logFullError(error) + // push to slack - alertIt("slack message" } }) logger.info('pg-ifx-sync producer: Listening to notifications') } catch (err) { - logger.debug(`error-sync: producer postgres-setup 1 :"${err.message}"`) + logger.debug(`error-sync: producer postgres-setup 1 :"${err.message}"`) logger.error('Could not setup postgres client') logger.logFullError(err) terminate() } } + const terminate = () => process.exit() -async function run () { -try { - await setupPgClient() -} -catch(err) -{ -logger.debug(`Could not setup postgres client`) -logger.debug(`error-sync: producer postgres-setup 0 :"${err.message}"`) -} + +async function run() { + try { + await setupPgClient() + } + catch (err) { + logger.debug(`Could not setup postgres client`) + logger.debug(`error-sync: producer postgres-setup 0 :"${err.message}"`) + terminate() + } } +// execute run() +async function audit() { + var pl_seqid = payload.payload.payloadseqid + var pl_topic = payload.topic + var pl_table = payload.payload.table + var pl_uniquecolumn = payload.payload.Uniquecolumn + var pl_operation = payload.payload.operation + var pl_timestamp = payload.timestamp + var pl_payload = JSON.stringify(payload.payload) + logger.debug(`producer : ${pl_seqid} ${pl_processid} ${pl_table} ${pl_uniquecolumn} ${pl_operation} ${payload.timestamp}`); + auditTrail([pl_seqid, pl_processid, pl_table, pl_uniquecolumn, pl_operation, "push-to-kafka", "", "", "", pl_payload, pl_timestamp, pl_topic], 'producer') +} + app.get('/health', (req, res) => { - res.send('health ok') + res.send('health ok') }) app.listen(port, () => console.log(`app listening on port ${port}!`)) From 02f5826f70ba68a0f31a4d11557ad72cb85dea31 Mon Sep 17 00:00:00 2001 From: Sachin Maheshwari Date: Fri, 3 Jan 2020 16:57:39 +0530 Subject: [PATCH 2/6] cleaning... --- src/producer.js | 11 ++++--- src/producer_dd.js | 79 ---------------------------------------------- 2 files changed, 6 insertions(+), 84 deletions(-) delete mode 100644 src/producer_dd.js diff --git a/src/producer.js b/src/producer.js index 978537e..a05a7dc 100644 --- a/src/producer.js +++ b/src/producer.js @@ -29,11 +29,11 @@ async function setupPgClient() { const payload = JSON.parse(message.payload) const validTopicAndOriginator = (pgOptions.triggerTopics.includes(payload.topic)) && (pgOptions.triggerOriginators.includes(payload.originator)) // Check if valid topic and originator if (validTopicAndOriginator) { - if (isFailover) { - await pushToDynamoDb(payload) - } else { + if (!isFailover) { await pushToKafka(payload) audit(payload) + } else { + await pushToDynamoDb(payload) } } else { logger.debug('Ignoring message with incorrect topic or originator') @@ -42,7 +42,9 @@ async function setupPgClient() { } catch (error) { logger.error('Could not parse message payload') logger.debug(`error-sync: producer parse message : "${error.message}"`) - //await auditTrail([pl_randonseq, 1111, 'pl_table', 'pl_uniquecolumn', 'pl_operation', "error-producer", "", "", error.message, 'pl_payload', new Date(), 'pl_topic'], 'producer') + if (!isFailover) { + await auditTrail([pl_randonseq, 1111, 'pl_table', 'pl_uniquecolumn', 'pl_operation', "error-producer", "", "", error.message, 'pl_payload', new Date(), 'pl_topic'], 'producer') + } logger.logFullError(error) // push to slack - alertIt("slack message" } @@ -52,7 +54,6 @@ async function setupPgClient() { logger.debug(`error-sync: producer postgres-setup 1 :"${err.message}"`) logger.error('Could not setup postgres client') logger.logFullError(err) - terminate() } } diff --git a/src/producer_dd.js b/src/producer_dd.js deleted file mode 100644 index 46226d6..0000000 --- a/src/producer_dd.js +++ /dev/null @@ -1,79 +0,0 @@ -/** - * Listens to DML trigger notifications from postgres and pushes the trigger data into kafka - */ -const config = require('config') -const pg = require('pg') -const logger = require('./common/logger') -//const pushToKafka = require('./services/pushToKafka') -const pushToDynamoDb = require('./services/pushToDynamoDb') -const pgOptions = config.get('POSTGRES') -const pgConnectionString = `postgresql://${pgOptions.user}:${pgOptions.password}@${pgOptions.host}:${pgOptions.port}/${pgOptions.database}` -const pgClient = new pg.Client(pgConnectionString) -const auditTrail = require('./services/auditTrail'); -const express = require('express') -const app = express() -const port = 3000 -//console.log(`pgConnectionString value = ${pgConnectionString}`) -var pl_processid; -var pl_randonseq = 'err-'+(new Date()).getTime().toString(36) + Math.random().toString(36).slice(2); -async function setupPgClient () { - try { - await pgClient.connect() - for (const triggerFunction of pgOptions.triggerFunctions) { - await pgClient.query(`LISTEN ${triggerFunction}`) - } - pgClient.on('notification', async (message) => { - //const payload = JSON.parse(message.payload); - pl_processid = message.processId; - try - { - const payload = JSON.parse(message.payload) - var pl_seqid = payload.payload.payloadseqid - var pl_topic = payload.topic - var pl_table = payload.payload.table - var pl_uniquecolumn = payload.payload.Uniquecolumn - var pl_operation = payload.payload.operation - var pl_timestamp = payload.timestamp - var pl_payload = JSON.stringify(payload.payload) - const validTopicAndOriginator = (pgOptions.triggerTopics.includes(payload.topic)) && (pgOptions.triggerOriginators.includes(payload.originator)) // Check if valid topic and originator - if (validTopicAndOriginator) { - logger.debug(`Producer DynamoDb : ${pl_seqid} ${pl_processid} ${pl_table} ${pl_uniquecolumn} ${pl_operation} ${payload.timestamp}`); - await pushToDynamoDb(payload) - } else { - logger.debug('Ignoring message with incorrect topic or originator') - } -await auditTrail([pl_seqid,pl_processid,pl_table,pl_uniquecolumn,pl_operation,"push-to-DynamoDb","","","",pl_payload,pl_timestamp,pl_topic],'producer') - } catch (error) { - logger.debug(`error-sync: producer_dynamoDb parse message : "${error.message}"`) - logger.error('Could not parse message payload') -await auditTrail([pl_randonseq,2222,'pl_table','pl_uniquecolumn','pl_operation',"error-DynamoDB","","",error.message,'pl_payload',new Date(),'pl_topic'],'producer') - logger.logFullError(error) - } - }) - logger.info('Producer DynamoDb: Listening to notifications') - } catch (err) { - logger.error('Could not setup postgres client') - logger.debug(`error-sync: producer_dd postgres-setup 1 :"${err.message}"`) - //setup slack alert here - logger.logFullError(err) - terminate() - } -} -const terminate = () => process.exit() -async function run () { -try{ - await setupPgClient() -} -catch(err){ -logger.debug(`Producer_dd: Could not setup postgres client`) -logger.debug(`error-sync: producer_dynamoDb postgres-setup 0 :"${err.message}"`) -//setup slackmessage here -}} - -run() - -app.get('/health', (req, res) => { - res.send('health ok') -}) -app.listen(port, () => console.log(`app listening on port ${port}!`)) - From ca055bd5fc5280f035d8ec86e76bb835c477960f Mon Sep 17 00:00:00 2001 From: Sachin Maheshwari Date: Fri, 3 Jan 2020 17:47:27 +0530 Subject: [PATCH 3/6] cleaning.... --- src/producer.js | 20 ++++++++------------ 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/src/producer.js b/src/producer.js index a05a7dc..b43b7a0 100644 --- a/src/producer.js +++ b/src/producer.js @@ -30,9 +30,12 @@ async function setupPgClient() { const validTopicAndOriginator = (pgOptions.triggerTopics.includes(payload.topic)) && (pgOptions.triggerOriginators.includes(payload.originator)) // Check if valid topic and originator if (validTopicAndOriginator) { if (!isFailover) { + logger.info('trying to push on kafka topic') await pushToKafka(payload) audit(payload) + logger.info('pushed to kafka and added for audit trail') } else { + logger.info('taking backup on dynamodb for reconciliation') await pushToDynamoDb(payload) } } else { @@ -42,17 +45,16 @@ async function setupPgClient() { } catch (error) { logger.error('Could not parse message payload') logger.debug(`error-sync: producer parse message : "${error.message}"`) + logger.logFullError(error) if (!isFailover) { await auditTrail([pl_randonseq, 1111, 'pl_table', 'pl_uniquecolumn', 'pl_operation', "error-producer", "", "", error.message, 'pl_payload', new Date(), 'pl_topic'], 'producer') } - logger.logFullError(error) // push to slack - alertIt("slack message" } }) - logger.info('pg-ifx-sync producer: Listening to notifications') + logger.info('pg-ifx-sync producer: Listening to pg-trigger channel.') } catch (err) { - logger.debug(`error-sync: producer postgres-setup 1 :"${err.message}"`) - logger.error('Could not setup postgres client') + logger.error('Error in setting up postgres client: ', err.message) logger.logFullError(err) terminate() } @@ -61,14 +63,8 @@ async function setupPgClient() { const terminate = () => process.exit() async function run() { - try { - await setupPgClient() - } - catch (err) { - logger.debug(`Could not setup postgres client`) - logger.debug(`error-sync: producer postgres-setup 0 :"${err.message}"`) - terminate() - } + logger.debug("Initialising producer setup...") + await setupPgClient() } // execute From 2994830597c39117dd0f975ecc825bddffa3d256 Mon Sep 17 00:00:00 2001 From: Sachin Maheshwari Date: Fri, 3 Jan 2020 18:07:11 +0530 Subject: [PATCH 4/6] cleaning... --- src/producer.js | 24 +++++++++++------------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/src/producer.js b/src/producer.js index b43b7a0..d5563de 100644 --- a/src/producer.js +++ b/src/producer.js @@ -14,8 +14,6 @@ const express = require('express') const app = express() const port = 3000 const isFailover = process.argv[2] != undefined ? (process.argv[2] === 'failover' ? true : false) : false -var pl_processid; -//var pl_randonseq = 'err-' + (new Date()).getTime().toString(36) + Math.random().toString(36).slice(2) async function setupPgClient() { try { await pgClient.connect() @@ -23,8 +21,6 @@ async function setupPgClient() { await pgClient.query(`LISTEN ${triggerFunction}`) } pgClient.on('notification', async (message) => { - // need to take care if empty message coming - pl_processid = message.processId try { const payload = JSON.parse(message.payload) const validTopicAndOriginator = (pgOptions.triggerTopics.includes(payload.topic)) && (pgOptions.triggerOriginators.includes(payload.originator)) // Check if valid topic and originator @@ -32,7 +28,7 @@ async function setupPgClient() { if (!isFailover) { logger.info('trying to push on kafka topic') await pushToKafka(payload) - audit(payload) + audit(message) logger.info('pushed to kafka and added for audit trail') } else { logger.info('taking backup on dynamodb for reconciliation') @@ -70,14 +66,16 @@ async function run() { // execute run() -async function audit() { - var pl_seqid = payload.payload.payloadseqid - var pl_topic = payload.topic - var pl_table = payload.payload.table - var pl_uniquecolumn = payload.payload.Uniquecolumn - var pl_operation = payload.payload.operation - var pl_timestamp = payload.timestamp - var pl_payload = JSON.stringify(payload.payload) +async function audit(message) { + const pl_processid = message.processId + const payload = JSON.parse(message.payload) + const pl_seqid = payload.payload.payloadseqid + const pl_topic = payload.topic + const pl_table = payload.payload.table + const pl_uniquecolumn = payload.payload.Uniquecolumn + const pl_operation = payload.payload.operation + const pl_timestamp = payload.timestamp + const pl_payload = JSON.stringify(payload.payload) logger.debug(`producer : ${pl_seqid} ${pl_processid} ${pl_table} ${pl_uniquecolumn} ${pl_operation} ${payload.timestamp}`); auditTrail([pl_seqid, pl_processid, pl_table, pl_uniquecolumn, pl_operation, "push-to-kafka", "", "", "", pl_payload, pl_timestamp, pl_topic], 'producer') } From 3802ff4c8062f597b4927e623052569d3e11fddf Mon Sep 17 00:00:00 2001 From: Sachin Maheshwari Date: Fri, 3 Jan 2020 19:01:18 +0530 Subject: [PATCH 5/6] cleaning... --- src/producer.js | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/src/producer.js b/src/producer.js index d5563de..e6fd61c 100644 --- a/src/producer.js +++ b/src/producer.js @@ -21,6 +21,7 @@ async function setupPgClient() { await pgClient.query(`LISTEN ${triggerFunction}`) } pgClient.on('notification', async (message) => { + const pl_randonseq = 'err-' + (new Date()).getTime().toString(36) + Math.random().toString(36).slice(2) try { const payload = JSON.parse(message.payload) const validTopicAndOriginator = (pgOptions.triggerTopics.includes(payload.topic)) && (pgOptions.triggerOriginators.includes(payload.originator)) // Check if valid topic and originator @@ -28,12 +29,12 @@ async function setupPgClient() { if (!isFailover) { logger.info('trying to push on kafka topic') await pushToKafka(payload) - audit(message) logger.info('pushed to kafka and added for audit trail') } else { logger.info('taking backup on dynamodb for reconciliation') await pushToDynamoDb(payload) } + audit(message) } else { logger.debug('Ignoring message with incorrect topic or originator') // push to slack - alertIt("slack message") @@ -43,7 +44,7 @@ async function setupPgClient() { logger.debug(`error-sync: producer parse message : "${error.message}"`) logger.logFullError(error) if (!isFailover) { - await auditTrail([pl_randonseq, 1111, 'pl_table', 'pl_uniquecolumn', 'pl_operation', "error-producer", "", "", error.message, 'pl_payload', new Date(), 'pl_topic'], 'producer') + await auditTrail([pl_randonseq, 1111, "", "", "", "error-producer", "", "", error.message, "", new Date(), ""], 'producer') } // push to slack - alertIt("slack message" } @@ -52,6 +53,7 @@ async function setupPgClient() { } catch (err) { logger.error('Error in setting up postgres client: ', err.message) logger.logFullError(err) + // push to slack - alertIt("slack message") terminate() } } @@ -70,16 +72,27 @@ async function audit(message) { const pl_processid = message.processId const payload = JSON.parse(message.payload) const pl_seqid = payload.payload.payloadseqid - const pl_topic = payload.topic + const pl_topic = payload.topic // TODO can move in config ? const pl_table = payload.payload.table const pl_uniquecolumn = payload.payload.Uniquecolumn const pl_operation = payload.payload.operation const pl_timestamp = payload.timestamp const pl_payload = JSON.stringify(payload.payload) - logger.debug(`producer : ${pl_seqid} ${pl_processid} ${pl_table} ${pl_uniquecolumn} ${pl_operation} ${payload.timestamp}`); + const logMessage = `${pl_seqid} ${pl_processid} ${pl_table} ${pl_uniquecolumn} ${pl_operation} ${payload.timestamp}` + if (!isFailover) { + logger.debug(`producer : ${logMessage}`); + } else { + logger.debug(`Producer DynamoDb : ${logMessage}`); + } auditTrail([pl_seqid, pl_processid, pl_table, pl_uniquecolumn, pl_operation, "push-to-kafka", "", "", "", pl_payload, pl_timestamp, pl_topic], 'producer') } +function alertIt(message) { + /** + // call slack + */ +} + app.get('/health', (req, res) => { res.send('health ok') }) From acc2e3f130367f7e9ef24903cae47b0280fafdb3 Mon Sep 17 00:00:00 2001 From: Sachin Maheshwari Date: Fri, 3 Jan 2020 19:19:47 +0530 Subject: [PATCH 6/6] rearranging audit code.. --- src/producer.js | 38 +++++++++++++++++++++----------------- 1 file changed, 21 insertions(+), 17 deletions(-) diff --git a/src/producer.js b/src/producer.js index e6fd61c..85d3f53 100644 --- a/src/producer.js +++ b/src/producer.js @@ -21,7 +21,6 @@ async function setupPgClient() { await pgClient.query(`LISTEN ${triggerFunction}`) } pgClient.on('notification', async (message) => { - const pl_randonseq = 'err-' + (new Date()).getTime().toString(36) + Math.random().toString(36).slice(2) try { const payload = JSON.parse(message.payload) const validTopicAndOriginator = (pgOptions.triggerTopics.includes(payload.topic)) && (pgOptions.triggerOriginators.includes(payload.originator)) // Check if valid topic and originator @@ -43,9 +42,7 @@ async function setupPgClient() { logger.error('Could not parse message payload') logger.debug(`error-sync: producer parse message : "${error.message}"`) logger.logFullError(error) - if (!isFailover) { - await auditTrail([pl_randonseq, 1111, "", "", "", "error-producer", "", "", error.message, "", new Date(), ""], 'producer') - } + audit(error) // push to slack - alertIt("slack message" } }) @@ -70,21 +67,28 @@ run() async function audit(message) { const pl_processid = message.processId - const payload = JSON.parse(message.payload) - const pl_seqid = payload.payload.payloadseqid - const pl_topic = payload.topic // TODO can move in config ? - const pl_table = payload.payload.table - const pl_uniquecolumn = payload.payload.Uniquecolumn - const pl_operation = payload.payload.operation - const pl_timestamp = payload.timestamp - const pl_payload = JSON.stringify(payload.payload) - const logMessage = `${pl_seqid} ${pl_processid} ${pl_table} ${pl_uniquecolumn} ${pl_operation} ${payload.timestamp}` - if (!isFailover) { - logger.debug(`producer : ${logMessage}`); + if (pl_processid != 'undefined') { + const payload = JSON.parse(message.payload) + const pl_seqid = payload.payload.payloadseqid + const pl_topic = payload.topic // TODO can move in config ? + const pl_table = payload.payload.table + const pl_uniquecolumn = payload.payload.Uniquecolumn + const pl_operation = payload.payload.operation + const pl_timestamp = payload.timestamp + const pl_payload = JSON.stringify(payload.payload) + const logMessage = `${pl_seqid} ${pl_processid} ${pl_table} ${pl_uniquecolumn} ${pl_operation} ${payload.timestamp}` + if (!isFailover) { + logger.debug(`producer : ${logMessage}`); + } else { + logger.debug(`Producer DynamoDb : ${logMessage}`); + } + auditTrail([pl_seqid, pl_processid, pl_table, pl_uniquecolumn, pl_operation, "push-to-kafka", "", "", "", pl_payload, pl_timestamp, pl_topic], 'producer') } else { - logger.debug(`Producer DynamoDb : ${logMessage}`); + const pl_randonseq = 'err-' + (new Date()).getTime().toString(36) + Math.random().toString(36).slice(2) + if (!isFailover) { + await auditTrail([pl_randonseq, 1111, "", "", "", "error-producer", "", "", message.message, "", new Date(), ""], 'producer') + } } - auditTrail([pl_seqid, pl_processid, pl_table, pl_uniquecolumn, pl_operation, "push-to-kafka", "", "", "", pl_payload, pl_timestamp, pl_topic], 'producer') } function alertIt(message) {