4
4
import ast
5
5
6
6
from bson .binary import Binary
7
- from pandas import Series , DataFrame
7
+ from pandas import Series , DataFrame , concat
8
8
9
9
from ..store ._version_store_utils import checksum
10
10
from ..decorators import mongo_retry
@@ -245,49 +245,45 @@ def write(self, symbol, item, chunk_size):
245
245
{'$set' : doc },
246
246
upsert = True )
247
247
248
- def append (self , symbol , item ):
249
- """
250
- Appends data from item to symbol's data in the database
248
+ def __concat (self , a , b ):
249
+ return concat ([a , b ]).sort ()
251
250
252
- Parameters
253
- ----------
254
- symbol: str
255
- the symbol for the given item in the DB
256
- item:
257
- the data to append
258
- """
251
+ def __combine (self , a , b ):
252
+ return a .combine_first (b )
259
253
254
+ def __update (self , symbol , item , combine_method = None ):
260
255
sym = self ._get_symbol_info (symbol )
261
256
if not sym :
262
- raise NoDataFoundException ("Symbol does not exist. Cannot append " )
257
+ raise NoDataFoundException ("Symbol does not exist." )
263
258
264
259
if isinstance (item , Series ) and sym ['type' ] == 'df' :
265
- raise Exception ("cannot append a series to a dataframe " )
260
+ raise Exception ("Symbol types do not match " )
266
261
if isinstance (item , DataFrame ) and sym ['type' ] == 'series' :
267
- raise Exception ("cannot append a dataframe to a series " )
262
+ raise Exception ("Symbol types do not match " )
268
263
269
264
records = []
270
265
ranges = []
271
- dtype = None
272
-
266
+ new_chunks = []
273
267
for start , end , record in self .chunker .to_chunks (item , sym ['chunk_size' ]):
274
- '''
275
- if we have a multiindex there is a chance that part of the append
276
- will overlap an already written chunk, so we need to update
277
- where the date part of the index overlaps
278
- '''
279
- if item .index .nlevels > 1 :
280
- df = self .read (symbol , chunk_range = self .chunker .to_range (start , end ))
281
- if not df .empty :
282
- if df .equals (record ):
283
- continue
284
- record = record .combine_first (df )
285
- self .update (symbol , record )
286
- sym = self ._get_symbol_info (symbol )
268
+ # read out matching chunks
269
+ df = self .read (symbol , chunk_range = self .chunker .to_range (start , end ), filter_data = False )
270
+ # assuming they exist, update them and store the original chunk
271
+ # range for later use
272
+ if not df .empty :
273
+ record = combine_method (record , df )
274
+ if record is None or record .equals (df ):
287
275
continue
276
+
277
+ new_chunks .append (False )
278
+ sym ['append_count' ] += len (record )
279
+ sym ['len' ] -= len (df )
280
+ else :
281
+ new_chunks .append (True )
282
+ sym ['chunk_count' ] += 1
283
+
288
284
r , dtype = serialize (record , string_max_len = self .STRING_MAX )
289
285
if str (dtype ) != sym ['dtype' ]:
290
- raise Exception (" Dtype mismatch - cannot append" )
286
+ raise Exception (' Dtype mismatch.' )
291
287
records .append (r )
292
288
ranges .append ((start , end ))
293
289
@@ -299,38 +295,59 @@ def append(self, symbol, item):
299
295
300
296
item = item .astype (dtype )
301
297
302
- if str (dtype ) != sym ['dtype' ]:
303
- raise Exception ("Dtype mismatch - cannot append" )
304
-
305
298
data = item .tostring ()
306
299
sym ['len' ] += len (item )
307
300
if len (item ) > 0 :
308
- sym ['chunk_count' ] += len (records )
309
- sym ['append_count' ] += len (records )
310
301
sym ['append_size' ] += len (data )
311
302
312
303
chunks = [r .tostring () for r in records ]
313
304
chunks = compress_array (chunks )
314
305
315
- for chunk , rng in zip (chunks , ranges ):
306
+ bulk = self ._collection .initialize_unordered_bulk_op ()
307
+ for chunk , rng , new_chunk in zip (chunks , ranges , new_chunks ):
316
308
start = rng [0 ]
317
- end = rng [- 1 ]
309
+ end = rng [1 ]
318
310
319
311
segment = {'data' : Binary (chunk )}
320
312
segment ['start' ] = start
321
313
segment ['end' ] = end
322
- self ._collection .update_one ({'symbol' : symbol , 'sha' : checksum (symbol , segment )},
323
- {'$set' : segment },
324
- upsert = True )
314
+ sha = checksum (symbol , segment )
315
+ segment ['sha' ] = sha
316
+ if new_chunk :
317
+ # new chunk
318
+ bulk .find ({'symbol' : symbol , 'sha' : sha }
319
+ ).upsert ().update_one ({'$set' : segment })
320
+ else :
321
+ bulk .find ({'symbol' : symbol , 'start' : start , 'end' : end }
322
+ ).update_one ({'$set' : segment })
323
+ if len (chunks ) > 0 :
324
+ bulk .execute ()
325
325
326
326
self ._symbols .replace_one ({'symbol' : symbol }, sym )
327
327
328
+ def append (self , symbol , item ):
329
+ """
330
+ Appends data from item to symbol's data in the database.
331
+
332
+ Is not idempotent
333
+
334
+ Parameters
335
+ ----------
336
+ symbol: str
337
+ the symbol for the given item in the DB
338
+ item:
339
+ the data to append
340
+ """
341
+ self .__update (symbol , item , combine_method = self .__concat )
342
+
328
343
def update (self , symbol , item ):
329
344
"""
330
345
Merges data from item onto existing data in the database for symbol
331
346
data that exists in symbol and item for the same index/multiindex will
332
347
be overwritten by the data in item.
333
348
349
+ Is idempotent
350
+
334
351
Parameters
335
352
----------
336
353
symbol: str
@@ -339,70 +356,7 @@ def update(self, symbol, item):
339
356
the data to update
340
357
"""
341
358
342
- sym = self ._get_symbol_info (symbol )
343
- if not sym :
344
- raise NoDataFoundException ("Symbol does not exist. Cannot update" )
345
-
346
-
347
- records = []
348
- ranges = []
349
- orig_ranges = []
350
- for start , end , record in self .chunker .to_chunks (item , sym ['chunk_size' ]):
351
- # read out matching chunks
352
- df = self .read (symbol , chunk_range = self .chunker .to_range (start , end ))
353
- # assuming they exist, update them and store the original chunk
354
- # range for later use
355
- if not df .empty :
356
- if df .equals (record ):
357
- continue
358
- record = record .combine_first (df )
359
- orig_ranges .append ((self .chunker .to_start_end (record )))
360
- else :
361
- orig_ranges .append ((None , None ))
362
-
363
- r , dtype = serialize (record , string_max_len = self .STRING_MAX )
364
- if str (dtype ) != sym ['dtype' ]:
365
- raise Exception ('Dtype mismatch - cannot update' )
366
- records .append (r )
367
- ranges .append ((start , end ))
368
-
369
- if len (records ) > 0 :
370
- chunks = [r .tostring () for r in records ]
371
- lens = [len (i ) for i in chunks ]
372
- chunks = compress_array (chunks )
373
-
374
- seg_count = 0
375
- seg_len = 0
376
-
377
- bulk = self ._collection .initialize_unordered_bulk_op ()
378
- for chunk , rng , orig_rng , rec_len in zip (chunks , ranges , orig_ranges , lens ):
379
- start = rng [0 ]
380
- end = rng [1 ]
381
- orig_start = orig_rng [0 ]
382
- if orig_start is None :
383
- sym ['len' ] += rec_len
384
- seg_count += 1
385
- seg_len += rec_len
386
- segment = {'data' : Binary (chunk )}
387
- segment ['start' ] = start
388
- segment ['end' ] = end
389
- sha = checksum (symbol , segment )
390
- segment ['sha' ] = sha
391
- if orig_start is None :
392
- # new chunk
393
- bulk .find ({'symbol' : symbol , 'sha' : sha , 'start' : segment ['start' ]}
394
- ).upsert ().update_one ({'$set' : segment })
395
- else :
396
- bulk .find ({'symbol' : symbol , 'start' : orig_start }
397
- ).update_one ({'$set' : segment })
398
- if len (chunks ) > 0 :
399
- bulk .execute ()
400
-
401
- if seg_count != 0 :
402
- sym ['chunk_count' ] += seg_count
403
- sym ['append_size' ] += seg_len
404
- sym ['append_count' ] += seg_count
405
- self ._symbols .replace_one ({'symbol' : symbol }, sym )
359
+ self .__update (symbol , item , combine_method = self .__combine )
406
360
407
361
def get_info (self , symbol ):
408
362
sym = self ._get_symbol_info (symbol )
0 commit comments