From d85dcf4f6feb3c851e962a655cbc62d094e822ef Mon Sep 17 00:00:00 2001 From: nkumar-topcoder <33625707+nkumar-topcoder@users.noreply.github.com> Date: Mon, 30 Sep 2019 17:34:24 +0530 Subject: [PATCH 01/27] Update consumer.js --- src/consumer.js | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/consumer.js b/src/consumer.js index 826e978..9794446 100644 --- a/src/consumer.js +++ b/src/consumer.js @@ -91,7 +91,9 @@ async function dataHandler(messageSet, topic, partition) { async function setupKafkaConsumer() { try { await consumer.init() - await consumer.subscribe(kafkaOptions.topic, kafkaOptions.partition, { time: Kafka.LATEST_OFFSET }, dataHandler) + //await consumer.subscribe(kafkaOptions.topic, kafkaOptions.partition, { time: Kafka.LATEST_OFFSET }, dataHandler) + await consumer.subscribe(kafkaOptions.topic, dataHandler) + logger.info('Initialized kafka consumer') healthcheck.init([check]) } catch (err) { From e7cc26d188ed87d226de0d5e2c138d3feee104e1 Mon Sep 17 00:00:00 2001 From: nkumar Date: Tue, 17 Dec 2019 11:41:45 +0000 Subject: [PATCH 02/27] for identity changes Committer: nkumar --- config/default.js | 4 ++-- config/test.js | 6 +++--- package.json | 1 + src/consumer.js | 36 ++++++++++++++++++++++----------- src/producer.js | 37 ++++++++++++++++++++++------------ src/services/auditTrail.js | 19 +++++++++++------ src/services/pushToDynamoDb.js | 33 ++++++++++++++++++++++++++++++ 7 files changed, 100 insertions(+), 36 deletions(-) create mode 100644 src/services/pushToDynamoDb.js diff --git a/config/default.js b/config/default.js index 54a0793..dd31193 100644 --- a/config/default.js +++ b/config/default.js @@ -22,8 +22,8 @@ module.exports = { database: process.env.PG_DATABASE || 'postgres', // database must exist before running the tool password: process.env.PG_PASSWORD || 'password', port: parseInt(process.env.PG_PORT, 10) || 5432, - triggerFunctions: process.env.TRIGGER_FUNCTIONS || ['db_notifications'], // List of trigger functions to listen to - triggerTopics: process.env.TRIGGER_TOPICS || ['db.postgres.sync'], // Names of the topic in the trigger payload + triggerFunctions: process.env.TRIGGER_FUNCTIONS || ['test_db_notifications'], // List of trigger functions to listen to + triggerTopics: process.env.TRIGGER_TOPICS || ['test.db.postgres.sync'], // Names of the topic in the trigger payload triggerOriginators: process.env.TRIGGER_ORIGINATORS || ['tc-postgres-delta-processor'] // Names of the originator in the trigger payload }, KAFKA: { // Kafka connection options diff --git a/config/test.js b/config/test.js index 228861f..a198f88 100644 --- a/config/test.js +++ b/config/test.js @@ -7,9 +7,9 @@ module.exports = { host: process.env.INFORMIX_HOST || 'localhost', port: parseInt(process.env.INFORMIX_PORT, 10) || 2021, user: process.env.INFORMIX_USER || 'informix', - password: process.env.INFORMIX_PASSWORD || '1nf0rm1x', - database: process.env.INFORMIX_DATABASE || 'tcs_catalog', - server: process.env.INFORMIX_SERVER || 'informixoltp_tcp', + password: process.env.INFORMIX_PASSWORD || 'password', + database: process.env.INFORMIX_DATABASE || 'db', + server: process.env.INFORMIX_SERVER || 'informixp', minpool: parseInt(process.env.MINPOOL, 10) || 1, maxpool: parseInt(process.env.MAXPOOL, 10) || 60, maxsize: parseInt(process.env.MAXSIZE, 10) || 0, diff --git a/package.json b/package.json index 21b72b6..3bebbd7 100644 --- a/package.json +++ b/package.json @@ -13,6 +13,7 @@ "author": "Topcoder", "license": "ISC", "dependencies": { + "aws-sdk": "*", "config": "^3.2.2", 
"informix-wrapper": "git+https://github.com/appirio-tech/informix-wrapper.git", "no-kafka": "^3.4.3", diff --git a/src/consumer.js b/src/consumer.js index 9794446..7270cd3 100644 --- a/src/consumer.js +++ b/src/consumer.js @@ -33,7 +33,7 @@ const check = function () { return connected; }; - +let cs_processId; const terminate = () => process.exit() /** * @@ -41,23 +41,34 @@ const terminate = () => process.exit() * @param {String} topic The name of the message topic * @param {Number} partition The kafka partition to which messages are written */ +let message; +let cs_payloadseqid; async function dataHandler(messageSet, topic, partition) { - for (const m of messageSet) { // Process messages sequentially + for (const m of messageSet) { // Process messages sequentially let message try { + //let message message = JSON.parse(m.message.value) +// cs_payloadseqid = message.payload.payloadseqid + //console.log(message); logger.debug('Received message from kafka:') - logger.debug(JSON.stringify(message)) - await updateInformix(message) + logger.debug(`consumer : ${message.payload.payloadseqid} ${message.payload.table} ${message.payload.Uniquecolumn} ${message.payload.operation} ${message.timestamp} `); + await updateInformix(message) await consumer.commitOffset({ topic, partition, offset: m.offset }) // Commit offset only on success - await auditTrail([message.payload.payloadseqid,'scorecard_consumer',message.payload.table,message.payload.Uniquecolumn, - message.payload.operation,1,0,"",message.timestamp,new Date(),message.payload.data],'consumer') + await auditTrail([message.payload.payloadseqid,cs_processId,message.payload.table,message.payload.Uniquecolumn, + message.payload.operation,"Informix-updated","","","",message.payload.data, message.timestamp,message.topic],'consumer') } catch (err) { - logger.error('Could not process kafka message') + logger.error(`Could not process kafka message or informix DB error: "${err.message}"`) //logger.logFullError(err) + if (!cs_payloadseqid){ + cs_payloadseqid= 'err-'+(new Date()).getTime().toString(36) + Math.random().toString(36).slice(2); + } + + await auditTrail([cs_payloadseqid,3333,'message.payload.table','message.payload.Uniquecolumn', + 'message.payload.operation',"Error-Consumer","",err.message,"",'message.payload.data',new Date(),'message.topic'],'consumer') try { await consumer.commitOffset({ topic, partition, offset: m.offset }) // Commit success as will re-publish - logger.debug('Trying to push same message after adding retryCounter') + logger.debug(`Trying to push same message after adding retryCounter`) if (!message.payload.retryCount) { message.payload.retryCount = 0 logger.debug('setting retry counter to 0 and max try count is : ', config.KAFKA.maxRetry); @@ -73,12 +84,13 @@ async function dataHandler(messageSet, topic, partition) { return } message.payload['retryCount'] = message.payload.retryCount + 1; + //await auditTrail([message.payload.payloadseqid,cs_processId,message.payload.table,message.payload.Uniquecolumn, + // message.payload.operation,"Error",message.payload['retryCount'],err.message,"",message.payload.data, message.timestamp,message.topic],'consumer') await pushToKafka(message) - logger.debug('pushed same message after adding retryCount') + logger.debug(` After kafka push Retry Count "${message.payload.retryCount}"`) } catch (err) { - //await auditTrail([payload.payload.payloadseqid,'scorecard_consumer',payload.payload.table,payload.payload.Uniquecolumn, - // 
payload.payload.operation,0,message.payload.retryCount,"re-publish kafka err",payload.timestamp,new Date(),""],'consumer') - logger.error("Error occured in re-publishing kafka message", err) + + logger.error("Error occured in re-publishing kafka message", err) } } } diff --git a/src/producer.js b/src/producer.js index ce59982..f5bc6b1 100644 --- a/src/producer.js +++ b/src/producer.js @@ -12,34 +12,45 @@ const auditTrail = require('./services/auditTrail'); const express = require('express') const app = express() const port = 3000 - - +//console.log(`pgConnectionString value = ${pgConnectionString}`) +var pl_processid; +var pl_randonseq = 'err-'+(new Date()).getTime().toString(36) + Math.random().toString(36).slice(2); async function setupPgClient () { - try { + try { +//console.log(`${pgOptions.triggerFunctions}`); await pgClient.connect() for (const triggerFunction of pgOptions.triggerFunctions) { await pgClient.query(`LISTEN ${triggerFunction}`) } pgClient.on('notification', async (message) => { - try { + //const payload = JSON.parse(message.payload); + pl_processid = message.processId; + //console.log(message); + try + { const payload = JSON.parse(message.payload) - const validTopicAndOriginator = (pgOptions.triggerTopics.includes(payload.topic)) && (pgOptions.triggerOriginators.includes(payload.originator)) // Check if valid topic and originator + var pl_seqid = payload.payload.payloadseqid + var pl_topic = payload.topic + var pl_table = payload.payload.table + var pl_uniquecolumn = payload.payload.Uniquecolumn + var pl_operation = payload.payload.operation + var pl_timestamp = payload.timestamp + var pl_payload = JSON.stringify(payload.payload) + const validTopicAndOriginator = (pgOptions.triggerTopics.includes(payload.topic)) && (pgOptions.triggerOriginators.includes(payload.originator)) // Check if valid topic and originator if (validTopicAndOriginator) { - console.log(`${payload.topic} ${payload.payload.table} ${payload.payload.operation} ${payload.timestamp}`); + logger.debug(`producer : ${pl_seqid} ${pl_processid} ${pl_table} ${pl_uniquecolumn} ${pl_operation} ${payload.timestamp}`); await pushToKafka(payload) } else { logger.debug('Ignoring message with incorrect topic or originator') } - await auditTrail([payload.payload.payloadseqid,'scorecard_producer',1,payload.topic,payload.payload.table,payload.payload.Uniquecolumn, - payload.payload.operation,"",payload.timestamp,new Date(),JSON.stringify(payload.payload)],'producer') - } catch (error) { +await auditTrail([pl_seqid,pl_processid,pl_table,pl_uniquecolumn,pl_operation,"push-to-kafka","","","",pl_payload,pl_timestamp,pl_topic],'producer') + } catch (error) { logger.error('Could not parse message payload') - await auditTrail([payload.payload.payloadseqid,'scorecard_producer',0,payload.topic,payload.payload.table,payload.payload.Uniquecolumn, - payload.payload.operation,"error",payload.timestamp,new Date(),JSON.stringify(payload.payload)],'producer') - logger.logFullError(error) +await auditTrail([pl_randonseq,1111,'pl_table','pl_uniquecolumn','pl_operation',"error-producer","","",error.message,'pl_payload',new Date(),'pl_topic'],'producer') + logger.logFullError(error) } }) - logger.info('Listening to notifications') + logger.info('pg-ifx-sync producer: Listening to notifications') } catch (err) { logger.error('Could not setup postgres client') logger.logFullError(err) diff --git a/src/services/auditTrail.js b/src/services/auditTrail.js index fcd26a5..5937517 100644 --- a/src/services/auditTrail.js +++ 
b/src/services/auditTrail.js @@ -5,7 +5,7 @@ const logger = require('../common/logger') const pgOptions = config.get('POSTGRES') const pgConnectionString = `postgresql://${pgOptions.user}:${pgOptions.password}@${pgOptions.host}:${pgOptions.port}/${pgOptions.database}` let pgClient2 - +console.log(`"${pgConnectionString}"`); async function setupPgClient2 () { pgClient2 = new pg.Client(pgConnectionString) try { @@ -24,18 +24,25 @@ if (!pgClient2) { await setupPgClient2() } if (sourcetype === 'producer'){ - sql = 'INSERT INTO tcs_catalog.producer_scorecard_audit(payloadseqid,origin_source,kafka_post_status,topic_name,table_name,Uniquecolumn,operationtype,errormessage,payloadtime,auditdatetime,payload) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11)' -logger.debug(`--Audit Trail update producer--`) + sql0 = 'INSERT INTO common_oltp.pgifx_sync_audit(payloadseqid,processId,tablename,uniquecolumn,dboperation,syncstatus,retrycount,consumer_err,producer_err,payload,auditdatetime,topicname) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12)' + sql1= ' on conflict (payloadseqid) DO UPDATE SET (syncstatus) = ($6) where pgifx_sync_audit.payloadseqid = $1'; + sql = sql0 + sql1 + logger.debug(`--Audit Trail update producer--`) } else { -sql = 'INSERT INTO tcs_catalog.consumer_scorecard_audit(payloadseqid,origin_source,table_name,Uniquecolumn,operationtype,dest_db_status, dest_retry_count,errormessage,payloadtime,auditdatetime,dest_operationquery) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11)' -logger.debug(`--Audit Trail update consumer--`) + sql0 = 'INSERT INTO common_oltp.pgifx_sync_audit(payloadseqid,processId,tablename,uniquecolumn,dboperation,syncstatus,retrycount,consumer_err,producer_err,payload,auditdatetime,topicname) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12)' + sql1= ' on conflict (payloadseqid) DO UPDATE SET (syncstatus,consumer_err,retrycount) = ($6,$8,$7)'; + // where pgifx_sync_audit.payloadseqid = $1'; + //and pgifx_sync_audit.processId = $2'; + sql = sql0 + sql1 + logger.debug(`--Audit Trail update consumer--`) + //logger.debug(`sql values "${sql}"`); } return pgClient2.query(sql, data, (err, res) => { if (err) { logger.debug(`--Audit Trail update error-- ${err.stack}`) //pgClient2.end() } else { - logger.debug(`--Audit Trail update success-- `) + // logger.debug(`--Audit Trail update success-- `) } }) } diff --git a/src/services/pushToDynamoDb.js b/src/services/pushToDynamoDb.js new file mode 100644 index 0000000..3584263 --- /dev/null +++ b/src/services/pushToDynamoDb.js @@ -0,0 +1,33 @@ +const config = require('config') +const logger = require('../common/logger') +const _ = require('lodash') +var AWS = require("aws-sdk"); +async function pushToDynamoDb(payload) { + try { console.log('----Inside DynomoDB code -------'); + console.log(payload) + p_dd_payloadseqid = payload.payload.payloadseqid; + var params = { + TableName: 'test_pg_ifx_payload_sync', + Item: { + payloadseqid: payload.payload.payloadseqid, + pl_document: payload.payload, + pl_table: payload.payload.table, + pl_uniquecolumn: payload.payload.Uniquecolumn, + pl_operation: payload.payload.operation, + pl_time: payload.timestamp, + timestamp: Date.now() + } + } + var docClient = new AWS.DynamoDB.DocumentClient({region: 'us-east-1'}); + docClient.put(params, function(err, data) { + if (err) console.log(err); + else console.log(data); + }); + + } catch (e) { + console.log(e) + } +} + +console.log("hello from DD") +module.exports = pushToDynamoDb From 69ea87195c47826e64880e75d6c3517715b91edc Mon Sep 17 00:00:00 2001 From: 
nkumar-topcoder <33625707+nkumar-topcoder@users.noreply.github.com> Date: Tue, 17 Dec 2019 17:36:42 +0530 Subject: [PATCH 03/27] Update package.json --- package.json | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/package.json b/package.json index 3bebbd7..3019890 100644 --- a/package.json +++ b/package.json @@ -8,7 +8,8 @@ "lint:fix": "standard --env mocha --fix", "producer": "node ./src/producer.js", "consumer": "node ./src/consumer.js", - "start": "npm run producer & npm run consumer" + "producer_dd": "node ./src/producer_dd.js", + "start": "npm run producer & npm run producer_dd & npm run consumer" }, "author": "Topcoder", "license": "ISC", From c92df701c40ba6dfd67adf0c90f25c2456e9d478 Mon Sep 17 00:00:00 2001 From: nkumar Date: Tue, 17 Dec 2019 12:09:29 +0000 Subject: [PATCH 04/27] producer dynamodb file Committer: nkumar --- src/producer_dd.js | 70 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 70 insertions(+) create mode 100644 src/producer_dd.js diff --git a/src/producer_dd.js b/src/producer_dd.js new file mode 100644 index 0000000..2faebe9 --- /dev/null +++ b/src/producer_dd.js @@ -0,0 +1,70 @@ +/** + * Listens to DML trigger notifications from postgres and pushes the trigger data into kafka + */ +const config = require('config') +const pg = require('pg') +const logger = require('./common/logger') +//const pushToKafka = require('./services/pushToKafka') +const pushToDynamoDb = require('./services/pushToDynamoDb') +const pgOptions = config.get('POSTGRES') +const pgConnectionString = `postgresql://${pgOptions.user}:${pgOptions.password}@${pgOptions.host}:${pgOptions.port}/${pgOptions.database}` +const pgClient = new pg.Client(pgConnectionString) +const auditTrail = require('./services/auditTrail'); +const express = require('express') +const app = express() +const port = 3100 +//console.log(`pgConnectionString value = ${pgConnectionString}`) +var pl_processid; +var pl_randonseq = 'err-'+(new Date()).getTime().toString(36) + Math.random().toString(36).slice(2); +async function setupPgClient () { + try { + await pgClient.connect() + for (const triggerFunction of pgOptions.triggerFunctions) { + await pgClient.query(`LISTEN ${triggerFunction}`) + } + pgClient.on('notification', async (message) => { + //const payload = JSON.parse(message.payload); + pl_processid = message.processId; + try + { + const payload = JSON.parse(message.payload) + var pl_seqid = payload.payload.payloadseqid + var pl_topic = payload.topic + var pl_table = payload.payload.table + var pl_uniquecolumn = payload.payload.Uniquecolumn + var pl_operation = payload.payload.operation + var pl_timestamp = payload.timestamp + var pl_payload = JSON.stringify(payload.payload) + const validTopicAndOriginator = (pgOptions.triggerTopics.includes(payload.topic)) && (pgOptions.triggerOriginators.includes(payload.originator)) // Check if valid topic and originator + if (validTopicAndOriginator) { + logger.debug(`Producer DynamoDb : ${pl_seqid} ${pl_processid} ${pl_table} ${pl_uniquecolumn} ${pl_operation} ${payload.timestamp}`); + await pushToDynamoDb(payload) + } else { + logger.debug('Ignoring message with incorrect topic or originator') + } +await auditTrail([pl_seqid,pl_processid,pl_table,pl_uniquecolumn,pl_operation,"push-to-DynamoDb","","","",pl_payload,pl_timestamp,pl_topic],'producer') + } catch (error) { + logger.error('Could not parse message payload') +await auditTrail([pl_randonseq,2222,'pl_table','pl_uniquecolumn','pl_operation',"error-DynamoDB","","",error.message,'pl_payload',new 
Date(),'pl_topic'],'producer') + logger.logFullError(error) + } + }) + logger.info('Producer DynamoDb: Listening to notifications') + } catch (err) { + logger.error('Could not setup postgres client') + logger.logFullError(err) + terminate() + } +} + +async function run () { + await setupPgClient() +} + +run() + +app.get('/health', (req, res) => { + res.send('health ok') +}) +app.listen(port, () => console.log(`app listening on port ${port}!`)) + From 0326ce2d7e76e0275a70c0ef86b0ef5309280cf2 Mon Sep 17 00:00:00 2001 From: nkumar-topcoder <33625707+nkumar-topcoder@users.noreply.github.com> Date: Tue, 17 Dec 2019 18:25:22 +0530 Subject: [PATCH 05/27] Update config.yml --- .circleci/config.yml | 27 ++++++++++++++++++++++++--- 1 file changed, 24 insertions(+), 3 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index da48833..0f2a328 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -44,13 +44,18 @@ build_steps: &build_steps ./buildenv.sh -e ${DEPLOY_ENV} -b ${LOGICAL_ENV}-${APP_NAME}-consumer-deployvar source buildenvvar ./master_deploy.sh -d ECS -e ${DEPLOY_ENV} -t latest -s ${GLOBAL_ENV}-global-appvar,${LOGICAL_ENV}-${APP_NAME}-appvar -i postgres-ifx-processer - + echo "Running Masterscript - deploy postgres-ifx-processer producer" if [ -e ${LOGICAL_ENV}-${APP_NAME}-consumer-deployvar.json ]; then sudo rm -vf ${LOGICAL_ENV}-${APP_NAME}-consumer-deployvar.json; fi ./buildenv.sh -e ${DEPLOY_ENV} -b ${LOGICAL_ENV}-${APP_NAME}-producer-deployvar source buildenvvar ./master_deploy.sh -d ECS -e ${DEPLOY_ENV} -t latest -s ${GLOBAL_ENV}-global-appvar,${LOGICAL_ENV}-${APP_NAME}-appvar -i postgres-ifx-processer - + + echo "Running Masterscript - deploy postgres-ifx-processer producer_dd" + if [ -e ${LOGICAL_ENV}-${APP_NAME}-producer-deployvar.json ]; then sudo rm -vf ${LOGICAL_ENV}-${APP_NAME}-producer-deployvar.json; fi + ./buildenv.sh -e ${DEPLOY_ENV} -b ${LOGICAL_ENV}-${APP_NAME}-producer_dd-deployvar + source buildenvvar + ./master_deploy.sh -d ECS -e ${DEPLOY_ENV} -t latest -s ${GLOBAL_ENV}-global-appvar,${LOGICAL_ENV}-${APP_NAME}-appvar -i postgres-ifx-processer jobs: # Build & Deploy against development backend # "build-dev": @@ -62,6 +67,17 @@ jobs: APP_NAME: "postgres-ifx-processer" steps: *build_steps # Build & Deploy against production backend +jobs: + # Build & Deploy against development backend # + "build-test": + <<: *defaults + environment: + DEPLOY_ENV: "TEST" + LOGICAL_ENV: "TEST" + GLOBAL_ENV: "dev" + APP_NAME: "postgres-ifx-processer" + steps: *build_steps + # Build & Deploy against production backend "build-prod": <<: *defaults environment: @@ -82,10 +98,15 @@ workflows: only: - dev - dev-retryfeature + - "build-test": + context : org-global + filters: + branches: + only: + - dev-test-pg - "build-prod": context : org-global filters: branches: only: - master - From 810d0c9f10ca9977504b7d431437bd1bcaf65f14 Mon Sep 17 00:00:00 2001 From: nkumar-topcoder <33625707+nkumar-topcoder@users.noreply.github.com> Date: Tue, 17 Dec 2019 18:27:33 +0530 Subject: [PATCH 06/27] Update config.yml --- .circleci/config.yml | 2 -- 1 file changed, 2 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 0f2a328..dee2111 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -66,8 +66,6 @@ jobs: GLOBAL_ENV: "dev" APP_NAME: "postgres-ifx-processer" steps: *build_steps - # Build & Deploy against production backend -jobs: # Build & Deploy against development backend # "build-test": <<: *defaults From c3d50628d0b3673f4f52f7bcf1a67615524e8d3b Mon 
Sep 17 00:00:00 2001 From: nkumar-topcoder <33625707+nkumar-topcoder@users.noreply.github.com> Date: Tue, 17 Dec 2019 18:33:37 +0530 Subject: [PATCH 07/27] Update config.yml --- .circleci/config.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index dee2111..cb28e0f 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -70,7 +70,7 @@ jobs: "build-test": <<: *defaults environment: - DEPLOY_ENV: "TEST" + DEPLOY_ENV: "DEV" LOGICAL_ENV: "TEST" GLOBAL_ENV: "dev" APP_NAME: "postgres-ifx-processer" From cda1e5f1e5138f77ceae76ccb821b129a5118bc3 Mon Sep 17 00:00:00 2001 From: nkumar Date: Wed, 18 Dec 2019 07:23:28 +0000 Subject: [PATCH 08/27] fix dynomodb ecsrole Committer: nkumar --- src/producer_dd.js | 6 ++---- src/services/pushToDynamoDb.js | 12 ++++++------ 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/src/producer_dd.js b/src/producer_dd.js index 2faebe9..7796abd 100644 --- a/src/producer_dd.js +++ b/src/producer_dd.js @@ -4,7 +4,6 @@ const config = require('config') const pg = require('pg') const logger = require('./common/logger') -//const pushToKafka = require('./services/pushToKafka') const pushToDynamoDb = require('./services/pushToDynamoDb') const pgOptions = config.get('POSTGRES') const pgConnectionString = `postgresql://${pgOptions.user}:${pgOptions.password}@${pgOptions.host}:${pgOptions.port}/${pgOptions.database}` @@ -12,8 +11,7 @@ const pgClient = new pg.Client(pgConnectionString) const auditTrail = require('./services/auditTrail'); const express = require('express') const app = express() -const port = 3100 -//console.log(`pgConnectionString value = ${pgConnectionString}`) +const port = 3000 var pl_processid; var pl_randonseq = 'err-'+(new Date()).getTime().toString(36) + Math.random().toString(36).slice(2); async function setupPgClient () { @@ -37,7 +35,7 @@ async function setupPgClient () { var pl_payload = JSON.stringify(payload.payload) const validTopicAndOriginator = (pgOptions.triggerTopics.includes(payload.topic)) && (pgOptions.triggerOriginators.includes(payload.originator)) // Check if valid topic and originator if (validTopicAndOriginator) { - logger.debug(`Producer DynamoDb : ${pl_seqid} ${pl_processid} ${pl_table} ${pl_uniquecolumn} ${pl_operation} ${payload.timestamp}`); + logger.info(`Producer DynamoDb : ${pl_seqid} ${pl_processid} ${pl_table} ${pl_uniquecolumn} ${pl_operation} ${payload.timestamp}`); await pushToDynamoDb(payload) } else { logger.debug('Ignoring message with incorrect topic or originator') diff --git a/src/services/pushToDynamoDb.js b/src/services/pushToDynamoDb.js index 3584263..1ea5a63 100644 --- a/src/services/pushToDynamoDb.js +++ b/src/services/pushToDynamoDb.js @@ -3,8 +3,8 @@ const logger = require('../common/logger') const _ = require('lodash') var AWS = require("aws-sdk"); async function pushToDynamoDb(payload) { - try { console.log('----Inside DynomoDB code -------'); - console.log(payload) + try { console.log('----Push To DynomoDB -------'); + // console.log(payload) p_dd_payloadseqid = payload.payload.payloadseqid; var params = { TableName: 'test_pg_ifx_payload_sync', @@ -20,14 +20,14 @@ async function pushToDynamoDb(payload) { } var docClient = new AWS.DynamoDB.DocumentClient({region: 'us-east-1'}); docClient.put(params, function(err, data) { - if (err) console.log(err); - else console.log(data); + if (err) logger.error(err); + else logger.info(data); }); } catch (e) { - console.log(e) + logger.error(`Error at PushToDynamoDB "${e}"`) } } 
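The hunk above keeps the callback form of the AWS SDK DocumentClient for the DynamoDB write. A minimal sketch of the same write using the promise interface follows; the table name test_pg_ifx_payload_sync, the us-east-1 region and the Item fields are the values used in the patch, while the function name and error propagation are illustrative only, not part of the project.

    // Sketch: the DynamoDB write performed by pushToDynamoDb, awaited instead of using a callback.
    const AWS = require('aws-sdk')
    const docClient = new AWS.DynamoDB.DocumentClient({ region: 'us-east-1' })

    async function putPayloadSketch (payload) {
      const params = {
        TableName: 'test_pg_ifx_payload_sync',          // table used by the patch
        Item: {
          payloadseqid: payload.payload.payloadseqid,   // key the sync tool tracks
          pl_document: payload.payload,
          pl_table: payload.payload.table,
          pl_uniquecolumn: payload.payload.Uniquecolumn,
          pl_operation: payload.payload.operation,
          pl_time: payload.timestamp,
          timestamp: Date.now()
        }
      }
      await docClient.put(params).promise()             // rejection surfaces in the caller's try/catch
    }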
-console.log("hello from DD") +console.log("--from DyanomoDb==") module.exports = pushToDynamoDb From 40ab9edc1add18ae00c368e327908c6dd61cf9db Mon Sep 17 00:00:00 2001 From: nkumar-topcoder <33625707+nkumar-topcoder@users.noreply.github.com> Date: Wed, 18 Dec 2019 13:14:09 +0530 Subject: [PATCH 09/27] Update config.yml --- .circleci/config.yml | 9 --------- 1 file changed, 9 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index cb28e0f..a35499a 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -41,15 +41,6 @@ build_steps: &build_steps command: | ./awsconfiguration.sh ${DEPLOY_ENV} source awsenvconf - ./buildenv.sh -e ${DEPLOY_ENV} -b ${LOGICAL_ENV}-${APP_NAME}-consumer-deployvar - source buildenvvar - ./master_deploy.sh -d ECS -e ${DEPLOY_ENV} -t latest -s ${GLOBAL_ENV}-global-appvar,${LOGICAL_ENV}-${APP_NAME}-appvar -i postgres-ifx-processer - - echo "Running Masterscript - deploy postgres-ifx-processer producer" - if [ -e ${LOGICAL_ENV}-${APP_NAME}-consumer-deployvar.json ]; then sudo rm -vf ${LOGICAL_ENV}-${APP_NAME}-consumer-deployvar.json; fi - ./buildenv.sh -e ${DEPLOY_ENV} -b ${LOGICAL_ENV}-${APP_NAME}-producer-deployvar - source buildenvvar - ./master_deploy.sh -d ECS -e ${DEPLOY_ENV} -t latest -s ${GLOBAL_ENV}-global-appvar,${LOGICAL_ENV}-${APP_NAME}-appvar -i postgres-ifx-processer echo "Running Masterscript - deploy postgres-ifx-processer producer_dd" if [ -e ${LOGICAL_ENV}-${APP_NAME}-producer-deployvar.json ]; then sudo rm -vf ${LOGICAL_ENV}-${APP_NAME}-producer-deployvar.json; fi From 8855c208b0434497f1590fbe161c7da93bff1a70 Mon Sep 17 00:00:00 2001 From: nkumar-topcoder <33625707+nkumar-topcoder@users.noreply.github.com> Date: Wed, 18 Dec 2019 13:22:31 +0530 Subject: [PATCH 10/27] [skip ci] [skip ci] --- .circleci/config.yml | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/.circleci/config.yml b/.circleci/config.yml index a35499a..21db21d 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -42,6 +42,16 @@ build_steps: &build_steps ./awsconfiguration.sh ${DEPLOY_ENV} source awsenvconf + ./buildenv.sh -e ${DEPLOY_ENV} -b ${LOGICAL_ENV}-${APP_NAME}-consumer-deployvar + source buildenvvar + ./master_deploy.sh -d ECS -e ${DEPLOY_ENV} -t latest -s ${GLOBAL_ENV}-global-appvar,${LOGICAL_ENV}-${APP_NAME}-appvar -i postgres-ifx-processer + + echo "Running Masterscript - deploy postgres-ifx-processer producer" + if [ -e ${LOGICAL_ENV}-${APP_NAME}-consumer-deployvar.json ]; then sudo rm -vf ${LOGICAL_ENV}-${APP_NAME}-consumer-deployvar.json; fi + ./buildenv.sh -e ${DEPLOY_ENV} -b ${LOGICAL_ENV}-${APP_NAME}-producer-deployvar + source buildenvvar + ./master_deploy.sh -d ECS -e ${DEPLOY_ENV} -t latest -s ${GLOBAL_ENV}-global-appvar,${LOGICAL_ENV}-${APP_NAME}-appvar -i postgres-ifx-processer + echo "Running Masterscript - deploy postgres-ifx-processer producer_dd" if [ -e ${LOGICAL_ENV}-${APP_NAME}-producer-deployvar.json ]; then sudo rm -vf ${LOGICAL_ENV}-${APP_NAME}-producer-deployvar.json; fi ./buildenv.sh -e ${DEPLOY_ENV} -b ${LOGICAL_ENV}-${APP_NAME}-producer_dd-deployvar From 95d4addab94716897390c4a11e3466867c976010 Mon Sep 17 00:00:00 2001 From: nkumar-topcoder <33625707+nkumar-topcoder@users.noreply.github.com> Date: Wed, 18 Dec 2019 14:33:19 +0530 Subject: [PATCH 11/27] [skip ci] [skip ci] --- .circleci/config.yml | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 21db21d..17ffa0f 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ 
-42,16 +42,7 @@ build_steps: &build_steps ./awsconfiguration.sh ${DEPLOY_ENV} source awsenvconf - ./buildenv.sh -e ${DEPLOY_ENV} -b ${LOGICAL_ENV}-${APP_NAME}-consumer-deployvar - source buildenvvar - ./master_deploy.sh -d ECS -e ${DEPLOY_ENV} -t latest -s ${GLOBAL_ENV}-global-appvar,${LOGICAL_ENV}-${APP_NAME}-appvar -i postgres-ifx-processer - - echo "Running Masterscript - deploy postgres-ifx-processer producer" - if [ -e ${LOGICAL_ENV}-${APP_NAME}-consumer-deployvar.json ]; then sudo rm -vf ${LOGICAL_ENV}-${APP_NAME}-consumer-deployvar.json; fi - ./buildenv.sh -e ${DEPLOY_ENV} -b ${LOGICAL_ENV}-${APP_NAME}-producer-deployvar - source buildenvvar - ./master_deploy.sh -d ECS -e ${DEPLOY_ENV} -t latest -s ${GLOBAL_ENV}-global-appvar,${LOGICAL_ENV}-${APP_NAME}-appvar -i postgres-ifx-processer - + echo "Running Masterscript - deploy postgres-ifx-processer producer_dd" if [ -e ${LOGICAL_ENV}-${APP_NAME}-producer-deployvar.json ]; then sudo rm -vf ${LOGICAL_ENV}-${APP_NAME}-producer-deployvar.json; fi ./buildenv.sh -e ${DEPLOY_ENV} -b ${LOGICAL_ENV}-${APP_NAME}-producer_dd-deployvar From 7f5cbc834db39aed79b52cb91ed6fbd03cd15d4f Mon Sep 17 00:00:00 2001 From: nkumar-topcoder <33625707+nkumar-topcoder@users.noreply.github.com> Date: Wed, 18 Dec 2019 14:34:31 +0530 Subject: [PATCH 12/27] [skip ci] [skip ci] --- src/services/pushToDynamoDb.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/services/pushToDynamoDb.js b/src/services/pushToDynamoDb.js index 1ea5a63..762f7ea 100644 --- a/src/services/pushToDynamoDb.js +++ b/src/services/pushToDynamoDb.js @@ -18,7 +18,7 @@ async function pushToDynamoDb(payload) { timestamp: Date.now() } } - var docClient = new AWS.DynamoDB.DocumentClient({region: 'us-east-1'}); + var docClient = new AWS.DynamoDB.DocumentClient({region: 'us-east-1',convertEmptyValues: true}); docClient.put(params, function(err, data) { if (err) logger.error(err); else logger.info(data); From 436924121fccceb045fc8d31202d3ec6df0fad23 Mon Sep 17 00:00:00 2001 From: nkumar-topcoder <33625707+nkumar-topcoder@users.noreply.github.com> Date: Wed, 18 Dec 2019 14:36:12 +0530 Subject: [PATCH 13/27] [skip ci] [skip ci] --- src/services/updateInformix.js | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/services/updateInformix.js b/src/services/updateInformix.js index 5098162..70c868d 100644 --- a/src/services/updateInformix.js +++ b/src/services/updateInformix.js @@ -28,11 +28,11 @@ async function updateInformix (payload) { sql = `update ${payload.payload.schema}:${payload.payload.table} set ${Object.keys(columns).map((key) => `${key}='${columns[key]}'`).join(', ')} where ${primaryKey}=${columns[primaryKey]};` // "update : set col_1=val_1, col_2=val_2, ... where primary_key_col=primary_key_val" } break - // case 'delete': - // { - // sql = `delete from ${payload.payload.schema}:${payload.payload.table} where ${primaryKey}=${columns[primaryKey]};` // ""delete from :
where primary_key_col=primary_key_val" - // } - // break + case 'delete': + { + sql = `delete from ${payload.payload.schema}:${payload.payload.table} where ${primaryKey}=${columns[primaryKey]};` // ""delete from :
where primary_key_col=primary_key_val" + } + break default: throw new Error(`Operation ${operation} is not supported`) } From f862ef09774680b4aaf4464bad8f233ca36cbacf Mon Sep 17 00:00:00 2001 From: nkumar-topcoder <33625707+nkumar-topcoder@users.noreply.github.com> Date: Wed, 18 Dec 2019 14:36:59 +0530 Subject: [PATCH 14/27] Update config.yml --- .circleci/config.yml | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/.circleci/config.yml b/.circleci/config.yml index 17ffa0f..c02740d 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -42,6 +42,15 @@ build_steps: &build_steps ./awsconfiguration.sh ${DEPLOY_ENV} source awsenvconf + ./buildenv.sh -e ${DEPLOY_ENV} -b ${LOGICAL_ENV}-${APP_NAME}-consumer-deployvar + source buildenvvar + ./master_deploy.sh -d ECS -e ${DEPLOY_ENV} -t latest -s ${GLOBAL_ENV}-global-appvar,${LOGICAL_ENV}-${APP_NAME}-appvar -i postgres-ifx-processer + + echo "Running Masterscript - deploy postgres-ifx-processer producer" + if [ -e ${LOGICAL_ENV}-${APP_NAME}-consumer-deployvar.json ]; then sudo rm -vf ${LOGICAL_ENV}-${APP_NAME}-consumer-deployvar.json; fi + ./buildenv.sh -e ${DEPLOY_ENV} -b ${LOGICAL_ENV}-${APP_NAME}-producer-deployvar + source buildenvvar + ./master_deploy.sh -d ECS -e ${DEPLOY_ENV} -t latest -s ${GLOBAL_ENV}-global-appvar,${LOGICAL_ENV}-${APP_NAME}-appvar -i postgres-ifx-processer echo "Running Masterscript - deploy postgres-ifx-processer producer_dd" if [ -e ${LOGICAL_ENV}-${APP_NAME}-producer-deployvar.json ]; then sudo rm -vf ${LOGICAL_ENV}-${APP_NAME}-producer-deployvar.json; fi From 3dbfde725db247ff188ab34c285524344db3312d Mon Sep 17 00:00:00 2001 From: nkumar Date: Thu, 2 Jan 2020 13:53:18 +0000 Subject: [PATCH 15/27] log update Committer: nkumar --- src/consumer.js | 30 ++++++++++++++++++------------ src/producer.js | 15 +++++++++++++-- src/producer_dd.js | 17 ++++++++++++++--- src/services/auditTrail.js | 2 +- src/services/pushToDynamoDb.js | 4 ++-- 5 files changed, 48 insertions(+), 20 deletions(-) diff --git a/src/consumer.js b/src/consumer.js index 7270cd3..17c1f0f 100644 --- a/src/consumer.js +++ b/src/consumer.js @@ -41,33 +41,37 @@ const terminate = () => process.exit() * @param {String} topic The name of the message topic * @param {Number} partition The kafka partition to which messages are written */ -let message; -let cs_payloadseqid; +//let message; +//let cs_payloadseqid; async function dataHandler(messageSet, topic, partition) { +let cs_payloadseqid for (const m of messageSet) { // Process messages sequentially let message try { - //let message message = JSON.parse(m.message.value) -// cs_payloadseqid = message.payload.payloadseqid - //console.log(message); logger.debug('Received message from kafka:') - logger.debug(`consumer : ${message.payload.payloadseqid} ${message.payload.table} ${message.payload.Uniquecolumn} ${message.payload.operation} ${message.timestamp} `); + if (message.payload.payloadseqid) cs_payloadseqid = message.payload.payloadseqid; + logger.debug(`consumer : ${message.payload.payloadseqid} ${message.payload.table} ${message.payload.Uniquecolumn} ${message.payload.operation} ${message.timestamp} `); await updateInformix(message) await consumer.commitOffset({ topic, partition, offset: m.offset }) // Commit offset only on success - await auditTrail([message.payload.payloadseqid,cs_processId,message.payload.table,message.payload.Uniquecolumn, + auditTrail([cs_payloadseqid,cs_processId,message.payload.table,message.payload.Uniquecolumn, 
message.payload.operation,"Informix-updated","","","",message.payload.data, message.timestamp,message.topic],'consumer') } catch (err) { logger.error(`Could not process kafka message or informix DB error: "${err.message}"`) //logger.logFullError(err) + logger.debug(`error-sync: consumer "${err.message}"`) if (!cs_payloadseqid){ cs_payloadseqid= 'err-'+(new Date()).getTime().toString(36) + Math.random().toString(36).slice(2); - } + } await auditTrail([cs_payloadseqid,3333,'message.payload.table','message.payload.Uniquecolumn', 'message.payload.operation',"Error-Consumer","",err.message,"",'message.payload.data',new Date(),'message.topic'],'consumer') try { + var retryvar + if (message.payload['retryCount']) retryvar = message.payload.retryCount; await consumer.commitOffset({ topic, partition, offset: m.offset }) // Commit success as will re-publish + await auditTrail([cs_payloadseqid,3333,'message.payload.table','message.payload.Uniquecolumn', + 'message.payload.operation',"Informix-Updated1",retryvar,"","",'message.payload.data',new Date(),'message.topic'],'consumer') logger.debug(`Trying to push same message after adding retryCounter`) if (!message.payload.retryCount) { message.payload.retryCount = 0 @@ -75,22 +79,23 @@ async function dataHandler(messageSet, topic, partition) { } if (message.payload.retryCount >= config.KAFKA.maxRetry) { logger.debug('Recached at max retry counter, sending it to error queue: ', config.KAFKA.errorTopic); - + logger.debug(`error-sync: consumer max-retry-limit reached`) let notifiyMessage = Object.assign({}, message, { topic: config.KAFKA.errorTopic }) notifiyMessage.payload['recipients'] = config.KAFKA.recipients logger.debug('pushing following message on kafka error alert queue:') - logger.debug(notifiyMessage) + //logger.debug(notifiyMessage) await pushToKafka(notifiyMessage) return } message.payload['retryCount'] = message.payload.retryCount + 1; - //await auditTrail([message.payload.payloadseqid,cs_processId,message.payload.table,message.payload.Uniquecolumn, - // message.payload.operation,"Error",message.payload['retryCount'],err.message,"",message.payload.data, message.timestamp,message.topic],'consumer') await pushToKafka(message) logger.debug(` After kafka push Retry Count "${message.payload.retryCount}"`) } catch (err) { + await auditTrail([cs_payloadseqid,cs_processId,message.payload.table,message.payload.Uniquecolumn, + message.payload.operation,"Error-republishing",message.payload['retryCount'],err.message,"",message.payload.data, message.timestamp,message.topic],'consumer') logger.error("Error occured in re-publishing kafka message", err) + logger.debug(`error-sync: consumer re-publishing "${err.message}"`) } } } @@ -111,6 +116,7 @@ async function setupKafkaConsumer() { } catch (err) { logger.error('Could not setup kafka consumer') logger.logFullError(err) + logger.debug(`error-sync: consumer kafka-setup "${err.message}"`) terminate() } } diff --git a/src/producer.js b/src/producer.js index f5bc6b1..6a5be44 100644 --- a/src/producer.js +++ b/src/producer.js @@ -42,24 +42,35 @@ async function setupPgClient () { await pushToKafka(payload) } else { logger.debug('Ignoring message with incorrect topic or originator') + } await auditTrail([pl_seqid,pl_processid,pl_table,pl_uniquecolumn,pl_operation,"push-to-kafka","","","",pl_payload,pl_timestamp,pl_topic],'producer') } catch (error) { logger.error('Could not parse message payload') + logger.debug(`error-sync: producer parse message : "${error.message}"`) await 
auditTrail([pl_randonseq,1111,'pl_table','pl_uniquecolumn','pl_operation',"error-producer","","",error.message,'pl_payload',new Date(),'pl_topic'],'producer') logger.logFullError(error) } }) logger.info('pg-ifx-sync producer: Listening to notifications') } catch (err) { + logger.debug(`error-sync: producer postgres-setup 1 :"${err.message}"`) logger.error('Could not setup postgres client') logger.logFullError(err) + terminate() } } - +const terminate = () => process.exit() async function run () { - await setupPgClient() +try { + await setupPgClient() +} +catch(err) +{ +logger.debug(`Could not setup postgres client`) +logger.debug(`error-sync: producer postgres-setup 0 :"${err.message}"`) +} } run() diff --git a/src/producer_dd.js b/src/producer_dd.js index 7796abd..46226d6 100644 --- a/src/producer_dd.js +++ b/src/producer_dd.js @@ -4,6 +4,7 @@ const config = require('config') const pg = require('pg') const logger = require('./common/logger') +//const pushToKafka = require('./services/pushToKafka') const pushToDynamoDb = require('./services/pushToDynamoDb') const pgOptions = config.get('POSTGRES') const pgConnectionString = `postgresql://${pgOptions.user}:${pgOptions.password}@${pgOptions.host}:${pgOptions.port}/${pgOptions.database}` @@ -12,6 +13,7 @@ const auditTrail = require('./services/auditTrail'); const express = require('express') const app = express() const port = 3000 +//console.log(`pgConnectionString value = ${pgConnectionString}`) var pl_processid; var pl_randonseq = 'err-'+(new Date()).getTime().toString(36) + Math.random().toString(36).slice(2); async function setupPgClient () { @@ -35,13 +37,14 @@ async function setupPgClient () { var pl_payload = JSON.stringify(payload.payload) const validTopicAndOriginator = (pgOptions.triggerTopics.includes(payload.topic)) && (pgOptions.triggerOriginators.includes(payload.originator)) // Check if valid topic and originator if (validTopicAndOriginator) { - logger.info(`Producer DynamoDb : ${pl_seqid} ${pl_processid} ${pl_table} ${pl_uniquecolumn} ${pl_operation} ${payload.timestamp}`); + logger.debug(`Producer DynamoDb : ${pl_seqid} ${pl_processid} ${pl_table} ${pl_uniquecolumn} ${pl_operation} ${payload.timestamp}`); await pushToDynamoDb(payload) } else { logger.debug('Ignoring message with incorrect topic or originator') } await auditTrail([pl_seqid,pl_processid,pl_table,pl_uniquecolumn,pl_operation,"push-to-DynamoDb","","","",pl_payload,pl_timestamp,pl_topic],'producer') } catch (error) { + logger.debug(`error-sync: producer_dynamoDb parse message : "${error.message}"`) logger.error('Could not parse message payload') await auditTrail([pl_randonseq,2222,'pl_table','pl_uniquecolumn','pl_operation',"error-DynamoDB","","",error.message,'pl_payload',new Date(),'pl_topic'],'producer') logger.logFullError(error) @@ -50,14 +53,22 @@ await auditTrail([pl_randonseq,2222,'pl_table','pl_uniquecolumn','pl_operation', logger.info('Producer DynamoDb: Listening to notifications') } catch (err) { logger.error('Could not setup postgres client') - logger.logFullError(err) + logger.debug(`error-sync: producer_dd postgres-setup 1 :"${err.message}"`) + //setup slack alert here + logger.logFullError(err) terminate() } } - +const terminate = () => process.exit() async function run () { +try{ await setupPgClient() } +catch(err){ +logger.debug(`Producer_dd: Could not setup postgres client`) +logger.debug(`error-sync: producer_dynamoDb postgres-setup 0 :"${err.message}"`) +//setup slackmessage here +}} run() diff --git a/src/services/auditTrail.js 
b/src/services/auditTrail.js index 5937517..58aa7ee 100644 --- a/src/services/auditTrail.js +++ b/src/services/auditTrail.js @@ -5,7 +5,7 @@ const logger = require('../common/logger') const pgOptions = config.get('POSTGRES') const pgConnectionString = `postgresql://${pgOptions.user}:${pgOptions.password}@${pgOptions.host}:${pgOptions.port}/${pgOptions.database}` let pgClient2 -console.log(`"${pgConnectionString}"`); +//console.log(`"${pgConnectionString}"`); async function setupPgClient2 () { pgClient2 = new pg.Client(pgConnectionString) try { diff --git a/src/services/pushToDynamoDb.js b/src/services/pushToDynamoDb.js index 1ea5a63..4b0fb50 100644 --- a/src/services/pushToDynamoDb.js +++ b/src/services/pushToDynamoDb.js @@ -18,7 +18,7 @@ async function pushToDynamoDb(payload) { timestamp: Date.now() } } - var docClient = new AWS.DynamoDB.DocumentClient({region: 'us-east-1'}); + var docClient = new AWS.DynamoDB.DocumentClient({region: 'us-east-1',convertEmptyValues: true}); docClient.put(params, function(err, data) { if (err) logger.error(err); else logger.info(data); @@ -29,5 +29,5 @@ async function pushToDynamoDb(payload) { } } -console.log("--from DyanomoDb==") +console.log("--from DyanomoDb--") module.exports = pushToDynamoDb From 5865d47f8d79fb8d3607e1b6893c3d65a3dcbc5f Mon Sep 17 00:00:00 2001 From: Sachin Maheshwari Date: Fri, 3 Jan 2020 16:36:39 +0530 Subject: [PATCH 16/27] refactoring code --- .circleci/config.yml | 1 + package.json | 2 +- src/producer.js | 86 ++++++++++++++++++++++++-------------------- 3 files changed, 50 insertions(+), 39 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index c02740d..ca3986b 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -103,6 +103,7 @@ workflows: branches: only: - dev-test-pg + - dev-test-pg-rf - "build-prod": context : org-global filters: diff --git a/package.json b/package.json index 3019890..f026db9 100644 --- a/package.json +++ b/package.json @@ -8,7 +8,7 @@ "lint:fix": "standard --env mocha --fix", "producer": "node ./src/producer.js", "consumer": "node ./src/consumer.js", - "producer_dd": "node ./src/producer_dd.js", + "producer_dd": "node ./src/producer.js failover", "start": "npm run producer & npm run producer_dd & npm run consumer" }, "author": "Topcoder", diff --git a/src/producer.js b/src/producer.js index 6a5be44..978537e 100644 --- a/src/producer.js +++ b/src/producer.js @@ -5,6 +5,7 @@ const config = require('config') const pg = require('pg') const logger = require('./common/logger') const pushToKafka = require('./services/pushToKafka') +const pushToDynamoDb = require('./services/pushToDynamoDb') const pgOptions = config.get('POSTGRES') const pgConnectionString = `postgresql://${pgOptions.user}:${pgOptions.password}@${pgOptions.host}:${pgOptions.port}/${pgOptions.database}` const pgClient = new pg.Client(pgConnectionString) @@ -12,71 +13,80 @@ const auditTrail = require('./services/auditTrail'); const express = require('express') const app = express() const port = 3000 -//console.log(`pgConnectionString value = ${pgConnectionString}`) +const isFailover = process.argv[2] != undefined ? (process.argv[2] === 'failover' ? 
true : false) : false var pl_processid; -var pl_randonseq = 'err-'+(new Date()).getTime().toString(36) + Math.random().toString(36).slice(2); -async function setupPgClient () { - try { -//console.log(`${pgOptions.triggerFunctions}`); +//var pl_randonseq = 'err-' + (new Date()).getTime().toString(36) + Math.random().toString(36).slice(2) +async function setupPgClient() { + try { await pgClient.connect() for (const triggerFunction of pgOptions.triggerFunctions) { await pgClient.query(`LISTEN ${triggerFunction}`) } pgClient.on('notification', async (message) => { - //const payload = JSON.parse(message.payload); - pl_processid = message.processId; - //console.log(message); - try - { + // need to take care if empty message coming + pl_processid = message.processId + try { const payload = JSON.parse(message.payload) - var pl_seqid = payload.payload.payloadseqid - var pl_topic = payload.topic - var pl_table = payload.payload.table - var pl_uniquecolumn = payload.payload.Uniquecolumn - var pl_operation = payload.payload.operation - var pl_timestamp = payload.timestamp - var pl_payload = JSON.stringify(payload.payload) - const validTopicAndOriginator = (pgOptions.triggerTopics.includes(payload.topic)) && (pgOptions.triggerOriginators.includes(payload.originator)) // Check if valid topic and originator + const validTopicAndOriginator = (pgOptions.triggerTopics.includes(payload.topic)) && (pgOptions.triggerOriginators.includes(payload.originator)) // Check if valid topic and originator if (validTopicAndOriginator) { - logger.debug(`producer : ${pl_seqid} ${pl_processid} ${pl_table} ${pl_uniquecolumn} ${pl_operation} ${payload.timestamp}`); - await pushToKafka(payload) - } else { + if (isFailover) { + await pushToDynamoDb(payload) + } else { + await pushToKafka(payload) + audit(payload) + } + } else { logger.debug('Ignoring message with incorrect topic or originator') - + // push to slack - alertIt("slack message") } -await auditTrail([pl_seqid,pl_processid,pl_table,pl_uniquecolumn,pl_operation,"push-to-kafka","","","",pl_payload,pl_timestamp,pl_topic],'producer') - } catch (error) { + } catch (error) { logger.error('Could not parse message payload') - logger.debug(`error-sync: producer parse message : "${error.message}"`) -await auditTrail([pl_randonseq,1111,'pl_table','pl_uniquecolumn','pl_operation',"error-producer","","",error.message,'pl_payload',new Date(),'pl_topic'],'producer') - logger.logFullError(error) + logger.debug(`error-sync: producer parse message : "${error.message}"`) + //await auditTrail([pl_randonseq, 1111, 'pl_table', 'pl_uniquecolumn', 'pl_operation', "error-producer", "", "", error.message, 'pl_payload', new Date(), 'pl_topic'], 'producer') + logger.logFullError(error) + // push to slack - alertIt("slack message" } }) logger.info('pg-ifx-sync producer: Listening to notifications') } catch (err) { - logger.debug(`error-sync: producer postgres-setup 1 :"${err.message}"`) + logger.debug(`error-sync: producer postgres-setup 1 :"${err.message}"`) logger.error('Could not setup postgres client') logger.logFullError(err) terminate() } } + const terminate = () => process.exit() -async function run () { -try { - await setupPgClient() -} -catch(err) -{ -logger.debug(`Could not setup postgres client`) -logger.debug(`error-sync: producer postgres-setup 0 :"${err.message}"`) -} + +async function run() { + try { + await setupPgClient() + } + catch (err) { + logger.debug(`Could not setup postgres client`) + logger.debug(`error-sync: producer postgres-setup 0 :"${err.message}"`) + terminate() + 
} } +// execute run() +async function audit() { + var pl_seqid = payload.payload.payloadseqid + var pl_topic = payload.topic + var pl_table = payload.payload.table + var pl_uniquecolumn = payload.payload.Uniquecolumn + var pl_operation = payload.payload.operation + var pl_timestamp = payload.timestamp + var pl_payload = JSON.stringify(payload.payload) + logger.debug(`producer : ${pl_seqid} ${pl_processid} ${pl_table} ${pl_uniquecolumn} ${pl_operation} ${payload.timestamp}`); + auditTrail([pl_seqid, pl_processid, pl_table, pl_uniquecolumn, pl_operation, "push-to-kafka", "", "", "", pl_payload, pl_timestamp, pl_topic], 'producer') +} + app.get('/health', (req, res) => { - res.send('health ok') + res.send('health ok') }) app.listen(port, () => console.log(`app listening on port ${port}!`)) From 02f5826f70ba68a0f31a4d11557ad72cb85dea31 Mon Sep 17 00:00:00 2001 From: Sachin Maheshwari Date: Fri, 3 Jan 2020 16:57:39 +0530 Subject: [PATCH 17/27] cleaning... --- src/producer.js | 11 ++++--- src/producer_dd.js | 79 ---------------------------------------------- 2 files changed, 6 insertions(+), 84 deletions(-) delete mode 100644 src/producer_dd.js diff --git a/src/producer.js b/src/producer.js index 978537e..a05a7dc 100644 --- a/src/producer.js +++ b/src/producer.js @@ -29,11 +29,11 @@ async function setupPgClient() { const payload = JSON.parse(message.payload) const validTopicAndOriginator = (pgOptions.triggerTopics.includes(payload.topic)) && (pgOptions.triggerOriginators.includes(payload.originator)) // Check if valid topic and originator if (validTopicAndOriginator) { - if (isFailover) { - await pushToDynamoDb(payload) - } else { + if (!isFailover) { await pushToKafka(payload) audit(payload) + } else { + await pushToDynamoDb(payload) } } else { logger.debug('Ignoring message with incorrect topic or originator') @@ -42,7 +42,9 @@ async function setupPgClient() { } catch (error) { logger.error('Could not parse message payload') logger.debug(`error-sync: producer parse message : "${error.message}"`) - //await auditTrail([pl_randonseq, 1111, 'pl_table', 'pl_uniquecolumn', 'pl_operation', "error-producer", "", "", error.message, 'pl_payload', new Date(), 'pl_topic'], 'producer') + if (!isFailover) { + await auditTrail([pl_randonseq, 1111, 'pl_table', 'pl_uniquecolumn', 'pl_operation', "error-producer", "", "", error.message, 'pl_payload', new Date(), 'pl_topic'], 'producer') + } logger.logFullError(error) // push to slack - alertIt("slack message" } @@ -52,7 +54,6 @@ async function setupPgClient() { logger.debug(`error-sync: producer postgres-setup 1 :"${err.message}"`) logger.error('Could not setup postgres client') logger.logFullError(err) - terminate() } } diff --git a/src/producer_dd.js b/src/producer_dd.js deleted file mode 100644 index 46226d6..0000000 --- a/src/producer_dd.js +++ /dev/null @@ -1,79 +0,0 @@ -/** - * Listens to DML trigger notifications from postgres and pushes the trigger data into kafka - */ -const config = require('config') -const pg = require('pg') -const logger = require('./common/logger') -//const pushToKafka = require('./services/pushToKafka') -const pushToDynamoDb = require('./services/pushToDynamoDb') -const pgOptions = config.get('POSTGRES') -const pgConnectionString = `postgresql://${pgOptions.user}:${pgOptions.password}@${pgOptions.host}:${pgOptions.port}/${pgOptions.database}` -const pgClient = new pg.Client(pgConnectionString) -const auditTrail = require('./services/auditTrail'); -const express = require('express') -const app = express() -const port = 3000 
-//console.log(`pgConnectionString value = ${pgConnectionString}`) -var pl_processid; -var pl_randonseq = 'err-'+(new Date()).getTime().toString(36) + Math.random().toString(36).slice(2); -async function setupPgClient () { - try { - await pgClient.connect() - for (const triggerFunction of pgOptions.triggerFunctions) { - await pgClient.query(`LISTEN ${triggerFunction}`) - } - pgClient.on('notification', async (message) => { - //const payload = JSON.parse(message.payload); - pl_processid = message.processId; - try - { - const payload = JSON.parse(message.payload) - var pl_seqid = payload.payload.payloadseqid - var pl_topic = payload.topic - var pl_table = payload.payload.table - var pl_uniquecolumn = payload.payload.Uniquecolumn - var pl_operation = payload.payload.operation - var pl_timestamp = payload.timestamp - var pl_payload = JSON.stringify(payload.payload) - const validTopicAndOriginator = (pgOptions.triggerTopics.includes(payload.topic)) && (pgOptions.triggerOriginators.includes(payload.originator)) // Check if valid topic and originator - if (validTopicAndOriginator) { - logger.debug(`Producer DynamoDb : ${pl_seqid} ${pl_processid} ${pl_table} ${pl_uniquecolumn} ${pl_operation} ${payload.timestamp}`); - await pushToDynamoDb(payload) - } else { - logger.debug('Ignoring message with incorrect topic or originator') - } -await auditTrail([pl_seqid,pl_processid,pl_table,pl_uniquecolumn,pl_operation,"push-to-DynamoDb","","","",pl_payload,pl_timestamp,pl_topic],'producer') - } catch (error) { - logger.debug(`error-sync: producer_dynamoDb parse message : "${error.message}"`) - logger.error('Could not parse message payload') -await auditTrail([pl_randonseq,2222,'pl_table','pl_uniquecolumn','pl_operation',"error-DynamoDB","","",error.message,'pl_payload',new Date(),'pl_topic'],'producer') - logger.logFullError(error) - } - }) - logger.info('Producer DynamoDb: Listening to notifications') - } catch (err) { - logger.error('Could not setup postgres client') - logger.debug(`error-sync: producer_dd postgres-setup 1 :"${err.message}"`) - //setup slack alert here - logger.logFullError(err) - terminate() - } -} -const terminate = () => process.exit() -async function run () { -try{ - await setupPgClient() -} -catch(err){ -logger.debug(`Producer_dd: Could not setup postgres client`) -logger.debug(`error-sync: producer_dynamoDb postgres-setup 0 :"${err.message}"`) -//setup slackmessage here -}} - -run() - -app.get('/health', (req, res) => { - res.send('health ok') -}) -app.listen(port, () => console.log(`app listening on port ${port}!`)) - From ca055bd5fc5280f035d8ec86e76bb835c477960f Mon Sep 17 00:00:00 2001 From: Sachin Maheshwari Date: Fri, 3 Jan 2020 17:47:27 +0530 Subject: [PATCH 18/27] cleaning.... 
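The producer refactor in the patches above folds the DynamoDB fail-over path into producer.js and selects it from the command line. A short sketch of that switch, behaviourally equivalent to the nested ternary added in PATCH 16; the 'failover' literal is the argument package.json passes via "node ./src/producer.js failover", and the handler outline is illustrative.

    // Sketch: fail-over selection for the producer process.
    const isFailover = process.argv[2] === 'failover'    // same result as the nested ternary in the patch

    // Inside the notification handler the flag picks the destination:
    //   if (!isFailover) await pushToKafka(payload)      // normal path: Kafka, then audit trail
    //   else             await pushToDynamoDb(payload)   // fail-over path: DynamoDB backup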
--- src/producer.js | 20 ++++++++------------ 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/src/producer.js b/src/producer.js index a05a7dc..b43b7a0 100644 --- a/src/producer.js +++ b/src/producer.js @@ -30,9 +30,12 @@ async function setupPgClient() { const validTopicAndOriginator = (pgOptions.triggerTopics.includes(payload.topic)) && (pgOptions.triggerOriginators.includes(payload.originator)) // Check if valid topic and originator if (validTopicAndOriginator) { if (!isFailover) { + logger.info('trying to push on kafka topic') await pushToKafka(payload) audit(payload) + logger.info('pushed to kafka and added for audit trail') } else { + logger.info('taking backup on dynamodb for reconciliation') await pushToDynamoDb(payload) } } else { @@ -42,17 +45,16 @@ async function setupPgClient() { } catch (error) { logger.error('Could not parse message payload') logger.debug(`error-sync: producer parse message : "${error.message}"`) + logger.logFullError(error) if (!isFailover) { await auditTrail([pl_randonseq, 1111, 'pl_table', 'pl_uniquecolumn', 'pl_operation', "error-producer", "", "", error.message, 'pl_payload', new Date(), 'pl_topic'], 'producer') } - logger.logFullError(error) // push to slack - alertIt("slack message" } }) - logger.info('pg-ifx-sync producer: Listening to notifications') + logger.info('pg-ifx-sync producer: Listening to pg-trigger channel.') } catch (err) { - logger.debug(`error-sync: producer postgres-setup 1 :"${err.message}"`) - logger.error('Could not setup postgres client') + logger.error('Error in setting up postgres client: ', err.message) logger.logFullError(err) terminate() } @@ -61,14 +63,8 @@ async function setupPgClient() { const terminate = () => process.exit() async function run() { - try { - await setupPgClient() - } - catch (err) { - logger.debug(`Could not setup postgres client`) - logger.debug(`error-sync: producer postgres-setup 0 :"${err.message}"`) - terminate() - } + logger.debug("Initialising producer setup...") + await setupPgClient() } // execute From 2994830597c39117dd0f975ecc825bddffa3d256 Mon Sep 17 00:00:00 2001 From: Sachin Maheshwari Date: Fri, 3 Jan 2020 18:07:11 +0530 Subject: [PATCH 19/27] cleaning... --- src/producer.js | 24 +++++++++++------------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/src/producer.js b/src/producer.js index b43b7a0..d5563de 100644 --- a/src/producer.js +++ b/src/producer.js @@ -14,8 +14,6 @@ const express = require('express') const app = express() const port = 3000 const isFailover = process.argv[2] != undefined ? (process.argv[2] === 'failover' ? 
true : false) : false -var pl_processid; -//var pl_randonseq = 'err-' + (new Date()).getTime().toString(36) + Math.random().toString(36).slice(2) async function setupPgClient() { try { await pgClient.connect() @@ -23,8 +21,6 @@ async function setupPgClient() { await pgClient.query(`LISTEN ${triggerFunction}`) } pgClient.on('notification', async (message) => { - // need to take care if empty message coming - pl_processid = message.processId try { const payload = JSON.parse(message.payload) const validTopicAndOriginator = (pgOptions.triggerTopics.includes(payload.topic)) && (pgOptions.triggerOriginators.includes(payload.originator)) // Check if valid topic and originator @@ -32,7 +28,7 @@ async function setupPgClient() { if (!isFailover) { logger.info('trying to push on kafka topic') await pushToKafka(payload) - audit(payload) + audit(message) logger.info('pushed to kafka and added for audit trail') } else { logger.info('taking backup on dynamodb for reconciliation') @@ -70,14 +66,16 @@ async function run() { // execute run() -async function audit() { - var pl_seqid = payload.payload.payloadseqid - var pl_topic = payload.topic - var pl_table = payload.payload.table - var pl_uniquecolumn = payload.payload.Uniquecolumn - var pl_operation = payload.payload.operation - var pl_timestamp = payload.timestamp - var pl_payload = JSON.stringify(payload.payload) +async function audit(message) { + const pl_processid = message.processId + const payload = JSON.parse(message.payload) + const pl_seqid = payload.payload.payloadseqid + const pl_topic = payload.topic + const pl_table = payload.payload.table + const pl_uniquecolumn = payload.payload.Uniquecolumn + const pl_operation = payload.payload.operation + const pl_timestamp = payload.timestamp + const pl_payload = JSON.stringify(payload.payload) logger.debug(`producer : ${pl_seqid} ${pl_processid} ${pl_table} ${pl_uniquecolumn} ${pl_operation} ${payload.timestamp}`); auditTrail([pl_seqid, pl_processid, pl_table, pl_uniquecolumn, pl_operation, "push-to-kafka", "", "", "", pl_payload, pl_timestamp, pl_topic], 'producer') } From 3802ff4c8062f597b4927e623052569d3e11fddf Mon Sep 17 00:00:00 2001 From: Sachin Maheshwari Date: Fri, 3 Jan 2020 19:01:18 +0530 Subject: [PATCH 20/27] cleaning... 
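The audit(message) helper being reshaped in these patches parses the raw NOTIFY message and maps it onto the pgifx_sync_audit row. A minimal sketch of that mapping, assuming the payload shape emitted by the trigger function (topic, originator, timestamp, payload.{payloadseqid, Uniquecolumn, operation, table, data}); buildAuditRow is an illustrative name, not a function in the repo:

// Sketch: build the audit row in the column order used by auditTrail():
// payloadseqid, processid, table, uniquecolumn, operation, syncstatus,
// retrycount, consumer_err, producer_err, payload, auditdatetime, topicname.
function buildAuditRow(message) {
  const payload = JSON.parse(message.payload)
  return [
    payload.payload.payloadseqid,
    message.processId,
    payload.payload.table,
    payload.payload.Uniquecolumn,
    payload.payload.operation,
    'push-to-kafka',
    '', '', '',
    JSON.stringify(payload.payload),
    payload.timestamp,
    payload.topic
  ]
}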
--- src/producer.js | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/src/producer.js b/src/producer.js index d5563de..e6fd61c 100644 --- a/src/producer.js +++ b/src/producer.js @@ -21,6 +21,7 @@ async function setupPgClient() { await pgClient.query(`LISTEN ${triggerFunction}`) } pgClient.on('notification', async (message) => { + const pl_randonseq = 'err-' + (new Date()).getTime().toString(36) + Math.random().toString(36).slice(2) try { const payload = JSON.parse(message.payload) const validTopicAndOriginator = (pgOptions.triggerTopics.includes(payload.topic)) && (pgOptions.triggerOriginators.includes(payload.originator)) // Check if valid topic and originator @@ -28,12 +29,12 @@ async function setupPgClient() { if (!isFailover) { logger.info('trying to push on kafka topic') await pushToKafka(payload) - audit(message) logger.info('pushed to kafka and added for audit trail') } else { logger.info('taking backup on dynamodb for reconciliation') await pushToDynamoDb(payload) } + audit(message) } else { logger.debug('Ignoring message with incorrect topic or originator') // push to slack - alertIt("slack message") @@ -43,7 +44,7 @@ async function setupPgClient() { logger.debug(`error-sync: producer parse message : "${error.message}"`) logger.logFullError(error) if (!isFailover) { - await auditTrail([pl_randonseq, 1111, 'pl_table', 'pl_uniquecolumn', 'pl_operation', "error-producer", "", "", error.message, 'pl_payload', new Date(), 'pl_topic'], 'producer') + await auditTrail([pl_randonseq, 1111, "", "", "", "error-producer", "", "", error.message, "", new Date(), ""], 'producer') } // push to slack - alertIt("slack message" } @@ -52,6 +53,7 @@ async function setupPgClient() { } catch (err) { logger.error('Error in setting up postgres client: ', err.message) logger.logFullError(err) + // push to slack - alertIt("slack message") terminate() } } @@ -70,16 +72,27 @@ async function audit(message) { const pl_processid = message.processId const payload = JSON.parse(message.payload) const pl_seqid = payload.payload.payloadseqid - const pl_topic = payload.topic + const pl_topic = payload.topic // TODO can move in config ? const pl_table = payload.payload.table const pl_uniquecolumn = payload.payload.Uniquecolumn const pl_operation = payload.payload.operation const pl_timestamp = payload.timestamp const pl_payload = JSON.stringify(payload.payload) - logger.debug(`producer : ${pl_seqid} ${pl_processid} ${pl_table} ${pl_uniquecolumn} ${pl_operation} ${payload.timestamp}`); + const logMessage = `${pl_seqid} ${pl_processid} ${pl_table} ${pl_uniquecolumn} ${pl_operation} ${payload.timestamp}` + if (!isFailover) { + logger.debug(`producer : ${logMessage}`); + } else { + logger.debug(`Producer DynamoDb : ${logMessage}`); + } auditTrail([pl_seqid, pl_processid, pl_table, pl_uniquecolumn, pl_operation, "push-to-kafka", "", "", "", pl_payload, pl_timestamp, pl_topic], 'producer') } +function alertIt(message) { + /** + // call slack + */ +} + app.get('/health', (req, res) => { res.send('health ok') }) From acc2e3f130367f7e9ef24903cae47b0280fafdb3 Mon Sep 17 00:00:00 2001 From: Sachin Maheshwari Date: Fri, 3 Jan 2020 19:19:47 +0530 Subject: [PATCH 21/27] rearranging audit code.. 
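On the error path these patches fall back to a generated sequence id and a sentinel process id so the failed message still lands in the audit table. A hedged sketch of that fallback row, mirroring the expression and column order used in the hunks below; errorSeqId and buildErrorAuditRow are illustrative names only:

// Sketch: generated 'err-' id (base36 timestamp plus random suffix), the 1111
// sentinel process id, and the parse error message in the producer_err column.
function errorSeqId() {
  return 'err-' + Date.now().toString(36) + Math.random().toString(36).slice(2)
}

function buildErrorAuditRow(error) {
  return [errorSeqId(), 1111, '', '', '', 'error-producer', '', '', error.message, '', new Date(), '']
}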
--- src/producer.js | 38 +++++++++++++++++++++----------------- 1 file changed, 21 insertions(+), 17 deletions(-) diff --git a/src/producer.js b/src/producer.js index e6fd61c..85d3f53 100644 --- a/src/producer.js +++ b/src/producer.js @@ -21,7 +21,6 @@ async function setupPgClient() { await pgClient.query(`LISTEN ${triggerFunction}`) } pgClient.on('notification', async (message) => { - const pl_randonseq = 'err-' + (new Date()).getTime().toString(36) + Math.random().toString(36).slice(2) try { const payload = JSON.parse(message.payload) const validTopicAndOriginator = (pgOptions.triggerTopics.includes(payload.topic)) && (pgOptions.triggerOriginators.includes(payload.originator)) // Check if valid topic and originator @@ -43,9 +42,7 @@ async function setupPgClient() { logger.error('Could not parse message payload') logger.debug(`error-sync: producer parse message : "${error.message}"`) logger.logFullError(error) - if (!isFailover) { - await auditTrail([pl_randonseq, 1111, "", "", "", "error-producer", "", "", error.message, "", new Date(), ""], 'producer') - } + audit(error) // push to slack - alertIt("slack message" } }) @@ -70,21 +67,28 @@ run() async function audit(message) { const pl_processid = message.processId - const payload = JSON.parse(message.payload) - const pl_seqid = payload.payload.payloadseqid - const pl_topic = payload.topic // TODO can move in config ? - const pl_table = payload.payload.table - const pl_uniquecolumn = payload.payload.Uniquecolumn - const pl_operation = payload.payload.operation - const pl_timestamp = payload.timestamp - const pl_payload = JSON.stringify(payload.payload) - const logMessage = `${pl_seqid} ${pl_processid} ${pl_table} ${pl_uniquecolumn} ${pl_operation} ${payload.timestamp}` - if (!isFailover) { - logger.debug(`producer : ${logMessage}`); + if (pl_processid != 'undefined') { + const payload = JSON.parse(message.payload) + const pl_seqid = payload.payload.payloadseqid + const pl_topic = payload.topic // TODO can move in config ? 
+ const pl_table = payload.payload.table + const pl_uniquecolumn = payload.payload.Uniquecolumn + const pl_operation = payload.payload.operation + const pl_timestamp = payload.timestamp + const pl_payload = JSON.stringify(payload.payload) + const logMessage = `${pl_seqid} ${pl_processid} ${pl_table} ${pl_uniquecolumn} ${pl_operation} ${payload.timestamp}` + if (!isFailover) { + logger.debug(`producer : ${logMessage}`); + } else { + logger.debug(`Producer DynamoDb : ${logMessage}`); + } + auditTrail([pl_seqid, pl_processid, pl_table, pl_uniquecolumn, pl_operation, "push-to-kafka", "", "", "", pl_payload, pl_timestamp, pl_topic], 'producer') } else { - logger.debug(`Producer DynamoDb : ${logMessage}`); + const pl_randonseq = 'err-' + (new Date()).getTime().toString(36) + Math.random().toString(36).slice(2) + if (!isFailover) { + await auditTrail([pl_randonseq, 1111, "", "", "", "error-producer", "", "", message.message, "", new Date(), ""], 'producer') + } } - auditTrail([pl_seqid, pl_processid, pl_table, pl_uniquecolumn, pl_operation, "push-to-kafka", "", "", "", pl_payload, pl_timestamp, pl_topic], 'producer') } function alertIt(message) { From 2a96987b2bb264807ffe591b9bc9a71671bf36c4 Mon Sep 17 00:00:00 2001 From: nkumar-topcoder <33625707+nkumar-topcoder@users.noreply.github.com> Date: Sat, 4 Jan 2020 13:10:13 +0530 Subject: [PATCH 22/27] [skip ci] [skip ci] --- pg-identity-func-trig-seq.sql | 280 ++++++++++++++++++++++++++++++++++ 1 file changed, 280 insertions(+) create mode 100644 pg-identity-func-trig-seq.sql diff --git a/pg-identity-func-trig-seq.sql b/pg-identity-func-trig-seq.sql new file mode 100644 index 0000000..e8069a2 --- /dev/null +++ b/pg-identity-func-trig-seq.sql @@ -0,0 +1,280 @@ +SET search_path TO common_oltp; + +CREATE OR REPLACE FUNCTION "common_oltp"."notify_trigger_common_oltp" () RETURNS trigger + VOLATILE +AS $body$ +DECLARE + rec RECORD; + payload TEXT; + column_name TEXT; + column_value TEXT; + payload_items TEXT[]; + uniquecolumn TEXT; + logtime TEXT; + payloadseqid INTEGER; +BEGIN + CASE TG_OP + WHEN 'INSERT', 'UPDATE' THEN + rec := NEW; + WHEN 'DELETE' THEN + rec := OLD; + ELSE + RAISE EXCEPTION 'Unknown TG_OP: "%". 
Should not occur!', TG_OP; + END CASE; + raise notice 'table name : %', TG_TABLE_NAME; + RAISE info 'hello world'; + -- Get required fields + FOREACH column_name IN ARRAY TG_ARGV LOOP + EXECUTE format('SELECT $1.%I::TEXT', column_name) + INTO column_value + USING rec; + case + when + column_name = 'upload_document' then + -- RAISE NOTICE 'upload_document boolean'; + if column_value = 'false' then + column_value = '0'; + else + column_value = '1'; + end if; + when + column_name = 'upload_document_required' then + -- RAISE NOTICE 'upload_document_required boolean'; + if column_value = 'false' then + column_value = '0'; + else + column_value = '1'; + end if; + when + column_name = 'identify_email_enabled' then + if column_value = 'false' then + column_value = '0'; + else + column_value = '1'; + end if; + when + column_name = 'identify_handle_enabled' then + if column_value = 'false' then + column_value = '0'; + else + column_value = '1'; + end if; + when + column_name = 'create_date' then + column_value := (select to_char (column_value::timestamp, 'YYYY-MM-DD HH24:MI:SS.MS')); + when + column_name = 'modify_date' then + column_value := (select to_char (column_value::timestamp, 'YYYY-MM-DD HH24:MI:SS.MS')); + -- when + -- column_name = 'achievement_date' then + --column_value := (select to_date (column_value, 'MM/DD/YYYY')); + --column_value := (select to_date (column_value)); + --when + --column_name = 'password' then + --column_value := regexp_replace(column_value, '\s', '', 'g'); + --column_value := regexp_replace(column_value, E'[\\n\\r]+', '\n\r', 'g'); + else + -- RAISE NOTICE ' not boolean'; + end case; + payload_items := array_append(payload_items, '"' || replace(column_name, '"', '\"') || '":"' || replace(column_value, '"', '\"') || '"'); + END LOOP; + --logtime := (select date_display_tz()); + logtime := (SELECT to_char (now()::timestamptz at time zone 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS"Z"')); + payloadseqid := (select nextval('payloadsequence'::regclass)); + + uniquecolumn := (SELECT c.column_name + FROM information_schema.key_column_usage AS c + LEFT JOIN information_schema.table_constraints AS t + ON t.constraint_name = c.constraint_name + WHERE t.table_name = TG_TABLE_NAME AND t.constraint_type = 'PRIMARY KEY' LIMIT 1); + + if (uniquecolumn = '') IS NOT FALSE then + uniquecolumn := 'Not-Available'; + end if; + + -- Build the payload + payload := '' + || '{' + || '"topic":"' || 'test.db.postgres.sync' || '",' + || '"originator":"' || 'tc-postgres-delta-processor' || '",' + || '"timestamp":"' || logtime || '",' + || '"mime-type":"' || 'application/json' || '",' + || '"payload": {' + || '"payloadseqid":"' || payloadseqid || '",' + || '"Uniquecolumn":"' || uniquecolumn || '",' + || '"operation":"' || TG_OP || '",' + || '"schema":"' || TG_TABLE_SCHEMA || '",' + || '"table":"' || TG_TABLE_NAME || '",' + || '"data": {' || array_to_string(payload_items, ',') || '}' + || '}}'; + + -- Notify the channel + PERFORM pg_notify('test_db_notifications', payload); + + RETURN rec; +END; +$body$ LANGUAGE plpgsql; + +CREATE TRIGGER "pg_email_trigger" + AFTER INSERT OR DELETE OR UPDATE ON email + FOR EACH ROW +EXECUTE PROCEDURE notify_trigger_common_oltp('user_id', 'email_id', 'email_type_id', 'address', 'primary_ind', 'status_id'); + +CREATE TRIGGER "pg_security_user_trigger" + AFTER INSERT OR DELETE OR UPDATE ON security_user + FOR EACH ROW +EXECUTE PROCEDURE notify_trigger_common_oltp('login_id', 'user_id', 'password', 'create_user_id'); + + +CREATE TRIGGER "pg_user_achievement_trigger" + AFTER 
INSERT OR DELETE OR UPDATE ON user_achievement + FOR EACH ROW +EXECUTE PROCEDURE notify_trigger_common_oltp('user_id', 'achievement_date', 'achievement_type_id', 'description', 'create_date'); + +CREATE TRIGGER "pg_user_group_xref_trigger" + AFTER INSERT OR DELETE OR UPDATE ON user_group_xref + FOR EACH ROW +EXECUTE PROCEDURE notify_trigger_common_oltp('user_group_id', 'login_id', 'group_id', 'create_user_id', 'security_status_id'); + +CREATE TRIGGER "pg_user_trigger" + AFTER INSERT OR DELETE OR UPDATE ON user + FOR EACH ROW +EXECUTE PROCEDURE notify_trigger_common_oltp('user_id', 'first_name', 'last_name', 'handle', 'status', 'activation_code', 'reg_source', 'utm_source', 'utm_medium', 'utm_campaign'); + +--drop SEQUENCE sequence_user_group_seq; +CREATE SEQUENCE sequence_user_group_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START +WITH 600000000 NO CYCLE; + + +--drop SEQUENCE sequence_email_seq; +CREATE SEQUENCE sequence_email_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START +WITH 60000000 NO CYCLE; + + +SET search_path TO informixoltp; + +CREATE OR REPLACE FUNCTION "informixoltp"."notify_trigger_informixoltp" () RETURNS trigger + VOLATILE +AS $body$ +DECLARE + rec RECORD; + payload TEXT; + column_name TEXT; + column_value TEXT; + payload_items TEXT[]; + uniquecolumn TEXT; + logtime TEXT; + payloadseqid INTEGER; +BEGIN + CASE TG_OP + WHEN 'INSERT', 'UPDATE' THEN + rec := NEW; + WHEN 'DELETE' THEN + rec := OLD; + ELSE + RAISE EXCEPTION 'Unknown TG_OP: "%". Should not occur!', TG_OP; + END CASE; + raise notice 'table name : %', TG_TABLE_NAME; + RAISE info 'hello world'; + -- Get required fields + FOREACH column_name IN ARRAY TG_ARGV LOOP + EXECUTE format('SELECT $1.%I::TEXT', column_name) + INTO column_value + USING rec; + case + when + column_name = 'upload_document' then + -- RAISE NOTICE 'upload_document boolean'; + if column_value = 'false' then + column_value = '0'; + else + column_value = '1'; + end if; + when + column_name = 'upload_document_required' then + -- RAISE NOTICE 'upload_document_required boolean'; + if column_value = 'false' then + column_value = '0'; + else + column_value = '1'; + end if; + when + column_name = 'identify_email_enabled' then + if column_value = 'false' then + column_value = '0'; + else + column_value = '1'; + end if; + when + column_name = 'identify_handle_enabled' then + if column_value = 'false' then + column_value = '0'; + else + column_value = '1'; + end if; + when + column_name = 'create_date' then + column_value := (select to_char (column_value::timestamp, 'YYYY-MM-DD HH24:MI:SS.MS')); + when + column_name = 'modify_date' then + column_value := (select to_char (column_value::timestamp, 'YYYY-MM-DD HH24:MI:SS.MS')); + -- when + -- column_name = 'achievement_date' then + --column_value := (select to_date (column_value, 'MM/DD/YYYY')); + --column_value := (select to_date (column_value)); + --when + --column_name = 'password' then + --column_value := regexp_replace(column_value, '\s', '', 'g'); + --column_value := regexp_replace(column_value, E'[\\n\\r]+', '\n\r', 'g'); + else + -- RAISE NOTICE ' not boolean'; + end case; + payload_items := array_append(payload_items, '"' || replace(column_name, '"', '\"') || '":"' || replace(column_value, '"', '\"') || '"'); + END LOOP; + --logtime := (select date_display_tz()); + logtime := (SELECT to_char (now()::timestamptz at time zone 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS"Z"')); + payloadseqid := (select nextval('payloadsequence'::regclass)); + + uniquecolumn := (SELECT c.column_name + 
FROM information_schema.key_column_usage AS c + LEFT JOIN information_schema.table_constraints AS t + ON t.constraint_name = c.constraint_name + WHERE t.table_name = TG_TABLE_NAME AND t.constraint_type = 'PRIMARY KEY' LIMIT 1); + + if (uniquecolumn = '') IS NOT FALSE then + uniquecolumn := 'Not-Available'; + end if; + + -- Build the payload + payload := '' + || '{' + || '"topic":"' || 'test.db.postgres.sync' || '",' + || '"originator":"' || 'tc-postgres-delta-processor' || '",' + || '"timestamp":"' || logtime || '",' + || '"mime-type":"' || 'application/json' || '",' + || '"payload": {' + || '"payloadseqid":"' || payloadseqid || '",' + || '"Uniquecolumn":"' || uniquecolumn || '",' + || '"operation":"' || TG_OP || '",' + || '"schema":"' || TG_TABLE_SCHEMA || '",' + || '"table":"' || TG_TABLE_NAME || '",' + || '"data": {' || array_to_string(payload_items, ',') || '}' + || '}}'; + + -- Notify the channel + PERFORM pg_notify('test_db_notifications', payload); + + RETURN rec; +END; +$body$ LANGUAGE plpgsql; + + +CREATE TRIGGER "pg_algo_rating" + AFTER INSERT OR DELETE OR UPDATE ON algo_rating + FOR EACH ROW +EXECUTE PROCEDURE notify_trigger_informixoltp('coder_id', 'rating', 'vol', 'round_id', 'num_ratings', 'algo_rating_type_id', 'modify_date'); + +CREATE TRIGGER "pg_coder" + AFTER INSERT OR DELETE OR UPDATE ON coder + FOR EACH ROW +EXECUTE PROCEDURE notify_trigger_informixoltp('coder_id', 'quote', 'coder_type_id', 'comp_country_code', 'display_quote', 'quote_location', 'quote_color', 'display_banner', 'banner_style'); From 96b4bec90eecc71bf20039ec9b40793d1811bacc Mon Sep 17 00:00:00 2001 From: nkumar Date: Tue, 7 Jan 2020 15:34:36 +0000 Subject: [PATCH 23/27] slack integration Committer: nkumar --- config/default.js | 5 ++++ src/consumer.js | 59 +++++++++++++++++++++++++++++-------- src/producer.js | 49 ++++++++++++++++++++++-------- src/services/posttoslack.js | 42 ++++++++++++++++++++++++++ 4 files changed, 130 insertions(+), 25 deletions(-) create mode 100644 src/services/posttoslack.js diff --git a/config/default.js b/config/default.js index dd31193..03cef82 100644 --- a/config/default.js +++ b/config/default.js @@ -38,6 +38,11 @@ module.exports = { errorTopic: process.env.ERROR_TOPIC || 'db.scorecardtable.error', recipients: ['admin@abc.com'] // Kafka partitions to use }, + SLACK: { + URL: process.env.SLACKURL || 'us-east-1', + SLACKCHANNEL: process.env.SLACKCHANNEL || 'ifxpg-migrator', + SLACKNOTIFY: process.env.SLACKNOTIFY || 'false' + }, AUTH0_URL: process.env.AUTH0_URL , AUTH0_AUDIENCE: process.env.AUTH0_AUDIENCE , diff --git a/src/consumer.js b/src/consumer.js index 17c1f0f..3d27497 100644 --- a/src/consumer.js +++ b/src/consumer.js @@ -9,6 +9,7 @@ const pushToKafka = require('./services/pushToKafka') const healthcheck = require('topcoder-healthcheck-dropin'); const auditTrail = require('./services/auditTrail'); const kafkaOptions = config.get('KAFKA') +const postMessage = require('./services/posttoslack') const isSslEnabled = kafkaOptions.SSL && kafkaOptions.SSL.cert && kafkaOptions.SSL.key const consumer = new Kafka.SimpleConsumer({ connectionString: kafkaOptions.brokers_url, @@ -20,7 +21,6 @@ const consumer = new Kafka.SimpleConsumer({ }) }) - const check = function () { if (!consumer.client.initialBrokers && !consumer.client.initialBrokers.length) { return false; @@ -41,7 +41,7 @@ const terminate = () => process.exit() * @param {String} topic The name of the message topic * @param {Number} partition The kafka partition to which messages are written */ -//let message; +var 
retryvar=""; //let cs_payloadseqid; async function dataHandler(messageSet, topic, partition) { let cs_payloadseqid @@ -54,10 +54,13 @@ let cs_payloadseqid logger.debug(`consumer : ${message.payload.payloadseqid} ${message.payload.table} ${message.payload.Uniquecolumn} ${message.payload.operation} ${message.timestamp} `); await updateInformix(message) await consumer.commitOffset({ topic, partition, offset: m.offset }) // Commit offset only on success + if (message.payload['retryCount']) retryvar = message.payload.retryCount; auditTrail([cs_payloadseqid,cs_processId,message.payload.table,message.payload.Uniquecolumn, - message.payload.operation,"Informix-updated","","","",message.payload.data, message.timestamp,message.topic],'consumer') + message.payload.operation,"Informix-updated",retryvar,"","",message.payload.data, message.timestamp,message.topic],'consumer') } catch (err) { - logger.error(`Could not process kafka message or informix DB error: "${err.message}"`) + const errmsg2 = `Could not process kafka message or informix DB error: "${err.message}"` + logger.error(errmsg2) + //await callposttoslack(errmsg2) //logger.logFullError(err) logger.debug(`error-sync: consumer "${err.message}"`) if (!cs_payloadseqid){ @@ -67,40 +70,70 @@ let cs_payloadseqid await auditTrail([cs_payloadseqid,3333,'message.payload.table','message.payload.Uniquecolumn', 'message.payload.operation',"Error-Consumer","",err.message,"",'message.payload.data',new Date(),'message.topic'],'consumer') try { - var retryvar - if (message.payload['retryCount']) retryvar = message.payload.retryCount; + //var retryvar + if (message.payload['retryCount']) retryvar = message.payload.retryCount; await consumer.commitOffset({ topic, partition, offset: m.offset }) // Commit success as will re-publish await auditTrail([cs_payloadseqid,3333,'message.payload.table','message.payload.Uniquecolumn', 'message.payload.operation',"Informix-Updated1",retryvar,"","",'message.payload.data',new Date(),'message.topic'],'consumer') - logger.debug(`Trying to push same message after adding retryCounter`) + //await callposttoslack(`Retry for Kafka push : retrycount : "${retryvar}"`) + logger.debug(`Trying to push same message after adding retryCounter`) if (!message.payload.retryCount) { message.payload.retryCount = 0 logger.debug('setting retry counter to 0 and max try count is : ', config.KAFKA.maxRetry); } if (message.payload.retryCount >= config.KAFKA.maxRetry) { logger.debug('Recached at max retry counter, sending it to error queue: ', config.KAFKA.errorTopic); - logger.debug(`error-sync: consumer max-retry-limit reached`) - let notifiyMessage = Object.assign({}, message, { topic: config.KAFKA.errorTopic }) + logger.debug(`error-sync: consumer max-retry-limit reached`) + // push to slack - alertIt("slack message" + await callposttoslack(`error-sync: postgres-ifx-processor : consumer max-retry-limit reached: "${message.payload.table}": payloadseqid : "${cs_payloadseqid}"`) + let notifiyMessage = Object.assign({}, message, { topic: config.KAFKA.errorTopic }) notifiyMessage.payload['recipients'] = config.KAFKA.recipients logger.debug('pushing following message on kafka error alert queue:') //logger.debug(notifiyMessage) - await pushToKafka(notifiyMessage) + await pushToKafka(notifiyMessage) return } message.payload['retryCount'] = message.payload.retryCount + 1; await pushToKafka(message) - logger.debug(` After kafka push Retry Count "${message.payload.retryCount}"`) + var errmsg9 = `Retry for Kafka push : retrycount : 
"${message.payload.retryCount}" : "${cs_payloadseqid}"` + logger.debug(errmsg9) + //await callposttoslack(errmsg9) } catch (err) { await auditTrail([cs_payloadseqid,cs_processId,message.payload.table,message.payload.Uniquecolumn, message.payload.operation,"Error-republishing",message.payload['retryCount'],err.message,"",message.payload.data, message.timestamp,message.topic],'consumer') - logger.error("Error occured in re-publishing kafka message", err) - logger.debug(`error-sync: consumer re-publishing "${err.message}"`) + const errmsg1 = `postgres-ifx-processor: consumer : Error-republishing: "${err.message}"` + logger.error(errmsg1) + logger.debug(`error-sync: consumer re-publishing "${err.message}"`) + // push to slack - alertIt("slack message" + await callposttoslack(errmsg1) } } } } +async function callposttoslack(slackmessage) { +if(config.SLACK.SLACKNOTIFY === 'true') { + return new Promise(function (resolve, reject) { + postMessage(slackmessage, (response) => { + console.log(`respnse : ${response}`) + if (response.statusCode < 400) { + logger.debug('Message posted successfully'); + //callback(null); + } else if (response.statusCode < 500) { + const errmsg1 =`Slack Error: posting message to Slack API: ${response.statusCode} - ${response.statusMessage}` + logger.debug(`error-sync: ${errmsg1}`) + } + else { + logger.debug(`Server error when processing message: ${response.statusCode} - ${response.statusMessage}`); + //callback(`Server error when processing message: ${response.statusCode} - ${response.statusMessage}`); + } + resolve("done") + }); + }) //end +} + +} /** * Initialize kafka consumer diff --git a/src/producer.js b/src/producer.js index 85d3f53..6108c6f 100644 --- a/src/producer.js +++ b/src/producer.js @@ -7,6 +7,7 @@ const logger = require('./common/logger') const pushToKafka = require('./services/pushToKafka') const pushToDynamoDb = require('./services/pushToDynamoDb') const pgOptions = config.get('POSTGRES') +const postMessage = require('./services/posttoslack') const pgConnectionString = `postgresql://${pgOptions.user}:${pgOptions.password}@${pgOptions.host}:${pgOptions.port}/${pgOptions.database}` const pgClient = new pg.Client(pgConnectionString) const auditTrail = require('./services/auditTrail'); @@ -14,23 +15,27 @@ const express = require('express') const app = express() const port = 3000 const isFailover = process.argv[2] != undefined ? (process.argv[2] === 'failover' ? 
true : false) : false + async function setupPgClient() { - try { +var payloadcopy +try { await pgClient.connect() for (const triggerFunction of pgOptions.triggerFunctions) { await pgClient.query(`LISTEN ${triggerFunction}`) } pgClient.on('notification', async (message) => { try { + payloadcopy = "" const payload = JSON.parse(message.payload) + payloadcopy = message const validTopicAndOriginator = (pgOptions.triggerTopics.includes(payload.topic)) && (pgOptions.triggerOriginators.includes(payload.originator)) // Check if valid topic and originator if (validTopicAndOriginator) { if (!isFailover) { - logger.info('trying to push on kafka topic') + //logger.info('trying to push on kafka topic') await pushToKafka(payload) - logger.info('pushed to kafka and added for audit trail') + logger.info('Push to kafka and added for audit trail') } else { - logger.info('taking backup on dynamodb for reconciliation') + logger.info('Push to dynamodb for reconciliation') await pushToDynamoDb(payload) } audit(message) @@ -41,16 +46,20 @@ async function setupPgClient() { } catch (error) { logger.error('Could not parse message payload') logger.debug(`error-sync: producer parse message : "${error.message}"`) - logger.logFullError(error) + const errmsg1 = `postgres-ifx-processor: producer or dd : Error Parse or payload : "${error.message}" \n payload : "${payloadcopy.payload}"` + logger.logFullError(error) audit(error) // push to slack - alertIt("slack message" + await callposttoslack(errmsg1) } }) logger.info('pg-ifx-sync producer: Listening to pg-trigger channel.') } catch (err) { - logger.error('Error in setting up postgres client: ', err.message) + const errmsg = `postgres-ifx-processor: producer or dd : Error in setting up postgres client: "${err.message}"` + logger.error(errmsg) logger.logFullError(err) // push to slack - alertIt("slack message") + await callposttoslack(errmsg) terminate() } } @@ -62,6 +71,28 @@ async function run() { await setupPgClient() } +async function callposttoslack(slackmessage) { +if(config.SLACK.SLACKNOTIFY === 'true') { + return new Promise(function (resolve, reject) { + postMessage(slackmessage, (response) => { + console.log(`respnse : ${response}`) + if (response.statusCode < 400) { + logger.debug('Message posted successfully'); + //callback(null); + } else if (response.statusCode < 500) { + const errmsg1 =`Slack Error: posting message to Slack API: ${response.statusCode} - ${response.statusMessage}` + logger.debug(`error-sync: ${errmsg1}`) + } + else { + logger.debug(`Server error when processing message: ${response.statusCode} - ${response.statusMessage}`); + //callback(`Server error when processing message: ${response.statusCode} - ${response.statusMessage}`); + } + resolve("done") + }); + }) //end +} + +} // execute run() @@ -91,12 +122,6 @@ async function audit(message) { } } -function alertIt(message) { - /** - // call slack - */ -} - app.get('/health', (req, res) => { res.send('health ok') }) diff --git a/src/services/posttoslack.js b/src/services/posttoslack.js new file mode 100644 index 0000000..f5cf15a --- /dev/null +++ b/src/services/posttoslack.js @@ -0,0 +1,42 @@ +const config = require('config'); +const zlib = require('zlib'); +const url = require('url'); +const https = require('https'); +hookUrl = config.SLACK.URL +slackChannel = config.SLACK.SLACKCHANNEL +async function postMessage(message, callback) { + var slackMessage = { + channel: `${slackChannel}`, + text: `${message}`, + } + const body = JSON.stringify(slackMessage); + const options = url.parse(hookUrl); + 
options.method = 'POST'; + options.headers = { + 'Content-Type': 'application/json', + 'Content-Length': Buffer.byteLength(body), + }; + return new Promise(function (resolve, reject) { + const postReq = https.request(options, (res) => { + const chunks = []; + res.setEncoding('utf8'); + res.on('data', (chunk) => chunks.push(chunk)); + res.on('end', () => { + if (callback) { + callback({ + body: chunks.join(''), + statusCode: res.statusCode, + statusMessage: res.statusMessage, + }); + } + resolve(res) + }); + // return res; + }); + + postReq.write(body); + postReq.end(); + }) +} + +module.exports = postMessage From cdbdfbd4f476031292c3ca173557ff90bc77208d Mon Sep 17 00:00:00 2001 From: nkumar Date: Sat, 11 Jan 2020 06:37:48 +0000 Subject: [PATCH 24/27] reconsiler check-in Committer: nkumar --- config/default.js | 5 ++ src/consumer.js | 4 +- src/producer.js | 2 +- src/reconsiler-audit.js | 126 +++++++++++++++++++++++++++++++++++++ src/services/auditTrail.js | 1 + 5 files changed, 135 insertions(+), 3 deletions(-) create mode 100644 src/reconsiler-audit.js diff --git a/config/default.js b/config/default.js index 03cef82..9368337 100644 --- a/config/default.js +++ b/config/default.js @@ -43,6 +43,11 @@ module.exports = { SLACKCHANNEL: process.env.SLACKCHANNEL || 'ifxpg-migrator', SLACKNOTIFY: process.env.SLACKNOTIFY || 'false' }, + RECONSILER:{ + RECONSILER_START: process.env.RECONSILER_START || 10, + RECONSILER_END: process.env.RECONSILER_END || 5, + RECONSILER_DURATION_TYPE: process.env.RECONSILER_DURATION_TYPE || 'm' + }, AUTH0_URL: process.env.AUTH0_URL , AUTH0_AUDIENCE: process.env.AUTH0_AUDIENCE , diff --git a/src/consumer.js b/src/consumer.js index 3d27497..c5d1e9b 100644 --- a/src/consumer.js +++ b/src/consumer.js @@ -73,8 +73,8 @@ let cs_payloadseqid //var retryvar if (message.payload['retryCount']) retryvar = message.payload.retryCount; await consumer.commitOffset({ topic, partition, offset: m.offset }) // Commit success as will re-publish - await auditTrail([cs_payloadseqid,3333,'message.payload.table','message.payload.Uniquecolumn', - 'message.payload.operation',"Informix-Updated1",retryvar,"","",'message.payload.data',new Date(),'message.topic'],'consumer') + // await auditTrail([cs_payloadseqid,3333,'message.payload.table','message.payload.Uniquecolumn', + // 'message.payload.operation',"Informix-Updated1",retryvar,"","",'message.payload.data',new Date(),'message.topic'],'consumer') //await callposttoslack(`Retry for Kafka push : retrycount : "${retryvar}"`) logger.debug(`Trying to push same message after adding retryCounter`) if (!message.payload.retryCount) { diff --git a/src/producer.js b/src/producer.js index 6108c6f..983de78 100644 --- a/src/producer.js +++ b/src/producer.js @@ -113,7 +113,7 @@ async function audit(message) { } else { logger.debug(`Producer DynamoDb : ${logMessage}`); } - auditTrail([pl_seqid, pl_processid, pl_table, pl_uniquecolumn, pl_operation, "push-to-kafka", "", "", "", pl_payload, pl_timestamp, pl_topic], 'producer') + auditTrail([pl_seqid, pl_processid, pl_table, pl_uniquecolumn, pl_operation, "push-to-kafka", "", "", "", JSON.stringify(message), pl_timestamp, pl_topic], 'producer') } else { const pl_randonseq = 'err-' + (new Date()).getTime().toString(36) + Math.random().toString(36).slice(2) if (!isFailover) { diff --git a/src/reconsiler-audit.js b/src/reconsiler-audit.js new file mode 100644 index 0000000..0c44570 --- /dev/null +++ b/src/reconsiler-audit.js @@ -0,0 +1,126 @@ +const config = require('config') +const pg = require('pg') +const logger 
= require('./common/logger') +const pushToKafka = require('./services/pushToKafka') +const pgOptions = config.get('POSTGRES') +const postMessage = require('./services/posttoslack') +const pgConnectionString = `postgresql://${pgOptions.user}:${pgOptions.password}@${pgOptions.host}:${pgOptions.port}/${pgOptions.database}` +const pgClient = new pg.Client(pgConnectionString) +const auditTrail = require('./services/auditTrail'); +const port = 3000 + +async function setupPgClient() { + var payloadcopy + try { + await pgClient.connect() + //sql1= "select * from pgifx_sync_audit where syncstatus in ('Informix-updated') + //and auditdatetime >= (now()::date - interval '10m') and auditdatetime <= (now()::date - interval '5m') "; + //>= 12.53 and <= 12.48 + rec_d_start = config.RECONSILER.RECONSILER_START + rec_d_end = config.RECONSILER.RECONSILER_END + rec_d_type = config.RECONSILER.RECONSILER_DURATION_TYPE + var paramvalues = ['push-to-kafka',rec_d_start,rec_d_end]; + //var paramvalues = ['push-to-kafka','60002027']; + //sql1 = 'select * from common_oltp.pgifx_sync_audit where pgifx_sync_audit.syncstatus =($1)' + sql1 = " select seq_id, payloadseqid,auditdatetime at time zone 'utc' at time zone 'Asia/Calcutta',syncstatus, payload where pgifx_sync_audit.syncstatus =($1)" + sql2 = " and pgifx_sync_audit.auditdatetime >= DATE(NOW()) - INTERVAL '1"+ rec_d_type + "' * ($2)" + sql3 = " and pgifx_sync_audit.auditdatetime <= DATE(NOW()) - INTERVAL '1"+ rec_d_type + "' * ($3)" + sql = sql1 + sql2 + sql3 + console.log('sql ', sql) + // sql3 = ' select * from common_oltp.pgifx_sync_audit where pgifx_sync_audit.syncstatus =($1)' + // sql4 = " and pgifx_sync_audit.payloadseqid = ($2)" + + //sql = sql3 + sql4 + //const result = await pgClient.query(sql1, (err, res) => { + await pgClient.query(sql,paramvalues, async (err,result) => { + if (err) { + var errmsg0 = `error-sync: Audit reconsiler query "${err.message}"` + logger.debug (errmsg0) + // await callposttoslack(errmsg0) + } + else{ + console.log("Reconsiler Rowcount = ", result.rows.length) + for (var i = 0; i < result.rows.length; i++) { + for(var columnName in result.rows[i]) { + // console.log('column "%s" has a value of "%j"', columnName, result.rows[i][columnName]); + //if ((columnName === 'seq_id') || (columnName === 'payload')){ + if ((columnName === 'payload')){ + var reconsiler_payload = result.rows[i][columnName] + } + }//column for loop + try { + var s_payload = reconsiler_payload + payload = JSON.parse(s_payload) + payload1 = payload.payload + await pushToKafka(payload1) + logger.info('Reconsiler : Push to kafka and added for audit trail') + audit(s_payload) + } catch (error) { + logger.error('Reconsiler: Could not parse message payload') + logger.debug(`error-sync: Reconsiler parse message : "${error.message}"`) + const errmsg1 = `postgres-ifx-processor: Reconsiler : Error Parse or payload : "${error.message}" \n payload : "${payloadcopy.payload}"` + logger.logFullError(error) + audit(error) + await callposttoslack(errmsg1) + } + }//result for loop + } + pgClient.end() + }) + }catch (err) { + const errmsg = `postgres-ifx-processor: Reconsiler : Error in setting up postgres client: "${err.message}"` + logger.error(errmsg) + logger.logFullError(err) + await callposttoslack(errmsg) + terminate() + } +} + +const terminate = () => process.exit() + +async function run() { + logger.debug("Initialising Reconsiler setup...") + await setupPgClient() +} + +async function callposttoslack(slackmessage) { + if (config.SLACK.SLACKNOTIFY === 'true') { + return 
new Promise(function (resolve, reject) { + postMessage(slackmessage, (response) => { + console.log(`respnse : ${response}`) + if (response.statusCode < 400) { + logger.debug('Message posted successfully'); + //callback(null); + } else if (response.statusCode < 500) { + const errmsg1 = `Slack Error: posting message to Slack API: ${response.statusCode} - ${response.statusMessage}` + logger.debug(`error-sync: ${errmsg1}`) + } else { + logger.debug(`Server error when processing message: ${response.statusCode} - ${response.statusMessage}`); + //callback(`Server error when processing message: ${response.statusCode} - ${response.statusMessage}`); + } + resolve("done") + }); + }) //end + } + +} +// execute +run() + +async function audit(message) { + const pl_processid = 5555 + const jsonpayload = JSON.parse(message) + payload = JSON.parse(jsonpayload.payload) + payload1 = payload.payload + const pl_seqid = payload1.payloadseqid + const pl_topic = payload1.topic // TODO can move in config ? + const pl_table = payload1.table + const pl_uniquecolumn = payload1.Uniquecolumn + const pl_operation = payload1.operation + const pl_timestamp = payload1.timestamp + //const pl_payload = JSON.stringify(payload.payload) + const logMessage = `${pl_seqid} ${pl_processid} ${pl_table} ${pl_uniquecolumn} ${pl_operation} ${payload.timestamp}` + logger.debug(`reconsiler : ${logMessage}`); + //await auditTrail([pl_seqid, pl_processid, pl_table, pl_uniquecolumn, pl_operation, "push-to-kafka-reconsiler", "", "reconsiler", "", "", new Date(), ""], 'producer') +} + diff --git a/src/services/auditTrail.js b/src/services/auditTrail.js index 58aa7ee..3f45f95 100644 --- a/src/services/auditTrail.js +++ b/src/services/auditTrail.js @@ -45,6 +45,7 @@ if (sourcetype === 'producer'){ // logger.debug(`--Audit Trail update success-- `) } }) +pgClient2.end() } From 32a4ded13dac435602cb6807a7d7da282b9b730c Mon Sep 17 00:00:00 2001 From: nkumar Date: Sat, 11 Jan 2020 09:24:39 +0000 Subject: [PATCH 25/27] dynamodb payload, config changes Committer: nkumar --- config/default.js | 4 ++++ src/services/pushToDynamoDb.js | 9 +++------ 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/config/default.js b/config/default.js index 9368337..d22bb48 100644 --- a/config/default.js +++ b/config/default.js @@ -48,6 +48,10 @@ module.exports = { RECONSILER_END: process.env.RECONSILER_END || 5, RECONSILER_DURATION_TYPE: process.env.RECONSILER_DURATION_TYPE || 'm' }, + DYNAMODB: + { + DYNAMODB_TABLE: process.env.DYNAMODB_TABLE || 'test_pg_ifx_payload_sync' + }, AUTH0_URL: process.env.AUTH0_URL , AUTH0_AUDIENCE: process.env.AUTH0_AUDIENCE , diff --git a/src/services/pushToDynamoDb.js b/src/services/pushToDynamoDb.js index 4b0fb50..78d995d 100644 --- a/src/services/pushToDynamoDb.js +++ b/src/services/pushToDynamoDb.js @@ -4,13 +4,11 @@ const _ = require('lodash') var AWS = require("aws-sdk"); async function pushToDynamoDb(payload) { try { console.log('----Push To DynomoDB -------'); - // console.log(payload) - p_dd_payloadseqid = payload.payload.payloadseqid; var params = { - TableName: 'test_pg_ifx_payload_sync', + TableName: config.DYNAMODB.DYNAMODB_TABLE, Item: { payloadseqid: payload.payload.payloadseqid, - pl_document: payload.payload, + pl_document: payload, pl_table: payload.payload.table, pl_uniquecolumn: payload.payload.Uniquecolumn, pl_operation: payload.payload.operation, @@ -25,9 +23,8 @@ async function pushToDynamoDb(payload) { }); } catch (e) { - logger.error(`Error at PushToDynamoDB "${e}"`) + logger.error(`error-sync: Error at 
PushToDynamoDB "${e}"`) } } -console.log("--from DyanomoDb--") module.exports = pushToDynamoDb From 3e81f9bca9f4b0072e591df14524f678dfc02636 Mon Sep 17 00:00:00 2001 From: nkumar-topcoder <33625707+nkumar-topcoder@users.noreply.github.com> Date: Sat, 11 Jan 2020 16:35:11 +0530 Subject: [PATCH 26/27] [skip ci] [skip ci] --- pg-identity-func-trig-seq.sql | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pg-identity-func-trig-seq.sql b/pg-identity-func-trig-seq.sql index e8069a2..9baa04a 100644 --- a/pg-identity-func-trig-seq.sql +++ b/pg-identity-func-trig-seq.sql @@ -137,18 +137,18 @@ CREATE TRIGGER "pg_user_group_xref_trigger" EXECUTE PROCEDURE notify_trigger_common_oltp('user_group_id', 'login_id', 'group_id', 'create_user_id', 'security_status_id'); CREATE TRIGGER "pg_user_trigger" - AFTER INSERT OR DELETE OR UPDATE ON user + AFTER INSERT OR DELETE OR UPDATE ON "user" FOR EACH ROW EXECUTE PROCEDURE notify_trigger_common_oltp('user_id', 'first_name', 'last_name', 'handle', 'status', 'activation_code', 'reg_source', 'utm_source', 'utm_medium', 'utm_campaign'); --drop SEQUENCE sequence_user_group_seq; CREATE SEQUENCE sequence_user_group_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START -WITH 600000000 NO CYCLE; +WITH 601000000 NO CYCLE; --drop SEQUENCE sequence_email_seq; CREATE SEQUENCE sequence_email_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START -WITH 60000000 NO CYCLE; +WITH 70100000 NO CYCLE; SET search_path TO informixoltp; From acc099da40fdc510df6c3086d5dadf433f258c4c Mon Sep 17 00:00:00 2001 From: nkumar-topcoder <33625707+nkumar-topcoder@users.noreply.github.com> Date: Sat, 11 Jan 2020 16:42:56 +0530 Subject: [PATCH 27/27] [skip ci] [skip ci] --- pg-identity-func-trig-seq.sql | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/pg-identity-func-trig-seq.sql b/pg-identity-func-trig-seq.sql index 9baa04a..6679c67 100644 --- a/pg-identity-func-trig-seq.sql +++ b/pg-identity-func-trig-seq.sql @@ -1,5 +1,24 @@ SET search_path TO common_oltp; +CREATE TABLE + pgifx_sync_audit + ( + seq_id SERIAL NOT NULL, + payloadseqid CHARACTER VARYING, + processid INTEGER, + tablename CHARACTER VARYING(64), + uniquecolumn CHARACTER VARYING(64), + dboperation CHARACTER VARYING(32), + syncstatus CHARACTER VARYING(64), + retrycount CHARACTER VARYING(64), + consumer_err CHARACTER VARYING, + producer_err CHARACTER VARYING, + payload CHARACTER VARYING, + auditdatetime TIMESTAMP(6) WITHOUT TIME ZONE, + topicname CHARACTER VARYING(64), + UNIQUE (payloadseqid) + ); + CREATE OR REPLACE FUNCTION "common_oltp"."notify_trigger_common_oltp" () RETURNS trigger VOLATILE AS $body$