From 846f1728180e45ab3ca796cc5f1b8f809d1d0ed6 Mon Sep 17 00:00:00 2001 From: Mithun Kamath Date: Thu, 27 Aug 2020 16:27:00 +0530 Subject: [PATCH] Group elasticsearch actions on the same message to use the same mutex --- src/app.js | 10 +++-- src/common/helper.js | 68 ++++++++++++++++++++++++-------- src/services/ProcessorService.js | 52 +++++++++++++----------- 3 files changed, 86 insertions(+), 44 deletions(-) diff --git a/src/app.js b/src/app.js index 9f1adbb..5939c39 100644 --- a/src/app.js +++ b/src/app.js @@ -5,6 +5,7 @@ global.Promise = require('bluebird') const config = require('config') const Kafka = require('no-kafka') +const _ = require('lodash') const healthcheck = require('topcoder-healthcheck-dropin') const logger = require('./common/logger') const helper = require('./common/helper') @@ -66,17 +67,17 @@ const dataHandler = (messageSet, topic, partition) => Promise.each(messageSet, a await consumer.commitOffset({ topic, partition, offset: m.offset }) return } - + const transactionId = _.uniqueId('transaction_') try { switch (topic) { case config.UBAHN_CREATE_TOPIC: - await ProcessorService.processCreate(messageJSON) + await ProcessorService.processCreate(messageJSON, transactionId) break case config.UBAHN_UPDATE_TOPIC: - await ProcessorService.processUpdate(messageJSON) + await ProcessorService.processUpdate(messageJSON, transactionId) break case config.UBAHN_DELETE_TOPIC: - await ProcessorService.processDelete(messageJSON) + await ProcessorService.processDelete(messageJSON, transactionId) break } @@ -84,6 +85,7 @@ const dataHandler = (messageSet, topic, partition) => Promise.each(messageSet, a } catch (err) { logger.logFullError(err) } finally { + helper.checkEsMutexRelease(transactionId) logger.debug(`Commiting offset after processing message with count ${messageCount}`) // Commit offset regardless of error diff --git a/src/common/helper.js b/src/common/helper.js index 3cbcd95..5c46aa3 100644 --- a/src/common/helper.js +++ b/src/common/helper.js @@ -13,8 +13,10 @@ AWS.config.region = config.ES.AWS_REGION // Elasticsearch client let esClient +let transactionId // Mutex to ensure that only one elasticsearch action is carried out at any given time const esClientMutex = new Mutex() +const mutexReleaseMap = {} /** * Get Kafka options @@ -58,11 +60,24 @@ async function getESClient () { // Patch the transport to enable mutex esClient.transport.originalRequest = esClient.transport.request esClient.transport.request = async (params) => { - const release = await esClientMutex.acquire() + const tId = _.get(params.query, 'transactionId') + params.query = _.omit(params.query, 'transactionId') + if (!tId || tId !== transactionId) { + const release = await esClientMutex.acquire() + mutexReleaseMap[tId || 'noTransaction'] = release + transactionId = tId + } try { return await esClient.transport.originalRequest(params) } finally { - release() + if (params.method !== 'GET' || !tId) { + const release = mutexReleaseMap[tId || 'noTransaction'] + delete mutexReleaseMap[tId || 'noTransaction'] + transactionId = undefined + if (release) { + release() + } + } } } @@ -86,17 +101,13 @@ function validProperties (payload, keys) { /** * Function to get user from es * @param {String} userId - * @param {Boolean} sourceOnly + * @param {String} transactionId * @returns {Object} user */ -async function getUser (userId, sourceOnly = true) { +async function getUser (userId, transactionId) { const client = await getESClient() - - if (sourceOnly) { - return client.getSource({ index: config.get('ES.USER_INDEX'), type: config.get('ES.USER_TYPE'), id: userId }) - } - - return client.get({ index: config.get('ES.USER_INDEX'), type: config.get('ES.USER_TYPE'), id: userId }) + const user = await client.get({ index: config.get('ES.USER_INDEX'), type: config.get('ES.USER_TYPE'), id: userId, transactionId }) + return { seqNo: user._seq_no, primaryTerm: user._primary_term, user: user._source } } /** @@ -104,14 +115,16 @@ async function getUser (userId, sourceOnly = true) { * @param {String} userId * @param {Number} seqNo * @param {Number} primaryTerm + * @param {String} transactionId * @param {Object} body */ -async function updateUser (userId, body, seqNo, primaryTerm) { +async function updateUser (userId, body, seqNo, primaryTerm, transactionId) { const client = await getESClient() await client.update({ index: config.get('ES.USER_INDEX'), type: config.get('ES.USER_TYPE'), id: userId, + transactionId, body: { doc: body }, if_seq_no: seqNo, if_primary_term: primaryTerm @@ -121,26 +134,33 @@ async function updateUser (userId, body, seqNo, primaryTerm) { /** * Function to get org from es * @param {String} organizationId + * @param {String} transactionId * @returns {Object} organization */ -async function getOrg (organizationId) { +async function getOrg (organizationId, transactionId) { const client = await getESClient() - return client.getSource({ index: config.get('ES.ORGANIZATION_INDEX'), type: config.get('ES.ORGANIZATION_TYPE'), id: organizationId }) + const org = await client.get({ index: config.get('ES.ORGANIZATION_INDEX'), type: config.get('ES.ORGANIZATION_TYPE'), id: organizationId, transactionId }) + return { seqNo: org._seq_no, primaryTerm: org._primary_term, org: org._source } } /** * Function to update es organization * @param {String} organizationId + * @param {Number} seqNo + * @param {Number} primaryTerm + * @param {String} transactionId * @param {Object} body */ -async function updateOrg (organizationId, body) { +async function updateOrg (organizationId, body, seqNo, primaryTerm, transactionId) { const client = await getESClient() await client.update({ index: config.get('ES.ORGANIZATION_INDEX'), type: config.get('ES.ORGANIZATION_TYPE'), id: organizationId, + transactionId, body: { doc: body }, - refresh: 'true' + if_seq_no: seqNo, + if_primary_term: primaryTerm }) } @@ -156,6 +176,21 @@ function getErrorWithStatus (message, statusCode) { return error } +/** + * Ensure the esClient mutex is released + * @param {String} tId transactionId + */ +function checkEsMutexRelease (tId) { + if (tId === transactionId) { + const release = mutexReleaseMap[tId] + delete mutexReleaseMap[tId] + transactionId = undefined + if (release) { + release() + } + } +} + module.exports = { getKafkaOptions, getESClient, @@ -164,5 +199,6 @@ module.exports = { updateUser, getOrg, updateOrg, - getErrorWithStatus + getErrorWithStatus, + checkEsMutexRelease } diff --git a/src/services/ProcessorService.js b/src/services/ProcessorService.js index 2fd6f50..98738a5 100644 --- a/src/services/ProcessorService.js +++ b/src/services/ProcessorService.js @@ -15,8 +15,9 @@ const { /** * Process create entity message * @param {Object} message the kafka message + * @param {String} transactionId */ -async function processCreate (message) { +async function processCreate (message, transactionId) { const resource = message.payload.resource if (_.includes(_.keys(topResources), resource)) { // process the top resources such as user, skill... @@ -33,7 +34,7 @@ async function processCreate (message) { // process user resources such as userSkill, userAttribute... const userResource = userResources[resource] userResource.validate(message.payload) - const user = await helper.getUser(message.payload.userId) + const { seqNo, primaryTerm, user } = await helper.getUser(message.payload.userId, transactionId) const relateId = message.payload[userResource.relateKey] if (!user[userResource.propertyName]) { user[userResource.propertyName] = [] @@ -45,13 +46,13 @@ async function processCreate (message) { throw helper.getErrorWithStatus('[version_conflict_engine_exception]', 409) } else { user[userResource.propertyName].push(_.omit(message.payload, 'resource')) - await helper.updateUser(message.payload.userId, user) + await helper.updateUser(message.payload.userId, user, seqNo, primaryTerm, transactionId) } } else if (_.includes(_.keys(organizationResources), resource)) { // process org resources such as org skill provider const orgResources = organizationResources[resource] orgResources.validate(message.payload) - const org = await helper.getOrg(message.payload.organizationId) + const { seqNo, primaryTerm, org } = await helper.getOrg(message.payload.organizationId, transactionId) const relateId = message.payload[orgResources.relateKey] if (!org[orgResources.propertyName]) { org[orgResources.propertyName] = [] @@ -63,7 +64,7 @@ async function processCreate (message) { throw helper.getErrorWithStatus('[version_conflict_engine_exception]', 409) } else { org[orgResources.propertyName].push(_.omit(message.payload, 'resource')) - await helper.updateOrg(message.payload.organizationId, org) + await helper.updateOrg(message.payload.organizationId, org, seqNo, primaryTerm, transactionId) } } else { logger.info(`Ignore this message since resource is not in [${_.union(_.keys(topResources), _.keys(userResources), _.keys(organizationResources))}]`) @@ -79,14 +80,16 @@ processCreate.schema = { payload: Joi.object().keys({ resource: Joi.string().required() }).required().unknown(true) - }).required() + }).required(), + transactionId: Joi.string().required() } /** * Process update entity message * @param {Object} message the kafka message + * @param {String} transactionId */ -async function processUpdate (message) { +async function processUpdate (message, transactionId) { const resource = message.payload.resource if (_.includes(_.keys(topResources), resource)) { logger.info(`Processing top level resource: ${resource}`) @@ -95,15 +98,16 @@ async function processUpdate (message) { const client = await helper.getESClient() const { index, type } = topResources[resource] const id = message.payload.id - const source = await client.getSource({ index, type, id }) + const source = await client.get({ index, type, id, transaction: true }) await client.update({ index, type, id, body: { - doc: _.assign(source, _.omit(message.payload, 'resource')) + doc: _.assign(source._source, _.omit(message.payload, 'resource')) }, - refresh: true + if_seq_no: source._seq_no, + if_primary_term: source._primary_term }) } else if (_.includes(_.keys(userResources), resource)) { // process user resources such as userSkill, userAttribute... @@ -112,10 +116,7 @@ async function processUpdate (message) { logger.info(`Processing user level resource: ${resource}:${relateId}`) userResource.validate(message.payload) logger.info(`Resource validated for ${relateId}`) - let user = await helper.getUser(message.payload.userId, false) - const seqNo = user._seq_no - const primaryTerm = user._primary_term - user = user._source + const { seqNo, primaryTerm, user } = await helper.getUser(message.payload.userId, transactionId) logger.info(`User fetched ${user.id} and ${relateId}`) // const relateId = message.payload[userResource.relateKey] @@ -127,7 +128,7 @@ async function processUpdate (message) { const updateIndex = _.findIndex(user[userResource.propertyName], [userResource.relateKey, relateId]) user[userResource.propertyName].splice(updateIndex, 1, _.omit(message.payload, 'resource')) logger.info(`Updating ${user.id} and ${relateId}`) - await helper.updateUser(message.payload.userId, user, seqNo, primaryTerm) + await helper.updateUser(message.payload.userId, user, seqNo, primaryTerm, transactionId) logger.info(`Updated ${user.id} and ${relateId}`) } } else if (_.includes(_.keys(organizationResources), resource)) { @@ -135,7 +136,7 @@ async function processUpdate (message) { // process org resources such as org skill providers const orgResource = organizationResources[resource] orgResource.validate(message.payload) - const org = await helper.getOrg(message.payload.organizationId) + const { seqNo, primaryTerm, org } = await helper.getOrg(message.payload.organizationId, transactionId) const relateId = message.payload[orgResource.relateKey] // check the resource exist @@ -145,7 +146,7 @@ async function processUpdate (message) { } else { const updateIndex = _.findIndex(org[orgResource.propertyName], [orgResource.relateKey, relateId]) org[orgResource.propertyName].splice(updateIndex, 1, _.omit(message.payload, 'resource')) - await helper.updateOrg(message.payload.organizationId, org) + await helper.updateOrg(message.payload.organizationId, org, seqNo, primaryTerm, transactionId) } } else { logger.info(`Ignore this message since resource is not in [${_.union(_.keys(topResources), _.keys(userResources), _.keys(organizationResources))}]`) @@ -161,14 +162,16 @@ processUpdate.schema = { payload: Joi.object().keys({ resource: Joi.string().required() }).required().unknown(true) - }).required() + }).required(), + transactionId: Joi.string().required() } /** * Process delete entity message * @param {Object} message the kafka message + * @param {String} transactionId */ -async function processDelete (message) { +async function processDelete (message, transactionId) { const resource = message.payload.resource if (_.includes(_.keys(topResources), resource)) { // process the top resources such as user, skill... @@ -184,7 +187,7 @@ async function processDelete (message) { // process user resources such as userSkill, userAttribute... const userResource = userResources[resource] userResource.validate(message.payload) - const user = await helper.getUser(message.payload.userId) + const { seqNo, primaryTerm, user } = await helper.getUser(message.payload.userId, transactionId) const relateId = message.payload[userResource.relateKey] // check the resource exist @@ -193,13 +196,13 @@ async function processDelete (message) { throw helper.getErrorWithStatus('[resource_not_found_exception]', 404) } else { _.remove(user[userResource.propertyName], [userResource.relateKey, relateId]) - await helper.updateUser(message.payload.userId, user) + await helper.updateUser(message.payload.userId, user, seqNo, primaryTerm, transactionId) } } else if (_.includes(_.keys(organizationResources), resource)) { // process user resources such as org skill provider const orgResource = organizationResources[resource] orgResource.validate(message.payload) - const org = await helper.getOrg(message.payload.organizationId) + const { seqNo, primaryTerm, org } = await helper.getOrg(message.payload.organizationId, transactionId) const relateId = message.payload[orgResource.relateKey] // check the resource exist @@ -208,7 +211,7 @@ async function processDelete (message) { throw helper.getErrorWithStatus('[resource_not_found_exception]', 404) } else { _.remove(org[orgResource.propertyName], [orgResource.relateKey, relateId]) - await helper.updateOrg(message.payload.organizationId, org) + await helper.updateOrg(message.payload.organizationId, org, seqNo, primaryTerm, transactionId) } } else { logger.info(`Ignore this message since resource is not in [${_.union(_.keys(topResources), _.keys(userResources), _.keys(organizationResources))}]`) @@ -224,7 +227,8 @@ processDelete.schema = { payload: Joi.object().keys({ resource: Joi.string().required() }).required().unknown(true) - }).required() + }).required(), + transactionId: Joi.string().required() } module.exports = {