This repository was archived by the owner on Mar 13, 2025. It is now read-only.

Sync master with develop for prod go-live #13

Merged
merged 29 commits on Aug 31, 2020
Commits (29)
6cbd3f0
Use mutex to limit access to Elasticsearch
callmekatootie Aug 23, 2020
ff8ba38
Limit read access to elasticsearch
callmekatootie Aug 23, 2020
64bfee1
Restore changes carried out in 830712e4043264b2cc8ec49ffd922078c6dda8…
callmekatootie Aug 23, 2020
48e701d
Pass sequence and primary term during update to ensure that we are up…
callmekatootie Aug 24, 2020
465d55b
Limit ES access to one at a time
callmekatootie Aug 24, 2020
86bde0e
Undo changes that limit ES access
callmekatootie Aug 24, 2020
f4efc96
Merge pull request #8 from topcoder-platform/mutex
cwdcwd Aug 24, 2020
c8ad7f9
Identify and combine same actions in a single mutex
callmekatootie Aug 27, 2020
846f172
Group elasticsearch actions on the same message to use the same mutex
callmekatootie Aug 27, 2020
0eb1b18
Merge pull request #9 from topcoder-platform/transaction_members
callmekatootie Aug 27, 2020
b0631fe
Debugging messages
callmekatootie Aug 27, 2020
555320c
Revert "Debugging messages"
callmekatootie Aug 27, 2020
5424805
Revert "Group elasticsearch actions on the same message to use the sa…
callmekatootie Aug 27, 2020
5116a88
Merge pull request #10 from topcoder-platform/revert-9-transaction_me…
callmekatootie Aug 27, 2020
8cf077a
Merge pull request #11 from topcoder-platform/transaction
callmekatootie Aug 27, 2020
2a8b87b
Revert "Identify and combine same actions in a single mutex"
callmekatootie Aug 28, 2020
dc3c600
Merge pull request #12 from topcoder-platform/revert-11-transaction
callmekatootie Aug 28, 2020
6ba12fe
Debugging why members solution did not work
callmekatootie Aug 28, 2020
39c2541
Pass transactionId during update of top resource
callmekatootie Aug 28, 2020
10cff5f
Log when mutex is acquired and released
callmekatootie Aug 28, 2020
c86a22a
Remove debug logs
callmekatootie Aug 28, 2020
8942530
Undo changes carried out for the issue of ES not updating all users a…
callmekatootie Aug 29, 2020
5fb8950
Update ES updates to wait for index to refresh before returning with …
callmekatootie Aug 29, 2020
f558d59
Update to listen to only aggregate topic
callmekatootie Aug 29, 2020
d6a48bc
Revert "Update to listen to only aggregate topic"
callmekatootie Aug 29, 2020
835878c
Revert "Update ES updates to wait for index to refresh before returni…
callmekatootie Aug 29, 2020
b004459
Revert "Undo changes carried out for the issue of ES not updating all…
callmekatootie Aug 29, 2020
c95a3ba
Add aggregate topic support
callmekatootie Aug 29, 2020
3efa8c6
Pass transactionId during create and delete operations too
callmekatootie Aug 31, 2020
1 change: 1 addition & 0 deletions README.md
@@ -23,6 +23,7 @@ The following parameters can be set in config files or in env variables:
- UBAHN_CREATE_TOPIC: the create ubahn entity Kafka message topic, default value is 'u-bahn.action.create'
- UBAHN_UPDATE_TOPIC: the update ubahn entity Kafka message topic, default value is 'u-bahn.action.update'
- UBAHN_DELETE_TOPIC: the delete ubahn entity Kafka message topic, default value is 'u-bahn.action.delete'
- UBAHN_AGGREGATE_TOPIC: the aggregate ubahn entity Kafka message topic, which carries the create, update and delete messages; default value is 'u-bahn.action.aggregate'
- ES.HOST: Elasticsearch host, default value is 'localhost:9200'
- ES.AWS_REGION: The Amazon region to use when using AWS Elasticsearch service, default value is 'us-east-1'
- ES.API_VERSION: Elasticsearch API version, default value is '7.4'
23 changes: 23 additions & 0 deletions VERIFICATION.md
@@ -1,5 +1,28 @@
# Verification

**NOTE** - For all Kafka messages below, change the topic to the one configured in config.UBAHN_AGGREGATE_TOPIC and, inside the payload object, add a new attribute named `originalTopic` whose value is the original topic. Example:

```
{
  "topic": "u-bahn.action.aggregate",
  "originator": "u-bahn-api",
  "timestamp": "2019-07-08T00:00:00.000Z",
  "mime-type": "application/json",
  "payload": {
    "originalTopic": "u-bahn.action.create",
    "resource": "user",
    "id": "391a3656-9a01-47d4-8c6d-64b68c44f212",
    "handle": "user"
  }
}
```

Additionally, you will now be entering all messages into a single topic:

```
docker exec -it ubahn-data-processor-es_kafka /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic u-bahn.action.aggregate
```
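
For reference, update and delete events travel through the same aggregate topic; only `payload.originalTopic` changes. Below is an illustrative delete message, assuming the payload keeps the same shape as the create example above (the id and handle values are placeholders):

```
{
  "topic": "u-bahn.action.aggregate",
  "originator": "u-bahn-api",
  "timestamp": "2019-07-08T00:00:00.000Z",
  "mime-type": "application/json",
  "payload": {
    "originalTopic": "u-bahn.action.delete",
    "resource": "user",
    "id": "391a3656-9a01-47d4-8c6d-64b68c44f212",
    "handle": "user"
  }
}
```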

1. Start the Kafka server, start Elasticsearch, initialize Elasticsearch, start the processor app
2. Start kafka-console-producer to write messages to the `u-bahn.action.create` topic:
1 change: 1 addition & 0 deletions config/default.js
@@ -17,6 +17,7 @@ module.exports = {
UBAHN_CREATE_TOPIC: process.env.UBAHN_CREATE_TOPIC || 'u-bahn.action.create',
UBAHN_UPDATE_TOPIC: process.env.UBAHN_UPDATE_TOPIC || 'u-bahn.action.update',
UBAHN_DELETE_TOPIC: process.env.UBAHN_DELETE_TOPIC || 'u-bahn.action.delete',
UBAHN_AGGREGATE_TOPIC: process.env.UBAHN_AGGREGATE_TOPIC || 'u-bahn.action.aggregate',

ES: {
HOST: process.env.ES_HOST || 'localhost:9200',
2 changes: 1 addition & 1 deletion config/test.js
@@ -3,7 +3,7 @@
*/

module.exports = {
WAIT_TIME: 1500,
WAIT_TIME: 1500
// ES: {
// ACHIEVEMENT_PROVIDER_INDEX: process.env.ACHIEVEMENT_PROVIDER_INDEX || 'achievement_provider_test',
// ATTRIBUTE_INDEX: process.env.ATTRIBUTE_INDEX || 'attribute_test',
3 changes: 2 additions & 1 deletion docker-kafka-es/docker-compose.yml
@@ -12,7 +12,8 @@ services:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: localhost
KAFKA_CREATE_TOPICS: "u-bahn.action.create:1:1,u-bahn.action.update:1:1,u-bahn.action.delete:1:1"
# KAFKA_CREATE_TOPICS: "u-bahn.action.create:1:1,u-bahn.action.update:1:1,u-bahn.action.delete:1:1"
KAFKA_CREATE_TOPICS: "u-bahn.action.aggregate:1:1"
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
# esearch:
# image: docker.elastic.co/elasticsearch/elasticsearch:7.4.2
17 changes: 11 additions & 6 deletions src/app.js
@@ -5,6 +5,7 @@
global.Promise = require('bluebird')
const config = require('config')
const Kafka = require('no-kafka')
const _ = require('lodash')
const healthcheck = require('topcoder-healthcheck-dropin')
const logger = require('./common/logger')
const helper = require('./common/helper')
@@ -66,24 +67,27 @@ const dataHandler = (messageSet, topic, partition) => Promise.each(messageSet, a
await consumer.commitOffset({ topic, partition, offset: m.offset })
return
}

const transactionId = _.uniqueId('transaction_')
try {
switch (topic) {
switch (messageJSON.payload.originalTopic) {
case config.UBAHN_CREATE_TOPIC:
await ProcessorService.processCreate(messageJSON)
await ProcessorService.processCreate(messageJSON, transactionId)
break
case config.UBAHN_UPDATE_TOPIC:
await ProcessorService.processUpdate(messageJSON)
await ProcessorService.processUpdate(messageJSON, transactionId)
break
case config.UBAHN_DELETE_TOPIC:
await ProcessorService.processDelete(messageJSON)
await ProcessorService.processDelete(messageJSON, transactionId)
break
default:
throw new Error(`Unknown original topic: ${messageJSON.payload.originalTopic}`)
}

logger.debug(`Successfully processed message with count ${messageCount}`)
} catch (err) {
logger.logFullError(err)
} finally {
helper.checkEsMutexRelease(transactionId)
logger.debug(`Committing offset after processing message with count ${messageCount}`)

// Commit offset regardless of error
@@ -104,7 +108,8 @@ const check = () => {
return connected
}

const topics = [config.UBAHN_CREATE_TOPIC, config.UBAHN_UPDATE_TOPIC, config.UBAHN_DELETE_TOPIC]
// const topics = [config.UBAHN_CREATE_TOPIC, config.UBAHN_UPDATE_TOPIC, config.UBAHN_DELETE_TOPIC]
const topics = [config.UBAHN_AGGREGATE_TOPIC]

consumer
.init([{
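
To make the new control flow in `src/app.js` easier to follow, here is a condensed, self-contained sketch of the dispatch logic in isolation. It is a sketch only: the processor functions and config object are stubbed stand-ins, and the message shape assumes the aggregate format documented in VERIFICATION.md.

```
const _ = require('lodash')

// Stand-ins for src/services/ProcessorService and the config module (illustrative only)
const ProcessorService = {
  processCreate: async (message, transactionId) => console.log('create', transactionId),
  processUpdate: async (message, transactionId) => console.log('update', transactionId),
  processDelete: async (message, transactionId) => console.log('delete', transactionId)
}
const config = {
  UBAHN_CREATE_TOPIC: 'u-bahn.action.create',
  UBAHN_UPDATE_TOPIC: 'u-bahn.action.update',
  UBAHN_DELETE_TOPIC: 'u-bahn.action.delete'
}

// Every message read from the aggregate topic is routed by payload.originalTopic,
// and all Elasticsearch work done for that message is tagged with one transactionId.
async function handleAggregateMessage (messageJSON) {
  const transactionId = _.uniqueId('transaction_')
  switch (messageJSON.payload.originalTopic) {
    case config.UBAHN_CREATE_TOPIC:
      return ProcessorService.processCreate(messageJSON, transactionId)
    case config.UBAHN_UPDATE_TOPIC:
      return ProcessorService.processUpdate(messageJSON, transactionId)
    case config.UBAHN_DELETE_TOPIC:
      return ProcessorService.processDelete(messageJSON, transactionId)
    default:
      throw new Error(`Unknown original topic: ${messageJSON.payload.originalTopic}`)
  }
}

// Example usage with the message format from VERIFICATION.md
handleAggregateMessage({
  payload: { originalTopic: 'u-bahn.action.create', resource: 'user', id: '123', handle: 'user' }
}).catch(console.error)
```

In the actual handler the same switch sits inside a try/catch whose finally block calls `helper.checkEsMutexRelease(transactionId)` and commits the Kafka offset, so the mutex cannot leak when a processor throws.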
82 changes: 73 additions & 9 deletions src/common/helper.js
Expand Up @@ -7,11 +7,16 @@ const config = require('config')
const elasticsearch = require('elasticsearch')
const _ = require('lodash')
const Joi = require('@hapi/joi')
const { Mutex } = require('async-mutex')

AWS.config.region = config.ES.AWS_REGION

// Elasticsearch client
let esClient
let transactionId
// Mutex to ensure that only one elasticsearch action is carried out at any given time
const esClientMutex = new Mutex()
const mutexReleaseMap = {}

/**
* Get Kafka options
@@ -51,6 +56,31 @@ async function getESClient () {
host
})
}

// Patch the transport to enable mutex
esClient.transport.originalRequest = esClient.transport.request
esClient.transport.request = async (params) => {
const tId = _.get(params.query, 'transactionId')
params.query = _.omit(params.query, 'transactionId')
if (!tId || tId !== transactionId) {
const release = await esClientMutex.acquire()
mutexReleaseMap[tId || 'noTransaction'] = release
transactionId = tId
}
try {
return await esClient.transport.originalRequest(params)
} finally {
if (params.method !== 'GET' || !tId) {
const release = mutexReleaseMap[tId || 'noTransaction']
delete mutexReleaseMap[tId || 'noTransaction']
transactionId = undefined
if (release) {
release()
}
}
}
}

return esClient
}
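
The transport patch above is the core of the concurrency control, so here is the same acquire/release pattern distilled into a standalone sketch. The `withEsMutex` wrapper and its `doWork` callback are hypothetical names used only for illustration; the flow mirrors the patched request method.

```
const { Mutex } = require('async-mutex')

const esMutex = new Mutex()
const releaseMap = {}
let currentTransactionId

// Acquire the mutex the first time a transaction touches Elasticsearch, then hold it
// across that transaction's reads and release it after its first non-GET request
// (or immediately when no transactionId is supplied).
async function withEsMutex (transactionId, method, doWork) {
  if (!transactionId || transactionId !== currentTransactionId) {
    const release = await esMutex.acquire()
    releaseMap[transactionId || 'noTransaction'] = release
    currentTransactionId = transactionId
  }
  try {
    return await doWork()
  } finally {
    if (method !== 'GET' || !transactionId) {
      const release = releaseMap[transactionId || 'noTransaction']
      delete releaseMap[transactionId || 'noTransaction']
      currentTransactionId = undefined
      if (release) {
        release()
      }
    }
  }
}
```

The effect is that a read followed by a conditional write for the same Kafka message runs without another message's writes interleaving, which is what the per-message transactionId threaded through the processors relies on.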

@@ -71,50 +101,68 @@ function validProperties (payload, keys) {
/**
* Function to get user from es
* @param {String} userId
* @param {String} transactionId
* @returns {Object} user
*/
async function getUser (userId) {
async function getUser (userId, transactionId) {
const client = await getESClient()
return client.getSource({ index: config.get('ES.USER_INDEX'), type: config.get('ES.USER_TYPE'), id: userId })
const user = await client.get({ index: config.get('ES.USER_INDEX'), type: config.get('ES.USER_TYPE'), id: userId, transactionId })
return { seqNo: user._seq_no, primaryTerm: user._primary_term, user: user._source }
}

/**
* Function to update es user
* @param {String} userId
* @param {Number} seqNo
* @param {Number} primaryTerm
* @param {String} transactionId
* @param {Object} body
*/
async function updateUser (userId, body) {
async function updateUser (userId, body, seqNo, primaryTerm, transactionId) {
const client = await getESClient()
await client.update({
index: config.get('ES.USER_INDEX'),
type: config.get('ES.USER_TYPE'),
id: userId,
body: { doc: body }
transactionId,
body: { doc: body },
if_seq_no: seqNo,
if_primary_term: primaryTerm,
refresh: 'wait_for'
})
}

/**
* Function to get org from es
* @param {String} organizationId
* @param {String} transactionId
* @returns {Object} organization
*/
async function getOrg (organizationId) {
async function getOrg (organizationId, transactionId) {
const client = await getESClient()
return client.getSource({ index: config.get('ES.ORGANIZATION_INDEX'), type: config.get('ES.ORGANIZATION_TYPE'), id: organizationId })
const org = await client.get({ index: config.get('ES.ORGANIZATION_INDEX'), type: config.get('ES.ORGANIZATION_TYPE'), id: organizationId, transactionId })
return { seqNo: org._seq_no, primaryTerm: org._primary_term, org: org._source }
}

/**
* Function to update es organization
* @param {String} organizationId
* @param {Number} seqNo
* @param {Number} primaryTerm
* @param {String} transactionId
* @param {Object} body
*/
async function updateOrg (organizationId, body) {
async function updateOrg (organizationId, body, seqNo, primaryTerm, transactionId) {
const client = await getESClient()
await client.update({
index: config.get('ES.ORGANIZATION_INDEX'),
type: config.get('ES.ORGANIZATION_TYPE'),
id: organizationId,
body: { doc: body }
transactionId,
body: { doc: body },
if_seq_no: seqNo,
if_primary_term: primaryTerm,
refresh: 'wait_for'
})
}

@@ -130,6 +178,21 @@ function getErrorWithStatus (message, statusCode) {
return error
}

/**
* Ensure the esClient mutex is released
* @param {String} tId transactionId
*/
function checkEsMutexRelease (tId) {
if (tId === transactionId) {
const release = mutexReleaseMap[tId]
delete mutexReleaseMap[tId]
transactionId = undefined
if (release) {
release()
}
}
}

module.exports = {
getKafkaOptions,
getESClient,
@@ -138,5 +201,6 @@ module.exports = {
updateUser,
getOrg,
updateOrg,
getErrorWithStatus
getErrorWithStatus,
checkEsMutexRelease
}
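
Putting the helper changes together, a typical read-modify-write against the user index now threads the same transactionId through both calls and passes the seqNo/primaryTerm from the read back into the write, so Elasticsearch rejects the update with a version conflict if the document changed in between. The sketch below is illustrative only: the `skills` attribute is a made-up field and error handling is simplified.

```
const _ = require('lodash')
const helper = require('./src/common/helper')

// Hypothetical example: append a skill to a user document with optimistic concurrency.
async function addSkillToUser (userId, skill) {
  const transactionId = _.uniqueId('transaction_')
  try {
    // Read under the shared ES mutex; seqNo/primaryTerm identify the exact version read.
    const { seqNo, primaryTerm, user } = await helper.getUser(userId, transactionId)

    // `skills` is a made-up attribute, used purely for illustration.
    const skills = (user.skills || []).concat(skill)

    // Conditional write: fails if another writer changed the document in the meantime,
    // and waits for the index to refresh before returning (refresh: 'wait_for').
    await helper.updateUser(userId, { skills }, seqNo, primaryTerm, transactionId)
  } finally {
    // Make sure the mutex is not left held if the write never happened.
    helper.checkEsMutexRelease(transactionId)
  }
}
```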