diff --git a/README.md b/README.md
index 86fd63e..8637573 100755
--- a/README.md
+++ b/README.md
@@ -52,6 +52,7 @@ Configuration for the application is at config/default.js and config/production.
 - ELASTICCLOUD_ID: The elastic cloud id, if your elasticsearch instance is hosted on elastic cloud. DO NOT provide a value for ES_HOST if you are using this
 - ELASTICCLOUD_USERNAME: The elastic cloud username for basic authentication. Provide this only if your elasticsearch instance is hosted on elastic cloud
 - ELASTICCLOUD_PASSWORD: The elastic cloud password for basic authentication. Provide this only if your elasticsearch instance is hosted on elastic cloud
+- MAX_BATCH_SIZE: The maximum number of records to hold in memory during bulk insert. Default is `10000` (Used by the db to es migration script)
 - MAX_RESULT_SIZE: The Results Per Query Limits. Default is `1000` (Used by the db to es migration script)
 - MAX_BULK_SIZE: The Bulk Indexing Maximum Limits. Default is `100` (Used by the db to es migration script)
diff --git a/app.js b/app.js
index 39be486..f54dd4d 100755
--- a/app.js
+++ b/app.js
@@ -51,7 +51,6 @@ _.each(routes, (verbs, url) => {
           return next(errors.newAuthError('Action is not allowed for invalid token'))
         }
         req.auth = req.authUser
-        req.auth.sub = req.auth.userId
         if (req.authUser.roles) {
           // all access are allowed
           if (_.isEmpty(access)) {
diff --git a/config/default.js b/config/default.js
index bbff95f..0766f4d 100755
--- a/config/default.js
+++ b/config/default.js
@@ -126,6 +126,7 @@ module.exports = {
         orgField: process.env.ORGANIZATION_SKILLPROVIDER_PROPERTY_NAME || 'skillProviders'
       }
     },
+    MAX_BATCH_SIZE: parseInt(process.env.MAX_BATCH_SIZE, 10) || 10000,
     MAX_RESULT_SIZE: parseInt(process.env.MAX_RESULT_SIZE, 10) || 1000,
     MAX_BULK_SIZE: parseInt(process.env.MAX_BULK_SIZE, 10) || 100
   }
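The three limits above control the memory and request sizes of the `dumpDbToEs` migration below: `MAX_BATCH_SIZE` caps how many DB records `main()` processes per pass, `MAX_RESULT_SIZE` caps how many ids go into one Elasticsearch terms query, and `MAX_BULK_SIZE` caps how many documents go into one bulk request. A minimal sketch of how they cooperate; `fetchAll`, `queryByIds`, and `bulkIndex` are hypothetical stand-ins for `dbHelper.find`, `client.search`, and `client.bulk`:

```js
const _ = require('lodash')
const config = require('config')

// Sketch only: fetchAll, queryByIds and bulkIndex are hypothetical
// stand-ins for dbHelper.find, client.search and client.bulk.
async function migrate (fetchAll, queryByIds, bulkIndex) {
  const allData = await fetchAll()
  // MAX_BATCH_SIZE bounds the records handled in one pass of main()
  for (const batch of _.chunk(allData, config.get('ES.MAX_BATCH_SIZE'))) {
    // MAX_RESULT_SIZE bounds the ids sent in a single terms query
    const idsArr = _.chunk(_.uniq(_.map(batch, 'userId')), config.get('ES.MAX_RESULT_SIZE'))
    for (const ids of idsArr) {
      const users = await queryByIds(ids)
      // MAX_BULK_SIZE bounds the documents sent in a single bulk request
      for (const docs of _.chunk(users, config.get('ES.MAX_BULK_SIZE'))) {
        await bulkIndex(docs)
      }
    }
  }
}
```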
diff --git a/scripts/db/dumpDbToEs.js b/scripts/db/dumpDbToEs.js
index 2d9b96f..4f6147c 100644
--- a/scripts/db/dumpDbToEs.js
+++ b/scripts/db/dumpDbToEs.js
@@ -155,8 +155,23 @@ async function insertIntoES (modelName, dataset) {
   } else if (_.includes(_.keys(userResources), esResourceName)) {
     const userResource = userResources[esResourceName]

-    let users = []
-    // query all users
+    if (userResource.nested === true && userResource.mappingCreated !== true) {
+      await client.indices.putMapping({
+        index: topResources.user.index,
+        type: topResources.user.type,
+        include_type_name: true,
+        body: {
+          properties: {
+            [userResource.propertyName]: {
+              type: 'nested'
+            }
+          }
+        }
+      })
+      userResource.mappingCreated = true
+    }
+
+    // chunk the ids so that each terms query stays within MAX_RESULT_SIZE
     const idsArr = _.chunk(_.uniq(_.map(dataset, 'userId')), config.get('ES.MAX_RESULT_SIZE'))
     for (const ids of idsArr) {
       const res = await client.search({
@@ -171,68 +186,47 @@ async function insertIntoES (modelName, dataset) {
           }
         }
       })
-      users.push(..._.map(res.body.hits.hits, '_source'))
-    }
+      const users = _.filter(_.map(res.body.hits.hits, '_source'), user => {
+        if (!user[userResource.propertyName]) {
+          user[userResource.propertyName] = []
+        }
+        let updated = false
+        _.forEach(_.filter(dataset, ['userId', user.id]), body => {
+          const relateId = body[userResource.relateKey]
+          if (_.some(user[userResource.propertyName], [userResource.relateKey, relateId])) {
+            logger.error(`Can't create existing ${esResourceName} with the ${userResource.relateKey}: ${relateId}, userId: ${body.userId}`)
+          } else {
+            updated = true
+            user[userResource.propertyName].push(body)
+          }
+        })
+        return updated
+      })

-    // remove unreference resource
-    for (const data of dataset) {
-      if (!_.some(users, ['id', data.userId])) {
+      const chunked = _.chunk(users, config.get('ES.MAX_BULK_SIZE'))
+      for (const us of chunked) {
+        const body = _.flatMap(us, doc => [{ index: { _id: doc.id } }, doc])
+        await client.bulk({
+          index: topResources.user.index,
+          type: topResources.user.type,
+          body,
+          pipeline: topResources.user.pipeline.id,
+          refresh: 'wait_for'
+        })
+      }
+      const deleteRecord = _.filter(dataset, d => _.includes(ids, d.userId) && !_.some(users, ['id', d.userId]))
+      // remove unreferenced resources
+      for (const data of deleteRecord) {
         logger.info(`The ${modelName} references user with id ${data.userId}, which does not exist. Deleting the reference...`)
         // The user does not exist. Delete the reference records
         await dbHelper.remove(models[modelName], data.id)
         logger.info('Reference deleted')
       }
     }
-
-    if (userResource.nested === true && userResource.mappingCreated !== true) {
-      await client.indices.putMapping({
-        index: topResources.user.index,
-        type: topResources.user.type,
-        include_type_name: true,
-        body: {
-          properties: {
-            [userResource.propertyName]: {
-              type: 'nested'
-            }
-          }
-        }
-      })
-      userResource.mappingCreated = true
-    }
-
-    users = _.filter(users, user => {
-      if (!user[userResource.propertyName]) {
-        user[userResource.propertyName] = []
-      }
-      let updated = false
-      _.forEach(_.filter(dataset, ['userId', user.id]), body => {
-        const relateId = body[userResource.relateKey]
-        if (_.some(user[userResource.propertyName], [userResource.relateKey, relateId])) {
-          logger.error(`Can't create existing ${esResourceName} with the ${userResource.relateKey}: ${relateId}, userId: ${body.userId}`)
-        } else {
-          updated = true
-          user[userResource.propertyName].push(body)
-        }
-      })
-      return updated
-    })
-
-    const chunked = _.chunk(users, config.get('ES.MAX_BULK_SIZE'))
-    for (const us of chunked) {
-      const body = _.flatMap(us, doc => [{ index: { _id: doc.id } }, doc])
-      await client.bulk({
-        index: topResources.user.index,
-        type: topResources.user.type,
-        body,
-        pipeline: topResources.user.pipeline.id,
-        refresh: 'wait_for'
-      })
-    }
   } else if (_.includes(_.keys(organizationResources), esResourceName)) {
     const orgResource = organizationResources[esResourceName]

-    let organizations = []
-    // query all organizations
+    // chunk the ids so that each terms query stays within MAX_RESULT_SIZE
     const idsArr = _.chunk(_.uniq(_.map(dataset, 'organizationId')), config.get('ES.MAX_RESULT_SIZE'))
     for (const ids of idsArr) {
       const res = await client.search({
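The `deleteRecord` guard introduced above deletes a referencing record only when its `userId` was part of the current `ids` page and no matching user document came back from the search, so records whose users simply fall into a later page are left untouched. A small illustration with made-up data:

```js
const _ = require('lodash')

const ids = ['u1', 'u2']            // ids queried in this page
const users = [{ id: 'u1' }]        // only u1 came back from ES
const dataset = [
  { id: 'r1', userId: 'u1' },       // kept: user exists
  { id: 'r2', userId: 'u2' },       // dangling: queried but not found
  { id: 'r3', userId: 'u3' }        // untouched: belongs to a later page
]

const deleteRecord = _.filter(dataset, d =>
  _.includes(ids, d.userId) && !_.some(users, ['id', d.userId]))
// deleteRecord => [{ id: 'r2', userId: 'u2' }]
```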
@@ -247,45 +241,41 @@ async function insertIntoES (modelName, dataset) {
           }
         }
       })
-      organizations.push(..._.map(res.body.hits.hits, '_source'))
-    }
+      const organizations = _.filter(_.map(res.body.hits.hits, '_source'), organization => {
+        if (!organization[orgResource.propertyName]) {
+          organization[orgResource.propertyName] = []
+        }
+        let updated = false
+        _.forEach(_.filter(dataset, ['organizationId', organization.id]), body => {
+          const relateId = body[orgResource.relateKey]
+          if (_.some(organization[orgResource.propertyName], [orgResource.relateKey, relateId])) {
+            logger.error(`Can't create existing ${esResourceName} with the ${orgResource.relateKey}: ${relateId}, organizationId: ${body.organizationId}`)
+          } else {
+            updated = true
+            organization[orgResource.propertyName].push(body)
+          }
+        })
+        return updated
+      })

-    for (const data of dataset) {
-      if (!_.some(organizations, ['id', data.organizationId])) {
+      const chunked = _.chunk(organizations, config.get('ES.MAX_BULK_SIZE'))
+      for (const os of chunked) {
+        const body = _.flatMap(os, doc => [{ index: { _id: doc.id } }, doc])
+        await client.bulk({
+          index: topResources.organization.index,
+          type: topResources.organization.type,
+          body,
+          refresh: 'wait_for'
+        })
+      }
+      const deleteRecord = _.filter(dataset, d => _.includes(ids, d.organizationId) && !_.some(organizations, ['id', d.organizationId]))
+      for (const data of deleteRecord) {
         logger.info(`The ${modelName} references org with id ${data.organizationId}, which does not exist. Deleting the reference...`)
         // The org does not exist. Delete the reference records
         await dbHelper.remove(models[modelName], data.id)
         logger.info('Reference deleted')
       }
     }
-
-    organizations = _.filter(organizations, organization => {
-      if (!organization[orgResource.propertyName]) {
-        organization[orgResource.propertyName] = []
-      }
-      let updated = false
-      _.forEach(_.filter(dataset, ['organizationId', organization.id]), body => {
-        const relateId = body[orgResource.relateKey]
-        if (_.some(organization[orgResource.propertyName], [orgResource.relateKey, relateId])) {
-          logger.error(`Can't create existing ${esResourceName} with the ${orgResource.relateKey}: ${relateId}, organizationId: ${body.organizationId}`)
-        } else {
-          updated = true
-          organization[orgResource.propertyName].push(body)
-        }
-      })
-      return updated
-    })
-
-    const chunked = _.chunk(organizations, config.get('ES.MAX_BULK_SIZE'))
-    for (const os of chunked) {
-      const body = _.flatMap(os, doc => [{ index: { _id: doc.id } }, doc])
-      await client.bulk({
-        index: topResources.organization.index,
-        type: topResources.organization.type,
-        body,
-        refresh: 'wait_for'
-      })
-    }
   }
 }
@@ -402,25 +392,29 @@ async function main () {
   for (let i = 0; i < keys.length; i++) {
     const key = keys[i]
     try {
-      const data = await dbHelper.find(models[key], {})
-
-      for (let i = 0; i < data.length; i++) {
-        logger.info(`Inserting data ${i + 1} of ${data.length}`)
-        logger.info(JSON.stringify(data[i]))
-        if (!_.isString(data[i].created)) {
-          data[i].created = new Date()
-        }
-        if (!_.isString(data[i].updated)) {
-          data[i].updated = new Date()
-        }
-        if (!_.isString(data[i].createdBy)) {
-          data[i].createdBy = 'tcAdmin'
-        }
-        if (!_.isString(data[i].updatedBy)) {
-          data[i].updatedBy = 'tcAdmin'
+      const allData = await dbHelper.find(models[key], {})
+      let j = 0
+      const dataset = _.chunk(allData, config.get('ES.MAX_BATCH_SIZE'))
+      for (const data of dataset) {
+        for (let i = 0; i < data.length; i++) {
+          j++
+          logger.info(`Inserting data ${j} of ${allData.length}`)
+          logger.info(JSON.stringify(data[i]))
+          if (!_.isString(data[i].created)) {
+            data[i].created = new Date()
+          }
+          if (!_.isString(data[i].updated)) {
+            data[i].updated = new Date()
+          }
+          if (!_.isString(data[i].createdBy)) {
+            data[i].createdBy = 'tcAdmin'
+          }
+          if (!_.isString(data[i].updatedBy)) {
+            data[i].updatedBy = 'tcAdmin'
+          }
         }
+        await insertIntoES(key, data)
       }
-      await insertIntoES(key, data)
       logger.info('import data for ' + key + ' done')
     } catch (e) {
       logger.error(e)
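For reference, the `_.flatMap(us, doc => [{ index: { _id: doc.id } }, doc])` calls in this diff build the standard Elasticsearch bulk payload, which alternates an action line with the document it applies to. A minimal sketch with made-up documents:

```js
const _ = require('lodash')

const docs = [
  { id: 'u1', handle: 'alice' },
  { id: 'u2', handle: 'bob' }
]

// Bulk payload: action metadata line, then the document itself, repeated
const body = _.flatMap(docs, doc => [{ index: { _id: doc.id } }, doc])
// body => [
//   { index: { _id: 'u1' } }, { id: 'u1', handle: 'alice' },
//   { index: { _id: 'u2' } }, { id: 'u2', handle: 'bob' }
// ]
```

Passing `refresh: 'wait_for'` on each bulk call, as the script does, delays the response until the documents are searchable, which keeps the subsequent per-page searches consistent at the cost of slower indexing.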