Skip to content

Commit 9a07177

Browse files
committed
[Fix readthedocs#4333] Implement asynchronous search reindex functionality using celery
1 parent d6638b9 commit 9a07177

File tree

9 files changed

+255
-78
lines changed

9 files changed

+255
-78
lines changed

readthedocs/core/management/commands/reindex_elasticsearch.py

Lines changed: 0 additions & 56 deletions
This file was deleted.

readthedocs/search/documents.py

Lines changed: 4 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@
33
from elasticsearch_dsl.query import SimpleQueryString, Bool
44

55
from readthedocs.projects.models import Project, HTMLFile
6-
from .conf import SEARCH_EXCLUDED_FILE
7-
86
from readthedocs.search.faceted_search import ProjectSearch, FileSearch
7+
from .conf import SEARCH_EXCLUDED_FILE
8+
from .mixins import RTDDocTypeMixin
99

1010
project_conf = settings.ES_INDEXES['project']
1111
project_index = Index(project_conf['name'])
@@ -17,7 +17,7 @@
1717

1818

1919
@project_index.doc_type
20-
class ProjectDocument(DocType):
20+
class ProjectDocument(RTDDocTypeMixin, DocType):
2121

2222
class Meta(object):
2323
model = Project
@@ -47,7 +47,7 @@ def faceted_search(cls, query, language=None, using=None, index=None):
4747

4848

4949
@page_index.doc_type
50-
class PageDocument(DocType):
50+
class PageDocument(RTDDocTypeMixin, DocType):
5151

5252
class Meta(object):
5353
model = HTMLFile
@@ -121,21 +121,3 @@ def get_queryset(self):
121121
queryset = (queryset.filter(project__documentation_type='sphinx')
122122
.exclude(name__in=SEARCH_EXCLUDED_FILE))
123123
return queryset
124-
125-
def update(self, thing, refresh=None, action='index', **kwargs):
126-
"""Overwrite in order to index only certain files"""
127-
# Object not exist in the provided queryset should not be indexed
128-
# TODO: remove this overwrite when the issue has been fixed
129-
# See below link for more information
130-
# https://github.com/sabricot/django-elasticsearch-dsl/issues/111
131-
# Moreover, do not need to check if its a delete action
132-
# Because during a delete action, the object has already been removed from the database
133-
if isinstance(thing, HTMLFile) and action != 'delete':
134-
# Its a model instance.
135-
queryset = self.get_queryset()
136-
obj = queryset.filter(pk=thing.pk)
137-
if not obj.exists():
138-
return None
139-
140-
return super(PageDocument, self).update(thing=thing, refresh=refresh,
141-
action=action, **kwargs)

readthedocs/search/management/__init__.py

Whitespace-only changes.

readthedocs/search/management/commands/__init__.py

Whitespace-only changes.
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
import datetime
2+
import logging
3+
4+
from celery import chord, chain
5+
from django.apps import apps
6+
from django.conf import settings
7+
from django.core.management import BaseCommand
8+
from django_elasticsearch_dsl.registries import registry
9+
10+
from ...tasks import (index_objects_to_es_task, switch_es_index_task, create_new_es_index_task,
11+
index_missing_objects_task)
12+
from ...utils import chunks
13+
14+
log = logging.getLogger(__name__)
15+
16+
17+
class Command(BaseCommand):

    """Rebuild elasticsearch indexes asynchronously through celery tasks."""

    @staticmethod
    def _get_indexing_tasks(app_label, model_name, instance_ids, document_class, index_name):
        """Yield one indexing task signature per chunk of ``instance_ids``."""
        for id_chunk in chunks(instance_ids, settings.ES_TASK_CHUNK_SIZE):
            yield index_objects_to_es_task.si(
                app_label=app_label,
                model_name=model_name,
                document_class=document_class,
                index_name=index_name,
                objects_id=id_chunk,
            )

    def _run_reindex_tasks(self, models):
        """Build and dispatch the reindex workflow for every registered document."""
        for doc in registry.get_documents(models):
            queryset = doc().get_queryset()
            instance_ids = list(queryset.values_list('id', flat=True))

            app_label = queryset.model._meta.app_label
            model_name = queryset.model.__name__

            old_index_name = doc._doc_type.index
            timestamp = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
            new_index_name = 'temp-{}-{}'.format(timestamp, old_index_name)

            pre_index_task = create_new_es_index_task.si(
                app_label=app_label,
                model_name=model_name,
                old_index_name=old_index_name,
                new_index_name=new_index_name,
            )

            indexing_tasks = self._get_indexing_tasks(
                app_label=app_label,
                model_name=model_name,
                instance_ids=instance_ids,
                document_class=str(doc),
                index_name=new_index_name,
            )

            post_index_task = switch_es_index_task.si(
                app_label=app_label,
                model_name=model_name,
                old_index_name=old_index_name,
                new_index_name=new_index_name,
            )

            # Picks up the objects that were inserted into the database
            # while the indexing tasks were running.
            missed_index_task = index_missing_objects_task.si(
                app_label=app_label,
                model_name=model_name,
                document_class=str(doc),
                indexed_instance_ids=instance_ids,
            )

            # http://celery.readthedocs.io/en/latest/userguide/canvas.html#chords
            # http://celery.readthedocs.io/en/latest/userguide/canvas.html#chain
            chord_tasks = chord(header=indexing_tasks, body=post_index_task)
            chain(pre_index_task, chord_tasks, missed_index_task).apply_async()

            message = "Successfully issued tasks for {}.{}".format(app_label, model_name)
            log.info(message)

    @staticmethod
    def _get_models(args):
        """Yield model classes for ``<app_label>.<model_name>`` strings."""
        for model_name in args:
            yield apps.get_model(model_name)

    def add_arguments(self, parser):
        parser.add_argument(
            '--models',
            dest='models',
            type=str,
            nargs='*',
            help=("Specify the model to be updated in elasticsearch."
                  "The format is <app_label>.<model_name>")
        )

    def handle(self, *args, **options):
        # ``None`` means "all registered models" for the registry lookup.
        models = self._get_models(options['models']) if options['models'] else None
        self._run_reindex_tasks(models=models)

readthedocs/search/mixins.py

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
from django.db import models
2+
from django.core.paginator import Paginator
3+
4+
5+
class RTDDocTypeMixin(object):

    """
    Customizations on top of django-elasticsearch-dsl's ``DocType``.

    Differences from the upstream implementation:

    - ``update`` skips model instances that are not part of ``get_queryset()``
    - ``update`` takes an additional ``index_name`` argument so documents can
      be written to a specific (e.g. temporary) index

    Related issue:
    - https://github.com/sabricot/django-elasticsearch-dsl/issues/111
    """

    def _prepare_action(self, object_instance, action, index_name=None):
        # A delete action needs no document body.
        source = None if action == 'delete' else self.prepare(object_instance)
        return {
            '_op_type': action,
            '_index': index_name or str(self._doc_type.index),
            '_type': self._doc_type.mapping.doc_type,
            '_id': object_instance.pk,
            '_source': source,
        }

    def _get_actions(self, object_list, action, index_name=None):
        page_size = self._doc_type.queryset_pagination
        if page_size is None:
            for instance in object_list:
                yield self._prepare_action(instance, action, index_name)
        else:
            # Paginate large querysets to keep memory usage bounded.
            paginator = Paginator(object_list, page_size)
            for page_number in paginator.page_range:
                for instance in paginator.page(page_number).object_list:
                    yield self._prepare_action(instance, action, index_name)

    def update(self, thing, refresh=None, action='index', index_name=None, **kwargs):
        """
        Update each document in ES for a model, iterable of models or queryset
        """
        if refresh is True or (
            refresh is None and self._doc_type.auto_refresh
        ):
            kwargs['refresh'] = True

        if isinstance(thing, models.Model) and action != 'delete':
            # A single model instance: do not index it if it falls outside
            # the queryset this document is built from.
            # TODO: remove this overwrite when the issue has been fixed
            # https://github.com/sabricot/django-elasticsearch-dsl/issues/111
            # No such check is needed for delete actions, because by then the
            # object has already been removed from the database.
            if not self.get_queryset().filter(pk=thing.pk).exists():
                return None
            object_list = [thing]
        else:
            object_list = thing

        return self.bulk(
            self._get_actions(object_list, action, index_name=index_name),
            **kwargs
        )

readthedocs/search/tasks.py

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
import logging
2+
3+
from django.apps import apps
4+
from django_elasticsearch_dsl.registries import registry
5+
6+
from readthedocs.worker import app
7+
8+
log = logging.getLogger(__name__)
9+
10+
11+
def _get_index(indices, index_name):
12+
for index in indices:
13+
if str(index) == index_name:
14+
return index
15+
16+
17+
def _get_document(model, document_class):
    """Return the registered document for ``model`` matching ``document_class``.

    ``document_class`` is the string form of the document class; returns
    ``None`` when no registered document matches.
    """
    documents = registry.get_documents(models=[model])
    matches = (doc for doc in documents if str(doc) == document_class)
    return next(matches, None)
24+
25+
@app.task(queue='web')
def create_new_es_index_task(app_label, model_name, old_index_name, new_index_name):
    """Create ``new_index_name`` in ES, cloned from ``old_index_name``'s settings."""
    model = apps.get_model(app_label, model_name)
    old_index = _get_index(
        indices=registry.get_indices(models=[model]),
        index_name=old_index_name,
    )
    old_index.clone(name=new_index_name).create()
32+
33+
34+
@app.task(queue='web')
def switch_es_index_task(app_label, model_name, old_index_name, new_index_name):
    """
    Make ``old_index_name`` an alias for the freshly built ``new_index_name``.

    Deletes the previous physical index (if one exists) and then adds
    ``old_index_name`` as an alias of ``new_index_name``, so searches keep
    using the same name while the data is served from the new index.
    """
    model = apps.get_model(app_label, model_name)
    indices = registry.get_indices(models=[model])
    old_index = _get_index(indices=indices, index_name=old_index_name)

    new_index = old_index.clone(name=new_index_name)

    if old_index.exists():
        # Alias can not be used to delete an index, so resolve the actual
        # (physical) index name first.
        # https://www.elastic.co/guide/en/elasticsearch/reference/6.0/indices-delete-index.html
        old_index_info = old_index.get()
        # ``get()`` returns a dict keyed by the actual index name.
        # ``dict.keys()`` is a view on Python 3, so it must be materialized
        # before indexing (``keys()[0]`` raises TypeError on Python 3).
        # Use a separate variable so ``old_index_name`` is not shadowed.
        actual_index_name = list(old_index_info.keys())[0]
        old_index.connection.indices.delete(index=actual_index_name)

    # Alias the original name to the new index. Using the resolved physical
    # name here would be wrong whenever the old index was itself reached
    # through an alias.
    new_index.put_alias(name=old_index_name)
52+
53+
54+
@app.task(queue='web')
def index_objects_to_es_task(app_label, model_name, document_class, index_name, objects_id):
    """Index the objects with ids ``objects_id`` into ``index_name``."""
    model = apps.get_model(app_label, model_name)
    document = _get_document(model=model, document_class=document_class)

    log.info("Indexing model: {}, id:'{}'".format(model.__name__, objects_id))
    # Query by id on the plain model manager; the ids were collected from the
    # document queryset when the reindex command was kicked off.
    queryset = model.objects.all().filter(id__in=objects_id).iterator()
    document().update(queryset, index_name=index_name)
63+
64+
65+
@app.task(queue='web')
def index_missing_objects_task(app_label, model_name, document_class, indexed_instance_ids):
    """
    Task to ensure that no object is missed from indexing.

    The object ids are sent to the indexing tasks up front, but in the
    meantime new objects can be created/deleted in the database and those
    will not be covered by the tasks.
    This task indexes all the objects of the document's queryset excluding
    the ones which have already been handed to the indexing tasks.
    """
    model = apps.get_model(app_label, model_name)
    document = _get_document(model=model, document_class=document_class)
    queryset = document().get_queryset().exclude(id__in=indexed_instance_ids)
    document().update(queryset.iterator())

    # Stray trailing quote removed from the original message; lazy %-style
    # args defer string formatting until the record is actually emitted.
    log.info("Indexed %s missing objects from model: %s", queryset.count(), model.__name__)

    # TODO: Figure out how to remove the objects from ES index that has been deleted

readthedocs/search/utils.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -321,3 +321,11 @@ def get_project_list_or_404(project_slug, user):
321321

322322
project_list = list(subprojects) + [project]
323323
return project_list
324+
325+
326+
def chunks(elements, chunk_size):
    """Yield successive ``chunk_size``-sized slices of ``elements``."""
    # Taken from https://stackoverflow.com/a/312464
    # licensed under cc by-sa 3.0
    for start in range(0, len(elements), chunk_size):
        end = start + chunk_size
        yield elements[start:end]

readthedocs/settings/base.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -320,6 +320,8 @@ def USE_PROMOS(self): # noqa
320320
'hosts': '127.0.0.1:9200'
321321
},
322322
}
323+
# Chunk size for elasticsearch reindex celery tasks
324+
ES_TASK_CHUNK_SIZE = 100
323325

324326
# ANALYZER = 'analysis': {
325327
# 'analyzer': {

0 commit comments

Comments
 (0)