diff --git a/.circleci/config.yml b/.circleci/config.yml index f0e269d..c9d3902 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -57,9 +57,9 @@ build_steps: &build_steps #source buildenvvar #./master_deploy.sh -d ECS -e ${DEPLOY_ENV} -t latest -s ${GLOBAL_ENV}-global-appvar,${LOGICAL_ENV}-${APP_NAME}-appvar -i postgres-ifx-processer - ./buildenv.sh -e ${DEPLOY_ENV} -b ${LOGICAL_ENV}-${APP_NAME}-consumer-deployvar - source buildenvvar - ./master_deploy.sh -d ECS -e ${DEPLOY_ENV} -t latest -s ${GLOBAL_ENV}-global-appvar,${LOGICAL_ENV}-${APP_NAME}-appvar -i postgres-ifx-processer + #./buildenv.sh -e ${DEPLOY_ENV} -b ${LOGICAL_ENV}-${APP_NAME}-consumer-deployvar + #source buildenvvar + #./master_deploy.sh -d ECS -e ${DEPLOY_ENV} -t latest -s ${GLOBAL_ENV}-global-appvar,${LOGICAL_ENV}-${APP_NAME}-appvar -i postgres-ifx-processer #echo "Running Masterscript - deploy postgres-ifx-processer producer" @@ -88,11 +88,11 @@ build_steps: &build_steps #source buildenvvar #./master_deploy.sh -d ECS -e ${DEPLOY_ENV} -t latest -s ${GLOBAL_ENV}-global-appvar,${LOGICAL_ENV}-${APP_NAME}-appvar -i postgres-ifx-processer - #echo "Running Masterscript - deploy postgres-ifx-processer reconsiler2" - #if [ -e ${LOGICAL_ENV}-${APP_NAME}-reconsiler2-deployvar.json ]; then sudo rm -vf ${LOGICAL_ENV}-${APP_NAME}-reconsiler1-deployvar.json; fi - #./buildenv.sh -e ${DEPLOY_ENV} -b ${LOGICAL_ENV}-${APP_NAME}-reconsiler2-deployvar - #source buildenvvar - #./master_deploy.sh -d ECS -e ${DEPLOY_ENV} -t latest -s ${GLOBAL_ENV}-global-appvar,${LOGICAL_ENV}-${APP_NAME}-appvar -i postgres-ifx-processer + echo "Running Masterscript - deploy postgres-ifx-processer reconsiler2" + if [ -e ${LOGICAL_ENV}-${APP_NAME}-reconsiler2-deployvar.json ]; then sudo rm -vf ${LOGICAL_ENV}-${APP_NAME}-reconsiler1-deployvar.json; fi + ./buildenv.sh -e ${DEPLOY_ENV} -b ${LOGICAL_ENV}-${APP_NAME}-reconsiler2-deployvar + source buildenvvar + ./master_deploy.sh -d ECS -e ${DEPLOY_ENV} -t latest -s ${GLOBAL_ENV}-global-appvar,${LOGICAL_ENV}-${APP_NAME}-appvar -i postgres-ifx-processer jobs: # Build & Deploy against development backend # diff --git a/config/default.js b/config/default.js index 28666a0..3d9605d 100644 --- a/config/default.js +++ b/config/default.js @@ -23,8 +23,8 @@ module.exports = { password: process.env.PG_PASSWORD || 'password', port: parseInt(process.env.PG_PORT, 10) || 5432, - triggerFunctions: process.env.TRIGGER_FUNCTIONS || 'prod_db_notifications', // List of trigger functions to listen to - triggerTopics: process.env.TRIGGER_TOPICS || ['prod.db.postgres.sync'], // Names of the topic in the trigger payload + triggerFunctions: process.env.TRIGGER_FUNCTIONS || 'dev_db_notifications', // List of trigger functions to listen to + triggerTopics: process.env.TRIGGER_TOPICS || ['dev.db.postgres.sync'], // Names of the topic in the trigger payload triggerOriginators: process.env.TRIGGER_ORIGINATORS || ['tc-postgres-delta-processor'] // Names of the originator in the trigger payload }, KAFKA: { // Kafka connection options @@ -39,7 +39,7 @@ module.exports = { errorTopic: process.env.ERROR_TOPIC || 'db.scorecardtable.error', recipients: ['admin@abc.com'], // Kafka partitions to use, KAFKA_URL: process.env.KAFKA_URL, - KAFKA_GROUP_ID: process.env.KAFKA_GROUP_ID || 'prod-postgres-ifx-consumer', + KAFKA_GROUP_ID: process.env.KAFKA_GROUP_ID || 'dev-postgres-ifx-consumer', KAFKA_CLIENT_CERT: process.env.KAFKA_CLIENT_CERT ? process.env.KAFKA_CLIENT_CERT.replace('\\n', '\n') : null, KAFKA_CLIENT_CERT_KEY: process.env.KAFKA_CLIENT_CERT_KEY ? process.env.KAFKA_CLIENT_CERT_KEY.replace('\\n', '\n') : null, }, @@ -49,13 +49,13 @@ module.exports = { SLACKNOTIFY: process.env.SLACKNOTIFY || 'false' }, RECONSILER: { - RECONSILER_START: process.env.RECONSILER_START || 5, + RECONSILER_START: process.env.RECONSILER_START || 30, RECONSILER_END: process.env.RECONSILER_END || 1, RECONSILER_DURATION_TYPE: process.env.RECONSILER_DURATION_TYPE || 'm' }, DYNAMODB: { - DYNAMODB_TABLE: process.env.DYNAMODB_TABLE || 'prod_pg_ifx_payload_sync', + DYNAMODB_TABLE: process.env.DYNAMODB_TABLE || 'dev_pg_ifx_payload_sync', DD_ElapsedTime: process.env.DD_ElapsedTime || 600000 }, diff --git a/package.json b/package.json index 8819134..e27d0b3 100644 --- a/package.json +++ b/package.json @@ -11,7 +11,7 @@ "consumer": "node ./src/consumer.js", "producer_dd": "node ./src/producer.js failover", "reconsiler1": "node ./src/reconsiler-audit.js", - "reconsiler2": "node ./src/reconsiler-dd.js", + "reconsiler2": "node ./src/reconsiler-dd-new.js", "start": "npm run producer & npm run producer_dd & npm run consumer & npm run reconsiler1 & npm run reconsiler2 & npm run producer_channel_2" }, "author": "Topcoder", diff --git a/src/producer.js b/src/producer.js index edecb5b..caceb45 100644 --- a/src/producer.js +++ b/src/producer.js @@ -11,6 +11,7 @@ const postMessage = require('./services/posttoslack') const pgConnectionString = `postgresql://${pgOptions.user}:${pgOptions.password}@${pgOptions.host}:${pgOptions.port}/${pgOptions.database}` const pgClient = new pg.Client(pgConnectionString) const auditTrail = require('./services/auditTrail'); +const paudit_dd = require('./services/producer_audit_dd') const express = require('express') const app = express() const port = 3000 @@ -41,6 +42,8 @@ async function setupPgClient() { } else { logger.info('Push to dynamodb for reconciliation') await pushToDynamoDb(payload) + logger.info('Push to producer_audit_dd for reconciliation') + await paudit_dd.pushToAuditDD(message) } } else { diff --git a/src/reconsiler-dd-new.js b/src/reconsiler-dd-new.js new file mode 100644 index 0000000..4e42dca --- /dev/null +++ b/src/reconsiler-dd-new.js @@ -0,0 +1,155 @@ +const logger = require('./common/logger') +const config = require('config') +const pg = require('pg') +const pgOptions = config.get('POSTGRES') +const pgConnectionString = `postgresql://${pgOptions.user}:${pgOptions.password}@${pgOptions.host}:${pgOptions.port}/${pgOptions.database}` +const pgClient = new pg.Client(pgConnectionString) +const kafkaService = require('./services/pushToDirectKafka') +const postMessage = require('./services/posttoslack') +const pushToDynamoDb = require('./services/pushToDynamoDb') +const auditTrail = require('./services/auditTrail'); + +const AWS = require("aws-sdk"); + +const documentClient = new AWS.DynamoDB.DocumentClient({ + region: 'us-east-1', + convertEmptyValues: true +}); + +async function setupPgClient() { + var payloadcopy + try { + const pgClient = new pg.Client(pgConnectionString) + if (!pgClient.connect()) { + await pgClient.connect() + } + rec_d_start = config.RECONSILER.RECONSILER_START + rec_d_end = config.RECONSILER.RECONSILER_END + rec_d_type = config.RECONSILER.RECONSILER_DURATION_TYPE + var paramvalues = [rec_d_start,rec_d_end]; + sql1 = "select a.payloadseqid from common_oltp.producer_audit_dd a where not exists (select from common_oltp.pgifx_sync_audit " + sql2 = " where pgifx_sync_audit.payloadseqid = a.payloadseqid) and a.auditdatetime " + sql3 = " between (timezone('utc',now())) - interval '1"+ rec_d_type + "' * ($1)" + sql4 = " and (timezone('utc',now())) - interval '1"+ rec_d_type + "' * ($2)" + + sql = sql1 + sql2 + sql3 + sql4 + logger.info(`${sql}`) + const res = await pgClient.query(sql,paramvalues, async (err,result) => { + if (err) { + var errmsg0 = `error-sync: Audit Reconsiler1 query "${err.message}"` + logger.debug (errmsg0) + await callposttoslack(errmsg0) + } + else{ + console.log("Reconsiler_dd_2 : Rowcount = ", result.rows.length) + if (result.rows.length > 0) + { + Promise.all(result.rows.map(async (row) => { + console.log(row.payloadseqid) + const x = await calldynamodb(row.payloadseqid) + console.log("val of x ",x) + } + //)) + )).then(result => { if (typeof result.rows == "undefined") + { + console.log("terminating after posting to kafka") + pgClient.end() + terminate() + } + }) + } + else { + console.log("terminate due to 0 rows") + pgClient.end() + terminate() + } + } + }) + } + catch(err) + { + console.log(err) + process.exit(1) + } +} + + +async function calldynamodb(payloadseqid) +{ + console.log("At dynamoc function") + var params = { + TableName : config.DYNAMODB.DYNAMODB_TABLE, + KeyConditionExpression: 'payloadseqid = :hkey', + ExpressionAttributeValues: { + ':hkey': payloadseqid + }} + + return new Promise(async function (resolve, reject) { + await documentClient.query(params, async function(err, data) { + if (err) console.log(err); + else { + var s_payload = (data.Items[0].pl_document) + // console.log("s_payload",s_payload) + logger.info(`Reconsiler2 Before Posting Payload : ${JSON.stringify(s_payload)}`) + await kafkaService.pushToKafka(s_payload) + await audit(s_payload, 1) + logger.info(`Reconsiler2 Payload posted`) + }; + resolve("done"); + }); +}) +} + +async function audit(message, reconsileflag) { + var pl_producererr = "Reconsiler2" + const pl_processid = 5555 + payload1 = (message.payload) + const pl_seqid = payload1.payloadseqid + const pl_topic = payload1.topic // TODO can move in config ? + const pl_table = payload1.table + const pl_uniquecolumn = payload1.Uniquecolumn + const pl_operation = payload1.operation + const pl_timestamp = payload1.timestamp + //const pl_payload = JSON.stringify(message) + const pl_payload = message + const logMessage = `${pl_seqid} ${pl_processid} ${pl_table} ${pl_uniquecolumn} ${pl_operation}` + logger.debug(`${pl_producererr} : ${logMessage}`); + await auditTrail([pl_seqid, pl_processid, pl_table, pl_uniquecolumn, pl_operation, "push-to-kafka", "", "", "Reconsiler2", pl_payload, new Date(), ""], 'producer') + return + } + + async function callposttoslack(slackmessage) { + if (config.SLACK.SLACKNOTIFY === 'true') { + return new Promise(function (resolve, reject) { + postMessage(slackmessage, (response) => { + console.log(`respnse : ${response}`) + if (response.statusCode < 400) { + logger.debug('Message posted successfully'); + //callback(null); + } else if (response.statusCode < 500) { + const errmsg1 = `Slack Error: Reconsiler1: posting message to Slack API: ${response.statusCode} - ${response.statusMessage}` + logger.debug(`error-sync: ${errmsg1}`) + } else { + logger.debug(`Reconsiler1: Server error when processing message: ${response.statusCode} - ${response.statusMessage}`); + //callback(`Server error when processing message: ${response.statusCode} - ${response.statusMessage}`); + } + resolve("done") + }); + }) //end + } + return + } + + //=================BEGIN HERE ======================= +const terminate = () => process.exit() + +async function run() { + logger.debug("Initialising Reconsiler2 setup...") + kafkaService.init().catch((e) => { + logger.error(`Kafka producer intialization error: "${e}"`) + terminate() + }) + setupPgClient() +} + +run() diff --git a/src/services/producer_audit_dd.js b/src/services/producer_audit_dd.js new file mode 100644 index 0000000..51fe721 --- /dev/null +++ b/src/services/producer_audit_dd.js @@ -0,0 +1,82 @@ + +const config = require('config') +const pg = require('pg') +const logger = require('../common/logger') +const postMessage = require('./posttoslack') +const pgOptions = config.get('POSTGRES') +const pgpool = require('./db.js'); + +const terminate = () => process.exit() + +async function pushToAuditDD(message) +{ +try +{ + const pl_channel = message.channel + const pl_processid = message.processId + const payload = JSON.parse(message.payload) + const pl_seqid = payload.payload.payloadseqid + const pl_topic = payload.topic + const pl_table = payload.payload.table + const pl_uniquecolumn = payload.payload.Uniquecolumn + const pl_operation = payload.payload.operation + const pl_timestamp = payload.timestamp + const pl_payload = JSON.stringify(payload.payload) + const logMessage = `${pl_channel} ${pl_processid} ${pl_seqid} ${pl_processid} ${pl_table} ${pl_uniquecolumn} ${pl_operation} ${payload.timestamp}` + + let paramval = [pl_seqid,pl_timestamp,pl_channel,pl_table,pl_operation,JSON.stringify(message),pl_topic,pl_uniquecolumn,pl_processid] + sql0 = "INSERT INTO common_oltp.producer_audit_dd(payloadseqid,auditdatetime,channelname,tablename,dboperation,payload,topicname,uniquecolumn,processid) " + sql1= " VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9) on conflict (payloadseqid) DO nothing "; + sql = sql0 + sql1 + + pgpool.on('error', (err, client) => { + logger.debug(`producer_audit_dd: Unexpected error on idle client : ${err}`) + }) + + pgpool.connect((err, client, release) => { + if (err) { + return logger.debug(`producer_audit_dd : Error acquiring client : ${err.stack}`) + } + client.query(sql, paramval, (err, res) => { + release() + if (err) { + return logger.debug(`producer_audit_dd :Error executing Query : ${err.stack}`) + } + logger.debug(`producer_audit_dd : Audit Trail update : ${res.rowCount}`) + }) + }) +} +catch(err) +{ + logger.debug(`pushToAuditDD : ${err}`) + callposttoslack(`pushToAuditDD : ${err}`) + process.exit(1) +} + +} + + +async function callposttoslack(slackmessage) { + if (config.SLACK.SLACKNOTIFY === 'true') { + return new Promise(function (resolve, reject) { + postMessage(slackmessage, (response) => { + console.log(`respnse : ${response}`) + if (response.statusCode < 400) { + logger.debug('Message posted successfully'); + //callback(null); + } else if (response.statusCode < 500) { + const errmsg1 = `Slack Error: posting message to Slack API: ${response.statusCode} - ${response.statusMessage}` + logger.debug(`error-sync: ${errmsg1}`) + } + else { + logger.debug(`Server error when processing message: ${response.statusCode} - ${response.statusMessage}`); + //callback(`Server error when processing message: ${response.statusCode} - ${response.statusMessage}`); + } + resolve("done") + }); + }) //end + } +} + + +module.exports = { pushToAuditDD }