1
1
import ast
2
2
import logging
3
3
4
+ import lz4
5
+ import numpy as np
4
6
from bson .binary import Binary
5
7
from pandas import DataFrame , Series , Panel
6
- import numpy as np
7
8
9
+ from arctic .exceptions import UnorderedDataException
8
10
from arctic .serialization .numpy_records import SeriesSerializer , DataFrameSerializer
11
+ from ._ndarray_store import NdarrayStore
9
12
from .._compression import compress , decompress
10
13
from ..date ._util import to_pandas_closed_closed
11
14
from ..exceptions import ArcticException
12
- from ._ndarray_store import NdarrayStore
13
-
14
15
15
16
log = logging .getLogger (__name__ )
16
17
@@ -116,6 +117,24 @@ def get_info(self, version):
116
117
ret ['dtype' ] = ast .literal_eval (version ['dtype' ])
117
118
return ret
118
119
120
+ def read_segment_last_dt (self , version ):
121
+ if 'segment_index' in version :
122
+ index = np .fromstring (lz4 .decompress (version ['segment_index' ]), dtype = INDEX_DTYPE )
123
+ dt_index = self ._datetime64_index (index )
124
+ if dt_index :
125
+ return index [dt_index ][- 1 ]
126
+ return None
127
+
128
+ def slice_overlap_item_or_raise (self , item , previous_version , concat ):
129
+ """If new item has overlap dt with previous version, keep only new bits if concat=True; raise if concat=False"""
130
+ prev_version_last_dt = self .read_segment_last_dt (previous_version )
131
+ if prev_version_last_dt and len (item ) > 0 and item .index [0 ] <= prev_version_last_dt :
132
+ if concat :
133
+ item = item [item .index > prev_version_last_dt ]
134
+ else :
135
+ raise UnorderedDataException (
136
+ "new data {} before to symbol ending {}" .format (item .index [0 ], prev_version_last_dt ))
137
+ return item
119
138
120
139
def _start_end (date_range , dts ):
121
140
"""
@@ -152,7 +171,8 @@ def write(self, arctic_lib, version, symbol, item, previous_version):
152
171
item , md = self .SERIALIZER .serialize (item )
153
172
super (PandasSeriesStore , self ).write (arctic_lib , version , symbol , item , previous_version , dtype = md )
154
173
155
- def append (self , arctic_lib , version , symbol , item , previous_version , ** kwargs ):
174
+ def append (self , arctic_lib , version , symbol , item , previous_version , concat = False , ** kwargs ):
175
+ item = self .slice_overlap_item_or_raise (item , previous_version , concat )
156
176
item , md = self .SERIALIZER .serialize (item )
157
177
super (PandasSeriesStore , self ).append (arctic_lib , version , symbol , item , previous_version , dtype = md , ** kwargs )
158
178
@@ -176,7 +196,8 @@ def write(self, arctic_lib, version, symbol, item, previous_version):
176
196
item , md = self .SERIALIZER .serialize (item )
177
197
super (PandasDataFrameStore , self ).write (arctic_lib , version , symbol , item , previous_version , dtype = md )
178
198
179
- def append (self , arctic_lib , version , symbol , item , previous_version , ** kwargs ):
199
+ def append (self , arctic_lib , version , symbol , item , previous_version , concat = False , ** kwargs ):
200
+ item = self .slice_overlap_item_or_raise (item , previous_version , concat )
180
201
item , md = self .SERIALIZER .serialize (item )
181
202
super (PandasDataFrameStore , self ).append (arctic_lib , version , symbol , item , previous_version , dtype = md , ** kwargs )
182
203
0 commit comments