Skip to content

Commit cdbdfbd

Browse files
author
nkumar
committed
reconsiler check-in
Committer: nkumar
1 parent 96b4bec commit cdbdfbd

File tree

5 files changed

+135
-3
lines changed

5 files changed

+135
-3
lines changed

config/default.js

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,11 @@ module.exports = {
4343
SLACKCHANNEL: process.env.SLACKCHANNEL || 'ifxpg-migrator',
4444
SLACKNOTIFY: process.env.SLACKNOTIFY || 'false'
4545
},
46+
RECONSILER:{
47+
RECONSILER_START: process.env.RECONSILER_START || 10,
48+
RECONSILER_END: process.env.RECONSILER_END || 5,
49+
RECONSILER_DURATION_TYPE: process.env.RECONSILER_DURATION_TYPE || 'm'
50+
},
4651

4752
AUTH0_URL: process.env.AUTH0_URL ,
4853
AUTH0_AUDIENCE: process.env.AUTH0_AUDIENCE ,

src/consumer.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,8 @@ let cs_payloadseqid
7373
//var retryvar
7474
if (message.payload['retryCount']) retryvar = message.payload.retryCount;
7575
await consumer.commitOffset({ topic, partition, offset: m.offset }) // Commit success as will re-publish
76-
await auditTrail([cs_payloadseqid,3333,'message.payload.table','message.payload.Uniquecolumn',
77-
'message.payload.operation',"Informix-Updated1",retryvar,"","",'message.payload.data',new Date(),'message.topic'],'consumer')
76+
// await auditTrail([cs_payloadseqid,3333,'message.payload.table','message.payload.Uniquecolumn',
77+
// 'message.payload.operation',"Informix-Updated1",retryvar,"","",'message.payload.data',new Date(),'message.topic'],'consumer')
7878
//await callposttoslack(`Retry for Kafka push : retrycount : "${retryvar}"`)
7979
logger.debug(`Trying to push same message after adding retryCounter`)
8080
if (!message.payload.retryCount) {

src/producer.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ async function audit(message) {
113113
} else {
114114
logger.debug(`Producer DynamoDb : ${logMessage}`);
115115
}
116-
auditTrail([pl_seqid, pl_processid, pl_table, pl_uniquecolumn, pl_operation, "push-to-kafka", "", "", "", pl_payload, pl_timestamp, pl_topic], 'producer')
116+
auditTrail([pl_seqid, pl_processid, pl_table, pl_uniquecolumn, pl_operation, "push-to-kafka", "", "", "", JSON.stringify(message), pl_timestamp, pl_topic], 'producer')
117117
} else {
118118
const pl_randonseq = 'err-' + (new Date()).getTime().toString(36) + Math.random().toString(36).slice(2)
119119
if (!isFailover) {

src/reconsiler-audit.js

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
const config = require('config')
2+
const pg = require('pg')
3+
const logger = require('./common/logger')
4+
const pushToKafka = require('./services/pushToKafka')
5+
const pgOptions = config.get('POSTGRES')
6+
const postMessage = require('./services/posttoslack')
7+
const pgConnectionString = `postgresql://${pgOptions.user}:${pgOptions.password}@${pgOptions.host}:${pgOptions.port}/${pgOptions.database}`
8+
const pgClient = new pg.Client(pgConnectionString)
9+
const auditTrail = require('./services/auditTrail');
10+
const port = 3000
11+
12+
async function setupPgClient() {
13+
var payloadcopy
14+
try {
15+
await pgClient.connect()
16+
//sql1= "select * from pgifx_sync_audit where syncstatus in ('Informix-updated')
17+
//and auditdatetime >= (now()::date - interval '10m') and auditdatetime <= (now()::date - interval '5m') ";
18+
//>= 12.53 and <= 12.48
19+
rec_d_start = config.RECONSILER.RECONSILER_START
20+
rec_d_end = config.RECONSILER.RECONSILER_END
21+
rec_d_type = config.RECONSILER.RECONSILER_DURATION_TYPE
22+
var paramvalues = ['push-to-kafka',rec_d_start,rec_d_end];
23+
//var paramvalues = ['push-to-kafka','60002027'];
24+
//sql1 = 'select * from common_oltp.pgifx_sync_audit where pgifx_sync_audit.syncstatus =($1)'
25+
sql1 = " select seq_id, payloadseqid,auditdatetime at time zone 'utc' at time zone 'Asia/Calcutta',syncstatus, payload where pgifx_sync_audit.syncstatus =($1)"
26+
sql2 = " and pgifx_sync_audit.auditdatetime >= DATE(NOW()) - INTERVAL '1"+ rec_d_type + "' * ($2)"
27+
sql3 = " and pgifx_sync_audit.auditdatetime <= DATE(NOW()) - INTERVAL '1"+ rec_d_type + "' * ($3)"
28+
sql = sql1 + sql2 + sql3
29+
console.log('sql ', sql)
30+
// sql3 = ' select * from common_oltp.pgifx_sync_audit where pgifx_sync_audit.syncstatus =($1)'
31+
// sql4 = " and pgifx_sync_audit.payloadseqid = ($2)"
32+
33+
//sql = sql3 + sql4
34+
//const result = await pgClient.query(sql1, (err, res) => {
35+
await pgClient.query(sql,paramvalues, async (err,result) => {
36+
if (err) {
37+
var errmsg0 = `error-sync: Audit reconsiler query "${err.message}"`
38+
logger.debug (errmsg0)
39+
// await callposttoslack(errmsg0)
40+
}
41+
else{
42+
console.log("Reconsiler Rowcount = ", result.rows.length)
43+
for (var i = 0; i < result.rows.length; i++) {
44+
for(var columnName in result.rows[i]) {
45+
// console.log('column "%s" has a value of "%j"', columnName, result.rows[i][columnName]);
46+
//if ((columnName === 'seq_id') || (columnName === 'payload')){
47+
if ((columnName === 'payload')){
48+
var reconsiler_payload = result.rows[i][columnName]
49+
}
50+
}//column for loop
51+
try {
52+
var s_payload = reconsiler_payload
53+
payload = JSON.parse(s_payload)
54+
payload1 = payload.payload
55+
await pushToKafka(payload1)
56+
logger.info('Reconsiler : Push to kafka and added for audit trail')
57+
audit(s_payload)
58+
} catch (error) {
59+
logger.error('Reconsiler: Could not parse message payload')
60+
logger.debug(`error-sync: Reconsiler parse message : "${error.message}"`)
61+
const errmsg1 = `postgres-ifx-processor: Reconsiler : Error Parse or payload : "${error.message}" \n payload : "${payloadcopy.payload}"`
62+
logger.logFullError(error)
63+
audit(error)
64+
await callposttoslack(errmsg1)
65+
}
66+
}//result for loop
67+
}
68+
pgClient.end()
69+
})
70+
}catch (err) {
71+
const errmsg = `postgres-ifx-processor: Reconsiler : Error in setting up postgres client: "${err.message}"`
72+
logger.error(errmsg)
73+
logger.logFullError(err)
74+
await callposttoslack(errmsg)
75+
terminate()
76+
}
77+
}
78+
79+
const terminate = () => process.exit()
80+
81+
async function run() {
82+
logger.debug("Initialising Reconsiler setup...")
83+
await setupPgClient()
84+
}
85+
86+
async function callposttoslack(slackmessage) {
87+
if (config.SLACK.SLACKNOTIFY === 'true') {
88+
return new Promise(function (resolve, reject) {
89+
postMessage(slackmessage, (response) => {
90+
console.log(`respnse : ${response}`)
91+
if (response.statusCode < 400) {
92+
logger.debug('Message posted successfully');
93+
//callback(null);
94+
} else if (response.statusCode < 500) {
95+
const errmsg1 = `Slack Error: posting message to Slack API: ${response.statusCode} - ${response.statusMessage}`
96+
logger.debug(`error-sync: ${errmsg1}`)
97+
} else {
98+
logger.debug(`Server error when processing message: ${response.statusCode} - ${response.statusMessage}`);
99+
//callback(`Server error when processing message: ${response.statusCode} - ${response.statusMessage}`);
100+
}
101+
resolve("done")
102+
});
103+
}) //end
104+
}
105+
106+
}
107+
// execute
108+
run()
109+
110+
async function audit(message) {
111+
const pl_processid = 5555
112+
const jsonpayload = JSON.parse(message)
113+
payload = JSON.parse(jsonpayload.payload)
114+
payload1 = payload.payload
115+
const pl_seqid = payload1.payloadseqid
116+
const pl_topic = payload1.topic // TODO can move in config ?
117+
const pl_table = payload1.table
118+
const pl_uniquecolumn = payload1.Uniquecolumn
119+
const pl_operation = payload1.operation
120+
const pl_timestamp = payload1.timestamp
121+
//const pl_payload = JSON.stringify(payload.payload)
122+
const logMessage = `${pl_seqid} ${pl_processid} ${pl_table} ${pl_uniquecolumn} ${pl_operation} ${payload.timestamp}`
123+
logger.debug(`reconsiler : ${logMessage}`);
124+
//await auditTrail([pl_seqid, pl_processid, pl_table, pl_uniquecolumn, pl_operation, "push-to-kafka-reconsiler", "", "reconsiler", "", "", new Date(), ""], 'producer')
125+
}
126+

src/services/auditTrail.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ if (sourcetype === 'producer'){
4545
// logger.debug(`--Audit Trail update success-- `)
4646
}
4747
})
48+
pgClient2.end()
4849
}
4950

5051

0 commit comments

Comments
 (0)