
merge for new reconsiler2 to master #26


Merged
merged 12 commits on Jul 21, 2020
16 changes: 8 additions & 8 deletions .circleci/config.yml
@@ -57,9 +57,9 @@ build_steps: &build_steps
#source buildenvvar
#./master_deploy.sh -d ECS -e ${DEPLOY_ENV} -t latest -s ${GLOBAL_ENV}-global-appvar,${LOGICAL_ENV}-${APP_NAME}-appvar -i postgres-ifx-processer

-./buildenv.sh -e ${DEPLOY_ENV} -b ${LOGICAL_ENV}-${APP_NAME}-consumer-deployvar
-source buildenvvar
-./master_deploy.sh -d ECS -e ${DEPLOY_ENV} -t latest -s ${GLOBAL_ENV}-global-appvar,${LOGICAL_ENV}-${APP_NAME}-appvar -i postgres-ifx-processer
+#./buildenv.sh -e ${DEPLOY_ENV} -b ${LOGICAL_ENV}-${APP_NAME}-consumer-deployvar
+#source buildenvvar
+#./master_deploy.sh -d ECS -e ${DEPLOY_ENV} -t latest -s ${GLOBAL_ENV}-global-appvar,${LOGICAL_ENV}-${APP_NAME}-appvar -i postgres-ifx-processer


#echo "Running Masterscript - deploy postgres-ifx-processer producer"
@@ -88,11 +88,11 @@ build_steps: &build_steps
#source buildenvvar
#./master_deploy.sh -d ECS -e ${DEPLOY_ENV} -t latest -s ${GLOBAL_ENV}-global-appvar,${LOGICAL_ENV}-${APP_NAME}-appvar -i postgres-ifx-processer

#echo "Running Masterscript - deploy postgres-ifx-processer reconsiler2"
#if [ -e ${LOGICAL_ENV}-${APP_NAME}-reconsiler2-deployvar.json ]; then sudo rm -vf ${LOGICAL_ENV}-${APP_NAME}-reconsiler1-deployvar.json; fi
#./buildenv.sh -e ${DEPLOY_ENV} -b ${LOGICAL_ENV}-${APP_NAME}-reconsiler2-deployvar
#source buildenvvar
#./master_deploy.sh -d ECS -e ${DEPLOY_ENV} -t latest -s ${GLOBAL_ENV}-global-appvar,${LOGICAL_ENV}-${APP_NAME}-appvar -i postgres-ifx-processer
echo "Running Masterscript - deploy postgres-ifx-processer reconsiler2"
if [ -e ${LOGICAL_ENV}-${APP_NAME}-reconsiler2-deployvar.json ]; then sudo rm -vf ${LOGICAL_ENV}-${APP_NAME}-reconsiler1-deployvar.json; fi
./buildenv.sh -e ${DEPLOY_ENV} -b ${LOGICAL_ENV}-${APP_NAME}-reconsiler2-deployvar
source buildenvvar
./master_deploy.sh -d ECS -e ${DEPLOY_ENV} -t latest -s ${GLOBAL_ENV}-global-appvar,${LOGICAL_ENV}-${APP_NAME}-appvar -i postgres-ifx-processer

jobs:
# Build & Deploy against development backend #
10 changes: 5 additions & 5 deletions config/default.js
@@ -23,8 +23,8 @@ module.exports = {
password: process.env.PG_PASSWORD || 'password',
port: parseInt(process.env.PG_PORT, 10) || 5432,

-triggerFunctions: process.env.TRIGGER_FUNCTIONS || 'prod_db_notifications', // List of trigger functions to listen to
-triggerTopics: process.env.TRIGGER_TOPICS || ['prod.db.postgres.sync'], // Names of the topic in the trigger payload
+triggerFunctions: process.env.TRIGGER_FUNCTIONS || 'dev_db_notifications', // List of trigger functions to listen to
+triggerTopics: process.env.TRIGGER_TOPICS || ['dev.db.postgres.sync'], // Names of the topic in the trigger payload
triggerOriginators: process.env.TRIGGER_ORIGINATORS || ['tc-postgres-delta-processor'] // Names of the originator in the trigger payload
},
KAFKA: { // Kafka connection options
@@ -39,7 +39,7 @@ module.exports = {
errorTopic: process.env.ERROR_TOPIC || 'db.scorecardtable.error',
recipients: ['[email protected]'], // Kafka partitions to use,
KAFKA_URL: process.env.KAFKA_URL,
-KAFKA_GROUP_ID: process.env.KAFKA_GROUP_ID || 'prod-postgres-ifx-consumer',
+KAFKA_GROUP_ID: process.env.KAFKA_GROUP_ID || 'dev-postgres-ifx-consumer',
KAFKA_CLIENT_CERT: process.env.KAFKA_CLIENT_CERT ? process.env.KAFKA_CLIENT_CERT.replace('\\n', '\n') : null,
KAFKA_CLIENT_CERT_KEY: process.env.KAFKA_CLIENT_CERT_KEY ? process.env.KAFKA_CLIENT_CERT_KEY.replace('\\n', '\n') : null,
},
@@ -49,13 +49,13 @@ module.exports = {
SLACKNOTIFY: process.env.SLACKNOTIFY || 'false'
},
RECONSILER: {
-RECONSILER_START: process.env.RECONSILER_START || 5,
+RECONSILER_START: process.env.RECONSILER_START || 30,
RECONSILER_END: process.env.RECONSILER_END || 1,
RECONSILER_DURATION_TYPE: process.env.RECONSILER_DURATION_TYPE || 'm'
},
DYNAMODB:
{
-DYNAMODB_TABLE: process.env.DYNAMODB_TABLE || 'prod_pg_ifx_payload_sync',
+DYNAMODB_TABLE: process.env.DYNAMODB_TABLE || 'dev_pg_ifx_payload_sync',
DD_ElapsedTime: process.env.DD_ElapsedTime || 600000
},

2 changes: 1 addition & 1 deletion package.json
@@ -11,7 +11,7 @@
"consumer": "node ./src/consumer.js",
"producer_dd": "node ./src/producer.js failover",
"reconsiler1": "node ./src/reconsiler-audit.js",
"reconsiler2": "node ./src/reconsiler-dd.js",
"reconsiler2": "node ./src/reconsiler-dd-new.js",
"start": "npm run producer & npm run producer_dd & npm run consumer & npm run reconsiler1 & npm run reconsiler2 & npm run producer_channel_2"
},
"author": "Topcoder",
3 changes: 3 additions & 0 deletions src/producer.js
@@ -11,6 +11,7 @@ const postMessage = require('./services/posttoslack')
const pgConnectionString = `postgresql://${pgOptions.user}:${pgOptions.password}@${pgOptions.host}:${pgOptions.port}/${pgOptions.database}`
const pgClient = new pg.Client(pgConnectionString)
const auditTrail = require('./services/auditTrail');
+const paudit_dd = require('./services/producer_audit_dd')
const express = require('express')
const app = express()
const port = 3000
@@ -41,6 +41,8 @@ async function setupPgClient() {
} else {
logger.info('Push to dynamodb for reconciliation')
await pushToDynamoDb(payload)
+logger.info('Push to producer_audit_dd for reconciliation')
+await paudit_dd.pushToAuditDD(message)
}

} else {
155 changes: 155 additions & 0 deletions src/reconsiler-dd-new.js
@@ -0,0 +1,155 @@
const logger = require('./common/logger')
const config = require('config')
const pg = require('pg')
const pgOptions = config.get('POSTGRES')
const pgConnectionString = `postgresql://${pgOptions.user}:${pgOptions.password}@${pgOptions.host}:${pgOptions.port}/${pgOptions.database}`
const pgClient = new pg.Client(pgConnectionString)
const kafkaService = require('./services/pushToDirectKafka')
const postMessage = require('./services/posttoslack')
const pushToDynamoDb = require('./services/pushToDynamoDb')
const auditTrail = require('./services/auditTrail');

const AWS = require("aws-sdk");

const documentClient = new AWS.DynamoDB.DocumentClient({
region: 'us-east-1',
convertEmptyValues: true
});

async function setupPgClient() {
try {
const pgClient = new pg.Client(pgConnectionString)
await pgClient.connect()
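// Reconciliation window: from RECONSILER_START units back to RECONSILER_END units back
// (unit set by RECONSILER_DURATION_TYPE, e.g. 'm' for minutes)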
const rec_d_start = config.RECONSILER.RECONSILER_START
const rec_d_end = config.RECONSILER.RECONSILER_END
const rec_d_type = config.RECONSILER.RECONSILER_DURATION_TYPE
const paramvalues = [rec_d_start, rec_d_end]
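// Payload ids written to producer_audit_dd that never reached pgifx_sync_audit inside the window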
sql1 = "select a.payloadseqid from common_oltp.producer_audit_dd a where not exists (select from common_oltp.pgifx_sync_audit "
sql2 = " where pgifx_sync_audit.payloadseqid = a.payloadseqid) and a.auditdatetime "
sql3 = " between (timezone('utc',now())) - interval '1"+ rec_d_type + "' * ($1)"
sql4 = " and (timezone('utc',now())) - interval '1"+ rec_d_type + "' * ($2)"

sql = sql1 + sql2 + sql3 + sql4
logger.info(`${sql}`)
pgClient.query(sql, paramvalues, async (err, result) => {
if (err) {
var errmsg0 = `error-sync: Audit Reconsiler2 query "${err.message}"`
logger.debug(errmsg0)
await callposttoslack(errmsg0)
}
else{
console.log("Reconsiler_dd_2 : Rowcount = ", result.rows.length)
if (result.rows.length > 0)
{
Promise.all(result.rows.map(async (row) => {
console.log(row.payloadseqid)
const x = await calldynamodb(row.payloadseqid)
console.log("val of x ",x)
}
)).then(() => {
console.log("terminating after posting to kafka")
pgClient.end()
terminate()
})
}
else {
console.log("terminate due to 0 rows")
pgClient.end()
terminate()
}
}
})
}
catch(err)
{
console.log(err)
process.exit(1)
}
}


async function calldynamodb(payloadseqid)
{
console.log("At dynamoc function")
var params = {
TableName : config.DYNAMODB.DYNAMODB_TABLE,
KeyConditionExpression: 'payloadseqid = :hkey',
ExpressionAttributeValues: {
':hkey': payloadseqid
}}

return new Promise(function (resolve, reject) {
documentClient.query(params, async function (err, data) {
if (err) console.log(err);
else {
var s_payload = (data.Items[0].pl_document)
// console.log("s_payload",s_payload)
logger.info(`Reconsiler2 Before Posting Payload : ${JSON.stringify(s_payload)}`)
await kafkaService.pushToKafka(s_payload)
await audit(s_payload, 1)
logger.info(`Reconsiler2 Payload posted`)
};
resolve("done");
});
})
}

async function audit(message, reconsileflag) {
var pl_producererr = "Reconsiler2"
const pl_processid = 5555
const payload1 = message.payload
const pl_seqid = payload1.payloadseqid
const pl_topic = payload1.topic // TODO can move in config ?
const pl_table = payload1.table
const pl_uniquecolumn = payload1.Uniquecolumn
const pl_operation = payload1.operation
const pl_timestamp = payload1.timestamp
//const pl_payload = JSON.stringify(message)
const pl_payload = message
const logMessage = `${pl_seqid} ${pl_processid} ${pl_table} ${pl_uniquecolumn} ${pl_operation}`
logger.debug(`${pl_producererr} : ${logMessage}`);
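// Record the replay in the audit trail as a producer-side "push-to-kafka" event tagged Reconsiler2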
await auditTrail([pl_seqid, pl_processid, pl_table, pl_uniquecolumn, pl_operation, "push-to-kafka", "", "", "Reconsiler2", pl_payload, new Date(), ""], 'producer')
return
}

async function callposttoslack(slackmessage) {
if (config.SLACK.SLACKNOTIFY === 'true') {
return new Promise(function (resolve, reject) {
postMessage(slackmessage, (response) => {
console.log(`response : ${response}`)
if (response.statusCode < 400) {
logger.debug('Message posted successfully');
//callback(null);
} else if (response.statusCode < 500) {
const errmsg1 = `Slack Error: Reconsiler2: posting message to Slack API: ${response.statusCode} - ${response.statusMessage}`
logger.debug(`error-sync: ${errmsg1}`)
} else {
logger.debug(`Reconsiler2: Server error when processing message: ${response.statusCode} - ${response.statusMessage}`);
//callback(`Server error when processing message: ${response.statusCode} - ${response.statusMessage}`);
}
resolve("done")
});
}) //end
}
return
}

//=================BEGIN HERE =======================
const terminate = () => process.exit()

async function run() {
logger.debug("Initialising Reconsiler2 setup...")
kafkaService.init().catch((e) => {
logger.error(`Kafka producer initialization error: "${e}"`)
terminate()
})
setupPgClient()
}

run()
82 changes: 82 additions & 0 deletions src/services/producer_audit_dd.js
@@ -0,0 +1,82 @@

const config = require('config')
const pg = require('pg')
const logger = require('../common/logger')
const postMessage = require('./posttoslack')
const pgOptions = config.get('POSTGRES')
const pgpool = require('./db.js');

const terminate = () => process.exit()

async function pushToAuditDD(message)
{
try
{
const pl_channel = message.channel
const pl_processid = message.processId
const payload = JSON.parse(message.payload)
const pl_seqid = payload.payload.payloadseqid
const pl_topic = payload.topic
const pl_table = payload.payload.table
const pl_uniquecolumn = payload.payload.Uniquecolumn
const pl_operation = payload.payload.operation
const pl_timestamp = payload.timestamp
const pl_payload = JSON.stringify(payload.payload)
const logMessage = `${pl_channel} ${pl_processid} ${pl_seqid} ${pl_processid} ${pl_table} ${pl_uniquecolumn} ${pl_operation} ${payload.timestamp}`

let paramval = [pl_seqid,pl_timestamp,pl_channel,pl_table,pl_operation,JSON.stringify(message),pl_topic,pl_uniquecolumn,pl_processid]
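// ON CONFLICT (payloadseqid) DO NOTHING keeps the audit insert idempotent when the same payload arrives twice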
sql0 = "INSERT INTO common_oltp.producer_audit_dd(payloadseqid,auditdatetime,channelname,tablename,dboperation,payload,topicname,uniquecolumn,processid) "
sql1= " VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9) on conflict (payloadseqid) DO nothing ";
sql = sql0 + sql1

pgpool.on('error', (err, client) => {
logger.debug(`producer_audit_dd: Unexpected error on idle client : ${err}`)
})

pgpool.connect((err, client, release) => {
if (err) {
return logger.debug(`producer_audit_dd : Error acquiring client : ${err.stack}`)
}
client.query(sql, paramval, (err, res) => {
release()
if (err) {
return logger.debug(`producer_audit_dd :Error executing Query : ${err.stack}`)
}
logger.debug(`producer_audit_dd : Audit Trail update : ${res.rowCount}`)
})
})
}
catch(err)
{
logger.debug(`pushToAuditDD : ${err}`)
await callposttoslack(`pushToAuditDD : ${err}`)
process.exit(1)
}

}


async function callposttoslack(slackmessage) {
if (config.SLACK.SLACKNOTIFY === 'true') {
return new Promise(function (resolve, reject) {
postMessage(slackmessage, (response) => {
console.log(`response : ${response}`)
if (response.statusCode < 400) {
logger.debug('Message posted successfully');
//callback(null);
} else if (response.statusCode < 500) {
const errmsg1 = `Slack Error: posting message to Slack API: ${response.statusCode} - ${response.statusMessage}`
logger.debug(`error-sync: ${errmsg1}`)
}
else {
logger.debug(`Server error when processing message: ${response.statusCode} - ${response.statusMessage}`);
//callback(`Server error when processing message: ${response.statusCode} - ${response.statusMessage}`);
}
resolve("done")
});
}) //end
}
}


module.exports = { pushToAuditDD }