diff --git a/src/common/helper.js b/src/common/helper.js index 7d02b8c..3cbcd95 100644 --- a/src/common/helper.js +++ b/src/common/helper.js @@ -13,7 +13,6 @@ AWS.config.region = config.ES.AWS_REGION // Elasticsearch client let esClient - // Mutex to ensure that only one elasticsearch action is carried out at any given time const esClientMutex = new Mutex() @@ -56,18 +55,18 @@ async function getESClient () { }) } - return esClient -} - -/** - * Wraps original get es client function - * to control access to elasticsearch using a mutex - */ -async function getESClientWrapper () { - const client = await getESClient() - const release = await esClientMutex.acquire() + // Patch the transport to enable mutex + esClient.transport.originalRequest = esClient.transport.request + esClient.transport.request = async (params) => { + const release = await esClientMutex.acquire() + try { + return await esClient.transport.originalRequest(params) + } finally { + release() + } + } - return { client, release } + return esClient } /** @@ -87,25 +86,17 @@ function validProperties (payload, keys) { /** * Function to get user from es * @param {String} userId - * @param {Boolean} isTransaction Is this part of a transaction? + * @param {Boolean} sourceOnly * @returns {Object} user */ -async function getUser (userId, isTransaction = false) { - const { client, release } = await getESClientWrapper() - - try { - const user = await client.get({ index: config.get('ES.USER_INDEX'), type: config.get('ES.USER_TYPE'), id: userId }) - - if (isTransaction) { - return { seqNo: user._seq_no, primaryTerm: user._primary_term, user: user._source, release } - } +async function getUser (userId, sourceOnly = true) { + const client = await getESClient() - return { seqNo: user._seq_no, primaryTerm: user._primary_term, user: user._source } - } finally { - if (!isTransaction) { - release() - } + if (sourceOnly) { + return client.getSource({ index: config.get('ES.USER_INDEX'), type: config.get('ES.USER_TYPE'), id: userId }) } + + return client.get({ index: config.get('ES.USER_INDEX'), type: config.get('ES.USER_TYPE'), id: userId }) } /** @@ -114,92 +105,43 @@ async function getUser (userId, isTransaction = false) { * @param {Number} seqNo * @param {Number} primaryTerm * @param {Object} body - * @param {Boolean} isTransaction If this is part of a transaction, it will not attempt to release */ -async function updateUser (userId, body, seqNo, primaryTerm, isTransaction = false) { - let client, release - - if (isTransaction) { - client = await getESClient() - } else { - const esClient = await getESClientWrapper() - client = esClient.client - release = esClient.release - } - - try { - await client.update({ - index: config.get('ES.USER_INDEX'), - type: config.get('ES.USER_TYPE'), - id: userId, - body: { doc: body }, - if_seq_no: seqNo, - if_primary_term: primaryTerm - }) - } finally { - if (!isTransaction) { - release() - } - } +async function updateUser (userId, body, seqNo, primaryTerm) { + const client = await getESClient() + await client.update({ + index: config.get('ES.USER_INDEX'), + type: config.get('ES.USER_TYPE'), + id: userId, + body: { doc: body }, + if_seq_no: seqNo, + if_primary_term: primaryTerm + }) } /** * Function to get org from es * @param {String} organizationId - * @param {Boolean} isTransaction Is this part of a transaction? * @returns {Object} organization */ -async function getOrg (organizationId, isTransaction = false) { - const { client, release } = await getESClientWrapper() - - try { - const org = await client.get({ index: config.get('ES.ORGANIZATION_INDEX'), type: config.get('ES.ORGANIZATION_TYPE'), id: organizationId }) - - if (isTransaction) { - return { seqNo: org._seq_no, primaryTerm: org._primary_term, org: org._source, release } - } - - return { seqNo: org._seq_no, primaryTerm: org._primary_term, org: org._source } - } finally { - if (!isTransaction) { - release() - } - } +async function getOrg (organizationId) { + const client = await getESClient() + return client.getSource({ index: config.get('ES.ORGANIZATION_INDEX'), type: config.get('ES.ORGANIZATION_TYPE'), id: organizationId }) } /** * Function to update es organization * @param {String} organizationId * @param {Object} body - * @param {Number} seqNo - * @param {Number} primaryTerm - * @param {Boolean} isTransaction If this is part of a transaction, it will not attempt to lock */ -async function updateOrg (organizationId, body, seqNo, primaryTerm, isTransaction = false) { - let client, release - - if (isTransaction) { - client = await getESClient() - } else { - const esClient = await getESClientWrapper() - client = esClient.client - release = esClient.release - } - - try { - await client.update({ - index: config.get('ES.ORGANIZATION_INDEX'), - type: config.get('ES.ORGANIZATION_TYPE'), - id: organizationId, - body: { doc: body }, - if_seq_no: seqNo, - if_primary_term: primaryTerm - }) - } finally { - if (!isTransaction) { - release() - } - } +async function updateOrg (organizationId, body) { + const client = await getESClient() + await client.update({ + index: config.get('ES.ORGANIZATION_INDEX'), + type: config.get('ES.ORGANIZATION_TYPE'), + id: organizationId, + body: { doc: body }, + refresh: 'true' + }) } /** @@ -216,7 +158,7 @@ function getErrorWithStatus (message, statusCode) { module.exports = { getKafkaOptions, - getESClientWrapper, + getESClient, validProperties, getUser, updateUser, diff --git a/src/services/ProcessorService.js b/src/services/ProcessorService.js index 7ea0914..2fd6f50 100644 --- a/src/services/ProcessorService.js +++ b/src/services/ProcessorService.js @@ -21,64 +21,49 @@ async function processCreate (message) { if (_.includes(_.keys(topResources), resource)) { // process the top resources such as user, skill... helper.validProperties(message.payload, ['id']) - const { client, release } = await helper.getESClientWrapper() - - try { - await client.create({ - index: topResources[resource].index, - type: topResources[resource].type, - id: message.payload.id, - body: _.omit(message.payload, 'resource'), - refresh: 'true' - }) - } finally { - release() - } + const client = await helper.getESClient() + await client.create({ + index: topResources[resource].index, + type: topResources[resource].type, + id: message.payload.id, + body: _.omit(message.payload, 'resource'), + refresh: 'true' + }) } else if (_.includes(_.keys(userResources), resource)) { // process user resources such as userSkill, userAttribute... const userResource = userResources[resource] userResource.validate(message.payload) - const { seqNo, primaryTerm, user, release } = await helper.getUser(message.payload.userId, true) - - try { - const relateId = message.payload[userResource.relateKey] - if (!user[userResource.propertyName]) { - user[userResource.propertyName] = [] - } + const user = await helper.getUser(message.payload.userId) + const relateId = message.payload[userResource.relateKey] + if (!user[userResource.propertyName]) { + user[userResource.propertyName] = [] + } - // check the resource does not exist - if (_.some(user[userResource.propertyName], [userResource.relateKey, relateId])) { - logger.error(`Can't create existed ${resource} with the ${userResource.relateKey}: ${relateId}, userId: ${message.payload.userId}`) - throw helper.getErrorWithStatus('[version_conflict_engine_exception]', 409) - } else { - user[userResource.propertyName].push(_.omit(message.payload, 'resource')) - await helper.updateUser(message.payload.userId, user, seqNo, primaryTerm, true) - } - } finally { - release() + // check the resource does not exist + if (_.some(user[userResource.propertyName], [userResource.relateKey, relateId])) { + logger.error(`Can't create existed ${resource} with the ${userResource.relateKey}: ${relateId}, userId: ${message.payload.userId}`) + throw helper.getErrorWithStatus('[version_conflict_engine_exception]', 409) + } else { + user[userResource.propertyName].push(_.omit(message.payload, 'resource')) + await helper.updateUser(message.payload.userId, user) } } else if (_.includes(_.keys(organizationResources), resource)) { // process org resources such as org skill provider const orgResources = organizationResources[resource] orgResources.validate(message.payload) - const { seqNo, primaryTerm, org, release } = await helper.getOrg(message.payload.organizationId) - - try { - const relateId = message.payload[orgResources.relateKey] - if (!org[orgResources.propertyName]) { - org[orgResources.propertyName] = [] - } + const org = await helper.getOrg(message.payload.organizationId) + const relateId = message.payload[orgResources.relateKey] + if (!org[orgResources.propertyName]) { + org[orgResources.propertyName] = [] + } - // check the resource does not exist - if (_.some(org[orgResources.propertyName], [orgResources.relateKey, relateId])) { - logger.error(`Can't create existing ${resource} with the ${orgResources.relateKey}: ${relateId}, organizationId: ${message.payload.organizationId}`) - throw helper.getErrorWithStatus('[version_conflict_engine_exception]', 409) - } else { - org[orgResources.propertyName].push(_.omit(message.payload, 'resource')) - await helper.updateOrg(message.payload.organizationId, org, seqNo, primaryTerm, true) - } - } finally { - release() + // check the resource does not exist + if (_.some(org[orgResources.propertyName], [orgResources.relateKey, relateId])) { + logger.error(`Can't create existing ${resource} with the ${orgResources.relateKey}: ${relateId}, organizationId: ${message.payload.organizationId}`) + throw helper.getErrorWithStatus('[version_conflict_engine_exception]', 409) + } else { + org[orgResources.propertyName].push(_.omit(message.payload, 'resource')) + await helper.updateOrg(message.payload.organizationId, org) } } else { logger.info(`Ignore this message since resource is not in [${_.union(_.keys(topResources), _.keys(userResources), _.keys(organizationResources))}]`) @@ -107,25 +92,19 @@ async function processUpdate (message) { logger.info(`Processing top level resource: ${resource}`) // process the top resources such as user, skill... helper.validProperties(message.payload, ['id']) - const { client, release } = await helper.getESClientWrapper() - - try { - const { index, type } = topResources[resource] - const id = message.payload.id - const source = await client.get({ index, type, id }) - await client.update({ - index, - type, - id, - body: { - doc: _.assign(source._source, _.omit(message.payload, 'resource')) - }, - if_seq_no: source._seq_no, - if_primary_term: source._primary_term - }) - } finally { - release() - } + const client = await helper.getESClient() + const { index, type } = topResources[resource] + const id = message.payload.id + const source = await client.getSource({ index, type, id }) + await client.update({ + index, + type, + id, + body: { + doc: _.assign(source, _.omit(message.payload, 'resource')) + }, + refresh: true + }) } else if (_.includes(_.keys(userResources), resource)) { // process user resources such as userSkill, userAttribute... const userResource = userResources[resource] @@ -133,43 +112,40 @@ async function processUpdate (message) { logger.info(`Processing user level resource: ${resource}:${relateId}`) userResource.validate(message.payload) logger.info(`Resource validated for ${relateId}`) - const { seqNo, primaryTerm, user, release } = await helper.getUser(message.payload.userId, true) - - try { - // check the resource exist - if (!user[userResource.propertyName] || !_.some(user[userResource.propertyName], [userResource.relateKey, relateId])) { - logger.error(`The ${resource} with the ${userResource.relateKey}: ${relateId}, userId: ${message.payload.userId} not exist`) - throw helper.getErrorWithStatus('[resource_not_found_exception]', 404) - } else { - const updateIndex = _.findIndex(user[userResource.propertyName], [userResource.relateKey, relateId]) - user[userResource.propertyName].splice(updateIndex, 1, _.omit(message.payload, 'resource')) - logger.info(`Updating ${user.id} and ${relateId}`) - await helper.updateUser(message.payload.userId, user, seqNo, primaryTerm, true) - logger.info(`Updated ${user.id} and ${relateId}`) - } - } finally { - release() + let user = await helper.getUser(message.payload.userId, false) + const seqNo = user._seq_no + const primaryTerm = user._primary_term + user = user._source + logger.info(`User fetched ${user.id} and ${relateId}`) + // const relateId = message.payload[userResource.relateKey] + + // check the resource exist + if (!user[userResource.propertyName] || !_.some(user[userResource.propertyName], [userResource.relateKey, relateId])) { + logger.error(`The ${resource} with the ${userResource.relateKey}: ${relateId}, userId: ${message.payload.userId} not exist`) + throw helper.getErrorWithStatus('[resource_not_found_exception]', 404) + } else { + const updateIndex = _.findIndex(user[userResource.propertyName], [userResource.relateKey, relateId]) + user[userResource.propertyName].splice(updateIndex, 1, _.omit(message.payload, 'resource')) + logger.info(`Updating ${user.id} and ${relateId}`) + await helper.updateUser(message.payload.userId, user, seqNo, primaryTerm) + logger.info(`Updated ${user.id} and ${relateId}`) } } else if (_.includes(_.keys(organizationResources), resource)) { logger.info(`Processing org level resource: ${resource}`) // process org resources such as org skill providers const orgResource = organizationResources[resource] orgResource.validate(message.payload) + const org = await helper.getOrg(message.payload.organizationId) const relateId = message.payload[orgResource.relateKey] - const { seqNo, primaryTerm, org, release } = await helper.getOrg(message.payload.organizationId) - try { - // check the resource exist - if (!org[orgResource.propertyName] || !_.some(org[orgResource.propertyName], [orgResource.relateKey, relateId])) { - logger.error(`The ${resource} with the ${orgResource.relateKey}: ${relateId}, organizationId: ${message.payload.organizationId} not exist`) - throw helper.getErrorWithStatus('[resource_not_found_exception]', 404) - } else { - const updateIndex = _.findIndex(org[orgResource.propertyName], [orgResource.relateKey, relateId]) - org[orgResource.propertyName].splice(updateIndex, 1, _.omit(message.payload, 'resource')) - await helper.updateOrg(message.payload.organizationId, org, seqNo, primaryTerm, true) - } - } finally { - release() + // check the resource exist + if (!org[orgResource.propertyName] || !_.some(org[orgResource.propertyName], [orgResource.relateKey, relateId])) { + logger.error(`The ${resource} with the ${orgResource.relateKey}: ${relateId}, organizationId: ${message.payload.organizationId} not exist`) + throw helper.getErrorWithStatus('[resource_not_found_exception]', 404) + } else { + const updateIndex = _.findIndex(org[orgResource.propertyName], [orgResource.relateKey, relateId]) + org[orgResource.propertyName].splice(updateIndex, 1, _.omit(message.payload, 'resource')) + await helper.updateOrg(message.payload.organizationId, org) } } else { logger.info(`Ignore this message since resource is not in [${_.union(_.keys(topResources), _.keys(userResources), _.keys(organizationResources))}]`) @@ -197,55 +173,42 @@ async function processDelete (message) { if (_.includes(_.keys(topResources), resource)) { // process the top resources such as user, skill... helper.validProperties(message.payload, ['id']) - const { client, release } = await helper.getESClientWrapper() - - try { - await client.delete({ - index: topResources[resource].index, - type: topResources[resource].type, - id: message.payload.id, - refresh: 'true' - }) - } finally { - release() - } + const client = await helper.getESClient() + await client.delete({ + index: topResources[resource].index, + type: topResources[resource].type, + id: message.payload.id, + refresh: 'true' + }) } else if (_.includes(_.keys(userResources), resource)) { // process user resources such as userSkill, userAttribute... const userResource = userResources[resource] userResource.validate(message.payload) + const user = await helper.getUser(message.payload.userId) const relateId = message.payload[userResource.relateKey] - const { seqNo, primaryTerm, user, release } = await helper.getUser(message.payload.userId, true) - try { - // check the resource exist - if (!user[userResource.propertyName] || !_.some(user[userResource.propertyName], [userResource.relateKey, relateId])) { - logger.error(`The ${resource} with the ${userResource.relateKey}: ${relateId}, userId: ${message.payload.userId} not exist`) - throw helper.getErrorWithStatus('[resource_not_found_exception]', 404) - } else { - _.remove(user[userResource.propertyName], [userResource.relateKey, relateId]) - await helper.updateUser(message.payload.userId, user, seqNo, primaryTerm, true) - } - } finally { - release() + // check the resource exist + if (!user[userResource.propertyName] || !_.some(user[userResource.propertyName], [userResource.relateKey, relateId])) { + logger.error(`The ${resource} with the ${userResource.relateKey}: ${relateId}, userId: ${message.payload.userId} not exist`) + throw helper.getErrorWithStatus('[resource_not_found_exception]', 404) + } else { + _.remove(user[userResource.propertyName], [userResource.relateKey, relateId]) + await helper.updateUser(message.payload.userId, user) } } else if (_.includes(_.keys(organizationResources), resource)) { // process user resources such as org skill provider const orgResource = organizationResources[resource] orgResource.validate(message.payload) - const { seqNo, primaryTerm, org, release } = await helper.getOrg(message.payload.organizationId, true) + const org = await helper.getOrg(message.payload.organizationId) const relateId = message.payload[orgResource.relateKey] - try { - // check the resource exist - if (!org[orgResource.propertyName] || !_.some(org[orgResource.propertyName], [orgResource.relateKey, relateId])) { - logger.error(`The ${resource} with the ${orgResource.relateKey}: ${relateId}, organizationId: ${message.payload.organizationId} not exist`) - throw helper.getErrorWithStatus('[resource_not_found_exception]', 404) - } else { - _.remove(org[orgResource.propertyName], [orgResource.relateKey, relateId]) - await helper.updateOrg(message.payload.organizationId, org, seqNo, primaryTerm, true) - } - } finally { - release() + // check the resource exist + if (!org[orgResource.propertyName] || !_.some(org[orgResource.propertyName], [orgResource.relateKey, relateId])) { + logger.error(`The ${resource} with the ${orgResource.relateKey}: ${relateId}, organizationId: ${message.payload.organizationId} not exist`) + throw helper.getErrorWithStatus('[resource_not_found_exception]', 404) + } else { + _.remove(org[orgResource.propertyName], [orgResource.relateKey, relateId]) + await helper.updateOrg(message.payload.organizationId, org) } } else { logger.info(`Ignore this message since resource is not in [${_.union(_.keys(topResources), _.keys(userResources), _.keys(organizationResources))}]`) diff --git a/test/common/init-es.js b/test/common/init-es.js index a489749..6f56f83 100644 --- a/test/common/init-es.js +++ b/test/common/init-es.js @@ -1,7 +1,6 @@ /** - * ! OBSOLETE SCRIPT. NOT MAINTAINED. - * ! Use the one from ubahn-api instead - * ! ( https://github.com/topcoder-platform/u-bahn-api ) + * !OBSOLETE SCRIPT. Use the one from ubahn-api instead + * !( https://github.com/topcoder-platform/u-bahn-api ) */ /** diff --git a/test/common/testHelper.js b/test/common/testHelper.js index 68b030e..8a4e64a 100644 --- a/test/common/testHelper.js +++ b/test/common/testHelper.js @@ -5,26 +5,26 @@ const _ = require('lodash') const helper = require('../../src/common/helper') const { topResources, userResources, organizationResources } = require('../../src/common/constants') +var client +(async function () { + client = await helper.getESClient() +})() + /** * Get record from ES. * @param {Object} kafka message payload * @return {Object} the record entity */ async function getESRecord (payload) { - const { client, release } = await helper.getESClientWrapper() if (topResources[payload.resource]) { - try { - return client.getSource({ - index: topResources[payload.resource].index, - type: topResources[payload.resource].type, - id: payload.id - }) - } finally { - release() - } + return client.getSource({ + index: topResources[payload.resource].index, + type: topResources[payload.resource].type, + id: payload.id + }) } else if (organizationResources[payload.resource]) { const orgResource = organizationResources[payload.resource] - const { org } = await helper.getOrg(payload.organizationId) + const org = await helper.getOrg(payload.organizationId) if (!org || !org[orgResource.propertyName] || !_.some(org[orgResource.propertyName], [orgResource.relateKey, payload[orgResource.relateKey]])) { const err = Error('[resource_not_found_exception]') err.statusCode = 404 @@ -33,7 +33,7 @@ async function getESRecord (payload) { return _.find(org[orgResource.propertyName], [orgResource.relateKey, payload[orgResource.relateKey]]) } else { const userResource = userResources[payload.resource] - const { user } = await helper.getUser(payload.userId) + const user = await helper.getUser(payload.userId) if (!user || !user[userResource.propertyName] || !_.some(user[userResource.propertyName], [userResource.relateKey, payload[userResource.relateKey]])) { const err = Error('[resource_not_found_exception]') err.statusCode = 404 diff --git a/test/common/view-data.js b/test/common/view-data.js index 4db5cd5..fbd978c 100644 --- a/test/common/view-data.js +++ b/test/common/view-data.js @@ -8,24 +8,23 @@ const logger = require('../../src/common/logger') const helper = require('../../src/common/helper') const { topResources } = require('../../src/common/constants') +let client + if (process.argv.length < 4) { logger.error('Missing argument for Resource and Elasticsearch id.') process.exit() } const view = async (resource, id) => { - let { client, release } = await helper.getESClientWrapper() - - try { - if (_.includes(_.keys(topResources), resource)) { - const ret = await client.getSource({ index: topResources[resource].index, type: topResources[resource].type, id }) - logger.info('Elasticsearch data:') - logger.info(JSON.stringify(ret, null, 4)) - } else { - logger.warn(`resource is invalid, it should in [${_.keys(topResources)}]`) - } - } finally { - release() + if (!client) { + client = await helper.getESClient() + } + if (_.includes(_.keys(topResources), resource)) { + const ret = await client.getSource({ index: topResources[resource].index, type: topResources[resource].type, id }) + logger.info('Elasticsearch data:') + logger.info(JSON.stringify(ret, null, 4)) + } else { + logger.warn(`resource is invalid, it should in [${_.keys(topResources)}]`) } }