diff --git a/.circleci/config.yml b/.circleci/config.yml
index 78be9fb..a80b21d 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -70,7 +70,7 @@ workflows:
             branches:
               only:
                 - develop
-                - feature/shapeup4-cqrs-update
+                - feature/shapeup4-cqrs-update2
 
       # Production builds are exectuted only on tagged commits to the
       # master branch.
diff --git a/config/default.js b/config/default.js
index eb69492..98f4903 100755
--- a/config/default.js
+++ b/config/default.js
@@ -31,6 +31,8 @@ module.exports = {
 
   BUSAPI_URL: process.env.BUSAPI_URL || 'https://api.topcoder-dev.com/v5',
 
+  TOPCODER_GROUP_API: process.env.TOPCODER_GROUP_API || 'https://api.topcoder-dev.com/v5/groups',
+
   KAFKA_ERROR_TOPIC: process.env.KAFKA_ERROR_TOPIC || 'common.error.reporting',
   KAFKA_MESSAGE_ORIGINATOR: process.env.KAFKA_MESSAGE_ORIGINATOR || 'u-bahn-api',
 
@@ -63,6 +65,14 @@ module.exports = {
       password: process.env.ELASTICCLOUD_PASSWORD
     },
 
+    USER_ACHIEVEMENT_PROPERTY_NAME: process.env.USER_ACHIEVEMENT_PROPERTY_NAME || 'achievements',
+    USER_EXTERNALPROFILE_PROPERTY_NAME: process.env.USER_EXTERNALPROFILE_PROPERTY_NAME || 'externalProfiles',
+    USER_ATTRIBUTE_PROPERTY_NAME: process.env.USER_ATTRIBUTE_PROPERTY_NAME || 'attributes',
+    USER_ROLE_PROPERTY_NAME: process.env.USER_ROLE_PROPERTY_NAME || 'roles',
+    USER_SKILL_PROPERTY_NAME: process.env.USER_SKILL_PROPERTY_NAME || 'skills',
+    USER_GROUP_PROPERTY_NAME: process.env.USER_GROUP_PROPERTY_NAME || 'groups',
+
+    ORGANIZATION_SKILLPROVIDER_PROPERTY_NAME: process.env.ORGANIZATION_SKILLPROVIDER_PROPERTY_NAME || 'skillProviders',
     // es mapping: _index, _type, _id
     DOCUMENTS: {
       achievementprovider: {
diff --git a/src/common/es-helper.js b/src/common/es-helper.js
index 93c1cfa..6f5a1f4 100644
--- a/src/common/es-helper.js
+++ b/src/common/es-helper.js
@@ -6,6 +6,12 @@ const helper = require('../common/helper')
 const appConst = require('../consts')
 const esClient = require('./es-client').getESClient()
 
+const {
+  TopResources,
+  UserResources,
+  OrganizationResources
+} = appConst
+
 const DOCUMENTS = config.ES.DOCUMENTS
 const RESOURCES = Object.keys(DOCUMENTS)
 
@@ -283,21 +289,213 @@ function escapeRegex (str) {
   /* eslint-enable no-useless-escape */
 }
 
+/**
+ * Function to get user from es
+ * @param {String} userId
+ * @returns {Object} user
+ */
+async function getUser (userId) {
+  const { body: user } = await esClient.get({ index: TopResources.user.index, type: TopResources.user.type, id: userId })
+  return { seqNo: user._seq_no, primaryTerm: user._primary_term, user: user._source }
+}
+
+/**
+ * Function to update es user
+ * @param {String} userId
+ * @param {Number} seqNo
+ * @param {Number} primaryTerm
+ * @param {Object} body
+ */
+async function updateUser (userId, body, seqNo, primaryTerm) {
+  try {
+    await esClient.index({
+      index: TopResources.user.index,
+      type: TopResources.user.type,
+      id: userId,
+      body,
+      if_seq_no: seqNo,
+      if_primary_term: primaryTerm,
+      pipeline: TopResources.user.ingest.pipeline.id,
+      refresh: 'wait_for'
+    })
+    logger.debug('Update user completed')
+  } catch (err) {
+    if (err && err.meta && err.meta.body && err.meta.body.error) {
+      logger.debug(JSON.stringify(err.meta.body.error, null, 4))
+    }
+    logger.debug(JSON.stringify(err))
+    throw err
+  }
+}
+
+/**
+ * Function to get org from es
+ * @param {String} organizationId
+ * @returns {Object} organization
+ */
+async function getOrg (organizationId) {
+  const { body: org } = await esClient.get({ index: TopResources.organization.index, type: TopResources.organization.type, id: organizationId })
+  return { seqNo: org._seq_no, primaryTerm: org._primary_term, org: org._source }
+}
+
+/**
+ * Function to update es organization
+ * @param {String} organizationId
+ * @param {Number} seqNo
+ * @param {Number} primaryTerm
+ * @param {Object} body
+ */
+async function updateOrg (organizationId, body, seqNo, primaryTerm) {
+  await esClient.index({
+    index: TopResources.organization.index,
+    type: TopResources.organization.type,
+    id: organizationId,
+    body,
+    if_seq_no: seqNo,
+    if_primary_term: primaryTerm,
+    refresh: 'wait_for'
+  })
+  await esClient.enrich.executePolicy({ name: TopResources.organization.enrich.policyName })
+}
+
 /**
  * Process create entity
  * @param {String} resource resource name
  * @param {Object} entity entity object
  */
 async function processCreate (resource, entity) {
-  helper.validProperties(entity, ['id'])
-  await esClient.index({
-    index: DOCUMENTS[resource].index,
-    type: DOCUMENTS[resource].type,
-    id: entity.id,
-    body: entity,
-    refresh: 'true'
-  })
-  logger.info(`Insert in Elasticsearch resource ${resource} entity, , ${JSON.stringify(entity, null, 2)}`)
+  if (_.includes(_.keys(TopResources), resource)) {
+    // process the top resources such as user, skill...
+    helper.validProperties(entity, ['id'])
+    await esClient.index({
+      index: TopResources[resource].index,
+      type: TopResources[resource].type,
+      id: entity.id,
+      body: _.omit(entity, ['resource', 'originalTopic']),
+      pipeline: TopResources[resource].ingest ? TopResources[resource].ingest.pipeline.id : undefined,
+      refresh: 'true'
+    })
+    if (TopResources[resource].enrich) {
+      await esClient.enrich.executePolicy({
+        name: TopResources[resource].enrich.policyName
+      })
+    }
+  } else if (_.includes(_.keys(UserResources), resource)) {
+    // process user resources such as userSkill, userAttribute...
+    const userResource = UserResources[resource]
+    userResource.validate(entity)
+    const { seqNo, primaryTerm, user } = await getUser(entity.userId)
+    const relateId = entity[userResource.relateKey]
+    if (!user[userResource.propertyName]) {
+      user[userResource.propertyName] = []
+    }
+
+    // import groups for a new user
+    if (resource === 'externalprofile' && entity.externalId) {
+      const userGroups = await helper.getUserGroup(entity.externalId)
+      user[config.get('ES.USER_GROUP_PROPERTY_NAME')] = _.unionBy(user[config.get('ES.USER_GROUP_PROPERTY_NAME')], userGroups, 'id')
+    }
+
+    // check the resource does not exist
+    if (_.some(user[userResource.propertyName], [userResource.relateKey, relateId])) {
+      logger.error(`Can't create existed ${resource} with the ${userResource.relateKey}: ${relateId}, userId: ${entity.userId}`)
+      throw helper.getErrorWithStatus('[version_conflict_engine_exception]', 409)
+    } else {
+      user[userResource.propertyName].push(entity)
+      await updateUser(entity.userId, user, seqNo, primaryTerm)
+    }
+  } else if (_.includes(_.keys(OrganizationResources), resource)) {
+    // process org resources such as org skill provider
+    const orgResources = OrganizationResources[resource]
+    orgResources.validate(entity)
+    const { seqNo, primaryTerm, org } = await getOrg(entity.organizationId)
+    const relateId = entity[orgResources.relateKey]
+    if (!org[orgResources.propertyName]) {
+      org[orgResources.propertyName] = []
+    }
+
+    // check the resource does not exist
+    if (_.some(org[orgResources.propertyName], [orgResources.relateKey, relateId])) {
+      logger.error(`Can't create existing ${resource} with the ${orgResources.relateKey}: ${relateId}, organizationId: ${entity.organizationId}`)
+      throw helper.getErrorWithStatus('[version_conflict_engine_exception]', 409)
+    } else {
+      org[orgResources.propertyName].push(entity)
+      await updateOrg(entity.organizationId, org, seqNo, primaryTerm)
+    }
+  } else {
+    logger.info(`Ignore this message since resource is not in [${_.union(_.values(TopResources), _.keys(UserResources), _.keys(OrganizationResources))}]`)
+  }
+}
+
+/**
+ * Process update entity
+ * @param {String} resource resource name
+ * @param {Object} entity entity object
+ */
+async function processUpdate (resource, entity) {
+  if (_.includes(_.keys(TopResources), resource)) {
+    logger.info(`Processing top level resource: ${resource}`)
+    // process the top resources such as user, skill...
+    helper.validProperties(entity, ['id'])
+    const { index, type } = TopResources[resource]
+    const id = entity.id
+    const { body: source } = await esClient.get({ index, type, id })
+    await esClient.index({
+      index,
+      type,
+      id,
+      body: _.assign(source._source, _.omit(entity, ['resource', 'originalTopic'])),
+      pipeline: TopResources[resource].ingest ? TopResources[resource].ingest.pipeline.id : undefined,
+      if_seq_no: source._seq_no,
+      if_primary_term: source._primary_term,
+      refresh: 'true'
+    })
+    if (TopResources[resource].enrich) {
+      await esClient.enrich.executePolicy({
+        name: TopResources[resource].enrich.policyName
+      })
+    }
+  } else if (_.includes(_.keys(UserResources), resource)) {
+    // process user resources such as userSkill, userAttribute...
+    const userResource = UserResources[resource]
+    const relateId = entity[userResource.relateKey]
+    logger.info(`Processing user level resource: ${resource}:${relateId}`)
+    userResource.validate(entity)
+    logger.info(`Resource validated for ${relateId}`)
+    const { seqNo, primaryTerm, user } = await getUser(entity.userId)
+    logger.info(`User fetched ${user.id} and ${relateId}`)
+
+    // check the resource exist
+    if (!user[userResource.propertyName] || !_.some(user[userResource.propertyName], [userResource.relateKey, relateId])) {
+      logger.error(`The ${resource} with the ${userResource.relateKey}: ${relateId}, userId: ${entity.userId} not exist`)
+      throw helper.getErrorWithStatus('[resource_not_found_exception]', 404)
+    } else {
+      const updateIndex = _.findIndex(user[userResource.propertyName], [userResource.relateKey, relateId])
+      user[userResource.propertyName].splice(updateIndex, 1, entity)
+      logger.info(`Updating ${user.id} and ${relateId}`)
+      await updateUser(entity.userId, user, seqNo, primaryTerm)
+      logger.info(`Updated ${user.id} and ${relateId}`)
+    }
+  } else if (_.includes(_.keys(OrganizationResources), resource)) {
+    logger.info(`Processing org level resource: ${resource}`)
+    // process org resources such as org skill providers
+    const orgResource = OrganizationResources[resource]
+    orgResource.validate(entity)
+    const { seqNo, primaryTerm, org } = await getOrg(entity.organizationId)
+    const relateId = entity[orgResource.relateKey]
+
+    // check the resource exist
+    if (!org[orgResource.propertyName] || !_.some(org[orgResource.propertyName], [orgResource.relateKey, relateId])) {
+      logger.error(`The ${resource} with the ${orgResource.relateKey}: ${relateId}, organizationId: ${entity.organizationId} not exist`)
+      throw helper.getErrorWithStatus('[resource_not_found_exception]', 404)
+    } else {
+      const updateIndex = _.findIndex(org[orgResource.propertyName], [orgResource.relateKey, relateId])
+      org[orgResource.propertyName].splice(updateIndex, 1, entity)
+      await updateOrg(entity.organizationId, org, seqNo, primaryTerm)
+    }
+  } else {
+    logger.info(`Ignore this message since resource is not in [${_.union(_.values(TopResources), _.keys(UserResources), _.keys(OrganizationResources))}]`)
+  }
 }
 
 /**
@@ -306,13 +504,53 @@ async function processCreate (resource, entity) {
  * @param {Object} entity entity object
  */
 async function processDelete (resource, entity) {
-  helper.validProperties(entity, ['id'])
-  await esClient.delete({
-    index: DOCUMENTS[resource].index,
-    type: DOCUMENTS[resource].type,
-    id: entity.id,
-    refresh: 'wait_for'
-  })
+  if (_.includes(_.keys(TopResources), resource)) {
+    // process the top resources such as user, skill...
+    helper.validProperties(entity, ['id'])
+    await esClient.delete({
+      index: TopResources[resource].index,
+      type: TopResources[resource].type,
+      id: entity.id,
+      refresh: 'wait_for'
+    })
+    if (TopResources[resource].enrich) {
+      await esClient.enrich.executePolicy({
+        name: TopResources[resource].enrich.policyName
+      })
+    }
+  } else if (_.includes(_.keys(UserResources), resource)) {
+    // process user resources such as userSkill, userAttribute...
+    const userResource = UserResources[resource]
+    userResource.validate(entity)
+    const { seqNo, primaryTerm, user } = await getUser(entity.userId)
+    const relateId = entity[userResource.relateKey]
+
+    // check the resource exist
+    if (!user[userResource.propertyName] || !_.some(user[userResource.propertyName], [userResource.relateKey, relateId])) {
+      logger.error(`The ${resource} with the ${userResource.relateKey}: ${relateId}, userId: ${entity.userId} not exist`)
+      throw helper.getErrorWithStatus('[resource_not_found_exception]', 404)
+    } else {
+      _.remove(user[userResource.propertyName], [userResource.relateKey, relateId])
+      await updateUser(entity.userId, user, seqNo, primaryTerm)
+    }
+  } else if (_.includes(_.keys(OrganizationResources), resource)) {
+    // process user resources such as org skill provider
+    const orgResource = OrganizationResources[resource]
+    orgResource.validate(entity)
+    const { seqNo, primaryTerm, org } = await getOrg(entity.organizationId)
+    const relateId = entity[orgResource.relateKey]
+
+    // check the resource exist
+    if (!org[orgResource.propertyName] || !_.some(org[orgResource.propertyName], [orgResource.relateKey, relateId])) {
+      logger.error(`The ${resource} with the ${orgResource.relateKey}: ${relateId}, organizationId: ${entity.organizationId} not exist`)
+      throw helper.getErrorWithStatus('[resource_not_found_exception]', 404)
+    } else {
+      _.remove(org[orgResource.propertyName], [orgResource.relateKey, relateId])
+      await updateOrg(entity.organizationId, org, seqNo, primaryTerm)
+    }
+  } else {
+    logger.info(`Ignore this message since resource is not in [${_.union(_.keys(TopResources), _.keys(UserResources), _.keys(OrganizationResources))}]`)
+  }
 }
 
 async function getOrganizationId (handle) {
@@ -1487,7 +1725,7 @@ async function searchAchievementValues ({ organizationId, keyword }) {
 
 module.exports = {
   processCreate,
-  processUpdate: processCreate,
+  processUpdate,
   processDelete,
   searchElasticSearch,
   getFromElasticSearch,
diff --git a/src/common/helper.js b/src/common/helper.js
index b68f3ee..ea03f67 100644
--- a/src/common/helper.js
+++ b/src/common/helper.js
@@ -3,12 +3,15 @@ const Joi = require('@hapi/joi')
 const querystring = require('querystring')
 const errors = require('./errors')
 const appConst = require('../consts')
+const axios = require('axios')
+const m2mAuth = require('tc-core-library-js').auth.m2m
 const _ = require('lodash')
 const { getControllerMethods, getSubControllerMethods } = require('./controller-helper')
 const logger = require('./logger')
 const busApi = require('tc-bus-api-wrapper')
 const busApiClient = busApi(_.pick(config, ['AUTH0_URL', 'AUTH0_AUDIENCE', 'TOKEN_CACHE_TIME', 'AUTH0_CLIENT_ID',
   'AUTH0_CLIENT_SECRET', 'BUSAPI_URL', 'KAFKA_ERROR_TOPIC', 'AUTH0_PROXY_SERVER_URL']))
+const topcoderM2M = m2mAuth(_.pick(config, ['AUTH0_URL', 'AUTH0_AUDIENCE', 'TOKEN_CACHE_TIME', 'AUTH0_PROXY_SERVER_URL']))
 
 /**
  * Function to valid require keys
@@ -32,6 +35,34 @@ function getAuthUser (authUser) {
   return authUser.handle || authUser.sub
 }
 
+/* Function to get M2M token
+ * (Topcoder APIs only)
+ * @returns {Promise}
+ */
+async function getTopcoderM2Mtoken () {
+  return topcoderM2M.getMachineToken(config.AUTH0_CLIENT_ID, config.AUTH0_CLIENT_SECRET)
+}
+
+/**
+ * Returns the user in Topcoder identified by the email
+ * @param {String} email The user email
+ */
+async function getUserGroup (memberId) {
+  const url = config.TOPCODER_GROUP_API
+  const token = await getTopcoderM2Mtoken()
+  const params = { memberId, membershipType: 'user', page: 1 }
+
+  logger.debug(`request GET ${url} with params: ${JSON.stringify(params)}`)
+  let groups = []
+  let groupRes = await axios.get(url, { headers: { Authorization: `Bearer ${token}` }, params })
+  while (groupRes.data.length > 0) {
+    groups = _.concat(groups, _.map(groupRes.data, g => _.pick(g, 'id', 'name')))
+    params.page = params.page + 1
+    groupRes = await axios.get(url, { headers: { Authorization: `Bearer ${token}` }, params })
+  }
+  return groups
+}
+
 /**
  * Checks if the source matches the term.
  *
@@ -166,7 +197,7 @@ async function postEvent (topic, payload) {
  * @params {Object} payload the payload
  * @params {String} action for which operation error occurred
  */
- async function publishError (topic, payload, action) {
+async function publishError (topic, payload, action) {
   _.set(payload, 'apiAction', action)
   const message = {
     topic,
@@ -179,14 +210,28 @@ async function postEvent (topic, payload) {
   await busApiClient.postEvent(message)
 }
 
+/**
+ * Fuction to get an Error with statusCode property
+ * @param {String} message error message
+ * @param {Number} statusCode
+ * @returns {Error} an Error with statusCode property
+ */
+function getErrorWithStatus (message, statusCode) {
+  const error = Error(message)
+  error.statusCode = statusCode
+  return error
+}
+
 module.exports = {
   validProperties,
   getAuthUser,
+  getUserGroup,
   permissionCheck,
   checkIfExists,
   injectSearchMeta,
   getControllerMethods,
   getSubControllerMethods,
   postEvent,
-  publishError
+  publishError,
+  getErrorWithStatus
 }
diff --git a/src/common/service-helper.js b/src/common/service-helper.js
index b47ae48..4410ddc 100644
--- a/src/common/service-helper.js
+++ b/src/common/service-helper.js
@@ -38,13 +38,10 @@ const MODEL_TO_RESOURCE = {
  * Create record in es
  * @param resource the resource to create
  * @param result the resource fields
- * @param toEs is to es directly
  */
-async function createRecordInEs (resource, entity, toEs) {
+async function createRecordInEs (resource, entity) {
   try {
-    if (toEs) {
-      await esHelper.processCreate(resource, entity)
-    }
+    await esHelper.processCreate(resource, entity)
   } catch (err) {
     logger.logFullError(err)
     throw err
@@ -52,24 +49,20 @@ async function createRecordInEs (resource, entity, toEs) {
 
   // publish create event.
   try {
-    await publishMessage("create", resource, entity);
+    await publishMessage('create', resource, entity)
   } catch (err) {
-    logger.logFullError(err);
+    logger.logFullError(err)
   }
-
 }
 
 /**
  * Patch record in es
  * @param resource the resource to create
  * @param result the resource fields
- * @param toEs is to es directly
  */
-async function patchRecordInEs (resource, entity, toEs) {
+async function patchRecordInEs (resource, entity) {
   try {
-    if (toEs) {
-      await esHelper.processUpdate(resource, entity)
-    }
+    await esHelper.processUpdate(resource, entity)
   } catch (err) {
     logger.logFullError(err)
     throw err
@@ -77,9 +70,9 @@ async function patchRecordInEs (resource, entity, toEs) {
 
   // publish patch event.
   try {
-    await publishMessage("patch", resource, entity);
+    await publishMessage('patch', resource, entity)
   } catch (err) {
-    logger.logFullError(err);
+    logger.logFullError(err)
   }
 }
 
@@ -88,9 +81,8 @@ async function patchRecordInEs (resource, entity, toEs) {
  * @param id the id of record
  * @param params the params of record (like nested ids)
  * @param resource the resource to delete
- * @param toEs is to es directly
  */
-async function deleteRecordFromEs (id, params, resource, toEs) {
+async function deleteRecordFromEs (id, params, resource) {
   let payload
   if (SUB_USER_DOCUMENTS[resource] || SUB_ORG_DOCUMENTS[resource]) {
     payload = _.assign({}, params)
@@ -100,19 +92,16 @@ async function deleteRecordFromEs (id, params, resource, toEs) {
     }
   }
   try {
-    if (toEs) {
-      await esHelper.processDelete(resource, payload)
-    }
+    await esHelper.processDelete(resource, payload)
   } catch (err) {
     logger.logFullError(err)
     throw err
   }
-  if (!toEs) {
-    try {
-      await publishMessage("remove", resource, payload);
-    } catch (err) {
-      logger.logFullError(err);
-    }
+
+  try {
+    await publishMessage('remove', resource, payload)
+  } catch (err) {
+    logger.logFullError(err)
   }
 }
 
@@ -230,7 +219,7 @@ async function deleteChild (model, id, params, resourceName, transaction) {
 
       // remove from db
       await dbHelper.remove(model, record.id, transaction)
-      await deleteRecordFromEs(record.id, esParams, resourceName, !!transaction)
+      await deleteRecordFromEs(record.id, esParams, resourceName)
 
       // sleep for configured time
       await sleep(config.CASCADE_PAUSE_MS)
diff --git a/src/consts.js b/src/consts.js
index 18835a2..8b28094 100644
--- a/src/consts.js
+++ b/src/consts.js
@@ -1,3 +1,24 @@
+const config = require('config')
+const _ = require('lodash')
+const Joi = require('@hapi/joi')
+
+/**
+ * because of circular dependency with `common/helper.js`, so put the code here
+ * Function to valid require keys
+ * @param {Object} payload validated object
+ * @param {Array} keys required keys
+ * @throws {Error} if required key absent
+ */
+function validProperties (payload, keys) {
+  const schema = Joi.object(_.fromPairs(_.map(keys, key => [key, Joi.string().uuid().required()]))).unknown(true)
+  const error = schema.validate(payload).error
+  if (error) {
+    throw error
+  }
+}
+
+
+
 /**
  * roles that used in service, all roles must match topcoder roles
  * Admin and Administrator are both admin user
@@ -21,13 +42,196 @@ const AllAuthenticatedUsers = [
   UserRoles.ubahn
 ]
 
+
 /**
  * all admin user
  */
 const AdminUser = [UserRoles.admin, UserRoles.administrator]
 
+/**
+ * es config TopResources
+ */
+const TopResources = {
+  achievementprovider: {
+    index: config.get('ES.DOCUMENTS.achievementprovider.index'),
+    type: config.get('ES.DOCUMENTS.achievementprovider.type'),
+    enrich: {
+      policyName: config.get('ES.DOCUMENTS.achievementprovider.enrichPolicyName'),
+      matchField: 'id',
+      enrichFields: ['id', 'name', 'created', 'updated', 'createdBy', 'updatedBy']
+    }
+  },
+  attribute: {
+    index: config.get('ES.DOCUMENTS.attribute.index'),
+    type: config.get('ES.DOCUMENTS.attribute.type'),
+    enrich: {
+      policyName: config.get('ES.DOCUMENTS.attribute.enrichPolicyName'),
+      matchField: 'id',
+      enrichFields: ['id', 'name', 'attributeGroupId', 'created', 'updated', 'createdBy', 'updatedBy', 'attributegroup']
+    },
+    ingest: {
+      pipeline: {
+        id: config.get('ES.DOCUMENTS.attributegroup.pipelineId')
+      }
+    }
+  },
+  attributegroup: {
+    index: config.get('ES.DOCUMENTS.attributegroup.index'),
+    type: config.get('ES.DOCUMENTS.attributegroup.type'),
+    enrich: {
+      policyName: config.get('ES.DOCUMENTS.attributegroup.enrichPolicyName'),
+      matchField: 'id',
+      enrichFields: ['id', 'name', 'organizationId', 'created', 'updated', 'createdBy', 'updatedBy']
+    },
+    pipeline: {
+      id: config.get('ES.DOCUMENTS.attributegroup.pipelineId'),
+      field: 'attributeGroupId',
+      targetField: 'attributegroup',
+      maxMatches: '1'
+    }
+  },
+  organization: {
+    index: config.get('ES.DOCUMENTS.organization.index'),
+    type: config.get('ES.DOCUMENTS.organization.type'),
+    enrich: {
+      policyName: config.get('ES.DOCUMENTS.organization.enrichPolicyName')
+    }
+  },
+  role: {
+    index: config.get('ES.DOCUMENTS.role.index'),
+    type: config.get('ES.DOCUMENTS.role.type'),
+    enrich: {
+      policyName: config.get('ES.DOCUMENTS.role.enrichPolicyName'),
+      matchField: 'id',
+      enrichFields: ['id', 'name', 'created', 'updated', 'createdBy', 'updatedBy']
+    }
+  },
+  skill: {
+    index: config.get('ES.DOCUMENTS.skill.index'),
+    type: config.get('ES.DOCUMENTS.skill.type'),
+    enrich: {
+      policyName: config.get('ES.DOCUMENTS.skill.enrichPolicyName'),
+      matchField: 'id',
+      enrichFields: ['id', 'skillProviderId', 'name', 'externalId', 'uri', 'created', 'updated', 'createdBy', 'updatedBy', 'skillprovider']
+    },
+    ingest: {
+      pipeline: {
+        id: config.get('ES.DOCUMENTS.skillprovider.pipelineId')
+      }
+    }
+  },
+  skillprovider: {
+    index: config.get('ES.DOCUMENTS.skillprovider.index'),
+    type: config.get('ES.DOCUMENTS.skillprovider.type'),
+    enrich: {
+      policyName: config.get('ES.DOCUMENTS.skillprovider.enrichPolicyName'),
+      matchField: 'id',
+      enrichFields: ['id', 'name', 'created', 'updated', 'createdBy', 'updatedBy']
+    },
+    pipeline: {
+      id: config.get('ES.DOCUMENTS.skillprovider.pipelineId'),
+      field: 'skillProviderId',
+      targetField: 'skillprovider',
+      maxMatches: '1'
+    }
+  },
+  user: {
+    index: config.get('ES.DOCUMENTS.user.index'),
+    type: config.get('ES.DOCUMENTS.user.type'),
+    ingest: {
+      pipeline: {
+        id: config.get('ES.DOCUMENTS.user.pipelineId')
+      }
+    },
+    pipeline: {
+      id: config.get('ES.DOCUMENTS.user.pipelineId'),
+      processors: [
+        {
+          referenceField: config.get('ES.DOCUMENTS.achievement.userField'),
+          enrichPolicyName: config.get('ES.DOCUMENTS.achievementprovider.enrichPolicyName'),
+          field: '_ingest._value.achievementsProviderId',
+          targetField: '_ingest._value.achievementprovider',
+          maxMatches: '1'
+        },
+        {
+          referenceField: config.get('ES.DOCUMENTS.userattribute.userField'),
+          enrichPolicyName: config.get('ES.DOCUMENTS.attribute.enrichPolicyName'),
+          field: '_ingest._value.attributeId',
+          targetField: '_ingest._value.attribute',
+          maxMatches: '1'
+        },
+        {
+          referenceField: config.get('ES.DOCUMENTS.userrole.userField'),
+          enrichPolicyName: config.get('ES.DOCUMENTS.role.enrichPolicyName'),
+          field: '_ingest._value.roleId',
+          targetField: '_ingest._value.role',
+          maxMatches: '1'
+        },
+        {
+          referenceField: config.get('ES.DOCUMENTS.userskill.userField'),
+          enrichPolicyName: config.get('ES.DOCUMENTS.skill.enrichPolicyName'),
+          field: '_ingest._value.skillId',
+          targetField: '_ingest._value.skill',
+          maxMatches: '1'
+        }
+      ]
+    }
+  }
+}
+
+/**
+ * es config UserResources
+ */
+const UserResources = {
+  achievement: {
+    propertyName: config.get('ES.USER_ACHIEVEMENT_PROPERTY_NAME'),
+    relateKey: 'achievementsProviderId',
+    validate: payload => validProperties(payload, ['userId', 'achievementsProviderId'])
+  },
+  externalprofile: {
+    propertyName: config.get('ES.USER_EXTERNALPROFILE_PROPERTY_NAME'),
+    relateKey: 'organizationId',
+    validate: payload => validProperties(payload, ['userId', 'organizationId'])
+  },
+  userattribute: {
+    propertyName: config.get('ES.USER_ATTRIBUTE_PROPERTY_NAME'),
+    relateKey: 'attributeId',
+    validate: payload => validProperties(payload, ['userId', 'attributeId']),
+    isNested: true // For ES index creation
+  },
+  userrole: {
+    propertyName: config.get('ES.USER_ROLE_PROPERTY_NAME'),
+    relateKey: 'roleId',
+    validate: payload => validProperties(payload, ['userId', 'roleId'])
+  },
+  userskill: {
+    propertyName: config.get('ES.USER_SKILL_PROPERTY_NAME'),
+    relateKey: 'skillId',
+    validate: payload => validProperties(payload, ['userId', 'skillId'])
+  }
+}
+
+/**
+ * es config OrganizationResources
+ */
+const OrganizationResources = {
+  organizationskillprovider: {
+    propertyName: config.get('ES.ORGANIZATION_SKILLPROVIDER_PROPERTY_NAME'),
+    relateKey: 'skillProviderId',
+    validate: payload => validProperties(payload, ['organizationId', 'skillProviderId']),
+    enrich: {
+      policyName: config.get('ES.DOCUMENTS.organization.enrichPolicyName'),
+      matchField: 'id',
+      enrichFields: ['id', 'name', 'created', 'updated', 'createdBy', 'updatedBy', 'skillProviders']
+    }
+  }
+}
+
 module.exports = {
   UserRoles,
   AllAuthenticatedUsers,
-  AdminUser
+  AdminUser,
+  TopResources,
+  UserResources,
+  OrganizationResources
 }
diff --git a/src/modules/achievement/service.js b/src/modules/achievement/service.js
index a4276eb..60f3b7c 100644
--- a/src/modules/achievement/service.js
+++ b/src/modules/achievement/service.js
@@ -2,6 +2,7 @@
  * the achievement services
  */
 const joi = require('@hapi/joi')
+const config = require('config')
 const _ = require('lodash')
 
 const errors = require('../../common/errors')
@@ -28,10 +29,20 @@ async function create (entity, auth) {
 
   await dbHelper.makeSureUnique(Achievement, entity, uniqueFields)
 
-  const result = await dbHelper.create(Achievement, entity, auth)
-  await serviceHelper.createRecordInEs(resource, result.dataValues)
-
-  return result
+  let newEntity
+  try {
+    await sequelize.transaction(async (t) => {
+      const result = await dbHelper.create(Achievement, entity, auth, t)
+      newEntity = result.toJSON()
+      await serviceHelper.createRecordInEs(resource, newEntity)
+    })
+    return newEntity
+  } catch (e) {
+    if (newEntity) {
+      helper.publishError(config.UBAHN_ERROR_TOPIC, newEntity, 'achievement.create')
+    }
+    throw e
+  }
 }
 
 create.schema = {
@@ -60,10 +71,20 @@ async function patch (id, entity, auth, params) {
 
   await dbHelper.makeSureUnique(Achievement, entity, uniqueFields, params)
 
-  const newEntity = await dbHelper.update(Achievement, id, entity, auth, params)
-  await serviceHelper.patchRecordInEs(resource, newEntity.dataValues)
-
-  return newEntity
+  let newEntity
+  try {
+    await sequelize.transaction(async (t) => {
+      const result = await dbHelper.update(Achievement, id, entity, auth, params, t)
+      newEntity = result.toJSON()
+      await serviceHelper.patchRecordInEs(resource, newEntity)
+    })
+    return newEntity
+  } catch (e) {
+    if (newEntity) {
+      helper.publishError(config.UBAHN_ERROR_TOPIC, newEntity, 'achievement.update')
+    }
+    throw e
+  }
 }
 
 patch.schema = {
@@ -154,8 +175,16 @@ search.schema = {
  * @return {Promise<void>} no data returned
  */
 async function remove (id, auth, params) {
-  await dbHelper.remove(Achievement, id, params)
-  await serviceHelper.deleteRecordFromEs(id, params, resource)
+  const entity = { id }
+  try {
+    await sequelize.transaction(async (t) => {
+      await dbHelper.remove(Achievement, id, params, t)
+      await serviceHelper.deleteRecordFromEs(id, params, resource)
+    })
+  } catch (e) {
+    helper.publishError(config.UBAHN_ERROR_TOPIC, entity, 'achievement.delete')
+    throw e
+  }
 }
 
 module.exports = {
diff --git a/src/modules/achievementsProvider/service.js b/src/modules/achievementsProvider/service.js
index b6331e1..524fc02 100644
--- a/src/modules/achievementsProvider/service.js
+++ b/src/modules/achievementsProvider/service.js
@@ -3,6 +3,7 @@
  */
 
 const joi = require('@hapi/joi')
+const config = require('config')
 const _ = require('lodash')
 
 const errors = require('../../common/errors')
@@ -25,10 +26,20 @@ const uniqueFields = [['name']]
 async function create (entity, auth) {
   await dbHelper.makeSureUnique(AchievementsProvider, entity, uniqueFields)
 
-  const result = await dbHelper.create(AchievementsProvider, entity, auth)
-  await serviceHelper.createRecordInEs(resource, result.dataValues)
-
-  return result
+  let newEntity
+  try {
+    await sequelize.transaction(async (t) => {
+      const result = await dbHelper.create(AchievementsProvider, entity, auth, t)
+      newEntity = result.toJSON()
+      await serviceHelper.createRecordInEs(resource, newEntity)
+    })
+    return newEntity
+  } catch (e) {
+    if (newEntity) {
+      helper.publishError(config.UBAHN_ERROR_TOPIC, newEntity, 'achievementprovider.create')
+    }
+    throw e
+  }
 }
 
 create.schema = {
@@ -49,10 +60,20 @@ create.schema = {
 async function patch (id, entity, auth, params) {
   await dbHelper.makeSureUnique(AchievementsProvider, entity, uniqueFields)
 
-  const newEntity = await dbHelper.update(AchievementsProvider, id, entity, auth)
-  await serviceHelper.patchRecordInEs(resource, newEntity.dataValues)
-
-  return newEntity
+  let newEntity
+  try {
+    await sequelize.transaction(async (t) => {
+      const result = await dbHelper.update(AchievementsProvider, id, entity, auth, params, t)
+      newEntity = result.toJSON()
+      await serviceHelper.patchRecordInEs(resource, newEntity)
+    })
+    return newEntity
+  } catch (e) {
+    if (newEntity) {
+      helper.publishError(config.UBAHN_ERROR_TOPIC, newEntity, 'achievementprovider.update')
+    }
+    throw e
+  }
 }
 
 patch.schema = {
@@ -130,8 +151,17 @@ async function remove (id, auth, params) {
   if (existing.length > 0) {
     throw errors.deleteConflictError(`Please delete ${Achievement.name} with ids ${existing.map(o => o.id)}`)
   }
-  await dbHelper.remove(AchievementsProvider, id)
-  await serviceHelper.deleteRecordFromEs(id, params, resource)
+
+  const entity = { id }
+  try {
+    await sequelize.transaction(async (t) => {
+      await dbHelper.remove(AchievementsProvider, id, params, t)
+      await serviceHelper.deleteRecordFromEs(id, params, resource)
+    })
+  } catch (e) {
+    helper.publishError(config.UBAHN_ERROR_TOPIC, entity, 'achievementprovider.delete')
+    throw e
+  }
 }
 
 module.exports = {
diff --git a/src/modules/attribute/service.js b/src/modules/attribute/service.js
index f074025..e617e48 100644
--- a/src/modules/attribute/service.js
+++ b/src/modules/attribute/service.js
@@ -3,6 +3,7 @@
  */
 
 const joi = require('@hapi/joi')
+const config = require('config')
 const _ = require('lodash')
 
 const errors = require('../../common/errors')
@@ -27,9 +28,20 @@ async function create (entity, auth) {
   await dbHelper.get(AttributeGroup, entity.attributeGroupId)
   await dbHelper.makeSureUnique(Attribute, entity, uniqueFields)
 
-  const result = await dbHelper.create(Attribute, entity, auth)
-  await serviceHelper.createRecordInEs(resource, result.dataValues)
-  return result
+  let newEntity
+  try {
+    await sequelize.transaction(async (t) => {
+      const result = await dbHelper.create(Attribute, entity, auth, t)
+      newEntity = result.toJSON()
+      await serviceHelper.createRecordInEs(resource, newEntity)
+    })
+    return newEntity
+  } catch (e) {
+    if (newEntity) {
+      helper.publishError(config.UBAHN_ERROR_TOPIC, newEntity, 'attribute.create')
+    }
+    throw e
+  }
 }
 
 create.schema = {
@@ -54,9 +66,20 @@ async function patch (id, entity, auth, params) {
   }
   await dbHelper.makeSureUnique(Attribute, entity, uniqueFields)
 
-  const newEntity = await dbHelper.update(Attribute, id, entity, auth)
-  await serviceHelper.patchRecordInEs(resource, newEntity.dataValues)
-  return newEntity
+  let newEntity
+  try {
+    await sequelize.transaction(async (t) => {
+      const result = await dbHelper.update(Attribute, id, entity, auth, params, t)
+      newEntity = result.toJSON()
+      await serviceHelper.patchRecordInEs(resource, newEntity)
+    })
+    return newEntity
+  } catch (e) {
+    if (newEntity) {
+      helper.publishError(config.UBAHN_ERROR_TOPIC, newEntity, 'attribute.update')
+    }
+    throw e
+  }
 }
 
 patch.schema = {
@@ -136,8 +159,17 @@ async function remove (id, auth, params) {
   if (existing.length > 0) {
     throw errors.deleteConflictError(`Please delete ${UserAttribute.name} with ids ${existing.map(o => o.id)}`)
   }
-  await dbHelper.remove(Attribute, id)
-  await serviceHelper.deleteRecordFromEs(id, params, resource)
+
+  const entity = { id }
+  try {
+    await sequelize.transaction(async (t) => {
+      await dbHelper.remove(Attribute, id, params, t)
+      await serviceHelper.deleteRecordFromEs(id, params, resource)
+    })
+  } catch (e) {
+    helper.publishError(config.UBAHN_ERROR_TOPIC, entity, 'attribute.delete')
+    throw e
+  }
 }
 
 module.exports = {
diff --git a/src/modules/attributeGroup/service.js b/src/modules/attributeGroup/service.js
index c9feb13..7db4699 100644
--- a/src/modules/attributeGroup/service.js
+++ b/src/modules/attributeGroup/service.js
@@ -3,6 +3,7 @@
  */
 
 const joi = require('@hapi/joi')
+const config = require('config')
 const _ = require('lodash')
 
 const errors = require('../../common/errors')
@@ -27,9 +28,20 @@ async function create (entity, auth) {
   await dbHelper.get(Organization, entity.organizationId)
   await dbHelper.makeSureUnique(AttributeGroup, entity, uniqueFields)
 
-  const result = await dbHelper.create(AttributeGroup, entity, auth)
-  await serviceHelper.createRecordInEs(resource, result.dataValues)
-  return result
+  let newEntity
+  try {
+    await sequelize.transaction(async (t) => {
+      const result = await dbHelper.create(AttributeGroup, entity, auth, t)
+      newEntity = result.toJSON()
+      await serviceHelper.createRecordInEs(resource, newEntity)
+    })
+    return newEntity
+  } catch (e) {
+    if (newEntity) {
+      helper.publishError(config.UBAHN_ERROR_TOPIC, newEntity, 'attributegroup.create')
+    }
+    throw e
+  }
 }
 
 create.schema = {
@@ -54,9 +66,20 @@ async function patch (id, entity, auth, params) {
   }
   await dbHelper.makeSureUnique(AttributeGroup, entity, uniqueFields)
 
-  const newEntity = await dbHelper.update(AttributeGroup, id, entity, auth)
-  await serviceHelper.patchRecordInEs(resource, newEntity.dataValues)
-  return newEntity
+  let newEntity
+  try {
+    await sequelize.transaction(async (t) => {
+      const result = await dbHelper.update(AttributeGroup, id, entity, auth, params, t)
+      newEntity = result.toJSON()
+      await serviceHelper.patchRecordInEs(resource, newEntity)
+    })
+    return newEntity
+  } catch (e) {
+    if (newEntity) {
+      helper.publishError(config.UBAHN_ERROR_TOPIC, newEntity, 'attributegroup.update')
+    }
+    throw e
+  }
 }
 
 patch.schema = {
@@ -136,8 +159,16 @@ async function remove (id, auth, params) {
   if (existing.length > 0) {
     throw errors.deleteConflictError(`Please delete ${Attribute.name} with ids ${existing.map(o => o.id)}`)
   }
-  await dbHelper.remove(AttributeGroup, id)
-  await serviceHelper.deleteRecordFromEs(id, params, resource)
+  const entity = { id }
+  try {
+    await sequelize.transaction(async (t) => {
+      await dbHelper.remove(AttributeGroup, id, params, t)
+      await serviceHelper.deleteRecordFromEs(id, params, resource)
+    })
+  } catch (e) {
+    helper.publishError(config.UBAHN_ERROR_TOPIC, entity, 'attributegroup.delete')
+    throw e
+  }
 }
 
 module.exports = {
diff --git a/src/modules/externalProfile/service.js b/src/modules/externalProfile/service.js
index 07e626f..71b8660 100644
--- a/src/modules/externalProfile/service.js
+++ b/src/modules/externalProfile/service.js
@@ -4,6 +4,7 @@
  */
 
 const joi = require('@hapi/joi')
+const config = require('config')
 const _ = require('lodash')
 
 const errors = require('../../common/errors')
@@ -29,10 +30,20 @@ async function create (entity, auth) {
   await dbHelper.get(User, entity.userId)
   await dbHelper.makeSureUnique(ExternalProfile, entity, uniqueFields)
 
-  const result = await dbHelper.create(ExternalProfile, entity, auth)
-  await serviceHelper.createRecordInEs(resource, result.dataValues)
-
-  return result
+  let newEntity
+  try {
+    await sequelize.transaction(async (t) => {
+      const result = await dbHelper.create(ExternalProfile, entity, auth, t)
+      newEntity = result.toJSON()
+      await serviceHelper.createRecordInEs(resource, newEntity)
+    })
+    return newEntity
+  } catch (e) {
+    if (newEntity) {
+      helper.publishError(config.UBAHN_ERROR_TOPIC, newEntity, 'externalprofile.create')
+    }
+    throw e
+  }
 }
 
 create.schema = {
@@ -60,10 +71,20 @@ async function patch (id, entity, auth, params) {
 
   await dbHelper.makeSureUnique(ExternalProfile, entity, uniqueFields, params)
 
-  const newEntity = await dbHelper.update(ExternalProfile, id, entity, auth, params)
-  await serviceHelper.patchRecordInEs(resource, newEntity.dataValues)
-
-  return newEntity
+  let newEntity
+  try {
+    await sequelize.transaction(async (t) => {
+      const result = await dbHelper.update(ExternalProfile, id, entity, auth, params, t)
+      newEntity = result.toJSON()
+      await serviceHelper.patchRecordInEs(resource, newEntity)
+    })
+    return newEntity
+  } catch (e) {
+    if (newEntity) {
+      helper.publishError(config.UBAHN_ERROR_TOPIC, newEntity, 'externalprofile.update')
+    }
+    throw e
+  }
 }
 
 patch.schema = {
@@ -157,8 +178,16 @@ search.schema = {
  * @return {Promise<void>} no data returned
  */
 async function remove (id, auth, params) {
-  await dbHelper.remove(ExternalProfile, id, params)
-  await serviceHelper.deleteRecordFromEs(id, params, resource)
+  const entity = { id }
+  try {
+    await sequelize.transaction(async (t) => {
+      await dbHelper.remove(ExternalProfile, id, params, t)
+      await serviceHelper.deleteRecordFromEs(id, params, resource)
+    })
+  } catch (e) {
+    helper.publishError(config.UBAHN_ERROR_TOPIC, entity, 'externalprofile.delete')
+    throw e
+  }
 }
 
 module.exports = {
diff --git a/src/modules/organization/service.js b/src/modules/organization/service.js
index 79a5d03..9155267 100644
--- a/src/modules/organization/service.js
+++ b/src/modules/organization/service.js
@@ -3,6 +3,7 @@
  */
 
 const joi = require('@hapi/joi')
+const config = require('config')
 const _ = require('lodash')
 
 const errors = require('../../common/errors')
@@ -26,10 +27,21 @@ const uniqueFields = [['name']]
  */
 async function create (entity, auth) {
   await dbHelper.makeSureUnique(Organization, entity, uniqueFields)
-  const result = await dbHelper.create(Organization, entity, auth)
-  await serviceHelper.createRecordInEs(resource, result.dataValues)
 
-  return result
+  let newEntity
+  try {
+    await sequelize.transaction(async (t) => {
+      const result = await dbHelper.create(Organization, entity, auth, t)
+      newEntity = result.toJSON()
+      await serviceHelper.createRecordInEs(resource, newEntity)
+    })
+    return newEntity
+  } catch (e) {
+    if (newEntity) {
+      helper.publishError(config.UBAHN_ERROR_TOPIC, newEntity, 'organization.create')
+    }
+    throw e
+  }
 }
 
 create.schema = {
@@ -49,10 +61,20 @@ create.schema = {
  */
 async function patch (id, entity, auth, params) {
   await dbHelper.makeSureUnique(Organization, entity, uniqueFields)
-  const newEntity = await dbHelper.update(Organization, id, entity, auth)
-  await serviceHelper.patchRecordInEs(resource, newEntity.dataValues)
-
-  return newEntity
+  let newEntity
+  try {
+    await sequelize.transaction(async (t) => {
+      const result = await dbHelper.update(Organization, id, entity, auth, params, t)
+      newEntity = result.toJSON()
+      await serviceHelper.patchRecordInEs(resource, newEntity)
+    })
+    return newEntity
+  } catch (e) {
+    if (newEntity) {
+      helper.publishError(config.UBAHN_ERROR_TOPIC, newEntity, 'organization.update')
+    }
+    throw e
+  }
 }
 
 patch.schema = {
@@ -135,7 +157,7 @@ async function remove (id, auth, params) {
     throw errors.deleteConflictError(`Please delete ${ExternalProfile.name} with ids ${existing.map(o => o.id)}`)
   }
 
-  beginCascadeDelete(id, params)
+  await beginCascadeDelete(id, params)
 }
 
 /**
@@ -144,9 +166,17 @@ async function remove (id, auth, params) {
  * @param {*} id the path params
  */
 async function beginCascadeDelete (id, params) {
-  await serviceHelper.deleteChild(OrganizationSkillsProvider, id, ['organizationId', 'skillProviderId'], 'OrganizationSkillsProvider')
-  await dbHelper.remove(Organization, id)
-  await serviceHelper.deleteRecordFromEs(id, params, resource)
+  const entity = { id }
+  try {
+    await sequelize.transaction(async (t) => {
+      await serviceHelper.deleteChild(OrganizationSkillsProvider, id, ['organizationId', 'skillProviderId'], 'organizationskillprovider', t)
+      await dbHelper.remove(Organization, id, params, t)
+      await serviceHelper.deleteRecordFromEs(id, params, resource)
+    })
+  } catch (e) {
+    helper.publishError(config.UBAHN_ERROR_TOPIC, entity, 'organization.delete')
+    throw e
+  }
 }
 
 module.exports = {
diff --git a/src/modules/organizationSkillsProvider/service.js b/src/modules/organizationSkillsProvider/service.js
index 6525f73..81ed575 100644
--- a/src/modules/organizationSkillsProvider/service.js
+++ b/src/modules/organizationSkillsProvider/service.js
@@ -3,6 +3,7 @@
  */
 
 const joi = require('@hapi/joi')
+const config = require('config')
 const _ = require('lodash')
 
 const errors = require('../../common/errors')
@@ -28,10 +29,20 @@ async function create (entity, auth) {
   await dbHelper.get(SkillsProvider, entity.skillProviderId)
   await dbHelper.makeSureUnique(OrganizationSkillsProvider, entity, uniqueFields)
 
-  const result = await dbHelper.create(OrganizationSkillsProvider, entity, auth)
-  await serviceHelper.createRecordInEs(resource, result.dataValues)
-
-  return result
+  let newEntity
+  try {
+    await sequelize.transaction(async (t) => {
+      const result = await dbHelper.create(OrganizationSkillsProvider, entity, auth, t)
+      newEntity = result.toJSON()
+      await serviceHelper.createRecordInEs(resource, newEntity)
+    })
+    return newEntity
+  } catch (e) {
+    if (newEntity) {
+      helper.publishError(config.UBAHN_ERROR_TOPIC, newEntity, 'organizationskillprovider.create')
+    }
+    throw e
+  }
 }
 
 create.schema = {
@@ -108,8 +119,16 @@ search.schema = {
  * @return {Promise<void>} no data returned
  */
 async function remove (id, auth, params) {
-  await dbHelper.remove(OrganizationSkillsProvider, id, params)
-  await serviceHelper.deleteRecordFromEs(id, params, resource)
+  const entity = { id }
+  try {
+    await sequelize.transaction(async (t) => {
+      await dbHelper.remove(OrganizationSkillsProvider, id, params, t)
+      await serviceHelper.deleteRecordFromEs(id, params, resource)
+    })
+  } catch (e) {
+    helper.publishError(config.UBAHN_ERROR_TOPIC, entity, 'organizationskillprovider.delete')
+    throw e
+  }
 }
 
 module.exports = {
diff --git a/src/modules/role/service.js b/src/modules/role/service.js
index 2ec4ae7..42f46de 100644
--- a/src/modules/role/service.js
+++ b/src/modules/role/service.js
@@ -3,6 +3,7 @@
  */
 
 const joi = require('@hapi/joi')
+const config = require('config')
 const _ = require('lodash')
 
 const errors = require('../../common/errors')
@@ -25,10 +26,20 @@ const uniqueFields = [['name']]
 async function create (entity, auth) {
   await dbHelper.makeSureUnique(Role, entity, uniqueFields)
 
-  const result = await dbHelper.create(Role, entity, auth)
-  await serviceHelper.createRecordInEs(resource, result.dataValues)
-
-  return result
+  let newEntity
+  try {
+    await sequelize.transaction(async (t) => {
+      const result = await dbHelper.create(Role, entity, auth, t)
+      newEntity = result.toJSON()
+      await serviceHelper.createRecordInEs(resource, newEntity)
+    })
+    return newEntity
+  } catch (e) {
+    if (newEntity) {
+      helper.publishError(config.UBAHN_ERROR_TOPIC, newEntity, 'role.create')
+    }
+    throw e
+  }
 }
 
 create.schema = {
@@ -49,10 +60,20 @@ create.schema = {
 async function patch (id, entity, auth, params) {
   await dbHelper.makeSureUnique(Role, entity, uniqueFields)
 
-  const newEntity = await dbHelper.update(Role, id, entity, auth)
-  await serviceHelper.patchRecordInEs(resource, newEntity.dataValues)
-
-  return newEntity
+  let newEntity
+  try {
+    await sequelize.transaction(async (t) => {
+      const result = await dbHelper.update(Role, id, entity, auth, params, t)
+      newEntity = result.toJSON()
+      await serviceHelper.patchRecordInEs(resource, newEntity)
+    })
+    return newEntity
+  } catch (e) {
+    if (newEntity) {
+      helper.publishError(config.UBAHN_ERROR_TOPIC, newEntity, 'role.update')
+    }
+    throw e
+  }
 }
 
 patch.schema = {
@@ -130,8 +151,17 @@ async function remove (id, auth, params) {
   if (existing.length > 0) {
     throw errors.deleteConflictError(`Please delete ${UsersRole.name} with ids ${existing.map(o => o.id)}`)
   }
-  await dbHelper.remove(Role, id)
-  await serviceHelper.deleteRecordFromEs(id, params, resource)
+
+  const entity = { id }
+  try {
+    await sequelize.transaction(async (t) => {
+      await dbHelper.remove(Role, id, params, t)
+      await serviceHelper.deleteRecordFromEs(id, params, resource)
+    })
+  } catch (e) {
+    helper.publishError(config.UBAHN_ERROR_TOPIC, entity, 'role.delete')
+    throw e
+  }
 }
 
 module.exports = {
diff --git a/src/modules/user/service.js b/src/modules/user/service.js
index 953f92c..bcd7b32 100644
--- a/src/modules/user/service.js
+++ b/src/modules/user/service.js
@@ -7,7 +7,6 @@ const _ = require('lodash')
 const config = require('config')
 
 const errors = require('../../common/errors')
-const logger = require('../../common/logger')
 const helper = require('../../common/helper')
 const dbHelper = require('../../common/db-helper')
 const serviceHelper = require('../../common/service-helper')
@@ -32,18 +31,17 @@ const uniqueFields = [['handle']]
 async function create (entity, auth) {
   await dbHelper.makeSureUnique(User, entity, uniqueFields)
 
-  let payload
+  let newEntity
   try {
-    const result = await sequelize.transaction(async (t) => {
-      const userEntity = await dbHelper.create(User, entity, auth, t)
-      payload = userEntity.dataValues
-      await serviceHelper.createRecordInEs(resource, userEntity.dataValues, true)
-      return userEntity
+    await sequelize.transaction(async (t) => {
+      const result = await dbHelper.create(User, entity, auth, t)
+      newEntity = result.toJSON()
+      await serviceHelper.createRecordInEs(resource, newEntity)
     })
-    return result
+    return newEntity
   } catch (e) {
-    if (payload) {
-      helper.publishError(config.UBAHN_ERROR_TOPIC, payload, 'user.create')
+    if (newEntity) {
+      helper.publishError(config.UBAHN_ERROR_TOPIC, newEntity, 'user.create')
     }
     throw e
   }
@@ -69,19 +67,18 @@ create.schema = {
 async function patch (id, entity, auth, params) {
   await dbHelper.makeSureUnique(User, entity, uniqueFields)
 
-  let payload
+  let newEntity
   try {
-    const result = await sequelize.transaction(async (t) => {
-      const newEntity = await dbHelper.update(User, id, entity, auth, null, t)
-      payload = newEntity.dataValues
-      await serviceHelper.patchRecordInEs(resource, newEntity.dataValues, true)
-      return newEntity
+    await sequelize.transaction(async (t) => {
+      const result = await dbHelper.update(User, id, entity, auth, params, t)
+      newEntity = result.toJSON()
+      await serviceHelper.patchRecordInEs(resource, newEntity)
     })
 
-    return result
+    return newEntity
   } catch (e) {
-    if (payload) {
-      helper.publishError(config.UBAHN_ERROR_TOPIC, payload, 'user.update')
+    if (newEntity) {
+      helper.publishError(config.UBAHN_ERROR_TOPIC, newEntity, 'user.update')
     }
     throw e
   }
@@ -187,32 +184,21 @@ async function remove (id, auth, params) {
  * @param params the path params
  */
 async function beginCascadeDelete (id, params) {
-  await serviceHelper.deleteChild(Achievement, id, ['userId', 'achievementsProviderId'], 'Achievement')
-  await serviceHelper.deleteChild(ExternalProfile, id, ['userId', 'organizationId'], 'ExternalProfile')
-  await serviceHelper.deleteChild(UserAttribute, id, ['userId', 'attributeId'], 'UserAttribute')
-  await serviceHelper.deleteChild(UsersRole, id, ['userId', 'roleId'], 'UsersRole')
-  await serviceHelper.deleteChild(UsersSkill, id, ['userId', 'skillId'], 'UsersSkill')
-  await dbHelper.remove(User, id)
-  await serviceHelper.deleteRecordFromEs(id, params, resource)
-  //TODO: below code is not working, so simply commented our changes
-  /* //Start here
-  let payload = {id}
+  const payload = { id }
   try {
     await sequelize.transaction(async (t) => {
-      await serviceHelper.deleteChild(Achievement, id, ['userId', 'achievementsProviderId'], 'Achievement', t)
-      await serviceHelper.deleteChild(ExternalProfile, id, ['userId', 'organizationId'], 'ExternalProfile', t)
-      await serviceHelper.deleteChild(UserAttribute, id, ['userId', 'attributeId'], 'UserAttribute', t)
-      await serviceHelper.deleteChild(UsersRole, id, ['userId', 'roleId'], 'UsersRole', t)
-      await serviceHelper.deleteChild(UsersSkill, id, ['userId', 'skillId'], 'UsersSkill', t)
+      await serviceHelper.deleteChild(Achievement, id, ['userId', 'achievementsProviderId'], 'achievement', t)
+      await serviceHelper.deleteChild(ExternalProfile, id, ['userId', 'organizationId'], 'externalprofile', t)
+      await serviceHelper.deleteChild(UserAttribute, id, ['userId', 'attributeId'], 'userattribute', t)
+      await serviceHelper.deleteChild(UsersRole, id, ['userId', 'roleId'], 'userrole', t)
+      await serviceHelper.deleteChild(UsersSkill, id, ['userId', 'skillId'], 'userskill', t)
       await dbHelper.remove(User, id, null, t)
       await serviceHelper.deleteRecordFromEs(id, params, resource, true)
     })
-
   } catch (e) {
     helper.publishError(config.UBAHN_ERROR_TOPIC, payload, 'user.delete')
     throw e
   }
-  */ // End here 
 }
 
 module.exports = {
diff --git a/src/modules/usersAttribute/service.js b/src/modules/usersAttribute/service.js
index eed3bee..5e0b42c 100644
--- a/src/modules/usersAttribute/service.js
+++ b/src/modules/usersAttribute/service.js
@@ -2,6 +2,7 @@
  * the user attribute services
  */
 const joi = require('@hapi/joi')
+const config = require('config')
 const _ = require('lodash')
 
 const errors = require('../../common/errors')
@@ -28,10 +29,20 @@ async function create (entity, auth) {
   await dbHelper.get(User, entity.userId)
   await dbHelper.makeSureUnique(UserAttribute, entity, uniqueFields)
 
-  const result = await dbHelper.create(UserAttribute, entity, auth)
-  await serviceHelper.createRecordInEs(resource, result.dataValues)
-
-  return result
+  let newEntity
+  try {
+    await sequelize.transaction(async (t) => {
+      const result = await dbHelper.create(UserAttribute, entity, auth, t)
+      newEntity = result.toJSON()
+      await serviceHelper.createRecordInEs(resource, newEntity)
+    })
+    return newEntity
+  } catch (e) {
+    if (newEntity) {
+      helper.publishError(config.UBAHN_ERROR_TOPIC, newEntity, 'userattribute.create')
+    }
+    throw e
+  }
 }
 
 create.schema = {
@@ -61,10 +72,20 @@ async function patch (id, entity, auth, params) {
 
   await dbHelper.makeSureUnique(UserAttribute, entity, uniqueFields, params)
 
-  const newEntity = await dbHelper.update(UserAttribute, id, entity, auth, params)
-  await serviceHelper.patchRecordInEs(resource, newEntity.dataValues)
-
-  return newEntity
+  let newEntity
+  try {
+    await sequelize.transaction(async (t) => {
+      const result = await dbHelper.update(UserAttribute, id, entity, auth, params, t)
+      newEntity = result.toJSON()
+      await serviceHelper.patchRecordInEs(resource, newEntity)
+    })
+    return newEntity
+  } catch (e) {
+    if (newEntity) {
+      helper.publishError(config.UBAHN_ERROR_TOPIC, newEntity, 'userattribute.update')
+    }
+    throw e
+  }
 }
 
 patch.schema = {
@@ -169,8 +190,16 @@ search.schema = {
  * @return {Promise<void>} no data returned
  */
 async function remove (id, auth, params) {
-  await dbHelper.remove(UserAttribute, id, params)
-  await serviceHelper.deleteRecordFromEs(id, params, resource)
+  const entity = { id }
+  try {
+    await sequelize.transaction(async (t) => {
+      await dbHelper.remove(UserAttribute, id, params, t)
+      await serviceHelper.deleteRecordFromEs(id, params, resource)
+    })
+  } catch (e) {
+    helper.publishError(config.UBAHN_ERROR_TOPIC, entity, 'userattribute.delete')
+    throw e
+  }
 }
 
 module.exports = {
diff --git a/src/modules/usersRole/service.js b/src/modules/usersRole/service.js
index b21586e..452d816 100644
--- a/src/modules/usersRole/service.js
+++ b/src/modules/usersRole/service.js
@@ -3,6 +3,7 @@
  */
 
 const joi = require('@hapi/joi')
+const config = require('config')
 const _ = require('lodash')
 
 const errors = require('../../common/errors')
@@ -28,10 +29,20 @@ async function create (entity, auth) {
   await dbHelper.get(User, entity.userId)
   await dbHelper.makeSureUnique(UsersRole, entity, uniqueFields)
 
-  const result = await dbHelper.create(UsersRole, entity, auth)
-  await serviceHelper.createRecordInEs(resource, result.dataValues)
-
-  return result
+  let newEntity
+  try {
+    await sequelize.transaction(async (t) => {
+      const result = await dbHelper.create(UsersRole, entity, auth, t)
+      newEntity = result.toJSON()
+      await serviceHelper.createRecordInEs(resource, newEntity)
+    })
+    return newEntity
+  } catch (e) {
+    if (newEntity) {
+      helper.publishError(config.UBAHN_ERROR_TOPIC, newEntity, 'userrole.create')
+    }
+    throw e
+  }
 }
 
 create.schema = {
@@ -109,8 +120,16 @@ search.schema = {
  * @return {Promise<void>} no data returned
  */
 async function remove (id, auth, params) {
-  await dbHelper.remove(UsersRole, id, params)
-  await serviceHelper.deleteRecordFromEs(id, params, resource)
+  const entity = { id }
+  try {
+    await sequelize.transaction(async (t) => {
+      await dbHelper.remove(UsersRole, id, params, t)
+      await serviceHelper.deleteRecordFromEs(id, params, resource)
+    })
+  } catch (e) {
+    helper.publishError(config.UBAHN_ERROR_TOPIC, entity, 'userrole.delete')
+    throw e
+  }
 }
 
 module.exports = {
diff --git a/src/modules/usersSkill/service.js b/src/modules/usersSkill/service.js
index 178a0bd..2c538f1 100644
--- a/src/modules/usersSkill/service.js
+++ b/src/modules/usersSkill/service.js
@@ -2,6 +2,7 @@
  * the users skill services
  */
 const joi = require('@hapi/joi')
+const config = require('config')
 const _ = require('lodash')
 
 const errors = require('../../common/errors')
@@ -27,10 +28,20 @@ async function create (entity, auth) {
   await dbHelper.get(User, entity.userId)
   await dbHelper.makeSureUnique(UsersSkill, entity, uniqueFields)
 
-  const result = await dbHelper.create(UsersSkill, entity, auth)
-  await serviceHelper.createRecordInEs(resource, result.dataValues)
-
-  return result
+  let newEntity
+  try {
+    await sequelize.transaction(async (t) => {
+      const result = await dbHelper.create(UsersSkill, entity, auth, t)
+      newEntity = result.toJSON()
+      await serviceHelper.createRecordInEs(resource, newEntity)
+    })
+    return newEntity
+  } catch (e) {
+    if (newEntity) {
+      helper.publishError(config.UBAHN_ERROR_TOPIC, newEntity, 'userskill.create')
+    }
+    throw e
+  }
 }
 
 create.schema = {
@@ -62,10 +73,20 @@ async function patch (id, entity, auth, params) {
 
   await dbHelper.makeSureUnique(UsersSkill, entity, uniqueFields, params)
 
-  const newEntity = await dbHelper.update(UsersSkill, id, entity, auth, params)
-  await serviceHelper.patchRecordInEs(resource, newEntity.dataValues)
-
-  return newEntity
+  let newEntity
+  try {
+    await sequelize.transaction(async (t) => {
+      const result = await dbHelper.update(UsersSkill, id, entity, auth, params, t)
+      newEntity = result.toJSON()
+      await serviceHelper.patchRecordInEs(resource, newEntity)
+    })
+    return newEntity
+  } catch (e) {
+    if (newEntity) {
+      helper.publishError(config.UBAHN_ERROR_TOPIC, newEntity, 'userskill.update')
+    }
+    throw e
+  }
 }
 
 patch.schema = {
@@ -157,8 +178,16 @@ search.schema = {
  * @return {Promise<void>} no data returned
  */
 async function remove (id, auth, params) {
-  await dbHelper.remove(UsersSkill, id, params)
-  await serviceHelper.deleteRecordFromEs(id, params, resource)
+  const entity = { id }
+  try {
+    await sequelize.transaction(async (t) => {
+      await dbHelper.remove(UsersSkill, id, params, t)
+      await serviceHelper.deleteRecordFromEs(id, params, resource)
+    })
+  } catch (e) {
+    helper.publishError(config.UBAHN_ERROR_TOPIC, entity, 'userskill.delete')
+    throw e
+  }
 }
 
 module.exports = {