diff --git a/src/app.js b/src/app.js index 5939c39..9f1adbb 100644 --- a/src/app.js +++ b/src/app.js @@ -5,7 +5,6 @@ global.Promise = require('bluebird') const config = require('config') const Kafka = require('no-kafka') -const _ = require('lodash') const healthcheck = require('topcoder-healthcheck-dropin') const logger = require('./common/logger') const helper = require('./common/helper') @@ -67,17 +66,17 @@ const dataHandler = (messageSet, topic, partition) => Promise.each(messageSet, a await consumer.commitOffset({ topic, partition, offset: m.offset }) return } - const transactionId = _.uniqueId('transaction_') + try { switch (topic) { case config.UBAHN_CREATE_TOPIC: - await ProcessorService.processCreate(messageJSON, transactionId) + await ProcessorService.processCreate(messageJSON) break case config.UBAHN_UPDATE_TOPIC: - await ProcessorService.processUpdate(messageJSON, transactionId) + await ProcessorService.processUpdate(messageJSON) break case config.UBAHN_DELETE_TOPIC: - await ProcessorService.processDelete(messageJSON, transactionId) + await ProcessorService.processDelete(messageJSON) break } @@ -85,7 +84,6 @@ const dataHandler = (messageSet, topic, partition) => Promise.each(messageSet, a } catch (err) { logger.logFullError(err) } finally { - helper.checkEsMutexRelease(transactionId) logger.debug(`Commiting offset after processing message with count ${messageCount}`) // Commit offset regardless of error diff --git a/src/common/helper.js b/src/common/helper.js index 5c46aa3..3cbcd95 100644 --- a/src/common/helper.js +++ b/src/common/helper.js @@ -13,10 +13,8 @@ AWS.config.region = config.ES.AWS_REGION // Elasticsearch client let esClient -let transactionId // Mutex to ensure that only one elasticsearch action is carried out at any given time const esClientMutex = new Mutex() -const mutexReleaseMap = {} /** * Get Kafka options @@ -60,24 +58,11 @@ async function getESClient () { // Patch the transport to enable mutex esClient.transport.originalRequest = esClient.transport.request esClient.transport.request = async (params) => { - const tId = _.get(params.query, 'transactionId') - params.query = _.omit(params.query, 'transactionId') - if (!tId || tId !== transactionId) { - const release = await esClientMutex.acquire() - mutexReleaseMap[tId || 'noTransaction'] = release - transactionId = tId - } + const release = await esClientMutex.acquire() try { return await esClient.transport.originalRequest(params) } finally { - if (params.method !== 'GET' || !tId) { - const release = mutexReleaseMap[tId || 'noTransaction'] - delete mutexReleaseMap[tId || 'noTransaction'] - transactionId = undefined - if (release) { - release() - } - } + release() } } @@ -101,13 +86,17 @@ function validProperties (payload, keys) { /** * Function to get user from es * @param {String} userId - * @param {String} transactionId + * @param {Boolean} sourceOnly * @returns {Object} user */ -async function getUser (userId, transactionId) { +async function getUser (userId, sourceOnly = true) { const client = await getESClient() - const user = await client.get({ index: config.get('ES.USER_INDEX'), type: config.get('ES.USER_TYPE'), id: userId, transactionId }) - return { seqNo: user._seq_no, primaryTerm: user._primary_term, user: user._source } + + if (sourceOnly) { + return client.getSource({ index: config.get('ES.USER_INDEX'), type: config.get('ES.USER_TYPE'), id: userId }) + } + + return client.get({ index: config.get('ES.USER_INDEX'), type: config.get('ES.USER_TYPE'), id: userId }) } /** @@ -115,16 +104,14 @@ async function getUser (userId, transactionId) { * @param {String} userId * @param {Number} seqNo * @param {Number} primaryTerm - * @param {String} transactionId * @param {Object} body */ -async function updateUser (userId, body, seqNo, primaryTerm, transactionId) { +async function updateUser (userId, body, seqNo, primaryTerm) { const client = await getESClient() await client.update({ index: config.get('ES.USER_INDEX'), type: config.get('ES.USER_TYPE'), id: userId, - transactionId, body: { doc: body }, if_seq_no: seqNo, if_primary_term: primaryTerm @@ -134,33 +121,26 @@ async function updateUser (userId, body, seqNo, primaryTerm, transactionId) { /** * Function to get org from es * @param {String} organizationId - * @param {String} transactionId * @returns {Object} organization */ -async function getOrg (organizationId, transactionId) { +async function getOrg (organizationId) { const client = await getESClient() - const org = await client.get({ index: config.get('ES.ORGANIZATION_INDEX'), type: config.get('ES.ORGANIZATION_TYPE'), id: organizationId, transactionId }) - return { seqNo: org._seq_no, primaryTerm: org._primary_term, org: org._source } + return client.getSource({ index: config.get('ES.ORGANIZATION_INDEX'), type: config.get('ES.ORGANIZATION_TYPE'), id: organizationId }) } /** * Function to update es organization * @param {String} organizationId - * @param {Number} seqNo - * @param {Number} primaryTerm - * @param {String} transactionId * @param {Object} body */ -async function updateOrg (organizationId, body, seqNo, primaryTerm, transactionId) { +async function updateOrg (organizationId, body) { const client = await getESClient() await client.update({ index: config.get('ES.ORGANIZATION_INDEX'), type: config.get('ES.ORGANIZATION_TYPE'), id: organizationId, - transactionId, body: { doc: body }, - if_seq_no: seqNo, - if_primary_term: primaryTerm + refresh: 'true' }) } @@ -176,21 +156,6 @@ function getErrorWithStatus (message, statusCode) { return error } -/** - * Ensure the esClient mutex is released - * @param {String} tId transactionId - */ -function checkEsMutexRelease (tId) { - if (tId === transactionId) { - const release = mutexReleaseMap[tId] - delete mutexReleaseMap[tId] - transactionId = undefined - if (release) { - release() - } - } -} - module.exports = { getKafkaOptions, getESClient, @@ -199,6 +164,5 @@ module.exports = { updateUser, getOrg, updateOrg, - getErrorWithStatus, - checkEsMutexRelease + getErrorWithStatus } diff --git a/src/services/ProcessorService.js b/src/services/ProcessorService.js index 98738a5..2fd6f50 100644 --- a/src/services/ProcessorService.js +++ b/src/services/ProcessorService.js @@ -15,9 +15,8 @@ const { /** * Process create entity message * @param {Object} message the kafka message - * @param {String} transactionId */ -async function processCreate (message, transactionId) { +async function processCreate (message) { const resource = message.payload.resource if (_.includes(_.keys(topResources), resource)) { // process the top resources such as user, skill... @@ -34,7 +33,7 @@ async function processCreate (message, transactionId) { // process user resources such as userSkill, userAttribute... const userResource = userResources[resource] userResource.validate(message.payload) - const { seqNo, primaryTerm, user } = await helper.getUser(message.payload.userId, transactionId) + const user = await helper.getUser(message.payload.userId) const relateId = message.payload[userResource.relateKey] if (!user[userResource.propertyName]) { user[userResource.propertyName] = [] @@ -46,13 +45,13 @@ async function processCreate (message, transactionId) { throw helper.getErrorWithStatus('[version_conflict_engine_exception]', 409) } else { user[userResource.propertyName].push(_.omit(message.payload, 'resource')) - await helper.updateUser(message.payload.userId, user, seqNo, primaryTerm, transactionId) + await helper.updateUser(message.payload.userId, user) } } else if (_.includes(_.keys(organizationResources), resource)) { // process org resources such as org skill provider const orgResources = organizationResources[resource] orgResources.validate(message.payload) - const { seqNo, primaryTerm, org } = await helper.getOrg(message.payload.organizationId, transactionId) + const org = await helper.getOrg(message.payload.organizationId) const relateId = message.payload[orgResources.relateKey] if (!org[orgResources.propertyName]) { org[orgResources.propertyName] = [] @@ -64,7 +63,7 @@ async function processCreate (message, transactionId) { throw helper.getErrorWithStatus('[version_conflict_engine_exception]', 409) } else { org[orgResources.propertyName].push(_.omit(message.payload, 'resource')) - await helper.updateOrg(message.payload.organizationId, org, seqNo, primaryTerm, transactionId) + await helper.updateOrg(message.payload.organizationId, org) } } else { logger.info(`Ignore this message since resource is not in [${_.union(_.keys(topResources), _.keys(userResources), _.keys(organizationResources))}]`) @@ -80,16 +79,14 @@ processCreate.schema = { payload: Joi.object().keys({ resource: Joi.string().required() }).required().unknown(true) - }).required(), - transactionId: Joi.string().required() + }).required() } /** * Process update entity message * @param {Object} message the kafka message - * @param {String} transactionId */ -async function processUpdate (message, transactionId) { +async function processUpdate (message) { const resource = message.payload.resource if (_.includes(_.keys(topResources), resource)) { logger.info(`Processing top level resource: ${resource}`) @@ -98,16 +95,15 @@ async function processUpdate (message, transactionId) { const client = await helper.getESClient() const { index, type } = topResources[resource] const id = message.payload.id - const source = await client.get({ index, type, id, transaction: true }) + const source = await client.getSource({ index, type, id }) await client.update({ index, type, id, body: { - doc: _.assign(source._source, _.omit(message.payload, 'resource')) + doc: _.assign(source, _.omit(message.payload, 'resource')) }, - if_seq_no: source._seq_no, - if_primary_term: source._primary_term + refresh: true }) } else if (_.includes(_.keys(userResources), resource)) { // process user resources such as userSkill, userAttribute... @@ -116,7 +112,10 @@ async function processUpdate (message, transactionId) { logger.info(`Processing user level resource: ${resource}:${relateId}`) userResource.validate(message.payload) logger.info(`Resource validated for ${relateId}`) - const { seqNo, primaryTerm, user } = await helper.getUser(message.payload.userId, transactionId) + let user = await helper.getUser(message.payload.userId, false) + const seqNo = user._seq_no + const primaryTerm = user._primary_term + user = user._source logger.info(`User fetched ${user.id} and ${relateId}`) // const relateId = message.payload[userResource.relateKey] @@ -128,7 +127,7 @@ async function processUpdate (message, transactionId) { const updateIndex = _.findIndex(user[userResource.propertyName], [userResource.relateKey, relateId]) user[userResource.propertyName].splice(updateIndex, 1, _.omit(message.payload, 'resource')) logger.info(`Updating ${user.id} and ${relateId}`) - await helper.updateUser(message.payload.userId, user, seqNo, primaryTerm, transactionId) + await helper.updateUser(message.payload.userId, user, seqNo, primaryTerm) logger.info(`Updated ${user.id} and ${relateId}`) } } else if (_.includes(_.keys(organizationResources), resource)) { @@ -136,7 +135,7 @@ async function processUpdate (message, transactionId) { // process org resources such as org skill providers const orgResource = organizationResources[resource] orgResource.validate(message.payload) - const { seqNo, primaryTerm, org } = await helper.getOrg(message.payload.organizationId, transactionId) + const org = await helper.getOrg(message.payload.organizationId) const relateId = message.payload[orgResource.relateKey] // check the resource exist @@ -146,7 +145,7 @@ async function processUpdate (message, transactionId) { } else { const updateIndex = _.findIndex(org[orgResource.propertyName], [orgResource.relateKey, relateId]) org[orgResource.propertyName].splice(updateIndex, 1, _.omit(message.payload, 'resource')) - await helper.updateOrg(message.payload.organizationId, org, seqNo, primaryTerm, transactionId) + await helper.updateOrg(message.payload.organizationId, org) } } else { logger.info(`Ignore this message since resource is not in [${_.union(_.keys(topResources), _.keys(userResources), _.keys(organizationResources))}]`) @@ -162,16 +161,14 @@ processUpdate.schema = { payload: Joi.object().keys({ resource: Joi.string().required() }).required().unknown(true) - }).required(), - transactionId: Joi.string().required() + }).required() } /** * Process delete entity message * @param {Object} message the kafka message - * @param {String} transactionId */ -async function processDelete (message, transactionId) { +async function processDelete (message) { const resource = message.payload.resource if (_.includes(_.keys(topResources), resource)) { // process the top resources such as user, skill... @@ -187,7 +184,7 @@ async function processDelete (message, transactionId) { // process user resources such as userSkill, userAttribute... const userResource = userResources[resource] userResource.validate(message.payload) - const { seqNo, primaryTerm, user } = await helper.getUser(message.payload.userId, transactionId) + const user = await helper.getUser(message.payload.userId) const relateId = message.payload[userResource.relateKey] // check the resource exist @@ -196,13 +193,13 @@ async function processDelete (message, transactionId) { throw helper.getErrorWithStatus('[resource_not_found_exception]', 404) } else { _.remove(user[userResource.propertyName], [userResource.relateKey, relateId]) - await helper.updateUser(message.payload.userId, user, seqNo, primaryTerm, transactionId) + await helper.updateUser(message.payload.userId, user) } } else if (_.includes(_.keys(organizationResources), resource)) { // process user resources such as org skill provider const orgResource = organizationResources[resource] orgResource.validate(message.payload) - const { seqNo, primaryTerm, org } = await helper.getOrg(message.payload.organizationId, transactionId) + const org = await helper.getOrg(message.payload.organizationId) const relateId = message.payload[orgResource.relateKey] // check the resource exist @@ -211,7 +208,7 @@ async function processDelete (message, transactionId) { throw helper.getErrorWithStatus('[resource_not_found_exception]', 404) } else { _.remove(org[orgResource.propertyName], [orgResource.relateKey, relateId]) - await helper.updateOrg(message.payload.organizationId, org, seqNo, primaryTerm, transactionId) + await helper.updateOrg(message.payload.organizationId, org) } } else { logger.info(`Ignore this message since resource is not in [${_.union(_.keys(topResources), _.keys(userResources), _.keys(organizationResources))}]`) @@ -227,8 +224,7 @@ processDelete.schema = { payload: Joi.object().keys({ resource: Joi.string().required() }).required().unknown(true) - }).required(), - transactionId: Joi.string().required() + }).required() } module.exports = {