From c8ad7f95f492ae22c688aa92506cfccc0d32373d Mon Sep 17 00:00:00 2001 From: Mithun Kamath Date: Thu, 27 Aug 2020 15:49:37 +0530 Subject: [PATCH] Identify and combine same actions in a single mutex --- src/common/helper.js | 140 +++++++++++++------ src/services/ProcessorService.js | 225 ++++++++++++++++++------------- test/common/init-es.js | 5 +- test/common/testHelper.js | 24 ++-- test/common/view-data.js | 23 ++-- 5 files changed, 257 insertions(+), 160 deletions(-) diff --git a/src/common/helper.js b/src/common/helper.js index 3cbcd95..7d02b8c 100644 --- a/src/common/helper.js +++ b/src/common/helper.js @@ -13,6 +13,7 @@ AWS.config.region = config.ES.AWS_REGION // Elasticsearch client let esClient + // Mutex to ensure that only one elasticsearch action is carried out at any given time const esClientMutex = new Mutex() @@ -55,20 +56,20 @@ async function getESClient () { }) } - // Patch the transport to enable mutex - esClient.transport.originalRequest = esClient.transport.request - esClient.transport.request = async (params) => { - const release = await esClientMutex.acquire() - try { - return await esClient.transport.originalRequest(params) - } finally { - release() - } - } - return esClient } +/** + * Wraps original get es client function + * to control access to elasticsearch using a mutex + */ +async function getESClientWrapper () { + const client = await getESClient() + const release = await esClientMutex.acquire() + + return { client, release } +} + /** * Function to valid require keys * @param {Object} payload validated object @@ -86,17 +87,25 @@ function validProperties (payload, keys) { /** * Function to get user from es * @param {String} userId - * @param {Boolean} sourceOnly + * @param {Boolean} isTransaction Is this part of a transaction? * @returns {Object} user */ -async function getUser (userId, sourceOnly = true) { - const client = await getESClient() +async function getUser (userId, isTransaction = false) { + const { client, release } = await getESClientWrapper() - if (sourceOnly) { - return client.getSource({ index: config.get('ES.USER_INDEX'), type: config.get('ES.USER_TYPE'), id: userId }) - } + try { + const user = await client.get({ index: config.get('ES.USER_INDEX'), type: config.get('ES.USER_TYPE'), id: userId }) + + if (isTransaction) { + return { seqNo: user._seq_no, primaryTerm: user._primary_term, user: user._source, release } + } - return client.get({ index: config.get('ES.USER_INDEX'), type: config.get('ES.USER_TYPE'), id: userId }) + return { seqNo: user._seq_no, primaryTerm: user._primary_term, user: user._source } + } finally { + if (!isTransaction) { + release() + } + } } /** @@ -105,43 +114,92 @@ async function getUser (userId, sourceOnly = true) { * @param {Number} seqNo * @param {Number} primaryTerm * @param {Object} body + * @param {Boolean} isTransaction If this is part of a transaction, it will not attempt to release */ -async function updateUser (userId, body, seqNo, primaryTerm) { - const client = await getESClient() - await client.update({ - index: config.get('ES.USER_INDEX'), - type: config.get('ES.USER_TYPE'), - id: userId, - body: { doc: body }, - if_seq_no: seqNo, - if_primary_term: primaryTerm - }) +async function updateUser (userId, body, seqNo, primaryTerm, isTransaction = false) { + let client, release + + if (isTransaction) { + client = await getESClient() + } else { + const esClient = await getESClientWrapper() + client = esClient.client + release = esClient.release + } + + try { + await client.update({ + index: config.get('ES.USER_INDEX'), + type: config.get('ES.USER_TYPE'), + id: userId, + body: { doc: body }, + if_seq_no: seqNo, + if_primary_term: primaryTerm + }) + } finally { + if (!isTransaction) { + release() + } + } } /** * Function to get org from es * @param {String} organizationId + * @param {Boolean} isTransaction Is this part of a transaction? * @returns {Object} organization */ -async function getOrg (organizationId) { - const client = await getESClient() - return client.getSource({ index: config.get('ES.ORGANIZATION_INDEX'), type: config.get('ES.ORGANIZATION_TYPE'), id: organizationId }) +async function getOrg (organizationId, isTransaction = false) { + const { client, release } = await getESClientWrapper() + + try { + const org = await client.get({ index: config.get('ES.ORGANIZATION_INDEX'), type: config.get('ES.ORGANIZATION_TYPE'), id: organizationId }) + + if (isTransaction) { + return { seqNo: org._seq_no, primaryTerm: org._primary_term, org: org._source, release } + } + + return { seqNo: org._seq_no, primaryTerm: org._primary_term, org: org._source } + } finally { + if (!isTransaction) { + release() + } + } } /** * Function to update es organization * @param {String} organizationId * @param {Object} body + * @param {Number} seqNo + * @param {Number} primaryTerm + * @param {Boolean} isTransaction If this is part of a transaction, it will not attempt to lock */ -async function updateOrg (organizationId, body) { - const client = await getESClient() - await client.update({ - index: config.get('ES.ORGANIZATION_INDEX'), - type: config.get('ES.ORGANIZATION_TYPE'), - id: organizationId, - body: { doc: body }, - refresh: 'true' - }) +async function updateOrg (organizationId, body, seqNo, primaryTerm, isTransaction = false) { + let client, release + + if (isTransaction) { + client = await getESClient() + } else { + const esClient = await getESClientWrapper() + client = esClient.client + release = esClient.release + } + + try { + await client.update({ + index: config.get('ES.ORGANIZATION_INDEX'), + type: config.get('ES.ORGANIZATION_TYPE'), + id: organizationId, + body: { doc: body }, + if_seq_no: seqNo, + if_primary_term: primaryTerm + }) + } finally { + if (!isTransaction) { + release() + } + } } /** @@ -158,7 +216,7 @@ function getErrorWithStatus (message, statusCode) { module.exports = { getKafkaOptions, - getESClient, + getESClientWrapper, validProperties, getUser, updateUser, diff --git a/src/services/ProcessorService.js b/src/services/ProcessorService.js index 2fd6f50..7ea0914 100644 --- a/src/services/ProcessorService.js +++ b/src/services/ProcessorService.js @@ -21,49 +21,64 @@ async function processCreate (message) { if (_.includes(_.keys(topResources), resource)) { // process the top resources such as user, skill... helper.validProperties(message.payload, ['id']) - const client = await helper.getESClient() - await client.create({ - index: topResources[resource].index, - type: topResources[resource].type, - id: message.payload.id, - body: _.omit(message.payload, 'resource'), - refresh: 'true' - }) + const { client, release } = await helper.getESClientWrapper() + + try { + await client.create({ + index: topResources[resource].index, + type: topResources[resource].type, + id: message.payload.id, + body: _.omit(message.payload, 'resource'), + refresh: 'true' + }) + } finally { + release() + } } else if (_.includes(_.keys(userResources), resource)) { // process user resources such as userSkill, userAttribute... const userResource = userResources[resource] userResource.validate(message.payload) - const user = await helper.getUser(message.payload.userId) - const relateId = message.payload[userResource.relateKey] - if (!user[userResource.propertyName]) { - user[userResource.propertyName] = [] - } + const { seqNo, primaryTerm, user, release } = await helper.getUser(message.payload.userId, true) + + try { + const relateId = message.payload[userResource.relateKey] + if (!user[userResource.propertyName]) { + user[userResource.propertyName] = [] + } - // check the resource does not exist - if (_.some(user[userResource.propertyName], [userResource.relateKey, relateId])) { - logger.error(`Can't create existed ${resource} with the ${userResource.relateKey}: ${relateId}, userId: ${message.payload.userId}`) - throw helper.getErrorWithStatus('[version_conflict_engine_exception]', 409) - } else { - user[userResource.propertyName].push(_.omit(message.payload, 'resource')) - await helper.updateUser(message.payload.userId, user) + // check the resource does not exist + if (_.some(user[userResource.propertyName], [userResource.relateKey, relateId])) { + logger.error(`Can't create existed ${resource} with the ${userResource.relateKey}: ${relateId}, userId: ${message.payload.userId}`) + throw helper.getErrorWithStatus('[version_conflict_engine_exception]', 409) + } else { + user[userResource.propertyName].push(_.omit(message.payload, 'resource')) + await helper.updateUser(message.payload.userId, user, seqNo, primaryTerm, true) + } + } finally { + release() } } else if (_.includes(_.keys(organizationResources), resource)) { // process org resources such as org skill provider const orgResources = organizationResources[resource] orgResources.validate(message.payload) - const org = await helper.getOrg(message.payload.organizationId) - const relateId = message.payload[orgResources.relateKey] - if (!org[orgResources.propertyName]) { - org[orgResources.propertyName] = [] - } + const { seqNo, primaryTerm, org, release } = await helper.getOrg(message.payload.organizationId) - // check the resource does not exist - if (_.some(org[orgResources.propertyName], [orgResources.relateKey, relateId])) { - logger.error(`Can't create existing ${resource} with the ${orgResources.relateKey}: ${relateId}, organizationId: ${message.payload.organizationId}`) - throw helper.getErrorWithStatus('[version_conflict_engine_exception]', 409) - } else { - org[orgResources.propertyName].push(_.omit(message.payload, 'resource')) - await helper.updateOrg(message.payload.organizationId, org) + try { + const relateId = message.payload[orgResources.relateKey] + if (!org[orgResources.propertyName]) { + org[orgResources.propertyName] = [] + } + + // check the resource does not exist + if (_.some(org[orgResources.propertyName], [orgResources.relateKey, relateId])) { + logger.error(`Can't create existing ${resource} with the ${orgResources.relateKey}: ${relateId}, organizationId: ${message.payload.organizationId}`) + throw helper.getErrorWithStatus('[version_conflict_engine_exception]', 409) + } else { + org[orgResources.propertyName].push(_.omit(message.payload, 'resource')) + await helper.updateOrg(message.payload.organizationId, org, seqNo, primaryTerm, true) + } + } finally { + release() } } else { logger.info(`Ignore this message since resource is not in [${_.union(_.keys(topResources), _.keys(userResources), _.keys(organizationResources))}]`) @@ -92,19 +107,25 @@ async function processUpdate (message) { logger.info(`Processing top level resource: ${resource}`) // process the top resources such as user, skill... helper.validProperties(message.payload, ['id']) - const client = await helper.getESClient() - const { index, type } = topResources[resource] - const id = message.payload.id - const source = await client.getSource({ index, type, id }) - await client.update({ - index, - type, - id, - body: { - doc: _.assign(source, _.omit(message.payload, 'resource')) - }, - refresh: true - }) + const { client, release } = await helper.getESClientWrapper() + + try { + const { index, type } = topResources[resource] + const id = message.payload.id + const source = await client.get({ index, type, id }) + await client.update({ + index, + type, + id, + body: { + doc: _.assign(source._source, _.omit(message.payload, 'resource')) + }, + if_seq_no: source._seq_no, + if_primary_term: source._primary_term + }) + } finally { + release() + } } else if (_.includes(_.keys(userResources), resource)) { // process user resources such as userSkill, userAttribute... const userResource = userResources[resource] @@ -112,40 +133,43 @@ async function processUpdate (message) { logger.info(`Processing user level resource: ${resource}:${relateId}`) userResource.validate(message.payload) logger.info(`Resource validated for ${relateId}`) - let user = await helper.getUser(message.payload.userId, false) - const seqNo = user._seq_no - const primaryTerm = user._primary_term - user = user._source - logger.info(`User fetched ${user.id} and ${relateId}`) - // const relateId = message.payload[userResource.relateKey] - - // check the resource exist - if (!user[userResource.propertyName] || !_.some(user[userResource.propertyName], [userResource.relateKey, relateId])) { - logger.error(`The ${resource} with the ${userResource.relateKey}: ${relateId}, userId: ${message.payload.userId} not exist`) - throw helper.getErrorWithStatus('[resource_not_found_exception]', 404) - } else { - const updateIndex = _.findIndex(user[userResource.propertyName], [userResource.relateKey, relateId]) - user[userResource.propertyName].splice(updateIndex, 1, _.omit(message.payload, 'resource')) - logger.info(`Updating ${user.id} and ${relateId}`) - await helper.updateUser(message.payload.userId, user, seqNo, primaryTerm) - logger.info(`Updated ${user.id} and ${relateId}`) + const { seqNo, primaryTerm, user, release } = await helper.getUser(message.payload.userId, true) + + try { + // check the resource exist + if (!user[userResource.propertyName] || !_.some(user[userResource.propertyName], [userResource.relateKey, relateId])) { + logger.error(`The ${resource} with the ${userResource.relateKey}: ${relateId}, userId: ${message.payload.userId} not exist`) + throw helper.getErrorWithStatus('[resource_not_found_exception]', 404) + } else { + const updateIndex = _.findIndex(user[userResource.propertyName], [userResource.relateKey, relateId]) + user[userResource.propertyName].splice(updateIndex, 1, _.omit(message.payload, 'resource')) + logger.info(`Updating ${user.id} and ${relateId}`) + await helper.updateUser(message.payload.userId, user, seqNo, primaryTerm, true) + logger.info(`Updated ${user.id} and ${relateId}`) + } + } finally { + release() } } else if (_.includes(_.keys(organizationResources), resource)) { logger.info(`Processing org level resource: ${resource}`) // process org resources such as org skill providers const orgResource = organizationResources[resource] orgResource.validate(message.payload) - const org = await helper.getOrg(message.payload.organizationId) const relateId = message.payload[orgResource.relateKey] + const { seqNo, primaryTerm, org, release } = await helper.getOrg(message.payload.organizationId) - // check the resource exist - if (!org[orgResource.propertyName] || !_.some(org[orgResource.propertyName], [orgResource.relateKey, relateId])) { - logger.error(`The ${resource} with the ${orgResource.relateKey}: ${relateId}, organizationId: ${message.payload.organizationId} not exist`) - throw helper.getErrorWithStatus('[resource_not_found_exception]', 404) - } else { - const updateIndex = _.findIndex(org[orgResource.propertyName], [orgResource.relateKey, relateId]) - org[orgResource.propertyName].splice(updateIndex, 1, _.omit(message.payload, 'resource')) - await helper.updateOrg(message.payload.organizationId, org) + try { + // check the resource exist + if (!org[orgResource.propertyName] || !_.some(org[orgResource.propertyName], [orgResource.relateKey, relateId])) { + logger.error(`The ${resource} with the ${orgResource.relateKey}: ${relateId}, organizationId: ${message.payload.organizationId} not exist`) + throw helper.getErrorWithStatus('[resource_not_found_exception]', 404) + } else { + const updateIndex = _.findIndex(org[orgResource.propertyName], [orgResource.relateKey, relateId]) + org[orgResource.propertyName].splice(updateIndex, 1, _.omit(message.payload, 'resource')) + await helper.updateOrg(message.payload.organizationId, org, seqNo, primaryTerm, true) + } + } finally { + release() } } else { logger.info(`Ignore this message since resource is not in [${_.union(_.keys(topResources), _.keys(userResources), _.keys(organizationResources))}]`) @@ -173,42 +197,55 @@ async function processDelete (message) { if (_.includes(_.keys(topResources), resource)) { // process the top resources such as user, skill... helper.validProperties(message.payload, ['id']) - const client = await helper.getESClient() - await client.delete({ - index: topResources[resource].index, - type: topResources[resource].type, - id: message.payload.id, - refresh: 'true' - }) + const { client, release } = await helper.getESClientWrapper() + + try { + await client.delete({ + index: topResources[resource].index, + type: topResources[resource].type, + id: message.payload.id, + refresh: 'true' + }) + } finally { + release() + } } else if (_.includes(_.keys(userResources), resource)) { // process user resources such as userSkill, userAttribute... const userResource = userResources[resource] userResource.validate(message.payload) - const user = await helper.getUser(message.payload.userId) const relateId = message.payload[userResource.relateKey] + const { seqNo, primaryTerm, user, release } = await helper.getUser(message.payload.userId, true) - // check the resource exist - if (!user[userResource.propertyName] || !_.some(user[userResource.propertyName], [userResource.relateKey, relateId])) { - logger.error(`The ${resource} with the ${userResource.relateKey}: ${relateId}, userId: ${message.payload.userId} not exist`) - throw helper.getErrorWithStatus('[resource_not_found_exception]', 404) - } else { - _.remove(user[userResource.propertyName], [userResource.relateKey, relateId]) - await helper.updateUser(message.payload.userId, user) + try { + // check the resource exist + if (!user[userResource.propertyName] || !_.some(user[userResource.propertyName], [userResource.relateKey, relateId])) { + logger.error(`The ${resource} with the ${userResource.relateKey}: ${relateId}, userId: ${message.payload.userId} not exist`) + throw helper.getErrorWithStatus('[resource_not_found_exception]', 404) + } else { + _.remove(user[userResource.propertyName], [userResource.relateKey, relateId]) + await helper.updateUser(message.payload.userId, user, seqNo, primaryTerm, true) + } + } finally { + release() } } else if (_.includes(_.keys(organizationResources), resource)) { // process user resources such as org skill provider const orgResource = organizationResources[resource] orgResource.validate(message.payload) - const org = await helper.getOrg(message.payload.organizationId) + const { seqNo, primaryTerm, org, release } = await helper.getOrg(message.payload.organizationId, true) const relateId = message.payload[orgResource.relateKey] - // check the resource exist - if (!org[orgResource.propertyName] || !_.some(org[orgResource.propertyName], [orgResource.relateKey, relateId])) { - logger.error(`The ${resource} with the ${orgResource.relateKey}: ${relateId}, organizationId: ${message.payload.organizationId} not exist`) - throw helper.getErrorWithStatus('[resource_not_found_exception]', 404) - } else { - _.remove(org[orgResource.propertyName], [orgResource.relateKey, relateId]) - await helper.updateOrg(message.payload.organizationId, org) + try { + // check the resource exist + if (!org[orgResource.propertyName] || !_.some(org[orgResource.propertyName], [orgResource.relateKey, relateId])) { + logger.error(`The ${resource} with the ${orgResource.relateKey}: ${relateId}, organizationId: ${message.payload.organizationId} not exist`) + throw helper.getErrorWithStatus('[resource_not_found_exception]', 404) + } else { + _.remove(org[orgResource.propertyName], [orgResource.relateKey, relateId]) + await helper.updateOrg(message.payload.organizationId, org, seqNo, primaryTerm, true) + } + } finally { + release() } } else { logger.info(`Ignore this message since resource is not in [${_.union(_.keys(topResources), _.keys(userResources), _.keys(organizationResources))}]`) diff --git a/test/common/init-es.js b/test/common/init-es.js index 6f56f83..a489749 100644 --- a/test/common/init-es.js +++ b/test/common/init-es.js @@ -1,6 +1,7 @@ /** - * !OBSOLETE SCRIPT. Use the one from ubahn-api instead - * !( https://github.com/topcoder-platform/u-bahn-api ) + * ! OBSOLETE SCRIPT. NOT MAINTAINED. + * ! Use the one from ubahn-api instead + * ! ( https://github.com/topcoder-platform/u-bahn-api ) */ /** diff --git a/test/common/testHelper.js b/test/common/testHelper.js index 8a4e64a..68b030e 100644 --- a/test/common/testHelper.js +++ b/test/common/testHelper.js @@ -5,26 +5,26 @@ const _ = require('lodash') const helper = require('../../src/common/helper') const { topResources, userResources, organizationResources } = require('../../src/common/constants') -var client -(async function () { - client = await helper.getESClient() -})() - /** * Get record from ES. * @param {Object} kafka message payload * @return {Object} the record entity */ async function getESRecord (payload) { + const { client, release } = await helper.getESClientWrapper() if (topResources[payload.resource]) { - return client.getSource({ - index: topResources[payload.resource].index, - type: topResources[payload.resource].type, - id: payload.id - }) + try { + return client.getSource({ + index: topResources[payload.resource].index, + type: topResources[payload.resource].type, + id: payload.id + }) + } finally { + release() + } } else if (organizationResources[payload.resource]) { const orgResource = organizationResources[payload.resource] - const org = await helper.getOrg(payload.organizationId) + const { org } = await helper.getOrg(payload.organizationId) if (!org || !org[orgResource.propertyName] || !_.some(org[orgResource.propertyName], [orgResource.relateKey, payload[orgResource.relateKey]])) { const err = Error('[resource_not_found_exception]') err.statusCode = 404 @@ -33,7 +33,7 @@ async function getESRecord (payload) { return _.find(org[orgResource.propertyName], [orgResource.relateKey, payload[orgResource.relateKey]]) } else { const userResource = userResources[payload.resource] - const user = await helper.getUser(payload.userId) + const { user } = await helper.getUser(payload.userId) if (!user || !user[userResource.propertyName] || !_.some(user[userResource.propertyName], [userResource.relateKey, payload[userResource.relateKey]])) { const err = Error('[resource_not_found_exception]') err.statusCode = 404 diff --git a/test/common/view-data.js b/test/common/view-data.js index fbd978c..4db5cd5 100644 --- a/test/common/view-data.js +++ b/test/common/view-data.js @@ -8,23 +8,24 @@ const logger = require('../../src/common/logger') const helper = require('../../src/common/helper') const { topResources } = require('../../src/common/constants') -let client - if (process.argv.length < 4) { logger.error('Missing argument for Resource and Elasticsearch id.') process.exit() } const view = async (resource, id) => { - if (!client) { - client = await helper.getESClient() - } - if (_.includes(_.keys(topResources), resource)) { - const ret = await client.getSource({ index: topResources[resource].index, type: topResources[resource].type, id }) - logger.info('Elasticsearch data:') - logger.info(JSON.stringify(ret, null, 4)) - } else { - logger.warn(`resource is invalid, it should in [${_.keys(topResources)}]`) + let { client, release } = await helper.getESClientWrapper() + + try { + if (_.includes(_.keys(topResources), resource)) { + const ret = await client.getSource({ index: topResources[resource].index, type: topResources[resource].type, id }) + logger.info('Elasticsearch data:') + logger.info(JSON.stringify(ret, null, 4)) + } else { + logger.warn(`resource is invalid, it should in [${_.keys(topResources)}]`) + } + } finally { + release() } }