10
10
from pandas .core .frame import _arrays_to_mgr
11
11
import pymongo
12
12
from pymongo .errors import OperationFailure
13
+ import copy
13
14
14
15
from ..date import DateRange , to_pandas_closed_closed , mktz , datetime_to_ms , CLOSED_CLOSED , to_dt
15
16
from ..decorators import mongo_retry
78
79
79
80
class TickStore (object ):
80
81
81
- chunk_size = 100000
82
-
83
82
@classmethod
def initialize_library(cls, arctic_lib, **kwargs):
    """Prepare the backing Mongo collection: build the indexes TickStore queries rely on."""
    store = TickStore(arctic_lib)
    store._ensure_index()
@@ -91,7 +90,16 @@ def _ensure_index(self):
91
90
(START , pymongo .ASCENDING )], background = True )
92
91
collection .create_index ([(START , pymongo .ASCENDING )], background = True )
93
92
94
- def __init__ (self , arctic_lib ):
93
+ def __init__ (self , arctic_lib , chunk_size = 100000 ):
94
+ """
95
+ Parameters
96
+ ----------
97
+ arctic_lib : TickStore
98
+ Arctic Library
99
+ chunk_size : int
100
+ Number of ticks to store in a document before splitting to another document.
101
+ if the library was obtained through get_library then set with: self._chuck_size = 10000
102
+ """
95
103
self ._arctic_lib = arctic_lib
96
104
97
105
# Do we allow reading from secondaries
@@ -100,6 +108,8 @@ def __init__(self, arctic_lib):
100
108
# The default collections
101
109
self ._collection = arctic_lib .get_top_level_collection ()
102
110
111
+ self ._chunk_size = chunk_size
112
+
103
113
def __getstate__ (self ):
104
114
return {'arctic_lib' : self ._arctic_lib }
105
115
@@ -334,7 +344,7 @@ def _set_or_promote_dtype(self, column_dtypes, c, dtype):
334
344
335
345
def _prepend_image (self , document , im , rtn_length , column_dtypes , column_set , columns ):
336
346
image = im [IMAGE ]
337
- first_dt = im ['t' ]
347
+ first_dt = im [DTYPE ]
338
348
if not first_dt .tzinfo :
339
349
first_dt = first_dt .replace (tzinfo = mktz ('UTC' ))
340
350
document [INDEX ] = np .insert (document [INDEX ], 0 , np .uint64 (datetime_to_ms (first_dt )))
@@ -354,7 +364,7 @@ def _prepend_image(self, document, im, rtn_length, column_dtypes, column_set, co
354
364
for field in set (document ).difference (set (image )):
355
365
if field == INDEX :
356
366
continue
357
- logger .debug ("Field %s is missing from image!" , field )
367
+ logger .debug ("Field %s is missing from image!" % field )
358
368
if document [field ] is not None :
359
369
val = np .nan
360
370
document [field ] = np .insert (document [field ], 0 , document [field ].dtype .type (val ))
@@ -450,16 +460,21 @@ def _assert_nonoverlapping_data(self, symbol, start, end):
450
460
raise OverlappingDataException ("Document already exists with start:{} end:{} in the range of our start:{} end:{}" .format (
451
461
doc [START ], doc [END ], start , end ))
452
462
453
- def write (self , symbol , data ):
463
+ def write (self , symbol , data , initial_image = None ):
454
464
"""
455
465
Writes a list of market data events.
456
466
457
467
Parameters
458
468
----------
459
469
symbol : `str`
460
470
symbol name for the item
461
- data : list of dicts
471
+ data : list of dicts or a panda.DataFrame
462
472
List of ticks to store to the tick-store.
473
+ if a list of dicts, each dict must contain a 'index' datetime
474
+ if a panda.DataFrame the index must be a Timestamp that can be converted to a datetime
475
+ initial_image : dict
476
+ Dict of the initial image at the start of the document. If this contains a 'index' entry it is
477
+ assumed to be the time of the timestamp of the index
463
478
"""
464
479
pandas = False
465
480
# Check for overlapping data
@@ -475,38 +490,41 @@ def write(self, symbol, data):
475
490
self ._assert_nonoverlapping_data (symbol , to_dt (start ), to_dt (end ))
476
491
477
492
if pandas :
478
- buckets = self ._pandas_to_buckets (data , symbol )
493
+ buckets = self ._pandas_to_buckets (data , symbol , initial_image )
479
494
else :
480
- buckets = self ._to_buckets (data , symbol )
495
+ buckets = self ._to_buckets (data , symbol , initial_image )
481
496
self ._write (buckets )
482
497
483
498
def _write(self, buckets):
    """Insert the given bucket documents into the collection in a single batch.

    Parameters
    ----------
    buckets : list of dict
        Bucket documents as produced by `_to_buckets` / `_pandas_to_buckets`.
    """
    start = dt.now()
    # insert_many is wrapped so transient Mongo failures are retried
    mongo_retry(self._collection.insert_many)(buckets)
    t = (dt.now() - start).total_seconds()
    # Approximate tick count: every bucket except possibly the last holds _chunk_size ticks.
    # Previously this was computed and never used (the print was removed); log it instead.
    ticks = len(buckets) * self._chunk_size
    if t > 0:
        logger.debug("%d buckets in %s: approx %s ticks/sec", len(buckets), t, int(ticks / t))
489
503
490
def _pandas_to_buckets(self, x, symbol, initial_image):
    """Slice the DataFrame into `_chunk_size` chunks and convert each to a bucket document.

    The image is threaded through the chunks: each bucket's final image seeds the next.
    """
    buckets = []
    for offset in range(0, len(x), self._chunk_size):
        chunk = x[offset:offset + self._chunk_size]
        bucket, initial_image = TickStore._pandas_to_bucket(chunk, symbol, initial_image)
        buckets.append(bucket)
    return buckets
495
510
496
def _to_buckets(self, x, symbol, initial_image):
    """Slice the tick list into `_chunk_size` chunks and convert each to a bucket document.

    The image is threaded through the chunks: each bucket's final image seeds the next.
    """
    buckets = []
    for offset in range(0, len(x), self._chunk_size):
        chunk = x[offset:offset + self._chunk_size]
        bucket, initial_image = TickStore._to_bucket(chunk, symbol, initial_image)
        buckets.append(bucket)
    return buckets
501
517
502
- def _to_ms (self , date ):
518
+ @staticmethod
519
+ def _to_ms (date ):
503
520
if isinstance (date , dt ):
504
521
if not date .tzinfo :
505
- logger .warning ('WARNING: treating naive datetime as London in write path' )
522
+ logger .warning ('WARNING: treating naive datetime as UTC in write path' )
506
523
return datetime_to_ms (date )
507
524
return date
508
525
509
- def _str_dtype (self , dtype ):
526
+ @staticmethod
527
+ def _str_dtype (dtype ):
510
528
"""
511
529
Represent dtypes without byte order, as earlier Java tickstore code doesn't support explicit byte order.
512
530
"""
@@ -522,8 +540,8 @@ def _str_dtype(self, dtype):
522
540
else :
523
541
raise UnhandledDtypeException ("Bad dtype '%s'" % dtype )
524
542
525
-
526
- def _ensure_supported_dtypes (self , array ):
543
+ @ staticmethod
544
+ def _ensure_supported_dtypes (array ):
527
545
# We only support these types for now, as we need to read them in Java
528
546
if (array .dtype .kind ) == 'i' :
529
547
array = array .astype ('<i8' )
@@ -538,42 +556,68 @@ def _ensure_supported_dtypes(self, array):
538
556
array = array .astype (array .dtype .newbyteorder ('<' ))
539
557
return array
540
558
541
- def _pandas_to_bucket (self , df , symbol ):
542
- start = to_dt (df .index [0 ].to_datetime ())
559
+ @staticmethod
560
+ def _pandas_compute_final_image (df , image , end ):
561
+ # Compute the final image with forward fill of df applied to the image
562
+ final_image = copy .copy (image )
563
+ last_values = df .ffill ().tail (1 ).to_dict ()
564
+ last_dict = {i : a .values ()[0 ] for i , a in last_values .items ()}
565
+ final_image .update (last_dict )
566
+ final_image ['index' ] = end
567
+ return final_image
568
+
569
@staticmethod
def _pandas_to_bucket(df, symbol, initial_image):
    """Convert one DataFrame slice into a single bucket document.

    Parameters
    ----------
    df : pandas.DataFrame
        Ticks for this bucket; the index supplies the timestamps.
    symbol : str
        Symbol the bucket is stored under.
    initial_image : dict or None
        Image at the start of the bucket; embedded in the document when given.
        An 'index' entry, if present, is taken as the image's timestamp.

    Returns
    -------
    tuple of (bucket document to insert, final image at the end of this bucket -
    used to seed the next bucket).
    """
    rtn = {SYMBOL: symbol, VERSION: CHUNK_VERSION_NUMBER, COLUMNS: {}, COUNT: len(df)}
    end = to_dt(df.index[-1].to_datetime())
    start = to_dt(df.index[0].to_datetime())
    if initial_image:
        image_start = initial_image.get('index', start)
        # The document's start must not be later than the image it carries
        start = min(start, image_start)
        rtn[IMAGE_DOC] = {DTYPE: image_start, START: 0, IMAGE: initial_image}
        final_image = TickStore._pandas_compute_final_image(df, initial_image, end)
    else:
        final_image = {}
    rtn[END] = end
    rtn[START] = start

    logger.warning("NB treating all values as 'exists' - no longer sparse")
    # Every row 'exists' for every column, so one shared all-ones rowmask
    rowmask = Binary(lz4.compressHC(np.packbits(np.ones(len(df), dtype='uint8'))))

    recs = df.to_records(convert_datetime64=False)
    for col in df:
        array = TickStore._ensure_supported_dtypes(recs[col])
        rtn[COLUMNS][col] = {DATA: Binary(lz4.compressHC(array.tostring())),
                             ROWMASK: rowmask,
                             DTYPE: TickStore._str_dtype(array.dtype)}
    # Index is delta-encoded epoch-millis: first timestamp followed by successive diffs
    rtn[INDEX] = Binary(lz4.compressHC(np.concatenate(([recs['index'][0].astype('datetime64[ms]').view('uint64')],
                                                       np.diff(recs['index'].astype('datetime64[ms]').view('uint64')))
                                                      ).tostring()))
    return rtn, final_image
564
603
565
- def _to_bucket (self , ticks , symbol ):
604
+ @staticmethod
605
+ def _to_bucket (ticks , symbol , initial_image ):
606
+ rtn = {SYMBOL : symbol , VERSION : CHUNK_VERSION_NUMBER , COLUMNS : {}, COUNT : len (ticks )}
566
607
data = {}
567
608
rowmask = {}
568
609
start = to_dt (ticks [0 ]['index' ])
569
610
end = to_dt (ticks [- 1 ]['index' ])
611
+ final_image = copy .copy (initial_image ) if initial_image else {}
570
612
for i , t in enumerate (ticks ):
613
+ if initial_image :
614
+ final_image .update (t )
571
615
for k , v in iteritems (t ):
572
616
try :
573
617
if k != 'index' :
574
618
rowmask [k ][i ] = 1
575
619
else :
576
- v = self ._to_ms (v )
620
+ v = TickStore ._to_ms (v )
577
621
data [k ].append (v )
578
622
except KeyError :
579
623
if k != 'index' :
@@ -583,21 +627,22 @@ def _to_bucket(self, ticks, symbol):
583
627
584
628
rowmask = dict ([(k , Binary (lz4 .compressHC (np .packbits (v ).tostring ())))
585
629
for k , v in iteritems (rowmask )])
586
-
587
- rtn = {START : start , END : end , SYMBOL : symbol }
588
- rtn [VERSION ] = CHUNK_VERSION_NUMBER
589
- rtn [COUNT ] = len (ticks )
590
- rtn [COLUMNS ] = {}
591
630
for k , v in iteritems (data ):
592
631
if k != 'index' :
593
632
v = np .array (v )
594
- v = self ._ensure_supported_dtypes (v )
633
+ v = TickStore ._ensure_supported_dtypes (v )
595
634
rtn [COLUMNS ][k ] = {DATA : Binary (lz4 .compressHC (v .tostring ())),
596
- DTYPE : self ._str_dtype (v .dtype ),
635
+ DTYPE : TickStore ._str_dtype (v .dtype ),
597
636
ROWMASK : rowmask [k ]}
598
637
638
+ if initial_image :
639
+ image_start = initial_image .get ('index' , start )
640
+ start = min (start , image_start )
641
+ rtn [IMAGE_DOC ] = {DTYPE : image_start , START : 0 , IMAGE : final_image }
642
+ rtn [END ] = end
643
+ rtn [START ] = start
599
644
rtn [INDEX ] = Binary (lz4 .compressHC (np .concatenate (([data ['index' ][0 ]], np .diff (data ['index' ]))).tostring ()))
600
- return rtn
645
+ return rtn , final_image
601
646
602
647
def max_date (self , symbol ):
603
648
"""
0 commit comments