@@ -7,77 +7,11 @@ const config = require('config')
7
7
const elasticsearch = require ( 'elasticsearch' )
8
8
const _ = require ( 'lodash' )
9
9
const Joi = require ( '@hapi/joi' )
10
- const Mutex = require ( 'async-mutex' ) . Mutex
11
10
12
11
AWS . config . region = config . ES . AWS_REGION
13
12
14
13
// Elasticsearch client
15
- let esClient = {
16
- mutex : new Mutex ( ) ,
17
- client : null ,
18
-
19
- ctor ( client ) {
20
- this . client = client
21
- this . indices = {
22
- exists : client . indices . exists . bind ( client . indices ) ,
23
- putMapping : client . indices . putMapping . bind ( client . indices ) ,
24
- create : client . indices . create . bind ( client . indices ) ,
25
- delete : client . indices . delete . bind ( client . indices )
26
- }
27
- } ,
28
-
29
- async create ( ) {
30
- const release = await this . mutex . acquire ( )
31
-
32
- try {
33
- return this . client . create ( ...arguments )
34
- } finally {
35
- release ( )
36
- }
37
- } ,
38
-
39
- async update ( ) {
40
- const release = await this . mutex . acquire ( )
41
-
42
- try {
43
- return this . client . update ( ...arguments )
44
- } finally {
45
- release ( )
46
- }
47
- } ,
48
-
49
- async delete ( ) {
50
- const release = await this . mutex . acquire ( )
51
-
52
- try {
53
- return this . client . delete ( ...arguments )
54
- } finally {
55
- release ( )
56
- }
57
- } ,
58
-
59
- async getSource ( ) {
60
- const release = await this . mutex . acquire ( )
61
-
62
- try {
63
- return this . client . getSource ( ...arguments )
64
- } finally {
65
- release ( )
66
- }
67
- } ,
68
-
69
- async get ( ) {
70
- const release = await this . mutex . acquire ( )
71
-
72
- try {
73
- return this . client . get ( ...arguments )
74
- } finally {
75
- release ( )
76
- }
77
- } ,
78
-
79
- indices : undefined
80
- }
14
+ let esClient
81
15
82
16
/**
83
17
* Get Kafka options
@@ -96,7 +30,7 @@ function getKafkaOptions () {
96
30
* @return {Object } Elasticsearch Client Instance
97
31
*/
98
32
async function getESClient ( ) {
99
- if ( esClient . client ) {
33
+ if ( esClient ) {
100
34
return esClient
101
35
}
102
36
const host = config . ES . HOST
@@ -105,17 +39,17 @@ async function getESClient () {
105
39
// AWS ES configuration is different from other providers
106
40
if ( / .* a m a z o n a w s .* / . test ( host ) ) {
107
41
try {
108
- esClient . ctor ( new elasticsearch . Client ( {
42
+ esClient = new elasticsearch . Client ( {
109
43
apiVersion,
110
44
host,
111
45
connectionClass : require ( 'http-aws-es' ) // eslint-disable-line global-require
112
- } ) )
46
+ } )
113
47
} catch ( error ) { console . log ( error ) }
114
48
} else {
115
- esClient . ctor ( new elasticsearch . Client ( {
49
+ esClient = new elasticsearch . Client ( {
116
50
apiVersion,
117
51
host
118
- } ) )
52
+ } )
119
53
}
120
54
return esClient
121
55
}
0 commit comments