Skip to content

Commit dc63758

Browse files
author
sachin-maheshwari
authored
Merge pull request #514 from yoution/feature/shapeup4-cqrs-update
feat: add transaction for the rest models
2 parents 12ba53a + 08a29fd commit dc63758

File tree

8 files changed

+513
-64
lines changed

8 files changed

+513
-64
lines changed
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
/**
2+
* Interview Processor
3+
*/
4+
5+
const _ = require('lodash')
6+
const helper = require('../common/helper')
7+
const config = require('config')
8+
9+
const esClient = helper.getESClient()
10+
11+
/**
12+
* Updates jobCandidate via a painless script
13+
*
14+
* @param {String} jobCandidateId job candidate id
15+
* @param {String} script script definition
16+
*/
17+
async function updateJobCandidateViaScript (jobCandidateId, script) {
18+
await esClient.update({
19+
index: config.get('esConfig.ES_INDEX_JOB_CANDIDATE'),
20+
id: jobCandidateId,
21+
body: { script },
22+
refresh: 'wait_for'
23+
})
24+
}
25+
26+
/**
27+
* Process request interview entity.
28+
* Creates an interview record under jobCandidate.
29+
*
30+
* @param {Object} interview interview object
31+
*/
32+
async function processRequestInterview (interview) {
33+
// add interview in collection if there's already an existing collection
34+
// or initiate a new one with this interview
35+
const script = {
36+
source: `
37+
ctx._source.containsKey("interviews")
38+
? ctx._source.interviews.add(params.interview)
39+
: ctx._source.interviews = [params.interview]
40+
`,
41+
params: { interview }
42+
}
43+
await updateJobCandidateViaScript(interview.jobCandidateId, script)
44+
}
45+
46+
/**
47+
* Process update interview entity
48+
* Updates the interview record under jobCandidate.
49+
*
50+
* @param {Object} interview interview object
51+
*/
52+
async function processUpdateInterview (interview) {
53+
// if there's an interview with this id,
54+
// update it
55+
const script = {
56+
source: `
57+
if (ctx._source.containsKey("interviews")) {
58+
def target = ctx._source.interviews.find(i -> i.id == params.interview.id);
59+
if (target != null) {
60+
for (prop in params.interview.entrySet()) {
61+
target[prop.getKey()] = prop.getValue()
62+
}
63+
}
64+
}
65+
`,
66+
params: { interview }
67+
}
68+
await updateJobCandidateViaScript(interview.jobCandidateId, script)
69+
}
70+
71+
/**
72+
* Process bulk (partially) update interviews entity.
73+
* Currently supports status, updatedAt and updatedBy fields.
74+
* Update Joi schema to allow more fields.
75+
* (implementation should already handle new fields - just updating Joi schema should be enough)
76+
*
77+
* payload format:
78+
* {
79+
* "jobCandidateId": {
80+
* "interviewId": { ...fields },
81+
* "interviewId2": { ...fields },
82+
* ...
83+
* },
84+
* "jobCandidateId2": { // like above... },
85+
* ...
86+
* }
87+
*
88+
* @param {Object} jobCandidates job candidates
89+
*/
90+
async function processBulkUpdateInterviews (jobCandidates) {
91+
// script to update & params
92+
const script = {
93+
source: `
94+
def completedInterviews = params.jobCandidates[ctx._id];
95+
for (interview in completedInterviews.entrySet()) {
96+
def interviewId = interview.getKey();
97+
def affectedFields = interview.getValue();
98+
def target = ctx._source.interviews.find(i -> i.id == interviewId);
99+
if (target != null) {
100+
for (field in affectedFields.entrySet()) {
101+
target[field.getKey()] = field.getValue();
102+
}
103+
}
104+
}
105+
`,
106+
params: { jobCandidates }
107+
}
108+
// update interviews
109+
await esClient.updateByQuery({
110+
index: config.get('esConfig.ES_INDEX_JOB_CANDIDATE'),
111+
body: {
112+
script,
113+
query: {
114+
ids: {
115+
values: _.keys(jobCandidates)
116+
}
117+
}
118+
},
119+
refresh: true
120+
})
121+
}
122+
123+
module.exports = {
124+
processRequestInterview,
125+
processUpdateInterview,
126+
processBulkUpdateInterviews
127+
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/**
2+
* Jobcandidate Processor
3+
*/
4+
5+
const config = require('config')
6+
const helper = require('../common/helper')
7+
8+
const esClient = helper.getESClient()
9+
10+
/**
11+
* Process create entity
12+
* @param {Object} entity entity object
13+
*/
14+
async function processCreate (entity) {
15+
await esClient.create({
16+
index: config.get('esConfig.ES_INDEX_JOB_CANDIDATE'),
17+
id: entity.id,
18+
body: entity,
19+
refresh: 'wait_for'
20+
})
21+
}
22+
23+
/**
24+
* Process update entity
25+
* @param {Object} entity entity object
26+
*/
27+
async function processUpdate (entity) {
28+
await esClient.update({
29+
index: config.get('esConfig.ES_INDEX_JOB_CANDIDATE'),
30+
id: entity.id,
31+
body: {
32+
doc: entity
33+
},
34+
refresh: 'wait_for'
35+
})
36+
}
37+
38+
/**
39+
* Process delete entity
40+
* @param {Object} entity entity object
41+
*/
42+
async function processDelete (entity) {
43+
await esClient.delete({
44+
index: config.get('esConfig.ES_INDEX_JOB_CANDIDATE'),
45+
id: entity.id,
46+
refresh: 'wait_for'
47+
})
48+
}
49+
50+
module.exports = {
51+
processCreate,
52+
processUpdate,
53+
processDelete
54+
}

src/esProcessors/JobProcessor.js

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/**
2+
* Job Processor
3+
*/
4+
5+
const helper = require('../common/helper')
6+
const config = require('config')
7+
8+
const esClient = helper.getESClient()
9+
10+
/**
11+
* Process create entity
12+
* @param {Object} entity entity object
13+
*/
14+
async function processCreate (entity) {
15+
await esClient.create({
16+
index: config.get('esConfig.ES_INDEX_JOB'),
17+
id: entity.id,
18+
body: entity,
19+
refresh: 'wait_for'
20+
})
21+
}
22+
23+
/**
24+
* Process update entity
25+
* @param {Object} entity entity object
26+
*/
27+
async function processUpdate (entity) {
28+
await esClient.update({
29+
index: config.get('esConfig.ES_INDEX_JOB'),
30+
id: entity.id,
31+
body: {
32+
doc: entity
33+
},
34+
refresh: 'wait_for'
35+
})
36+
}
37+
38+
/**
39+
* Process delete entity
40+
* @param {Object} entity entity object
41+
*/
42+
async function processDelete (entity) {
43+
await esClient.delete({
44+
index: config.get('esConfig.ES_INDEX_JOB'),
45+
id: entity.id,
46+
refresh: 'wait_for'
47+
})
48+
}
49+
50+
module.exports = {
51+
processCreate,
52+
processUpdate,
53+
processDelete
54+
}

src/esProcessors/RoleProcessor.js

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/**
2+
* Role Processor
3+
*/
4+
5+
const helper = require('../common/helper')
6+
const config = require('config')
7+
8+
const esClient = helper.getESClient()
9+
10+
/**
11+
* Process create entity
12+
* @param {Object} entity entity object
13+
*/
14+
async function processCreate (entity) {
15+
await esClient.create({
16+
index: config.get('esConfig.ES_INDEX_ROLE'),
17+
id: entity.id,
18+
body: entity,
19+
refresh: 'wait_for'
20+
})
21+
}
22+
23+
/**
24+
* Process update entity
25+
* @param {Object} entity entity object
26+
*/
27+
async function processUpdate (entity) {
28+
await esClient.update({
29+
index: config.get('esConfig.ES_INDEX_ROLE'),
30+
id: entity.id,
31+
body: {
32+
doc: entity
33+
},
34+
refresh: 'wait_for'
35+
})
36+
}
37+
38+
/**
39+
* Process delete entity
40+
* @param {Object} entity entity object
41+
*/
42+
async function processDelete (entity) {
43+
await esClient.delete({
44+
index: config.get('esConfig.ES_INDEX_ROLE'),
45+
id: entity.id,
46+
refresh: 'wait_for'
47+
})
48+
}
49+
50+
module.exports = {
51+
processCreate,
52+
processUpdate,
53+
processDelete
54+
}

0 commit comments

Comments
 (0)