Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 9be18c4

Browse files
author
nkumar
committedFeb 17, 2020
reconsiler checkin
1 parent a3bf50d commit 9be18c4

File tree

3 files changed

+176
-58
lines changed

3 files changed

+176
-58
lines changed
 

‎config/default.js

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,13 +44,14 @@ module.exports = {
4444
SLACKNOTIFY: process.env.SLACKNOTIFY || 'false'
4545
},
4646
RECONSILER:{
47-
RECONSILER_START: process.env.RECONSILER_START || 10,
48-
RECONSILER_END: process.env.RECONSILER_END || 5,
47+
RECONSILER_START: process.env.RECONSILER_START || 5,
48+
RECONSILER_END: process.env.RECONSILER_END || 1,
4949
RECONSILER_DURATION_TYPE: process.env.RECONSILER_DURATION_TYPE || 'm'
5050
},
5151
DYNAMODB:
5252
{
53-
DYNAMODB_TABLE: process.env.DYNAMODB_TABLE || 'test_pg_ifx_payload_sync'
53+
DYNAMODB_TABLE: process.env.DYNAMODB_TABLE || 'test_pg_ifx_payload_sync',
54+
DD_ElapsedTime: process.env.DD_ElapsedTime || 600000
5455
},
5556

5657
AUTH0_URL: process.env.AUTH0_URL ,

‎src/reconsiler-audit.js

Lines changed: 169 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -1,45 +1,40 @@
11
const config = require('config')
22
const pg = require('pg')
3+
var AWS = require("aws-sdk");
34
const logger = require('./common/logger')
45
const pushToKafka = require('./services/pushToKafka')
56
const pgOptions = config.get('POSTGRES')
67
const postMessage = require('./services/posttoslack')
78
const pgConnectionString = `postgresql://${pgOptions.user}:${pgOptions.password}@${pgOptions.host}:${pgOptions.port}/${pgOptions.database}`
8-
const pgClient = new pg.Client(pgConnectionString)
9+
//const pgClient = new pg.Client(pgConnectionString)
910
const auditTrail = require('./services/auditTrail');
1011
const port = 3000
11-
12+
//----------------------------Calling reconsiler 1 Audit log script ----------
1213
async function setupPgClient() {
1314
var payloadcopy
1415
try {
15-
await pgClient.connect()
16-
//sql1= "select * from pgifx_sync_audit where syncstatus in ('Informix-updated')
17-
//and auditdatetime >= (now()::date - interval '10m') and auditdatetime <= (now()::date - interval '5m') ";
18-
//>= 12.53 and <= 12.48
16+
const pgClient = new pg.Client(pgConnectionString)
17+
if (!pgClient.connect()) {
18+
await pgClient.connect()
19+
}
20+
//rec_d_start = 10
21+
//rec_d_end = 1
1922
rec_d_start = config.RECONSILER.RECONSILER_START
2023
rec_d_end = config.RECONSILER.RECONSILER_END
2124
rec_d_type = config.RECONSILER.RECONSILER_DURATION_TYPE
22-
var paramvalues = ['push-to-kafka',rec_d_start,rec_d_end];
23-
//var paramvalues = ['push-to-kafka','60002027'];
24-
//sql1 = 'select * from common_oltp.pgifx_sync_audit where pgifx_sync_audit.syncstatus =($1)'
25+
var paramvalues = ['push-to-kafka',rec_d_start,rec_d_end];
2526
sql1 = "select pgifx_sync_audit.seq_id, payloadseqid,auditdatetime ,syncstatus, payload from pgifx_sync_audit where pgifx_sync_audit.syncstatus =($1)"
2627
sql2 = " and pgifx_sync_audit.auditdatetime between (timezone('utc',now())) - interval '1"+ rec_d_type + "' * ($2)"
2728
sql3 = " and (timezone('utc',now())) - interval '1"+ rec_d_type + "' * ($3)"
2829
sql = sql1 + sql2 + sql3
29-
console.log('sql ', sql)
30-
// sql3 = ' select * from common_oltp.pgifx_sync_audit where pgifx_sync_audit.syncstatus =($1)'
31-
// sql4 = " and pgifx_sync_audit.payloadseqid = ($2)"
32-
33-
//sql = sql3 + sql4
34-
//const result = await pgClient.query(sql1, (err, res) => {
3530
await pgClient.query(sql,paramvalues, async (err,result) => {
3631
if (err) {
37-
var errmsg0 = `error-sync: Audit reconsiler query "${err.message}"`
32+
var errmsg0 = `error-sync: Audit Reconsiler1 query "${err.message}"`
3833
logger.debug (errmsg0)
39-
// await callposttoslack(errmsg0)
34+
await callposttoslack(errmsg0)
4035
}
4136
else{
42-
console.log("Reconsiler Rowcount = ", result.rows.length)
37+
console.log("Reconsiler1 : Rowcount = ", result.rows.length)
4338
for (var i = 0; i < result.rows.length; i++) {
4439
for(var columnName in result.rows[i]) {
4540
// console.log('column "%s" has a value of "%j"', columnName, result.rows[i][columnName]);
@@ -48,39 +43,37 @@ async function setupPgClient() {
4843
var reconsiler_payload = result.rows[i][columnName]
4944
}
5045
}//column for loop
51-
try {
46+
try {
47+
//console.log("reconsiler_payload====",reconsiler_payload);
48+
if (reconsiler_payload != ""){
5249
var s_payload = reconsiler_payload
5350
payload = JSON.parse(s_payload)
5451
payload1 = payload.payload
5552
await pushToKafka(payload1)
56-
logger.info('Reconsiler : Push to kafka and added for audit trail')
57-
audit(s_payload)
58-
} catch (error) {
59-
logger.error('Reconsiler: Could not parse message payload')
60-
logger.debug(`error-sync: Reconsiler parse message : "${error.message}"`)
61-
const errmsg1 = `postgres-ifx-processor: Reconsiler : Error Parse or payload : "${error.message}" \n payload : "${payloadcopy.payload}"`
53+
logger.info('Reconsiler1 Push to kafka and added for audit trail')
54+
await audit(s_payload,0) //0 flag means reconsiler 1. 1 flag reconsiler 2 i,e dynamodb
55+
} }catch (error) {
56+
logger.error('Reconsiler1 : Could not parse message payload')
57+
logger.debug(`error-sync: Reconsiler1 parse message : "${error.message}"`)
58+
const errmsg1 = `error-sync: Reconsiler1 : Error Parse or payload : "${error.message}" `
6259
logger.logFullError(error)
63-
audit(error)
60+
// await audit(error,0)
6461
await callposttoslack(errmsg1)
65-
}
66-
}//result for loop
62+
terminate()
63+
}
64+
}//result for loop
6765
}
6866
pgClient.end()
69-
})
67+
terminate()
68+
})
7069
}catch (err) {
71-
const errmsg = `postgres-ifx-processor: Reconsiler : Error in setting up postgres client: "${err.message}"`
70+
const errmsg = `postgres-ifx-processor: Reconsiler1 : Error in setting up postgres client: "${err.message}"`
7271
logger.error(errmsg)
7372
logger.logFullError(err)
7473
await callposttoslack(errmsg)
7574
terminate()
7675
}
77-
}
78-
79-
const terminate = () => process.exit()
80-
81-
async function run() {
82-
logger.debug("Initialising Reconsiler setup...")
83-
await setupPgClient()
76+
return
8477
}
8578

8679
async function callposttoslack(slackmessage) {
@@ -92,35 +85,159 @@ async function callposttoslack(slackmessage) {
9285
logger.debug('Message posted successfully');
9386
//callback(null);
9487
} else if (response.statusCode < 500) {
95-
const errmsg1 = `Slack Error: posting message to Slack API: ${response.statusCode} - ${response.statusMessage}`
88+
const errmsg1 = `Slack Error: Reconsiler1: posting message to Slack API: ${response.statusCode} - ${response.statusMessage}`
9689
logger.debug(`error-sync: ${errmsg1}`)
9790
} else {
98-
logger.debug(`Server error when processing message: ${response.statusCode} - ${response.statusMessage}`);
91+
logger.debug(`Reconsiler1: Server error when processing message: ${response.statusCode} - ${response.statusMessage}`);
9992
//callback(`Server error when processing message: ${response.statusCode} - ${response.statusMessage}`);
10093
}
10194
resolve("done")
10295
});
10396
}) //end
10497
}
105-
98+
return
10699
}
107-
// execute
108-
run()
109100

110-
async function audit(message) {
111-
const pl_processid = 5555
112-
const jsonpayload = JSON.parse(message)
113-
payload = JSON.parse(jsonpayload.payload)
114-
payload1 = payload.payload
101+
async function audit(message,reconsileflag) {
102+
if (reconsileflag === 1)
103+
{
104+
const jsonpayload = (message)
105+
const payload = (jsonpayload.payload)
106+
var pl_producererr= "Reconsiler2"
107+
}else {
108+
const jsonpayload = JSON.parse(message)
109+
payload = JSON.parse(jsonpayload.payload)
110+
var pl_producererr= "Reconsiler1"
111+
}
112+
const pl_processid = 5555
113+
//const jsonpayload = JSON.parse(message)
114+
//payload = JSON.parse(jsonpayload.payload)
115+
payload1 = (payload.payload)
115116
const pl_seqid = payload1.payloadseqid
116-
const pl_topic = payload1.topic // TODO can move in config ?
117+
const pl_topic = payload1.topic // TODO can move in config ?
117118
const pl_table = payload1.table
118119
const pl_uniquecolumn = payload1.Uniquecolumn
119120
const pl_operation = payload1.operation
120121
const pl_timestamp = payload1.timestamp
121-
//const pl_payload = JSON.stringify(payload.payload)
122-
const logMessage = `${pl_seqid} ${pl_processid} ${pl_table} ${pl_uniquecolumn} ${pl_operation} ${payload.timestamp}`
123-
logger.debug(`reconsiler : ${logMessage}`);
124-
//await auditTrail([pl_seqid, pl_processid, pl_table, pl_uniquecolumn, pl_operation, "push-to-kafka-reconsiler", "", "reconsiler", "", "", new Date(), ""], 'producer')
122+
const pl_payload = JSON.stringify(message)
123+
const logMessage = `${pl_seqid} ${pl_processid} ${pl_table} ${pl_uniquecolumn} ${pl_operation} ${payload.timestamp}`
124+
logger.debug(`${pl_producererr} : ${logMessage}`);
125+
await auditTrail([pl_seqid, pl_processid, pl_table, pl_uniquecolumn, pl_operation, "push-to-kafka", "", "", pl_producererr, pl_payload, new Date(), ""], 'producer')
126+
return
127+
}
128+
129+
//===============RECONSILER2 DYNAMODB CODE STARTS HERE ==========================
130+
131+
async function callReconsiler2()
132+
{console.log("inside 2");
133+
docClient.scan(params, onScan);
134+
}
135+
136+
var docClient = new AWS.DynamoDB.DocumentClient({
137+
region: 'us-east-1',
138+
convertEmptyValues: true
139+
});
140+
//ElapsedTime = 094600000
141+
ElapsedTime = config.DYNAMODB.DD_ElapsedTime
142+
var params = {
143+
TableName: config.DYNAMODB.DYNAMODB_TABLE,
144+
FilterExpression: "#timestamp between :time_1 and :time_2",
145+
ExpressionAttributeNames: {
146+
"#timestamp": "timestamp",
147+
},
148+
ExpressionAttributeValues: {
149+
":time_1": Date.now() - ElapsedTime,
150+
":time_2": Date.now()
151+
}
152+
}
153+
154+
function onScan(err, data) {
155+
if (err) {
156+
logger.error("Unable to scan the table. Error JSON:", JSON.stringify(err, null, 2));
157+
terminate()
158+
} else {
159+
try
160+
{
161+
console.log("Scan succeeded.");
162+
let total_dd_records = 0;
163+
let total_pushtokafka = 0;
164+
data.Items.forEach(async function(item) {
165+
//console.log(item.payloadseqid);
166+
var retval = await verify_pg_record_exists(item.payloadseqid)
167+
//console.log("retval", retval);
168+
if (retval === false){
169+
var s_payload = (item.pl_document)
170+
payload = s_payload
171+
payload1 = (payload.payload)
172+
await pushToKafka(item.pl_document)
173+
await audit(s_payload,1) //0 flag means reconsiler 1. 1 flag reconsiler 2 i,e dynamodb
174+
logger.info(`Reconsiler2 : ${item.payloadseqid} posted to kafka: Total Kafka Count : ${total_pushtokafka}`)
175+
total_pushtokafka += 1
176+
}
177+
total_dd_records += 1
178+
});
179+
logger.info(`Reconsiler2 : count of total_dd_records ${total_dd_records}`);
180+
if (typeof data.LastEvaluatedKey != "undefined") {
181+
console.log("Scanning for more...");
182+
params.ExclusiveStartKey = data.LastEvaluatedKey;
183+
docClient.scan(params, onScan);
184+
}
185+
}
186+
catch (err) {
187+
const errmsg = `error-sync: Reconsiler2 : Error during dynamodb scan/kafka push: "${err.message}"`
188+
logger.error(errmsg)
189+
logger.logFullError(err)
190+
callposttoslack(errmsg)
191+
//terminate()
192+
}
193+
}
194+
//terminate()
125195
}
126196

197+
async function verify_pg_record_exists(seqid)
198+
{
199+
try {
200+
const pgClient = new pg.Client(pgConnectionString)
201+
if (!pgClient.connect()) {await pgClient.connect()}
202+
var paramvalues = [seqid]
203+
sql = 'select * from common_oltp.pgifx_sync_audit where pgifx_sync_audit.payloadseqid = ($1)'
204+
return new Promise(function (resolve, reject) {
205+
pgClient.query(sql, paramvalues, async (err, result) => {
206+
if (err) {
207+
var errmsg0 = `error-sync: Audit reconsiler2 query "${err.message}"`
208+
console.log(errmsg0)
209+
}
210+
else {
211+
if (result.rows.length > 0) {
212+
//console.log("row length > 0 ")
213+
resolve(true);
214+
}
215+
else {
216+
//console.log("0")
217+
resolve(false);
218+
}
219+
}
220+
pgClient.end()
221+
})
222+
})}
223+
catch (err) {
224+
const errmsg = `error-sync: Reconsiler2 : Error in setting up postgres client: "${err.message}"`
225+
logger.error(errmsg)
226+
logger.logFullError(err)
227+
await callposttoslack(errmsg)
228+
terminate()
229+
}
230+
}
231+
232+
//=================BEGIN HERE =======================
233+
const terminate = () => process.exit()
234+
235+
async function run() {
236+
logger.debug("Initialising Reconsiler1 setup...")
237+
await setupPgClient()
238+
//logger.debug("Initialising Reconsiler2 setup...")
239+
//callReconsiler2()
240+
// terminate()
241+
}
242+
//execute
243+
run()

‎src/services/auditTrail.js

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ if (!pgClient2) {
2525
}
2626
if (sourcetype === 'producer'){
2727
sql0 = 'INSERT INTO common_oltp.pgifx_sync_audit(payloadseqid,processId,tablename,uniquecolumn,dboperation,syncstatus,retrycount,consumer_err,producer_err,payload,auditdatetime,topicname) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12)'
28-
sql1= ' on conflict (payloadseqid) DO UPDATE SET (syncstatus) = ($6) where pgifx_sync_audit.payloadseqid = $1';
28+
sql1= ' on conflict (payloadseqid) DO UPDATE SET (syncstatus,producer_err) = ($6,$9) where pgifx_sync_audit.payloadseqid = $1';
2929
sql = sql0 + sql1
3030
logger.debug(`--Audit Trail update producer--`)
3131
} else {
@@ -34,12 +34,12 @@ if (sourcetype === 'producer'){
3434
// where pgifx_sync_audit.payloadseqid = $1';
3535
//and pgifx_sync_audit.processId = $2';
3636
sql = sql0 + sql1
37-
logger.debug(`--Audit Trail update consumer--`)
37+
logger.debug(`--${1} ${3} 1 Audit Trail update consumer--`)
3838
//logger.debug(`sql values "${sql}"`);
3939
}
4040
return pgClient2.query(sql, data, (err, res) => {
4141
if (err) {
42-
logger.debug(`--Audit Trail update error-- ${err.stack}`)
42+
logger.debug(`-- Audit Trail update error-- ${err.stack}`)
4343
//pgClient2.end()
4444
} else {
4545
// logger.debug(`--Audit Trail update success-- `)

0 commit comments

Comments
 (0)
Please sign in to comment.