Skip to content
This repository was archived by the owner on Mar 12, 2025. It is now read-only.

Commit 6d7ea04

Browse files
committed
feat: updates from another repo
- fix: removed duplicate ES index calls - feat: bring back RabbitMQ unit tests for works - refactor: simpler code to wait before test
1 parent 2aeef19 commit 6d7ea04

File tree

10 files changed

+295
-81
lines changed

10 files changed

+295
-81
lines changed

src/events/index.js

Lines changed: 26 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,58 +1,47 @@
11

22
import { EVENT, CONNECT_NOTIFICATION_EVENT } from '../constants';
3-
import { projectCreatedHandler, projectUpdatedHandler, projectDeletedHandler,
3+
import { projectCreatedHandler,
44
projectUpdatedKafkaHandler } from './projects';
5-
import { projectMemberAddedHandler, projectMemberRemovedHandler,
6-
projectMemberUpdatedHandler } from './projectMembers';
7-
import { projectMemberInviteCreatedHandler,
8-
projectMemberInviteUpdatedHandler } from './projectMemberInvites';
9-
import { projectAttachmentRemovedHandler,
10-
projectAttachmentUpdatedHandler, projectAttachmentAddedHandler } from './projectAttachments';
115
import { projectPhaseAddedHandler, projectPhaseRemovedHandler,
126
projectPhaseUpdatedHandler } from './projectPhases';
13-
import { phaseProductAddedHandler, phaseProductRemovedHandler,
14-
phaseProductUpdatedHandler } from './phaseProducts';
157
import {
168
timelineAddedHandler,
17-
timelineUpdatedHandler,
18-
timelineRemovedHandler,
199
timelineAdjustedKafkaHandler,
2010
} from './timelines';
2111
import {
2212
milestoneAddedHandler,
2313
milestoneUpdatedHandler,
24-
milestoneRemovedHandler,
2514
milestoneUpdatedKafkaHandler,
2615
} from './milestones';
2716

17+
// NOTE: We use "project-processor-es" for ES indexing now.
18+
// So I disable indexing using RabbitMQ for a transition period for most of the objects
19+
// which don't have any special logic.
20+
// As soon as we are sure, that "project-processor-es" works well for ES indexing,
21+
// we should completely remove the handlers for this events.
2822
export const rabbitHandlers = {
29-
'project.initial': projectCreatedHandler,
30-
[EVENT.ROUTING_KEY.PROJECT_DRAFT_CREATED]: projectCreatedHandler,
31-
[EVENT.ROUTING_KEY.PROJECT_UPDATED]: projectUpdatedHandler,
32-
[EVENT.ROUTING_KEY.PROJECT_DELETED]: projectDeletedHandler,
33-
[EVENT.ROUTING_KEY.PROJECT_MEMBER_ADDED]: projectMemberAddedHandler,
34-
[EVENT.ROUTING_KEY.PROJECT_MEMBER_REMOVED]: projectMemberRemovedHandler,
35-
[EVENT.ROUTING_KEY.PROJECT_MEMBER_UPDATED]: projectMemberUpdatedHandler,
36-
[EVENT.ROUTING_KEY.PROJECT_MEMBER_INVITE_CREATED]: projectMemberInviteCreatedHandler,
37-
[EVENT.ROUTING_KEY.PROJECT_MEMBER_INVITE_UPDATED]: projectMemberInviteUpdatedHandler,
38-
[EVENT.ROUTING_KEY.PROJECT_ATTACHMENT_ADDED]: projectAttachmentAddedHandler,
39-
[EVENT.ROUTING_KEY.PROJECT_ATTACHMENT_REMOVED]: projectAttachmentRemovedHandler,
40-
[EVENT.ROUTING_KEY.PROJECT_ATTACHMENT_UPDATED]: projectAttachmentUpdatedHandler,
41-
[EVENT.ROUTING_KEY.PROJECT_PHASE_ADDED]: projectPhaseAddedHandler,
42-
[EVENT.ROUTING_KEY.PROJECT_PHASE_REMOVED]: projectPhaseRemovedHandler,
43-
[EVENT.ROUTING_KEY.PROJECT_PHASE_UPDATED]: projectPhaseUpdatedHandler,
44-
[EVENT.ROUTING_KEY.PROJECT_PHASE_PRODUCT_ADDED]: phaseProductAddedHandler,
45-
[EVENT.ROUTING_KEY.PROJECT_PHASE_PRODUCT_REMOVED]: phaseProductRemovedHandler,
46-
[EVENT.ROUTING_KEY.PROJECT_PHASE_PRODUCT_UPDATED]: phaseProductUpdatedHandler,
23+
'project.initial': projectCreatedHandler, // is only used `seedElasticsearchIndex.js` and can be removed
24+
// [EVENT.ROUTING_KEY.PROJECT_DRAFT_CREATED]: projectCreatedHandler,
25+
// [EVENT.ROUTING_KEY.PROJECT_UPDATED]: projectUpdatedHandler,
26+
// [EVENT.ROUTING_KEY.PROJECT_DELETED]: projectDeletedHandler,
27+
// [EVENT.ROUTING_KEY.PROJECT_MEMBER_ADDED]: projectMemberAddedHandler,
28+
// [EVENT.ROUTING_KEY.PROJECT_MEMBER_REMOVED]: projectMemberRemovedHandler,
29+
// [EVENT.ROUTING_KEY.PROJECT_MEMBER_UPDATED]: projectMemberUpdatedHandler,
30+
// [EVENT.ROUTING_KEY.PROJECT_MEMBER_INVITE_CREATED]: projectMemberInviteCreatedHandler,
31+
// [EVENT.ROUTING_KEY.PROJECT_MEMBER_INVITE_UPDATED]: projectMemberInviteUpdatedHandler,
32+
// [EVENT.ROUTING_KEY.PROJECT_ATTACHMENT_ADDED]: projectAttachmentAddedHandler,
33+
// [EVENT.ROUTING_KEY.PROJECT_ATTACHMENT_REMOVED]: projectAttachmentRemovedHandler,
34+
// [EVENT.ROUTING_KEY.PROJECT_ATTACHMENT_UPDATED]: projectAttachmentUpdatedHandler,
35+
36+
// project phase handles additionally implement logic for creating associated topics in Message Service
37+
[EVENT.ROUTING_KEY.PROJECT_PHASE_ADDED]: projectPhaseAddedHandler, // index in ES because of cascade updates
38+
[EVENT.ROUTING_KEY.PROJECT_PHASE_REMOVED]: projectPhaseRemovedHandler, // doesn't index in ES
39+
[EVENT.ROUTING_KEY.PROJECT_PHASE_UPDATED]: projectPhaseUpdatedHandler, // index in ES because of cascade updates
4740

4841
// Timeline and milestone
49-
'timeline.initial': timelineAddedHandler,
50-
[EVENT.ROUTING_KEY.TIMELINE_ADDED]: timelineAddedHandler,
51-
[EVENT.ROUTING_KEY.TIMELINE_REMOVED]: timelineRemovedHandler,
52-
[EVENT.ROUTING_KEY.TIMELINE_UPDATED]: timelineUpdatedHandler,
53-
[EVENT.ROUTING_KEY.MILESTONE_ADDED]: milestoneAddedHandler,
54-
[EVENT.ROUTING_KEY.MILESTONE_REMOVED]: milestoneRemovedHandler,
55-
[EVENT.ROUTING_KEY.MILESTONE_UPDATED]: milestoneUpdatedHandler,
42+
'timeline.initial': timelineAddedHandler, // is only used `seedElasticsearchIndex.js` and can be removed
43+
[EVENT.ROUTING_KEY.MILESTONE_ADDED]: milestoneAddedHandler, // index in ES because of cascade updates
44+
[EVENT.ROUTING_KEY.MILESTONE_UPDATED]: milestoneUpdatedHandler, // index in ES because of cascade updates
5645
};
5746

5847
export const kafkaHandlers = {

src/events/projectPhases/index.js

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,7 @@ const projectPhaseUpdatedHandler = Promise.coroutine(function* (logger, msg, cha
243243
* @param {Object} msg event payload
244244
* @returns {undefined}
245245
*/
246-
const removePhaseFromIndex = Promise.coroutine(function* (logger, msg) { // eslint-disable-line func-names
246+
const removePhaseFromIndex = Promise.coroutine(function* (logger, msg) { // eslint-disable-line func-names, no-unused-vars
247247
try {
248248
const data = JSON.parse(msg.content.toString());
249249
const phase = _.get(data, 'deleted', {});
@@ -316,7 +316,8 @@ const removeTopics = Promise.coroutine(function* (logger, phase, route) { // esl
316316
*/
317317
const projectPhaseRemovedHandler = Promise.coroutine(function* (logger, msg, channel) { // eslint-disable-line func-names
318318
try {
319-
yield removePhaseFromIndex(logger, msg, channel);
319+
// NOTE We use "project-processor-es" for ES indexing now.
320+
// yield removePhaseFromIndex(logger, msg, channel);
320321
const data = JSON.parse(msg.content.toString());
321322
const phase = _.get(data, 'deleted', {});
322323
const route = _.get(data, 'route');

src/routes/milestones/create.js

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -102,26 +102,27 @@ module.exports = [
102102
}),
103103
)
104104
.then((otherUpdated) => {
105-
// Do not send events for the updated milestones here,
106-
// because it will make 'version conflict' error in ES.
107-
// The order of the other milestones need to be updated in the MILESTONE_ADDED event handler
108-
109105
// Send event to bus
110106
req.log.debug('Sending event to RabbitMQ bus for milestone %d', result.id);
111107
req.app.services.pubsub.publish(EVENT.ROUTING_KEY.MILESTONE_ADDED,
112108
result,
113109
{ correlationId: req.id },
114110
);
115111

116-
// emit the event
112+
// NOTE So far this logic is implemented in RabbitMQ handler of MILESTONE_ADDED
113+
// Even though we send this event to the Kafka, the "project-processor-es" shouldn't process it.
117114
util.sendResourceToKafkaBus(
118115
req,
119116
EVENT.ROUTING_KEY.MILESTONE_ADDED,
120117
RESOURCES.MILESTONE,
121118
result);
122119

123-
124-
// emit the event for other milestone order updated
120+
// NOTE So far this logic is implemented in RabbitMQ handler of MILESTONE_ADDED
121+
// Even though we send these events to the Kafka, the "project-processor-es" shouldn't process them.
122+
//
123+
// We don't process these event in "project-processor-es"
124+
// because it will make 'version conflict' error in ES.
125+
// The order of the other milestones need to be updated in the PROJECT_PHASE_UPDATED event handler
125126
_.map(otherUpdated, milestone =>
126127
util.sendResourceToKafkaBus(
127128
req,

src/routes/phases/create.js

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -145,14 +145,20 @@ module.exports = [
145145
{ correlationId: req.id },
146146
);
147147

148-
// emit the event
148+
// NOTE So far this logic is implemented in RabbitMQ handler of PROJECT_PHASE_UPDATED
149+
// Even though we send this event to the Kafka, the "project-processor-es" shouldn't process it.
149150
util.sendResourceToKafkaBus(
150151
req,
151152
EVENT.ROUTING_KEY.PROJECT_PHASE_ADDED,
152153
RESOURCES.PHASE,
153154
newProjectPhase);
154155

155-
// emit the event for other phase order updated
156+
// NOTE So far this logic is implemented in RabbitMQ handler of PROJECT_PHASE_UPDATED
157+
// Even though we send these events to the Kafka, the "project-processor-es" shouldn't process them.
158+
//
159+
// We don't process these event in "project-processor-es"
160+
// because it will make 'version conflict' error in ES.
161+
// The order of the other milestones need to be updated in the PROJECT_PHASE_UPDATED event handler
156162
_.map(otherUpdated, phase =>
157163
util.sendResourceToKafkaBus(
158164
req,

src/routes/phases/create.spec.js

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -482,8 +482,10 @@ describe('Project Phases', () => {
482482
let publishSpy;
483483
let sandbox;
484484

485-
// Wait for 500ms in order to wait for createEvent calls from previous tests to complete
486-
before(async () => new Promise(resolve => setTimeout(() => resolve(), 500)));
485+
before((done) => {
486+
// Wait for 500ms in order to wait for createEvent calls from previous tests to complete
487+
testUtil.wait(done);
488+
});
487489

488490
beforeEach(async () => {
489491
sandbox = sinon.sandbox.create();

src/routes/phases/delete.spec.js

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -297,8 +297,10 @@ describe('Project Phases', () => {
297297
let publishSpy;
298298
let sandbox;
299299

300-
// Wait for 500ms in order to wait for createEvent calls from previous tests to complete
301-
before(async () => new Promise(resolve => setTimeout(() => resolve(), 500)));
300+
before((done) => {
301+
// Wait for 500ms in order to wait for createEvent calls from previous tests to complete
302+
testUtil.wait(done);
303+
});
302304

303305
beforeEach(async () => {
304306
sandbox = sinon.sandbox.create();

src/routes/phases/update.spec.js

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -719,8 +719,10 @@ describe('Project Phases', () => {
719719
let publishSpy;
720720
let sandbox;
721721

722-
// Wait for 500ms in order to wait for createEvent calls from previous tests to complete
723-
before(async () => new Promise(resolve => setTimeout(() => resolve(), 500)));
722+
before((done) => {
723+
// Wait for 500ms in order to wait for createEvent calls from previous tests to complete
724+
testUtil.wait(done);
725+
});
724726

725727
beforeEach(async () => {
726728
sandbox = sinon.sandbox.create();

src/routes/works/create.spec.js

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,19 @@ import _ from 'lodash';
77
import chai from 'chai';
88
import sinon from 'sinon';
99
import request from 'supertest';
10+
import config from 'config';
1011
import models from '../../models';
1112
import server from '../../app';
1213
import testUtil from '../../tests/util';
1314
import busApi from '../../services/busApi';
15+
import messageService from '../../services/messageService';
16+
import RabbitMQService from '../../services/rabbitmq';
17+
import mockRabbitMQ from '../../tests/mockRabbitMQ';
1418
import { BUS_API_EVENT, CONNECT_NOTIFICATION_EVENT, RESOURCES } from '../../constants';
1519

20+
const ES_PROJECT_INDEX = config.get('elasticsearchConfig.indexName');
21+
const ES_PROJECT_TYPE = config.get('elasticsearchConfig.docType');
22+
1623
const should = chai.should();
1724

1825
const validatePhase = (resJson, expectedPhase) => {
@@ -370,5 +377,89 @@ describe('CREATE work', () => {
370377
});
371378
});
372379
});
380+
381+
describe('RabbitMQ Message topic', () => {
382+
let createMessageSpy;
383+
let publishSpy;
384+
let sandbox;
385+
386+
before((done) => {
387+
// Wait for 500ms in order to wait for createEvent calls from previous tests to complete
388+
testUtil.wait(done);
389+
});
390+
391+
beforeEach(async () => {
392+
sandbox = sinon.sandbox.create();
393+
server.services.pubsub = new RabbitMQService(server.logger);
394+
395+
// initialize RabbitMQ
396+
server.services.pubsub.init(
397+
config.get('rabbitmqURL'),
398+
config.get('pubsubExchangeName'),
399+
config.get('pubsubQueueName'),
400+
);
401+
402+
// add project to ES index
403+
await server.services.es.index({
404+
index: ES_PROJECT_INDEX,
405+
type: ES_PROJECT_TYPE,
406+
id: projectId,
407+
body: {
408+
doc: project,
409+
},
410+
});
411+
412+
return new Promise(resolve => setTimeout(() => {
413+
publishSpy = sandbox.spy(server.services.pubsub, 'publish');
414+
createMessageSpy = sandbox.spy(messageService, 'createTopic');
415+
resolve();
416+
}, 500));
417+
});
418+
419+
afterEach(() => {
420+
sandbox.restore();
421+
});
422+
423+
after(() => {
424+
mockRabbitMQ(server);
425+
});
426+
427+
it('should send message topic when work added', (done) => {
428+
const mockHttpClient = _.merge(testUtil.mockHttpClient, {
429+
post: () => Promise.resolve({
430+
status: 200,
431+
data: {
432+
id: 'requesterId',
433+
version: 'v3',
434+
result: {
435+
success: true,
436+
status: 200,
437+
content: {},
438+
},
439+
},
440+
}),
441+
});
442+
sandbox.stub(messageService, 'getClient', () => mockHttpClient);
443+
request(server)
444+
.post(`/v5/projects/${projectId}/workstreams/${workStreamId}/works`)
445+
.set({
446+
Authorization: `Bearer ${testUtil.jwts.connectAdmin}`,
447+
})
448+
.send(body)
449+
.expect(201)
450+
.end((err) => {
451+
if (err) {
452+
done(err);
453+
} else {
454+
testUtil.wait(() => {
455+
publishSpy.calledOnce.should.be.true;
456+
publishSpy.calledWith('project.phase.added').should.be.true;
457+
createMessageSpy.calledTwice.should.be.true;
458+
done();
459+
});
460+
}
461+
});
462+
});
463+
});
373464
});
374465
});

0 commit comments

Comments
 (0)