Skip to content
This repository was archived by the owner on Mar 13, 2025. It is now read-only.

Shapeup4 - CQRS standards update (user object) #103

Merged
merged 21 commits into from
Jul 30, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
@@ -70,6 +70,7 @@ workflows:
branches:
only:
- develop
- feature/shapeup4-cqrs-update

# Production builds are exectuted only on tagged commits to the
# master branch.
2 changes: 2 additions & 0 deletions config/default.js
Original file line number Diff line number Diff line change
@@ -35,6 +35,8 @@ module.exports = {
KAFKA_MESSAGE_ORIGINATOR: process.env.KAFKA_MESSAGE_ORIGINATOR || 'u-bahn-api',

// topics
UBAHN_ERROR_TOPIC: process.env.UBAHN_ERROR_TOPIC || 'ubahn.action.error',

UBAHN_CREATE_TOPIC: process.env.UBAHN_CREATE_TOPIC || 'u-bahn.action.create',
UBAHN_UPDATE_TOPIC: process.env.UBAHN_UPDATE_TOPIC || 'u-bahn.action.update',
UBAHN_DELETE_TOPIC: process.env.UBAHN_DELETE_TOPIC || 'u-bahn.action.delete',
4 changes: 2 additions & 2 deletions docker-pgsql-es/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -4,14 +4,14 @@ services:
image: "postgres:12.4"
volumes:
- database-data:/var/lib/postgresql/data/
ports:
ports:
- "5432:5432"
environment:
POSTGRES_PASSWORD: ${DB_PASSWORD}
POSTGRES_USER: ${DB_USERNAME}
POSTGRES_DB: ${DB_NAME}
esearch:
image: elasticsearch:7.7.1
image: elasticsearch:7.13.4
container_name: ubahn-data-processor-es_es
ports:
- "9200:9200"
16 changes: 10 additions & 6 deletions src/common/db-helper.js
Original file line number Diff line number Diff line change
@@ -105,24 +105,27 @@ async function get (model, pk, params) {
* @param model the sequelize model object
* @param entity entity to create
* @param auth the user auth object
* @param transaction the transaction object
* @returns {Promise<void>}
*/
async function create (model, entity, auth) {
async function create (model, entity, auth, transaction) {
if (auth) {
entity.createdBy = helper.getAuthUser(auth)
}
return model.create(entity)
return model.create(entity, { transaction })
}

/**
* delete object by pk
* @param model the sequelize model object
* @param pk the primary key
* @param transaction the transaction object
* @returns {Promise<void>}
*/
async function remove (model, pk, params) {
async function remove (model, pk, params, transaction) {
const instance = await get(model, pk, params)
return instance.destroy()
const result = await instance.destroy({ transaction })
return result
}

/**
@@ -132,13 +135,14 @@ async function remove (model, pk, params) {
* @param entity entity to create
* @param auth the auth object
* @param auth the path params
* @param transaction the transaction object
* @returns {Promise<void>}
*/
async function update (model, pk, entity, auth, params) {
async function update (model, pk, entity, auth, params, transaction) {
// insure that object exists
const instance = await get(model, pk, params)
entity.updatedBy = helper.getAuthUser(auth)
return instance.update(entity)
return instance.update(entity, { transaction })
}

/**
36 changes: 36 additions & 0 deletions src/common/es-helper.js
Original file line number Diff line number Diff line change
@@ -2,6 +2,7 @@ const config = require('config')
const _ = require('lodash')
const querystring = require('querystring')
const logger = require('../common/logger')
const helper = require('../common/helper')
const appConst = require('../consts')
const esClient = require('./es-client').getESClient()

@@ -282,6 +283,38 @@ function escapeRegex (str) {
/* eslint-enable no-useless-escape */
}

/**
* Process create entity
* @param {String} resource resource name
* @param {Object} entity entity object
*/
async function processCreate (resource, entity) {
helper.validProperties(entity, ['id'])
await esClient.index({
index: DOCUMENTS[resource].index,
type: DOCUMENTS[resource].type,
id: entity.id,
body: entity,
refresh: 'wait_for'
})
logger.info(`Insert in Elasticsearch resource ${resource} entity, , ${JSON.stringify(entity, null, 2)}`)
}

/**
* Process delete entity
* @param {String} resource resource name
* @param {Object} entity entity object
*/
async function processDelete (resource, entity) {
helper.validProperties(entity, ['id'])
await esClient.delete({
index: DOCUMENTS[resource].index,
type: DOCUMENTS[resource].type,
id: entity.id,
refresh: 'wait_for'
})
}

async function getOrganizationId (handle) {
const dBHelper = require('../common/db-helper')
const sequelize = require('../models/index')
@@ -1453,6 +1486,9 @@ async function searchAchievementValues ({ organizationId, keyword }) {
}

module.exports = {
processCreate,
processUpdate: processCreate,
processDelete,
searchElasticSearch,
getFromElasticSearch,
searchUsers,
38 changes: 37 additions & 1 deletion src/common/helper.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
const config = require('config')
const Joi = require('@hapi/joi')
const querystring = require('querystring')
const errors = require('./errors')
const appConst = require('../consts')
@@ -9,6 +10,20 @@ const busApi = require('tc-bus-api-wrapper')
const busApiClient = busApi(_.pick(config, ['AUTH0_URL', 'AUTH0_AUDIENCE', 'TOKEN_CACHE_TIME', 'AUTH0_CLIENT_ID',
'AUTH0_CLIENT_SECRET', 'BUSAPI_URL', 'KAFKA_ERROR_TOPIC', 'AUTH0_PROXY_SERVER_URL']))

/**
* Function to valid require keys
* @param {Object} payload validated object
* @param {Array} keys required keys
* @throws {Error} if required key absent
*/
function validProperties (payload, keys) {
const schema = Joi.object(_.fromPairs(_.map(keys, key => [key, Joi.string().uuid().required()]))).unknown(true)
const error = schema.validate(payload).error
if (error) {
throw error
}
}

/**
* get auth user handle or id
* @param authUser the user
@@ -145,12 +160,33 @@ async function postEvent (topic, payload) {
await busApiClient.postEvent(message)
}

/**
* Send error event to Kafka
* @params {String} topic the topic name
* @params {Object} payload the payload
* @params {String} action for which operation error occurred
*/
async function publishError (topic, payload, action) {
_.set(payload, 'apiAction', action)
const message = {
topic,
originator: config.KAFKA_MESSAGE_ORIGINATOR,
timestamp: new Date().toISOString(),
'mime-type': 'application/json',
payload
}
logger.debug(`Publish error to Kafka topic ${topic}, ${JSON.stringify(message, null, 2)}`)
await busApiClient.postEvent(message)
}

module.exports = {
validProperties,
getAuthUser,
permissionCheck,
checkIfExists,
injectSearchMeta,
getControllerMethods,
getSubControllerMethods,
postEvent
postEvent,
publishError
}
56 changes: 46 additions & 10 deletions src/common/service-helper.js
Original file line number Diff line number Diff line change
@@ -38,25 +38,49 @@ const MODEL_TO_RESOURCE = {
* Create record in es
* @param resource the resource to create
* @param result the resource fields
* @param toEs is to es directly
*/
async function createRecordInEs (resource, entity) {
async function createRecordInEs (resource, entity, toEs) {
try {
await publishMessage('create', resource, entity)
if (toEs) {
await esHelper.processCreate(resource, entity)
}
} catch (err) {
logger.logFullError(err)
throw err
}

if (!toEs) {
try {
await publishMessage("create", resource, entity);
} catch (err) {
logger.logFullError(err);
}
}

}

/**
* Patch record in es
* @param resource the resource to create
* @param result the resource fields
* @param toEs is to es directly
*/
async function patchRecordInEs (resource, entity) {
async function patchRecordInEs (resource, entity, toEs) {
try {
await publishMessage('patch', resource, entity)
if (toEs) {
await esHelper.processUpdate(resource, entity)
}
} catch (err) {
logger.logFullError(err)
throw err
}
if (!toEs) {
try {
await publishMessage("patch", resource, entity);
} catch (err) {
logger.logFullError(err);
}
}
}

@@ -65,8 +89,9 @@ async function patchRecordInEs (resource, entity) {
* @param id the id of record
* @param params the params of record (like nested ids)
* @param resource the resource to delete
* @param toEs is to es directly
*/
async function deleteRecordFromEs (id, params, resource) {
async function deleteRecordFromEs (id, params, resource, toEs) {
let payload
if (SUB_USER_DOCUMENTS[resource] || SUB_ORG_DOCUMENTS[resource]) {
payload = _.assign({}, params)
@@ -76,9 +101,19 @@ async function deleteRecordFromEs (id, params, resource) {
}
}
try {
await publishMessage('remove', resource, payload)
if (toEs) {
await esHelper.processDelete(resource, payload)
}
} catch (err) {
logger.logFullError(err)
throw err
}
if (!toEs) {
try {
await publishMessage("remove", resource, payload);
} catch (err) {
logger.logFullError(err);
}
}
}

@@ -174,13 +209,14 @@ function sleep (ms) {
}

/**
* delete child of record with delay between each item deleted
* delete child of record with delay between each item deleted and with transaction
* @param model the child model to delete
* @param id the user id to delete
* @param params the params for child
* @param resourceName the es recource name
* @param transaction the transaction object
*/
async function deleteChild (model, id, params, resourceName) {
async function deleteChild (model, id, params, resourceName, transaction) {
const query = {}
query[params[0]] = id
const result = await dbHelper.find(model, query)
@@ -194,8 +230,8 @@ async function deleteChild (model, id, params, resourceName) {
params.forEach(attr => { esParams[attr] = record[attr] })

// remove from db
dbHelper.remove(model, record.id)
deleteRecordFromEs(record.id, esParams, resourceName)
await dbHelper.remove(model, record.id, transaction)
await deleteRecordFromEs(record.id, esParams, resourceName, !!transaction)

// sleep for configured time
await sleep(config.CASCADE_PAUSE_MS)
60 changes: 52 additions & 8 deletions src/modules/user/service.js
Original file line number Diff line number Diff line change
@@ -4,8 +4,10 @@

const joi = require('@hapi/joi')
const _ = require('lodash')
const config = require('config')

const errors = require('../../common/errors')
const logger = require('../../common/logger')
const helper = require('../../common/helper')
const dbHelper = require('../../common/db-helper')
const serviceHelper = require('../../common/service-helper')
@@ -30,10 +32,21 @@ const uniqueFields = [['handle']]
async function create (entity, auth) {
await dbHelper.makeSureUnique(User, entity, uniqueFields)

const result = await dbHelper.create(User, entity, auth)
await serviceHelper.createRecordInEs(resource, result.dataValues)

return result
let payload
try {
const result = await sequelize.transaction(async (t) => {
const userEntity = await dbHelper.create(User, entity, auth, t)
payload = userEntity.dataValues
await serviceHelper.createRecordInEs(resource, userEntity.dataValues, true)
return userEntity
})
return result
} catch (e) {
if (payload) {
helper.publishError(config.UBAHN_ERROR_TOPIC, payload, 'user.create')
}
throw e
}
}

create.schema = {
@@ -56,10 +69,22 @@ create.schema = {
async function patch (id, entity, auth, params) {
await dbHelper.makeSureUnique(User, entity, uniqueFields)

const newEntity = await dbHelper.update(User, id, entity, auth)
await serviceHelper.patchRecordInEs(resource, newEntity.dataValues)

return newEntity
let payload
try {
const result = await sequelize.transaction(async (t) => {
const newEntity = await dbHelper.update(User, id, entity, auth, null, t)
payload = newEntity.dataValues
await serviceHelper.patchRecordInEs(resource, newEntity.dataValues, true)
return newEntity
})

return result
} catch (e) {
if (payload) {
helper.publishError(config.UBAHN_ERROR_TOPIC, payload, 'user.update')
}
throw e
}
}

patch.schema = {
@@ -169,6 +194,25 @@ async function beginCascadeDelete (id, params) {
await serviceHelper.deleteChild(UsersSkill, id, ['userId', 'skillId'], 'UsersSkill')
await dbHelper.remove(User, id)
await serviceHelper.deleteRecordFromEs(id, params, resource)
//TODO: below code is not working, so simply commented our changes
/* //Start here
let payload = {id}
try {
await sequelize.transaction(async (t) => {
await serviceHelper.deleteChild(Achievement, id, ['userId', 'achievementsProviderId'], 'Achievement', t)
await serviceHelper.deleteChild(ExternalProfile, id, ['userId', 'organizationId'], 'ExternalProfile', t)
await serviceHelper.deleteChild(UserAttribute, id, ['userId', 'attributeId'], 'UserAttribute', t)
await serviceHelper.deleteChild(UsersRole, id, ['userId', 'roleId'], 'UsersRole', t)
await serviceHelper.deleteChild(UsersSkill, id, ['userId', 'skillId'], 'UsersSkill', t)
await dbHelper.remove(User, id, null, t)
await serviceHelper.deleteRecordFromEs(id, params, resource, true)
})
} catch (e) {
helper.publishError(config.UBAHN_ERROR_TOPIC, payload, 'user.delete')
throw e
}
*/ // End here
}

module.exports = {