@@ -5,78 +5,100 @@ const config = require('config')
5
5
const pg = require ( 'pg' )
6
6
const logger = require ( './common/logger' )
7
7
const pushToKafka = require ( './services/pushToKafka' )
8
+ const pushToDynamoDb = require ( './services/pushToDynamoDb' )
8
9
const pgOptions = config . get ( 'POSTGRES' )
9
10
const pgConnectionString = `postgresql://${ pgOptions . user } :${ pgOptions . password } @${ pgOptions . host } :${ pgOptions . port } /${ pgOptions . database } `
10
11
const pgClient = new pg . Client ( pgConnectionString )
11
12
const auditTrail = require ( './services/auditTrail' ) ;
12
13
const express = require ( 'express' )
13
14
const app = express ( )
14
15
const port = 3000
15
- //console.log(`pgConnectionString value = ${pgConnectionString}`)
16
- var pl_processid ;
17
- var pl_randonseq = 'err-' + ( new Date ( ) ) . getTime ( ) . toString ( 36 ) + Math . random ( ) . toString ( 36 ) . slice ( 2 ) ;
18
- async function setupPgClient ( ) {
19
- try {
20
- //console.log(`${pgOptions.triggerFunctions}`);
16
+ const isFailover = process . argv [ 2 ] != undefined ? ( process . argv [ 2 ] === 'failover' ? true : false ) : false
17
+ async function setupPgClient ( ) {
18
+ try {
21
19
await pgClient . connect ( )
22
20
for ( const triggerFunction of pgOptions . triggerFunctions ) {
23
21
await pgClient . query ( `LISTEN ${ triggerFunction } ` )
24
22
}
25
23
pgClient . on ( 'notification' , async ( message ) => {
26
- //const payload = JSON.parse(message.payload);
27
- pl_processid = message . processId ;
28
- //console.log(message);
29
- try
30
- {
24
+ try {
31
25
const payload = JSON . parse ( message . payload )
32
- var pl_seqid = payload . payload . payloadseqid
33
- var pl_topic = payload . topic
34
- var pl_table = payload . payload . table
35
- var pl_uniquecolumn = payload . payload . Uniquecolumn
36
- var pl_operation = payload . payload . operation
37
- var pl_timestamp = payload . timestamp
38
- var pl_payload = JSON . stringify ( payload . payload )
39
- const validTopicAndOriginator = ( pgOptions . triggerTopics . includes ( payload . topic ) ) && ( pgOptions . triggerOriginators . includes ( payload . originator ) ) // Check if valid topic and originator
26
+ const validTopicAndOriginator = ( pgOptions . triggerTopics . includes ( payload . topic ) ) && ( pgOptions . triggerOriginators . includes ( payload . originator ) ) // Check if valid topic and originator
40
27
if ( validTopicAndOriginator ) {
41
- logger . debug ( `producer : ${ pl_seqid } ${ pl_processid } ${ pl_table } ${ pl_uniquecolumn } ${ pl_operation } ${ payload . timestamp } ` ) ;
42
- await pushToKafka ( payload )
43
- } else {
28
+ if ( ! isFailover ) {
29
+ logger . info ( 'trying to push on kafka topic' )
30
+ await pushToKafka ( payload )
31
+ logger . info ( 'pushed to kafka and added for audit trail' )
32
+ } else {
33
+ logger . info ( 'taking backup on dynamodb for reconciliation' )
34
+ await pushToDynamoDb ( payload )
35
+ }
36
+ audit ( message )
37
+ } else {
44
38
logger . debug ( 'Ignoring message with incorrect topic or originator' )
45
-
39
+ // push to slack - alertIt("slack message")
46
40
}
47
- await auditTrail ( [ pl_seqid , pl_processid , pl_table , pl_uniquecolumn , pl_operation , "push-to-kafka" , "" , "" , "" , pl_payload , pl_timestamp , pl_topic ] , 'producer' )
48
- } catch ( error ) {
41
+ } catch ( error ) {
49
42
logger . error ( 'Could not parse message payload' )
50
- logger . debug ( `error-sync: producer parse message : "${ error . message } "` )
51
- await auditTrail ( [ pl_randonseq , 1111 , 'pl_table' , 'pl_uniquecolumn' , 'pl_operation' , "error-producer" , "" , "" , error . message , 'pl_payload' , new Date ( ) , 'pl_topic' ] , 'producer' )
52
- logger . logFullError ( error )
43
+ logger . debug ( `error-sync: producer parse message : "${ error . message } "` )
44
+ logger . logFullError ( error )
45
+ audit ( error )
46
+ // push to slack - alertIt("slack message"
53
47
}
54
48
} )
55
- logger . info ( 'pg-ifx-sync producer: Listening to notifications ' )
49
+ logger . info ( 'pg-ifx-sync producer: Listening to pg-trigger channel. ' )
56
50
} catch ( err ) {
57
- logger . debug ( `error-sync: producer postgres-setup 1 :"${ err . message } "` )
58
- logger . error ( 'Could not setup postgres client' )
51
+ logger . error ( 'Error in setting up postgres client: ' , err . message )
59
52
logger . logFullError ( err )
60
-
53
+ // push to slack - alertIt("slack message")
61
54
terminate ( )
62
55
}
63
56
}
57
+
64
58
const terminate = ( ) => process . exit ( )
65
- async function run ( ) {
66
- try {
67
- await setupPgClient ( )
68
- }
69
- catch ( err )
70
- {
71
- logger . debug ( `Could not setup postgres client` )
72
- logger . debug ( `error-sync: producer postgres-setup 0 :"${ err . message } "` )
73
- }
59
+
60
+ async function run ( ) {
61
+ logger . debug ( "Initialising producer setup..." )
62
+ await setupPgClient ( )
74
63
}
75
64
65
+ // execute
76
66
run ( )
77
67
68
+ async function audit ( message ) {
69
+ const pl_processid = message . processId
70
+ if ( pl_processid != 'undefined' ) {
71
+ const payload = JSON . parse ( message . payload )
72
+ const pl_seqid = payload . payload . payloadseqid
73
+ const pl_topic = payload . topic // TODO can move in config ?
74
+ const pl_table = payload . payload . table
75
+ const pl_uniquecolumn = payload . payload . Uniquecolumn
76
+ const pl_operation = payload . payload . operation
77
+ const pl_timestamp = payload . timestamp
78
+ const pl_payload = JSON . stringify ( payload . payload )
79
+ const logMessage = `${ pl_seqid } ${ pl_processid } ${ pl_table } ${ pl_uniquecolumn } ${ pl_operation } ${ payload . timestamp } `
80
+ if ( ! isFailover ) {
81
+ logger . debug ( `producer : ${ logMessage } ` ) ;
82
+ } else {
83
+ logger . debug ( `Producer DynamoDb : ${ logMessage } ` ) ;
84
+ }
85
+ auditTrail ( [ pl_seqid , pl_processid , pl_table , pl_uniquecolumn , pl_operation , "push-to-kafka" , "" , "" , "" , pl_payload , pl_timestamp , pl_topic ] , 'producer' )
86
+ } else {
87
+ const pl_randonseq = 'err-' + ( new Date ( ) ) . getTime ( ) . toString ( 36 ) + Math . random ( ) . toString ( 36 ) . slice ( 2 )
88
+ if ( ! isFailover ) {
89
+ await auditTrail ( [ pl_randonseq , 1111 , "" , "" , "" , "error-producer" , "" , "" , message . message , "" , new Date ( ) , "" ] , 'producer' )
90
+ }
91
+ }
92
+ }
93
+
94
+ function alertIt ( message ) {
95
+ /**
96
+ // call slack
97
+ */
98
+ }
99
+
78
100
app . get ( '/health' , ( req , res ) => {
79
- res . send ( 'health ok' )
101
+ res . send ( 'health ok' )
80
102
} )
81
103
app . listen ( port , ( ) => console . log ( `app listening on port ${ port } !` ) )
82
104
0 commit comments