Skip to content
This repository was archived by the owner on Mar 13, 2025. It is now read-only.

Commit c95a3ba

Browse files
Add aggregate topic support
1 parent b004459 commit c95a3ba

File tree

7 files changed

+53
-17
lines changed

7 files changed

+53
-17
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ The following parameters can be set in config files or in env variables:
2323
- UBAHN_CREATE_TOPIC: the create ubahn entity Kafka message topic, default value is 'u-bahn.action.create'
2424
- UBAHN_UPDATE_TOPIC: the update ubahn entity Kafka message topic, default value is 'u-bahn.action.update'
2525
- UBAHN_DELETE_TOPIC: the delete ubahn entity Kafka message topic, default value is 'u-bahn.action.delete'
26+
- UBAHN_AGGREGATE_TOPIC: the ubahn entity aggregate topic, that contains create, update and delete topics. Default value is 'u-bahn.action.aggregate'
2627
- ES.HOST: Elasticsearch host, default value is 'localhost:9200'
2728
- ES.AWS_REGION: The Amazon region to use when using AWS Elasticsearch service, default value is 'us-east-1'
2829
- ES.API_VERSION: Elasticsearch API version, default value is '7.4'

VERIFICATION.md

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,28 @@
11
# Verification
22

3+
**NOTE** - For all kafka message below, update the topic to be the one set in config.UBAHN_AGGREGATE_TOPIC and inside the payload object, create a new attribute named `originalTopic` with the value of the original topic. Example:
4+
5+
```
6+
{
7+
"topic": "u-bahn.action.aggregate",
8+
"originator": "u-bahn-api",
9+
"timestamp": "2019-07-08T00:00:00.000Z",
10+
"mime-type": "application/json",
11+
"payload": {
12+
"originalTopic": "u-bahn.action.create"
13+
"resource": "user",
14+
"id": "391a3656-9a01-47d4-8c6d-64b68c44f212",
15+
"handle": "user"
16+
}
17+
}
18+
```
19+
20+
Additionally, you will be entering the messages into only one topic:
21+
22+
```
23+
docker exec -it ubahn-data-processor-es_kafka /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic u-bahn.action.aggregate
24+
```
25+
326
1. start kafka server, start elasticsearch, initialize Elasticsearch, start processor app
427
2. start kafka-console-producer to write messages to `u-bahn.action.create`
528
topic:

config/default.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ module.exports = {
1717
UBAHN_CREATE_TOPIC: process.env.UBAHN_CREATE_TOPIC || 'u-bahn.action.create',
1818
UBAHN_UPDATE_TOPIC: process.env.UBAHN_UPDATE_TOPIC || 'u-bahn.action.update',
1919
UBAHN_DELETE_TOPIC: process.env.UBAHN_DELETE_TOPIC || 'u-bahn.action.delete',
20+
UBAHN_AGGREGATE_TOPIC: process.env.UBAHN_AGGREGATE_TOPIC || 'u-bahn.action.aggregate',
2021

2122
ES: {
2223
HOST: process.env.ES_HOST || 'localhost:9200',

docker-kafka-es/docker-compose.yml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@ services:
1212
- "9092:9092"
1313
environment:
1414
KAFKA_ADVERTISED_HOST_NAME: localhost
15-
KAFKA_CREATE_TOPICS: "u-bahn.action.create:1:1,u-bahn.action.update:1:1,u-bahn.action.delete:1:1"
15+
# KAFKA_CREATE_TOPICS: "u-bahn.action.create:1:1,u-bahn.action.update:1:1,u-bahn.action.delete:1:1"
16+
KAFKA_CREATE_TOPICS: "u-bahn.action.aggregate:1:1"
1617
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
1718
# esearch:
1819
# image: docker.elastic.co/elasticsearch/elasticsearch:7.4.2

src/app.js

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ const dataHandler = (messageSet, topic, partition) => Promise.each(messageSet, a
6969
}
7070
const transactionId = _.uniqueId('transaction_')
7171
try {
72-
switch (topic) {
72+
switch (messageJSON.payload.originalTopic) {
7373
case config.UBAHN_CREATE_TOPIC:
7474
await ProcessorService.processCreate(messageJSON, transactionId)
7575
break
@@ -79,6 +79,8 @@ const dataHandler = (messageSet, topic, partition) => Promise.each(messageSet, a
7979
case config.UBAHN_DELETE_TOPIC:
8080
await ProcessorService.processDelete(messageJSON, transactionId)
8181
break
82+
default:
83+
throw new Error(`Unknown original topic: ${messageJSON.payload.originalTopic}`)
8284
}
8385

8486
logger.debug(`Successfully processed message with count ${messageCount}`)
@@ -106,7 +108,8 @@ const check = () => {
106108
return connected
107109
}
108110

109-
const topics = [config.UBAHN_CREATE_TOPIC, config.UBAHN_UPDATE_TOPIC, config.UBAHN_DELETE_TOPIC]
111+
// const topics = [config.UBAHN_CREATE_TOPIC, config.UBAHN_UPDATE_TOPIC, config.UBAHN_DELETE_TOPIC]
112+
const topics = [config.UBAHN_AGGREGATE_TOPIC]
110113

111114
consumer
112115
.init([{

src/common/helper.js

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,8 @@ async function updateUser (userId, body, seqNo, primaryTerm, transactionId) {
128128
transactionId,
129129
body: { doc: body },
130130
if_seq_no: seqNo,
131-
if_primary_term: primaryTerm
131+
if_primary_term: primaryTerm,
132+
refresh: 'wait_for'
132133
})
133134
}
134135

@@ -161,7 +162,8 @@ async function updateOrg (organizationId, body, seqNo, primaryTerm, transactionI
161162
transactionId,
162163
body: { doc: body },
163164
if_seq_no: seqNo,
164-
if_primary_term: primaryTerm
165+
if_primary_term: primaryTerm,
166+
refresh: 'wait_for'
165167
})
166168
}
167169

src/services/ProcessorService.js

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ const {
1111
userResources,
1212
organizationResources
1313
} = require('../common/constants')
14+
const config = require('config')
1415

1516
/**
1617
* Process create entity message
@@ -27,8 +28,8 @@ async function processCreate (message, transactionId) {
2728
index: topResources[resource].index,
2829
type: topResources[resource].type,
2930
id: message.payload.id,
30-
body: _.omit(message.payload, 'resource'),
31-
refresh: 'true'
31+
body: _.omit(message.payload, ['resource', 'originalTopic']),
32+
refresh: 'wait_for'
3233
})
3334
} else if (_.includes(_.keys(userResources), resource)) {
3435
// process user resources such as userSkill, userAttribute...
@@ -45,7 +46,7 @@ async function processCreate (message, transactionId) {
4546
logger.error(`Can't create existed ${resource} with the ${userResource.relateKey}: ${relateId}, userId: ${message.payload.userId}`)
4647
throw helper.getErrorWithStatus('[version_conflict_engine_exception]', 409)
4748
} else {
48-
user[userResource.propertyName].push(_.omit(message.payload, 'resource'))
49+
user[userResource.propertyName].push(_.omit(message.payload, ['resource', 'originalTopic']))
4950
await helper.updateUser(message.payload.userId, user, seqNo, primaryTerm, transactionId)
5051
}
5152
} else if (_.includes(_.keys(organizationResources), resource)) {
@@ -63,7 +64,7 @@ async function processCreate (message, transactionId) {
6364
logger.error(`Can't create existing ${resource} with the ${orgResources.relateKey}: ${relateId}, organizationId: ${message.payload.organizationId}`)
6465
throw helper.getErrorWithStatus('[version_conflict_engine_exception]', 409)
6566
} else {
66-
org[orgResources.propertyName].push(_.omit(message.payload, 'resource'))
67+
org[orgResources.propertyName].push(_.omit(message.payload, ['resource', 'originalTopic']))
6768
await helper.updateOrg(message.payload.organizationId, org, seqNo, primaryTerm, transactionId)
6869
}
6970
} else {
@@ -78,7 +79,8 @@ processCreate.schema = {
7879
timestamp: Joi.date().required(),
7980
'mime-type': Joi.string().required(),
8081
payload: Joi.object().keys({
81-
resource: Joi.string().required()
82+
resource: Joi.string().required(),
83+
originalTopic: Joi.string().required().valid(config.UBAHN_CREATE_TOPIC)
8284
}).required().unknown(true)
8385
}).required(),
8486
transactionId: Joi.string().required()
@@ -105,10 +107,11 @@ async function processUpdate (message, transactionId) {
105107
id,
106108
transactionId,
107109
body: {
108-
doc: _.assign(source._source, _.omit(message.payload, 'resource'))
110+
doc: _.assign(source._source, _.omit(message.payload, ['resource', 'originalTopic']))
109111
},
110112
if_seq_no: source._seq_no,
111-
if_primary_term: source._primary_term
113+
if_primary_term: source._primary_term,
114+
refresh: 'wait_for'
112115
})
113116
} else if (_.includes(_.keys(userResources), resource)) {
114117
// process user resources such as userSkill, userAttribute...
@@ -127,7 +130,7 @@ async function processUpdate (message, transactionId) {
127130
throw helper.getErrorWithStatus('[resource_not_found_exception]', 404)
128131
} else {
129132
const updateIndex = _.findIndex(user[userResource.propertyName], [userResource.relateKey, relateId])
130-
user[userResource.propertyName].splice(updateIndex, 1, _.omit(message.payload, 'resource'))
133+
user[userResource.propertyName].splice(updateIndex, 1, _.omit(message.payload, ['resource', 'originalTopic']))
131134
logger.info(`Updating ${user.id} and ${relateId}`)
132135
await helper.updateUser(message.payload.userId, user, seqNo, primaryTerm, transactionId)
133136
logger.info(`Updated ${user.id} and ${relateId}`)
@@ -146,7 +149,7 @@ async function processUpdate (message, transactionId) {
146149
throw helper.getErrorWithStatus('[resource_not_found_exception]', 404)
147150
} else {
148151
const updateIndex = _.findIndex(org[orgResource.propertyName], [orgResource.relateKey, relateId])
149-
org[orgResource.propertyName].splice(updateIndex, 1, _.omit(message.payload, 'resource'))
152+
org[orgResource.propertyName].splice(updateIndex, 1, _.omit(message.payload, ['resource', 'originalTopic']))
150153
await helper.updateOrg(message.payload.organizationId, org, seqNo, primaryTerm, transactionId)
151154
}
152155
} else {
@@ -161,7 +164,8 @@ processUpdate.schema = {
161164
timestamp: Joi.date().required(),
162165
'mime-type': Joi.string().required(),
163166
payload: Joi.object().keys({
164-
resource: Joi.string().required()
167+
resource: Joi.string().required(),
168+
originalTopic: Joi.string().required().valid(config.UBAHN_UPDATE_TOPIC)
165169
}).required().unknown(true)
166170
}).required(),
167171
transactionId: Joi.string().required()
@@ -182,7 +186,7 @@ async function processDelete (message, transactionId) {
182186
index: topResources[resource].index,
183187
type: topResources[resource].type,
184188
id: message.payload.id,
185-
refresh: 'true'
189+
refresh: 'wait_for'
186190
})
187191
} else if (_.includes(_.keys(userResources), resource)) {
188192
// process user resources such as userSkill, userAttribute...
@@ -226,7 +230,8 @@ processDelete.schema = {
226230
timestamp: Joi.date().required(),
227231
'mime-type': Joi.string().required(),
228232
payload: Joi.object().keys({
229-
resource: Joi.string().required()
233+
resource: Joi.string().required(),
234+
originalTopic: Joi.string().required().valid(config.UBAHN_DELETE_TOPIC)
230235
}).required().unknown(true)
231236
}).required(),
232237
transactionId: Joi.string().required()

0 commit comments

Comments
 (0)