30
30
# Short key under which write() records len(item) for a symbol.
LEN = 'l'
31
31
# Short key holding the serializer's TYPE identifier; read back from the
# symbol document to select the matching serializer.
SERIALIZER = 'se'
32
32
# Short key holding the chunker's TYPE identifier; read back from the
# symbol document to select the matching chunker.
CHUNKER = 'ch'
33
# Short key holding the optional user-supplied, per-symbol metadata.
+ USERMETA = 'u'
33
34
34
35
# 15 * 1024 * 1024 = 15,728,640.
# NOTE(review): presumably a size limit in bytes for a single stored chunk;
# the code that enforces it is not visible here -- confirm.
MAX_CHUNK_SIZE = 15 * 1024 * 1024
35
36
@@ -245,7 +246,7 @@ def read(self, symbol, chunk_range=None, filter_data=True, **kwargs):
245
246
return data
246
247
return CHUNKER_MAP [sym [CHUNKER ]].filter (data , chunk_range )
247
248
248
- def write (self , symbol , item , chunker = DateChunker (), ** kwargs ):
249
+ def write (self , symbol , item , metadata = None , chunker = DateChunker (), ** kwargs ):
249
250
"""
250
251
Writes data from item to symbol in the database
251
252
@@ -255,6 +256,8 @@ def write(self, symbol, item, chunker=DateChunker(), **kwargs):
255
256
the symbol that will be used to reference the written data
256
257
item: Dataframe or Series
257
258
the data to write the database
259
+ metadata: ?
260
+ optional per symbol metadata
258
261
chunker: Object of type Chunker
259
262
A chunker that chunks the data in item
260
263
kwargs:
@@ -276,13 +279,13 @@ def write(self, symbol, item, chunker=DateChunker(), **kwargs):
276
279
doc [LEN ] = len (item )
277
280
doc [SERIALIZER ] = self .serializer .TYPE
278
281
doc [CHUNKER ] = chunker .TYPE
282
+ doc [USERMETA ] = metadata
279
283
280
284
sym = self ._get_symbol_info (symbol )
281
285
if sym :
282
286
previous_shas = set ([Binary (x [SHA ]) for x in self ._collection .find ({SYMBOL : symbol },
283
287
projection = {SHA : True , '_id' : False },
284
288
)])
285
-
286
289
op = False
287
290
bulk = self ._collection .initialize_unordered_bulk_op ()
288
291
meta_bulk = self ._mdata .initialize_unordered_bulk_op ()
@@ -334,7 +337,7 @@ def write(self, symbol, item, chunker=DateChunker(), **kwargs):
334
337
{'$set' : doc },
335
338
upsert = True )
336
339
337
- def __update (self , sym , item , combine_method = None , chunk_range = None ):
340
+ def __update (self , sym , item , metadata = None , combine_method = None , chunk_range = None ):
338
341
'''
339
342
helper method used by update and append since they very closely
340
343
resemble eachother. Really differ only by the combine method.
@@ -415,9 +418,10 @@ def __update(self, sym, item, combine_method=None, chunk_range=None):
415
418
bulk .execute ()
416
419
meta_bulk .execute ()
417
420
421
+ sym [USERMETA ] = metadata
418
422
self ._symbols .replace_one ({SYMBOL : symbol }, sym )
419
423
420
- def append (self , symbol , item ):
424
+ def append (self , symbol , item , metadata = None ):
421
425
"""
422
426
Appends data from item to symbol's data in the database.
423
427
@@ -429,13 +433,15 @@ def append(self, symbol, item):
429
433
the symbol for the given item in the DB
430
434
item: DataFrame or Series
431
435
the data to append
436
+ metadata: ?
437
+ optional per symbol metadata
432
438
"""
433
439
sym = self ._get_symbol_info (symbol )
434
440
if not sym :
435
441
raise NoDataFoundException ("Symbol does not exist." )
436
- self .__update (sym , item , combine_method = SER_MAP [sym [SERIALIZER ]].combine )
442
+ self .__update (sym , item , metadata = metadata , combine_method = SER_MAP [sym [SERIALIZER ]].combine )
437
443
438
- def update (self , symbol , item , chunk_range = None , upsert = False , ** kwargs ):
444
+ def update (self , symbol , item , metadata = None , chunk_range = None , upsert = False , ** kwargs ):
439
445
"""
440
446
Overwrites data in DB with data in item for the given symbol.
441
447
@@ -447,6 +453,8 @@ def update(self, symbol, item, chunk_range=None, upsert=False, **kwargs):
447
453
the symbol for the given item in the DB
448
454
item: DataFrame or Series
449
455
the data to update
456
+ metadata: ?
457
+ optional per symbol metadata
450
458
chunk_range: None, or a range object
451
459
If a range is specified, it will clear/delete the data within the
452
460
range and overwrite it with the data in item. This allows the user
@@ -462,15 +470,15 @@ def update(self, symbol, item, chunk_range=None, upsert=False, **kwargs):
462
470
sym = self ._get_symbol_info (symbol )
463
471
if not sym :
464
472
if upsert :
465
- return self .write (symbol , item , ** kwargs )
473
+ return self .write (symbol , item , metadata = metadata , ** kwargs )
466
474
else :
467
475
raise NoDataFoundException ("Symbol does not exist." )
468
476
if chunk_range is not None :
469
477
if len (CHUNKER_MAP [sym [CHUNKER ]].filter (item , chunk_range )) == 0 :
470
478
raise Exception ('Range must be inclusive of data' )
471
- self .__update (sym , item , combine_method = self .serializer .combine , chunk_range = chunk_range )
479
+ self .__update (sym , item , metadata = metadata , combine_method = self .serializer .combine , chunk_range = chunk_range )
472
480
else :
473
- self .__update (sym , item , combine_method = lambda old , new : new , chunk_range = chunk_range )
481
+ self .__update (sym , item , metadata = metadata , combine_method = lambda old , new : new , chunk_range = chunk_range )
474
482
475
483
def get_info (self , symbol ):
476
484
"""
@@ -497,6 +505,43 @@ def get_info(self, symbol):
497
505
ret ['serializer' ] = sym [SERIALIZER ]
498
506
return ret
499
507
508
def read_metadata(self, symbol):
    """
    Reads user defined metadata out for the given symbol

    Parameters
    ----------
    symbol: str
        symbol for the given item in the DB

    Returns
    -------
    The value stored under the USERMETA key of the symbol's document,
    or None when no user metadata has been stored for the symbol.

    Raises
    ------
    NoDataFoundException
        if the symbol does not exist
    """
    sym = self._get_symbol_info(symbol)
    if not sym:
        raise NoDataFoundException("Symbol does not exist.")
    # Read the metadata from the document fetched above instead of querying
    # the symbols collection a second time. This is the same document that
    # write_metadata sets USERMETA on before writing it back, and reusing it
    # avoids both the extra round trip and a TypeError if the symbol is
    # removed between the two reads.
    return sym.get(USERMETA)
526
+
527
def write_metadata(self, symbol, metadata):
    """
    Stores user defined metadata for an existing symbol, replacing any
    metadata previously stored for it.

    Parameters
    ----------
    symbol: str
        symbol for the given item in the DB
    metadata:
        metadata to write; stored under the USERMETA key of the symbol's
        document

    Raises
    ------
    NoDataFoundException
        if the symbol does not exist
    """
    symbol_doc = self._get_symbol_info(symbol)
    if symbol_doc:
        # Set the metadata on the fetched document and write the whole
        # document back in place of the stored one.
        symbol_doc[USERMETA] = metadata
        self._symbols.replace_one({SYMBOL: symbol}, symbol_doc)
        return
    raise NoDataFoundException("Symbol does not exist.")
544
+
500
545
def get_chunk_ranges (self , symbol , chunk_range = None , reverse = False ):
501
546
"""
502
547
Returns a generator of (Start, End) tuples for each chunk in the symbol
0 commit comments