1
1
import ast
2
2
import logging
3
3
4
+ import numpy as np
4
5
from bson .binary import Binary
5
6
from pandas import DataFrame , Series , Panel
6
- import numpy as np
7
7
8
+ from arctic .exceptions import UnorderedDataException
8
9
from arctic .serialization .numpy_records import SeriesSerializer , DataFrameSerializer
10
+ from ._ndarray_store import NdarrayStore
9
11
from .._compression import compress , decompress
10
12
from ..date ._util import to_pandas_closed_closed
11
13
from ..exceptions import ArcticException
12
- from ._ndarray_store import NdarrayStore
13
-
14
14
15
15
log = logging .getLogger (__name__ )
16
16
@@ -116,6 +116,24 @@ def get_info(self, version):
116
116
ret ['dtype' ] = ast .literal_eval (version ['dtype' ])
117
117
return ret
118
118
119
+ def read_segment_last_dt (self , version ):
120
+ if 'segment_index' in version :
121
+ index = np .fromstring (decompress (version ['segment_index' ]), dtype = INDEX_DTYPE )
122
+ dt_index = self ._datetime64_index (index )
123
+ if dt_index :
124
+ return index [dt_index ][- 1 ]
125
+ return None
126
+
127
+ def slice_overlap_item_or_raise (self , item , previous_version , concat ):
128
+ """If new item has overlap dt with previous version, keep only new bits if concat=True; raise if concat=False"""
129
+ prev_version_last_dt = self .read_segment_last_dt (previous_version )
130
+ if prev_version_last_dt and len (item ) > 0 and item .index [0 ] <= prev_version_last_dt :
131
+ if concat :
132
+ item = item [item .index > prev_version_last_dt ]
133
+ else :
134
+ raise UnorderedDataException (
135
+ "new data {} before to symbol ending {}" .format (item .index [0 ], prev_version_last_dt ))
136
+ return item
119
137
120
138
def _start_end (date_range , dts ):
121
139
"""
@@ -152,7 +170,8 @@ def write(self, arctic_lib, version, symbol, item, previous_version):
152
170
item , md = self .SERIALIZER .serialize (item )
153
171
super (PandasSeriesStore , self ).write (arctic_lib , version , symbol , item , previous_version , dtype = md )
154
172
155
- def append (self , arctic_lib , version , symbol , item , previous_version , ** kwargs ):
173
+ def append (self , arctic_lib , version , symbol , item , previous_version , concat = False , ** kwargs ):
174
+ item = self .slice_overlap_item_or_raise (item , previous_version , concat )
156
175
item , md = self .SERIALIZER .serialize (item )
157
176
super (PandasSeriesStore , self ).append (arctic_lib , version , symbol , item , previous_version , dtype = md , ** kwargs )
158
177
@@ -176,7 +195,8 @@ def write(self, arctic_lib, version, symbol, item, previous_version):
176
195
item , md = self .SERIALIZER .serialize (item )
177
196
super (PandasDataFrameStore , self ).write (arctic_lib , version , symbol , item , previous_version , dtype = md )
178
197
179
- def append (self , arctic_lib , version , symbol , item , previous_version , ** kwargs ):
198
+ def append (self , arctic_lib , version , symbol , item , previous_version , concat = False , ** kwargs ):
199
+ item = self .slice_overlap_item_or_raise (item , previous_version , concat )
180
200
item , md = self .SERIALIZER .serialize (item )
181
201
super (PandasDataFrameStore , self ).append (arctic_lib , version , symbol , item , previous_version , dtype = md , ** kwargs )
182
202
0 commit comments