@@ -32,22 +32,20 @@ def initialize_library(cls, arctic_lib, **kwargs):
32
32
33
33
@mongo_retry
34
34
def _ensure_index (self ):
35
- collection = self ._collection .symbols
36
- collection .create_index ([("symbol" , pymongo .ASCENDING ),
37
- ("_id" , pymongo .DESCENDING )],
38
- background = True )
39
-
40
- collection = self ._collection
41
- collection .create_index ([('symbol' , pymongo .HASHED )], background = True )
42
- collection .create_index ([('symbol' , pymongo .ASCENDING ),
43
- ('sha' , pymongo .ASCENDING )],
44
- unique = True ,
45
- background = True )
46
- collection .create_index ([('symbol' , pymongo .ASCENDING ),
47
- ('parent' , pymongo .ASCENDING ),
48
- ('start' , pymongo .ASCENDING ),
49
- ('end' , pymongo .ASCENDING )],
50
- unique = True , background = True )
35
+ self ._symbols .create_index ([("symbol" , pymongo .ASCENDING )],
36
+ unique = True ,
37
+ background = True )
38
+
39
+ self ._collection .create_index ([('symbol' , pymongo .HASHED )],
40
+ background = True )
41
+ self ._collection .create_index ([('symbol' , pymongo .ASCENDING ),
42
+ ('sha' , pymongo .ASCENDING )],
43
+ unique = True ,
44
+ background = True )
45
+ self ._collection .create_index ([('symbol' , pymongo .ASCENDING ),
46
+ ('start' , pymongo .ASCENDING ),
47
+ ('end' , pymongo .ASCENDING )],
48
+ unique = True , background = True )
51
49
52
50
@mongo_retry
53
51
def __init__ (self , arctic_lib , chunker = DateChunker ()):
@@ -123,7 +121,6 @@ def read(self, symbol, chunk_range=None):
123
121
raise NoDataFoundException ('No data found for %s in library %s' % (symbol , self ._collection .get_name ()))
124
122
125
123
spec = {'symbol' : symbol ,
126
- 'parent' : sym ['_id' ],
127
124
}
128
125
129
126
if chunk_range is not None :
@@ -159,7 +156,7 @@ def write(self, symbol, item, chunk_size):
159
156
A chunk size that is understood by the specified chunker
160
157
"""
161
158
162
- doc = {'_id' : bson . ObjectId () }
159
+ doc = {}
163
160
doc ['symbol' ] = symbol
164
161
doc ['chunk_size' ] = chunk_size
165
162
@@ -170,6 +167,12 @@ def write(self, symbol, item, chunk_size):
170
167
else :
171
168
raise Exception ("Can only chunk Series and DataFrames" )
172
169
170
+ previous_shas = []
171
+ if self ._get_symbol_info (symbol ):
172
+ previous_shas = set ([x ['sha' ] for x in self ._collection .find ({'symbol' : symbol },
173
+ projection = {'sha' : True , '_id' : False },
174
+ )])
175
+
173
176
records = []
174
177
ranges = []
175
178
dtype = None
@@ -184,12 +187,6 @@ def write(self, symbol, item, chunk_size):
184
187
if record .dtype .hasobject :
185
188
raise UnhandledDtypeException ()
186
189
187
- sym = self ._get_symbol_info (symbol )
188
- if sym :
189
- # if the symbol already exists, we are basically overwriting it
190
- # clean up the data before we orphan the symbol chunks
191
- self .delete (symbol )
192
-
193
190
doc ['dtype' ] = str (dtype )
194
191
doc ['shape' ] = (- 1 ,) + item .shape [1 :]
195
192
doc ['dtype_metadata' ] = dict (dtype .metadata or {})
@@ -210,16 +207,25 @@ def write(self, symbol, item, chunk_size):
210
207
chunk ['end' ] = end
211
208
chunk ['symbol' ] = symbol
212
209
chunk ['sha' ] = checksum (symbol , chunk )
213
- bulk .find ({'symbol' : symbol , 'sha' : chunk ['sha' ], 'start' : chunk ['start' ]}
214
- ).upsert ().update_one ({'$set' : chunk , '$addToSet' : {'parent' : doc ['_id' ]}})
210
+ if chunk ['sha' ] not in previous_shas :
211
+ bulk .find ({'symbol' : symbol , 'sha' : chunk ['sha' ]},
212
+ ).upsert ().update_one ({'$set' : chunk })
213
+ else :
214
+ # already exists, dont need to update in mongo
215
+ previous_shas = previous_shas .remove (chunk ['sha' ])
215
216
if seg_count != 0 :
216
217
bulk .execute ()
217
218
218
219
doc ['chunk_count' ] = seg_count
219
220
doc ['append_size' ] = 0
220
221
doc ['append_count' ] = 0
221
222
222
- mongo_retry (self ._symbols .insert_one )(doc )
223
+ if previous_shas :
224
+ mongo_retry (self ._collection .delete_many )({'sha' : {'$in' : list (previous_shas )}})
225
+
226
+ mongo_retry (self ._symbols .update_one )({'symbol' : symbol },
227
+ {'$set' : doc },
228
+ upsert = True )
223
229
224
230
def append (self , symbol , item ):
225
231
"""
@@ -294,7 +300,7 @@ def append(self, symbol, item):
294
300
segment ['start' ] = start
295
301
segment ['end' ] = end
296
302
self ._collection .update_one ({'symbol' : symbol , 'sha' : checksum (symbol , segment )},
297
- {'$set' : segment , '$addToSet' : { 'parent' : sym [ '_id' ]} },
303
+ {'$set' : segment },
298
304
upsert = True )
299
305
300
306
self ._symbols .replace_one ({'symbol' : symbol }, sym )
@@ -362,10 +368,10 @@ def update(self, symbol, item):
362
368
if orig_start is None :
363
369
# new chunk
364
370
bulk .find ({'symbol' : symbol , 'sha' : sha , 'start' : segment ['start' ]}
365
- ).upsert ().update_one ({'$set' : segment , '$addToSet' : { 'parent' : sym [ '_id' ]} })
371
+ ).upsert ().update_one ({'$set' : segment })
366
372
else :
367
373
bulk .find ({'symbol' : symbol , 'start' : orig_start }
368
- ).update_one ({'$set' : segment , '$addToSet' : { 'parent' : sym [ '_id' ]} })
374
+ ).update_one ({'$set' : segment })
369
375
if len (chunks ) > 0 :
370
376
bulk .execute ()
371
377
0 commit comments