diff --git a/src/common/db-helper.js b/src/common/db-helper.js index 5ba3e7e..b69bf01 100644 --- a/src/common/db-helper.js +++ b/src/common/db-helper.js @@ -105,24 +105,27 @@ async function get (model, pk, params) { * @param model the sequelize model object * @param entity entity to create * @param auth the user auth object + * @param transaction the transaction object * @returns {Promise} */ -async function create (model, entity, auth) { +async function create (model, entity, auth, transaction) { if (auth) { entity.createdBy = helper.getAuthUser(auth) } - return model.create(entity) + return model.create(entity, { transaction }) } /** * delete object by pk * @param model the sequelize model object * @param pk the primary key + * @param transaction the transaction object * @returns {Promise} */ -async function remove (model, pk, params) { +async function remove (model, pk, params, transaction) { const instance = await get(model, pk, params) - return instance.destroy() + const result = await instance.destroy({ transaction }) + return result } /** @@ -132,13 +135,14 @@ async function remove (model, pk, params) { * @param entity entity to create * @param auth the auth object * @param auth the path params + * @param transaction the transaction object * @returns {Promise} */ -async function update (model, pk, entity, auth, params) { +async function update (model, pk, entity, auth, params, transaction) { // insure that object exists const instance = await get(model, pk, params) entity.updatedBy = helper.getAuthUser(auth) - return instance.update(entity) + return instance.update(entity, { transaction }) } /** diff --git a/src/common/es-helper.js b/src/common/es-helper.js index e667de4..14a5e69 100644 --- a/src/common/es-helper.js +++ b/src/common/es-helper.js @@ -2,6 +2,7 @@ const config = require('config') const _ = require('lodash') const querystring = require('querystring') const logger = require('../common/logger') +const helper = require('../common/helper') const appConst = require('../consts') const esClient = require('./es-client').getESClient() @@ -282,6 +283,37 @@ function escapeRegex (str) { /* eslint-enable no-useless-escape */ } +/** + * Process create entity + * @param {String} resource resource name + * @param {Object} entity entity object + */ +async function processCreate (resource, entity) { + helper.validProperties(entity, ['id']) + await esClient.index({ + index: DOCUMENTS[resource].index, + type: DOCUMENTS[resource].type, + id: entity.id, + body: entity, + refresh: 'wait_for' + }) +} + +/** + * Process delete entity + * @param {String} resource resource name + * @param {Object} entity entity object + */ +async function processDelete (resource, entity) { + helper.validProperties(entity, ['id']) + await esClient.delete({ + index: DOCUMENTS[resource].index, + type: DOCUMENTS[resource].type, + id: entity.id, + refresh: 'wait_for' + }) +} + async function getOrganizationId (handle) { const dBHelper = require('../common/db-helper') const sequelize = require('../models/index') @@ -1453,6 +1485,9 @@ async function searchAchievementValues ({ organizationId, keyword }) { } module.exports = { + processCreate, + processUpdate: processCreate, + processDelete, searchElasticSearch, getFromElasticSearch, searchUsers, diff --git a/src/common/helper.js b/src/common/helper.js index 95bde1e..0dcc811 100644 --- a/src/common/helper.js +++ b/src/common/helper.js @@ -1,4 +1,5 @@ const config = require('config') +const Joi = require('@hapi/joi') const querystring = require('querystring') const errors = require('./errors') const appConst = require('../consts') @@ -9,6 +10,20 @@ const busApi = require('tc-bus-api-wrapper') const busApiClient = busApi(_.pick(config, ['AUTH0_URL', 'AUTH0_AUDIENCE', 'TOKEN_CACHE_TIME', 'AUTH0_CLIENT_ID', 'AUTH0_CLIENT_SECRET', 'BUSAPI_URL', 'KAFKA_ERROR_TOPIC', 'AUTH0_PROXY_SERVER_URL'])) +/** + * Function to valid require keys + * @param {Object} payload validated object + * @param {Array} keys required keys + * @throws {Error} if required key absent + */ +function validProperties (payload, keys) { + const schema = Joi.object(_.fromPairs(_.map(keys, key => [key, Joi.string().uuid().required()]))).unknown(true) + const error = schema.validate(payload).error + if (error) { + throw error + } +} + /** * get auth user handle or id * @param authUser the user @@ -146,6 +161,7 @@ async function postEvent (topic, payload) { } module.exports = { + validProperties, getAuthUser, permissionCheck, checkIfExists, diff --git a/src/common/service-helper.js b/src/common/service-helper.js index 68b55a0..45e5ed0 100644 --- a/src/common/service-helper.js +++ b/src/common/service-helper.js @@ -38,8 +38,17 @@ const MODEL_TO_RESOURCE = { * Create record in es * @param resource the resource to create * @param result the resource fields + * @param toEs is to es directly */ -async function createRecordInEs (resource, entity) { +async function createRecordInEs (resource, entity, toEs) { + try { + if (toEs) { + await esHelper.processCreate(resource, entity) + } + } catch (err) { + logger.logFullError(err) + throw err + } try { await publishMessage('create', resource, entity) } catch (err) { @@ -51,8 +60,17 @@ async function createRecordInEs (resource, entity) { * Patch record in es * @param resource the resource to create * @param result the resource fields + * @param toEs is to es directly */ -async function patchRecordInEs (resource, entity) { +async function patchRecordInEs (resource, entity, toEs) { + try { + if (toEs) { + await esHelper.processUpdate(resource, entity) + } + } catch (err) { + logger.logFullError(err) + throw err + } try { await publishMessage('patch', resource, entity) } catch (err) { @@ -65,8 +83,9 @@ async function patchRecordInEs (resource, entity) { * @param id the id of record * @param params the params of record (like nested ids) * @param resource the resource to delete + * @param toEs is to es directly */ -async function deleteRecordFromEs (id, params, resource) { +async function deleteRecordFromEs (id, params, resource, toEs) { let payload if (SUB_USER_DOCUMENTS[resource] || SUB_ORG_DOCUMENTS[resource]) { payload = _.assign({}, params) @@ -75,6 +94,15 @@ async function deleteRecordFromEs (id, params, resource) { id } } + try { + if (toEs) { + await esHelper.processDelete(resource, payload) + } + } catch (err) { + logger.logFullError(err) + throw err + } + try { await publishMessage('remove', resource, payload) } catch (err) { @@ -174,13 +202,14 @@ function sleep (ms) { } /** - * delete child of record with delay between each item deleted + * delete child of record with delay between each item deleted and with transaction * @param model the child model to delete * @param id the user id to delete * @param params the params for child * @param resourceName the es recource name + * @param transaction the transaction object */ -async function deleteChild (model, id, params, resourceName) { +async function deleteChild (model, id, params, resourceName, transaction) { const query = {} query[params[0]] = id const result = await dbHelper.find(model, query) @@ -194,8 +223,8 @@ async function deleteChild (model, id, params, resourceName) { params.forEach(attr => { esParams[attr] = record[attr] }) // remove from db - dbHelper.remove(model, record.id) - deleteRecordFromEs(record.id, esParams, resourceName) + await dbHelper.remove(model, record.id, transaction) + await deleteRecordFromEs(record.id, esParams, resourceName, !!transaction) // sleep for configured time await sleep(config.CASCADE_PAUSE_MS) diff --git a/src/modules/user/service.js b/src/modules/user/service.js index 0918552..33ea58e 100644 --- a/src/modules/user/service.js +++ b/src/modules/user/service.js @@ -30,8 +30,11 @@ const uniqueFields = [['handle']] async function create (entity, auth) { await dbHelper.makeSureUnique(User, entity, uniqueFields) - const result = await dbHelper.create(User, entity, auth) - await serviceHelper.createRecordInEs(resource, result.dataValues) + const result = await sequelize.transaction(async (t) => { + const userEntity = await dbHelper.create(User, entity, auth, t) + await serviceHelper.createRecordInEs(resource, userEntity.dataValues, true) + return userEntity + }) return result } @@ -56,10 +59,13 @@ create.schema = { async function patch (id, entity, auth, params) { await dbHelper.makeSureUnique(User, entity, uniqueFields) - const newEntity = await dbHelper.update(User, id, entity, auth) - await serviceHelper.patchRecordInEs(resource, newEntity.dataValues) + const result = await sequelize.transaction(async (t) => { + const newEntity = await dbHelper.update(User, id, entity, auth, null, t) + await serviceHelper.patchRecordInEs(resource, newEntity.dataValues, true) + return newEntity + }) - return newEntity + return result } patch.schema = { @@ -153,7 +159,7 @@ search.schema = { * @return {Promise} no data returned */ async function remove (id, auth, params) { - beginCascadeDelete(id, params) + await beginCascadeDelete(id, params) } /** @@ -162,13 +168,15 @@ async function remove (id, auth, params) { * @param params the path params */ async function beginCascadeDelete (id, params) { - await serviceHelper.deleteChild(Achievement, id, ['userId', 'achievementsProviderId'], 'Achievement') - await serviceHelper.deleteChild(ExternalProfile, id, ['userId', 'organizationId'], 'ExternalProfile') - await serviceHelper.deleteChild(UserAttribute, id, ['userId', 'attributeId'], 'UserAttribute') - await serviceHelper.deleteChild(UsersRole, id, ['userId', 'roleId'], 'UsersRole') - await serviceHelper.deleteChild(UsersSkill, id, ['userId', 'skillId'], 'UsersSkill') - await dbHelper.remove(User, id) - await serviceHelper.deleteRecordFromEs(id, params, resource) + await sequelize.transaction(async (t) => { + await serviceHelper.deleteChild(Achievement, id, ['userId', 'achievementsProviderId'], 'Achievement', t) + await serviceHelper.deleteChild(ExternalProfile, id, ['userId', 'organizationId'], 'ExternalProfile', t) + await serviceHelper.deleteChild(UserAttribute, id, ['userId', 'attributeId'], 'UserAttribute', t) + await serviceHelper.deleteChild(UsersRole, id, ['userId', 'roleId'], 'UsersRole', t) + await serviceHelper.deleteChild(UsersSkill, id, ['userId', 'skillId'], 'UsersSkill', t) + await dbHelper.remove(User, id, null, t) + await serviceHelper.deleteRecordFromEs(id, params, resource, true) + }) } module.exports = {