Skip to content
This repository was archived by the owner on Mar 13, 2025. It is now read-only.

Revert "Identify and combine same actions in a single mutex" #12

Merged
merged 1 commit into from
Aug 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 41 additions & 99 deletions src/common/helper.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ AWS.config.region = config.ES.AWS_REGION

// Elasticsearch client
let esClient

// Mutex to ensure that only one elasticsearch action is carried out at any given time
const esClientMutex = new Mutex()

Expand Down Expand Up @@ -56,18 +55,18 @@ async function getESClient () {
})
}

return esClient
}

/**
* Wraps original get es client function
* to control access to elasticsearch using a mutex
*/
async function getESClientWrapper () {
const client = await getESClient()
const release = await esClientMutex.acquire()
// Patch the transport to enable mutex
esClient.transport.originalRequest = esClient.transport.request
esClient.transport.request = async (params) => {
const release = await esClientMutex.acquire()
try {
return await esClient.transport.originalRequest(params)
} finally {
release()
}
}

return { client, release }
return esClient
}

/**
Expand All @@ -87,25 +86,17 @@ function validProperties (payload, keys) {
/**
* Function to get user from es
* @param {String} userId
* @param {Boolean} isTransaction Is this part of a transaction?
* @param {Boolean} sourceOnly
* @returns {Object} user
*/
async function getUser (userId, isTransaction = false) {
const { client, release } = await getESClientWrapper()

try {
const user = await client.get({ index: config.get('ES.USER_INDEX'), type: config.get('ES.USER_TYPE'), id: userId })

if (isTransaction) {
return { seqNo: user._seq_no, primaryTerm: user._primary_term, user: user._source, release }
}
async function getUser (userId, sourceOnly = true) {
const client = await getESClient()

return { seqNo: user._seq_no, primaryTerm: user._primary_term, user: user._source }
} finally {
if (!isTransaction) {
release()
}
if (sourceOnly) {
return client.getSource({ index: config.get('ES.USER_INDEX'), type: config.get('ES.USER_TYPE'), id: userId })
}

return client.get({ index: config.get('ES.USER_INDEX'), type: config.get('ES.USER_TYPE'), id: userId })
}

/**
Expand All @@ -114,92 +105,43 @@ async function getUser (userId, isTransaction = false) {
* @param {Number} seqNo
* @param {Number} primaryTerm
* @param {Object} body
* @param {Boolean} isTransaction If this is part of a transaction, it will not attempt to release
*/
async function updateUser (userId, body, seqNo, primaryTerm, isTransaction = false) {
let client, release

if (isTransaction) {
client = await getESClient()
} else {
const esClient = await getESClientWrapper()
client = esClient.client
release = esClient.release
}

try {
await client.update({
index: config.get('ES.USER_INDEX'),
type: config.get('ES.USER_TYPE'),
id: userId,
body: { doc: body },
if_seq_no: seqNo,
if_primary_term: primaryTerm
})
} finally {
if (!isTransaction) {
release()
}
}
async function updateUser (userId, body, seqNo, primaryTerm) {
const client = await getESClient()
await client.update({
index: config.get('ES.USER_INDEX'),
type: config.get('ES.USER_TYPE'),
id: userId,
body: { doc: body },
if_seq_no: seqNo,
if_primary_term: primaryTerm
})
}

/**
* Function to get org from es
* @param {String} organizationId
* @param {Boolean} isTransaction Is this part of a transaction?
* @returns {Object} organization
*/
async function getOrg (organizationId, isTransaction = false) {
const { client, release } = await getESClientWrapper()

try {
const org = await client.get({ index: config.get('ES.ORGANIZATION_INDEX'), type: config.get('ES.ORGANIZATION_TYPE'), id: organizationId })

if (isTransaction) {
return { seqNo: org._seq_no, primaryTerm: org._primary_term, org: org._source, release }
}

return { seqNo: org._seq_no, primaryTerm: org._primary_term, org: org._source }
} finally {
if (!isTransaction) {
release()
}
}
async function getOrg (organizationId) {
const client = await getESClient()
return client.getSource({ index: config.get('ES.ORGANIZATION_INDEX'), type: config.get('ES.ORGANIZATION_TYPE'), id: organizationId })
}

/**
* Function to update es organization
* @param {String} organizationId
* @param {Object} body
* @param {Number} seqNo
* @param {Number} primaryTerm
* @param {Boolean} isTransaction If this is part of a transaction, it will not attempt to lock
*/
async function updateOrg (organizationId, body, seqNo, primaryTerm, isTransaction = false) {
let client, release

if (isTransaction) {
client = await getESClient()
} else {
const esClient = await getESClientWrapper()
client = esClient.client
release = esClient.release
}

try {
await client.update({
index: config.get('ES.ORGANIZATION_INDEX'),
type: config.get('ES.ORGANIZATION_TYPE'),
id: organizationId,
body: { doc: body },
if_seq_no: seqNo,
if_primary_term: primaryTerm
})
} finally {
if (!isTransaction) {
release()
}
}
async function updateOrg (organizationId, body) {
const client = await getESClient()
await client.update({
index: config.get('ES.ORGANIZATION_INDEX'),
type: config.get('ES.ORGANIZATION_TYPE'),
id: organizationId,
body: { doc: body },
refresh: 'true'
})
}

/**
Expand All @@ -216,7 +158,7 @@ function getErrorWithStatus (message, statusCode) {

module.exports = {
getKafkaOptions,
getESClientWrapper,
getESClient,
validProperties,
getUser,
updateUser,
Expand Down
Loading