Skip to content
This repository was archived by the owner on Mar 13, 2025. It is now read-only.

Revert "Group elasticsearch actions on the same message to use the same mutex" #10

Merged
merged 1 commit into from
Aug 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions src/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
global.Promise = require('bluebird')
const config = require('config')
const Kafka = require('no-kafka')
const _ = require('lodash')
const healthcheck = require('topcoder-healthcheck-dropin')
const logger = require('./common/logger')
const helper = require('./common/helper')
Expand Down Expand Up @@ -67,25 +66,24 @@ const dataHandler = (messageSet, topic, partition) => Promise.each(messageSet, a
await consumer.commitOffset({ topic, partition, offset: m.offset })
return
}
const transactionId = _.uniqueId('transaction_')

try {
switch (topic) {
case config.UBAHN_CREATE_TOPIC:
await ProcessorService.processCreate(messageJSON, transactionId)
await ProcessorService.processCreate(messageJSON)
break
case config.UBAHN_UPDATE_TOPIC:
await ProcessorService.processUpdate(messageJSON, transactionId)
await ProcessorService.processUpdate(messageJSON)
break
case config.UBAHN_DELETE_TOPIC:
await ProcessorService.processDelete(messageJSON, transactionId)
await ProcessorService.processDelete(messageJSON)
break
}

logger.debug(`Successfully processed message with count ${messageCount}`)
} catch (err) {
logger.logFullError(err)
} finally {
helper.checkEsMutexRelease(transactionId)
logger.debug(`Commiting offset after processing message with count ${messageCount}`)

// Commit offset regardless of error
Expand Down
68 changes: 16 additions & 52 deletions src/common/helper.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,8 @@ AWS.config.region = config.ES.AWS_REGION

// Elasticsearch client
let esClient
let transactionId
// Mutex to ensure that only one elasticsearch action is carried out at any given time
const esClientMutex = new Mutex()
const mutexReleaseMap = {}

/**
* Get Kafka options
Expand Down Expand Up @@ -60,24 +58,11 @@ async function getESClient () {
// Patch the transport to enable mutex
esClient.transport.originalRequest = esClient.transport.request
esClient.transport.request = async (params) => {
const tId = _.get(params.query, 'transactionId')
params.query = _.omit(params.query, 'transactionId')
if (!tId || tId !== transactionId) {
const release = await esClientMutex.acquire()
mutexReleaseMap[tId || 'noTransaction'] = release
transactionId = tId
}
const release = await esClientMutex.acquire()
try {
return await esClient.transport.originalRequest(params)
} finally {
if (params.method !== 'GET' || !tId) {
const release = mutexReleaseMap[tId || 'noTransaction']
delete mutexReleaseMap[tId || 'noTransaction']
transactionId = undefined
if (release) {
release()
}
}
release()
}
}

Expand All @@ -101,30 +86,32 @@ function validProperties (payload, keys) {
/**
* Function to get user from es
* @param {String} userId
* @param {String} transactionId
* @param {Boolean} sourceOnly
* @returns {Object} user
*/
async function getUser (userId, transactionId) {
async function getUser (userId, sourceOnly = true) {
const client = await getESClient()
const user = await client.get({ index: config.get('ES.USER_INDEX'), type: config.get('ES.USER_TYPE'), id: userId, transactionId })
return { seqNo: user._seq_no, primaryTerm: user._primary_term, user: user._source }

if (sourceOnly) {
return client.getSource({ index: config.get('ES.USER_INDEX'), type: config.get('ES.USER_TYPE'), id: userId })
}

return client.get({ index: config.get('ES.USER_INDEX'), type: config.get('ES.USER_TYPE'), id: userId })
}

/**
* Function to update es user
* @param {String} userId
* @param {Number} seqNo
* @param {Number} primaryTerm
* @param {String} transactionId
* @param {Object} body
*/
async function updateUser (userId, body, seqNo, primaryTerm, transactionId) {
async function updateUser (userId, body, seqNo, primaryTerm) {
const client = await getESClient()
await client.update({
index: config.get('ES.USER_INDEX'),
type: config.get('ES.USER_TYPE'),
id: userId,
transactionId,
body: { doc: body },
if_seq_no: seqNo,
if_primary_term: primaryTerm
Expand All @@ -134,33 +121,26 @@ async function updateUser (userId, body, seqNo, primaryTerm, transactionId) {
/**
* Function to get org from es
* @param {String} organizationId
* @param {String} transactionId
* @returns {Object} organization
*/
async function getOrg (organizationId, transactionId) {
async function getOrg (organizationId) {
const client = await getESClient()
const org = await client.get({ index: config.get('ES.ORGANIZATION_INDEX'), type: config.get('ES.ORGANIZATION_TYPE'), id: organizationId, transactionId })
return { seqNo: org._seq_no, primaryTerm: org._primary_term, org: org._source }
return client.getSource({ index: config.get('ES.ORGANIZATION_INDEX'), type: config.get('ES.ORGANIZATION_TYPE'), id: organizationId })
}

/**
* Function to update es organization
* @param {String} organizationId
* @param {Number} seqNo
* @param {Number} primaryTerm
* @param {String} transactionId
* @param {Object} body
*/
async function updateOrg (organizationId, body, seqNo, primaryTerm, transactionId) {
async function updateOrg (organizationId, body) {
const client = await getESClient()
await client.update({
index: config.get('ES.ORGANIZATION_INDEX'),
type: config.get('ES.ORGANIZATION_TYPE'),
id: organizationId,
transactionId,
body: { doc: body },
if_seq_no: seqNo,
if_primary_term: primaryTerm
refresh: 'true'
})
}

Expand All @@ -176,21 +156,6 @@ function getErrorWithStatus (message, statusCode) {
return error
}

/**
* Ensure the esClient mutex is released
* @param {String} tId transactionId
*/
function checkEsMutexRelease (tId) {
if (tId === transactionId) {
const release = mutexReleaseMap[tId]
delete mutexReleaseMap[tId]
transactionId = undefined
if (release) {
release()
}
}
}

module.exports = {
getKafkaOptions,
getESClient,
Expand All @@ -199,6 +164,5 @@ module.exports = {
updateUser,
getOrg,
updateOrg,
getErrorWithStatus,
checkEsMutexRelease
getErrorWithStatus
}
52 changes: 24 additions & 28 deletions src/services/ProcessorService.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,8 @@ const {
/**
* Process create entity message
* @param {Object} message the kafka message
* @param {String} transactionId
*/
async function processCreate (message, transactionId) {
async function processCreate (message) {
const resource = message.payload.resource
if (_.includes(_.keys(topResources), resource)) {
// process the top resources such as user, skill...
Expand All @@ -34,7 +33,7 @@ async function processCreate (message, transactionId) {
// process user resources such as userSkill, userAttribute...
const userResource = userResources[resource]
userResource.validate(message.payload)
const { seqNo, primaryTerm, user } = await helper.getUser(message.payload.userId, transactionId)
const user = await helper.getUser(message.payload.userId)
const relateId = message.payload[userResource.relateKey]
if (!user[userResource.propertyName]) {
user[userResource.propertyName] = []
Expand All @@ -46,13 +45,13 @@ async function processCreate (message, transactionId) {
throw helper.getErrorWithStatus('[version_conflict_engine_exception]', 409)
} else {
user[userResource.propertyName].push(_.omit(message.payload, 'resource'))
await helper.updateUser(message.payload.userId, user, seqNo, primaryTerm, transactionId)
await helper.updateUser(message.payload.userId, user)
}
} else if (_.includes(_.keys(organizationResources), resource)) {
// process org resources such as org skill provider
const orgResources = organizationResources[resource]
orgResources.validate(message.payload)
const { seqNo, primaryTerm, org } = await helper.getOrg(message.payload.organizationId, transactionId)
const org = await helper.getOrg(message.payload.organizationId)
const relateId = message.payload[orgResources.relateKey]
if (!org[orgResources.propertyName]) {
org[orgResources.propertyName] = []
Expand All @@ -64,7 +63,7 @@ async function processCreate (message, transactionId) {
throw helper.getErrorWithStatus('[version_conflict_engine_exception]', 409)
} else {
org[orgResources.propertyName].push(_.omit(message.payload, 'resource'))
await helper.updateOrg(message.payload.organizationId, org, seqNo, primaryTerm, transactionId)
await helper.updateOrg(message.payload.organizationId, org)
}
} else {
logger.info(`Ignore this message since resource is not in [${_.union(_.keys(topResources), _.keys(userResources), _.keys(organizationResources))}]`)
Expand All @@ -80,16 +79,14 @@ processCreate.schema = {
payload: Joi.object().keys({
resource: Joi.string().required()
}).required().unknown(true)
}).required(),
transactionId: Joi.string().required()
}).required()
}

/**
* Process update entity message
* @param {Object} message the kafka message
* @param {String} transactionId
*/
async function processUpdate (message, transactionId) {
async function processUpdate (message) {
const resource = message.payload.resource
if (_.includes(_.keys(topResources), resource)) {
logger.info(`Processing top level resource: ${resource}`)
Expand All @@ -98,16 +95,15 @@ async function processUpdate (message, transactionId) {
const client = await helper.getESClient()
const { index, type } = topResources[resource]
const id = message.payload.id
const source = await client.get({ index, type, id, transaction: true })
const source = await client.getSource({ index, type, id })
await client.update({
index,
type,
id,
body: {
doc: _.assign(source._source, _.omit(message.payload, 'resource'))
doc: _.assign(source, _.omit(message.payload, 'resource'))
},
if_seq_no: source._seq_no,
if_primary_term: source._primary_term
refresh: true
})
} else if (_.includes(_.keys(userResources), resource)) {
// process user resources such as userSkill, userAttribute...
Expand All @@ -116,7 +112,10 @@ async function processUpdate (message, transactionId) {
logger.info(`Processing user level resource: ${resource}:${relateId}`)
userResource.validate(message.payload)
logger.info(`Resource validated for ${relateId}`)
const { seqNo, primaryTerm, user } = await helper.getUser(message.payload.userId, transactionId)
let user = await helper.getUser(message.payload.userId, false)
const seqNo = user._seq_no
const primaryTerm = user._primary_term
user = user._source
logger.info(`User fetched ${user.id} and ${relateId}`)
// const relateId = message.payload[userResource.relateKey]

Expand All @@ -128,15 +127,15 @@ async function processUpdate (message, transactionId) {
const updateIndex = _.findIndex(user[userResource.propertyName], [userResource.relateKey, relateId])
user[userResource.propertyName].splice(updateIndex, 1, _.omit(message.payload, 'resource'))
logger.info(`Updating ${user.id} and ${relateId}`)
await helper.updateUser(message.payload.userId, user, seqNo, primaryTerm, transactionId)
await helper.updateUser(message.payload.userId, user, seqNo, primaryTerm)
logger.info(`Updated ${user.id} and ${relateId}`)
}
} else if (_.includes(_.keys(organizationResources), resource)) {
logger.info(`Processing org level resource: ${resource}`)
// process org resources such as org skill providers
const orgResource = organizationResources[resource]
orgResource.validate(message.payload)
const { seqNo, primaryTerm, org } = await helper.getOrg(message.payload.organizationId, transactionId)
const org = await helper.getOrg(message.payload.organizationId)
const relateId = message.payload[orgResource.relateKey]

// check the resource exist
Expand All @@ -146,7 +145,7 @@ async function processUpdate (message, transactionId) {
} else {
const updateIndex = _.findIndex(org[orgResource.propertyName], [orgResource.relateKey, relateId])
org[orgResource.propertyName].splice(updateIndex, 1, _.omit(message.payload, 'resource'))
await helper.updateOrg(message.payload.organizationId, org, seqNo, primaryTerm, transactionId)
await helper.updateOrg(message.payload.organizationId, org)
}
} else {
logger.info(`Ignore this message since resource is not in [${_.union(_.keys(topResources), _.keys(userResources), _.keys(organizationResources))}]`)
Expand All @@ -162,16 +161,14 @@ processUpdate.schema = {
payload: Joi.object().keys({
resource: Joi.string().required()
}).required().unknown(true)
}).required(),
transactionId: Joi.string().required()
}).required()
}

/**
* Process delete entity message
* @param {Object} message the kafka message
* @param {String} transactionId
*/
async function processDelete (message, transactionId) {
async function processDelete (message) {
const resource = message.payload.resource
if (_.includes(_.keys(topResources), resource)) {
// process the top resources such as user, skill...
Expand All @@ -187,7 +184,7 @@ async function processDelete (message, transactionId) {
// process user resources such as userSkill, userAttribute...
const userResource = userResources[resource]
userResource.validate(message.payload)
const { seqNo, primaryTerm, user } = await helper.getUser(message.payload.userId, transactionId)
const user = await helper.getUser(message.payload.userId)
const relateId = message.payload[userResource.relateKey]

// check the resource exist
Expand All @@ -196,13 +193,13 @@ async function processDelete (message, transactionId) {
throw helper.getErrorWithStatus('[resource_not_found_exception]', 404)
} else {
_.remove(user[userResource.propertyName], [userResource.relateKey, relateId])
await helper.updateUser(message.payload.userId, user, seqNo, primaryTerm, transactionId)
await helper.updateUser(message.payload.userId, user)
}
} else if (_.includes(_.keys(organizationResources), resource)) {
// process user resources such as org skill provider
const orgResource = organizationResources[resource]
orgResource.validate(message.payload)
const { seqNo, primaryTerm, org } = await helper.getOrg(message.payload.organizationId, transactionId)
const org = await helper.getOrg(message.payload.organizationId)
const relateId = message.payload[orgResource.relateKey]

// check the resource exist
Expand All @@ -211,7 +208,7 @@ async function processDelete (message, transactionId) {
throw helper.getErrorWithStatus('[resource_not_found_exception]', 404)
} else {
_.remove(org[orgResource.propertyName], [orgResource.relateKey, relateId])
await helper.updateOrg(message.payload.organizationId, org, seqNo, primaryTerm, transactionId)
await helper.updateOrg(message.payload.organizationId, org)
}
} else {
logger.info(`Ignore this message since resource is not in [${_.union(_.keys(topResources), _.keys(userResources), _.keys(organizationResources))}]`)
Expand All @@ -227,8 +224,7 @@ processDelete.schema = {
payload: Joi.object().keys({
resource: Joi.string().required()
}).required().unknown(true)
}).required(),
transactionId: Joi.string().required()
}).required()
}

module.exports = {
Expand Down