diff --git a/config/default.js b/config/default.js index 1df223b..9093e2d 100755 --- a/config/default.js +++ b/config/default.js @@ -35,6 +35,9 @@ module.exports = { KAFKA_MESSAGE_ORIGINATOR: process.env.KAFKA_MESSAGE_ORIGINATOR || 'u-bahn-api', // topics + UBAHN_CREATE_USER_TOPIC: process.env.UBAHN_CREATE_USER_TOPIC || 'user.action.topic.create', + UBAHN_UPDATE_USER_TOPIC: process.env.UBAHN_UPDATE_USER_TOPIC || 'user.action.topic.update', + UBAHN_DELETE_USER_TOPIC: process.env.UBAHN_DELETE_USER_TOPIC || 'user.action.topic.delete', UBAHN_CREATE_TOPIC: process.env.UBAHN_CREATE_TOPIC || 'u-bahn.action.create', UBAHN_UPDATE_TOPIC: process.env.UBAHN_UPDATE_TOPIC || 'u-bahn.action.update', UBAHN_DELETE_TOPIC: process.env.UBAHN_DELETE_TOPIC || 'u-bahn.action.delete', diff --git a/docker-pgsql-es/docker-compose.yml b/docker-pgsql-es/docker-compose.yml index 31f5a45..9ce9d2e 100644 --- a/docker-pgsql-es/docker-compose.yml +++ b/docker-pgsql-es/docker-compose.yml @@ -4,14 +4,14 @@ services: image: "postgres:12.4" volumes: - database-data:/var/lib/postgresql/data/ - ports: + ports: - "5432:5432" environment: POSTGRES_PASSWORD: ${DB_PASSWORD} POSTGRES_USER: ${DB_USERNAME} POSTGRES_DB: ${DB_NAME} esearch: - image: elasticsearch:7.7.1 + image: elasticsearch:7.13.4 container_name: ubahn-data-processor-es_es ports: - "9200:9200" diff --git a/src/modules/user/service.js b/src/modules/user/service.js index 33ea58e..6a63e86 100644 --- a/src/modules/user/service.js +++ b/src/modules/user/service.js @@ -4,8 +4,10 @@ const joi = require('@hapi/joi') const _ = require('lodash') +const config = require('config') const errors = require('../../common/errors') +const logger = require('../../common/logger') const helper = require('../../common/helper') const dbHelper = require('../../common/db-helper') const serviceHelper = require('../../common/service-helper') @@ -33,7 +35,11 @@ async function create (entity, auth) { const result = await sequelize.transaction(async (t) => { const userEntity = await dbHelper.create(User, entity, auth, t) await serviceHelper.createRecordInEs(resource, userEntity.dataValues, true) - return userEntity + try { + await helper.postEvent(config.UBAHN_CREATE_USER_TOPIC, userEntity.dataValues) + } catch (err) { + logger.logFullError(err) + } }) return result @@ -62,6 +68,12 @@ async function patch (id, entity, auth, params) { const result = await sequelize.transaction(async (t) => { const newEntity = await dbHelper.update(User, id, entity, auth, null, t) await serviceHelper.patchRecordInEs(resource, newEntity.dataValues, true) + + try { + await helper.postEvent(config.UBAHN_UPDATE_USER_TOPIC, newEntity.dataValues) + } catch (err) { + logger.logFullError(err) + } return newEntity }) @@ -176,6 +188,11 @@ async function beginCascadeDelete (id, params) { await serviceHelper.deleteChild(UsersSkill, id, ['userId', 'skillId'], 'UsersSkill', t) await dbHelper.remove(User, id, null, t) await serviceHelper.deleteRecordFromEs(id, params, resource, true) + try { + await helper.postEvent(config.UBAHN_DELETE_USER_TOPIC, {id}) + } catch (err) { + logger.logFullError(err) + } }) }