9
9
10
10
from ...tasks import (index_objects_to_es , switch_es_index , create_new_es_index ,
11
11
index_missing_objects )
12
- from ...utils import chunks
12
+ from ...utils import chunk_queryset
13
13
14
14
log = logging .getLogger (__name__ )
15
15
16
16
17
17
class Command (BaseCommand ):
18
18
19
19
@staticmethod
20
- def _get_indexing_tasks (app_label , model_name , instance_ids , document_class , index_name ):
21
- chunk_objects = chunks (instance_ids , settings .ES_TASK_CHUNK_SIZE )
20
+ def _get_indexing_tasks (app_label , model_name , queryset , document_class , index_name ):
21
+ queryset = queryset .values_list ('id' , flat = True )
22
+ chunked_queryset = chunk_queryset (queryset , settings .ES_TASK_CHUNK_SIZE )
22
23
23
- for chunk in chunk_objects :
24
+ for chunk in chunked_queryset :
24
25
data = {
25
26
'app_label' : app_label ,
26
27
'model_name' : model_name ,
27
28
'document_class' : document_class ,
28
29
'index_name' : index_name ,
29
- 'objects_id' : chunk
30
+ 'objects_id' : list ( chunk )
30
31
}
31
32
yield index_objects_to_es .si (** data )
32
33
33
34
def _run_reindex_tasks (self , models ):
34
35
for doc in registry .get_documents (models ):
35
- qs = doc ().get_queryset ()
36
- instance_ids = list (qs .values_list ('id' , flat = True ))
36
+ queryset = doc ().get_queryset ()
37
+ # Get latest object from the queryset
38
+ latest_object = queryset .latest ('modified_date' )
39
+ latest_object_time = latest_object .modified_date
37
40
38
- app_label = qs .model ._meta .app_label
39
- model_name = qs .model .__name__
41
+ app_label = queryset .model ._meta .app_label
42
+ model_name = queryset .model .__name__
40
43
41
44
index_name = doc ._doc_type .index
42
45
timestamp = datetime .datetime .now ().strftime ('%Y%m%d%H%M%S' )
@@ -48,7 +51,7 @@ def _run_reindex_tasks(self, models):
48
51
new_index_name = new_index_name )
49
52
50
53
indexing_tasks = self ._get_indexing_tasks (app_label = app_label , model_name = model_name ,
51
- instance_ids = instance_ids ,
54
+ queryset = queryset ,
52
55
document_class = str (doc ),
53
56
index_name = new_index_name )
54
57
@@ -58,18 +61,19 @@ def _run_reindex_tasks(self, models):
58
61
59
62
# Task to run in order to add the objects
60
63
# that has been inserted into database while indexing_tasks was running
64
+ # We pass the creation time of latest object, so its possible to index later items
61
65
missed_index_task = index_missing_objects .si (app_label = app_label ,
62
66
model_name = model_name ,
63
67
document_class = str (doc ),
64
- indexed_instance_ids = instance_ids )
68
+ latest_indexed = latest_object_time )
65
69
66
70
# http://celery.readthedocs.io/en/latest/userguide/canvas.html#chords
67
71
chord_tasks = chord (header = indexing_tasks , body = post_index_task )
68
72
# http://celery.readthedocs.io/en/latest/userguide/canvas.html#chain
69
73
chain (pre_index_task , chord_tasks , missed_index_task ).apply_async ()
70
74
71
75
message = ("Successfully issued tasks for {}.{}, total {} items"
72
- .format (app_label , model_name , len ( instance_ids )))
76
+ .format (app_label , model_name , queryset . count ( )))
73
77
log .info (message )
74
78
75
79
def add_arguments (self , parser ):
0 commit comments