diff --git a/conftest.py b/conftest.py
index a75b22a48d0..3eb3f7e14ed 100644
--- a/conftest.py
+++ b/conftest.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 import pytest
+from django.conf import settings
 from rest_framework.test import APIClient
 
 try:
@@ -46,7 +47,6 @@ def pytest_configure(config):
 def settings_modification(settings):
     settings.CELERY_ALWAYS_EAGER = True
 
-
 @pytest.fixture
 def api_client():
     return APIClient()
diff --git a/readthedocs/core/management/commands/reindex_elasticsearch.py b/readthedocs/core/management/commands/reindex_elasticsearch.py
deleted file mode 100644
index 7a5f25a065a..00000000000
--- a/readthedocs/core/management/commands/reindex_elasticsearch.py
+++ /dev/null
@@ -1,58 +0,0 @@
-"""Reindex Elastic Search indexes"""
-
-from __future__ import absolute_import
-import logging
-from optparse import make_option
-
-from django.core.management.base import BaseCommand
-from django.core.management.base import CommandError
-from django.conf import settings
-
-from readthedocs.builds.constants import LATEST
-from readthedocs.builds.models import Version
-from readthedocs.projects.tasks import update_search
-
-log = logging.getLogger(__name__)
-
-
-class Command(BaseCommand):
-
-    help = __doc__
-
-    def add_arguments(self, parser):
-        parser.add_argument(
-            '-p',
-            dest='project',
-            default='',
-            help='Project to index'
-        )
-
-    def handle(self, *args, **options):
-        """Build/index all versions or a single project's version"""
-        project = options['project']
-
-        queryset = Version.objects.all()
-
-        if project:
-            queryset = queryset.filter(project__slug=project)
-            if not queryset.exists():
-                raise CommandError(
-                    'No project with slug: {slug}'.format(slug=project))
-            log.info("Building all versions for %s", project)
-        elif getattr(settings, 'INDEX_ONLY_LATEST', True):
-            queryset = queryset.filter(slug=LATEST)
-
-        for version in queryset:
-            log.info("Reindexing %s", version)
-            try:
-                commit = version.project.vcs_repo(version.slug).commit
-            except:  # noqa
-                # An exception can be thrown here in production, but it's not
-                # documented what the exception here is
-                commit = None
-
-            try:
-                update_search(version.pk, commit,
-                              delete_non_commit_files=False)
-            except Exception as e:
-                log.exception('Reindex failed for %s, %s', version, e)
diff --git a/readthedocs/projects/migrations/0028_importedfile_modified_date.py b/readthedocs/projects/migrations/0028_importedfile_modified_date.py
new file mode 100644
index 00000000000..fc2f16c2387
--- /dev/null
+++ b/readthedocs/projects/migrations/0028_importedfile_modified_date.py
@@ -0,0 +1,22 @@
+# -*- coding: utf-8 -*-
+# Generated by Django 1.9.13 on 2018-07-27 09:54
+from __future__ import unicode_literals
+
+from django.db import migrations, models
+import django.utils.timezone
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ('projects', '0027_add_htmlfile_model'),
+    ]
+
+    operations = [
+        migrations.AddField(
+            model_name='importedfile',
+            name='modified_date',
+            field=models.DateTimeField(auto_now=True, default=django.utils.timezone.now, verbose_name='Modified date'),
+            preserve_default=False,
+        ),
+    ]
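A note on the migration above: `auto_now=True` makes Django rewrite `modified_date` on every `save()`, which is what later lets the reindex pipeline find rows that changed while a rebuild was in flight. A minimal sketch of that query pattern (assuming a configured Django environment; `ImportedFile` is the model this migration touches):

```python
# Sketch only: assumes Django settings are configured and apps are loaded.
from django.utils import timezone

from readthedocs.projects.models import ImportedFile

index_start = timezone.now()
# ... a full reindex runs while builds keep saving ImportedFile rows ...

# Anything saved after index_start carries a fresher modified_date,
# because auto_now=True refreshes the field on every save().
missed = ImportedFile.objects.filter(modified_date__gt=index_start)
```

The `index_missing_objects` task further down uses the equivalent `exclude(modified_date__lte=...)` form.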
diff --git a/readthedocs/projects/models.py b/readthedocs/projects/models.py
index 1a87ff1e105..cef7b3eb4d6 100644
--- a/readthedocs/projects/models.py
+++ b/readthedocs/projects/models.py
@@ -13,6 +13,7 @@
 from django.contrib.auth.models import User
 from django.core.urlresolvers import NoReverseMatch, reverse
 from django.db import models
+from django.utils import timezone
 from django.utils.encoding import python_2_unicode_compatible
 from django.utils.functional import cached_property
 from django.utils.translation import ugettext_lazy as _
@@ -911,6 +912,7 @@ class ImportedFile(models.Model):
     path = models.CharField(_('Path'), max_length=255)
     md5 = models.CharField(_('MD5 checksum'), max_length=255)
     commit = models.CharField(_('Commit'), max_length=255)
+    modified_date = models.DateTimeField(_('Modified date'), auto_now=True)
 
     def get_absolute_url(self):
         return resolve(project=self.project, version_slug=self.version.slug, filename=self.path)
diff --git a/readthedocs/projects/signals.py b/readthedocs/projects/signals.py
index 6ef49f9e67c..0c94d464b82 100644
--- a/readthedocs/projects/signals.py
+++ b/readthedocs/projects/signals.py
@@ -14,3 +14,7 @@
 project_import = django.dispatch.Signal(providing_args=["project"])
 
 files_changed = django.dispatch.Signal(providing_args=["project", "files"])
+
+bulk_post_create = django.dispatch.Signal(providing_args=["instance_list"])
+
+bulk_post_delete = django.dispatch.Signal(providing_args=["instance_list"])
diff --git a/readthedocs/projects/tasks.py b/readthedocs/projects/tasks.py
index 88b92e24eb4..8a51bb7ba13 100644
--- a/readthedocs/projects/tasks.py
+++ b/readthedocs/projects/tasks.py
@@ -32,7 +32,8 @@
 from .constants import LOG_TEMPLATE
 from .exceptions import RepositoryError
 from .models import ImportedFile, Project, Domain, Feature, HTMLFile
-from .signals import before_vcs, after_vcs, before_build, after_build, files_changed
+from .signals import before_vcs, after_vcs, before_build, after_build, files_changed, \
+    bulk_post_create, bulk_post_delete
 from readthedocs.builds.constants import (
     BUILD_STATE_BUILDING, BUILD_STATE_CLONING, BUILD_STATE_FINISHED,
     BUILD_STATE_INSTALLING, LATEST, LATEST_VERBOSE_NAME, STABLE_VERBOSE_NAME)
@@ -986,6 +987,7 @@ def _manage_imported_files(version, path, commit):
     :param commit: Commit that updated path
     """
     changed_files = set()
+    created_html_files = []
     for root, __, filenames in os.walk(path):
         for filename in filenames:
             if fnmatch.fnmatch(filename, '*.html'):
@@ -1015,15 +1017,27 @@
             obj.commit = commit
             obj.save()
 
-    # Delete the HTMLFile first from previous versions
-    HTMLFile.objects.filter(project=version.project,
-                            version=version
-                            ).exclude(commit=commit).delete()
+        if model_class == HTMLFile:
+            # The `obj` is an HTMLFile, so add it to the list
+            created_html_files.append(obj)
+
+    # Send the bulk_post_create signal for bulk indexing into Elasticsearch
+    bulk_post_create.send(sender=HTMLFile, instance_list=created_html_files)
+
+    # Delete HTMLFiles from the previous commit and send the
+    # bulk_post_delete signal for bulk removal from Elasticsearch
+    delete_queryset = (HTMLFile.objects.filter(project=version.project, version=version)
+                       .exclude(commit=commit))
+    # Keep the objects in memory to send them to the signal
+    instance_list = list(delete_queryset)
+    # Safely delete from the database
+    delete_queryset.delete()
+    # Always pass the list of instances, not the queryset
+    bulk_post_delete.send(sender=HTMLFile, instance_list=instance_list)
 
     # Delete ImportedFiles from previous versions
-    ImportedFile.objects.filter(project=version.project,
-                                version=version
-                                ).exclude(commit=commit).delete()
+    (ImportedFile.objects.filter(project=version.project, version=version)
+     .exclude(commit=commit).delete())
     changed_files = [
         resolve_path(
             version.project, filename=file, version_slug=version.slug,
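The custom `bulk_post_create`/`bulk_post_delete` signals exist because `QuerySet.delete()` and batched creates give the search layer no single hook carrying every affected instance; note how `_manage_imported_files` materializes the queryset with `list()` before deleting, since the rows (and their pks) are gone afterwards. A self-contained sketch of the pattern (the receiver body and the `FakeModel` class are hypothetical stand-ins):

```python
import django.dispatch
from django.dispatch import receiver

# Same shape as the signals added in readthedocs/projects/signals.py
bulk_post_delete = django.dispatch.Signal(providing_args=["instance_list"])


@receiver(bulk_post_delete)
def on_bulk_delete(sender, instance_list, **kwargs):
    # Hypothetical receiver; in this PR the real one removes docs from ES
    print("deleted pks:", [obj.pk for obj in instance_list])


class FakeModel(object):
    def __init__(self, pk):
        self.pk = pk


# Sender side: capture the instances *before* the delete, then notify
instances = [FakeModel(1), FakeModel(2)]
bulk_post_delete.send(sender=FakeModel, instance_list=instances)
```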
diff --git a/readthedocs/search/__init__.py b/readthedocs/search/__init__.py
index e69de29bb2d..552c9337386 100644
--- a/readthedocs/search/__init__.py
+++ b/readthedocs/search/__init__.py
@@ -0,0 +1 @@
+default_app_config = 'readthedocs.search.apps.SearchConfig'
diff --git a/readthedocs/search/apps.py b/readthedocs/search/apps.py
new file mode 100644
index 00000000000..108a2ecc69f
--- /dev/null
+++ b/readthedocs/search/apps.py
@@ -0,0 +1,10 @@
+"""Search app config"""
+
+from django.apps import AppConfig
+
+
+class SearchConfig(AppConfig):
+    name = 'readthedocs.search'
+
+    def ready(self):
+        from .signals import index_html_file, remove_html_file  # noqa
diff --git a/readthedocs/search/documents.py b/readthedocs/search/documents.py
index 3cc70343c35..fbfb09aabd9 100644
--- a/readthedocs/search/documents.py
+++ b/readthedocs/search/documents.py
@@ -3,9 +3,9 @@
 from elasticsearch_dsl.query import SimpleQueryString, Bool
 
 from readthedocs.projects.models import Project, HTMLFile
-from .conf import SEARCH_EXCLUDED_FILE
-
 from readthedocs.search.faceted_search import ProjectSearch, FileSearch
+from .conf import SEARCH_EXCLUDED_FILE
+from .mixins import RTDDocTypeMixin
 
 project_conf = settings.ES_INDEXES['project']
 project_index = Index(project_conf['name'])
@@ -17,7 +17,7 @@
 
 
 @project_index.doc_type
-class ProjectDocument(DocType):
+class ProjectDocument(RTDDocTypeMixin, DocType):
 
     class Meta(object):
         model = Project
@@ -47,11 +47,12 @@
 
 
 @page_index.doc_type
-class PageDocument(DocType):
+class PageDocument(RTDDocTypeMixin, DocType):
 
     class Meta(object):
         model = HTMLFile
         fields = ('commit',)
+        ignore_signals = settings.ES_PAGE_IGNORE_SIGNALS
 
     project = fields.KeywordField(attr='project.slug')
     version = fields.KeywordField(attr='version.slug')
@@ -121,21 +122,3 @@ def get_queryset(self):
         queryset = (queryset.filter(project__documentation_type='sphinx')
                     .exclude(name__in=SEARCH_EXCLUDED_FILE))
         return queryset
-
-    def update(self, thing, refresh=None, action='index', **kwargs):
-        """Overwrite in order to index only certain files"""
-        # Object not exist in the provided queryset should not be indexed
-        # TODO: remove this overwrite when the issue has been fixed
-        # See below link for more information
-        # https://github.com/sabricot/django-elasticsearch-dsl/issues/111
-        # Moreover, do not need to check if its a delete action
-        # Because while delete action, the object is already remove from database
-        if isinstance(thing, HTMLFile) and action != 'delete':
-            # Its a model instance.
-            queryset = self.get_queryset()
-            obj = queryset.filter(pk=thing.pk)
-            if not obj.exists():
-                return None
-
-        return super(PageDocument, self).update(thing=thing, refresh=refresh,
-                                                action=action, **kwargs)
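Two things happen here: `SearchConfig.ready()` imports the receivers module purely for its side effect of connecting the `@receiver` functions, and `ignore_signals = settings.ES_PAGE_IGNORE_SIGNALS` tells django-elasticsearch-dsl to stop auto-indexing `PageDocument` on every model save, leaving page indexing to the explicit bulk signals. Roughly, the autosync handler behaves like this (a simplified paraphrase of DED ~0.5 behaviour, not the library's actual code):

```python
# Simplified paraphrase of django-elasticsearch-dsl's autosync behaviour,
# not the library's actual code. DED listens to post_save and skips any
# document that opted out via Meta.ignore_signals.
REGISTERED_DOCS = {}  # hypothetical map: model class -> document classes


def handle_post_save(sender, instance, **kwargs):
    for doc_class in REGISTERED_DOCS.get(sender, []):
        if getattr(doc_class._doc_type, 'ignore_signals', False):
            continue  # e.g. PageDocument when ES_PAGE_IGNORE_SIGNALS is True
        doc_class().update(instance)
```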
diff --git a/readthedocs/search/management/__init__.py b/readthedocs/search/management/__init__.py
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/readthedocs/search/management/commands/__init__.py b/readthedocs/search/management/commands/__init__.py
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/readthedocs/search/management/commands/reindex_elasticsearch.py b/readthedocs/search/management/commands/reindex_elasticsearch.py
new file mode 100644
index 00000000000..ce28933dcb7
--- /dev/null
+++ b/readthedocs/search/management/commands/reindex_elasticsearch.py
@@ -0,0 +1,101 @@
+import datetime
+import logging
+
+from celery import chord, chain
+from django.apps import apps
+from django.conf import settings
+from django.core.management import BaseCommand
+from django.utils import timezone
+from django_elasticsearch_dsl.registries import registry
+
+from ...tasks import (index_objects_to_es, switch_es_index, create_new_es_index,
+                      index_missing_objects)
+from ...utils import chunk_queryset
+
+log = logging.getLogger(__name__)
+
+
+class Command(BaseCommand):
+
+    @staticmethod
+    def _get_indexing_tasks(app_label, model_name, queryset, document_class, index_name):
+        queryset = queryset.values_list('id', flat=True)
+        chunked_queryset = chunk_queryset(queryset, settings.ES_TASK_CHUNK_SIZE)
+
+        for chunk in chunked_queryset:
+            data = {
+                'app_label': app_label,
+                'model_name': model_name,
+                'document_class': document_class,
+                'index_name': index_name,
+                'objects_id': list(chunk)
+            }
+            yield index_objects_to_es.si(**data)
+
+    def _run_reindex_tasks(self, models):
+        for doc in registry.get_documents(models):
+            queryset = doc().get_queryset()
+            # Record the time before indexing starts
+            index_time = timezone.now()
+
+            app_label = queryset.model._meta.app_label
+            model_name = queryset.model.__name__
+
+            index_name = doc._doc_type.index
+            timestamp = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
+            new_index_name = "{}_{}".format(index_name, timestamp)
+
+            pre_index_task = create_new_es_index.si(app_label=app_label,
+                                                    model_name=model_name,
+                                                    index_name=index_name,
+                                                    new_index_name=new_index_name)
+
+            indexing_tasks = self._get_indexing_tasks(app_label=app_label, model_name=model_name,
+                                                      queryset=queryset,
+                                                      document_class=str(doc),
+                                                      index_name=new_index_name)
+
+            post_index_task = switch_es_index.si(app_label=app_label, model_name=model_name,
+                                                 index_name=index_name,
+                                                 new_index_name=new_index_name)
+
+            # Task to index any objects that were inserted into the database
+            # while the indexing tasks were running. We pass the time at which
+            # indexing started, so items modified afterwards can be indexed too
+            missed_index_task = index_missing_objects.si(app_label=app_label,
+                                                         model_name=model_name,
+                                                         document_class=str(doc),
+                                                         index_generation_time=index_time)
+
+            # http://celery.readthedocs.io/en/latest/userguide/canvas.html#chords
+            chord_tasks = chord(header=indexing_tasks, body=post_index_task)
+            # http://celery.readthedocs.io/en/latest/userguide/canvas.html#chain
+            chain(pre_index_task, chord_tasks, missed_index_task).apply_async()
+
+            message = ("Successfully issued tasks for {}.{}, total {} items"
+                       .format(app_label, model_name, queryset.count()))
+            log.info(message)
+
+    def add_arguments(self, parser):
+        parser.add_argument(
+            '--models',
+            dest='models',
+            type=str,
+            nargs='*',
+            help=("Specify the model to be updated in Elasticsearch. "
+                  "The format is <app_label>.<model_name>")
+        )
+
+    def handle(self, *args, **options):
+        """
+        Index models into Elasticsearch asynchronously using Celery.
+
+        You can specify which models to index by passing the
+        `--models <app_label>.<model_name>` parameter.
+        Otherwise it will reindex all the models.
+        """
+        models = None
+        if options['models']:
+            models = [apps.get_model(model_name) for model_name in options['models']]
+
+        self._run_reindex_tasks(models=models)
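The command wires everything into `chain(pre_index, chord(indexing_chunks, switch_alias), index_missing)`: the chord guarantees the alias only switches after every chunk has finished, and the chain runs the catch-up task last. A sketch of the same canvas shape with dummy tasks (the task bodies and the in-memory broker are hypothetical stand-ins):

```python
from celery import Celery, chain, chord

app = Celery('sketch', broker='memory://')  # hypothetical broker for illustration


@app.task
def create_index():
    print('1. create a timestamped index')


@app.task
def index_chunk(ids):
    print('2. index chunk', ids)


@app.task
def switch_alias():
    print('3. repoint the alias only after ALL chunks finish')


@app.task
def index_missed():
    print('4. catch objects modified during the rebuild')


workflow = chain(
    create_index.si(),
    chord([index_chunk.si([1, 2]), index_chunk.si([3, 4])], switch_alias.si()),
    index_missed.si(),
)
# workflow.apply_async()  # requires a running broker and worker to execute
```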
diff --git a/readthedocs/search/mixins.py b/readthedocs/search/mixins.py
new file mode 100644
index 00000000000..05a9bb8650b
--- /dev/null
+++ b/readthedocs/search/mixins.py
@@ -0,0 +1,68 @@
+from django.db import models
+from django.core.paginator import Paginator
+
+
+class RTDDocTypeMixin(object):
+
+    """
+    Override some methods of django-elasticsearch-dsl's DocType.
+
+    Changes are as follows:
+    - Do not index objects that do not exist in the provided queryset
+    - Accept an additional `index_name` argument in `update` to target a specific index
+    Issues:
+    - https://github.com/sabricot/django-elasticsearch-dsl/issues/111
+    """
+
+    def _prepare_action(self, object_instance, action, index_name=None):
+        """Overwrite to take `index_name` from parameters for setting the index dynamically"""
+        return {
+            '_op_type': action,
+            '_index': index_name or str(self._doc_type.index),
+            '_type': self._doc_type.mapping.doc_type,
+            '_id': object_instance.pk,
+            '_source': (
+                self.prepare(object_instance) if action != 'delete' else None
+            ),
+        }
+
+    def _get_actions(self, object_list, action, index_name=None):
+        """Overwrite to take `index_name` from parameters for setting the index dynamically"""
+        if self._doc_type.queryset_pagination is not None:
+            paginator = Paginator(
+                object_list, self._doc_type.queryset_pagination
+            )
+            for page in paginator.page_range:
+                for object_instance in paginator.page(page).object_list:
+                    yield self._prepare_action(object_instance, action, index_name)
+        else:
+            for object_instance in object_list:
+                yield self._prepare_action(object_instance, action, index_name)
+
+    def update(self, thing, refresh=None, action='index', index_name=None, **kwargs):
+        """Update each document in ES for a model, iterable of models or queryset"""
+        if refresh is True or (
+            refresh is None and self._doc_type.auto_refresh
+        ):
+            kwargs['refresh'] = True
+
+        # TODO: remove this overwrite when the issue has been fixed
+        # https://github.com/sabricot/django-elasticsearch-dsl/issues/111
+        if isinstance(thing, models.Model):
+            # It's a model instance.
+
+            # No need to check the queryset for a delete action, because the
+            # object has already been removed from the database
+            if action != 'delete':
+                queryset = self.get_queryset()
+                obj = queryset.filter(pk=thing.pk)
+                if not obj.exists():
+                    return None
+
+            object_list = [thing]
+        else:
+            object_list = thing
+
+        return self.bulk(
+            self._get_actions(object_list, action, index_name=index_name), **kwargs
+        )
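`RTDDocTypeMixin._prepare_action` is what each document feeds through `self.bulk()`; the `index_name` parameter exists so the same `PageDocument` can write into a freshly created timestamped index instead of the live alias. Each action is just a dict for Elasticsearch's bulk helper, roughly like this (the values are illustrative only; the keys mirror `_prepare_action` above):

```python
# Illustrative bulk action, matching the shape built by _prepare_action.
action = {
    '_op_type': 'index',                # 'delete' actions carry no _source
    '_index': 'page_20180727095400',    # hypothetical timestamped index name
    '_type': 'page_document',           # hypothetical mapping doc type
    '_id': 42,                          # the model instance's pk
    '_source': {'project': 'pip', 'version': 'latest', 'commit': 'deadbeef'},
}
```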
diff --git a/readthedocs/search/signals.py b/readthedocs/search/signals.py
index 6abdf64cce9..6e335e06d0a 100644
--- a/readthedocs/search/signals.py
+++ b/readthedocs/search/signals.py
@@ -1,7 +1,37 @@
 """We define custom Django signals to trigger before executing searches."""
 from __future__ import absolute_import
 import django.dispatch
+from django.dispatch import receiver
+from django_elasticsearch_dsl.apps import DEDConfig
+from django_elasticsearch_dsl.registries import registry
+
+from readthedocs.projects.models import HTMLFile
+from readthedocs.projects.signals import bulk_post_create, bulk_post_delete
+from readthedocs.search.documents import PageDocument
+from readthedocs.search.tasks import index_objects_to_es
 
 before_project_search = django.dispatch.Signal(providing_args=["body"])
 before_file_search = django.dispatch.Signal(providing_args=["body"])
 before_section_search = django.dispatch.Signal(providing_args=["body"])
+
+
+@receiver(bulk_post_create, sender=HTMLFile)
+def index_html_file(instance_list, **_):
+    kwargs = {
+        'app_label': HTMLFile._meta.app_label,
+        'model_name': HTMLFile.__name__,
+        'document_class': str(PageDocument),
+        'index_name': None,  # No need to change the index name
+        'objects_id': [obj.id for obj in instance_list],
+    }
+
+    # Do not index if autosync is disabled globally
+    if DEDConfig.autosync_enabled():
+        index_objects_to_es(**kwargs)
+
+
+@receiver(bulk_post_delete, sender=HTMLFile)
+def remove_html_file(instance_list, **_):
+    # Do not remove from the index if autosync is disabled globally
+    if DEDConfig.autosync_enabled():
+        registry.delete(instance_list)
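Both receivers consult `DEDConfig.autosync_enabled()` so that environments with `ELASTICSEARCH_DSL_AUTOSYNC = False` (see `dev.py` below) never touch Elasticsearch. That check is essentially a settings read; a simplified stand-in, assuming DED ~0.5 semantics:

```python
from django.conf import settings


def autosync_enabled():
    # Simplified stand-in for DEDConfig.autosync_enabled(); the real
    # method lives in django_elasticsearch_dsl.apps.
    return getattr(settings, 'ELASTICSEARCH_DSL_AUTOSYNC', True)
```

Note also that `index_html_file` calls `index_objects_to_es(**kwargs)` as a plain function, so indexing after a build happens synchronously; the Celery `.si()` path is only used by the reindex command.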
diff --git a/readthedocs/search/tasks.py b/readthedocs/search/tasks.py
new file mode 100644
index 00000000000..f702b452656
--- /dev/null
+++ b/readthedocs/search/tasks.py
@@ -0,0 +1,99 @@
+import logging
+
+from django.apps import apps
+from django_elasticsearch_dsl.registries import registry
+
+from readthedocs.worker import app
+
+log = logging.getLogger(__name__)
+
+
+def _get_index(indices, index_name):
+    """
+    Get the Index from all the indices.
+
+    :param indices: DED indices list
+    :param index_name: Name of the index
+    :return: DED Index
+    """
+    for index in indices:
+        if str(index) == index_name:
+            return index
+
+
+def _get_document(model, document_class):
+    """
+    Get the DED document class object from the model and the document class name.
+
+    :param model: The model class to find the document for
+    :param document_class: the name of the document class
+    :return: DED DocType object
+    """
+    documents = registry.get_documents(models=[model])
+
+    for document in documents:
+        if str(document) == document_class:
+            return document
+
+
+@app.task(queue='web')
+def create_new_es_index(app_label, model_name, index_name, new_index_name):
+    model = apps.get_model(app_label, model_name)
+    indices = registry.get_indices(models=[model])
+    old_index = _get_index(indices=indices, index_name=index_name)
+    new_index = old_index.clone(name=new_index_name)
+    new_index.create()
+
+
+@app.task(queue='web')
+def switch_es_index(app_label, model_name, index_name, new_index_name):
+    model = apps.get_model(app_label, model_name)
+    indices = registry.get_indices(models=[model])
+    old_index = _get_index(indices=indices, index_name=index_name)
+    new_index = old_index.clone(name=new_index_name)
+    old_index_actual_name = None
+
+    if old_index.exists():
+        # An alias can not be used to delete an index.
+        # https://www.elastic.co/guide/en/elasticsearch/reference/6.0/indices-delete-index.html
+        # So get the index's actual name to delete it
+        old_index_info = old_index.get()
+        # The info is a dictionary and the key is the actual name of the index
+        old_index_actual_name = list(old_index_info.keys())[0]
+
+    # Point the alias at the new index and delete the old index if it exists
+    new_index.put_alias(name=index_name)
+    if old_index_actual_name:
+        old_index.connection.indices.delete(index=old_index_actual_name)
+
+
+@app.task(queue='web')
+def index_objects_to_es(app_label, model_name, document_class, index_name, objects_id):
+    model = apps.get_model(app_label, model_name)
+    document = _get_document(model=model, document_class=document_class)
+
+    # Use a queryset from the model, as the ids are specific
+    queryset = model.objects.all().filter(id__in=objects_id).iterator()
+    log.info("Indexing model: {}, id:'{}'".format(model.__name__, objects_id))
+    document().update(queryset, index_name=index_name)
+
+
+@app.task(queue='web')
+def index_missing_objects(app_label, model_name, document_class, index_generation_time):
+    """
+    Task to ensure that no objects are missed from indexing.
+
+    The object ids are sent to the `index_objects_to_es` task for indexing.
+    While that task is running, new objects can be created or deleted in the
+    database, and they will not be in the tasks for indexing into ES.
+    This task indexes all the objects modified after the `index_generation_time`
+    timestamp to ensure that everything ends up in the ES index.
+    """
+    model = apps.get_model(app_label, model_name)
+    document = _get_document(model=model, document_class=document_class)
+    queryset = document().get_queryset().exclude(modified_date__lte=index_generation_time)
+    document().update(queryset.iterator())
+
+    log.info("Indexed {} missing objects from model: {}".format(queryset.count(), model.__name__))
+
+    # TODO: Figure out how to remove objects from the ES index that have been deleted
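`switch_es_index` implements the standard zero-downtime pattern: searches always go through the alias, the new index is built under a timestamped name, and the alias is repointed only once the data is complete. Stripped of the task plumbing, the sequence looks like this (index names are hypothetical; only calls that already appear in the task above are used):

```python
from elasticsearch_dsl import Index

alias = 'page'                       # the stable name that searches use
new_name = 'page_20180727095400'     # hypothetical timestamped index

old_index = Index(alias)
old_actual_name = None
if old_index.exists():
    # An alias cannot be deleted as if it were an index, so resolve the
    # real index name first: get() returns {actual_name: {...}}.
    old_actual_name = list(old_index.get().keys())[0]

Index(new_name).put_alias(name=alias)   # point the alias at the new index
if old_actual_name:
    old_index.connection.indices.delete(index=old_actual_name)
```

Note the original `old_index_info.keys()[0]` only works on Python 2; `dict.keys()` is not subscriptable on Python 3, hence the `list(...)` wrapper in the task above.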
+ """ + model = apps.get_model(app_label, model_name) + document = _get_document(model=model, document_class=document_class) + queryset = document().get_queryset().exclude(modified_date__lte=index_generation_time) + document().update(queryset.iterator()) + + log.info("Indexed {} missing objects from model: {}'".format(queryset.count(), model.__name__)) + + # TODO: Figure out how to remove the objects from ES index that has been deleted diff --git a/readthedocs/search/tests/conftest.py b/readthedocs/search/tests/conftest.py index 52c4811342e..4e46679732c 100644 --- a/readthedocs/search/tests/conftest.py +++ b/readthedocs/search/tests/conftest.py @@ -58,9 +58,5 @@ def get_dummy_processed_json(instance): @pytest.fixture(autouse=True) def mock_processed_json(mocker): - - # patch the function from `projects.tasks` because it has been point to there - # http://www.voidspace.org.uk/python/mock/patch.html#where-to-patch mocked_function = mocker.patch.object(HTMLFile, 'get_processed_json', autospec=True) mocked_function.side_effect = get_dummy_processed_json - diff --git a/readthedocs/search/utils.py b/readthedocs/search/utils.py index 659effb1544..4a8b8019dfa 100644 --- a/readthedocs/search/utils.py +++ b/readthedocs/search/utils.py @@ -321,3 +321,12 @@ def get_project_list_or_404(project_slug, user): project_list = list(subprojects) + [project] return project_list + + +def chunk_queryset(queryset, chunk_size): + """Yield successive `chunk_size` chunks of queryset.""" + # Based on https://stackoverflow.com/a/312464 + # licensed under cc by-sa 3.0 + total = queryset.count() + for i in range(0, total, chunk_size): + yield queryset[i:i + chunk_size] diff --git a/readthedocs/settings/base.py b/readthedocs/settings/base.py index 7e8ef5ff654..c44bd9e9901 100644 --- a/readthedocs/settings/base.py +++ b/readthedocs/settings/base.py @@ -324,6 +324,9 @@ def USE_PROMOS(self): # noqa 'hosts': '127.0.0.1:9200' }, } + # Chunk size for elasticsearch reindex celery tasks + ES_TASK_CHUNK_SIZE = 100 + ES_PAGE_IGNORE_SIGNALS = True # ANALYZER = 'analysis': { # 'analyzer': { diff --git a/readthedocs/settings/dev.py b/readthedocs/settings/dev.py index 7fa4dafe959..93b08d6bbc6 100644 --- a/readthedocs/settings/dev.py +++ b/readthedocs/settings/dev.py @@ -48,6 +48,9 @@ def DATABASES(self): # noqa 'test:8000', ) + # Disable auto syncing elasticsearch documents in development + ELASTICSEARCH_DSL_AUTOSYNC = False + @property def LOGGING(self): # noqa - avoid pep8 N802 logging = super(CommunityDevSettings, self).LOGGING diff --git a/readthedocs/settings/test.py b/readthedocs/settings/test.py index 79c7486e3cb..d923b2359fa 100644 --- a/readthedocs/settings/test.py +++ b/readthedocs/settings/test.py @@ -16,6 +16,8 @@ class CommunityTestSettings(CommunityDevSettings): DEBUG = False TEMPLATE_DEBUG = False + ES_PAGE_IGNORE_SIGNALS = False + ELASTICSEARCH_DSL_AUTOSYNC = True @property def ES_INDEXES(self): # noqa - avoid pep8 N802