Skip to content
This repository was archived by the owner on Mar 13, 2025. It is now read-only.

Commit dbfddd5

Browse files
author
sachin-maheshwari
authored
Merge pull request #103 from topcoder-platform/feature/shapeup4-cqrs-update
Shapeup4 - CQRS standards update (user object)
2 parents 878b94d + 61bb014 commit dbfddd5

File tree

8 files changed

+186
-27
lines changed

8 files changed

+186
-27
lines changed

.circleci/config.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ workflows:
7070
branches:
7171
only:
7272
- develop
73+
- feature/shapeup4-cqrs-update
7374

7475
# Production builds are exectuted only on tagged commits to the
7576
# master branch.

config/default.js

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ module.exports = {
3535
KAFKA_MESSAGE_ORIGINATOR: process.env.KAFKA_MESSAGE_ORIGINATOR || 'u-bahn-api',
3636

3737
// topics
38+
UBAHN_ERROR_TOPIC: process.env.UBAHN_ERROR_TOPIC || 'ubahn.action.error',
39+
3840
UBAHN_CREATE_TOPIC: process.env.UBAHN_CREATE_TOPIC || 'u-bahn.action.create',
3941
UBAHN_UPDATE_TOPIC: process.env.UBAHN_UPDATE_TOPIC || 'u-bahn.action.update',
4042
UBAHN_DELETE_TOPIC: process.env.UBAHN_DELETE_TOPIC || 'u-bahn.action.delete',

docker-pgsql-es/docker-compose.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,14 @@ services:
44
image: "postgres:12.4"
55
volumes:
66
- database-data:/var/lib/postgresql/data/
7-
ports:
7+
ports:
88
- "5432:5432"
99
environment:
1010
POSTGRES_PASSWORD: ${DB_PASSWORD}
1111
POSTGRES_USER: ${DB_USERNAME}
1212
POSTGRES_DB: ${DB_NAME}
1313
esearch:
14-
image: elasticsearch:7.7.1
14+
image: elasticsearch:7.13.4
1515
container_name: ubahn-data-processor-es_es
1616
ports:
1717
- "9200:9200"

src/common/db-helper.js

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -105,24 +105,27 @@ async function get (model, pk, params) {
105105
* @param model the sequelize model object
106106
* @param entity entity to create
107107
* @param auth the user auth object
108+
* @param transaction the transaction object
108109
* @returns {Promise<void>}
109110
*/
110-
async function create (model, entity, auth) {
111+
async function create (model, entity, auth, transaction) {
111112
if (auth) {
112113
entity.createdBy = helper.getAuthUser(auth)
113114
}
114-
return model.create(entity)
115+
return model.create(entity, { transaction })
115116
}
116117

117118
/**
118119
* delete object by pk
119120
* @param model the sequelize model object
120121
* @param pk the primary key
122+
* @param transaction the transaction object
121123
* @returns {Promise<void>}
122124
*/
123-
async function remove (model, pk, params) {
125+
async function remove (model, pk, params, transaction) {
124126
const instance = await get(model, pk, params)
125-
return instance.destroy()
127+
const result = await instance.destroy({ transaction })
128+
return result
126129
}
127130

128131
/**
@@ -132,13 +135,14 @@ async function remove (model, pk, params) {
132135
* @param entity entity to create
133136
* @param auth the auth object
134137
* @param auth the path params
138+
* @param transaction the transaction object
135139
* @returns {Promise<void>}
136140
*/
137-
async function update (model, pk, entity, auth, params) {
141+
async function update (model, pk, entity, auth, params, transaction) {
138142
// insure that object exists
139143
const instance = await get(model, pk, params)
140144
entity.updatedBy = helper.getAuthUser(auth)
141-
return instance.update(entity)
145+
return instance.update(entity, { transaction })
142146
}
143147

144148
/**

src/common/es-helper.js

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ const config = require('config')
22
const _ = require('lodash')
33
const querystring = require('querystring')
44
const logger = require('../common/logger')
5+
const helper = require('../common/helper')
56
const appConst = require('../consts')
67
const esClient = require('./es-client').getESClient()
78

@@ -282,6 +283,38 @@ function escapeRegex (str) {
282283
/* eslint-enable no-useless-escape */
283284
}
284285

286+
/**
287+
* Process create entity
288+
* @param {String} resource resource name
289+
* @param {Object} entity entity object
290+
*/
291+
async function processCreate (resource, entity) {
292+
helper.validProperties(entity, ['id'])
293+
await esClient.index({
294+
index: DOCUMENTS[resource].index,
295+
type: DOCUMENTS[resource].type,
296+
id: entity.id,
297+
body: entity,
298+
refresh: 'wait_for'
299+
})
300+
logger.info(`Insert in Elasticsearch resource ${resource} entity, , ${JSON.stringify(entity, null, 2)}`)
301+
}
302+
303+
/**
304+
* Process delete entity
305+
* @param {String} resource resource name
306+
* @param {Object} entity entity object
307+
*/
308+
async function processDelete (resource, entity) {
309+
helper.validProperties(entity, ['id'])
310+
await esClient.delete({
311+
index: DOCUMENTS[resource].index,
312+
type: DOCUMENTS[resource].type,
313+
id: entity.id,
314+
refresh: 'wait_for'
315+
})
316+
}
317+
285318
async function getOrganizationId (handle) {
286319
const dBHelper = require('../common/db-helper')
287320
const sequelize = require('../models/index')
@@ -1453,6 +1486,9 @@ async function searchAchievementValues ({ organizationId, keyword }) {
14531486
}
14541487

14551488
module.exports = {
1489+
processCreate,
1490+
processUpdate: processCreate,
1491+
processDelete,
14561492
searchElasticSearch,
14571493
getFromElasticSearch,
14581494
searchUsers,

src/common/helper.js

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
const config = require('config')
2+
const Joi = require('@hapi/joi')
23
const querystring = require('querystring')
34
const errors = require('./errors')
45
const appConst = require('../consts')
@@ -9,6 +10,20 @@ const busApi = require('tc-bus-api-wrapper')
910
const busApiClient = busApi(_.pick(config, ['AUTH0_URL', 'AUTH0_AUDIENCE', 'TOKEN_CACHE_TIME', 'AUTH0_CLIENT_ID',
1011
'AUTH0_CLIENT_SECRET', 'BUSAPI_URL', 'KAFKA_ERROR_TOPIC', 'AUTH0_PROXY_SERVER_URL']))
1112

13+
/**
14+
* Function to valid require keys
15+
* @param {Object} payload validated object
16+
* @param {Array} keys required keys
17+
* @throws {Error} if required key absent
18+
*/
19+
function validProperties (payload, keys) {
20+
const schema = Joi.object(_.fromPairs(_.map(keys, key => [key, Joi.string().uuid().required()]))).unknown(true)
21+
const error = schema.validate(payload).error
22+
if (error) {
23+
throw error
24+
}
25+
}
26+
1227
/**
1328
* get auth user handle or id
1429
* @param authUser the user
@@ -145,12 +160,33 @@ async function postEvent (topic, payload) {
145160
await busApiClient.postEvent(message)
146161
}
147162

163+
/**
164+
* Send error event to Kafka
165+
* @params {String} topic the topic name
166+
* @params {Object} payload the payload
167+
* @params {String} action for which operation error occurred
168+
*/
169+
async function publishError (topic, payload, action) {
170+
_.set(payload, 'apiAction', action)
171+
const message = {
172+
topic,
173+
originator: config.KAFKA_MESSAGE_ORIGINATOR,
174+
timestamp: new Date().toISOString(),
175+
'mime-type': 'application/json',
176+
payload
177+
}
178+
logger.debug(`Publish error to Kafka topic ${topic}, ${JSON.stringify(message, null, 2)}`)
179+
await busApiClient.postEvent(message)
180+
}
181+
148182
module.exports = {
183+
validProperties,
149184
getAuthUser,
150185
permissionCheck,
151186
checkIfExists,
152187
injectSearchMeta,
153188
getControllerMethods,
154189
getSubControllerMethods,
155-
postEvent
190+
postEvent,
191+
publishError
156192
}

src/common/service-helper.js

Lines changed: 46 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -38,25 +38,49 @@ const MODEL_TO_RESOURCE = {
3838
* Create record in es
3939
* @param resource the resource to create
4040
* @param result the resource fields
41+
* @param toEs is to es directly
4142
*/
42-
async function createRecordInEs (resource, entity) {
43+
async function createRecordInEs (resource, entity, toEs) {
4344
try {
44-
await publishMessage('create', resource, entity)
45+
if (toEs) {
46+
await esHelper.processCreate(resource, entity)
47+
}
4548
} catch (err) {
4649
logger.logFullError(err)
50+
throw err
51+
}
52+
53+
if (!toEs) {
54+
try {
55+
await publishMessage("create", resource, entity);
56+
} catch (err) {
57+
logger.logFullError(err);
58+
}
4759
}
60+
4861
}
4962

5063
/**
5164
* Patch record in es
5265
* @param resource the resource to create
5366
* @param result the resource fields
67+
* @param toEs is to es directly
5468
*/
55-
async function patchRecordInEs (resource, entity) {
69+
async function patchRecordInEs (resource, entity, toEs) {
5670
try {
57-
await publishMessage('patch', resource, entity)
71+
if (toEs) {
72+
await esHelper.processUpdate(resource, entity)
73+
}
5874
} catch (err) {
5975
logger.logFullError(err)
76+
throw err
77+
}
78+
if (!toEs) {
79+
try {
80+
await publishMessage("patch", resource, entity);
81+
} catch (err) {
82+
logger.logFullError(err);
83+
}
6084
}
6185
}
6286

@@ -65,8 +89,9 @@ async function patchRecordInEs (resource, entity) {
6589
* @param id the id of record
6690
* @param params the params of record (like nested ids)
6791
* @param resource the resource to delete
92+
* @param toEs is to es directly
6893
*/
69-
async function deleteRecordFromEs (id, params, resource) {
94+
async function deleteRecordFromEs (id, params, resource, toEs) {
7095
let payload
7196
if (SUB_USER_DOCUMENTS[resource] || SUB_ORG_DOCUMENTS[resource]) {
7297
payload = _.assign({}, params)
@@ -76,9 +101,19 @@ async function deleteRecordFromEs (id, params, resource) {
76101
}
77102
}
78103
try {
79-
await publishMessage('remove', resource, payload)
104+
if (toEs) {
105+
await esHelper.processDelete(resource, payload)
106+
}
80107
} catch (err) {
81108
logger.logFullError(err)
109+
throw err
110+
}
111+
if (!toEs) {
112+
try {
113+
await publishMessage("remove", resource, payload);
114+
} catch (err) {
115+
logger.logFullError(err);
116+
}
82117
}
83118
}
84119

@@ -174,13 +209,14 @@ function sleep (ms) {
174209
}
175210

176211
/**
177-
* delete child of record with delay between each item deleted
212+
* delete child of record with delay between each item deleted and with transaction
178213
* @param model the child model to delete
179214
* @param id the user id to delete
180215
* @param params the params for child
181216
* @param resourceName the es recource name
217+
* @param transaction the transaction object
182218
*/
183-
async function deleteChild (model, id, params, resourceName) {
219+
async function deleteChild (model, id, params, resourceName, transaction) {
184220
const query = {}
185221
query[params[0]] = id
186222
const result = await dbHelper.find(model, query)
@@ -194,8 +230,8 @@ async function deleteChild (model, id, params, resourceName) {
194230
params.forEach(attr => { esParams[attr] = record[attr] })
195231

196232
// remove from db
197-
dbHelper.remove(model, record.id)
198-
deleteRecordFromEs(record.id, esParams, resourceName)
233+
await dbHelper.remove(model, record.id, transaction)
234+
await deleteRecordFromEs(record.id, esParams, resourceName, !!transaction)
199235

200236
// sleep for configured time
201237
await sleep(config.CASCADE_PAUSE_MS)

0 commit comments

Comments
 (0)