
Commit bb0b5a7

Merge pull request #26 from topcoder-platform/dev
merge for new reconsiler2 to master [skip ci]
2 parents: 58cd5a0 + 1494463

6 files changed: +254, -14 lines

.circleci/config.yml

Lines changed: 8 additions & 8 deletions
@@ -57,9 +57,9 @@ build_steps: &build_steps
       #source buildenvvar
       #./master_deploy.sh -d ECS -e ${DEPLOY_ENV} -t latest -s ${GLOBAL_ENV}-global-appvar,${LOGICAL_ENV}-${APP_NAME}-appvar -i postgres-ifx-processer

-      ./buildenv.sh -e ${DEPLOY_ENV} -b ${LOGICAL_ENV}-${APP_NAME}-consumer-deployvar
-      source buildenvvar
-      ./master_deploy.sh -d ECS -e ${DEPLOY_ENV} -t latest -s ${GLOBAL_ENV}-global-appvar,${LOGICAL_ENV}-${APP_NAME}-appvar -i postgres-ifx-processer
+      #./buildenv.sh -e ${DEPLOY_ENV} -b ${LOGICAL_ENV}-${APP_NAME}-consumer-deployvar
+      #source buildenvvar
+      #./master_deploy.sh -d ECS -e ${DEPLOY_ENV} -t latest -s ${GLOBAL_ENV}-global-appvar,${LOGICAL_ENV}-${APP_NAME}-appvar -i postgres-ifx-processer


       #echo "Running Masterscript - deploy postgres-ifx-processer producer"
@@ -88,11 +88,11 @@ build_steps: &build_steps
       #source buildenvvar
       #./master_deploy.sh -d ECS -e ${DEPLOY_ENV} -t latest -s ${GLOBAL_ENV}-global-appvar,${LOGICAL_ENV}-${APP_NAME}-appvar -i postgres-ifx-processer

-      #echo "Running Masterscript - deploy postgres-ifx-processer reconsiler2"
-      #if [ -e ${LOGICAL_ENV}-${APP_NAME}-reconsiler2-deployvar.json ]; then sudo rm -vf ${LOGICAL_ENV}-${APP_NAME}-reconsiler1-deployvar.json; fi
-      #./buildenv.sh -e ${DEPLOY_ENV} -b ${LOGICAL_ENV}-${APP_NAME}-reconsiler2-deployvar
-      #source buildenvvar
-      #./master_deploy.sh -d ECS -e ${DEPLOY_ENV} -t latest -s ${GLOBAL_ENV}-global-appvar,${LOGICAL_ENV}-${APP_NAME}-appvar -i postgres-ifx-processer
+      echo "Running Masterscript - deploy postgres-ifx-processer reconsiler2"
+      if [ -e ${LOGICAL_ENV}-${APP_NAME}-reconsiler2-deployvar.json ]; then sudo rm -vf ${LOGICAL_ENV}-${APP_NAME}-reconsiler1-deployvar.json; fi
+      ./buildenv.sh -e ${DEPLOY_ENV} -b ${LOGICAL_ENV}-${APP_NAME}-reconsiler2-deployvar
+      source buildenvvar
+      ./master_deploy.sh -d ECS -e ${DEPLOY_ENV} -t latest -s ${GLOBAL_ENV}-global-appvar,${LOGICAL_ENV}-${APP_NAME}-appvar -i postgres-ifx-processer

 jobs:
   # Build & Deploy against development backend #
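Net effect of this hunk: the consumer deploy block stays commented out, while the previously commented reconsiler2 block (buildenv.sh, source buildenvvar, master_deploy.sh) now runs as part of the build steps, so the new reconsiler2 process is deployed to ECS alongside the other postgres-ifx-processer services.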

config/default.js

Lines changed: 5 additions & 5 deletions
@@ -23,8 +23,8 @@ module.exports = {
     password: process.env.PG_PASSWORD || 'password',
     port: parseInt(process.env.PG_PORT, 10) || 5432,

-    triggerFunctions: process.env.TRIGGER_FUNCTIONS || 'prod_db_notifications', // List of trigger functions to listen to
-    triggerTopics: process.env.TRIGGER_TOPICS || ['prod.db.postgres.sync'], // Names of the topic in the trigger payload
+    triggerFunctions: process.env.TRIGGER_FUNCTIONS || 'dev_db_notifications', // List of trigger functions to listen to
+    triggerTopics: process.env.TRIGGER_TOPICS || ['dev.db.postgres.sync'], // Names of the topic in the trigger payload
     triggerOriginators: process.env.TRIGGER_ORIGINATORS || ['tc-postgres-delta-processor'] // Names of the originator in the trigger payload
   },
   KAFKA: { // Kafka connection options
@@ -39,7 +39,7 @@ module.exports = {
     errorTopic: process.env.ERROR_TOPIC || 'db.scorecardtable.error',
     recipients: ['[email protected]'], // Kafka partitions to use,
     KAFKA_URL: process.env.KAFKA_URL,
-    KAFKA_GROUP_ID: process.env.KAFKA_GROUP_ID || 'prod-postgres-ifx-consumer',
+    KAFKA_GROUP_ID: process.env.KAFKA_GROUP_ID || 'dev-postgres-ifx-consumer',
     KAFKA_CLIENT_CERT: process.env.KAFKA_CLIENT_CERT ? process.env.KAFKA_CLIENT_CERT.replace('\\n', '\n') : null,
     KAFKA_CLIENT_CERT_KEY: process.env.KAFKA_CLIENT_CERT_KEY ? process.env.KAFKA_CLIENT_CERT_KEY.replace('\\n', '\n') : null,
   },
@@ -49,13 +49,13 @@ module.exports = {
     SLACKNOTIFY: process.env.SLACKNOTIFY || 'false'
   },
   RECONSILER: {
-    RECONSILER_START: process.env.RECONSILER_START || 5,
+    RECONSILER_START: process.env.RECONSILER_START || 30,
     RECONSILER_END: process.env.RECONSILER_END || 1,
     RECONSILER_DURATION_TYPE: process.env.RECONSILER_DURATION_TYPE || 'm'
   },
   DYNAMODB:
   {
-    DYNAMODB_TABLE: process.env.DYNAMODB_TABLE || 'prod_pg_ifx_payload_sync',
+    DYNAMODB_TABLE: process.env.DYNAMODB_TABLE || 'dev_pg_ifx_payload_sync',
     DD_ElapsedTime: process.env.DD_ElapsedTime || 600000
   },
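The defaults here switch from prod to dev values, and the RECONSILER block now describes a 30-minute look-back window (down to 1 minute ago) instead of the previous 5-minute one. A minimal sketch of how that window resolves through node-config, assuming nothing overrides the defaults above (the env variable names are the ones already used in this file):

// sketch: how the reconciliation window above resolves (values can be overridden via env vars)
const config = require('config')

const start = config.RECONSILER.RECONSILER_START         // 30 unless RECONSILER_START is set
const end = config.RECONSILER.RECONSILER_END              // 1 unless RECONSILER_END is set
const unit = config.RECONSILER.RECONSILER_DURATION_TYPE   // 'm', i.e. minutes, used as the SQL interval unit

// src/reconsiler-dd-new.js (added below) turns these into a predicate of the form
//   a.auditdatetime between now() - interval '1m' * 30 and now() - interval '1m' * 1
console.log(`reconciliation window: from ${start}${unit} ago to ${end}${unit} ago`)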

package.json

Lines changed: 1 addition & 1 deletion
@@ -11,7 +11,7 @@
     "consumer": "node ./src/consumer.js",
     "producer_dd": "node ./src/producer.js failover",
     "reconsiler1": "node ./src/reconsiler-audit.js",
-    "reconsiler2": "node ./src/reconsiler-dd.js",
+    "reconsiler2": "node ./src/reconsiler-dd-new.js",
     "start": "npm run producer & npm run producer_dd & npm run consumer & npm run reconsiler1 & npm run reconsiler2 & npm run producer_channel_2"
   },
   "author": "Topcoder",

src/producer.js

Lines changed: 3 additions & 0 deletions
@@ -11,6 +11,7 @@ const postMessage = require('./services/posttoslack')
 const pgConnectionString = `postgresql://${pgOptions.user}:${pgOptions.password}@${pgOptions.host}:${pgOptions.port}/${pgOptions.database}`
 const pgClient = new pg.Client(pgConnectionString)
 const auditTrail = require('./services/auditTrail');
+const paudit_dd = require('./services/producer_audit_dd')
 const express = require('express')
 const app = express()
 const port = 3000
@@ -41,6 +41,8 @@ async function setupPgClient() {
       } else {
         logger.info('Push to dynamodb for reconciliation')
         await pushToDynamoDb(payload)
+        logger.info('Push to producer_audit_dd for reconciliation')
+        await paudit_dd.pushToAuditDD(message)
       }

     } else {
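The producer now mirrors every failover payload into the new producer_audit_dd audit table in addition to DynamoDB. For context, the message object handed to paudit_dd.pushToAuditDD is the pg notification the producer already receives; the rough shape the new service expects (field names taken from src/services/producer_audit_dd.js below, values purely illustrative) is:

// illustrative only: approximate shape of the pg NOTIFY message consumed by pushToAuditDD
const exampleMessage = {
  channel: 'dev_db_notifications',          // notification channel (example value)
  processId: 12345,                         // pg backend process id (example value)
  payload: JSON.stringify({                 // stringified trigger payload
    topic: 'dev.db.postgres.sync',
    timestamp: '2019-01-01T00:00:00.000Z',
    payload: {
      payloadseqid: 'example-seq-id',
      table: 'example_table',
      Uniquecolumn: 'id',
      operation: 'update'
    }
  })
}
// paudit_dd.pushToAuditDD(exampleMessage) inserts one row into common_oltp.producer_audit_dd
// (or skips it on a duplicate payloadseqid, per the ON CONFLICT clause below)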

src/reconsiler-dd-new.js

Lines changed: 155 additions & 0 deletions
@@ -0,0 +1,155 @@
+const logger = require('./common/logger')
+const config = require('config')
+const pg = require('pg')
+const pgOptions = config.get('POSTGRES')
+const pgConnectionString = `postgresql://${pgOptions.user}:${pgOptions.password}@${pgOptions.host}:${pgOptions.port}/${pgOptions.database}`
+const pgClient = new pg.Client(pgConnectionString)
+const kafkaService = require('./services/pushToDirectKafka')
+const postMessage = require('./services/posttoslack')
+const pushToDynamoDb = require('./services/pushToDynamoDb')
+const auditTrail = require('./services/auditTrail');
+
+const AWS = require("aws-sdk");
+
+const documentClient = new AWS.DynamoDB.DocumentClient({
+  region: 'us-east-1',
+  convertEmptyValues: true
+});
+
+async function setupPgClient() {
+  var payloadcopy
+  try {
+    const pgClient = new pg.Client(pgConnectionString)
+    if (!pgClient.connect()) {
+      await pgClient.connect()
+    }
+    rec_d_start = config.RECONSILER.RECONSILER_START
+    rec_d_end = config.RECONSILER.RECONSILER_END
+    rec_d_type = config.RECONSILER.RECONSILER_DURATION_TYPE
+    var paramvalues = [rec_d_start,rec_d_end];
+    sql1 = "select a.payloadseqid from common_oltp.producer_audit_dd a where not exists (select from common_oltp.pgifx_sync_audit "
+    sql2 = " where pgifx_sync_audit.payloadseqid = a.payloadseqid) and a.auditdatetime "
+    sql3 = " between (timezone('utc',now())) - interval '1"+ rec_d_type + "' * ($1)"
+    sql4 = " and (timezone('utc',now())) - interval '1"+ rec_d_type + "' * ($2)"
+
+    sql = sql1 + sql2 + sql3 + sql4
+    logger.info(`${sql}`)
+    const res = await pgClient.query(sql,paramvalues, async (err,result) => {
+      if (err) {
+        var errmsg0 = `error-sync: Audit Reconsiler1 query "${err.message}"`
+        logger.debug (errmsg0)
+        await callposttoslack(errmsg0)
+      }
+      else{
+        console.log("Reconsiler_dd_2 : Rowcount = ", result.rows.length)
+        if (result.rows.length > 0)
+        {
+          Promise.all(result.rows.map(async (row) => {
+            console.log(row.payloadseqid)
+            const x = await calldynamodb(row.payloadseqid)
+            console.log("val of x ",x)
+          }
+          //))
+          )).then(result => { if (typeof result.rows == "undefined")
+            {
+              console.log("terminating after posting to kafka")
+              pgClient.end()
+              terminate()
+            }
+          })
+        }
+        else {
+          console.log("terminate due to 0 rows")
+          pgClient.end()
+          terminate()
+        }
+      }
+    })
+  }
+  catch(err)
+  {
+    console.log(err)
+    process.exit(1)
+  }
+}
+
+
+async function calldynamodb(payloadseqid)
+{
+  console.log("At dynamoc function")
+  var params = {
+    TableName : config.DYNAMODB.DYNAMODB_TABLE,
+    KeyConditionExpression: 'payloadseqid = :hkey',
+    ExpressionAttributeValues: {
+      ':hkey': payloadseqid
+    }}
+
+  return new Promise(async function (resolve, reject) {
+    await documentClient.query(params, async function(err, data) {
+      if (err) console.log(err);
+      else {
+        var s_payload = (data.Items[0].pl_document)
+        // console.log("s_payload",s_payload)
+        logger.info(`Reconsiler2 Before Posting Payload : ${JSON.stringify(s_payload)}`)
+        await kafkaService.pushToKafka(s_payload)
+        await audit(s_payload, 1)
+        logger.info(`Reconsiler2 Payload posted`)
+      };
+      resolve("done");
+    });
+  })
+}
+
+async function audit(message, reconsileflag) {
+  var pl_producererr = "Reconsiler2"
+  const pl_processid = 5555
+  payload1 = (message.payload)
+  const pl_seqid = payload1.payloadseqid
+  const pl_topic = payload1.topic // TODO can move in config ?
+  const pl_table = payload1.table
+  const pl_uniquecolumn = payload1.Uniquecolumn
+  const pl_operation = payload1.operation
+  const pl_timestamp = payload1.timestamp
+  //const pl_payload = JSON.stringify(message)
+  const pl_payload = message
+  const logMessage = `${pl_seqid} ${pl_processid} ${pl_table} ${pl_uniquecolumn} ${pl_operation}`
+  logger.debug(`${pl_producererr} : ${logMessage}`);
+  await auditTrail([pl_seqid, pl_processid, pl_table, pl_uniquecolumn, pl_operation, "push-to-kafka", "", "", "Reconsiler2", pl_payload, new Date(), ""], 'producer')
+  return
+}
+
+async function callposttoslack(slackmessage) {
+  if (config.SLACK.SLACKNOTIFY === 'true') {
+    return new Promise(function (resolve, reject) {
+      postMessage(slackmessage, (response) => {
+        console.log(`respnse : ${response}`)
+        if (response.statusCode < 400) {
+          logger.debug('Message posted successfully');
+          //callback(null);
+        } else if (response.statusCode < 500) {
+          const errmsg1 = `Slack Error: Reconsiler1: posting message to Slack API: ${response.statusCode} - ${response.statusMessage}`
+          logger.debug(`error-sync: ${errmsg1}`)
+        } else {
+          logger.debug(`Reconsiler1: Server error when processing message: ${response.statusCode} - ${response.statusMessage}`);
+          //callback(`Server error when processing message: ${response.statusCode} - ${response.statusMessage}`);
+        }
+        resolve("done")
+      });
+    }) //end
+  }
+  return
+}
+
+//=================BEGIN HERE =======================
+const terminate = () => process.exit()
+
+async function run() {
+  logger.debug("Initialising Reconsiler2 setup...")
+  kafkaService.init().catch((e) => {
+    logger.error(`Kafka producer intialization error: "${e}"`)
+    terminate()
+  })
+  setupPgClient()
+}
+
+run()
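Flow of the new reconsiler: setupPgClient selects payloadseqids that exist in common_oltp.producer_audit_dd but are missing from common_oltp.pgifx_sync_audit within the configured window, then calldynamodb re-reads each payload from DynamoDB (the pl_document attribute) and replays it to Kafka. calldynamodb wraps the callback API in a Promise by hand; for reference, the same lookup can be written with the SDK's promise helper. A minimal sketch, assuming the same table and attribute layout; fetchPayload is a hypothetical helper name, not part of this commit:

// sketch only: equivalent DynamoDB lookup using aws-sdk's promise API
const AWS = require('aws-sdk')
const config = require('config')

const documentClient = new AWS.DynamoDB.DocumentClient({ region: 'us-east-1', convertEmptyValues: true })

async function fetchPayload (payloadseqid) {
  const data = await documentClient.query({
    TableName: config.DYNAMODB.DYNAMODB_TABLE,
    KeyConditionExpression: 'payloadseqid = :hkey',
    ExpressionAttributeValues: { ':hkey': payloadseqid }
  }).promise()
  // pl_document holds the original payload the producer pushed to DynamoDB
  return data.Items.length ? data.Items[0].pl_document : null
}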

src/services/producer_audit_dd.js

Lines changed: 82 additions & 0 deletions
@@ -0,0 +1,82 @@
+
+const config = require('config')
+const pg = require('pg')
+const logger = require('../common/logger')
+const postMessage = require('./posttoslack')
+const pgOptions = config.get('POSTGRES')
+const pgpool = require('./db.js');
+
+const terminate = () => process.exit()
+
+async function pushToAuditDD(message)
+{
+  try
+  {
+    const pl_channel = message.channel
+    const pl_processid = message.processId
+    const payload = JSON.parse(message.payload)
+    const pl_seqid = payload.payload.payloadseqid
+    const pl_topic = payload.topic
+    const pl_table = payload.payload.table
+    const pl_uniquecolumn = payload.payload.Uniquecolumn
+    const pl_operation = payload.payload.operation
+    const pl_timestamp = payload.timestamp
+    const pl_payload = JSON.stringify(payload.payload)
+    const logMessage = `${pl_channel} ${pl_processid} ${pl_seqid} ${pl_processid} ${pl_table} ${pl_uniquecolumn} ${pl_operation} ${payload.timestamp}`
+
+    let paramval = [pl_seqid,pl_timestamp,pl_channel,pl_table,pl_operation,JSON.stringify(message),pl_topic,pl_uniquecolumn,pl_processid]
+    sql0 = "INSERT INTO common_oltp.producer_audit_dd(payloadseqid,auditdatetime,channelname,tablename,dboperation,payload,topicname,uniquecolumn,processid) "
+    sql1= " VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9) on conflict (payloadseqid) DO nothing ";
+    sql = sql0 + sql1
+
+    pgpool.on('error', (err, client) => {
+      logger.debug(`producer_audit_dd: Unexpected error on idle client : ${err}`)
+    })
+
+    pgpool.connect((err, client, release) => {
+      if (err) {
+        return logger.debug(`producer_audit_dd : Error acquiring client : ${err.stack}`)
+      }
+      client.query(sql, paramval, (err, res) => {
+        release()
+        if (err) {
+          return logger.debug(`producer_audit_dd :Error executing Query : ${err.stack}`)
+        }
+        logger.debug(`producer_audit_dd : Audit Trail update : ${res.rowCount}`)
+      })
+    })
+  }
+  catch(err)
+  {
+    logger.debug(`pushToAuditDD : ${err}`)
+    callposttoslack(`pushToAuditDD : ${err}`)
+    process.exit(1)
+  }
+
+}
+
+
+async function callposttoslack(slackmessage) {
+  if (config.SLACK.SLACKNOTIFY === 'true') {
+    return new Promise(function (resolve, reject) {
+      postMessage(slackmessage, (response) => {
+        console.log(`respnse : ${response}`)
+        if (response.statusCode < 400) {
+          logger.debug('Message posted successfully');
+          //callback(null);
+        } else if (response.statusCode < 500) {
+          const errmsg1 = `Slack Error: posting message to Slack API: ${response.statusCode} - ${response.statusMessage}`
+          logger.debug(`error-sync: ${errmsg1}`)
+        }
+        else {
+          logger.debug(`Server error when processing message: ${response.statusCode} - ${response.statusMessage}`);
+          //callback(`Server error when processing message: ${response.statusCode} - ${response.statusMessage}`);
+        }
+        resolve("done")
+      });
+    }) //end
+  }
+}
+
+
+module.exports = { pushToAuditDD }
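Note that the "on conflict (payloadseqid) DO nothing" clause assumes common_oltp.producer_audit_dd has a unique or primary-key constraint on payloadseqid; without such a constraint Postgres rejects the insert at runtime.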
