Skip to content
This repository was archived by the owner on Mar 12, 2025. It is now read-only.

Updates from another repo 1 #6

Merged
merged 2 commits into from
Oct 31, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ Microservice to manage CRUD operations for all things Projects.
* Install [libpg](https://www.npmjs.com/package/pg-native)

### Steps to run locally

1. Install node dependencies
```bash
npm install
Expand Down Expand Up @@ -142,5 +141,4 @@ You may replace 172.17.0.1 with your docker0 IP.
You can paste **swagger.yaml** to [swagger editor](http://editor.swagger.io/) or import **postman.json** and **postman_environment.json** to verify endpoints.

#### Deploying without docker

If you don't want to use docker to deploy to localhost. You can simply run `npm run start:dev` from root of project. This should start the server on default port `8001`.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@
"babel-plugin-add-module-exports": "^0.2.1",
"babel-plugin-transform-runtime": "^6.23.0",
"babel-preset-es2015": "^6.9.0",
"bunyan": "^1.8.1",
"bunyan": "^1.8.12",
"chai": "^3.5.0",
"chai-as-promised": "^7.1.1",
"eslint": "^3.16.1",
Expand Down
89 changes: 52 additions & 37 deletions src/events/index.js
Original file line number Diff line number Diff line change
@@ -1,58 +1,73 @@

import { EVENT, CONNECT_NOTIFICATION_EVENT } from '../constants';
import { projectCreatedHandler, projectUpdatedHandler, projectDeletedHandler,
import { projectCreatedHandler,
projectUpdatedKafkaHandler } from './projects';
import { projectMemberAddedHandler, projectMemberRemovedHandler,
projectMemberUpdatedHandler } from './projectMembers';
import { projectMemberInviteCreatedHandler,
projectMemberInviteUpdatedHandler } from './projectMemberInvites';
import { projectAttachmentRemovedHandler,
projectAttachmentUpdatedHandler, projectAttachmentAddedHandler } from './projectAttachments';
import { projectPhaseAddedHandler, projectPhaseRemovedHandler,
projectPhaseUpdatedHandler } from './projectPhases';
import { phaseProductAddedHandler, phaseProductRemovedHandler,
phaseProductUpdatedHandler } from './phaseProducts';
import {
timelineAddedHandler,
timelineUpdatedHandler,
timelineRemovedHandler,
timelineAdjustedKafkaHandler,
} from './timelines';
import {
milestoneAddedHandler,
milestoneUpdatedHandler,
milestoneRemovedHandler,
milestoneUpdatedKafkaHandler,
} from './milestones';

/**
* Void RabbitMQ event handler.
* It "ack"s messages which are still published but we don't want to consume.
*
* It's used to "disable" events which we don't want to handle anymore. But for a time being
* we don't want to remove the code of them until we validate that we are good without them.
*
* @param {Object} logger logger
* @param {Object} msg RabbitMQ message
* @param {Object} channel RabbitMQ channel
* @returns {Promise} nothing
*/
const voidRabbitHandler = (logger, msg, channel) => {
logger.debug('Calling void RabbitMQ handler.');
channel.ack(msg);
return Promise.resolve();
};

// NOTE: We use "project-processor-es" for ES indexing now.
// So I disable indexing using RabbitMQ for a transition period for most of the objects
// which don't have any special logic.
// As soon as we are sure, that "project-processor-es" works well for ES indexing,
// we should completely remove the handlers for this events.
export const rabbitHandlers = {
'project.initial': projectCreatedHandler,
[EVENT.ROUTING_KEY.PROJECT_DRAFT_CREATED]: projectCreatedHandler,
[EVENT.ROUTING_KEY.PROJECT_UPDATED]: projectUpdatedHandler,
[EVENT.ROUTING_KEY.PROJECT_DELETED]: projectDeletedHandler,
[EVENT.ROUTING_KEY.PROJECT_MEMBER_ADDED]: projectMemberAddedHandler,
[EVENT.ROUTING_KEY.PROJECT_MEMBER_REMOVED]: projectMemberRemovedHandler,
[EVENT.ROUTING_KEY.PROJECT_MEMBER_UPDATED]: projectMemberUpdatedHandler,
[EVENT.ROUTING_KEY.PROJECT_MEMBER_INVITE_CREATED]: projectMemberInviteCreatedHandler,
[EVENT.ROUTING_KEY.PROJECT_MEMBER_INVITE_UPDATED]: projectMemberInviteUpdatedHandler,
[EVENT.ROUTING_KEY.PROJECT_ATTACHMENT_ADDED]: projectAttachmentAddedHandler,
[EVENT.ROUTING_KEY.PROJECT_ATTACHMENT_REMOVED]: projectAttachmentRemovedHandler,
[EVENT.ROUTING_KEY.PROJECT_ATTACHMENT_UPDATED]: projectAttachmentUpdatedHandler,
[EVENT.ROUTING_KEY.PROJECT_PHASE_ADDED]: projectPhaseAddedHandler,
[EVENT.ROUTING_KEY.PROJECT_PHASE_REMOVED]: projectPhaseRemovedHandler,
[EVENT.ROUTING_KEY.PROJECT_PHASE_UPDATED]: projectPhaseUpdatedHandler,
[EVENT.ROUTING_KEY.PROJECT_PHASE_PRODUCT_ADDED]: phaseProductAddedHandler,
[EVENT.ROUTING_KEY.PROJECT_PHASE_PRODUCT_REMOVED]: phaseProductRemovedHandler,
[EVENT.ROUTING_KEY.PROJECT_PHASE_PRODUCT_UPDATED]: phaseProductUpdatedHandler,
'project.initial': projectCreatedHandler, // is only used `seedElasticsearchIndex.js` and can be removed
[EVENT.ROUTING_KEY.PROJECT_DRAFT_CREATED]: voidRabbitHandler, // DISABLED
[EVENT.ROUTING_KEY.PROJECT_UPDATED]: voidRabbitHandler, // DISABLED
[EVENT.ROUTING_KEY.PROJECT_DELETED]: voidRabbitHandler, // DISABLED
[EVENT.ROUTING_KEY.PROJECT_MEMBER_ADDED]: voidRabbitHandler, // DISABLED
[EVENT.ROUTING_KEY.PROJECT_MEMBER_REMOVED]: voidRabbitHandler, // DISABLED
[EVENT.ROUTING_KEY.PROJECT_MEMBER_UPDATED]: voidRabbitHandler, // DISABLED
[EVENT.ROUTING_KEY.PROJECT_MEMBER_INVITE_CREATED]: voidRabbitHandler, // DISABLED
[EVENT.ROUTING_KEY.PROJECT_MEMBER_INVITE_UPDATED]: voidRabbitHandler, // DISABLED
[EVENT.ROUTING_KEY.PROJECT_ATTACHMENT_ADDED]: voidRabbitHandler, // DISABLED
[EVENT.ROUTING_KEY.PROJECT_ATTACHMENT_REMOVED]: voidRabbitHandler, // DISABLED
[EVENT.ROUTING_KEY.PROJECT_ATTACHMENT_UPDATED]: voidRabbitHandler, // DISABLED

// project phase handles additionally implement logic for creating associated topics in Message Service
[EVENT.ROUTING_KEY.PROJECT_PHASE_ADDED]: projectPhaseAddedHandler, // index in ES because of cascade updates
[EVENT.ROUTING_KEY.PROJECT_PHASE_REMOVED]: projectPhaseRemovedHandler, // doesn't index in ES
[EVENT.ROUTING_KEY.PROJECT_PHASE_UPDATED]: projectPhaseUpdatedHandler, // index in ES because of cascade updates

[EVENT.ROUTING_KEY.PROJECT_PHASE_PRODUCT_ADDED]: voidRabbitHandler, // DISABLED
[EVENT.ROUTING_KEY.PROJECT_PHASE_PRODUCT_REMOVED]: voidRabbitHandler, // DISABLED
[EVENT.ROUTING_KEY.PROJECT_PHASE_PRODUCT_UPDATED]: voidRabbitHandler, // DISABLED

// Timeline and milestone
'timeline.initial': timelineAddedHandler,
[EVENT.ROUTING_KEY.TIMELINE_ADDED]: timelineAddedHandler,
[EVENT.ROUTING_KEY.TIMELINE_REMOVED]: timelineRemovedHandler,
[EVENT.ROUTING_KEY.TIMELINE_UPDATED]: timelineUpdatedHandler,
[EVENT.ROUTING_KEY.MILESTONE_ADDED]: milestoneAddedHandler,
[EVENT.ROUTING_KEY.MILESTONE_REMOVED]: milestoneRemovedHandler,
[EVENT.ROUTING_KEY.MILESTONE_UPDATED]: milestoneUpdatedHandler,
'timeline.initial': timelineAddedHandler, // is only used `seedElasticsearchIndex.js` and can be removed
[EVENT.ROUTING_KEY.TIMELINE_ADDED]: voidRabbitHandler, // DISABLED
[EVENT.ROUTING_KEY.TIMELINE_REMOVED]: voidRabbitHandler, // DISABLED
[EVENT.ROUTING_KEY.TIMELINE_UPDATED]: voidRabbitHandler, // DISABLED
[EVENT.ROUTING_KEY.MILESTONE_ADDED]: milestoneAddedHandler, // index in ES because of cascade updates
[EVENT.ROUTING_KEY.MILESTONE_REMOVED]: voidRabbitHandler, // DISABLED
[EVENT.ROUTING_KEY.MILESTONE_UPDATED]: milestoneUpdatedHandler, // index in ES because of cascade updates
};

export const kafkaHandlers = {
Expand Down
5 changes: 3 additions & 2 deletions src/events/projectPhases/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ const projectPhaseUpdatedHandler = Promise.coroutine(function* (logger, msg, cha
* @param {Object} msg event payload
* @returns {undefined}
*/
const removePhaseFromIndex = Promise.coroutine(function* (logger, msg) { // eslint-disable-line func-names
const removePhaseFromIndex = Promise.coroutine(function* (logger, msg) { // eslint-disable-line func-names, no-unused-vars
try {
const data = JSON.parse(msg.content.toString());
const phase = _.get(data, 'deleted', {});
Expand Down Expand Up @@ -316,7 +316,8 @@ const removeTopics = Promise.coroutine(function* (logger, phase, route) { // esl
*/
const projectPhaseRemovedHandler = Promise.coroutine(function* (logger, msg, channel) { // eslint-disable-line func-names
try {
yield removePhaseFromIndex(logger, msg, channel);
// NOTE We use "project-processor-es" for ES indexing now.
// yield removePhaseFromIndex(logger, msg, channel);
const data = JSON.parse(msg.content.toString());
const phase = _.get(data, 'deleted', {});
const route = _.get(data, 'route');
Expand Down
15 changes: 8 additions & 7 deletions src/routes/milestones/create.js
Original file line number Diff line number Diff line change
Expand Up @@ -102,26 +102,27 @@ module.exports = [
}),
)
.then((otherUpdated) => {
// Do not send events for the updated milestones here,
// because it will make 'version conflict' error in ES.
// The order of the other milestones need to be updated in the MILESTONE_ADDED event handler

// Send event to bus
req.log.debug('Sending event to RabbitMQ bus for milestone %d', result.id);
req.app.services.pubsub.publish(EVENT.ROUTING_KEY.MILESTONE_ADDED,
result,
{ correlationId: req.id },
);

// emit the event
// NOTE So far this logic is implemented in RabbitMQ handler of MILESTONE_ADDED
// Even though we send this event to the Kafka, the "project-processor-es" shouldn't process it.
util.sendResourceToKafkaBus(
req,
EVENT.ROUTING_KEY.MILESTONE_ADDED,
RESOURCES.MILESTONE,
result);


// emit the event for other milestone order updated
// NOTE So far this logic is implemented in RabbitMQ handler of MILESTONE_ADDED
// Even though we send these events to the Kafka, the "project-processor-es" shouldn't process them.
//
// We don't process these event in "project-processor-es"
// because it will make 'version conflict' error in ES.
// The order of the other milestones need to be updated in the PROJECT_PHASE_UPDATED event handler
_.map(otherUpdated, milestone =>
util.sendResourceToKafkaBus(
req,
Expand Down
10 changes: 8 additions & 2 deletions src/routes/phases/create.js
Original file line number Diff line number Diff line change
Expand Up @@ -145,14 +145,20 @@ module.exports = [
{ correlationId: req.id },
);

// emit the event
// NOTE So far this logic is implemented in RabbitMQ handler of PROJECT_PHASE_UPDATED
// Even though we send this event to the Kafka, the "project-processor-es" shouldn't process it.
util.sendResourceToKafkaBus(
req,
EVENT.ROUTING_KEY.PROJECT_PHASE_ADDED,
RESOURCES.PHASE,
newProjectPhase);

// emit the event for other phase order updated
// NOTE So far this logic is implemented in RabbitMQ handler of PROJECT_PHASE_UPDATED
// Even though we send these events to the Kafka, the "project-processor-es" shouldn't process them.
//
// We don't process these event in "project-processor-es"
// because it will make 'version conflict' error in ES.
// The order of the other milestones need to be updated in the PROJECT_PHASE_UPDATED event handler
_.map(otherUpdated, phase =>
util.sendResourceToKafkaBus(
req,
Expand Down
6 changes: 4 additions & 2 deletions src/routes/phases/create.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -482,8 +482,10 @@ describe('Project Phases', () => {
let publishSpy;
let sandbox;

// Wait for 500ms in order to wait for createEvent calls from previous tests to complete
before(async () => new Promise(resolve => setTimeout(() => resolve(), 500)));
before((done) => {
// Wait for 500ms in order to wait for createEvent calls from previous tests to complete
testUtil.wait(done);
});

beforeEach(async () => {
sandbox = sinon.sandbox.create();
Expand Down
6 changes: 4 additions & 2 deletions src/routes/phases/delete.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -297,8 +297,10 @@ describe('Project Phases', () => {
let publishSpy;
let sandbox;

// Wait for 500ms in order to wait for createEvent calls from previous tests to complete
before(async () => new Promise(resolve => setTimeout(() => resolve(), 500)));
before((done) => {
// Wait for 500ms in order to wait for createEvent calls from previous tests to complete
testUtil.wait(done);
});

beforeEach(async () => {
sandbox = sinon.sandbox.create();
Expand Down
6 changes: 4 additions & 2 deletions src/routes/phases/update.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -719,8 +719,10 @@ describe('Project Phases', () => {
let publishSpy;
let sandbox;

// Wait for 500ms in order to wait for createEvent calls from previous tests to complete
before(async () => new Promise(resolve => setTimeout(() => resolve(), 500)));
before((done) => {
// Wait for 500ms in order to wait for createEvent calls from previous tests to complete
testUtil.wait(done);
});

beforeEach(async () => {
sandbox = sinon.sandbox.create();
Expand Down
4 changes: 4 additions & 0 deletions src/routes/projects/list.js
Original file line number Diff line number Diff line change
Expand Up @@ -569,9 +569,11 @@ module.exports = [
return retrieveProjects(req, criteria, sort, req.query.fields)
.then((result) => {
if (result.rows.length === 0) {
req.log.debug('No projects found in ES');
return retrieveProjectsFromDB(req, criteria, sort, req.query.fields)
.then(r => util.setPaginationHeaders(req, res, r));
}
req.log.debug('Projects found in ES');
// set header
return util.setPaginationHeaders(req, res, result);
})
Expand All @@ -584,9 +586,11 @@ module.exports = [
return retrieveProjects(req, criteria, sort, req.query.fields)
.then((result) => {
if (result.rows.length === 0) {
req.log.debug('No projects found in ES');
return retrieveProjectsFromDB(req, criteria, sort, req.query.fields)
.then(r => util.setPaginationHeaders(req, res, r));
}
req.log.debug('Projects found in ES');
return util.setPaginationHeaders(req, res, result);
})
.catch(err => next(err));
Expand Down
91 changes: 91 additions & 0 deletions src/routes/works/create.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,19 @@ import _ from 'lodash';
import chai from 'chai';
import sinon from 'sinon';
import request from 'supertest';
import config from 'config';
import models from '../../models';
import server from '../../app';
import testUtil from '../../tests/util';
import busApi from '../../services/busApi';
import messageService from '../../services/messageService';
import RabbitMQService from '../../services/rabbitmq';
import mockRabbitMQ from '../../tests/mockRabbitMQ';
import { BUS_API_EVENT, CONNECT_NOTIFICATION_EVENT, RESOURCES } from '../../constants';

const ES_PROJECT_INDEX = config.get('elasticsearchConfig.indexName');
const ES_PROJECT_TYPE = config.get('elasticsearchConfig.docType');

const should = chai.should();

const validatePhase = (resJson, expectedPhase) => {
Expand Down Expand Up @@ -370,5 +377,89 @@ describe('CREATE work', () => {
});
});
});

describe('RabbitMQ Message topic', () => {
let createMessageSpy;
let publishSpy;
let sandbox;

before((done) => {
// Wait for 500ms in order to wait for createEvent calls from previous tests to complete
testUtil.wait(done);
});

beforeEach(async () => {
sandbox = sinon.sandbox.create();
server.services.pubsub = new RabbitMQService(server.logger);

// initialize RabbitMQ
server.services.pubsub.init(
config.get('rabbitmqURL'),
config.get('pubsubExchangeName'),
config.get('pubsubQueueName'),
);

// add project to ES index
await server.services.es.index({
index: ES_PROJECT_INDEX,
type: ES_PROJECT_TYPE,
id: projectId,
body: {
doc: project,
},
});

return new Promise(resolve => setTimeout(() => {
publishSpy = sandbox.spy(server.services.pubsub, 'publish');
createMessageSpy = sandbox.spy(messageService, 'createTopic');
resolve();
}, 500));
});

afterEach(() => {
sandbox.restore();
});

after(() => {
mockRabbitMQ(server);
});

it('should send message topic when work added', (done) => {
const mockHttpClient = _.merge(testUtil.mockHttpClient, {
post: () => Promise.resolve({
status: 200,
data: {
id: 'requesterId',
version: 'v3',
result: {
success: true,
status: 200,
content: {},
},
},
}),
});
sandbox.stub(messageService, 'getClient', () => mockHttpClient);
request(server)
.post(`/v5/projects/${projectId}/workstreams/${workStreamId}/works`)
.set({
Authorization: `Bearer ${testUtil.jwts.connectAdmin}`,
})
.send(body)
.expect(201)
.end((err) => {
if (err) {
done(err);
} else {
testUtil.wait(() => {
publishSpy.calledOnce.should.be.true;
publishSpy.calledWith('project.phase.added').should.be.true;
createMessageSpy.calledTwice.should.be.true;
done();
});
}
});
});
});
});
});
Loading