Skip to content
This repository was archived by the owner on Mar 13, 2025. It is now read-only.

Commit 8942530

Browse files
Undo changes carried out for the issue of ES not updating all users and instead add mutex at top level
1 parent c86a22a commit 8942530

File tree

3 files changed

+40
-130
lines changed

3 files changed

+40
-130
lines changed

src/app.js

Lines changed: 10 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
global.Promise = require('bluebird')
66
const config = require('config')
77
const Kafka = require('no-kafka')
8-
const _ = require('lodash')
98
const healthcheck = require('topcoder-healthcheck-dropin')
109
const logger = require('./common/logger')
1110
const helper = require('./common/helper')
@@ -17,79 +16,62 @@ logger.info('Starting kafka consumer')
1716
// create consumer
1817
const consumer = new Kafka.GroupConsumer(helper.getKafkaOptions())
1918

20-
let count = 0
2119
let mutex = new Mutex()
2220

23-
async function getLatestCount () {
24-
const release = await mutex.acquire()
25-
26-
try {
27-
count = count + 1
28-
29-
return count
30-
} finally {
31-
release()
32-
}
33-
}
34-
3521
/*
3622
* Data handler linked with Kafka consumer
3723
* Whenever a new message is received by Kafka consumer,
3824
* this function will be invoked
3925
*/
4026
const dataHandler = (messageSet, topic, partition) => Promise.each(messageSet, async (m) => {
27+
const release = await mutex.acquire()
4128
const message = m.message.value.toString('utf8')
4229
logger.info(`Handle Kafka event message; Topic: ${topic}; Partition: ${partition}; Offset: ${
4330
m.offset}; Message: ${message}.`)
4431
let messageJSON
45-
let messageCount = await getLatestCount()
4632

47-
logger.debug(`Current message count: ${messageCount}`)
4833
try {
4934
messageJSON = JSON.parse(message)
5035
} catch (e) {
5136
logger.error('Invalid message JSON.')
5237
logger.logFullError(e)
5338

54-
logger.debug(`Commiting offset after processing message with count ${messageCount}`)
55-
5639
// commit the message and ignore it
5740
await consumer.commitOffset({ topic, partition, offset: m.offset })
5841
return
42+
} finally {
43+
release()
5944
}
6045

6146
if (messageJSON.topic !== topic) {
6247
logger.error(`The message topic ${messageJSON.topic} doesn't match the Kafka topic ${topic}.`)
6348

64-
logger.debug(`Commiting offset after processing message with count ${messageCount}`)
65-
6649
// commit the message and ignore it
6750
await consumer.commitOffset({ topic, partition, offset: m.offset })
51+
release()
6852
return
6953
}
70-
const transactionId = _.uniqueId('transaction_')
54+
7155
try {
7256
switch (topic) {
7357
case config.UBAHN_CREATE_TOPIC:
74-
await ProcessorService.processCreate(messageJSON, transactionId)
58+
await ProcessorService.processCreate(messageJSON)
7559
break
7660
case config.UBAHN_UPDATE_TOPIC:
77-
await ProcessorService.processUpdate(messageJSON, transactionId)
61+
await ProcessorService.processUpdate(messageJSON)
7862
break
7963
case config.UBAHN_DELETE_TOPIC:
80-
await ProcessorService.processDelete(messageJSON, transactionId)
64+
await ProcessorService.processDelete(messageJSON)
8165
break
8266
}
8367

84-
logger.debug(`Successfully processed message with count ${messageCount}`)
68+
logger.debug('Successfully processed message')
8569
} catch (err) {
8670
logger.logFullError(err)
8771
} finally {
88-
helper.checkEsMutexRelease(transactionId)
89-
logger.debug(`Commiting offset after processing message with count ${messageCount}`)
90-
9172
// Commit offset regardless of error
9273
await consumer.commitOffset({ topic, partition, offset: m.offset })
74+
release()
9375
}
9476
})
9577

src/common/helper.js

Lines changed: 9 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,11 @@ const config = require('config')
77
const elasticsearch = require('elasticsearch')
88
const _ = require('lodash')
99
const Joi = require('@hapi/joi')
10-
const { Mutex } = require('async-mutex')
11-
const logger = require('./logger')
1210

1311
AWS.config.region = config.ES.AWS_REGION
1412

1513
// Elasticsearch client
1614
let esClient
17-
let transactionId
18-
// Mutex to ensure that only one elasticsearch action is carried out at any given time
19-
const esClientMutex = new Mutex()
20-
const mutexReleaseMap = {}
2115

2216
/**
2317
* Get Kafka options
@@ -57,31 +51,6 @@ async function getESClient () {
5751
host
5852
})
5953
}
60-
61-
// Patch the transport to enable mutex
62-
esClient.transport.originalRequest = esClient.transport.request
63-
esClient.transport.request = async (params) => {
64-
const tId = _.get(params.query, 'transactionId')
65-
params.query = _.omit(params.query, 'transactionId')
66-
if (!tId || tId !== transactionId) {
67-
const release = await esClientMutex.acquire()
68-
mutexReleaseMap[tId || 'noTransaction'] = release
69-
transactionId = tId
70-
}
71-
try {
72-
return await esClient.transport.originalRequest(params)
73-
} finally {
74-
if (params.method !== 'GET' || !tId) {
75-
const release = mutexReleaseMap[tId || 'noTransaction']
76-
delete mutexReleaseMap[tId || 'noTransaction']
77-
transactionId = undefined
78-
if (release) {
79-
release()
80-
}
81-
}
82-
}
83-
}
84-
8554
return esClient
8655
}
8756

@@ -102,66 +71,50 @@ function validProperties (payload, keys) {
10271
/**
10372
* Function to get user from es
10473
* @param {String} userId
105-
* @param {String} transactionId
10674
* @returns {Object} user
10775
*/
108-
async function getUser (userId, transactionId) {
76+
async function getUser (userId) {
10977
const client = await getESClient()
110-
const user = await client.get({ index: config.get('ES.USER_INDEX'), type: config.get('ES.USER_TYPE'), id: userId, transactionId })
111-
return { seqNo: user._seq_no, primaryTerm: user._primary_term, user: user._source }
78+
return client.getSource({ index: config.get('ES.USER_INDEX'), type: config.get('ES.USER_TYPE'), id: userId })
11279
}
11380

11481
/**
11582
* Function to update es user
11683
* @param {String} userId
117-
* @param {Number} seqNo
118-
* @param {Number} primaryTerm
119-
* @param {String} transactionId
12084
* @param {Object} body
12185
*/
122-
async function updateUser (userId, body, seqNo, primaryTerm, transactionId) {
86+
async function updateUser (userId, body) {
12387
const client = await getESClient()
12488
await client.update({
12589
index: config.get('ES.USER_INDEX'),
12690
type: config.get('ES.USER_TYPE'),
12791
id: userId,
128-
transactionId,
129-
body: { doc: body },
130-
if_seq_no: seqNo,
131-
if_primary_term: primaryTerm
92+
body: { doc: body }
13293
})
13394
}
13495

13596
/**
13697
* Function to get org from es
13798
* @param {String} organizationId
138-
* @param {String} transactionId
13999
* @returns {Object} organization
140100
*/
141-
async function getOrg (organizationId, transactionId) {
101+
async function getOrg (organizationId) {
142102
const client = await getESClient()
143-
const org = await client.get({ index: config.get('ES.ORGANIZATION_INDEX'), type: config.get('ES.ORGANIZATION_TYPE'), id: organizationId, transactionId })
144-
return { seqNo: org._seq_no, primaryTerm: org._primary_term, org: org._source }
103+
return client.getSource({ index: config.get('ES.ORGANIZATION_INDEX'), type: config.get('ES.ORGANIZATION_TYPE'), id: organizationId })
145104
}
146105

147106
/**
148107
* Function to update es organization
149108
* @param {String} organizationId
150-
* @param {Number} seqNo
151-
* @param {Number} primaryTerm
152-
* @param {String} transactionId
153109
* @param {Object} body
154110
*/
155-
async function updateOrg (organizationId, body, seqNo, primaryTerm, transactionId) {
111+
async function updateOrg (organizationId, body) {
156112
const client = await getESClient()
157113
await client.update({
158114
index: config.get('ES.ORGANIZATION_INDEX'),
159115
type: config.get('ES.ORGANIZATION_TYPE'),
160116
id: organizationId,
161-
transactionId,
162-
body: { doc: body },
163-
if_seq_no: seqNo,
164-
if_primary_term: primaryTerm
117+
body: { doc: body }
165118
})
166119
}
167120

@@ -177,21 +130,6 @@ function getErrorWithStatus (message, statusCode) {
177130
return error
178131
}
179132

180-
/**
181-
* Ensure the esClient mutex is released
182-
* @param {String} tId transactionId
183-
*/
184-
function checkEsMutexRelease (tId) {
185-
if (tId === transactionId) {
186-
const release = mutexReleaseMap[tId]
187-
delete mutexReleaseMap[tId]
188-
transactionId = undefined
189-
if (release) {
190-
release()
191-
}
192-
}
193-
}
194-
195133
module.exports = {
196134
getKafkaOptions,
197135
getESClient,
@@ -200,6 +138,5 @@ module.exports = {
200138
updateUser,
201139
getOrg,
202140
updateOrg,
203-
getErrorWithStatus,
204-
checkEsMutexRelease
141+
getErrorWithStatus
205142
}

0 commit comments

Comments
 (0)