@@ -9,12 +9,28 @@ const healthcheck = require('topcoder-healthcheck-dropin')
9
9
const logger = require ( './common/logger' )
10
10
const helper = require ( './common/helper' )
11
11
const ProcessorService = require ( './services/ProcessorService' )
12
+ const Mutex = require ( 'async-mutex' ) . Mutex
12
13
13
14
// Start kafka consumer
14
15
logger . info ( 'Starting kafka consumer' )
15
16
// create consumer
16
17
const consumer = new Kafka . GroupConsumer ( helper . getKafkaOptions ( ) )
17
18
19
+ let count = 0
20
+ let mutex = new Mutex ( )
21
+
22
+ async function getLatestCount ( ) {
23
+ const release = await mutex . acquire ( )
24
+
25
+ try {
26
+ count = count + 1
27
+
28
+ return count
29
+ } finally {
30
+ release ( )
31
+ }
32
+ }
33
+
18
34
/*
19
35
* Data handler linked with Kafka consumer
20
36
* Whenever a new message is received by Kafka consumer,
@@ -25,12 +41,17 @@ const dataHandler = (messageSet, topic, partition) => Promise.each(messageSet, a
25
41
logger . info ( `Handle Kafka event message; Topic: ${ topic } ; Partition: ${ partition } ; Offset: ${
26
42
m . offset } ; Message: ${ message } .`)
27
43
let messageJSON
44
+ let messageCount = await getLatestCount ( )
45
+
46
+ logger . debug ( `Current message count: ${ messageCount } ` )
28
47
try {
29
48
messageJSON = JSON . parse ( message )
30
49
} catch ( e ) {
31
50
logger . error ( 'Invalid message JSON.' )
32
51
logger . logFullError ( e )
33
52
53
+ logger . debug ( `Commiting offset after processing message with count ${ messageCount } ` )
54
+
34
55
// commit the message and ignore it
35
56
await consumer . commitOffset ( { topic, partition, offset : m . offset } )
36
57
return
@@ -39,6 +60,8 @@ const dataHandler = (messageSet, topic, partition) => Promise.each(messageSet, a
39
60
if ( messageJSON . topic !== topic ) {
40
61
logger . error ( `The message topic ${ messageJSON . topic } doesn't match the Kafka topic ${ topic } .` )
41
62
63
+ logger . debug ( `Commiting offset after processing message with count ${ messageCount } ` )
64
+
42
65
// commit the message and ignore it
43
66
await consumer . commitOffset ( { topic, partition, offset : m . offset } )
44
67
return
@@ -57,10 +80,12 @@ const dataHandler = (messageSet, topic, partition) => Promise.each(messageSet, a
57
80
break
58
81
}
59
82
60
- logger . debug ( ' Successfully processed message' )
83
+ logger . debug ( ` Successfully processed message with count ${ messageCount } ` )
61
84
} catch ( err ) {
62
85
logger . logFullError ( err )
63
86
} finally {
87
+ logger . debug ( `Commiting offset after processing message with count ${ messageCount } ` )
88
+
64
89
// Commit offset regardless of error
65
90
await consumer . commitOffset ( { topic, partition, offset : m . offset } )
66
91
}
0 commit comments