@@ -41,56 +41,61 @@ const terminate = () => process.exit()
41
41
* @param {String } topic The name of the message topic
42
42
* @param {Number } partition The kafka partition to which messages are written
43
43
*/
44
- let message ;
45
- let cs_payloadseqid ;
44
+ // let message;
45
+ // let cs_payloadseqid;
46
46
/**
 * Consume one kafka message set: parse each message, apply it to Informix,
 * commit the offset on success and write an audit-trail row. On failure the
 * message is re-published with an incremented retryCount; once
 * config.KAFKA.maxRetry is reached it is routed to config.KAFKA.errorTopic
 * instead (and processing of the remaining set stops, as before).
 *
 * @param {Array} messageSet Batch of raw kafka messages for this partition
 * @param {String} topic The name of the message topic
 * @param {Number} partition The kafka partition to which messages are written
 */
async function dataHandler (messageSet, topic, partition) {
  let cs_payloadseqid
  for (const m of messageSet) { // Process messages sequentially
    let message
    try {
      message = JSON.parse(m.message.value)
      logger.debug('Received message from kafka:')
      // Remember the last seen payloadseqid so error audit rows can reference it
      if (message.payload.payloadseqid) cs_payloadseqid = message.payload.payloadseqid;
      logger.debug(`consumer : ${message.payload.payloadseqid} ${message.payload.table} ${message.payload.Uniquecolumn} ${message.payload.operation} ${message.timestamp}`);
      await updateInformix(message)
      await consumer.commitOffset({ topic, partition, offset: m.offset }) // Commit offset only on success
      // NOTE(review): deliberately not awaited per latest revision — audit write
      // is fire-and-forget on the success path; confirm failures may be dropped.
      auditTrail([cs_payloadseqid, cs_processId, message.payload.table, message.payload.Uniquecolumn,
        message.payload.operation, "Informix-updated", "", "", "", message.payload.data, message.timestamp, message.topic], 'consumer')
    } catch (err) {
      logger.error(`Could not process kafka message or informix DB error: "${err.message}"`)
      //logger.logFullError(err)
      logger.debug(`error-sync: consumer "${err.message}"`)
      if (!cs_payloadseqid) {
        // Synthesize a unique id so the audit row can still be correlated
        cs_payloadseqid = 'err-' + (new Date()).getTime().toString(36) + Math.random().toString(36).slice(2);
      }
      await auditTrail([cs_payloadseqid, 3333, 'message.payload.table', 'message.payload.Uniquecolumn',
        'message.payload.operation', "Error-Consumer", "", err.message, "", 'message.payload.data', new Date(), 'message.topic'], 'consumer')
      // Bug fix: when JSON.parse failed, `message` is undefined and there is
      // nothing to re-publish. Previously the code dereferenced
      // message.payload here, threw again inside the inner catch and rejected
      // the whole handler, aborting the rest of the message set.
      if (!message) continue
      try {
        let retryvar // was `var` — block-scoped declaration is sufficient
        if (message.payload['retryCount']) retryvar = message.payload.retryCount;
        await consumer.commitOffset({ topic, partition, offset: m.offset }) // Commit success as will re-publish
        await auditTrail([cs_payloadseqid, 3333, 'message.payload.table', 'message.payload.Uniquecolumn',
          'message.payload.operation', "Informix-Updated1", retryvar, "", "", 'message.payload.data', new Date(), 'message.topic'], 'consumer')
        logger.debug(`Trying to push same message after adding retryCounter`)
        if (!message.payload.retryCount) {
          message.payload.retryCount = 0
          logger.debug('setting retry counter to 0 and max try count is : ', config.KAFKA.maxRetry);
        }
        if (message.payload.retryCount >= config.KAFKA.maxRetry) {
          logger.debug('Recached at max retry counter, sending it to error queue: ', config.KAFKA.errorTopic);
          logger.debug(`error-sync: consumer max-retry-limit reached`)
          // Retry budget exhausted: route a copy to the error/alert topic and
          // stop processing this message set (matches previous behavior).
          let notifiyMessage = Object.assign({}, message, { topic: config.KAFKA.errorTopic })
          notifiyMessage.payload['recipients'] = config.KAFKA.recipients
          logger.debug('pushing following message on kafka error alert queue:')
          // logger.debug(notifiyMessage)
          await pushToKafka(notifiyMessage)
          return
        }
        message.payload['retryCount'] = message.payload.retryCount + 1;
        await pushToKafka(message)
        logger.debug(` After kafka push Retry Count "${message.payload.retryCount}"`)
      } catch (err) {
        // Bug fix: guard payload access with optional chaining — a malformed
        // message without a payload object used to throw here, masking the
        // real re-publish error and rejecting the handler.
        await auditTrail([cs_payloadseqid, cs_processId, message.payload?.table, message.payload?.Uniquecolumn,
          message.payload?.operation, "Error-republishing", message.payload?.['retryCount'], err.message, "", message.payload?.data, message.timestamp, message.topic], 'consumer')
        logger.error("Error occured in re-publishing kafka message", err)
        logger.debug(`error-sync: consumer re-publishing "${err.message}"`)
      }
    }
  }
}
@@ -111,6 +116,7 @@ async function setupKafkaConsumer() {
111
116
} catch ( err ) {
112
117
logger . error ( 'Could not setup kafka consumer' )
113
118
logger . logFullError ( err )
119
+ logger . debug ( `error-sync: consumer kafka-setup "${ err . message } "` )
114
120
terminate ( )
115
121
}
116
122
}
0 commit comments