Dev test pg rf #3

Merged
merged 6 commits into from Jan 3, 2020
1 change: 1 addition & 0 deletions .circleci/config.yml
@@ -103,6 +103,7 @@ workflows:
             branches:
               only:
                 - dev-test-pg
+                - dev-test-pg-rf
       - "build-prod":
           context : org-global
           filters:
2 changes: 1 addition & 1 deletion package.json
@@ -8,7 +8,7 @@
     "lint:fix": "standard --env mocha --fix",
     "producer": "node ./src/producer.js",
     "consumer": "node ./src/consumer.js",
-    "producer_dd": "node ./src/producer_dd.js",
+    "producer_dd": "node ./src/producer.js failover",
     "start": "npm run producer & npm run producer_dd & npm run consumer"
   },
   "author": "Topcoder",
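Note on the package.json change: the producer_dd script no longer launches a separate src/producer_dd.js (that file is deleted at the bottom of this PR); it starts the same src/producer.js with a "failover" argument. Below is a minimal sketch of how that argument switches the producer between Kafka and DynamoDB, simplified from the producer.js diff that follows (the PR itself uses a nested ternary on process.argv[2]):

// Sketch only: simplified from src/producer.js in this PR.
// pushToKafka / pushToDynamoDb are the service modules referenced in the diff below.
const pushToKafka = require('./services/pushToKafka')
const pushToDynamoDb = require('./services/pushToDynamoDb')

// "node ./src/producer.js failover" => isFailover === true; any other (or no) argument => false
const isFailover = process.argv[2] === 'failover'

async function handle (payload) {
  if (!isFailover) {
    await pushToKafka(payload)     // normal mode: publish the notification payload to Kafka
  } else {
    await pushToDynamoDb(payload)  // failover mode: back the payload up to DynamoDB for reconciliation
  }
}

module.exports = handle

The simplified check behaves the same as the PR's ternary: isFailover is true only when the third CLI argument is exactly "failover".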
106 changes: 64 additions & 42 deletions src/producer.js
@@ -5,78 +5,100 @@ const config = require('config')
 const pg = require('pg')
 const logger = require('./common/logger')
 const pushToKafka = require('./services/pushToKafka')
+const pushToDynamoDb = require('./services/pushToDynamoDb')
 const pgOptions = config.get('POSTGRES')
 const pgConnectionString = `postgresql://${pgOptions.user}:${pgOptions.password}@${pgOptions.host}:${pgOptions.port}/${pgOptions.database}`
 const pgClient = new pg.Client(pgConnectionString)
 const auditTrail = require('./services/auditTrail');
 const express = require('express')
 const app = express()
 const port = 3000
-//console.log(`pgConnectionString value = ${pgConnectionString}`)
-var pl_processid;
-var pl_randonseq = 'err-'+(new Date()).getTime().toString(36) + Math.random().toString(36).slice(2);
-async function setupPgClient () {
-  try {
-    //console.log(`${pgOptions.triggerFunctions}`);
+const isFailover = process.argv[2] != undefined ? (process.argv[2] === 'failover' ? true : false) : false
+async function setupPgClient() {
+  try {
     await pgClient.connect()
     for (const triggerFunction of pgOptions.triggerFunctions) {
       await pgClient.query(`LISTEN ${triggerFunction}`)
     }
     pgClient.on('notification', async (message) => {
-      //const payload = JSON.parse(message.payload);
-      pl_processid = message.processId;
-      //console.log(message);
-      try
-      {
+      try {
         const payload = JSON.parse(message.payload)
-        var pl_seqid = payload.payload.payloadseqid
-        var pl_topic = payload.topic
-        var pl_table = payload.payload.table
-        var pl_uniquecolumn = payload.payload.Uniquecolumn
-        var pl_operation = payload.payload.operation
-        var pl_timestamp = payload.timestamp
-        var pl_payload = JSON.stringify(payload.payload)
         const validTopicAndOriginator = (pgOptions.triggerTopics.includes(payload.topic)) && (pgOptions.triggerOriginators.includes(payload.originator)) // Check if valid topic and originator
         if (validTopicAndOriginator) {
-          logger.debug(`producer : ${pl_seqid} ${pl_processid} ${pl_table} ${pl_uniquecolumn} ${pl_operation} ${payload.timestamp}`);
-          await pushToKafka(payload)
-        } else {
+          if (!isFailover) {
+            logger.info('trying to push on kafka topic')
+            await pushToKafka(payload)
+            logger.info('pushed to kafka and added for audit trail')
+          } else {
+            logger.info('taking backup on dynamodb for reconciliation')
+            await pushToDynamoDb(payload)
+          }
+          audit(message)
+        } else {
           logger.debug('Ignoring message with incorrect topic or originator')
+
+          // push to slack - alertIt("slack message")
         }
-        await auditTrail([pl_seqid,pl_processid,pl_table,pl_uniquecolumn,pl_operation,"push-to-kafka","","","",pl_payload,pl_timestamp,pl_topic],'producer')
       } catch (error) {
         logger.error('Could not parse message payload')
         logger.debug(`error-sync: producer parse message : "${error.message}"`)
-        await auditTrail([pl_randonseq,1111,'pl_table','pl_uniquecolumn','pl_operation',"error-producer","","",error.message,'pl_payload',new Date(),'pl_topic'],'producer')
         logger.logFullError(error)
+        audit(error)
+        // push to slack - alertIt("slack message"
       }
     })
-    logger.info('pg-ifx-sync producer: Listening to notifications')
+    logger.info('pg-ifx-sync producer: Listening to pg-trigger channel.')
   } catch (err) {
-    logger.debug(`error-sync: producer postgres-setup 1 :"${err.message}"`)
-    logger.error('Could not setup postgres client')
+    logger.error('Error in setting up postgres client: ', err.message)
     logger.logFullError(err)
+
+    // push to slack - alertIt("slack message")
     terminate()
   }
 }
 
 const terminate = () => process.exit()
-async function run () {
-  try {
-    await setupPgClient()
-  }
-  catch(err)
-  {
-    logger.debug(`Could not setup postgres client`)
-    logger.debug(`error-sync: producer postgres-setup 0 :"${err.message}"`)
-  }
-
+async function run() {
+  logger.debug("Initialising producer setup...")
+  await setupPgClient()
 }
 
 // execute
 run()
+
+async function audit(message) {
+  const pl_processid = message.processId
+  if (pl_processid != 'undefined') {
+    const payload = JSON.parse(message.payload)
+    const pl_seqid = payload.payload.payloadseqid
+    const pl_topic = payload.topic // TODO can move in config ?
+    const pl_table = payload.payload.table
+    const pl_uniquecolumn = payload.payload.Uniquecolumn
+    const pl_operation = payload.payload.operation
+    const pl_timestamp = payload.timestamp
+    const pl_payload = JSON.stringify(payload.payload)
+    const logMessage = `${pl_seqid} ${pl_processid} ${pl_table} ${pl_uniquecolumn} ${pl_operation} ${payload.timestamp}`
+    if (!isFailover) {
+      logger.debug(`producer : ${logMessage}`);
+    } else {
+      logger.debug(`Producer DynamoDb : ${logMessage}`);
+    }
+    auditTrail([pl_seqid, pl_processid, pl_table, pl_uniquecolumn, pl_operation, "push-to-kafka", "", "", "", pl_payload, pl_timestamp, pl_topic], 'producer')
+  } else {
+    const pl_randonseq = 'err-' + (new Date()).getTime().toString(36) + Math.random().toString(36).slice(2)
+    if (!isFailover) {
+      await auditTrail([pl_randonseq, 1111, "", "", "", "error-producer", "", "", message.message, "", new Date(), ""], 'producer')
+    }
+  }
+}
+
+function alertIt(message) {
+  /**
+  // call slack
+  */
+}
+
 app.get('/health', (req, res) => {
   res.send('health ok')
 })
 app.listen(port, () => console.log(`app listening on port ${port}!`))
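
For context on the mechanism the producer relies on: setupPgClient LISTENs on each channel named in pgOptions.triggerFunctions and reacts to the pg client's 'notification' events. Below is a minimal, standalone sketch of that LISTEN/NOTIFY flow with node-postgres; the connection string and channel name are placeholders, not values from this repo:

// Standalone LISTEN/NOTIFY example (placeholders: connection string, 'test_channel').
const pg = require('pg')

async function main () {
  const client = new pg.Client('postgresql://user:password@localhost:5432/postgres')
  await client.connect()

  // Subscribe to a channel, as producer.js does for each configured trigger function.
  await client.query('LISTEN test_channel')

  // message carries processId, channel and a string payload — the fields producer.js reads.
  client.on('notification', (message) => {
    console.log(message.processId, message.channel, message.payload)
  })

  // Any NOTIFY on the same database is delivered to the listener above.
  await client.query(`NOTIFY test_channel, 'hello'`)
}

main().catch(console.error)

Once the producer is running, the Express /health endpoint in producer.js can be probed with curl http://localhost:3000/health, which should return "health ok" (the app listens on port 3000).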

79 changes: 0 additions & 79 deletions src/producer_dd.js

This file was deleted.