diff --git a/README.md b/README.md index 19231dc..e2790ac 100644 --- a/README.md +++ b/README.md @@ -13,7 +13,6 @@ Microservice to manage CRUD operations for all things Projects. * Install [libpg](https://www.npmjs.com/package/pg-native) ### Steps to run locally - 1. Install node dependencies ```bash npm install @@ -142,5 +141,4 @@ You may replace 172.17.0.1 with your docker0 IP. You can paste **swagger.yaml** to [swagger editor](http://editor.swagger.io/) or import **postman.json** and **postman_environment.json** to verify endpoints. #### Deploying without docker - If you don't want to use docker to deploy to localhost. You can simply run `npm run start:dev` from root of project. This should start the server on default port `8001`. diff --git a/package.json b/package.json index 84829a0..37bf074 100644 --- a/package.json +++ b/package.json @@ -75,7 +75,7 @@ "babel-plugin-add-module-exports": "^0.2.1", "babel-plugin-transform-runtime": "^6.23.0", "babel-preset-es2015": "^6.9.0", - "bunyan": "^1.8.1", + "bunyan": "^1.8.12", "chai": "^3.5.0", "chai-as-promised": "^7.1.1", "eslint": "^3.16.1", diff --git a/src/events/index.js b/src/events/index.js index e161ef6..c4c121b 100644 --- a/src/events/index.js +++ b/src/events/index.js @@ -1,58 +1,73 @@ import { EVENT, CONNECT_NOTIFICATION_EVENT } from '../constants'; -import { projectCreatedHandler, projectUpdatedHandler, projectDeletedHandler, +import { projectCreatedHandler, projectUpdatedKafkaHandler } from './projects'; -import { projectMemberAddedHandler, projectMemberRemovedHandler, - projectMemberUpdatedHandler } from './projectMembers'; -import { projectMemberInviteCreatedHandler, - projectMemberInviteUpdatedHandler } from './projectMemberInvites'; -import { projectAttachmentRemovedHandler, - projectAttachmentUpdatedHandler, projectAttachmentAddedHandler } from './projectAttachments'; import { projectPhaseAddedHandler, projectPhaseRemovedHandler, projectPhaseUpdatedHandler } from './projectPhases'; -import { phaseProductAddedHandler, phaseProductRemovedHandler, - phaseProductUpdatedHandler } from './phaseProducts'; import { timelineAddedHandler, - timelineUpdatedHandler, - timelineRemovedHandler, timelineAdjustedKafkaHandler, } from './timelines'; import { milestoneAddedHandler, milestoneUpdatedHandler, - milestoneRemovedHandler, milestoneUpdatedKafkaHandler, } from './milestones'; +/** + * Void RabbitMQ event handler. + * It "ack"s messages which are still published but we don't want to consume. + * + * It's used to "disable" events which we don't want to handle anymore. But for a time being + * we don't want to remove the code of them until we validate that we are good without them. + * + * @param {Object} logger logger + * @param {Object} msg RabbitMQ message + * @param {Object} channel RabbitMQ channel + * @returns {Promise} nothing + */ +const voidRabbitHandler = (logger, msg, channel) => { + logger.debug('Calling void RabbitMQ handler.'); + channel.ack(msg); + return Promise.resolve(); +}; + +// NOTE: We use "project-processor-es" for ES indexing now. +// So I disable indexing using RabbitMQ for a transition period for most of the objects +// which don't have any special logic. +// As soon as we are sure, that "project-processor-es" works well for ES indexing, +// we should completely remove the handlers for this events. export const rabbitHandlers = { - 'project.initial': projectCreatedHandler, - [EVENT.ROUTING_KEY.PROJECT_DRAFT_CREATED]: projectCreatedHandler, - [EVENT.ROUTING_KEY.PROJECT_UPDATED]: projectUpdatedHandler, - [EVENT.ROUTING_KEY.PROJECT_DELETED]: projectDeletedHandler, - [EVENT.ROUTING_KEY.PROJECT_MEMBER_ADDED]: projectMemberAddedHandler, - [EVENT.ROUTING_KEY.PROJECT_MEMBER_REMOVED]: projectMemberRemovedHandler, - [EVENT.ROUTING_KEY.PROJECT_MEMBER_UPDATED]: projectMemberUpdatedHandler, - [EVENT.ROUTING_KEY.PROJECT_MEMBER_INVITE_CREATED]: projectMemberInviteCreatedHandler, - [EVENT.ROUTING_KEY.PROJECT_MEMBER_INVITE_UPDATED]: projectMemberInviteUpdatedHandler, - [EVENT.ROUTING_KEY.PROJECT_ATTACHMENT_ADDED]: projectAttachmentAddedHandler, - [EVENT.ROUTING_KEY.PROJECT_ATTACHMENT_REMOVED]: projectAttachmentRemovedHandler, - [EVENT.ROUTING_KEY.PROJECT_ATTACHMENT_UPDATED]: projectAttachmentUpdatedHandler, - [EVENT.ROUTING_KEY.PROJECT_PHASE_ADDED]: projectPhaseAddedHandler, - [EVENT.ROUTING_KEY.PROJECT_PHASE_REMOVED]: projectPhaseRemovedHandler, - [EVENT.ROUTING_KEY.PROJECT_PHASE_UPDATED]: projectPhaseUpdatedHandler, - [EVENT.ROUTING_KEY.PROJECT_PHASE_PRODUCT_ADDED]: phaseProductAddedHandler, - [EVENT.ROUTING_KEY.PROJECT_PHASE_PRODUCT_REMOVED]: phaseProductRemovedHandler, - [EVENT.ROUTING_KEY.PROJECT_PHASE_PRODUCT_UPDATED]: phaseProductUpdatedHandler, + 'project.initial': projectCreatedHandler, // is only used `seedElasticsearchIndex.js` and can be removed + [EVENT.ROUTING_KEY.PROJECT_DRAFT_CREATED]: voidRabbitHandler, // DISABLED + [EVENT.ROUTING_KEY.PROJECT_UPDATED]: voidRabbitHandler, // DISABLED + [EVENT.ROUTING_KEY.PROJECT_DELETED]: voidRabbitHandler, // DISABLED + [EVENT.ROUTING_KEY.PROJECT_MEMBER_ADDED]: voidRabbitHandler, // DISABLED + [EVENT.ROUTING_KEY.PROJECT_MEMBER_REMOVED]: voidRabbitHandler, // DISABLED + [EVENT.ROUTING_KEY.PROJECT_MEMBER_UPDATED]: voidRabbitHandler, // DISABLED + [EVENT.ROUTING_KEY.PROJECT_MEMBER_INVITE_CREATED]: voidRabbitHandler, // DISABLED + [EVENT.ROUTING_KEY.PROJECT_MEMBER_INVITE_UPDATED]: voidRabbitHandler, // DISABLED + [EVENT.ROUTING_KEY.PROJECT_ATTACHMENT_ADDED]: voidRabbitHandler, // DISABLED + [EVENT.ROUTING_KEY.PROJECT_ATTACHMENT_REMOVED]: voidRabbitHandler, // DISABLED + [EVENT.ROUTING_KEY.PROJECT_ATTACHMENT_UPDATED]: voidRabbitHandler, // DISABLED + + // project phase handles additionally implement logic for creating associated topics in Message Service + [EVENT.ROUTING_KEY.PROJECT_PHASE_ADDED]: projectPhaseAddedHandler, // index in ES because of cascade updates + [EVENT.ROUTING_KEY.PROJECT_PHASE_REMOVED]: projectPhaseRemovedHandler, // doesn't index in ES + [EVENT.ROUTING_KEY.PROJECT_PHASE_UPDATED]: projectPhaseUpdatedHandler, // index in ES because of cascade updates + + [EVENT.ROUTING_KEY.PROJECT_PHASE_PRODUCT_ADDED]: voidRabbitHandler, // DISABLED + [EVENT.ROUTING_KEY.PROJECT_PHASE_PRODUCT_REMOVED]: voidRabbitHandler, // DISABLED + [EVENT.ROUTING_KEY.PROJECT_PHASE_PRODUCT_UPDATED]: voidRabbitHandler, // DISABLED // Timeline and milestone - 'timeline.initial': timelineAddedHandler, - [EVENT.ROUTING_KEY.TIMELINE_ADDED]: timelineAddedHandler, - [EVENT.ROUTING_KEY.TIMELINE_REMOVED]: timelineRemovedHandler, - [EVENT.ROUTING_KEY.TIMELINE_UPDATED]: timelineUpdatedHandler, - [EVENT.ROUTING_KEY.MILESTONE_ADDED]: milestoneAddedHandler, - [EVENT.ROUTING_KEY.MILESTONE_REMOVED]: milestoneRemovedHandler, - [EVENT.ROUTING_KEY.MILESTONE_UPDATED]: milestoneUpdatedHandler, + 'timeline.initial': timelineAddedHandler, // is only used `seedElasticsearchIndex.js` and can be removed + [EVENT.ROUTING_KEY.TIMELINE_ADDED]: voidRabbitHandler, // DISABLED + [EVENT.ROUTING_KEY.TIMELINE_REMOVED]: voidRabbitHandler, // DISABLED + [EVENT.ROUTING_KEY.TIMELINE_UPDATED]: voidRabbitHandler, // DISABLED + [EVENT.ROUTING_KEY.MILESTONE_ADDED]: milestoneAddedHandler, // index in ES because of cascade updates + [EVENT.ROUTING_KEY.MILESTONE_REMOVED]: voidRabbitHandler, // DISABLED + [EVENT.ROUTING_KEY.MILESTONE_UPDATED]: milestoneUpdatedHandler, // index in ES because of cascade updates }; export const kafkaHandlers = { diff --git a/src/events/projectPhases/index.js b/src/events/projectPhases/index.js index e8e3337..ec5c21b 100644 --- a/src/events/projectPhases/index.js +++ b/src/events/projectPhases/index.js @@ -243,7 +243,7 @@ const projectPhaseUpdatedHandler = Promise.coroutine(function* (logger, msg, cha * @param {Object} msg event payload * @returns {undefined} */ -const removePhaseFromIndex = Promise.coroutine(function* (logger, msg) { // eslint-disable-line func-names +const removePhaseFromIndex = Promise.coroutine(function* (logger, msg) { // eslint-disable-line func-names, no-unused-vars try { const data = JSON.parse(msg.content.toString()); const phase = _.get(data, 'deleted', {}); @@ -316,7 +316,8 @@ const removeTopics = Promise.coroutine(function* (logger, phase, route) { // esl */ const projectPhaseRemovedHandler = Promise.coroutine(function* (logger, msg, channel) { // eslint-disable-line func-names try { - yield removePhaseFromIndex(logger, msg, channel); + // NOTE We use "project-processor-es" for ES indexing now. + // yield removePhaseFromIndex(logger, msg, channel); const data = JSON.parse(msg.content.toString()); const phase = _.get(data, 'deleted', {}); const route = _.get(data, 'route'); diff --git a/src/routes/milestones/create.js b/src/routes/milestones/create.js index 0bbdf54..08cda37 100644 --- a/src/routes/milestones/create.js +++ b/src/routes/milestones/create.js @@ -102,10 +102,6 @@ module.exports = [ }), ) .then((otherUpdated) => { - // Do not send events for the updated milestones here, - // because it will make 'version conflict' error in ES. - // The order of the other milestones need to be updated in the MILESTONE_ADDED event handler - // Send event to bus req.log.debug('Sending event to RabbitMQ bus for milestone %d', result.id); req.app.services.pubsub.publish(EVENT.ROUTING_KEY.MILESTONE_ADDED, @@ -113,15 +109,20 @@ module.exports = [ { correlationId: req.id }, ); - // emit the event + // NOTE So far this logic is implemented in RabbitMQ handler of MILESTONE_ADDED + // Even though we send this event to the Kafka, the "project-processor-es" shouldn't process it. util.sendResourceToKafkaBus( req, EVENT.ROUTING_KEY.MILESTONE_ADDED, RESOURCES.MILESTONE, result); - - // emit the event for other milestone order updated + // NOTE So far this logic is implemented in RabbitMQ handler of MILESTONE_ADDED + // Even though we send these events to the Kafka, the "project-processor-es" shouldn't process them. + // + // We don't process these event in "project-processor-es" + // because it will make 'version conflict' error in ES. + // The order of the other milestones need to be updated in the PROJECT_PHASE_UPDATED event handler _.map(otherUpdated, milestone => util.sendResourceToKafkaBus( req, diff --git a/src/routes/phases/create.js b/src/routes/phases/create.js index 5207163..080e191 100644 --- a/src/routes/phases/create.js +++ b/src/routes/phases/create.js @@ -145,14 +145,20 @@ module.exports = [ { correlationId: req.id }, ); - // emit the event + // NOTE So far this logic is implemented in RabbitMQ handler of PROJECT_PHASE_UPDATED + // Even though we send this event to the Kafka, the "project-processor-es" shouldn't process it. util.sendResourceToKafkaBus( req, EVENT.ROUTING_KEY.PROJECT_PHASE_ADDED, RESOURCES.PHASE, newProjectPhase); - // emit the event for other phase order updated + // NOTE So far this logic is implemented in RabbitMQ handler of PROJECT_PHASE_UPDATED + // Even though we send these events to the Kafka, the "project-processor-es" shouldn't process them. + // + // We don't process these event in "project-processor-es" + // because it will make 'version conflict' error in ES. + // The order of the other milestones need to be updated in the PROJECT_PHASE_UPDATED event handler _.map(otherUpdated, phase => util.sendResourceToKafkaBus( req, diff --git a/src/routes/phases/create.spec.js b/src/routes/phases/create.spec.js index 94ee55c..106cac7 100644 --- a/src/routes/phases/create.spec.js +++ b/src/routes/phases/create.spec.js @@ -482,8 +482,10 @@ describe('Project Phases', () => { let publishSpy; let sandbox; - // Wait for 500ms in order to wait for createEvent calls from previous tests to complete - before(async () => new Promise(resolve => setTimeout(() => resolve(), 500))); + before((done) => { + // Wait for 500ms in order to wait for createEvent calls from previous tests to complete + testUtil.wait(done); + }); beforeEach(async () => { sandbox = sinon.sandbox.create(); diff --git a/src/routes/phases/delete.spec.js b/src/routes/phases/delete.spec.js index 17eea96..e619ed7 100644 --- a/src/routes/phases/delete.spec.js +++ b/src/routes/phases/delete.spec.js @@ -297,8 +297,10 @@ describe('Project Phases', () => { let publishSpy; let sandbox; - // Wait for 500ms in order to wait for createEvent calls from previous tests to complete - before(async () => new Promise(resolve => setTimeout(() => resolve(), 500))); + before((done) => { + // Wait for 500ms in order to wait for createEvent calls from previous tests to complete + testUtil.wait(done); + }); beforeEach(async () => { sandbox = sinon.sandbox.create(); diff --git a/src/routes/phases/update.spec.js b/src/routes/phases/update.spec.js index 408670a..b8b6910 100644 --- a/src/routes/phases/update.spec.js +++ b/src/routes/phases/update.spec.js @@ -719,8 +719,10 @@ describe('Project Phases', () => { let publishSpy; let sandbox; - // Wait for 500ms in order to wait for createEvent calls from previous tests to complete - before(async () => new Promise(resolve => setTimeout(() => resolve(), 500))); + before((done) => { + // Wait for 500ms in order to wait for createEvent calls from previous tests to complete + testUtil.wait(done); + }); beforeEach(async () => { sandbox = sinon.sandbox.create(); diff --git a/src/routes/projects/list.js b/src/routes/projects/list.js index 3a03d05..733ad73 100755 --- a/src/routes/projects/list.js +++ b/src/routes/projects/list.js @@ -569,9 +569,11 @@ module.exports = [ return retrieveProjects(req, criteria, sort, req.query.fields) .then((result) => { if (result.rows.length === 0) { + req.log.debug('No projects found in ES'); return retrieveProjectsFromDB(req, criteria, sort, req.query.fields) .then(r => util.setPaginationHeaders(req, res, r)); } + req.log.debug('Projects found in ES'); // set header return util.setPaginationHeaders(req, res, result); }) @@ -584,9 +586,11 @@ module.exports = [ return retrieveProjects(req, criteria, sort, req.query.fields) .then((result) => { if (result.rows.length === 0) { + req.log.debug('No projects found in ES'); return retrieveProjectsFromDB(req, criteria, sort, req.query.fields) .then(r => util.setPaginationHeaders(req, res, r)); } + req.log.debug('Projects found in ES'); return util.setPaginationHeaders(req, res, result); }) .catch(err => next(err)); diff --git a/src/routes/works/create.spec.js b/src/routes/works/create.spec.js index 52fe870..0d3c6d6 100644 --- a/src/routes/works/create.spec.js +++ b/src/routes/works/create.spec.js @@ -7,12 +7,19 @@ import _ from 'lodash'; import chai from 'chai'; import sinon from 'sinon'; import request from 'supertest'; +import config from 'config'; import models from '../../models'; import server from '../../app'; import testUtil from '../../tests/util'; import busApi from '../../services/busApi'; +import messageService from '../../services/messageService'; +import RabbitMQService from '../../services/rabbitmq'; +import mockRabbitMQ from '../../tests/mockRabbitMQ'; import { BUS_API_EVENT, CONNECT_NOTIFICATION_EVENT, RESOURCES } from '../../constants'; +const ES_PROJECT_INDEX = config.get('elasticsearchConfig.indexName'); +const ES_PROJECT_TYPE = config.get('elasticsearchConfig.docType'); + const should = chai.should(); const validatePhase = (resJson, expectedPhase) => { @@ -370,5 +377,89 @@ describe('CREATE work', () => { }); }); }); + + describe('RabbitMQ Message topic', () => { + let createMessageSpy; + let publishSpy; + let sandbox; + + before((done) => { + // Wait for 500ms in order to wait for createEvent calls from previous tests to complete + testUtil.wait(done); + }); + + beforeEach(async () => { + sandbox = sinon.sandbox.create(); + server.services.pubsub = new RabbitMQService(server.logger); + + // initialize RabbitMQ + server.services.pubsub.init( + config.get('rabbitmqURL'), + config.get('pubsubExchangeName'), + config.get('pubsubQueueName'), + ); + + // add project to ES index + await server.services.es.index({ + index: ES_PROJECT_INDEX, + type: ES_PROJECT_TYPE, + id: projectId, + body: { + doc: project, + }, + }); + + return new Promise(resolve => setTimeout(() => { + publishSpy = sandbox.spy(server.services.pubsub, 'publish'); + createMessageSpy = sandbox.spy(messageService, 'createTopic'); + resolve(); + }, 500)); + }); + + afterEach(() => { + sandbox.restore(); + }); + + after(() => { + mockRabbitMQ(server); + }); + + it('should send message topic when work added', (done) => { + const mockHttpClient = _.merge(testUtil.mockHttpClient, { + post: () => Promise.resolve({ + status: 200, + data: { + id: 'requesterId', + version: 'v3', + result: { + success: true, + status: 200, + content: {}, + }, + }, + }), + }); + sandbox.stub(messageService, 'getClient', () => mockHttpClient); + request(server) + .post(`/v5/projects/${projectId}/workstreams/${workStreamId}/works`) + .set({ + Authorization: `Bearer ${testUtil.jwts.connectAdmin}`, + }) + .send(body) + .expect(201) + .end((err) => { + if (err) { + done(err); + } else { + testUtil.wait(() => { + publishSpy.calledOnce.should.be.true; + publishSpy.calledWith('project.phase.added').should.be.true; + createMessageSpy.calledTwice.should.be.true; + done(); + }); + } + }); + }); + }); }); }); diff --git a/src/routes/works/delete.spec.js b/src/routes/works/delete.spec.js index b57c25c..e1ed3ba 100644 --- a/src/routes/works/delete.spec.js +++ b/src/routes/works/delete.spec.js @@ -6,12 +6,19 @@ import _ from 'lodash'; import request from 'supertest'; import chai from 'chai'; import sinon from 'sinon'; +import config from 'config'; import models from '../../models'; import server from '../../app'; import testUtil from '../../tests/util'; import busApi from '../../services/busApi'; +import messageService from '../../services/messageService'; +import RabbitMQService from '../../services/rabbitmq'; +import mockRabbitMQ from '../../tests/mockRabbitMQ'; import { BUS_API_EVENT, CONNECT_NOTIFICATION_EVENT, RESOURCES } from '../../constants'; +const ES_PROJECT_INDEX = config.get('elasticsearchConfig.indexName'); +const ES_PROJECT_TYPE = config.get('elasticsearchConfig.docType'); + chai.should(); const expectAfterDelete = (workId, projectId, workStreamId, err, next) => { @@ -72,7 +79,15 @@ describe('DELETE work', () => { lastActivityAt: 1, lastActivityUserId: '1', }; - + const topic = { + id: 1, + title: 'test project phase', + posts: + [{ id: 1, + type: 'post', + body: 'body', + }], + }; beforeEach((done) => { testUtil.clearDb() .then(() => { @@ -300,5 +315,94 @@ describe('DELETE work', () => { }); }); }); + + describe('RabbitMQ Message topic', () => { + let deleteTopicSpy; + let deletePostsSpy; + let publishSpy; + let sandbox; + + before((done) => { + // Wait for 500ms in order to wait for createEvent calls from previous tests to complete + testUtil.wait(done); + }); + + beforeEach(async () => { + sandbox = sinon.sandbox.create(); + server.services.pubsub = new RabbitMQService(server.logger); + + // initialize RabbitMQ + server.services.pubsub.init( + config.get('rabbitmqURL'), + config.get('pubsubExchangeName'), + config.get('pubsubQueueName'), + ); + + // add project to ES index + await server.services.es.index({ + index: ES_PROJECT_INDEX, + type: ES_PROJECT_TYPE, + id: projectId, + body: { + doc: _.assign(project, { phases: [_.assign({ + name: 'test project phase', + status: 'active', + startDate: '2018-05-15T00:00:00Z', + endDate: '2018-05-15T12:00:00Z', + budget: 20.0, + progress: 1.23456, + details: { + message: 'This can be any json', + }, + createdBy: 1, + updatedBy: 1, + projectId, + }, { id: workId, projectId })] }), + }, + }); + + return new Promise(resolve => setTimeout(() => { + publishSpy = sandbox.spy(server.services.pubsub, 'publish'); + deleteTopicSpy = sandbox.spy(messageService, 'deleteTopic'); + deletePostsSpy = sandbox.spy(messageService, 'deletePosts'); + sandbox.stub(messageService, 'getTopicByTag', () => Promise.resolve(topic)); + resolve(); + }, 500)); + }); + + afterEach(() => { + sandbox.restore(); + }); + + after(() => { + mockRabbitMQ(server); + }); + + it('should send message topic when work deleted', (done) => { + const mockHttpClient = _.merge(testUtil.mockHttpClient, { + delete: () => Promise.resolve(true), + }); + sandbox.stub(messageService, 'getClient', () => mockHttpClient); + request(server) + .delete(`/v5/projects/${projectId}/workstreams/${workStreamId}/works/${workId}`) + .set({ + Authorization: `Bearer ${testUtil.jwts.admin}`, + }) + .expect(204) + .end((err) => { + if (err) { + done(err); + } else { + testUtil.wait(() => { + publishSpy.calledOnce.should.be.true; + publishSpy.calledWith('project.phase.removed').should.be.true; + deleteTopicSpy.calledTwice.should.be.true; + deletePostsSpy.calledTwice.should.be.true; + done(); + }); + } + }); + }); + }); }); }); diff --git a/src/routes/works/update.spec.js b/src/routes/works/update.spec.js index 9528669..ce4301b 100644 --- a/src/routes/works/update.spec.js +++ b/src/routes/works/update.spec.js @@ -6,12 +6,19 @@ import _ from 'lodash'; import chai from 'chai'; import request from 'supertest'; import sinon from 'sinon'; +import config from 'config'; import models from '../../models'; import server from '../../app'; import testUtil from '../../tests/util'; import busApi from '../../services/busApi'; +import messageService from '../../services/messageService'; +import RabbitMQService from '../../services/rabbitmq'; +import mockRabbitMQ from '../../tests/mockRabbitMQ'; import { BUS_API_EVENT, RESOURCES, CONNECT_NOTIFICATION_EVENT } from '../../constants'; +const ES_PROJECT_INDEX = config.get('elasticsearchConfig.indexName'); +const ES_PROJECT_TYPE = config.get('elasticsearchConfig.docType'); + const should = chai.should(); const body = { @@ -87,6 +94,15 @@ describe('UPDATE work', () => { lastActivityAt: 1, lastActivityUserId: '1', }; + const topic = { + id: 1, + title: 'test project phase', + posts: + [{ id: 1, + type: 'post', + body: 'body', + }], + }; beforeEach((done) => { testUtil.clearDb() .then(() => { @@ -691,17 +707,17 @@ describe('UPDATE work', () => { }); }); - /* describe('RabbitMQ Message topic', () => { + describe('RabbitMQ Message topic', () => { let updateMessageSpy; let publishSpy; let sandbox; - before(async (done) => { + before((done) => { // Wait for 500ms in order to wait for createEvent calls from previous tests to complete testUtil.wait(done); }); - beforeEach(async (done) => { + beforeEach(async () => { sandbox = sinon.sandbox.create(); server.services.pubsub = new RabbitMQService(server.logger); @@ -722,12 +738,12 @@ describe('UPDATE work', () => { }, }); - testUtil.wait(() => { + return new Promise(resolve => setTimeout(() => { publishSpy = sandbox.spy(server.services.pubsub, 'publish'); updateMessageSpy = sandbox.spy(messageService, 'updateTopic'); sandbox.stub(messageService, 'getTopicByTag', () => Promise.resolve(topic)); - done(); - }); + resolve(); + }, 500)); }); afterEach(() => { @@ -755,26 +771,26 @@ describe('UPDATE work', () => { }); sandbox.stub(messageService, 'getClient', () => mockHttpClient); request(server) - .patch(`/v4/projects/${projectId}/workstreams/${workStreamId}/works/${workId}`) - .set({ - Authorization: `Bearer ${testUtil.jwts.admin}`, - }) - .send({ param: _.assign(updateBody, { budget: 123 }) }) - .expect('Content-Type', /json/) - .expect(200) - .end((err) => { - if (err) { - done(err); - } else { - testUtil.wait(() => { - publishSpy.calledOnce.should.be.true; - publishSpy.calledWith('project.phase.updated').should.be.true; - updateMessageSpy.calledTwice.should.be.true; - done(); - }); - } - }); + .patch(`/v5/projects/${projectId}/workstreams/${workStreamId}/works/${workId}`) + .set({ + Authorization: `Bearer ${testUtil.jwts.admin}`, + }) + .send(_.assign(updateBody, { budget: 123 })) + .expect('Content-Type', /json/) + .expect(200) + .end((err) => { + if (err) { + done(err); + } else { + testUtil.wait(() => { + publishSpy.calledOnce.should.be.true; + publishSpy.calledWith('project.phase.updated').should.be.true; + updateMessageSpy.calledTwice.should.be.true; + done(); + }); + } + }); }); - }); */ + }); }); }); diff --git a/src/services/busApi.js b/src/services/busApi.js index 56c5e0f..5916c03 100644 --- a/src/services/busApi.js +++ b/src/services/busApi.js @@ -46,7 +46,7 @@ async function getClient() { function createEvent(topic, payload, logger) { logger.debug(`Sending message to topic ${topic}: ${JSON.stringify(payload)}`); return getClient().then((busClient) => { - logger.debug('calling bus-api'); + logger.debug(`calling bus-api for topic ${topic}`); return busClient.post('/bus/events', { topic, originator: 'project-api', @@ -54,11 +54,11 @@ function createEvent(topic, payload, logger) { 'mime-type': 'application/json', payload, }).then((resp) => { - logger.debug('Sent event to bus-api'); - logger.debug(`Sent event to bus-api [data]: ${_.get(resp, 'data')}`); - logger.debug(`Sent event to bus-api [status]: ${_.get(resp, 'status')}`); + logger.debug(`Sent event to bus-api for topic ${topic}`); + logger.debug(`Sent event to bus-api for topic ${topic} [data]: ${_.get(resp, 'data')}`); + logger.debug(`Sent event to bus-api for topic ${topic} [status]: ${_.get(resp, 'status')}`); }).catch((error) => { - logger.debug('Error sending event to bus-api'); + logger.debug(`Error sending event to bus-api for topic ${topic}`); if (error.response) { // The request was made and the server responded with a status code // that falls out of the range of 2xx