From 8fc3b65fd090d91ed5422af26afb056709c1306f Mon Sep 17 00:00:00 2001 From: Safwan Rahman Date: Sat, 14 Jul 2018 05:10:11 +0600 Subject: [PATCH 01/15] [Fix #4333] Implement asynchronous search reindex functionality using celery --- .../commands/reindex_elasticsearch.py | 58 ------------ readthedocs/search/documents.py | 26 +----- readthedocs/search/management/__init__.py | 0 .../search/management/commands/__init__.py | 0 .../commands/reindex_elasticsearch.py | 93 +++++++++++++++++++ readthedocs/search/mixins.py | 65 +++++++++++++ readthedocs/search/tasks.py | 83 +++++++++++++++++ readthedocs/search/utils.py | 8 ++ readthedocs/settings/base.py | 2 + 9 files changed, 255 insertions(+), 80 deletions(-) delete mode 100644 readthedocs/core/management/commands/reindex_elasticsearch.py create mode 100644 readthedocs/search/management/__init__.py create mode 100644 readthedocs/search/management/commands/__init__.py create mode 100644 readthedocs/search/management/commands/reindex_elasticsearch.py create mode 100644 readthedocs/search/mixins.py create mode 100644 readthedocs/search/tasks.py diff --git a/readthedocs/core/management/commands/reindex_elasticsearch.py b/readthedocs/core/management/commands/reindex_elasticsearch.py deleted file mode 100644 index 7a5f25a065a..00000000000 --- a/readthedocs/core/management/commands/reindex_elasticsearch.py +++ /dev/null @@ -1,58 +0,0 @@ -"""Reindex Elastic Search indexes""" - -from __future__ import absolute_import -import logging -from optparse import make_option - -from django.core.management.base import BaseCommand -from django.core.management.base import CommandError -from django.conf import settings - -from readthedocs.builds.constants import LATEST -from readthedocs.builds.models import Version -from readthedocs.projects.tasks import update_search - -log = logging.getLogger(__name__) - - -class Command(BaseCommand): - - help = __doc__ - - def add_arguments(self, parser): - parser.add_argument( - '-p', - dest='project', - default='', - help='Project to index' - ) - - def handle(self, *args, **options): - """Build/index all versions or a single project's version""" - project = options['project'] - - queryset = Version.objects.all() - - if project: - queryset = queryset.filter(project__slug=project) - if not queryset.exists(): - raise CommandError( - 'No project with slug: {slug}'.format(slug=project)) - log.info("Building all versions for %s", project) - elif getattr(settings, 'INDEX_ONLY_LATEST', True): - queryset = queryset.filter(slug=LATEST) - - for version in queryset: - log.info("Reindexing %s", version) - try: - commit = version.project.vcs_repo(version.slug).commit - except: # noqa - # An exception can be thrown here in production, but it's not - # documented what the exception here is - commit = None - - try: - update_search(version.pk, commit, - delete_non_commit_files=False) - except Exception as e: - log.exception('Reindex failed for %s, %s', version, e) diff --git a/readthedocs/search/documents.py b/readthedocs/search/documents.py index 3cc70343c35..bab26ef506d 100644 --- a/readthedocs/search/documents.py +++ b/readthedocs/search/documents.py @@ -3,9 +3,9 @@ from elasticsearch_dsl.query import SimpleQueryString, Bool from readthedocs.projects.models import Project, HTMLFile -from .conf import SEARCH_EXCLUDED_FILE - from readthedocs.search.faceted_search import ProjectSearch, FileSearch +from .conf import SEARCH_EXCLUDED_FILE +from .mixins import RTDDocTypeMixin project_conf = settings.ES_INDEXES['project'] project_index = Index(project_conf['name']) @@ -17,7 +17,7 @@ @project_index.doc_type -class ProjectDocument(DocType): +class ProjectDocument(RTDDocTypeMixin, DocType): class Meta(object): model = Project @@ -47,7 +47,7 @@ def faceted_search(cls, query, language=None, using=None, index=None): @page_index.doc_type -class PageDocument(DocType): +class PageDocument(RTDDocTypeMixin, DocType): class Meta(object): model = HTMLFile @@ -121,21 +121,3 @@ def get_queryset(self): queryset = (queryset.filter(project__documentation_type='sphinx') .exclude(name__in=SEARCH_EXCLUDED_FILE)) return queryset - - def update(self, thing, refresh=None, action='index', **kwargs): - """Overwrite in order to index only certain files""" - # Object not exist in the provided queryset should not be indexed - # TODO: remove this overwrite when the issue has been fixed - # See below link for more information - # https://github.com/sabricot/django-elasticsearch-dsl/issues/111 - # Moreover, do not need to check if its a delete action - # Because while delete action, the object is already remove from database - if isinstance(thing, HTMLFile) and action != 'delete': - # Its a model instance. - queryset = self.get_queryset() - obj = queryset.filter(pk=thing.pk) - if not obj.exists(): - return None - - return super(PageDocument, self).update(thing=thing, refresh=refresh, - action=action, **kwargs) diff --git a/readthedocs/search/management/__init__.py b/readthedocs/search/management/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/readthedocs/search/management/commands/__init__.py b/readthedocs/search/management/commands/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/readthedocs/search/management/commands/reindex_elasticsearch.py b/readthedocs/search/management/commands/reindex_elasticsearch.py new file mode 100644 index 00000000000..ed0d8dc5939 --- /dev/null +++ b/readthedocs/search/management/commands/reindex_elasticsearch.py @@ -0,0 +1,93 @@ +import datetime +import logging + +from celery import chord, chain +from django.apps import apps +from django.conf import settings +from django.core.management import BaseCommand +from django_elasticsearch_dsl.registries import registry + +from ...tasks import (index_objects_to_es_task, switch_es_index_task, create_new_es_index_task, + index_missing_objects_task) +from ...utils import chunks + +log = logging.getLogger(__name__) + + +class Command(BaseCommand): + + @staticmethod + def _get_indexing_tasks(app_label, model_name, instance_ids, document_class, index_name): + chunk_objects = chunks(instance_ids, settings.ES_TASK_CHUNK_SIZE) + + for chunk in chunk_objects: + data = { + 'app_label': app_label, + 'model_name': model_name, + 'document_class': document_class, + 'index_name': index_name, + 'objects_id': chunk + } + yield index_objects_to_es_task.si(**data) + + def _run_reindex_tasks(self, models): + for doc in registry.get_documents(models): + qs = doc().get_queryset() + instance_ids = list(qs.values_list('id', flat=True)) + + app_label = qs.model._meta.app_label + model_name = qs.model.__name__ + + old_index_name = doc._doc_type.index + timestamp_prefix = 'temp-{}-'.format(datetime.datetime.now().strftime('%Y%m%d%H%M%S')) + new_index_name = timestamp_prefix + old_index_name + + pre_index_task = create_new_es_index_task.si(app_label=app_label, + model_name=model_name, + old_index_name=old_index_name, + new_index_name=new_index_name) + + indexing_tasks = self._get_indexing_tasks(app_label=app_label, model_name=model_name, + instance_ids=instance_ids, document_class=str(doc), + index_name=new_index_name) + + post_index_task = switch_es_index_task.si(app_label=app_label, model_name=model_name, + old_index_name=old_index_name, + new_index_name=new_index_name) + + # Task to run in order to add the objects + # that has been inserted into database while indexing_tasks was running + missed_index_task = index_missing_objects_task.si(app_label=app_label, + model_name=model_name, + document_class=str(doc), + indexed_instance_ids=instance_ids) + + # http://celery.readthedocs.io/en/latest/userguide/canvas.html#chords + chord_tasks = chord(header=indexing_tasks, body=post_index_task) + # http://celery.readthedocs.io/en/latest/userguide/canvas.html#chain + chain(pre_index_task, chord_tasks, missed_index_task).apply_async() + + message = "Successfully issued tasks for {}.{}".format(app_label, model_name) + log.info(message) + + @staticmethod + def _get_models(args): + for model_name in args: + yield apps.get_model(model_name) + + def add_arguments(self, parser): + parser.add_argument( + '--models', + dest='models', + type=str, + nargs='*', + help=("Specify the model to be updated in elasticsearch." + "The format is .") + ) + + def handle(self, *args, **options): + models = None + if options['models']: + models = self._get_models(options['models']) + + self._run_reindex_tasks(models=models) diff --git a/readthedocs/search/mixins.py b/readthedocs/search/mixins.py new file mode 100644 index 00000000000..ccaf28fc5df --- /dev/null +++ b/readthedocs/search/mixins.py @@ -0,0 +1,65 @@ +from django.db import models +from django.core.paginator import Paginator + + +class RTDDocTypeMixin(object): + """Override some methods of DocType of DED + + Changelog as following: + - Do not index object that not exist in the provided queryset + - Take additional argument in update method `index_name` to update specific index + + Issues: + - https://github.com/sabricot/django-elasticsearch-dsl/issues/111 + """ + + def _prepare_action(self, object_instance, action, index_name=None): + return { + '_op_type': action, + '_index': index_name or str(self._doc_type.index), + '_type': self._doc_type.mapping.doc_type, + '_id': object_instance.pk, + '_source': ( + self.prepare(object_instance) if action != 'delete' else None + ), + } + + def _get_actions(self, object_list, action, index_name=None): + if self._doc_type.queryset_pagination is not None: + paginator = Paginator( + object_list, self._doc_type.queryset_pagination + ) + for page in paginator.page_range: + for object_instance in paginator.page(page).object_list: + yield self._prepare_action(object_instance, action, index_name) + else: + for object_instance in object_list: + yield self._prepare_action(object_instance, action, index_name) + + def update(self, thing, refresh=None, action='index', index_name=None, **kwargs): + """ + Update each document in ES for a model, iterable of models or queryset + """ + if refresh is True or ( + refresh is None and self._doc_type.auto_refresh + ): + kwargs['refresh'] = True + + # TODO: remove this overwrite when the issue has been fixed + # https://github.com/sabricot/django-elasticsearch-dsl/issues/111 + # Moreover, do not need to check if its a delete action + # Because while delete action, the object is already remove from database + if isinstance(thing, models.Model) and action != 'delete': + # Its a model instance. + queryset = self.get_queryset() + obj = queryset.filter(pk=thing.pk) + if not obj.exists(): + return None + + object_list = [thing] + else: + object_list = thing + + return self.bulk( + self._get_actions(object_list, action, index_name=index_name), **kwargs + ) \ No newline at end of file diff --git a/readthedocs/search/tasks.py b/readthedocs/search/tasks.py new file mode 100644 index 00000000000..9b4df098954 --- /dev/null +++ b/readthedocs/search/tasks.py @@ -0,0 +1,83 @@ +import logging + +from django.apps import apps +from django_elasticsearch_dsl.registries import registry + +from readthedocs.worker import app + +log = logging.getLogger(__name__) + + +def _get_index(indices, index_name): + for index in indices: + if str(index) == index_name: + return index + + +def _get_document(model, document_class): + documents = registry.get_documents(models=[model]) + + for document in documents: + if str(document) == document_class: + return document + + +@app.task(queue='web') +def create_new_es_index_task(app_label, model_name, old_index_name, new_index_name): + model = apps.get_model(app_label, model_name) + indices = registry.get_indices(models=[model]) + old_index = _get_index(indices=indices, index_name=old_index_name) + new_index = old_index.clone(name=new_index_name) + new_index.create() + + +@app.task(queue='web') +def switch_es_index_task(app_label, model_name, old_index_name, new_index_name): + model = apps.get_model(app_label, model_name) + indices = registry.get_indices(models=[model]) + old_index = _get_index(indices=indices, index_name=old_index_name) + + new_index = old_index.clone(name=new_index_name) + + if old_index.exists(): + # Alias can not be used to delete an index. + # https://www.elastic.co/guide/en/elasticsearch/reference/6.0/indices-delete-index.html + # So get the index actual name to delete it + old_index_info = old_index.get() + # The info is a dictionary and the key is the actual name of the index + old_index_name = old_index_info.keys()[0] + old_index.connection.indices.delete(index=old_index_name) + + new_index.put_alias(name=old_index_name) + + +@app.task(queue='web') +def index_objects_to_es_task(app_label, model_name, document_class, index_name, objects_id): + model = apps.get_model(app_label, model_name) + document = _get_document(model=model, document_class=document_class) + + # Use queryset from model as the ids are specific + queryset = model.objects.all().filter(id__in=objects_id).iterator() + log.info("Indexing model: {}, id:'{}'".format(model.__name__, objects_id)) + document().update(queryset, index_name=index_name) + + +@app.task(queue='web') +def index_missing_objects_task(app_label, model_name, document_class, indexed_instance_ids): + """ + Task to insure that none of the object is missed from indexing. + + The object ids are sent to task for indexing. + But in the meantime, new objects can be created/deleted in database + and they will not be in the tasks. + This task will index all the objects excluding the ones which have got indexed already + """ + + model = apps.get_model(app_label, model_name) + document = _get_document(model=model, document_class=document_class) + queryset = document().get_queryset().exclude(id__in=indexed_instance_ids) + document().update(queryset.iterator()) + + log.info("Indexed {} missing objects from model: {}'".format(queryset.count(), model.__name__)) + + # TODO: Figure out how to remove the objects from ES index that has been deleted diff --git a/readthedocs/search/utils.py b/readthedocs/search/utils.py index 659effb1544..aed7f1e981f 100644 --- a/readthedocs/search/utils.py +++ b/readthedocs/search/utils.py @@ -321,3 +321,11 @@ def get_project_list_or_404(project_slug, user): project_list = list(subprojects) + [project] return project_list + + +def chunks(elements, chunk_size): + """Yield successive n-sized chunks from l.""" + # Taken from https://stackoverflow.com/a/312464 + # licensed under cc by-sa 3.0 + for i in range(0, len(elements), chunk_size): + yield elements[i:i + chunk_size] diff --git a/readthedocs/settings/base.py b/readthedocs/settings/base.py index 7e8ef5ff654..44da0bf0e56 100644 --- a/readthedocs/settings/base.py +++ b/readthedocs/settings/base.py @@ -324,6 +324,8 @@ def USE_PROMOS(self): # noqa 'hosts': '127.0.0.1:9200' }, } + # Chunk size for elasticsearch reindex celery tasks + ES_TASK_CHUNK_SIZE = 100 # ANALYZER = 'analysis': { # 'analyzer': { From fb16187549c49701eeb5a8f1ef928a151f6db49c Mon Sep 17 00:00:00 2001 From: Safwan Rahman Date: Sat, 14 Jul 2018 05:27:07 +0600 Subject: [PATCH 02/15] fixing lint --- .../management/commands/reindex_elasticsearch.py | 3 ++- readthedocs/search/mixins.py | 11 ++++------- readthedocs/search/tasks.py | 1 - 3 files changed, 6 insertions(+), 9 deletions(-) diff --git a/readthedocs/search/management/commands/reindex_elasticsearch.py b/readthedocs/search/management/commands/reindex_elasticsearch.py index ed0d8dc5939..9e40ccb63c6 100644 --- a/readthedocs/search/management/commands/reindex_elasticsearch.py +++ b/readthedocs/search/management/commands/reindex_elasticsearch.py @@ -48,7 +48,8 @@ def _run_reindex_tasks(self, models): new_index_name=new_index_name) indexing_tasks = self._get_indexing_tasks(app_label=app_label, model_name=model_name, - instance_ids=instance_ids, document_class=str(doc), + instance_ids=instance_ids, + document_class=str(doc), index_name=new_index_name) post_index_task = switch_es_index_task.si(app_label=app_label, model_name=model_name, diff --git a/readthedocs/search/mixins.py b/readthedocs/search/mixins.py index ccaf28fc5df..603920016f5 100644 --- a/readthedocs/search/mixins.py +++ b/readthedocs/search/mixins.py @@ -3,12 +3,11 @@ class RTDDocTypeMixin(object): - """Override some methods of DocType of DED - + """ + Override some methods of DocType of DED Changelog as following: - Do not index object that not exist in the provided queryset - Take additional argument in update method `index_name` to update specific index - Issues: - https://github.com/sabricot/django-elasticsearch-dsl/issues/111 """ @@ -37,9 +36,7 @@ def _get_actions(self, object_list, action, index_name=None): yield self._prepare_action(object_instance, action, index_name) def update(self, thing, refresh=None, action='index', index_name=None, **kwargs): - """ - Update each document in ES for a model, iterable of models or queryset - """ + """Update each document in ES for a model, iterable of models or queryset""" if refresh is True or ( refresh is None and self._doc_type.auto_refresh ): @@ -62,4 +59,4 @@ def update(self, thing, refresh=None, action='index', index_name=None, **kwargs) return self.bulk( self._get_actions(object_list, action, index_name=index_name), **kwargs - ) \ No newline at end of file + ) diff --git a/readthedocs/search/tasks.py b/readthedocs/search/tasks.py index 9b4df098954..39de5e6865f 100644 --- a/readthedocs/search/tasks.py +++ b/readthedocs/search/tasks.py @@ -72,7 +72,6 @@ def index_missing_objects_task(app_label, model_name, document_class, indexed_in and they will not be in the tasks. This task will index all the objects excluding the ones which have got indexed already """ - model = apps.get_model(app_label, model_name) document = _get_document(model=model, document_class=document_class) queryset = document().get_queryset().exclude(id__in=indexed_instance_ids) From 39d80311bcb68ff569111d6a3786832e2a2f50bf Mon Sep 17 00:00:00 2001 From: Safwan Rahman Date: Sat, 14 Jul 2018 06:01:36 +0600 Subject: [PATCH 03/15] fixup --- .../commands/reindex_elasticsearch.py | 8 ++++---- readthedocs/search/mixins.py | 18 +++++++++++------- readthedocs/search/tasks.py | 18 ++++++++++-------- 3 files changed, 25 insertions(+), 19 deletions(-) diff --git a/readthedocs/search/management/commands/reindex_elasticsearch.py b/readthedocs/search/management/commands/reindex_elasticsearch.py index 9e40ccb63c6..4433ad5730b 100644 --- a/readthedocs/search/management/commands/reindex_elasticsearch.py +++ b/readthedocs/search/management/commands/reindex_elasticsearch.py @@ -38,13 +38,13 @@ def _run_reindex_tasks(self, models): app_label = qs.model._meta.app_label model_name = qs.model.__name__ - old_index_name = doc._doc_type.index + index_name = doc._doc_type.index timestamp_prefix = 'temp-{}-'.format(datetime.datetime.now().strftime('%Y%m%d%H%M%S')) - new_index_name = timestamp_prefix + old_index_name + new_index_name = timestamp_prefix + index_name pre_index_task = create_new_es_index_task.si(app_label=app_label, model_name=model_name, - old_index_name=old_index_name, + index_name=index_name, new_index_name=new_index_name) indexing_tasks = self._get_indexing_tasks(app_label=app_label, model_name=model_name, @@ -53,7 +53,7 @@ def _run_reindex_tasks(self, models): index_name=new_index_name) post_index_task = switch_es_index_task.si(app_label=app_label, model_name=model_name, - old_index_name=old_index_name, + index_name=index_name, new_index_name=new_index_name) # Task to run in order to add the objects diff --git a/readthedocs/search/mixins.py b/readthedocs/search/mixins.py index 603920016f5..7efaaca7298 100644 --- a/readthedocs/search/mixins.py +++ b/readthedocs/search/mixins.py @@ -3,8 +3,10 @@ class RTDDocTypeMixin(object): + """ Override some methods of DocType of DED + Changelog as following: - Do not index object that not exist in the provided queryset - Take additional argument in update method `index_name` to update specific index @@ -44,14 +46,16 @@ def update(self, thing, refresh=None, action='index', index_name=None, **kwargs) # TODO: remove this overwrite when the issue has been fixed # https://github.com/sabricot/django-elasticsearch-dsl/issues/111 - # Moreover, do not need to check if its a delete action - # Because while delete action, the object is already remove from database - if isinstance(thing, models.Model) and action != 'delete': + if isinstance(thing, models.Model): # Its a model instance. - queryset = self.get_queryset() - obj = queryset.filter(pk=thing.pk) - if not obj.exists(): - return None + + # Do not need to check if its a delete action + # Because while delete action, the object is already remove from database + if action != 'delete': + queryset = self.get_queryset() + obj = queryset.filter(pk=thing.pk) + if not obj.exists(): + return None object_list = [thing] else: diff --git a/readthedocs/search/tasks.py b/readthedocs/search/tasks.py index 39de5e6865f..29a0211329f 100644 --- a/readthedocs/search/tasks.py +++ b/readthedocs/search/tasks.py @@ -23,21 +23,21 @@ def _get_document(model, document_class): @app.task(queue='web') -def create_new_es_index_task(app_label, model_name, old_index_name, new_index_name): +def create_new_es_index_task(app_label, model_name, index_name, new_index_name): model = apps.get_model(app_label, model_name) indices = registry.get_indices(models=[model]) - old_index = _get_index(indices=indices, index_name=old_index_name) + old_index = _get_index(indices=indices, index_name=index_name) new_index = old_index.clone(name=new_index_name) new_index.create() @app.task(queue='web') -def switch_es_index_task(app_label, model_name, old_index_name, new_index_name): +def switch_es_index_task(app_label, model_name, index_name, new_index_name): model = apps.get_model(app_label, model_name) indices = registry.get_indices(models=[model]) - old_index = _get_index(indices=indices, index_name=old_index_name) - + old_index = _get_index(indices=indices, index_name=index_name) new_index = old_index.clone(name=new_index_name) + old_index_actual_name = None if old_index.exists(): # Alias can not be used to delete an index. @@ -45,10 +45,12 @@ def switch_es_index_task(app_label, model_name, old_index_name, new_index_name): # So get the index actual name to delete it old_index_info = old_index.get() # The info is a dictionary and the key is the actual name of the index - old_index_name = old_index_info.keys()[0] - old_index.connection.indices.delete(index=old_index_name) + old_index_actual_name = old_index_info.keys()[0] - new_index.put_alias(name=old_index_name) + # Put alias into the new index name and delete the old index if its exist + new_index.put_alias(name=index_name) + if old_index_actual_name: + old_index.connection.indices.delete(index=old_index_actual_name) @app.task(queue='web') From bbbdca5a63980215722069864ac00f4146084f2e Mon Sep 17 00:00:00 2001 From: Safwan Rahman Date: Mon, 16 Jul 2018 08:21:34 +0600 Subject: [PATCH 04/15] fixup message --- .../search/management/commands/reindex_elasticsearch.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/readthedocs/search/management/commands/reindex_elasticsearch.py b/readthedocs/search/management/commands/reindex_elasticsearch.py index 4433ad5730b..4dacc0957ff 100644 --- a/readthedocs/search/management/commands/reindex_elasticsearch.py +++ b/readthedocs/search/management/commands/reindex_elasticsearch.py @@ -68,7 +68,8 @@ def _run_reindex_tasks(self, models): # http://celery.readthedocs.io/en/latest/userguide/canvas.html#chain chain(pre_index_task, chord_tasks, missed_index_task).apply_async() - message = "Successfully issued tasks for {}.{}".format(app_label, model_name) + message = ("Successfully issued tasks for {}.{}, total {} items" + .format(app_label, model_name, len(instance_ids))) log.info(message) @staticmethod From faca6defcc658c53cabe2e26d8f6d54c3e576e91 Mon Sep 17 00:00:00 2001 From: Safwan Rahman Date: Mon, 16 Jul 2018 08:46:37 +0600 Subject: [PATCH 05/15] fixup index name --- .../search/management/commands/reindex_elasticsearch.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/readthedocs/search/management/commands/reindex_elasticsearch.py b/readthedocs/search/management/commands/reindex_elasticsearch.py index 4dacc0957ff..4cfb00d194a 100644 --- a/readthedocs/search/management/commands/reindex_elasticsearch.py +++ b/readthedocs/search/management/commands/reindex_elasticsearch.py @@ -39,8 +39,8 @@ def _run_reindex_tasks(self, models): model_name = qs.model.__name__ index_name = doc._doc_type.index - timestamp_prefix = 'temp-{}-'.format(datetime.datetime.now().strftime('%Y%m%d%H%M%S')) - new_index_name = timestamp_prefix + index_name + timestamp = datetime.datetime.now().strftime('%Y%m%d%H%M%S') + new_index_name = "{}_{}".format(index_name, timestamp) pre_index_task = create_new_es_index_task.si(app_label=app_label, model_name=model_name, From fd54d69addbd97fd8994cb3a993cf78c587256da Mon Sep 17 00:00:00 2001 From: Safwan Rahman Date: Mon, 16 Jul 2018 18:43:39 +0600 Subject: [PATCH 06/15] fixing command --- .../management/commands/reindex_elasticsearch.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/readthedocs/search/management/commands/reindex_elasticsearch.py b/readthedocs/search/management/commands/reindex_elasticsearch.py index 4cfb00d194a..083da90d73d 100644 --- a/readthedocs/search/management/commands/reindex_elasticsearch.py +++ b/readthedocs/search/management/commands/reindex_elasticsearch.py @@ -72,11 +72,6 @@ def _run_reindex_tasks(self, models): .format(app_label, model_name, len(instance_ids))) log.info(message) - @staticmethod - def _get_models(args): - for model_name in args: - yield apps.get_model(model_name) - def add_arguments(self, parser): parser.add_argument( '--models', @@ -88,8 +83,15 @@ def add_arguments(self, parser): ) def handle(self, *args, **options): + """ + Index models into Elasticsearch index asynchronously using celery. + + You can specify model to get indexed by passing + `--model .` parameter. + Otherwise, it will reindex all the models + """ models = None if options['models']: - models = self._get_models(options['models']) + models = [apps.get_model(model_name) for model_name in options['models']] self._run_reindex_tasks(models=models) From b9dbb5d1a3fc96d6196d6971ed806fb91a7e8f71 Mon Sep 17 00:00:00 2001 From: Safwan Rahman Date: Tue, 17 Jul 2018 01:53:48 +0600 Subject: [PATCH 07/15] fixing docstring --- readthedocs/search/mixins.py | 2 ++ readthedocs/search/utils.py | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/readthedocs/search/mixins.py b/readthedocs/search/mixins.py index 7efaaca7298..05a9bb8650b 100644 --- a/readthedocs/search/mixins.py +++ b/readthedocs/search/mixins.py @@ -15,6 +15,7 @@ class RTDDocTypeMixin(object): """ def _prepare_action(self, object_instance, action, index_name=None): + """Overwrite to take `index_name` from parameters for setting index dynamically""" return { '_op_type': action, '_index': index_name or str(self._doc_type.index), @@ -26,6 +27,7 @@ def _prepare_action(self, object_instance, action, index_name=None): } def _get_actions(self, object_list, action, index_name=None): + """Overwrite to take `index_name` from parameters for setting index dynamically""" if self._doc_type.queryset_pagination is not None: paginator = Paginator( object_list, self._doc_type.queryset_pagination diff --git a/readthedocs/search/utils.py b/readthedocs/search/utils.py index aed7f1e981f..d479ef213bc 100644 --- a/readthedocs/search/utils.py +++ b/readthedocs/search/utils.py @@ -324,7 +324,7 @@ def get_project_list_or_404(project_slug, user): def chunks(elements, chunk_size): - """Yield successive n-sized chunks from l.""" + """Yield successive `chunk_size` chunks from elements.""" # Taken from https://stackoverflow.com/a/312464 # licensed under cc by-sa 3.0 for i in range(0, len(elements), chunk_size): From ce4abaf9324f493a37b2549dce55dc4952228ad1 Mon Sep 17 00:00:00 2001 From: Safwan Rahman Date: Thu, 19 Jul 2018 05:32:45 +0600 Subject: [PATCH 08/15] optimizing indexing --- readthedocs/projects/signals.py | 4 ++++ readthedocs/projects/tasks.py | 30 ++++++++++++++++++++++-------- readthedocs/search/__init__.py | 1 + readthedocs/search/apps.py | 10 ++++++++++ readthedocs/search/documents.py | 1 + readthedocs/search/signals.py | 25 +++++++++++++++++++++++++ 6 files changed, 63 insertions(+), 8 deletions(-) create mode 100644 readthedocs/search/apps.py diff --git a/readthedocs/projects/signals.py b/readthedocs/projects/signals.py index 6ef49f9e67c..0c94d464b82 100644 --- a/readthedocs/projects/signals.py +++ b/readthedocs/projects/signals.py @@ -14,3 +14,7 @@ project_import = django.dispatch.Signal(providing_args=["project"]) files_changed = django.dispatch.Signal(providing_args=["project", "files"]) + +bulk_post_create = django.dispatch.Signal(providing_args=["instance_list"]) + +bulk_post_delete = django.dispatch.Signal(providing_args=["instance_list"]) diff --git a/readthedocs/projects/tasks.py b/readthedocs/projects/tasks.py index 88b92e24eb4..2a149daf74c 100644 --- a/readthedocs/projects/tasks.py +++ b/readthedocs/projects/tasks.py @@ -32,7 +32,8 @@ from .constants import LOG_TEMPLATE from .exceptions import RepositoryError from .models import ImportedFile, Project, Domain, Feature, HTMLFile -from .signals import before_vcs, after_vcs, before_build, after_build, files_changed +from .signals import before_vcs, after_vcs, before_build, after_build, files_changed, \ + bulk_post_create, bulk_post_delete from readthedocs.builds.constants import ( BUILD_STATE_BUILDING, BUILD_STATE_CLONING, BUILD_STATE_FINISHED, BUILD_STATE_INSTALLING, LATEST, LATEST_VERBOSE_NAME, STABLE_VERBOSE_NAME) @@ -986,6 +987,7 @@ def _manage_imported_files(version, path, commit): :param commit: Commit that updated path """ changed_files = set() + created_html_files = [] for root, __, filenames in os.walk(path): for filename in filenames: if fnmatch.fnmatch(filename, '*.html'): @@ -1015,15 +1017,27 @@ def _manage_imported_files(version, path, commit): obj.commit = commit obj.save() - # Delete the HTMLFile first from previous versions - HTMLFile.objects.filter(project=version.project, - version=version - ).exclude(commit=commit).delete() + if isinstance(obj, HTMLFile): + # the `obj` is HTMLFile, so add it to the list + created_html_files.append(obj) + + # Send bulk_post_create signal for bulk indexing to Elasticsearch + bulk_post_create.send(sender=HTMLFile, instance_list=created_html_files) + + # Delete the HTMLFile first from previous commit and + # send bulk_post_delete signal for bulk removing from Elasticsearch + delete_queryset = (HTMLFile.objects.filter(project=version.project, version=version) + .exclude(commit=commit)) + # Keep the objects into memory to send it to signal + instance_list = list(delete_queryset) + # Safely delete from database + delete_queryset.delete() + # Always pass the list of instance, not queryset. + bulk_post_delete.send(sender=HTMLFile, instance_list=instance_list) # Delete ImportedFiles from previous versions - ImportedFile.objects.filter(project=version.project, - version=version - ).exclude(commit=commit).delete() + (ImportedFile.objects.filter(project=version.project, version=version) + .exclude(commit=commit).delete()) changed_files = [ resolve_path( version.project, filename=file, version_slug=version.slug, diff --git a/readthedocs/search/__init__.py b/readthedocs/search/__init__.py index e69de29bb2d..552c9337386 100644 --- a/readthedocs/search/__init__.py +++ b/readthedocs/search/__init__.py @@ -0,0 +1 @@ +default_app_config = 'readthedocs.search.apps.SearchConfig' diff --git a/readthedocs/search/apps.py b/readthedocs/search/apps.py new file mode 100644 index 00000000000..108a2ecc69f --- /dev/null +++ b/readthedocs/search/apps.py @@ -0,0 +1,10 @@ +"""Project app config""" + +from django.apps import AppConfig + + +class SearchConfig(AppConfig): + name = 'readthedocs.search' + + def ready(self): + from .signals import index_html_file, remove_html_file diff --git a/readthedocs/search/documents.py b/readthedocs/search/documents.py index bab26ef506d..b9223684cfe 100644 --- a/readthedocs/search/documents.py +++ b/readthedocs/search/documents.py @@ -52,6 +52,7 @@ class PageDocument(RTDDocTypeMixin, DocType): class Meta(object): model = HTMLFile fields = ('commit',) + ignore_signals = True project = fields.KeywordField(attr='project.slug') version = fields.KeywordField(attr='version.slug') diff --git a/readthedocs/search/signals.py b/readthedocs/search/signals.py index 6abdf64cce9..88349fe8ba1 100644 --- a/readthedocs/search/signals.py +++ b/readthedocs/search/signals.py @@ -1,7 +1,32 @@ """We define custom Django signals to trigger before executing searches.""" from __future__ import absolute_import import django.dispatch +from django.dispatch import receiver +from django_elasticsearch_dsl.registries import registry + +from readthedocs.projects.models import HTMLFile +from readthedocs.projects.signals import bulk_post_create, bulk_post_delete +from readthedocs.search.documents import PageDocument +from readthedocs.search.tasks import index_objects_to_es_task before_project_search = django.dispatch.Signal(providing_args=["body"]) before_file_search = django.dispatch.Signal(providing_args=["body"]) before_section_search = django.dispatch.Signal(providing_args=["body"]) + + +@receiver(bulk_post_create, sender=HTMLFile) +def index_html_file(instance_list, **_): + kwargs = { + 'app_label': HTMLFile._meta.app_label, + 'model_name': HTMLFile.__name__, + 'document_class': str(PageDocument), + 'index_name': None, # No neeed to change the index name + 'objects_id': [obj.id for obj in instance_list], + } + + index_objects_to_es_task.delay(**kwargs) + + +@receiver(bulk_post_delete, sender=HTMLFile) +def remove_html_file(instance_list, **_): + registry.delete(instance_list) From db51a903f6355566030300febe4dfa4f109bf488 Mon Sep 17 00:00:00 2001 From: Safwan Rahman Date: Fri, 20 Jul 2018 02:22:32 +0600 Subject: [PATCH 09/15] fixing tests and signals --- conftest.py | 2 +- readthedocs/projects/tasks.py | 2 +- readthedocs/search/documents.py | 2 +- readthedocs/search/signals.py | 2 +- readthedocs/search/tests/conftest.py | 4 ---- readthedocs/settings/base.py | 1 + readthedocs/settings/test.py | 1 + 7 files changed, 6 insertions(+), 8 deletions(-) diff --git a/conftest.py b/conftest.py index a75b22a48d0..3eb3f7e14ed 100644 --- a/conftest.py +++ b/conftest.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- import pytest +from django.conf import settings from rest_framework.test import APIClient try: @@ -46,7 +47,6 @@ def pytest_configure(config): def settings_modification(settings): settings.CELERY_ALWAYS_EAGER = True - @pytest.fixture def api_client(): return APIClient() diff --git a/readthedocs/projects/tasks.py b/readthedocs/projects/tasks.py index 2a149daf74c..8a51bb7ba13 100644 --- a/readthedocs/projects/tasks.py +++ b/readthedocs/projects/tasks.py @@ -1017,7 +1017,7 @@ def _manage_imported_files(version, path, commit): obj.commit = commit obj.save() - if isinstance(obj, HTMLFile): + if model_class == HTMLFile: # the `obj` is HTMLFile, so add it to the list created_html_files.append(obj) diff --git a/readthedocs/search/documents.py b/readthedocs/search/documents.py index b9223684cfe..fbfb09aabd9 100644 --- a/readthedocs/search/documents.py +++ b/readthedocs/search/documents.py @@ -52,7 +52,7 @@ class PageDocument(RTDDocTypeMixin, DocType): class Meta(object): model = HTMLFile fields = ('commit',) - ignore_signals = True + ignore_signals = settings.ES_PAGE_IGNORE_SIGNALS project = fields.KeywordField(attr='project.slug') version = fields.KeywordField(attr='version.slug') diff --git a/readthedocs/search/signals.py b/readthedocs/search/signals.py index 88349fe8ba1..ce0cc1e51ce 100644 --- a/readthedocs/search/signals.py +++ b/readthedocs/search/signals.py @@ -24,7 +24,7 @@ def index_html_file(instance_list, **_): 'objects_id': [obj.id for obj in instance_list], } - index_objects_to_es_task.delay(**kwargs) + index_objects_to_es_task(**kwargs) @receiver(bulk_post_delete, sender=HTMLFile) diff --git a/readthedocs/search/tests/conftest.py b/readthedocs/search/tests/conftest.py index 52c4811342e..4e46679732c 100644 --- a/readthedocs/search/tests/conftest.py +++ b/readthedocs/search/tests/conftest.py @@ -58,9 +58,5 @@ def get_dummy_processed_json(instance): @pytest.fixture(autouse=True) def mock_processed_json(mocker): - - # patch the function from `projects.tasks` because it has been point to there - # http://www.voidspace.org.uk/python/mock/patch.html#where-to-patch mocked_function = mocker.patch.object(HTMLFile, 'get_processed_json', autospec=True) mocked_function.side_effect = get_dummy_processed_json - diff --git a/readthedocs/settings/base.py b/readthedocs/settings/base.py index 44da0bf0e56..c44bd9e9901 100644 --- a/readthedocs/settings/base.py +++ b/readthedocs/settings/base.py @@ -326,6 +326,7 @@ def USE_PROMOS(self): # noqa } # Chunk size for elasticsearch reindex celery tasks ES_TASK_CHUNK_SIZE = 100 + ES_PAGE_IGNORE_SIGNALS = True # ANALYZER = 'analysis': { # 'analyzer': { diff --git a/readthedocs/settings/test.py b/readthedocs/settings/test.py index 79c7486e3cb..2a72ec03481 100644 --- a/readthedocs/settings/test.py +++ b/readthedocs/settings/test.py @@ -16,6 +16,7 @@ class CommunityTestSettings(CommunityDevSettings): DEBUG = False TEMPLATE_DEBUG = False + ES_PAGE_IGNORE_SIGNALS = False @property def ES_INDEXES(self): # noqa - avoid pep8 N802 From 612cfb8fabc63667d15a93d162340b3252ec71ae Mon Sep 17 00:00:00 2001 From: Safwan Rahman Date: Fri, 27 Jul 2018 12:38:21 +0600 Subject: [PATCH 10/15] [Fix #4409] Disable autoindexing in local development --- readthedocs/search/signals.py | 11 ++++++++--- readthedocs/settings/dev.py | 3 +++ readthedocs/settings/test.py | 1 + 3 files changed, 12 insertions(+), 3 deletions(-) diff --git a/readthedocs/search/signals.py b/readthedocs/search/signals.py index ce0cc1e51ce..70c3c9dba60 100644 --- a/readthedocs/search/signals.py +++ b/readthedocs/search/signals.py @@ -2,6 +2,7 @@ from __future__ import absolute_import import django.dispatch from django.dispatch import receiver +from django_elasticsearch_dsl.apps import DEDConfig from django_elasticsearch_dsl.registries import registry from readthedocs.projects.models import HTMLFile @@ -20,13 +21,17 @@ def index_html_file(instance_list, **_): 'app_label': HTMLFile._meta.app_label, 'model_name': HTMLFile.__name__, 'document_class': str(PageDocument), - 'index_name': None, # No neeed to change the index name + 'index_name': None, # No need to change the index name 'objects_id': [obj.id for obj in instance_list], } - index_objects_to_es_task(**kwargs) + # Do not index if autosync is disabled globally + if DEDConfig.autosync_enabled(): + index_objects_to_es_task(**kwargs) @receiver(bulk_post_delete, sender=HTMLFile) def remove_html_file(instance_list, **_): - registry.delete(instance_list) + # Do not index if autosync is disabled globally + if DEDConfig.autosync_enabled(): + registry.delete(instance_list) diff --git a/readthedocs/settings/dev.py b/readthedocs/settings/dev.py index 7fa4dafe959..93b08d6bbc6 100644 --- a/readthedocs/settings/dev.py +++ b/readthedocs/settings/dev.py @@ -48,6 +48,9 @@ def DATABASES(self): # noqa 'test:8000', ) + # Disable auto syncing elasticsearch documents in development + ELASTICSEARCH_DSL_AUTOSYNC = False + @property def LOGGING(self): # noqa - avoid pep8 N802 logging = super(CommunityDevSettings, self).LOGGING diff --git a/readthedocs/settings/test.py b/readthedocs/settings/test.py index 2a72ec03481..d923b2359fa 100644 --- a/readthedocs/settings/test.py +++ b/readthedocs/settings/test.py @@ -17,6 +17,7 @@ class CommunityTestSettings(CommunityDevSettings): DEBUG = False TEMPLATE_DEBUG = False ES_PAGE_IGNORE_SIGNALS = False + ELASTICSEARCH_DSL_AUTOSYNC = True @property def ES_INDEXES(self): # noqa - avoid pep8 N802 From baf8421037b53acf6c581fbf0f4a27c1b06aa89e Mon Sep 17 00:00:00 2001 From: Safwan Rahman Date: Fri, 27 Jul 2018 15:29:01 +0600 Subject: [PATCH 11/15] fixup as per comments --- .../commands/reindex_elasticsearch.py | 28 +++++++++---------- readthedocs/search/signals.py | 4 +-- readthedocs/search/tasks.py | 20 ++++++++++--- 3 files changed, 32 insertions(+), 20 deletions(-) diff --git a/readthedocs/search/management/commands/reindex_elasticsearch.py b/readthedocs/search/management/commands/reindex_elasticsearch.py index 083da90d73d..9450c65c8d8 100644 --- a/readthedocs/search/management/commands/reindex_elasticsearch.py +++ b/readthedocs/search/management/commands/reindex_elasticsearch.py @@ -7,8 +7,8 @@ from django.core.management import BaseCommand from django_elasticsearch_dsl.registries import registry -from ...tasks import (index_objects_to_es_task, switch_es_index_task, create_new_es_index_task, - index_missing_objects_task) +from ...tasks import (index_objects_to_es, switch_es_index, create_new_es_index, + index_missing_objects) from ...utils import chunks log = logging.getLogger(__name__) @@ -28,7 +28,7 @@ def _get_indexing_tasks(app_label, model_name, instance_ids, document_class, ind 'index_name': index_name, 'objects_id': chunk } - yield index_objects_to_es_task.si(**data) + yield index_objects_to_es.si(**data) def _run_reindex_tasks(self, models): for doc in registry.get_documents(models): @@ -42,26 +42,26 @@ def _run_reindex_tasks(self, models): timestamp = datetime.datetime.now().strftime('%Y%m%d%H%M%S') new_index_name = "{}_{}".format(index_name, timestamp) - pre_index_task = create_new_es_index_task.si(app_label=app_label, - model_name=model_name, - index_name=index_name, - new_index_name=new_index_name) + pre_index_task = create_new_es_index.si(app_label=app_label, + model_name=model_name, + index_name=index_name, + new_index_name=new_index_name) indexing_tasks = self._get_indexing_tasks(app_label=app_label, model_name=model_name, instance_ids=instance_ids, document_class=str(doc), index_name=new_index_name) - post_index_task = switch_es_index_task.si(app_label=app_label, model_name=model_name, - index_name=index_name, - new_index_name=new_index_name) + post_index_task = switch_es_index.si(app_label=app_label, model_name=model_name, + index_name=index_name, + new_index_name=new_index_name) # Task to run in order to add the objects # that has been inserted into database while indexing_tasks was running - missed_index_task = index_missing_objects_task.si(app_label=app_label, - model_name=model_name, - document_class=str(doc), - indexed_instance_ids=instance_ids) + missed_index_task = index_missing_objects.si(app_label=app_label, + model_name=model_name, + document_class=str(doc), + indexed_instance_ids=instance_ids) # http://celery.readthedocs.io/en/latest/userguide/canvas.html#chords chord_tasks = chord(header=indexing_tasks, body=post_index_task) diff --git a/readthedocs/search/signals.py b/readthedocs/search/signals.py index 70c3c9dba60..6e335e06d0a 100644 --- a/readthedocs/search/signals.py +++ b/readthedocs/search/signals.py @@ -8,7 +8,7 @@ from readthedocs.projects.models import HTMLFile from readthedocs.projects.signals import bulk_post_create, bulk_post_delete from readthedocs.search.documents import PageDocument -from readthedocs.search.tasks import index_objects_to_es_task +from readthedocs.search.tasks import index_objects_to_es before_project_search = django.dispatch.Signal(providing_args=["body"]) before_file_search = django.dispatch.Signal(providing_args=["body"]) @@ -27,7 +27,7 @@ def index_html_file(instance_list, **_): # Do not index if autosync is disabled globally if DEDConfig.autosync_enabled(): - index_objects_to_es_task(**kwargs) + index_objects_to_es(**kwargs) @receiver(bulk_post_delete, sender=HTMLFile) diff --git a/readthedocs/search/tasks.py b/readthedocs/search/tasks.py index 29a0211329f..699b23f8043 100644 --- a/readthedocs/search/tasks.py +++ b/readthedocs/search/tasks.py @@ -9,12 +9,24 @@ def _get_index(indices, index_name): + """ + Get Index from all the indices + :param indices: DED indices list + :param index_name: Name of the index + :return: DED Index + """ for index in indices: if str(index) == index_name: return index def _get_document(model, document_class): + """ + Get DED document class object from the model and name of document class + :param model: The model class to find the document + :param document_class: the name of the document class. + :return: DED DocType object + """ documents = registry.get_documents(models=[model]) for document in documents: @@ -23,7 +35,7 @@ def _get_document(model, document_class): @app.task(queue='web') -def create_new_es_index_task(app_label, model_name, index_name, new_index_name): +def create_new_es_index(app_label, model_name, index_name, new_index_name): model = apps.get_model(app_label, model_name) indices = registry.get_indices(models=[model]) old_index = _get_index(indices=indices, index_name=index_name) @@ -32,7 +44,7 @@ def create_new_es_index_task(app_label, model_name, index_name, new_index_name): @app.task(queue='web') -def switch_es_index_task(app_label, model_name, index_name, new_index_name): +def switch_es_index(app_label, model_name, index_name, new_index_name): model = apps.get_model(app_label, model_name) indices = registry.get_indices(models=[model]) old_index = _get_index(indices=indices, index_name=index_name) @@ -54,7 +66,7 @@ def switch_es_index_task(app_label, model_name, index_name, new_index_name): @app.task(queue='web') -def index_objects_to_es_task(app_label, model_name, document_class, index_name, objects_id): +def index_objects_to_es(app_label, model_name, document_class, index_name, objects_id): model = apps.get_model(app_label, model_name) document = _get_document(model=model, document_class=document_class) @@ -65,7 +77,7 @@ def index_objects_to_es_task(app_label, model_name, document_class, index_name, @app.task(queue='web') -def index_missing_objects_task(app_label, model_name, document_class, indexed_instance_ids): +def index_missing_objects(app_label, model_name, document_class, indexed_instance_ids): """ Task to insure that none of the object is missed from indexing. From 143ce7fedecdff6127d71a2e07ceeaa16dc9443e Mon Sep 17 00:00:00 2001 From: Safwan Rahman Date: Fri, 27 Jul 2018 20:49:31 +0600 Subject: [PATCH 12/15] Optimizing reindexing management command --- readthedocs/projects/models.py | 2 ++ .../commands/reindex_elasticsearch.py | 28 +++++++++++-------- readthedocs/search/tasks.py | 13 +++++---- readthedocs/search/utils.py | 11 ++++---- 4 files changed, 31 insertions(+), 23 deletions(-) diff --git a/readthedocs/projects/models.py b/readthedocs/projects/models.py index 1a87ff1e105..cef7b3eb4d6 100644 --- a/readthedocs/projects/models.py +++ b/readthedocs/projects/models.py @@ -13,6 +13,7 @@ from django.contrib.auth.models import User from django.core.urlresolvers import NoReverseMatch, reverse from django.db import models +from django.utils import timezone from django.utils.encoding import python_2_unicode_compatible from django.utils.functional import cached_property from django.utils.translation import ugettext_lazy as _ @@ -911,6 +912,7 @@ class ImportedFile(models.Model): path = models.CharField(_('Path'), max_length=255) md5 = models.CharField(_('MD5 checksum'), max_length=255) commit = models.CharField(_('Commit'), max_length=255) + modified_date = models.DateTimeField(_('Modified date'), auto_now=True) def get_absolute_url(self): return resolve(project=self.project, version_slug=self.version.slug, filename=self.path) diff --git a/readthedocs/search/management/commands/reindex_elasticsearch.py b/readthedocs/search/management/commands/reindex_elasticsearch.py index 9450c65c8d8..7fa653ffe39 100644 --- a/readthedocs/search/management/commands/reindex_elasticsearch.py +++ b/readthedocs/search/management/commands/reindex_elasticsearch.py @@ -9,7 +9,7 @@ from ...tasks import (index_objects_to_es, switch_es_index, create_new_es_index, index_missing_objects) -from ...utils import chunks +from ...utils import chunk_queryset log = logging.getLogger(__name__) @@ -17,26 +17,29 @@ class Command(BaseCommand): @staticmethod - def _get_indexing_tasks(app_label, model_name, instance_ids, document_class, index_name): - chunk_objects = chunks(instance_ids, settings.ES_TASK_CHUNK_SIZE) + def _get_indexing_tasks(app_label, model_name, queryset, document_class, index_name): + queryset = queryset.values_list('id', flat=True) + chunked_queryset = chunk_queryset(queryset, settings.ES_TASK_CHUNK_SIZE) - for chunk in chunk_objects: + for chunk in chunked_queryset: data = { 'app_label': app_label, 'model_name': model_name, 'document_class': document_class, 'index_name': index_name, - 'objects_id': chunk + 'objects_id': list(chunk) } yield index_objects_to_es.si(**data) def _run_reindex_tasks(self, models): for doc in registry.get_documents(models): - qs = doc().get_queryset() - instance_ids = list(qs.values_list('id', flat=True)) + queryset = doc().get_queryset() + # Get latest object from the queryset + latest_object = queryset.latest('modified_date') + latest_object_time = latest_object.modified_date - app_label = qs.model._meta.app_label - model_name = qs.model.__name__ + app_label = queryset.model._meta.app_label + model_name = queryset.model.__name__ index_name = doc._doc_type.index timestamp = datetime.datetime.now().strftime('%Y%m%d%H%M%S') @@ -48,7 +51,7 @@ def _run_reindex_tasks(self, models): new_index_name=new_index_name) indexing_tasks = self._get_indexing_tasks(app_label=app_label, model_name=model_name, - instance_ids=instance_ids, + queryset=queryset, document_class=str(doc), index_name=new_index_name) @@ -58,10 +61,11 @@ def _run_reindex_tasks(self, models): # Task to run in order to add the objects # that has been inserted into database while indexing_tasks was running + # We pass the creation time of latest object, so its possible to index later items missed_index_task = index_missing_objects.si(app_label=app_label, model_name=model_name, document_class=str(doc), - indexed_instance_ids=instance_ids) + latest_indexed=latest_object_time) # http://celery.readthedocs.io/en/latest/userguide/canvas.html#chords chord_tasks = chord(header=indexing_tasks, body=post_index_task) @@ -69,7 +73,7 @@ def _run_reindex_tasks(self, models): chain(pre_index_task, chord_tasks, missed_index_task).apply_async() message = ("Successfully issued tasks for {}.{}, total {} items" - .format(app_label, model_name, len(instance_ids))) + .format(app_label, model_name, queryset.count())) log.info(message) def add_arguments(self, parser): diff --git a/readthedocs/search/tasks.py b/readthedocs/search/tasks.py index 699b23f8043..e814d6a30d5 100644 --- a/readthedocs/search/tasks.py +++ b/readthedocs/search/tasks.py @@ -77,18 +77,19 @@ def index_objects_to_es(app_label, model_name, document_class, index_name, objec @app.task(queue='web') -def index_missing_objects(app_label, model_name, document_class, indexed_instance_ids): +def index_missing_objects(app_label, model_name, document_class, latest_indexed): """ Task to insure that none of the object is missed from indexing. - The object ids are sent to task for indexing. - But in the meantime, new objects can be created/deleted in database - and they will not be in the tasks. - This task will index all the objects excluding the ones which have got indexed already + The object ids are sent to `index_objects_to_es` task for indexing. + While the task is running, new objects can be created/deleted in database + and they will not be in the tasks for indexing into ES. + This task will index all the objects that got into DB after the `latest_indexed` timestamp + to ensure that everything is in ES index. """ model = apps.get_model(app_label, model_name) document = _get_document(model=model, document_class=document_class) - queryset = document().get_queryset().exclude(id__in=indexed_instance_ids) + queryset = document().get_queryset().exclude(modified_date__lte=latest_indexed) document().update(queryset.iterator()) log.info("Indexed {} missing objects from model: {}'".format(queryset.count(), model.__name__)) diff --git a/readthedocs/search/utils.py b/readthedocs/search/utils.py index d479ef213bc..4a8b8019dfa 100644 --- a/readthedocs/search/utils.py +++ b/readthedocs/search/utils.py @@ -323,9 +323,10 @@ def get_project_list_or_404(project_slug, user): return project_list -def chunks(elements, chunk_size): - """Yield successive `chunk_size` chunks from elements.""" - # Taken from https://stackoverflow.com/a/312464 +def chunk_queryset(queryset, chunk_size): + """Yield successive `chunk_size` chunks of queryset.""" + # Based on https://stackoverflow.com/a/312464 # licensed under cc by-sa 3.0 - for i in range(0, len(elements), chunk_size): - yield elements[i:i + chunk_size] + total = queryset.count() + for i in range(0, total, chunk_size): + yield queryset[i:i + chunk_size] From abaeade1563c91c5e9c904e0135df95828a5d7e6 Mon Sep 17 00:00:00 2001 From: Safwan Rahman Date: Fri, 27 Jul 2018 20:54:51 +0600 Subject: [PATCH 13/15] adding migration --- .../0028_importedfile_modified_date.py | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) create mode 100644 readthedocs/projects/migrations/0028_importedfile_modified_date.py diff --git a/readthedocs/projects/migrations/0028_importedfile_modified_date.py b/readthedocs/projects/migrations/0028_importedfile_modified_date.py new file mode 100644 index 00000000000..fc2f16c2387 --- /dev/null +++ b/readthedocs/projects/migrations/0028_importedfile_modified_date.py @@ -0,0 +1,22 @@ +# -*- coding: utf-8 -*- +# Generated by Django 1.9.13 on 2018-07-27 09:54 +from __future__ import unicode_literals + +from django.db import migrations, models +import django.utils.timezone + + +class Migration(migrations.Migration): + + dependencies = [ + ('projects', '0027_add_htmlfile_model'), + ] + + operations = [ + migrations.AddField( + model_name='importedfile', + name='modified_date', + field=models.DateTimeField(auto_now=True, default=django.utils.timezone.now, verbose_name='Modified date'), + preserve_default=False, + ), + ] From 9d6f2016c1f4bfc46b4080b364aff29010d2f12e Mon Sep 17 00:00:00 2001 From: Safwan Rahman Date: Fri, 27 Jul 2018 21:03:33 +0600 Subject: [PATCH 14/15] fixup lint --- readthedocs/search/tasks.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/readthedocs/search/tasks.py b/readthedocs/search/tasks.py index e814d6a30d5..48a4c19b02a 100644 --- a/readthedocs/search/tasks.py +++ b/readthedocs/search/tasks.py @@ -11,6 +11,7 @@ def _get_index(indices, index_name): """ Get Index from all the indices + :param indices: DED indices list :param index_name: Name of the index :return: DED Index @@ -23,6 +24,7 @@ def _get_index(indices, index_name): def _get_document(model, document_class): """ Get DED document class object from the model and name of document class + :param model: The model class to find the document :param document_class: the name of the document class. :return: DED DocType object From 652f869ea4384784833db596e272f85427bca691 Mon Sep 17 00:00:00 2001 From: Safwan Rahman Date: Tue, 31 Jul 2018 00:48:37 +0600 Subject: [PATCH 15/15] fixup as per comments --- .../search/management/commands/reindex_elasticsearch.py | 6 +++--- readthedocs/search/tasks.py | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/readthedocs/search/management/commands/reindex_elasticsearch.py b/readthedocs/search/management/commands/reindex_elasticsearch.py index 7fa653ffe39..ce28933dcb7 100644 --- a/readthedocs/search/management/commands/reindex_elasticsearch.py +++ b/readthedocs/search/management/commands/reindex_elasticsearch.py @@ -5,6 +5,7 @@ from django.apps import apps from django.conf import settings from django.core.management import BaseCommand +from django.utils import timezone from django_elasticsearch_dsl.registries import registry from ...tasks import (index_objects_to_es, switch_es_index, create_new_es_index, @@ -35,8 +36,7 @@ def _run_reindex_tasks(self, models): for doc in registry.get_documents(models): queryset = doc().get_queryset() # Get latest object from the queryset - latest_object = queryset.latest('modified_date') - latest_object_time = latest_object.modified_date + index_time = timezone.now() app_label = queryset.model._meta.app_label model_name = queryset.model.__name__ @@ -65,7 +65,7 @@ def _run_reindex_tasks(self, models): missed_index_task = index_missing_objects.si(app_label=app_label, model_name=model_name, document_class=str(doc), - latest_indexed=latest_object_time) + index_generation_time=index_time) # http://celery.readthedocs.io/en/latest/userguide/canvas.html#chords chord_tasks = chord(header=indexing_tasks, body=post_index_task) diff --git a/readthedocs/search/tasks.py b/readthedocs/search/tasks.py index 48a4c19b02a..f702b452656 100644 --- a/readthedocs/search/tasks.py +++ b/readthedocs/search/tasks.py @@ -79,7 +79,7 @@ def index_objects_to_es(app_label, model_name, document_class, index_name, objec @app.task(queue='web') -def index_missing_objects(app_label, model_name, document_class, latest_indexed): +def index_missing_objects(app_label, model_name, document_class, index_generation_time): """ Task to insure that none of the object is missed from indexing. @@ -91,7 +91,7 @@ def index_missing_objects(app_label, model_name, document_class, latest_indexed) """ model = apps.get_model(app_label, model_name) document = _get_document(model=model, document_class=document_class) - queryset = document().get_queryset().exclude(modified_date__lte=latest_indexed) + queryset = document().get_queryset().exclude(modified_date__lte=index_generation_time) document().update(queryset.iterator()) log.info("Indexed {} missing objects from model: {}'".format(queryset.count(), model.__name__))