1
1
2
-
2
+
3
3
const config = require ( 'config' )
4
4
const pg = require ( 'pg' )
5
5
var AWS = require ( "aws-sdk" ) ;
@@ -13,139 +13,138 @@ const pgConnectionString = `postgresql://${pgOptions.user}:${pgOptions.password}
13
13
14
14
15
15
const port = 3000
16
+ var pgClient = null
16
17
//===============RECONSILER2 DYNAMODB CODE STARTS HERE ==========================
17
18
18
19
var docClient = new AWS . DynamoDB . DocumentClient ( {
19
- region : 'us-east-1' ,
20
- convertEmptyValues : true
21
- } ) ;
20
+ region : 'us-east-1' ,
21
+ convertEmptyValues : true
22
+ } ) ;
22
23
//ElapsedTime = 094600000
23
24
ElapsedTime = config . DYNAMODB . DD_ElapsedTime
24
- var params = {
25
- TableName : config . DYNAMODB . DYNAMODB_TABLE ,
26
- FilterExpression : "#timestamp between :time_1 and :time_2" ,
27
- ExpressionAttributeNames : {
28
- "#timestamp" : "timestamp" ,
29
- } ,
30
- ExpressionAttributeValues : {
31
- ":time_1" : Date . now ( ) - ElapsedTime ,
32
- ":time_2" : Date . now ( )
33
- }
25
+ var params = {
26
+ TableName : config . DYNAMODB . DYNAMODB_TABLE ,
27
+ FilterExpression : "#timestamp between :time_1 and :time_2" ,
28
+ ExpressionAttributeNames : {
29
+ "#timestamp" : "timestamp" ,
30
+ } ,
31
+ ExpressionAttributeValues : {
32
+ ":time_1" : Date . now ( ) - ElapsedTime ,
33
+ ":time_2" : Date . now ( )
34
34
}
35
-
36
- async function callReconsiler2 ( )
37
- {
38
- console . log ( "inside 2" ) ;
39
- await docClient . scan ( params , onScan ) ;
40
- return
41
- }
42
- async function onScan ( err , data ) {
43
- if ( err ) {
44
- logger . error ( "Unable to scan the table. Error JSON:" , JSON . stringify ( err , null , 2 ) ) ;
45
- terminate ( )
46
- } else {
47
- try
48
- {
49
- console . log ( "Scan succeeded." ) ;
50
- let total_dd_records = 0 ;
51
- let total_pushtokafka = 0 ;
52
- data . Items . forEach ( async function ( item ) {
53
- //console.log(item.payloadseqid);
54
- var retval = await verify_pg_record_exists ( item . payloadseqid )
55
- //console.log("retval", retval);
56
- var s_payload = ( item . pl_document )
57
- payload = s_payload
58
- payload1 = ( payload . payload )
59
- logger . info ( `Checking for : ${ item . payloadseqid } ${ payload1 . table } ` )
60
- logger . info ( `retval : ${ retval } ` )
61
- if ( retval === false && `"${ payload1 . table } "` !== "sync_test_id" ) {
62
- //await pushToKafka(item.pl_document)
63
- await kafkaService . pushToKafka ( s_payload )
64
- await audit ( s_payload , 1 ) //0 flag means reconsiler 1. 1 flag reconsiler 2 i,e dynamodb
65
- logger . info ( `Reconsiler2 Posted Payload : ${ JSON . stringify ( item . pl_document ) } ` )
66
- logger . info ( `Total push-to-kafka Count : ${ total_pushtokafka } ` )
67
- total_pushtokafka += 1
68
- }
69
- total_dd_records += 1
70
- } ) ;
71
- logger . info ( `Reconsiler2 : count of total_dd_records ${ total_dd_records } ` ) ;
72
- if ( typeof data . LastEvaluatedKey != "undefined" ) {
73
- console . log ( "Scanning for more..." ) ;
74
- params . ExclusiveStartKey = data . LastEvaluatedKey ;
75
- await docClient . scan ( params , onScan ) ;
76
- }
77
- else
78
- {
79
- terminate ( )
80
- }
81
- }
82
- catch ( err ) {
83
- const errmsg = `error-sync: Reconsiler2 : Error during dynamodb scan/kafka push: "${ err . message } "`
84
- logger . error ( errmsg )
85
- logger . logFullError ( err )
86
- callposttoslack ( errmsg )
87
- //terminate()
88
- }
89
- }
90
- //terminate()
91
35
}
92
36
93
- async function verify_pg_record_exists ( seqid )
94
- {
37
+ async function callReconsiler2 ( ) {
38
+ console . log ( "inside 2" ) ;
39
+ await docClient . scan ( params , onScan ) ;
40
+ return
41
+ }
42
+ async function onScan ( err , data ) {
43
+ if ( err ) {
44
+ logger . error ( "Unable to scan the table. Error JSON:" , JSON . stringify ( err , null , 2 ) ) ;
45
+ terminate ( )
46
+ } else {
95
47
try {
96
- let pgClient = new pg . Client ( pgConnectionString )
97
- if ( ! pgClient . connect ( ) ) { await pgClient . connect ( ) }
98
- var paramvalues = [ seqid ]
99
- sql = "select * from common_oltp.pgifx_sync_audit where pgifx_sync_audit.payloadseqid = ($1)"
100
- logger . info ( `sql and params : ${ sql } ${ paramvalues } ` )
101
- return new Promise ( function ( resolve , reject ) {
102
- pgClient . query ( sql , paramvalues , ( err , result ) => {
103
- if ( err ) {
104
- var errmsg0 = `error-sync: Audit reconsiler2 query "${ err . message } "`
105
- console . log ( errmsg0 )
106
- }
107
- else {
108
- logger . info ( `Query result for ${ paramvalues } : ${ result . rowCount } ` )
109
- if ( result . rows . length > 0 ) {
110
- //console.log("row length > 0 ")
111
- resolve ( true ) ;
112
- }
113
- else {
114
- //console.log("0")
115
- resolve ( false ) ;
116
- }
117
- }
118
- pgClient . end ( )
119
- pgClient = null
120
- } )
121
- } )
122
- return
48
+ console . log ( "Scan succeeded." ) ;
49
+ await Promise . all ( data . Items . map ( async ( item ) => {
50
+ // data.Items.forEach(async function (item) {
51
+ //console.log(item.payloadseqid);
52
+ let retval ;
53
+ try {
54
+ retval = await verify_pg_record_exists ( item . payloadseqid )
55
+ } catch ( e ) {
56
+ logger . error ( `${ e } ` )
57
+ }
58
+ //console.log("retval", retval);
59
+ var s_payload = ( item . pl_document )
60
+ payload = s_payload
61
+ payload1 = ( payload . payload )
62
+ logger . info ( `Checking for : ${ item . payloadseqid } ${ payload1 . table } ` )
63
+ logger . info ( `retval : ${ retval } ` )
64
+ if ( retval === false && `"${ payload1 . table } "` !== "sync_test_id" ) {
65
+ //await pushToKafka(item.pl_document)
66
+ logger . info ( `Inside retval condition` )
67
+ await kafkaService . pushToKafka ( s_payload )
68
+ await audit ( s_payload , 1 ) //0 flag means reconsiler 1. 1 flag reconsiler 2 i,e dynamodb
69
+ logger . info ( `Reconsiler2 Posted Payload : ${ JSON . stringify ( item . pl_document ) } ` )
70
+ }
71
+ logger . info ( `after retval condition` )
72
+ } ) ) ;
73
+ if ( typeof data . LastEvaluatedKey != "undefined" ) {
74
+ console . log ( "Scanning for more..." ) ;
75
+ // logger.info(`params.ExclusiveStartKey : ${params.ExclusiveStartKey}`)
76
+ // logger.info(`data.LastEvaluatedKey: ${data.LastEvaluatedKey}`)
77
+ params . ExclusiveStartKey = data . LastEvaluatedKey ;
78
+ await docClient . scan ( params , onScan ) ;
79
+ }
80
+ else {
81
+ logger . info ( "Need to terminate." )
82
+ terminate ( )
83
+ //return
84
+
85
+ }
123
86
}
124
87
catch ( err ) {
125
- const errmsg = `error-sync: Reconsiler2 : Error in setting up postgres client: "${ err . message } "`
126
- logger . error ( errmsg )
127
- logger . logFullError ( err )
128
- await callposttoslack ( errmsg )
129
- terminate ( )
88
+ const errmsg = `error-sync: Reconsiler2 : Error during dynamodb scan/kafka push: "${ err . message } "`
89
+ logger . error ( errmsg )
90
+ logger . logFullError ( err )
91
+ callposttoslack ( errmsg )
92
+ //terminate()
93
+ }
130
94
}
95
+ //terminate()
96
+ }
97
+
98
+ async function verify_pg_record_exists ( seqid ) {
99
+ return new Promise ( async function ( resolve , reject ) {
100
+ try {
101
+ var paramvalues = [ seqid ]
102
+ let sql = "select * from common_oltp.pgifx_sync_audit where pgifx_sync_audit.payloadseqid = ($1)"
103
+ logger . info ( `sql and params : ${ sql } ${ paramvalues } ` )
104
+ pgClient . query ( sql , paramvalues , ( err , result ) => {
105
+ if ( err ) {
106
+ var errmsg0 = `error-sync: Audit reconsiler2 query "${ err . message } "`
107
+ logger . error ( errmsg0 )
108
+ reject ( errmsg0 ) ;
109
+ } else {
110
+ logger . info ( `Query result for ${ paramvalues } : ${ result . rowCount } ` )
111
+ if ( result . rows . length > 0 ) {
112
+ //console.log("row length > 0 ")
113
+ resolve ( true ) ;
114
+ } else {
115
+ //console.log("0")
116
+ resolve ( false ) ;
117
+ }
118
+ }
119
+ //pgClient.end()
120
+ } )
121
+ } catch ( err ) {
122
+ const errmsg = `error-sync: Reconsiler2 : Error in setting up postgres client: "${ err . message } "`
123
+ logger . error ( errmsg )
124
+ logger . logFullError ( err )
125
+ await callposttoslack ( errmsg )
126
+ reject ( errmsg )
127
+ terminate ( )
128
+ }
129
+ } ) ;
131
130
}
132
131
133
- async function audit ( message , reconsileflag ) {
134
- var pl_producererr = "Reconsiler2"
135
- const pl_processid = 5555
136
- payload1 = ( message . payload )
137
- const pl_seqid = payload1 . payloadseqid
138
- const pl_topic = payload1 . topic // TODO can move in config ?
139
- const pl_table = payload1 . table
140
- const pl_uniquecolumn = payload1 . Uniquecolumn
141
- const pl_operation = payload1 . operation
142
- const pl_timestamp = payload1 . timestamp
143
- //const pl_payload = JSON.stringify(message)
144
- const pl_payload = message
145
- const logMessage = `${ pl_seqid } ${ pl_processid } ${ pl_table } ${ pl_uniquecolumn } ${ pl_operation } `
146
- logger . debug ( `${ pl_producererr } : ${ logMessage } ` ) ;
147
- await auditTrail ( [ pl_seqid , pl_processid , pl_table , pl_uniquecolumn , pl_operation , "push-to-kafka" , "" , "" , "Reconsiler2" , pl_payload , new Date ( ) , "" ] , 'producer' )
148
- return
132
+ async function audit ( message , reconsileflag ) {
133
+ var pl_producererr = "Reconsiler2"
134
+ const pl_processid = 5555
135
+ payload1 = ( message . payload )
136
+ const pl_seqid = payload1 . payloadseqid
137
+ const pl_topic = payload1 . topic // TODO can move in config ?
138
+ const pl_table = payload1 . table
139
+ const pl_uniquecolumn = payload1 . Uniquecolumn
140
+ const pl_operation = payload1 . operation
141
+ const pl_timestamp = payload1 . timestamp
142
+ //const pl_payload = JSON.stringify(message)
143
+ const pl_payload = message
144
+ const logMessage = `${ pl_seqid } ${ pl_processid } ${ pl_table } ${ pl_uniquecolumn } ${ pl_operation } `
145
+ logger . debug ( `${ pl_producererr } : ${ logMessage } ` ) ;
146
+ await auditTrail ( [ pl_seqid , pl_processid , pl_table , pl_uniquecolumn , pl_operation , "push-to-kafka" , "" , "" , "Reconsiler2" , pl_payload , new Date ( ) , "" ] , 'producer' )
147
+ return
149
148
}
150
149
151
150
async function callposttoslack ( slackmessage ) {
@@ -167,22 +166,30 @@ async function callposttoslack(slackmessage) {
167
166
} ) ;
168
167
} ) //end
169
168
}
170
- return
169
+ return
171
170
}
172
171
173
172
//=================BEGIN HERE =======================
174
173
const terminate = ( ) => process . exit ( )
175
174
176
175
async function run ( ) {
177
176
//logger.debug("Initialising Reconsiler1 setup...")
178
- //await setupPgClient()
177
+ //await setupPgClient()
179
178
logger . debug ( "Initialising Reconsiler2 setup..." )
180
- kafkaService . init ( ) . catch ( ( e ) => {
181
- logger . error ( `Kafka producer intialization error: "${ e } "` )
182
- terminate ( )
183
- } )
179
+ kafkaService . init ( ) . catch ( ( e ) => {
180
+ logger . error ( `Kafka producer intialization error: "${ e } "` )
181
+ terminate ( )
182
+ } )
183
+ pgClient = new pg . Client ( pgConnectionString )
184
+ pgClient . connect ( err => {
185
+ if ( err ) {
186
+ logger . error ( `connection error, ${ err . stack } ` )
187
+ } else {
188
+ logger . info ( 'pg connected.' )
189
+ }
190
+ } )
184
191
callReconsiler2 ( )
185
- // terminate()
192
+ // terminate()
186
193
}
187
194
//execute
188
195
run ( )
0 commit comments