Skip to content
This repository was archived by the owner on Mar 13, 2025. It is now read-only.

Commit f558d59

Browse files
Update to listen to only aggregate topic
1 parent 5fb8950 commit f558d59

File tree

6 files changed

+45
-12
lines changed

6 files changed

+45
-12
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ The following parameters can be set in config files or in env variables:
2323
- UBAHN_CREATE_TOPIC: the create ubahn entity Kafka message topic, default value is 'u-bahn.action.create'
2424
- UBAHN_UPDATE_TOPIC: the update ubahn entity Kafka message topic, default value is 'u-bahn.action.update'
2525
- UBAHN_DELETE_TOPIC: the delete ubahn entity Kafka message topic, default value is 'u-bahn.action.delete'
26+
- UBAHN_AGGREGATE_TOPIC: the ubahn entity aggregate topic, that contains create, update and delete topics. Default value is 'u-bahn.action.aggregate'
2627
- ES.HOST: Elasticsearch host, default value is 'localhost:9200'
2728
- ES.AWS_REGION: The Amazon region to use when using AWS Elasticsearch service, default value is 'us-east-1'
2829
- ES.API_VERSION: Elasticsearch API version, default value is '7.4'

VERIFICATION.md

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,28 @@
11
# Verification
22

3+
**NOTE** - For all kafka message below, update the topic to be the one set in config.UBAHN_AGGREGATE_TOPIC and inside the payload object, create a new attribute named `originalTopic` with the value of the original topic. Example:
4+
5+
```
6+
{
7+
"topic": "u-bahn.action.aggregate",
8+
"originator": "u-bahn-api",
9+
"timestamp": "2019-07-08T00:00:00.000Z",
10+
"mime-type": "application/json",
11+
"payload": {
12+
"originalTopic": "u-bahn.action.create"
13+
"resource": "user",
14+
"id": "391a3656-9a01-47d4-8c6d-64b68c44f212",
15+
"handle": "user"
16+
}
17+
}
18+
```
19+
20+
Additionally, you will be entering the messages into only one topic:
21+
22+
```
23+
docker exec -it ubahn-data-processor-es_kafka /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic u-bahn.action.aggregate
24+
```
25+
326
1. start kafka server, start elasticsearch, initialize Elasticsearch, start processor app
427
2. start kafka-console-producer to write messages to `u-bahn.action.create`
528
topic:

config/default.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ module.exports = {
1717
UBAHN_CREATE_TOPIC: process.env.UBAHN_CREATE_TOPIC || 'u-bahn.action.create',
1818
UBAHN_UPDATE_TOPIC: process.env.UBAHN_UPDATE_TOPIC || 'u-bahn.action.update',
1919
UBAHN_DELETE_TOPIC: process.env.UBAHN_DELETE_TOPIC || 'u-bahn.action.delete',
20+
UBAHN_AGGREGATE_TOPIC: process.env.UBAHN_AGGREGATE_TOPIC || 'u-bahn.action.aggregate',
2021

2122
ES: {
2223
HOST: process.env.ES_HOST || 'localhost:9200',

docker-kafka-es/docker-compose.yml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@ services:
1212
- "9092:9092"
1313
environment:
1414
KAFKA_ADVERTISED_HOST_NAME: localhost
15-
KAFKA_CREATE_TOPICS: "u-bahn.action.create:1:1,u-bahn.action.update:1:1,u-bahn.action.delete:1:1"
15+
# KAFKA_CREATE_TOPICS: "u-bahn.action.create:1:1,u-bahn.action.update:1:1,u-bahn.action.delete:1:1"
16+
KAFKA_CREATE_TOPICS: "u-bahn.action.aggregate:1:1"
1617
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
1718
# esearch:
1819
# image: docker.elastic.co/elasticsearch/elasticsearch:7.4.2

src/app.js

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ const dataHandler = (messageSet, topic, partition) => Promise.each(messageSet, a
5353
}
5454

5555
try {
56-
switch (topic) {
56+
switch (messageJSON.payload.originalTopic) {
5757
case config.UBAHN_CREATE_TOPIC:
5858
await ProcessorService.processCreate(messageJSON)
5959
break
@@ -63,6 +63,8 @@ const dataHandler = (messageSet, topic, partition) => Promise.each(messageSet, a
6363
case config.UBAHN_DELETE_TOPIC:
6464
await ProcessorService.processDelete(messageJSON)
6565
break
66+
default:
67+
throw new Error(`Unknown original topic: ${messageJSON.payload.originalTopic}`)
6668
}
6769

6870
logger.debug('Successfully processed message')
@@ -88,7 +90,8 @@ const check = () => {
8890
return connected
8991
}
9092

91-
const topics = [config.UBAHN_CREATE_TOPIC, config.UBAHN_UPDATE_TOPIC, config.UBAHN_DELETE_TOPIC]
93+
// const topics = [config.UBAHN_CREATE_TOPIC, config.UBAHN_UPDATE_TOPIC, config.UBAHN_DELETE_TOPIC]
94+
const topics = [config.UBAHN_AGGREGATE_TOPIC]
9295

9396
consumer
9497
.init([{

src/services/ProcessorService.js

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ const {
1111
userResources,
1212
organizationResources
1313
} = require('../common/constants')
14+
const config = require('config')
1415

1516
/**
1617
* Process create entity message
@@ -26,7 +27,7 @@ async function processCreate (message) {
2627
index: topResources[resource].index,
2728
type: topResources[resource].type,
2829
id: message.payload.id,
29-
body: _.omit(message.payload, 'resource'),
30+
body: _.omit(message.payload, ['resource', 'originalTopic']),
3031
refresh: 'wait_for'
3132
})
3233
} else if (_.includes(_.keys(userResources), resource)) {
@@ -44,7 +45,7 @@ async function processCreate (message) {
4445
logger.error(`Can't create existed ${resource} with the ${userResource.relateKey}: ${relateId}, userId: ${message.payload.userId}`)
4546
throw helper.getErrorWithStatus('[version_conflict_engine_exception]', 409)
4647
} else {
47-
user[userResource.propertyName].push(_.omit(message.payload, 'resource'))
48+
user[userResource.propertyName].push(_.omit(message.payload, ['resource', 'originalTopic']))
4849
await helper.updateUser(message.payload.userId, user)
4950
}
5051
} else if (_.includes(_.keys(organizationResources), resource)) {
@@ -62,7 +63,7 @@ async function processCreate (message) {
6263
logger.error(`Can't create existing ${resource} with the ${orgResources.relateKey}: ${relateId}, organizationId: ${message.payload.organizationId}`)
6364
throw helper.getErrorWithStatus('[version_conflict_engine_exception]', 409)
6465
} else {
65-
org[orgResources.propertyName].push(_.omit(message.payload, 'resource'))
66+
org[orgResources.propertyName].push(_.omit(message.payload, ['resource', 'originalTopic']))
6667
await helper.updateOrg(message.payload.organizationId, org)
6768
}
6869
} else {
@@ -77,7 +78,8 @@ processCreate.schema = {
7778
timestamp: Joi.date().required(),
7879
'mime-type': Joi.string().required(),
7980
payload: Joi.object().keys({
80-
resource: Joi.string().required()
81+
resource: Joi.string().required(),
82+
originalTopic: Joi.string().required().valid(config.UBAHN_CREATE_TOPIC)
8183
}).required().unknown(true)
8284
}).required()
8385
}
@@ -101,7 +103,7 @@ async function processUpdate (message) {
101103
type,
102104
id,
103105
body: {
104-
doc: _.assign(source, _.omit(message.payload, 'resource'))
106+
doc: _.assign(source, _.omit(message.payload, ['resource', 'originalTopic']))
105107
},
106108
refresh: 'wait_for'
107109
})
@@ -122,7 +124,7 @@ async function processUpdate (message) {
122124
throw helper.getErrorWithStatus('[resource_not_found_exception]', 404)
123125
} else {
124126
const updateIndex = _.findIndex(user[userResource.propertyName], [userResource.relateKey, relateId])
125-
user[userResource.propertyName].splice(updateIndex, 1, _.omit(message.payload, 'resource'))
127+
user[userResource.propertyName].splice(updateIndex, 1, _.omit(message.payload, ['resource', 'originalTopic']))
126128
logger.info(`Updating ${user.id} and ${relateId}`)
127129
await helper.updateUser(message.payload.userId, user)
128130
logger.info(`Updated ${user.id} and ${relateId}`)
@@ -141,7 +143,7 @@ async function processUpdate (message) {
141143
throw helper.getErrorWithStatus('[resource_not_found_exception]', 404)
142144
} else {
143145
const updateIndex = _.findIndex(org[orgResource.propertyName], [orgResource.relateKey, relateId])
144-
org[orgResource.propertyName].splice(updateIndex, 1, _.omit(message.payload, 'resource'))
146+
org[orgResource.propertyName].splice(updateIndex, 1, _.omit(message.payload, ['resource', 'originalTopic']))
145147
await helper.updateOrg(message.payload.organizationId, org)
146148
}
147149
} else {
@@ -156,7 +158,8 @@ processUpdate.schema = {
156158
timestamp: Joi.date().required(),
157159
'mime-type': Joi.string().required(),
158160
payload: Joi.object().keys({
159-
resource: Joi.string().required()
161+
resource: Joi.string().required(),
162+
originalTopic: Joi.string().required().valid(config.UBAHN_UPDATE_TOPIC)
160163
}).required().unknown(true)
161164
}).required()
162165
}
@@ -219,7 +222,8 @@ processDelete.schema = {
219222
timestamp: Joi.date().required(),
220223
'mime-type': Joi.string().required(),
221224
payload: Joi.object().keys({
222-
resource: Joi.string().required()
225+
resource: Joi.string().required(),
226+
originalTopic: Joi.string().required().valid(config.UBAHN_DELETE_TOPIC)
223227
}).required().unknown(true)
224228
}).required()
225229
}

0 commit comments

Comments
 (0)