Skip to content

[Fix #4333] Implement asynchronous search reindex functionality using celery #4368

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 15 commits into from
Jul 31, 2018
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions readthedocs/projects/migrations/0028_importedfile_modified_date.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.9.13 on 2018-07-27 09:54
from __future__ import unicode_literals

from django.db import migrations, models
import django.utils.timezone


class Migration(migrations.Migration):
    # Adds ImportedFile.modified_date so the reindex task can find objects
    # changed after a given timestamp (see index_missing_objects).

    dependencies = [
        ('projects', '0027_add_htmlfile_model'),
    ]

    operations = [
        migrations.AddField(
            model_name='importedfile',
            name='modified_date',
            # auto_now=True needs a one-off default for existing rows;
            # preserve_default=False drops that default after the migration runs.
            field=models.DateTimeField(auto_now=True, default=django.utils.timezone.now, verbose_name='Modified date'),
            preserve_default=False,
        ),
    ]
2 changes: 2 additions & 0 deletions readthedocs/projects/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from django.contrib.auth.models import User
from django.core.urlresolvers import NoReverseMatch, reverse
from django.db import models
from django.utils import timezone
from django.utils.encoding import python_2_unicode_compatible
from django.utils.functional import cached_property
from django.utils.translation import ugettext_lazy as _
Expand Down Expand Up @@ -911,6 +912,7 @@ class ImportedFile(models.Model):
path = models.CharField(_('Path'), max_length=255)
md5 = models.CharField(_('MD5 checksum'), max_length=255)
commit = models.CharField(_('Commit'), max_length=255)
modified_date = models.DateTimeField(_('Modified date'), auto_now=True)

    def get_absolute_url(self):
        """Return the serving URL of this file within its project/version docs."""
        return resolve(project=self.project, version_slug=self.version.slug, filename=self.path)
Expand Down
54 changes: 29 additions & 25 deletions readthedocs/search/management/commands/reindex_elasticsearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,69 +7,73 @@
from django.core.management import BaseCommand
from django_elasticsearch_dsl.registries import registry

from ...tasks import (index_objects_to_es_task, switch_es_index_task, create_new_es_index_task,
index_missing_objects_task)
from ...utils import chunks
from ...tasks import (index_objects_to_es, switch_es_index, create_new_es_index,
index_missing_objects)
from ...utils import chunk_queryset

log = logging.getLogger(__name__)


class Command(BaseCommand):

@staticmethod
def _get_indexing_tasks(app_label, model_name, instance_ids, document_class, index_name):
chunk_objects = chunks(instance_ids, settings.ES_TASK_CHUNK_SIZE)
def _get_indexing_tasks(app_label, model_name, queryset, document_class, index_name):
queryset = queryset.values_list('id', flat=True)
chunked_queryset = chunk_queryset(queryset, settings.ES_TASK_CHUNK_SIZE)

for chunk in chunk_objects:
for chunk in chunked_queryset:
data = {
'app_label': app_label,
'model_name': model_name,
'document_class': document_class,
'index_name': index_name,
'objects_id': chunk
'objects_id': list(chunk)
}
yield index_objects_to_es_task.si(**data)
yield index_objects_to_es.si(**data)

def _run_reindex_tasks(self, models):
for doc in registry.get_documents(models):
qs = doc().get_queryset()
instance_ids = list(qs.values_list('id', flat=True))
queryset = doc().get_queryset()
# Get latest object from the queryset
latest_object = queryset.latest('modified_date')
latest_object_time = latest_object.modified_date
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should just use the current time, not query the entire queryset for the time. This will be quite a slow query likely in prod.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah! I was also thinking about it.


app_label = qs.model._meta.app_label
model_name = qs.model.__name__
app_label = queryset.model._meta.app_label
model_name = queryset.model.__name__

index_name = doc._doc_type.index
timestamp = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
new_index_name = "{}_{}".format(index_name, timestamp)

pre_index_task = create_new_es_index_task.si(app_label=app_label,
model_name=model_name,
index_name=index_name,
new_index_name=new_index_name)
pre_index_task = create_new_es_index.si(app_label=app_label,
model_name=model_name,
index_name=index_name,
new_index_name=new_index_name)

indexing_tasks = self._get_indexing_tasks(app_label=app_label, model_name=model_name,
instance_ids=instance_ids,
queryset=queryset,
document_class=str(doc),
index_name=new_index_name)

post_index_task = switch_es_index_task.si(app_label=app_label, model_name=model_name,
index_name=index_name,
new_index_name=new_index_name)
post_index_task = switch_es_index.si(app_label=app_label, model_name=model_name,
index_name=index_name,
new_index_name=new_index_name)

# Task to run in order to add the objects
# that has been inserted into database while indexing_tasks was running
missed_index_task = index_missing_objects_task.si(app_label=app_label,
model_name=model_name,
document_class=str(doc),
indexed_instance_ids=instance_ids)
# We pass the creation time of latest object, so its possible to index later items
missed_index_task = index_missing_objects.si(app_label=app_label,
model_name=model_name,
document_class=str(doc),
latest_indexed=latest_object_time)

# http://celery.readthedocs.io/en/latest/userguide/canvas.html#chords
chord_tasks = chord(header=indexing_tasks, body=post_index_task)
# http://celery.readthedocs.io/en/latest/userguide/canvas.html#chain
chain(pre_index_task, chord_tasks, missed_index_task).apply_async()

message = ("Successfully issued tasks for {}.{}, total {} items"
.format(app_label, model_name, len(instance_ids)))
.format(app_label, model_name, queryset.count()))
log.info(message)

def add_arguments(self, parser):
Expand Down
4 changes: 2 additions & 2 deletions readthedocs/search/signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from readthedocs.projects.models import HTMLFile
from readthedocs.projects.signals import bulk_post_create, bulk_post_delete
from readthedocs.search.documents import PageDocument
from readthedocs.search.tasks import index_objects_to_es_task
from readthedocs.search.tasks import index_objects_to_es

before_project_search = django.dispatch.Signal(providing_args=["body"])
before_file_search = django.dispatch.Signal(providing_args=["body"])
Expand All @@ -27,7 +27,7 @@ def index_html_file(instance_list, **_):

# Do not index if autosync is disabled globally
if DEDConfig.autosync_enabled():
index_objects_to_es_task(**kwargs)
index_objects_to_es(**kwargs)


@receiver(bulk_post_delete, sender=HTMLFile)
Expand Down
33 changes: 24 additions & 9 deletions readthedocs/search/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,26 @@


def _get_index(indices, index_name):
"""
Get Index from all the indices

:param indices: DED indices list
:param index_name: Name of the index
:return: DED Index
"""
for index in indices:
if str(index) == index_name:
return index


def _get_document(model, document_class):
"""
Get DED document class object from the model and name of document class

:param model: The model class to find the document
:param document_class: the name of the document class.
:return: DED DocType object
"""
documents = registry.get_documents(models=[model])

for document in documents:
Expand All @@ -23,7 +37,7 @@ def _get_document(model, document_class):


@app.task(queue='web')
def create_new_es_index_task(app_label, model_name, index_name, new_index_name):
def create_new_es_index(app_label, model_name, index_name, new_index_name):
model = apps.get_model(app_label, model_name)
indices = registry.get_indices(models=[model])
old_index = _get_index(indices=indices, index_name=index_name)
Expand All @@ -32,7 +46,7 @@ def create_new_es_index_task(app_label, model_name, index_name, new_index_name):


@app.task(queue='web')
def switch_es_index_task(app_label, model_name, index_name, new_index_name):
def switch_es_index(app_label, model_name, index_name, new_index_name):
model = apps.get_model(app_label, model_name)
indices = registry.get_indices(models=[model])
old_index = _get_index(indices=indices, index_name=index_name)
Expand All @@ -54,7 +68,7 @@ def switch_es_index_task(app_label, model_name, index_name, new_index_name):


@app.task(queue='web')
def index_objects_to_es_task(app_label, model_name, document_class, index_name, objects_id):
def index_objects_to_es(app_label, model_name, document_class, index_name, objects_id):
model = apps.get_model(app_label, model_name)
document = _get_document(model=model, document_class=document_class)

Expand All @@ -65,18 +79,19 @@ def index_objects_to_es_task(app_label, model_name, document_class, index_name,


@app.task(queue='web')
def index_missing_objects(app_label, model_name, document_class, latest_indexed):
    """
    Task to ensure that none of the objects are missed from indexing.

    The object ids are sent to `index_objects_to_es` task for indexing.
    While the task is running, new objects can be created/deleted in database
    and they will not be in the tasks for indexing into ES.
    This task will index all the objects that got into DB after the `latest_indexed` timestamp
    to ensure that everything is in ES index.

    :param app_label: app label of the model
    :param model_name: name of the model
    :param document_class: string name of the DED document class
    :param latest_indexed: modification timestamp of the newest already-indexed object
    """
    model = apps.get_model(app_label, model_name)
    document = _get_document(model=model, document_class=document_class)
    # Only objects modified strictly after the snapshot need (re)indexing.
    queryset = document().get_queryset().exclude(modified_date__lte=latest_indexed)
    document().update(queryset.iterator())

    log.info("Indexed {} missing objects from model: {}".format(queryset.count(), model.__name__))
Expand Down
11 changes: 6 additions & 5 deletions readthedocs/search/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -323,9 +323,10 @@ def get_project_list_or_404(project_slug, user):
return project_list


def chunk_queryset(queryset, chunk_size):
    """
    Yield successive `chunk_size` chunks of queryset.

    Each chunk is produced by slicing, so on a Django queryset every chunk
    is a lazily-evaluated queryset slice (its own LIMIT/OFFSET query).

    :param queryset: object exposing ``count()`` and slicing (e.g. a QuerySet)
    :param chunk_size: maximum number of items per chunk
    """
    # Based on https://stackoverflow.com/a/312464
    # licensed under cc by-sa 3.0
    total = queryset.count()
    for i in range(0, total, chunk_size):
        yield queryset[i:i + chunk_size]