Skip to content

Commit cc1a02d

Browse files
authored
Search: improve re-index management command (#7904)
1 parent 61738e1 commit cc1a02d

File tree

2 files changed

+191
-64
lines changed

2 files changed

+191
-64
lines changed
Lines changed: 190 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,16 @@
1-
import datetime
1+
import itertools
22
import logging
3+
import textwrap
4+
from datetime import datetime, timedelta
35

4-
from celery import chain, chord
56
from django.apps import apps
67
from django.conf import settings
78
from django.core.management import BaseCommand
8-
from django.utils import timezone
99
from django_elasticsearch_dsl.registries import registry
1010

11-
from ...tasks import (
11+
from readthedocs.builds.models import Version
12+
from readthedocs.projects.models import HTMLFile, Project
13+
from readthedocs.search.tasks import (
1214
create_new_es_index,
1315
index_missing_objects,
1416
index_objects_to_es,
@@ -23,7 +25,7 @@ class Command(BaseCommand):
2325
@staticmethod
def _get_indexing_tasks(app_label, model_name, index_name, queryset, document_class):
    """Yield ``index_objects_to_es`` task signatures, one per chunk of PKs.

    The queryset is consumed lazily through ``values_list('pk').iterator()``
    in chunks of ``settings.ES_TASK_CHUNK_SIZE``, so the full PK list is
    never materialized in memory at once.

    :param app_label: Django app label of the model being indexed.
    :param model_name: name of the model being indexed.
    :param index_name: Elasticsearch index the objects are written to.
    :param queryset: queryset of objects to index.
    :param document_class: string representation of the ES document class.
    """
    chunk_size = settings.ES_TASK_CHUNK_SIZE
    qs_iterator = queryset.values_list('pk', flat=True).iterator()

    data = {
        'app_label': app_label,
        'model_name': model_name,
        'document_class': document_class,
        'index_name': index_name,
    }
    current = 0
    while True:
        # islice returns an empty list once the iterator is exhausted,
        # which terminates the loop (no sentinel flag needed).
        objects_id = list(itertools.islice(qs_iterator, chunk_size))
        if not objects_id:
            break
        current += len(objects_id)
        log.info('Total: %s', current)
        data['objects_id'] = objects_id
        yield index_objects_to_es.si(**data)
5246

@@ -58,56 +52,141 @@ def _run_reindex_tasks(self, models, queue):
5852
else:
5953
log.info('Adding indexing tasks to default queue')
6054

61-
index_time = timezone.now()
62-
timestamp = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
55+
timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
6356

6457
for doc in registry.get_documents(models):
6558
queryset = doc().get_queryset()
66-
# Get latest object from the queryset
6759

6860
app_label = queryset.model._meta.app_label
6961
model_name = queryset.model.__name__
7062

7163
index_name = doc._index._name
7264
new_index_name = "{}_{}".format(index_name, timestamp)
73-
# Set index temporarily for indexing,
74-
# this will only get set during the running of this command
65+
66+
# Set and create a temporal index for indexing.
67+
create_new_es_index(
68+
app_label=app_label,
69+
model_name=model_name,
70+
index_name=index_name,
71+
new_index_name=new_index_name,
72+
)
7573
doc._index._name = new_index_name
74+
log.info('Temporal index created: %s', new_index_name)
75+
76+
indexing_tasks = self._get_indexing_tasks(
77+
app_label=app_label,
78+
model_name=model_name,
79+
queryset=queryset,
80+
index_name=new_index_name,
81+
document_class=str(doc),
82+
)
83+
for task in indexing_tasks:
84+
task.apply_async(**apply_async_kwargs)
85+
86+
log.info(
87+
"Tasks issued successfully. model=%s.%s items=%s",
88+
app_label, model_name, str(queryset.count())
89+
)
90+
return timestamp
91+
92+
def _change_index(self, models, timestamp):
    """Switch each document's index to the timestamped one built by a re-index.

    For every registered document of ``models``, point the live index name
    at ``<index_name>_<timestamp>`` via the ``switch_es_index`` task helper.
    """
    for document in registry.get_documents(models):
        model = document().get_queryset().model
        current_name = document._index._name
        timestamped_name = "{}_{}".format(current_name, timestamp)
        switch_es_index(
            app_label=model._meta.app_label,
            model_name=model.__name__,
            index_name=current_name,
            new_index_name=timestamped_name,
        )
        log.info(
            "Index name changed. model=%s.%s from=%s to=%s",
            model._meta.app_label, model.__name__, timestamped_name, current_name,
        )
109+
110+
def _reindex_from(self, days_ago, models, queue=None):
    """Partially re-index recently changed objects for each supported model.

    Dispatches to the model-specific ``_reindex_*_from`` helper; models
    without a helper are logged and skipped.
    """
    handlers = {
        apps.get_model('projects.HTMLFile'): self._reindex_files_from,
        apps.get_model('projects.Project'): self._reindex_projects_from,
    }
    for model in (models or handlers.keys()):
        handler = handlers.get(model)
        if handler is None:
            log.warning("Re-index from not available for model %s", model.__name__)
            continue
        handler(days_ago=days_ago, queue=queue)
121+
122+
def _reindex_projects_from(self, days_ago, queue=None):
    """Reindex projects with recent changes."""
    cutoff = datetime.now() - timedelta(days=days_ago)
    projects = Project.objects.filter(modified_date__gte=cutoff).distinct()
    app_label = Project._meta.app_label
    model_name = Project.__name__

    # Route tasks to the given celery queue when one was requested.
    async_kwargs = {'queue': queue} if queue else {}

    for document in registry.get_documents(models=[Project]):
        chunked_tasks = self._get_indexing_tasks(
            app_label=app_label,
            model_name=model_name,
            queryset=projects,
            index_name=document._index._name,
            document_class=str(document),
        )
        for task in chunked_tasks:
            task.apply_async(**async_kwargs)
        log.info(
            "Tasks issued successfully. model=%s.%s items=%s",
            app_label, model_name, str(projects.count())
        )
146+
147+
def _reindex_files_from(self, days_ago, queue=None):
    """Reindex HTML files from versions with recent builds."""
    chunk_size = settings.ES_TASK_CHUNK_SIZE
    cutoff = datetime.now() - timedelta(days=days_ago)
    recent_versions = Version.objects.filter(builds__date__gte=cutoff).distinct()
    app_label = HTMLFile._meta.app_label
    model_name = HTMLFile.__name__

    # Shared kwargs for every index_objects_to_es task; the per-chunk
    # fields (document_class, objects_id) are filled in below.
    apply_async_kwargs = {
        'kwargs': {
            'app_label': app_label,
            'model_name': model_name,
        },
    }
    if queue:
        apply_async_kwargs['queue'] = queue

    for document in registry.get_documents(models=[HTMLFile]):
        apply_async_kwargs['kwargs']['document_class'] = str(document)
        for version in recent_versions.iterator():
            project = version.project
            file_pks = (
                HTMLFile.objects
                .filter(version=version)
                .values_list('pk', flat=True)
                .iterator()
            )
            indexed = 0
            while True:
                # islice drains the PK iterator one chunk at a time;
                # an empty chunk means we are done with this version.
                chunk = list(itertools.islice(file_pks, chunk_size))
                if not chunk:
                    break
                indexed += len(chunk)
                log.info(
                    'Re-indexing files. version=%s:%s total=%s',
                    project.slug, version.slug, indexed,
                )
                apply_async_kwargs['kwargs']['objects_id'] = chunk
                index_objects_to_es.apply_async(**apply_async_kwargs)

            # NOTE(review): placement inside the per-version loop inferred
            # from the variables this message reads — confirm against upstream.
            log.info(
                "Tasks issued successfully. version=%s:%s items=%s",
                project.slug, version.slug, str(indexed),
            )
111190

112191
def add_arguments(self, parser):
    """Register the command-line options for the re-index command."""
    parser.add_argument(
        '--queue',
        dest='queue',
        action='store',
        help="Set the celery queue name for the task."
    )
    parser.add_argument(
        '--change-index',
        dest='change_index',
        action='store',
        help=(
            "Change the index to the new one using the given timestamp and delete the old one. "
            "**This should be run after a re-index is completed**."
        ),
    )
    parser.add_argument(
        '--update-from',
        dest='update_from',
        type=int,
        action='store',
        help=(
            "Re-index the models from the given days. "
            "This should be run after a re-index."
        ),
    )
    parser.add_argument(
        '--models',
        dest='models',
        type=str,
        nargs='*',
        help=(
            "Specify the model to be updated in elasticsearch. "
            "The format is <app_label>.<model_name>"
        ),
    )
127227

128228
def handle(self, *args, **options):
@@ -131,7 +231,7 @@ def handle(self, *args, **options):
131231
132232
You can specify model to get indexed by passing
133233
`--model <app_label>.<model_name>` parameter.
134-
Otherwise, it will reindex all the models
234+
Otherwise, it will re-index all the models
135235
"""
136236
models = None
137237
if options['models']:
@@ -141,4 +241,31 @@ def handle(self, *args, **options):
141241
if options.get('queue'):
142242
queue = options['queue']
143243

144-
self._run_reindex_tasks(models=models, queue=queue)
244+
change_index = options['change_index']
245+
update_from = options['update_from']
246+
if change_index:
247+
timestamp = change_index
248+
self._change_index(models=models, timestamp=timestamp)
249+
print(textwrap.dedent(
250+
"""
251+
Indexes had been changed.
252+
253+
Remember to re-index changed projects and versions with the
254+
`--update-from n` argument,
255+
where `n` is the number of days since the re-index.
256+
"""
257+
))
258+
elif update_from:
259+
self._reindex_from(days_ago=update_from, models=models, queue=queue)
260+
else:
261+
timestamp = self._run_reindex_tasks(models=models, queue=queue)
262+
print(textwrap.dedent(
263+
f"""
264+
Re-indexing tasks have been created.
265+
Timestamp: {timestamp}
266+
267+
Please monitor the tasks.
268+
After they are completed run the same command with the
269+
`--change-index {timestamp}` argument.
270+
"""
271+
))

readthedocs/settings/base.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -610,7 +610,7 @@ def DOCKER_LIMITS(self):
610610
},
611611
}
612612
# Chunk size for elasticsearch reindex celery tasks
613-
ES_TASK_CHUNK_SIZE = 100
613+
ES_TASK_CHUNK_SIZE = 500
614614

615615
# Info from Honza about this:
616616
# The key to determine shard number is actually usually not the node count,

0 commit comments

Comments
 (0)