diff --git a/.circleci/config.yml b/.circleci/config.yml
index c02740d..ca3986b 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -103,6 +103,7 @@ workflows:
           branches:
             only:
               - dev-test-pg
+              - dev-test-pg-rf
       - "build-prod":
           context : org-global
           filters:
diff --git a/package.json b/package.json
index 3019890..f026db9 100644
--- a/package.json
+++ b/package.json
@@ -8,7 +8,7 @@
     "lint:fix": "standard --env mocha --fix",
     "producer": "node ./src/producer.js",
     "consumer": "node ./src/consumer.js",
-    "producer_dd": "node ./src/producer_dd.js",
+    "producer_dd": "node ./src/producer.js failover",
     "start": "npm run producer & npm run producer_dd & npm run consumer"
   },
   "author": "Topcoder",
diff --git a/src/producer.js b/src/producer.js
index 6a5be44..85d3f53 100644
--- a/src/producer.js
+++ b/src/producer.js
@@ -5,6 +5,7 @@ const config = require('config')
 const pg = require('pg')
 const logger = require('./common/logger')
 const pushToKafka = require('./services/pushToKafka')
+const pushToDynamoDb = require('./services/pushToDynamoDb')
 const pgOptions = config.get('POSTGRES')
 const pgConnectionString = `postgresql://${pgOptions.user}:${pgOptions.password}@${pgOptions.host}:${pgOptions.port}/${pgOptions.database}`
 const pgClient = new pg.Client(pgConnectionString)
@@ -12,71 +13,92 @@ const auditTrail = require('./services/auditTrail');
 const express = require('express')
 const app = express()
 const port = 3000
-//console.log(`pgConnectionString value = ${pgConnectionString}`)
-var pl_processid;
-var pl_randonseq = 'err-'+(new Date()).getTime().toString(36) + Math.random().toString(36).slice(2);
-async function setupPgClient () {
-  try {
-//console.log(`${pgOptions.triggerFunctions}`);
+const isFailover = process.argv[2] === 'failover' // 'failover' CLI arg switches the producer to the DynamoDB backup path
+async function setupPgClient() {
+  try {
     await pgClient.connect()
     for (const triggerFunction of pgOptions.triggerFunctions) {
       await pgClient.query(`LISTEN ${triggerFunction}`)
     }
     pgClient.on('notification', async (message) => {
-      //const payload = JSON.parse(message.payload);
-      pl_processid = message.processId;
-      //console.log(message);
-      try
-      {
+      try {
         const payload = JSON.parse(message.payload)
-        var pl_seqid = payload.payload.payloadseqid
-        var pl_topic = payload.topic
-        var pl_table = payload.payload.table
-        var pl_uniquecolumn = payload.payload.Uniquecolumn
-        var pl_operation = payload.payload.operation
-        var pl_timestamp = payload.timestamp
-        var pl_payload = JSON.stringify(payload.payload)
-        const validTopicAndOriginator = (pgOptions.triggerTopics.includes(payload.topic)) && (pgOptions.triggerOriginators.includes(payload.originator)) // Check if valid topic and originator
+        const validTopicAndOriginator = (pgOptions.triggerTopics.includes(payload.topic)) && (pgOptions.triggerOriginators.includes(payload.originator)) // Check if valid topic and originator
         if (validTopicAndOriginator) {
-          logger.debug(`producer : ${pl_seqid} ${pl_processid} ${pl_table} ${pl_uniquecolumn} ${pl_operation} ${payload.timestamp}`);
-          await pushToKafka(payload)
-        } else {
+          if (!isFailover) {
+            logger.info('trying to push on kafka topic')
+            await pushToKafka(payload)
+            logger.info('pushed to kafka and added for audit trail')
+          } else {
+            logger.info('taking backup on dynamodb for reconciliation')
+            await pushToDynamoDb(payload)
+          }
+          audit(message)
+        } else {
           logger.debug('Ignoring message with incorrect topic or originator')
-
+          // push to slack - alertIt("slack message")
         }
-await auditTrail([pl_seqid,pl_processid,pl_table,pl_uniquecolumn,pl_operation,"push-to-kafka","","","",pl_payload,pl_timestamp,pl_topic],'producer')
-      } catch (error) {
+      } catch (error) {
         logger.error('Could not parse message payload')
-        logger.debug(`error-sync: producer parse message : "${error.message}"`)
-await auditTrail([pl_randonseq,1111,'pl_table','pl_uniquecolumn','pl_operation',"error-producer","","",error.message,'pl_payload',new Date(),'pl_topic'],'producer')
-        logger.logFullError(error)
+        logger.debug(`error-sync: producer parse message : "${error.message}"`)
+        logger.logFullError(error)
+        audit(error)
+        // push to slack - alertIt("slack message")
       }
     })
-    logger.info('pg-ifx-sync producer: Listening to notifications')
+    logger.info('pg-ifx-sync producer: Listening to pg-trigger channel.')
   } catch (err) {
-    logger.debug(`error-sync: producer postgres-setup 1 :"${err.message}"`)
-    logger.error('Could not setup postgres client')
+    logger.error('Error in setting up postgres client: ', err.message)
     logger.logFullError(err)
-
+    // push to slack - alertIt("slack message")
     terminate()
   }
 }
+
 const terminate = () => process.exit()
-async function run () {
-try {
-  await setupPgClient()
-}
-catch(err)
-{
-logger.debug(`Could not setup postgres client`)
-logger.debug(`error-sync: producer postgres-setup 0 :"${err.message}"`)
-}
-}
+
+async function run() {
+  logger.debug("Initialising producer setup...")
+  await setupPgClient()
+}
+// execute
 run()
+
+async function audit(message) {
+  const pl_processid = message.processId
+  // processId is undefined when audit() receives an Error from the catch block above
+  if (pl_processid !== undefined) {
+    const payload = JSON.parse(message.payload)
+    const pl_seqid = payload.payload.payloadseqid
+    const pl_topic = payload.topic // TODO: can move to config?
+    const pl_table = payload.payload.table
+    const pl_uniquecolumn = payload.payload.Uniquecolumn
+    const pl_operation = payload.payload.operation
+    const pl_timestamp = payload.timestamp
+    const pl_payload = JSON.stringify(payload.payload)
+    const logMessage = `${pl_seqid} ${pl_processid} ${pl_table} ${pl_uniquecolumn} ${pl_operation} ${payload.timestamp}`
+    if (!isFailover) {
+      logger.debug(`producer : ${logMessage}`);
+    } else {
+      logger.debug(`Producer DynamoDb : ${logMessage}`);
+    }
+    auditTrail([pl_seqid, pl_processid, pl_table, pl_uniquecolumn, pl_operation, isFailover ? "push-to-DynamoDb" : "push-to-kafka", "", "", "", pl_payload, pl_timestamp, pl_topic], 'producer')
+  } else {
+    const pl_randonseq = 'err-' + (new Date()).getTime().toString(36) + Math.random().toString(36).slice(2)
+    if (!isFailover) {
+      await auditTrail([pl_randonseq, 1111, "", "", "", "error-producer", "", "", message.message, "", new Date(), ""], 'producer')
+    }
+  }
+}
+
+function alertIt(message) {
+  /**
+   * TODO: call slack
+   */
+}
+
 app.get('/health', (req, res) => {
-    res.send('health ok')
+  res.send('health ok')
 })
 app.listen(port, () => console.log(`app listening on port ${port}!`))
diff --git a/src/producer_dd.js b/src/producer_dd.js
deleted file mode 100644
index 46226d6..0000000
--- a/src/producer_dd.js
+++ /dev/null
@@ -1,79 +0,0 @@
-/**
- * Listens to DML trigger notifications from postgres and pushes the trigger data into kafka
- */
-const config = require('config')
-const pg = require('pg')
-const logger = require('./common/logger')
-//const pushToKafka = require('./services/pushToKafka')
-const pushToDynamoDb = require('./services/pushToDynamoDb')
-const pgOptions = config.get('POSTGRES')
-const pgConnectionString = `postgresql://${pgOptions.user}:${pgOptions.password}@${pgOptions.host}:${pgOptions.port}/${pgOptions.database}`
-const pgClient = new pg.Client(pgConnectionString)
-const auditTrail = require('./services/auditTrail');
-const express = require('express')
-const app = express()
-const port = 3000
-//console.log(`pgConnectionString value = ${pgConnectionString}`)
-var pl_processid;
-var pl_randonseq = 'err-'+(new Date()).getTime().toString(36) + Math.random().toString(36).slice(2);
-async function setupPgClient () {
-  try {
-    await pgClient.connect()
-    for (const triggerFunction of pgOptions.triggerFunctions) {
-      await pgClient.query(`LISTEN ${triggerFunction}`)
-    }
-    pgClient.on('notification', async (message) => {
-      //const payload = JSON.parse(message.payload);
-      pl_processid = message.processId;
-      try
-      {
-        const payload = JSON.parse(message.payload)
-        var pl_seqid = payload.payload.payloadseqid
-        var pl_topic = payload.topic
-        var pl_table = payload.payload.table
-        var pl_uniquecolumn = payload.payload.Uniquecolumn
-        var pl_operation = payload.payload.operation
-        var pl_timestamp = payload.timestamp
-        var pl_payload = JSON.stringify(payload.payload)
-        const validTopicAndOriginator = (pgOptions.triggerTopics.includes(payload.topic)) && (pgOptions.triggerOriginators.includes(payload.originator)) // Check if valid topic and originator
-        if (validTopicAndOriginator) {
-          logger.debug(`Producer DynamoDb : ${pl_seqid} ${pl_processid} ${pl_table} ${pl_uniquecolumn} ${pl_operation} ${payload.timestamp}`);
-          await pushToDynamoDb(payload)
-        } else {
-          logger.debug('Ignoring message with incorrect topic or originator')
-        }
-await auditTrail([pl_seqid,pl_processid,pl_table,pl_uniquecolumn,pl_operation,"push-to-DynamoDb","","","",pl_payload,pl_timestamp,pl_topic],'producer')
-      } catch (error) {
-        logger.debug(`error-sync: producer_dynamoDb parse message : "${error.message}"`)
-        logger.error('Could not parse message payload')
-await auditTrail([pl_randonseq,2222,'pl_table','pl_uniquecolumn','pl_operation',"error-DynamoDB","","",error.message,'pl_payload',new Date(),'pl_topic'],'producer')
-        logger.logFullError(error)
-      }
-    })
-    logger.info('Producer DynamoDb: Listening to notifications')
-  } catch (err) {
-    logger.error('Could not setup postgres client')
-    logger.debug(`error-sync: producer_dd postgres-setup 1 :"${err.message}"`)
-    //setup slack alert here
-    logger.logFullError(err)
-    terminate()
-  }
-}
-const terminate = () => process.exit()
-async function run () {
-try{
-  await setupPgClient()
-}
-catch(err){
-logger.debug(`Producer_dd: Could not setup postgres client`)
-logger.debug(`error-sync: producer_dynamoDb postgres-setup 0 :"${err.message}"`)
-//setup slackmessage here
-}}
-
-run()
-
-app.get('/health', (req, res) => {
-  res.send('health ok')
-})
-app.listen(port, () => console.log(`app listening on port ${port}!`))
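
Note on the failover path: with producer_dd.js deleted, the DynamoDB backup producer is now started by passing 'failover' as the first CLI argument (node ./src/producer.js failover, which is what the updated "producer_dd" npm script runs), so both modes share the single producer.js above. For reference, a minimal sketch of a src/services/pushToDynamoDb.js that would satisfy the call in this diff follows; the real service already exists in the repo and is not part of this change, so the aws-sdk v2 DocumentClient usage, the DYNAMODB.tableName config key, and the stored item shape are illustrative assumptions only.

/**
 * Sketch only: assumed shape of the pushToDynamoDb service required above.
 * Assumptions (not shown in this diff): aws-sdk v2 DocumentClient and a
 * DYNAMODB.tableName entry in config.
 */
const AWS = require('aws-sdk')
const config = require('config')
const logger = require('../common/logger')

const docClient = new AWS.DynamoDB.DocumentClient()

async function pushToDynamoDb (payload) {
  const params = {
    TableName: config.get('DYNAMODB.tableName'), // assumed config key
    Item: {
      payloadseqid: payload.payload.payloadseqid, // reconciliation key used by the audit trail
      topic: payload.topic,
      originator: payload.originator,
      timestamp: payload.timestamp,
      payload: JSON.stringify(payload.payload)
    }
  }
  await docClient.put(params).promise()
  logger.debug(`pushToDynamoDb : stored ${payload.payload.payloadseqid}`)
}

module.exports = pushToDynamoDb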