From a3bf50d8ec70aad2e00726a6c7bfb486621638fe Mon Sep 17 00:00:00 2001 From: nkumar-topcoder <33625707+nkumar-topcoder@users.noreply.github.com> Date: Thu, 13 Feb 2020 16:12:00 +0530 Subject: [PATCH 01/10] [skip ci] [skip ci] --- src/reconsiler-audit.js | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/reconsiler-audit.js b/src/reconsiler-audit.js index 0c44570..4846e72 100644 --- a/src/reconsiler-audit.js +++ b/src/reconsiler-audit.js @@ -22,9 +22,9 @@ async function setupPgClient() { var paramvalues = ['push-to-kafka',rec_d_start,rec_d_end]; //var paramvalues = ['push-to-kafka','60002027']; //sql1 = 'select * from common_oltp.pgifx_sync_audit where pgifx_sync_audit.syncstatus =($1)' - sql1 = " select seq_id, payloadseqid,auditdatetime at time zone 'utc' at time zone 'Asia/Calcutta',syncstatus, payload where pgifx_sync_audit.syncstatus =($1)" - sql2 = " and pgifx_sync_audit.auditdatetime >= DATE(NOW()) - INTERVAL '1"+ rec_d_type + "' * ($2)" - sql3 = " and pgifx_sync_audit.auditdatetime <= DATE(NOW()) - INTERVAL '1"+ rec_d_type + "' * ($3)" + sql1 = "select pgifx_sync_audit.seq_id, payloadseqid,auditdatetime ,syncstatus, payload from pgifx_sync_audit where pgifx_sync_audit.syncstatus =($1)" + sql2 = " and pgifx_sync_audit.auditdatetime between (timezone('utc',now())) - interval '1"+ rec_d_type + "' * ($2)" + sql3 = " and (timezone('utc',now())) - interval '1"+ rec_d_type + "' * ($3)" sql = sql1 + sql2 + sql3 console.log('sql ', sql) // sql3 = ' select * from common_oltp.pgifx_sync_audit where pgifx_sync_audit.syncstatus =($1)' From 9be18c46b69171ebbac20f73b5e103efd717c66e Mon Sep 17 00:00:00 2001 From: nkumar Date: Mon, 17 Feb 2020 10:37:31 +0000 Subject: [PATCH 02/10] reconsiler checkin --- config/default.js | 7 +- src/reconsiler-audit.js | 221 ++++++++++++++++++++++++++++--------- src/services/auditTrail.js | 6 +- 3 files changed, 176 insertions(+), 58 deletions(-) diff --git a/config/default.js b/config/default.js index d22bb48..a207e87 100644 --- a/config/default.js +++ b/config/default.js @@ -44,13 +44,14 @@ module.exports = { SLACKNOTIFY: process.env.SLACKNOTIFY || 'false' }, RECONSILER:{ - RECONSILER_START: process.env.RECONSILER_START || 10, - RECONSILER_END: process.env.RECONSILER_END || 5, + RECONSILER_START: process.env.RECONSILER_START || 5, + RECONSILER_END: process.env.RECONSILER_END || 1, RECONSILER_DURATION_TYPE: process.env.RECONSILER_DURATION_TYPE || 'm' }, DYNAMODB: { - DYNAMODB_TABLE: process.env.DYNAMODB_TABLE || 'test_pg_ifx_payload_sync' + DYNAMODB_TABLE: process.env.DYNAMODB_TABLE || 'test_pg_ifx_payload_sync', + DD_ElapsedTime: process.env.DD_ElapsedTime || 600000 }, AUTH0_URL: process.env.AUTH0_URL , diff --git a/src/reconsiler-audit.js b/src/reconsiler-audit.js index 4846e72..545cb85 100644 --- a/src/reconsiler-audit.js +++ b/src/reconsiler-audit.js @@ -1,45 +1,40 @@ const config = require('config') const pg = require('pg') +var AWS = require("aws-sdk"); const logger = require('./common/logger') const pushToKafka = require('./services/pushToKafka') const pgOptions = config.get('POSTGRES') const postMessage = require('./services/posttoslack') const pgConnectionString = `postgresql://${pgOptions.user}:${pgOptions.password}@${pgOptions.host}:${pgOptions.port}/${pgOptions.database}` -const pgClient = new pg.Client(pgConnectionString) +//const pgClient = new pg.Client(pgConnectionString) const auditTrail = require('./services/auditTrail'); const port = 3000 - +//----------------------------Calling reconsiler 1 Audit log script ---------- async function setupPgClient() { var payloadcopy try { - await pgClient.connect() - //sql1= "select * from pgifx_sync_audit where syncstatus in ('Informix-updated') - //and auditdatetime >= (now()::date - interval '10m') and auditdatetime <= (now()::date - interval '5m') "; - //>= 12.53 and <= 12.48 + const pgClient = new pg.Client(pgConnectionString) + if (!pgClient.connect()) { + await pgClient.connect() + } + //rec_d_start = 10 + //rec_d_end = 1 rec_d_start = config.RECONSILER.RECONSILER_START rec_d_end = config.RECONSILER.RECONSILER_END rec_d_type = config.RECONSILER.RECONSILER_DURATION_TYPE - var paramvalues = ['push-to-kafka',rec_d_start,rec_d_end]; - //var paramvalues = ['push-to-kafka','60002027']; - //sql1 = 'select * from common_oltp.pgifx_sync_audit where pgifx_sync_audit.syncstatus =($1)' + var paramvalues = ['push-to-kafka',rec_d_start,rec_d_end]; sql1 = "select pgifx_sync_audit.seq_id, payloadseqid,auditdatetime ,syncstatus, payload from pgifx_sync_audit where pgifx_sync_audit.syncstatus =($1)" sql2 = " and pgifx_sync_audit.auditdatetime between (timezone('utc',now())) - interval '1"+ rec_d_type + "' * ($2)" sql3 = " and (timezone('utc',now())) - interval '1"+ rec_d_type + "' * ($3)" sql = sql1 + sql2 + sql3 - console.log('sql ', sql) - // sql3 = ' select * from common_oltp.pgifx_sync_audit where pgifx_sync_audit.syncstatus =($1)' - // sql4 = " and pgifx_sync_audit.payloadseqid = ($2)" - - //sql = sql3 + sql4 - //const result = await pgClient.query(sql1, (err, res) => { await pgClient.query(sql,paramvalues, async (err,result) => { if (err) { - var errmsg0 = `error-sync: Audit reconsiler query "${err.message}"` + var errmsg0 = `error-sync: Audit Reconsiler1 query "${err.message}"` logger.debug (errmsg0) - // await callposttoslack(errmsg0) + await callposttoslack(errmsg0) } else{ - console.log("Reconsiler Rowcount = ", result.rows.length) + console.log("Reconsiler1 : Rowcount = ", result.rows.length) for (var i = 0; i < result.rows.length; i++) { for(var columnName in result.rows[i]) { // console.log('column "%s" has a value of "%j"', columnName, result.rows[i][columnName]); @@ -48,39 +43,37 @@ async function setupPgClient() { var reconsiler_payload = result.rows[i][columnName] } }//column for loop - try { + try { + //console.log("reconsiler_payload====",reconsiler_payload); + if (reconsiler_payload != ""){ var s_payload = reconsiler_payload payload = JSON.parse(s_payload) payload1 = payload.payload await pushToKafka(payload1) - logger.info('Reconsiler : Push to kafka and added for audit trail') - audit(s_payload) - } catch (error) { - logger.error('Reconsiler: Could not parse message payload') - logger.debug(`error-sync: Reconsiler parse message : "${error.message}"`) - const errmsg1 = `postgres-ifx-processor: Reconsiler : Error Parse or payload : "${error.message}" \n payload : "${payloadcopy.payload}"` + logger.info('Reconsiler1 Push to kafka and added for audit trail') + await audit(s_payload,0) //0 flag means reconsiler 1. 1 flag reconsiler 2 i,e dynamodb + } }catch (error) { + logger.error('Reconsiler1 : Could not parse message payload') + logger.debug(`error-sync: Reconsiler1 parse message : "${error.message}"`) + const errmsg1 = `error-sync: Reconsiler1 : Error Parse or payload : "${error.message}" ` logger.logFullError(error) - audit(error) + // await audit(error,0) await callposttoslack(errmsg1) - } - }//result for loop + terminate() + } + }//result for loop } pgClient.end() - }) + terminate() + }) }catch (err) { - const errmsg = `postgres-ifx-processor: Reconsiler : Error in setting up postgres client: "${err.message}"` + const errmsg = `postgres-ifx-processor: Reconsiler1 : Error in setting up postgres client: "${err.message}"` logger.error(errmsg) logger.logFullError(err) await callposttoslack(errmsg) terminate() } -} - -const terminate = () => process.exit() - -async function run() { - logger.debug("Initialising Reconsiler setup...") - await setupPgClient() +return } async function callposttoslack(slackmessage) { @@ -92,35 +85,159 @@ async function callposttoslack(slackmessage) { logger.debug('Message posted successfully'); //callback(null); } else if (response.statusCode < 500) { - const errmsg1 = `Slack Error: posting message to Slack API: ${response.statusCode} - ${response.statusMessage}` + const errmsg1 = `Slack Error: Reconsiler1: posting message to Slack API: ${response.statusCode} - ${response.statusMessage}` logger.debug(`error-sync: ${errmsg1}`) } else { - logger.debug(`Server error when processing message: ${response.statusCode} - ${response.statusMessage}`); + logger.debug(`Reconsiler1: Server error when processing message: ${response.statusCode} - ${response.statusMessage}`); //callback(`Server error when processing message: ${response.statusCode} - ${response.statusMessage}`); } resolve("done") }); }) //end } - +return } -// execute -run() -async function audit(message) { - const pl_processid = 5555 - const jsonpayload = JSON.parse(message) - payload = JSON.parse(jsonpayload.payload) - payload1 = payload.payload +async function audit(message,reconsileflag) { + if (reconsileflag === 1) + { + const jsonpayload = (message) + const payload = (jsonpayload.payload) + var pl_producererr= "Reconsiler2" + }else { + const jsonpayload = JSON.parse(message) + payload = JSON.parse(jsonpayload.payload) + var pl_producererr= "Reconsiler1" + } + const pl_processid = 5555 + //const jsonpayload = JSON.parse(message) + //payload = JSON.parse(jsonpayload.payload) + payload1 = (payload.payload) const pl_seqid = payload1.payloadseqid - const pl_topic = payload1.topic // TODO can move in config ? + const pl_topic = payload1.topic // TODO can move in config ? const pl_table = payload1.table const pl_uniquecolumn = payload1.Uniquecolumn const pl_operation = payload1.operation const pl_timestamp = payload1.timestamp - //const pl_payload = JSON.stringify(payload.payload) - const logMessage = `${pl_seqid} ${pl_processid} ${pl_table} ${pl_uniquecolumn} ${pl_operation} ${payload.timestamp}` - logger.debug(`reconsiler : ${logMessage}`); - //await auditTrail([pl_seqid, pl_processid, pl_table, pl_uniquecolumn, pl_operation, "push-to-kafka-reconsiler", "", "reconsiler", "", "", new Date(), ""], 'producer') + const pl_payload = JSON.stringify(message) + const logMessage = `${pl_seqid} ${pl_processid} ${pl_table} ${pl_uniquecolumn} ${pl_operation} ${payload.timestamp}` + logger.debug(`${pl_producererr} : ${logMessage}`); + await auditTrail([pl_seqid, pl_processid, pl_table, pl_uniquecolumn, pl_operation, "push-to-kafka", "", "", pl_producererr, pl_payload, new Date(), ""], 'producer') + return +} + +//===============RECONSILER2 DYNAMODB CODE STARTS HERE ========================== + +async function callReconsiler2() +{console.log("inside 2"); + docClient.scan(params, onScan); +} + +var docClient = new AWS.DynamoDB.DocumentClient({ + region: 'us-east-1', + convertEmptyValues: true + }); +//ElapsedTime = 094600000 +ElapsedTime = config.DYNAMODB.DD_ElapsedTime + var params = { + TableName: config.DYNAMODB.DYNAMODB_TABLE, + FilterExpression: "#timestamp between :time_1 and :time_2", + ExpressionAttributeNames: { + "#timestamp": "timestamp", + }, + ExpressionAttributeValues: { + ":time_1": Date.now() - ElapsedTime, + ":time_2": Date.now() + } + } + +function onScan(err, data) { + if (err) { + logger.error("Unable to scan the table. Error JSON:", JSON.stringify(err, null, 2)); + terminate() + } else { + try + { + console.log("Scan succeeded."); + let total_dd_records = 0; + let total_pushtokafka = 0; + data.Items.forEach(async function(item) { + //console.log(item.payloadseqid); + var retval = await verify_pg_record_exists(item.payloadseqid) + //console.log("retval", retval); + if (retval === false){ + var s_payload = (item.pl_document) + payload = s_payload + payload1 = (payload.payload) + await pushToKafka(item.pl_document) + await audit(s_payload,1) //0 flag means reconsiler 1. 1 flag reconsiler 2 i,e dynamodb + logger.info(`Reconsiler2 : ${item.payloadseqid} posted to kafka: Total Kafka Count : ${total_pushtokafka}`) + total_pushtokafka += 1 + } + total_dd_records += 1 + }); + logger.info(`Reconsiler2 : count of total_dd_records ${total_dd_records}`); + if (typeof data.LastEvaluatedKey != "undefined") { + console.log("Scanning for more..."); + params.ExclusiveStartKey = data.LastEvaluatedKey; + docClient.scan(params, onScan); + } + } + catch (err) { + const errmsg = `error-sync: Reconsiler2 : Error during dynamodb scan/kafka push: "${err.message}"` + logger.error(errmsg) + logger.logFullError(err) + callposttoslack(errmsg) + //terminate() + } + } + //terminate() } +async function verify_pg_record_exists(seqid) +{ + try { + const pgClient = new pg.Client(pgConnectionString) + if (!pgClient.connect()) {await pgClient.connect()} + var paramvalues = [seqid] + sql = 'select * from common_oltp.pgifx_sync_audit where pgifx_sync_audit.payloadseqid = ($1)' + return new Promise(function (resolve, reject) { + pgClient.query(sql, paramvalues, async (err, result) => { + if (err) { + var errmsg0 = `error-sync: Audit reconsiler2 query "${err.message}"` + console.log(errmsg0) + } + else { + if (result.rows.length > 0) { + //console.log("row length > 0 ") + resolve(true); + } + else { + //console.log("0") + resolve(false); + } + } + pgClient.end() + }) + })} + catch (err) { + const errmsg = `error-sync: Reconsiler2 : Error in setting up postgres client: "${err.message}"` + logger.error(errmsg) + logger.logFullError(err) + await callposttoslack(errmsg) + terminate() + } +} + +//=================BEGIN HERE ======================= +const terminate = () => process.exit() + +async function run() { + logger.debug("Initialising Reconsiler1 setup...") + await setupPgClient() + //logger.debug("Initialising Reconsiler2 setup...") + //callReconsiler2() + // terminate() +} +//execute +run() diff --git a/src/services/auditTrail.js b/src/services/auditTrail.js index 3f45f95..7e0d645 100644 --- a/src/services/auditTrail.js +++ b/src/services/auditTrail.js @@ -25,7 +25,7 @@ if (!pgClient2) { } if (sourcetype === 'producer'){ sql0 = 'INSERT INTO common_oltp.pgifx_sync_audit(payloadseqid,processId,tablename,uniquecolumn,dboperation,syncstatus,retrycount,consumer_err,producer_err,payload,auditdatetime,topicname) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12)' - sql1= ' on conflict (payloadseqid) DO UPDATE SET (syncstatus) = ($6) where pgifx_sync_audit.payloadseqid = $1'; + sql1= ' on conflict (payloadseqid) DO UPDATE SET (syncstatus,producer_err) = ($6,$9) where pgifx_sync_audit.payloadseqid = $1'; sql = sql0 + sql1 logger.debug(`--Audit Trail update producer--`) } else { @@ -34,12 +34,12 @@ if (sourcetype === 'producer'){ // where pgifx_sync_audit.payloadseqid = $1'; //and pgifx_sync_audit.processId = $2'; sql = sql0 + sql1 - logger.debug(`--Audit Trail update consumer--`) + logger.debug(`--${1} ${3} 1 Audit Trail update consumer--`) //logger.debug(`sql values "${sql}"`); } return pgClient2.query(sql, data, (err, res) => { if (err) { - logger.debug(`--Audit Trail update error-- ${err.stack}`) + logger.debug(`-- Audit Trail update error-- ${err.stack}`) //pgClient2.end() } else { // logger.debug(`--Audit Trail update success-- `) From 95c11c2b6ca29b3a77105ca1928ec5a643a0056f Mon Sep 17 00:00:00 2001 From: nkumar Date: Mon, 17 Feb 2020 12:17:43 +0000 Subject: [PATCH 03/10] changes to consumer/package/config.yml --- .circleci/config.yml | 7 +++++++ package.json | 3 ++- src/consumer.js | 25 +++++++++++-------------- 3 files changed, 20 insertions(+), 15 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index ca3986b..9bc2c08 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -57,6 +57,13 @@ build_steps: &build_steps ./buildenv.sh -e ${DEPLOY_ENV} -b ${LOGICAL_ENV}-${APP_NAME}-producer_dd-deployvar source buildenvvar ./master_deploy.sh -d ECS -e ${DEPLOY_ENV} -t latest -s ${GLOBAL_ENV}-global-appvar,${LOGICAL_ENV}-${APP_NAME}-appvar -i postgres-ifx-processer + + echo "Running Masterscript - deploy postgres-ifx-processer reconsiler1" + if [ -e ${LOGICAL_ENV}-${APP_NAME}-reconsiler1-deployvar.json ]; then sudo rm -vf ${LOGICAL_ENV}-${APP_NAME}-reconsiler1-deployvar.json; fi + ./buildenv.sh -e ${DEPLOY_ENV} -b ${LOGICAL_ENV}-${APP_NAME}-reconsiler1-deployvar + source buildenvvar + ./master_deploy.sh -d ECS -e ${DEPLOY_ENV} -t latest -s ${GLOBAL_ENV}-global-appvar,${LOGICAL_ENV}-${APP_NAME}-appvar -i postgres-ifx-processer + jobs: # Build & Deploy against development backend # "build-dev": diff --git a/package.json b/package.json index f026db9..5f68789 100644 --- a/package.json +++ b/package.json @@ -9,7 +9,8 @@ "producer": "node ./src/producer.js", "consumer": "node ./src/consumer.js", "producer_dd": "node ./src/producer.js failover", - "start": "npm run producer & npm run producer_dd & npm run consumer" + "reconsiler1": "node ./src/reconsiler-audit.js", + "start": "npm run producer & npm run producer_dd & npm run consumer & npm run reconsiler1" }, "author": "Topcoder", "license": "ISC", diff --git a/src/consumer.js b/src/consumer.js index c5d1e9b..9531776 100644 --- a/src/consumer.js +++ b/src/consumer.js @@ -56,26 +56,23 @@ let cs_payloadseqid await consumer.commitOffset({ topic, partition, offset: m.offset }) // Commit offset only on success if (message.payload['retryCount']) retryvar = message.payload.retryCount; auditTrail([cs_payloadseqid,cs_processId,message.payload.table,message.payload.Uniquecolumn, - message.payload.operation,"Informix-updated",retryvar,"","",message.payload.data, message.timestamp,message.topic],'consumer') + message.payload.operation,"Informix-updated",retryvar,"","",JSON.stringify(message), message.timestamp,message.topic],'consumer') } catch (err) { - const errmsg2 = `Could not process kafka message or informix DB error: "${err.message}"` + const errmsg2 = `error-sync: Could not process kafka message or informix DB error: "${err.message}"` logger.error(errmsg2) - //await callposttoslack(errmsg2) - //logger.logFullError(err) logger.debug(`error-sync: consumer "${err.message}"`) if (!cs_payloadseqid){ - cs_payloadseqid= 'err-'+(new Date()).getTime().toString(36) + Math.random().toString(36).slice(2); - } - - await auditTrail([cs_payloadseqid,3333,'message.payload.table','message.payload.Uniquecolumn', + cs_payloadseqid= 'err-'+(new Date()).getTime().toString(36) + Math.random().toString(36).slice(2);} +/* await auditTrail([cs_payloadseqid,3333,'message.payload.table','message.payload.Uniquecolumn', 'message.payload.operation',"Error-Consumer","",err.message,"",'message.payload.data',new Date(),'message.topic'],'consumer') + }else{ + auditTrail([cs_payloadseqid,4444,message.payload.table,message.payload.Uniquecolumn, + message.payload.operation,"Informix-updated",retryvar,"consumer2","",JSON.stringify(message), message.timestamp,message.topic],'consumer') + }*/ + try { - //var retryvar if (message.payload['retryCount']) retryvar = message.payload.retryCount; await consumer.commitOffset({ topic, partition, offset: m.offset }) // Commit success as will re-publish - // await auditTrail([cs_payloadseqid,3333,'message.payload.table','message.payload.Uniquecolumn', - // 'message.payload.operation',"Informix-Updated1",retryvar,"","",'message.payload.data',new Date(),'message.topic'],'consumer') - //await callposttoslack(`Retry for Kafka push : retrycount : "${retryvar}"`) logger.debug(`Trying to push same message after adding retryCounter`) if (!message.payload.retryCount) { message.payload.retryCount = 0 @@ -95,14 +92,14 @@ let cs_payloadseqid } message.payload['retryCount'] = message.payload.retryCount + 1; await pushToKafka(message) - var errmsg9 = `Retry for Kafka push : retrycount : "${message.payload.retryCount}" : "${cs_payloadseqid}"` + var errmsg9 = `error-sync: Retry for Kafka push : retrycount : "${message.payload.retryCount}" : "${cs_payloadseqid}"` logger.debug(errmsg9) //await callposttoslack(errmsg9) } catch (err) { await auditTrail([cs_payloadseqid,cs_processId,message.payload.table,message.payload.Uniquecolumn, message.payload.operation,"Error-republishing",message.payload['retryCount'],err.message,"",message.payload.data, message.timestamp,message.topic],'consumer') - const errmsg1 = `postgres-ifx-processor: consumer : Error-republishing: "${err.message}"` + const errmsg1 = `error-sync: postgres-ifx-processor: consumer : Error-republishing: "${err.message}"` logger.error(errmsg1) logger.debug(`error-sync: consumer re-publishing "${err.message}"`) // push to slack - alertIt("slack message" From faadfc3ac296290670775950e65a356f31c800b7 Mon Sep 17 00:00:00 2001 From: nkumar-topcoder <33625707+nkumar-topcoder@users.noreply.github.com> Date: Mon, 17 Feb 2020 17:50:23 +0530 Subject: [PATCH 04/10] Update config.yml --- .circleci/config.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 9bc2c08..ca1984e 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -58,7 +58,7 @@ build_steps: &build_steps source buildenvvar ./master_deploy.sh -d ECS -e ${DEPLOY_ENV} -t latest -s ${GLOBAL_ENV}-global-appvar,${LOGICAL_ENV}-${APP_NAME}-appvar -i postgres-ifx-processer - echo "Running Masterscript - deploy postgres-ifx-processer reconsiler1" + echo "Running Masterscript - deploy postgres-ifx-processer reconsiler1" if [ -e ${LOGICAL_ENV}-${APP_NAME}-reconsiler1-deployvar.json ]; then sudo rm -vf ${LOGICAL_ENV}-${APP_NAME}-reconsiler1-deployvar.json; fi ./buildenv.sh -e ${DEPLOY_ENV} -b ${LOGICAL_ENV}-${APP_NAME}-reconsiler1-deployvar source buildenvvar From 8c08d7bdd250cbd709b495fb008e37a4feb99dcb Mon Sep 17 00:00:00 2001 From: nkumar-topcoder <33625707+nkumar-topcoder@users.noreply.github.com> Date: Mon, 17 Feb 2020 18:09:40 +0530 Subject: [PATCH 05/10] Update config.yml --- .circleci/config.yml | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index ca1984e..d3c601c 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -58,11 +58,11 @@ build_steps: &build_steps source buildenvvar ./master_deploy.sh -d ECS -e ${DEPLOY_ENV} -t latest -s ${GLOBAL_ENV}-global-appvar,${LOGICAL_ENV}-${APP_NAME}-appvar -i postgres-ifx-processer - echo "Running Masterscript - deploy postgres-ifx-processer reconsiler1" - if [ -e ${LOGICAL_ENV}-${APP_NAME}-reconsiler1-deployvar.json ]; then sudo rm -vf ${LOGICAL_ENV}-${APP_NAME}-reconsiler1-deployvar.json; fi - ./buildenv.sh -e ${DEPLOY_ENV} -b ${LOGICAL_ENV}-${APP_NAME}-reconsiler1-deployvar - source buildenvvar - ./master_deploy.sh -d ECS -e ${DEPLOY_ENV} -t latest -s ${GLOBAL_ENV}-global-appvar,${LOGICAL_ENV}-${APP_NAME}-appvar -i postgres-ifx-processer + #echo "Running Masterscript - deploy postgres-ifx-processer reconsiler1" + #if [ -e ${LOGICAL_ENV}-${APP_NAME}-reconsiler1-deployvar.json ]; then sudo rm -vf ${LOGICAL_ENV}-${APP_NAME}-reconsiler1-deployvar.json; fi + #./buildenv.sh -e ${DEPLOY_ENV} -b ${LOGICAL_ENV}-${APP_NAME}-reconsiler1-deployvar + #source buildenvvar + #./master_deploy.sh -d ECS -e ${DEPLOY_ENV} -t latest -s ${GLOBAL_ENV}-global-appvar,${LOGICAL_ENV}-${APP_NAME}-appvar -i postgres-ifx-processer jobs: # Build & Deploy against development backend # From b3a0d316d7778d2012f9d915991f34019edf0204 Mon Sep 17 00:00:00 2001 From: nkumar-topcoder <33625707+nkumar-topcoder@users.noreply.github.com> Date: Mon, 17 Feb 2020 18:10:55 +0530 Subject: [PATCH 06/10] Update config.yml --- .circleci/config.yml | 6 ------ 1 file changed, 6 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index d3c601c..5f0ea76 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -58,12 +58,6 @@ build_steps: &build_steps source buildenvvar ./master_deploy.sh -d ECS -e ${DEPLOY_ENV} -t latest -s ${GLOBAL_ENV}-global-appvar,${LOGICAL_ENV}-${APP_NAME}-appvar -i postgres-ifx-processer - #echo "Running Masterscript - deploy postgres-ifx-processer reconsiler1" - #if [ -e ${LOGICAL_ENV}-${APP_NAME}-reconsiler1-deployvar.json ]; then sudo rm -vf ${LOGICAL_ENV}-${APP_NAME}-reconsiler1-deployvar.json; fi - #./buildenv.sh -e ${DEPLOY_ENV} -b ${LOGICAL_ENV}-${APP_NAME}-reconsiler1-deployvar - #source buildenvvar - #./master_deploy.sh -d ECS -e ${DEPLOY_ENV} -t latest -s ${GLOBAL_ENV}-global-appvar,${LOGICAL_ENV}-${APP_NAME}-appvar -i postgres-ifx-processer - jobs: # Build & Deploy against development backend # "build-dev": From cd2e3d43f4184cb7a99616b396729ba2507a3567 Mon Sep 17 00:00:00 2001 From: nkumar-topcoder <33625707+nkumar-topcoder@users.noreply.github.com> Date: Mon, 17 Feb 2020 18:21:36 +0530 Subject: [PATCH 07/10] Update config.yml --- .circleci/config.yml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/.circleci/config.yml b/.circleci/config.yml index 5f0ea76..1d8dfba 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -58,6 +58,12 @@ build_steps: &build_steps source buildenvvar ./master_deploy.sh -d ECS -e ${DEPLOY_ENV} -t latest -s ${GLOBAL_ENV}-global-appvar,${LOGICAL_ENV}-${APP_NAME}-appvar -i postgres-ifx-processer + echo "Running Masterscript - deploy postgres-ifx-processer reconsiler1" + if [ -e ${LOGICAL_ENV}-${APP_NAME}-reconsiler1-deployvar.json ]; then sudo rm -vf ${LOGICAL_ENV}-${APP_NAME}-reconsiler1-deployvar.json; fi + ./buildenv.sh -e ${DEPLOY_ENV} -b ${LOGICAL_ENV}-${APP_NAME}-reconsiler1-deployvar + source buildenvvar + ./master_deploy.sh -d ECS -e ${DEPLOY_ENV} -t latest -s ${GLOBAL_ENV}-global-appvar,${LOGICAL_ENV}-${APP_NAME}-appvar -i postgres-ifx-processer + jobs: # Build & Deploy against development backend # "build-dev": From 93311389f8f439d38a62c0176eb528b83c398588 Mon Sep 17 00:00:00 2001 From: nkumar-topcoder <33625707+nkumar-topcoder@users.noreply.github.com> Date: Wed, 19 Feb 2020 17:20:29 +0530 Subject: [PATCH 08/10] adding condition for cyclic sync [skip ci] --- pg-identity-func-trig-seq.sql | 27 ++++++++++++++++++++++++--- 1 file changed, 24 insertions(+), 3 deletions(-) diff --git a/pg-identity-func-trig-seq.sql b/pg-identity-func-trig-seq.sql index 6679c67..1ccfde9 100644 --- a/pg-identity-func-trig-seq.sql +++ b/pg-identity-func-trig-seq.sql @@ -27,11 +27,30 @@ DECLARE payload TEXT; column_name TEXT; column_value TEXT; + pguserval TEXT; payload_items TEXT[]; uniquecolumn TEXT; logtime TEXT; payloadseqid INTEGER; BEGIN + + +--pguserval := (SELECT 1 FROM pg_roles WHERE rolname = 'pgsyncuser'); +pguserval := (SELECT current_user); + if pguserval = 'pgsyncuser' then + RAISE notice 'pgsyncuser name : %', pguserval; + + CASE TG_OP + WHEN 'INSERT', 'UPDATE' THEN + rec := NEW; + WHEN 'DELETE' THEN + rec := OLD; + ELSE + RAISE EXCEPTION 'Unknown TG_OP: "%". Should not occur!', TG_OP; + END CASE; + return rec; + end if; + CASE TG_OP WHEN 'INSERT', 'UPDATE' THEN rec := NEW; @@ -99,13 +118,14 @@ BEGIN END LOOP; --logtime := (select date_display_tz()); logtime := (SELECT to_char (now()::timestamptz at time zone 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS"Z"')); + payloadseqid := (select nextval('payloadsequence'::regclass)); uniquecolumn := (SELECT c.column_name FROM information_schema.key_column_usage AS c LEFT JOIN information_schema.table_constraints AS t ON t.constraint_name = c.constraint_name - WHERE t.table_name = TG_TABLE_NAME AND t.constraint_type = 'PRIMARY KEY' LIMIT 1); + WHERE t.table_name = TG_TABLE_NAME AND t.constraint_type = 'PRIMARY KEY' limit 1); if (uniquecolumn = '') IS NOT FALSE then uniquecolumn := 'Not-Available'; @@ -131,9 +151,10 @@ BEGIN PERFORM pg_notify('test_db_notifications', payload); RETURN rec; -END; -$body$ LANGUAGE plpgsql; +END; +$body$ LANGUAGE plpgsql + CREATE TRIGGER "pg_email_trigger" AFTER INSERT OR DELETE OR UPDATE ON email FOR EACH ROW From 5983987874681e0dec86101ceea223bdfb84df44 Mon Sep 17 00:00:00 2001 From: nkumar-topcoder <33625707+nkumar-topcoder@users.noreply.github.com> Date: Fri, 21 Feb 2020 16:05:04 +0530 Subject: [PATCH 09/10] Update reconsiler-audit.js --- src/reconsiler-audit.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/reconsiler-audit.js b/src/reconsiler-audit.js index 545cb85..6604e68 100644 --- a/src/reconsiler-audit.js +++ b/src/reconsiler-audit.js @@ -22,8 +22,8 @@ async function setupPgClient() { rec_d_start = config.RECONSILER.RECONSILER_START rec_d_end = config.RECONSILER.RECONSILER_END rec_d_type = config.RECONSILER.RECONSILER_DURATION_TYPE - var paramvalues = ['push-to-kafka',rec_d_start,rec_d_end]; - sql1 = "select pgifx_sync_audit.seq_id, payloadseqid,auditdatetime ,syncstatus, payload from pgifx_sync_audit where pgifx_sync_audit.syncstatus =($1)" + var paramvalues = ['push-to-kafka',rec_d_start,rec_d_end]; + sql1 = "select pgifx_sync_audit.seq_id, pgifx_sync_audit.payloadseqid,pgifx_sync_audit.auditdatetime ,pgifx_sync_audit.syncstatus, pgifx_sync_audit.payload from common_oltp.pgifx_sync_audit where pgifx_sync_audit.syncstatus =($1)" sql2 = " and pgifx_sync_audit.auditdatetime between (timezone('utc',now())) - interval '1"+ rec_d_type + "' * ($2)" sql3 = " and (timezone('utc',now())) - interval '1"+ rec_d_type + "' * ($3)" sql = sql1 + sql2 + sql3 From 20ce8c7ad74ef6585bb6f701858d9a9348ac6ef2 Mon Sep 17 00:00:00 2001 From: nkumar-topcoder <33625707+nkumar-topcoder@users.noreply.github.com> Date: Fri, 28 Feb 2020 15:36:34 +0530 Subject: [PATCH 10/10] [skip ci] [skip ci] --- pg-identity-func-trig-seq.sql | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/pg-identity-func-trig-seq.sql b/pg-identity-func-trig-seq.sql index 1ccfde9..ef34d95 100644 --- a/pg-identity-func-trig-seq.sql +++ b/pg-identity-func-trig-seq.sql @@ -202,10 +202,27 @@ DECLARE column_name TEXT; column_value TEXT; payload_items TEXT[]; + pguserval TEXT; uniquecolumn TEXT; logtime TEXT; payloadseqid INTEGER; BEGIN + + pguserval := (SELECT current_user); + if pguserval = 'pgsyncuser' then + RAISE notice 'pgsyncuser name : %', pguserval; + + CASE TG_OP + WHEN 'INSERT', 'UPDATE' THEN + rec := NEW; + WHEN 'DELETE' THEN + rec := OLD; + ELSE + RAISE EXCEPTION 'Unknown TG_OP: "%". Should not occur!', TG_OP; + END CASE; + return rec; + end if; + CASE TG_OP WHEN 'INSERT', 'UPDATE' THEN rec := NEW;