Skip to content

Commit 9abb909

Browse files
committed
Merge pull request pandas-dev#103 from manahl/tickstore_write_initial_images
Tickstore write initial images
2 parents 517d402 + 641f409 commit 9abb909

File tree

5 files changed

+187
-77
lines changed

5 files changed

+187
-77
lines changed

arctic/tickstore/tickstore.py

Lines changed: 88 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from pandas.core.frame import _arrays_to_mgr
1111
import pymongo
1212
from pymongo.errors import OperationFailure
13+
import copy
1314

1415
from ..date import DateRange, to_pandas_closed_closed, mktz, datetime_to_ms, CLOSED_CLOSED, to_dt
1516
from ..decorators import mongo_retry
@@ -68,6 +69,7 @@
6869
COLUMNS = 'cs'
6970
DATA = 'd'
7071
DTYPE = 't'
72+
IMAGE_TIME = 't'
7173
ROWMASK = 'm'
7274

7375
COUNT = 'c'
@@ -78,8 +80,6 @@
7880

7981
class TickStore(object):
8082

81-
chunk_size = 100000
82-
8383
@classmethod
8484
def initialize_library(cls, arctic_lib, **kwargs):
8585
TickStore(arctic_lib)._ensure_index()
@@ -91,7 +91,16 @@ def _ensure_index(self):
9191
(START, pymongo.ASCENDING)], background=True)
9292
collection.create_index([(START, pymongo.ASCENDING)], background=True)
9393

94-
def __init__(self, arctic_lib):
94+
def __init__(self, arctic_lib, chunk_size=100000):
95+
"""
96+
Parameters
97+
----------
98+
arctic_lib : TickStore
99+
Arctic Library
100+
chunk_size : int
101+
Number of ticks to store in a document before splitting to another document.
102+
if the library was obtained through get_library then set with: self._chunk_size = 10000
103+
"""
95104
self._arctic_lib = arctic_lib
96105

97106
# Do we allow reading from secondaries
@@ -100,6 +109,8 @@ def __init__(self, arctic_lib):
100109
# The default collections
101110
self._collection = arctic_lib.get_top_level_collection()
102111

112+
self._chunk_size = chunk_size
113+
103114
def __getstate__(self):
104115
return {'arctic_lib': self._arctic_lib}
105116

@@ -334,7 +345,7 @@ def _set_or_promote_dtype(self, column_dtypes, c, dtype):
334345

335346
def _prepend_image(self, document, im, rtn_length, column_dtypes, column_set, columns):
336347
image = im[IMAGE]
337-
first_dt = im['t']
348+
first_dt = im[IMAGE_TIME]
338349
if not first_dt.tzinfo:
339350
first_dt = first_dt.replace(tzinfo=mktz('UTC'))
340351
document[INDEX] = np.insert(document[INDEX], 0, np.uint64(datetime_to_ms(first_dt)))
@@ -354,7 +365,7 @@ def _prepend_image(self, document, im, rtn_length, column_dtypes, column_set, co
354365
for field in set(document).difference(set(image)):
355366
if field == INDEX:
356367
continue
357-
logger.debug("Field %s is missing from image!", field)
368+
logger.debug("Field %s is missing from image!" % field)
358369
if document[field] is not None:
359370
val = np.nan
360371
document[field] = np.insert(document[field], 0, document[field].dtype.type(val))
@@ -450,16 +461,21 @@ def _assert_nonoverlapping_data(self, symbol, start, end):
450461
raise OverlappingDataException("Document already exists with start:{} end:{} in the range of our start:{} end:{}".format(
451462
doc[START], doc[END], start, end))
452463

453-
def write(self, symbol, data):
464+
def write(self, symbol, data, initial_image=None):
454465
"""
455466
Writes a list of market data events.
456467
457468
Parameters
458469
----------
459470
symbol : `str`
460471
symbol name for the item
461-
data : list of dicts
472+
data : list of dicts or a pandas.DataFrame
462473
List of ticks to store to the tick-store.
474+
if a list of dicts, each dict must contain an 'index' datetime
475+
if a pandas.DataFrame the index must be a Timestamp that can be converted to a datetime
476+
initial_image : dict
477+
Dict of the initial image at the start of the document. If this contains an 'index' entry it is
478+
assumed to be the timestamp of the initial image
463479
"""
464480
pandas = False
465481
# Check for overlapping data
@@ -475,38 +491,42 @@ def write(self, symbol, data):
475491
self._assert_nonoverlapping_data(symbol, to_dt(start), to_dt(end))
476492

477493
if pandas:
478-
buckets = self._pandas_to_buckets(data, symbol)
494+
buckets = self._pandas_to_buckets(data, symbol, initial_image)
479495
else:
480-
buckets = self._to_buckets(data, symbol)
496+
buckets = self._to_buckets(data, symbol, initial_image)
481497
self._write(buckets)
482498

483499
def _write(self, buckets):
484500
start = dt.now()
485501
mongo_retry(self._collection.insert_many)(buckets)
486502
t = (dt.now() - start).total_seconds()
487-
ticks = len(buckets) * self.chunk_size
488-
print("%d buckets in %s: approx %s ticks/sec" % (len(buckets), t, int(ticks / t)))
503+
ticks = len(buckets) * self._chunk_size
504+
logger.debug("%d buckets in %s: approx %s ticks/sec" % (len(buckets), t, int(ticks / t)))
489505

490-
def _pandas_to_buckets(self, x, symbol):
506+
def _pandas_to_buckets(self, x, symbol, initial_image):
491507
rtn = []
492-
for i in range(0, len(x), self.chunk_size):
493-
rtn.append(self._pandas_to_bucket(x[i:i + self.chunk_size], symbol))
508+
for i in range(0, len(x), self._chunk_size):
509+
bucket, initial_image = TickStore._pandas_to_bucket(x[i:i + self._chunk_size], symbol, initial_image)
510+
rtn.append(bucket)
494511
return rtn
495512

496-
def _to_buckets(self, x, symbol):
513+
def _to_buckets(self, x, symbol, initial_image):
497514
rtn = []
498-
for i in range(0, len(x), self.chunk_size):
499-
rtn.append(self._to_bucket(x[i:i + self.chunk_size], symbol))
515+
for i in range(0, len(x), self._chunk_size):
516+
bucket, initial_image = TickStore._to_bucket(x[i:i + self._chunk_size], symbol, initial_image)
517+
rtn.append(bucket)
500518
return rtn
501519

502-
def _to_ms(self, date):
520+
@staticmethod
521+
def _to_ms(date):
503522
if isinstance(date, dt):
504523
if not date.tzinfo:
505-
logger.warning('WARNING: treating naive datetime as London in write path')
524+
logger.warning('WARNING: treating naive datetime as UTC in write path')
506525
return datetime_to_ms(date)
507526
return date
508527

509-
def _str_dtype(self, dtype):
528+
@staticmethod
529+
def _str_dtype(dtype):
510530
"""
511531
Represent dtypes without byte order, as earlier Java tickstore code doesn't support explicit byte order.
512532
"""
@@ -522,8 +542,8 @@ def _str_dtype(self, dtype):
522542
else:
523543
raise UnhandledDtypeException("Bad dtype '%s'" % dtype)
524544

525-
526-
def _ensure_supported_dtypes(self, array):
545+
@staticmethod
546+
def _ensure_supported_dtypes(array):
527547
# We only support these types for now, as we need to read them in Java
528548
if (array.dtype.kind) == 'i':
529549
array = array.astype('<i8')
@@ -538,42 +558,68 @@ def _ensure_supported_dtypes(self, array):
538558
array = array.astype(array.dtype.newbyteorder('<'))
539559
return array
540560

541-
def _pandas_to_bucket(self, df, symbol):
542-
start = to_dt(df.index[0].to_datetime())
561+
@staticmethod
562+
def _pandas_compute_final_image(df, image, end):
563+
# Compute the final image with forward fill of df applied to the image
564+
final_image = copy.copy(image)
565+
last_values = df.ffill().tail(1).to_dict()
566+
last_dict = {i: list(a.values())[0] for i, a in last_values.items()}
567+
final_image.update(last_dict)
568+
final_image['index'] = end
569+
return final_image
570+
571+
@staticmethod
572+
def _pandas_to_bucket(df, symbol, initial_image):
573+
rtn = {SYMBOL: symbol, VERSION: CHUNK_VERSION_NUMBER, COLUMNS: {}, COUNT: len(df)}
543574
end = to_dt(df.index[-1].to_datetime())
544-
rtn = {START: start, END: end, SYMBOL: symbol}
545-
rtn[VERSION] = CHUNK_VERSION_NUMBER
546-
rtn[COUNT] = len(df)
547-
rtn[COLUMNS] = {}
575+
if initial_image :
576+
if 'index' in initial_image:
577+
start = min(to_dt(df.index[0].to_datetime()), initial_image['index'])
578+
else:
579+
start = to_dt(df.index[0].to_datetime())
580+
image_start = initial_image.get('index', start)
581+
image = {k: v for k, v in initial_image.items() if k != 'index'}
582+
rtn[IMAGE_DOC] = {IMAGE_TIME: image_start, IMAGE: initial_image}
583+
final_image = TickStore._pandas_compute_final_image(df, initial_image, end)
584+
else:
585+
start = to_dt(df.index[0].to_datetime())
586+
final_image = {}
587+
rtn[END] = end
588+
rtn[START] = start
548589

549590
logger.warning("NB treating all values as 'exists' - no longer sparse")
550591
rowmask = Binary(lz4.compressHC(np.packbits(np.ones(len(df), dtype='uint8'))))
551592

552593
recs = df.to_records(convert_datetime64=False)
553594
for col in df:
554-
array = self._ensure_supported_dtypes(recs[col])
595+
array = TickStore._ensure_supported_dtypes(recs[col])
555596
col_data = {}
556597
col_data[DATA] = Binary(lz4.compressHC(array.tostring()))
557598
col_data[ROWMASK] = rowmask
558-
col_data[DTYPE] = self._str_dtype(array.dtype)
599+
col_data[DTYPE] = TickStore._str_dtype(array.dtype)
559600
rtn[COLUMNS][col] = col_data
560601
rtn[INDEX] = Binary(lz4.compressHC(np.concatenate(([recs['index'][0].astype('datetime64[ms]').view('uint64')],
561602
np.diff(recs['index'].astype('datetime64[ms]').view('uint64')))
562603
).tostring()))
563-
return rtn
604+
return rtn, final_image
564605

565-
def _to_bucket(self, ticks, symbol):
606+
@staticmethod
607+
def _to_bucket(ticks, symbol, initial_image):
608+
rtn = {SYMBOL: symbol, VERSION: CHUNK_VERSION_NUMBER, COLUMNS: {}, COUNT: len(ticks)}
566609
data = {}
567610
rowmask = {}
568611
start = to_dt(ticks[0]['index'])
569612
end = to_dt(ticks[-1]['index'])
613+
final_image = copy.copy(initial_image) if initial_image else {}
570614
for i, t in enumerate(ticks):
615+
if initial_image:
616+
final_image.update(t)
571617
for k, v in iteritems(t):
572618
try:
573619
if k != 'index':
574620
rowmask[k][i] = 1
575621
else:
576-
v = self._to_ms(v)
622+
v = TickStore._to_ms(v)
577623
data[k].append(v)
578624
except KeyError:
579625
if k != 'index':
@@ -583,21 +629,22 @@ def _to_bucket(self, ticks, symbol):
583629

584630
rowmask = dict([(k, Binary(lz4.compressHC(np.packbits(v).tostring())))
585631
for k, v in iteritems(rowmask)])
586-
587-
rtn = {START: start, END: end, SYMBOL: symbol}
588-
rtn[VERSION] = CHUNK_VERSION_NUMBER
589-
rtn[COUNT] = len(ticks)
590-
rtn[COLUMNS] = {}
591632
for k, v in iteritems(data):
592633
if k != 'index':
593634
v = np.array(v)
594-
v = self._ensure_supported_dtypes(v)
635+
v = TickStore._ensure_supported_dtypes(v)
595636
rtn[COLUMNS][k] = {DATA: Binary(lz4.compressHC(v.tostring())),
596-
DTYPE: self._str_dtype(v.dtype),
637+
DTYPE: TickStore._str_dtype(v.dtype),
597638
ROWMASK: rowmask[k]}
598639

640+
if initial_image:
641+
image_start = initial_image.get('index', start)
642+
start = min(start, image_start)
643+
rtn[IMAGE_DOC] = {IMAGE_TIME: image_start, IMAGE: initial_image}
644+
rtn[END] = end
645+
rtn[START] = start
599646
rtn[INDEX] = Binary(lz4.compressHC(np.concatenate(([data['index'][0]], np.diff(data['index']))).tostring()))
600-
return rtn
647+
return rtn, final_image
601648

602649
def max_date(self, symbol):
603650
"""

tests/integration/tickstore/test_ts_delete.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ def test_delete(tickstore_lib):
2121
'index': dt(2013, 1, 30, tzinfo=mktz('Europe/London'))
2222
},
2323
]
24-
tickstore_lib.chunk_size = 1
24+
tickstore_lib._chunk_size = 1
2525
tickstore_lib.write('SYM', DUMMY_DATA)
2626
tickstore_lib.delete('SYM')
2727
with pytest.raises(NoDataFoundException):
@@ -45,7 +45,7 @@ def test_delete_daterange(tickstore_lib):
4545
'index': dt(2013, 2, 1, tzinfo=mktz('Europe/London'))
4646
},
4747
]
48-
tickstore_lib.chunk_size = 1
48+
tickstore_lib._chunk_size = 1
4949
tickstore_lib.write('SYM', DUMMY_DATA)
5050

5151
# Delete with a date-range

tests/integration/tickstore/test_ts_read.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ def test_read_all_cols_all_dtypes(tickstore_lib, chunk_size):
112112
'index': dt(1970, 1, 1, 0, 0, 1, tzinfo=mktz('UTC')),
113113
},
114114
]
115-
tickstore_lib.chunk_size = 3
115+
tickstore_lib._chunk_size = chunk_size
116116
tickstore_lib.write('sym', data)
117117
df = tickstore_lib.read('sym', columns=None)
118118

@@ -167,7 +167,7 @@ def test_date_range(tickstore_lib):
167167
tickstore_lib.delete('SYM')
168168

169169
# Chunk every 3 symbols and lets have some fun
170-
tickstore_lib.chunk_size = 3
170+
tickstore_lib._chunk_size = 3
171171
tickstore_lib.write('SYM', DUMMY_DATA)
172172

173173
with patch.object(tickstore_lib._collection, 'find', side_effect=tickstore_lib._collection.find) as f:
@@ -222,7 +222,7 @@ def test_date_range_end_not_in_range(tickstore_lib):
222222
},
223223
]
224224

225-
tickstore_lib.chunk_size = 1
225+
tickstore_lib._chunk_size = 1
226226
tickstore_lib.write('SYM', DUMMY_DATA)
227227
with patch.object(tickstore_lib._collection, 'find', side_effect=tickstore_lib._collection.find) as f:
228228
df = tickstore_lib.read('SYM', date_range=DateRange(20130101, dt(2013, 1, 2, 9, 0)), columns=None)
@@ -251,7 +251,7 @@ def test_date_range_default_timezone(tickstore_lib, tz_name):
251251
]
252252

253253
with patch('arctic.date._mktz.DEFAULT_TIME_ZONE_NAME', tz_name):
254-
tickstore_lib.chunk_size = 1
254+
tickstore_lib._chunk_size = 1
255255
tickstore_lib.write('SYM', DUMMY_DATA)
256256
df = tickstore_lib.read('SYM', date_range=DateRange(20130101, 20130701), columns=None)
257257
assert len(df) == 2
@@ -281,7 +281,7 @@ def test_date_range_no_bounds(tickstore_lib):
281281
},
282282
]
283283

284-
tickstore_lib.chunk_size = 1
284+
tickstore_lib._chunk_size = 1
285285
tickstore_lib.write('SYM', DUMMY_DATA)
286286

287287
# 1) No start, no end
@@ -315,7 +315,7 @@ def test_date_range_BST(tickstore_lib):
315315
'index': dt(2013, 6, 1, 13, 00, tzinfo=mktz('Europe/London'))
316316
},
317317
]
318-
tickstore_lib.chunk_size = 1
318+
tickstore_lib._chunk_size = 1
319319
tickstore_lib.write('SYM', DUMMY_DATA)
320320

321321
df = tickstore_lib.read('SYM', columns=None)
@@ -363,7 +363,7 @@ def test_read_out_of_order(tickstore_lib):
363363
'index': dt(2013, 6, 1, 13, 00, tzinfo=mktz('UTC'))
364364
},
365365
]
366-
tickstore_lib.chunk_size = 3
366+
tickstore_lib._chunk_size = 3
367367
tickstore_lib.write('SYM', DUMMY_DATA)
368368
tickstore_lib.read('SYM', columns=None)
369369
assert len(tickstore_lib.read('SYM', columns=None, date_range=DateRange(dt(2013, 6, 1, tzinfo=mktz('UTC')), dt(2013, 6, 2, tzinfo=mktz('UTC'))))) == 3
@@ -380,7 +380,7 @@ def test_read_longs(tickstore_lib):
380380
'index': dt(2013, 6, 1, 13, 00, tzinfo=mktz('Europe/London'))
381381
},
382382
]
383-
tickstore_lib.chunk_size = 3
383+
tickstore_lib._chunk_size = 3
384384
tickstore_lib.write('SYM', DUMMY_DATA)
385385
tickstore_lib.read('SYM', columns=None)
386386
read = tickstore_lib.read('SYM', columns=None, date_range=DateRange(dt(2013, 6, 1), dt(2013, 6, 2)))

tests/integration/tickstore/test_ts_write.py

Lines changed: 0 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -79,28 +79,3 @@ def test_ts_write_pandas(tickstore_lib):
7979

8080
read = tickstore_lib.read('SYM', columns=None)
8181
assert_frame_equal(read, data, check_names=False)
82-
83-
84-
def test_to_bucket(tickstore_lib):
85-
bucket = tickstore_lib._to_bucket(DUMMY_DATA, 'SYM')
86-
assert bucket[SYMBOL] == 'SYM'
87-
assert bucket[START] == dt(2013, 1, 1, tzinfo=mktz('Europe/London'))
88-
assert bucket[END] == dt(2013, 7, 5, tzinfo=mktz('Europe/London'))
89-
assert bucket[COUNT] == 5
90-
91-
92-
def test_pandas_to_bucket(tickstore_lib):
93-
df = read_str_as_pandas(""" index | near
94-
2012-09-08 17:06:11 | 1.0
95-
2012-10-08 17:06:11 | 2.0
96-
2012-10-09 17:06:11 | 2.5
97-
2012-11-08 17:06:11 | 3.0""")
98-
df = df.tz_localize('UTC')
99-
bucket = tickstore_lib._pandas_to_bucket(df, 'SYM')
100-
assert bucket[SYMBOL] == 'SYM'
101-
assert bucket[START] == dt(2012, 9, 8, 17, 6, 11, tzinfo=mktz('UTC'))
102-
assert bucket[END] == dt(2012, 11, 8, 17, 6, 11, tzinfo=mktz('UTC'))
103-
assert bucket[COUNT] == 4
104-
assert len(bucket[COLUMNS]) == 1
105-
assert 'near' in bucket[COLUMNS]
106-

0 commit comments

Comments
 (0)