Skip to content

Commit d33a2da

Browse files
oehrleinsaadmk11
andauthored
Add atomic/zero-downtime index rebuilding (django-es#358)
* Added atomic/zero-downtime index rebuilding * Update django_elasticsearch_dsl/management/commands/search_index.py Co-authored-by: Maksudul Haque <[email protected]> * Update django_elasticsearch_dsl/management/commands/search_index.py Co-authored-by: Maksudul Haque <[email protected]> * Update django_elasticsearch_dsl/management/commands/search_index.py Co-authored-by: Maksudul Haque <[email protected]> * Update django_elasticsearch_dsl/management/commands/search_index.py Co-authored-by: Maksudul Haque <[email protected]> * Update django_elasticsearch_dsl/management/commands/search_index.py Co-authored-by: Maksudul Haque <[email protected]> * Added index name slice to ensure index name length does not exceed 255 characters * Updated naming of flags * Updated arg help messages for '--use-alias' and '--use-alias-keep-index' * Updated 'indexes' to 'indices' for verbiage consistency * Simplified stdout messaging * Made adjustments for better switching between using aliases and not * Updated super() call for compatibility with Python 2 * Simplified logic for stdout messaging * Fixed test_custom_generate_id_is_called test * Added comment explaining need for aliases regardless of using the '--use-alias' arg * Moved alias indices deletion for 'rebuild' without aliases to _delete() * Added 'return False' in case of undeleted index or alias in _delete() * Added _matches() to DocType for alias pattern matching Co-authored-by: Maksudul Haque <[email protected]>
1 parent 278a0d0 commit d33a2da

File tree

2 files changed

+172
-14
lines changed

2 files changed

+172
-14
lines changed

django_elasticsearch_dsl/documents.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from __future__ import unicode_literals
22

33
from collections import deque
4+
from fnmatch import fnmatch
45
from functools import partial
56

67
from django import VERSION as DJANGO_VERSION
@@ -66,6 +67,18 @@ def __eq__(self, other):
6667
def __hash__(self):
6768
return id(self)
6869

70+
@classmethod
71+
def _matches(cls, hit):
72+
"""
73+
Determine which index or indices in a pattern to be used in a hit.
74+
Overrides DSLDocument _matches function to match indices in a pattern,
75+
which is needed in case of using aliases. This is needed as the
76+
document class will not be used to deserialize the documents. The
77+
documents will have the index set to the concrete index, whereas the
78+
class refers to the alias.
79+
"""
80+
return fnmatch(hit.get("_index", ""), cls._index._name + "*")
81+
6982
@classmethod
7083
def search(cls, using=None, index=None):
7184
return Search(

django_elasticsearch_dsl/management/commands/search_index.py

Lines changed: 159 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
from __future__ import unicode_literals, absolute_import
2+
from datetime import datetime
23

4+
from elasticsearch_dsl import connections
35
from django.conf import settings
46
from django.core.management.base import BaseCommand, CommandError
57
from six.moves import input
@@ -9,6 +11,10 @@
911
class Command(BaseCommand):
1012
help = 'Manage elasticsearch index.'
1113

14+
def __init__(self, *args, **kwargs):
15+
super(Command, self).__init__(*args, **kwargs)
16+
self.es_conn = connections.get_connection()
17+
1218
def add_arguments(self, parser):
1319
parser.add_argument(
1420
'--models',
@@ -63,6 +69,21 @@ def add_arguments(self, parser):
6369
dest='parallel',
6470
help='Run populate/rebuild update single threaded'
6571
)
72+
parser.add_argument(
73+
'--use-alias',
74+
action='store_true',
75+
dest='use_alias',
76+
help='Use alias with indices'
77+
)
78+
parser.add_argument(
79+
'--use-alias-keep-index',
80+
action='store_true',
81+
dest='use_alias_keep_index',
82+
help="""
83+
Do not delete replaced indices when used with '--rebuild' and
84+
'--use-alias' args
85+
"""
86+
)
6687
parser.set_defaults(parallel=getattr(settings, 'ELASTICSEARCH_DSL_PARALLEL', False))
6788
parser.add_argument(
6889
'--refresh',
@@ -107,10 +128,18 @@ def _get_models(self, args):
107128

108129
return set(models)
109130

110-
def _create(self, models, options):
131+
def _create(self, models, aliases, options):
111132
for index in registry.get_indices(models):
112-
self.stdout.write("Creating index '{}'".format(index._name))
113-
index.create()
133+
alias_exists = index._name in aliases
134+
if not alias_exists:
135+
self.stdout.write("Creating index '{}'".format(index._name))
136+
index.create()
137+
elif options['action'] == 'create':
138+
self.stdout.write(
139+
"'{}' already exists as an alias. Run '--delete' with"
140+
" '--use-alias' arg to delete indices pointed at the "
141+
"alias to make index name available.".format(index._name)
142+
)
114143

115144
def _populate(self, models, options):
116145
parallel = options['parallel']
@@ -123,29 +152,138 @@ def _populate(self, models, options):
123152
qs = doc().get_indexing_queryset()
124153
doc().update(qs, parallel=parallel, refresh=options['refresh'])
125154

126-
def _delete(self, models, options):
155+
def _get_alias_indices(self, alias):
156+
alias_indices = self.es_conn.indices.get_alias(name=alias)
157+
return list(alias_indices.keys())
158+
159+
def _delete_alias_indices(self, alias):
160+
alias_indices = self._get_alias_indices(alias)
161+
alias_delete_actions = [
162+
{"remove_index": {"index": index}} for index in alias_indices
163+
]
164+
self.es_conn.indices.update_aliases({"actions": alias_delete_actions})
165+
for index in alias_indices:
166+
self.stdout.write("Deleted index '{}'".format(index))
167+
168+
def _delete(self, models, aliases, options):
127169
index_names = [index._name for index in registry.get_indices(models)]
128170

129171
if not options['force']:
130172
response = input(
131173
"Are you sure you want to delete "
132-
"the '{}' indexes? [y/N]: ".format(", ".join(index_names)))
174+
"the '{}' indices? [y/N]: ".format(", ".join(index_names)))
133175
if response.lower() != 'y':
134176
self.stdout.write('Aborted')
135177
return False
136178

137-
for index in registry.get_indices(models):
138-
self.stdout.write("Deleting index '{}'".format(index._name))
139-
index.delete(ignore=404)
179+
if options['use_alias']:
180+
for index in index_names:
181+
alias_exists = index in aliases
182+
if alias_exists:
183+
self._delete_alias_indices(index)
184+
elif self.es_conn.indices.exists(index=index):
185+
self.stdout.write(
186+
"'{}' refers to an index, not an alias. Run "
187+
"'--delete' without '--use-alias' arg to delete "
188+
"index.".format(index)
189+
)
190+
return False
191+
else:
192+
for index in registry.get_indices(models):
193+
alias_exists = index._name in aliases
194+
if not alias_exists:
195+
self.stdout.write("Deleting index '{}'".format(index._name))
196+
index.delete(ignore=404)
197+
elif options['action'] == 'rebuild':
198+
self._delete_alias_indices(index._name)
199+
elif options['action'] == 'delete':
200+
self.stdout.write(
201+
"'{}' refers to an alias, not an index. Run "
202+
"'--delete' with '--use-alias' arg to delete indices "
203+
"pointed at the alias.".format(index._name)
204+
)
205+
return False
206+
140207
return True
141208

142-
def _rebuild(self, models, options):
143-
if not self._delete(models, options):
209+
def _update_alias(self, alias, new_index, alias_exists, options):
210+
alias_actions = [{"add": {"alias": alias, "index": new_index}}]
211+
212+
delete_existing_index = False
213+
if not alias_exists and self.es_conn.indices.exists(index=alias):
214+
# Elasticsearch will return an error if an index already
215+
# exists with the desired alias name. Therefore, we need to
216+
# delete that index.
217+
delete_existing_index = True
218+
alias_actions.append({"remove_index": {"index": alias}})
219+
220+
old_indices = []
221+
alias_delete_actions = []
222+
if alias_exists:
223+
# Elasticsearch will return an error if we search for
224+
# indices by alias but the alias doesn't exist. Therefore,
225+
# we want to be sure the alias exists.
226+
old_indices = self._get_alias_indices(alias)
227+
alias_actions.append(
228+
{"remove": {"alias": alias, "indices": old_indices}}
229+
)
230+
alias_delete_actions = [
231+
{"remove_index": {"index": index}} for index in old_indices
232+
]
233+
234+
self.es_conn.indices.update_aliases({"actions": alias_actions})
235+
if delete_existing_index:
236+
self.stdout.write("Deleted index '{}'".format(alias))
237+
238+
self.stdout.write(
239+
"Added alias '{}' to index '{}'".format(alias, new_index)
240+
)
241+
242+
if old_indices:
243+
for index in old_indices:
244+
self.stdout.write(
245+
"Removed alias '{}' from index '{}'".format(alias, index)
246+
)
247+
248+
if alias_delete_actions and not options['use_alias_keep_index']:
249+
self.es_conn.indices.update_aliases(
250+
{"actions": alias_delete_actions}
251+
)
252+
for index in old_indices:
253+
self.stdout.write("Deleted index '{}'".format(index))
254+
255+
def _rebuild(self, models, aliases, options):
256+
if (not options['use_alias']
257+
and not self._delete(models, aliases, options)):
144258
return
145259

146-
self._create(models, options)
260+
if options['use_alias']:
261+
alias_index_pairs = []
262+
index_suffix = "-" + datetime.now().strftime("%Y%m%d%H%M%S%f")
263+
for index in registry.get_indices(models):
264+
# The alias takes the original index name value. The
265+
# index name sent to Elasticsearch will be the alias
266+
# plus the suffix from above. In addition, the index
267+
# name needs to be limited to 255 characters, of which
268+
# 21 will always be taken by the suffix, leaving 234
269+
# characters from the original index name value.
270+
new_index = index._name[:234] + index_suffix
271+
alias_index_pairs.append(
272+
{'alias': index._name, 'index': new_index}
273+
)
274+
index._name = new_index
275+
276+
self._create(models, aliases, options)
147277
self._populate(models, options)
148278

279+
if options['use_alias']:
280+
for alias_index_pair in alias_index_pairs:
281+
alias = alias_index_pair['alias']
282+
alias_exists = alias in aliases
283+
self._update_alias(
284+
alias, alias_index_pair['index'], alias_exists, options
285+
)
286+
149287
def handle(self, *args, **options):
150288
if not options['action']:
151289
raise CommandError(
@@ -156,14 +294,21 @@ def handle(self, *args, **options):
156294
action = options['action']
157295
models = self._get_models(options['models'])
158296

297+
# We need to know if and which aliases exist to mitigate naming
298+
# conflicts with indices, therefore this is needed regardless
299+
# of using the '--use-alias' arg.
300+
aliases = []
301+
for index in self.es_conn.indices.get_alias().values():
302+
aliases += index['aliases'].keys()
303+
159304
if action == 'create':
160-
self._create(models, options)
305+
self._create(models, aliases, options)
161306
elif action == 'populate':
162307
self._populate(models, options)
163308
elif action == 'delete':
164-
self._delete(models, options)
309+
self._delete(models, aliases, options)
165310
elif action == 'rebuild':
166-
self._rebuild(models, options)
311+
self._rebuild(models, aliases, options)
167312
else:
168313
raise CommandError(
169314
"Invalid action. Must be one of"

0 commit comments

Comments
 (0)