@@ -167,7 +167,8 @@ def write(self, symbol, item, chunk_size):
167
167
raise Exception ("Can only chunk Series and DataFrames" )
168
168
169
169
previous_shas = []
170
- if self ._get_symbol_info (symbol ):
170
+ sym = self ._get_symbol_info (symbol )
171
+ if sym :
171
172
previous_shas = set ([Binary (x ['sha' ]) for x in self ._collection .find ({'symbol' : symbol },
172
173
projection = {'sha' : True , '_id' : False },
173
174
)])
@@ -177,6 +178,9 @@ def write(self, symbol, item, chunk_size):
177
178
178
179
for start , end , record in self .chunker .to_chunks (item , chunk_size ):
179
180
r , dtype = serialize (record , string_max_len = self .STRING_MAX )
181
+ # if symbol exists, dtypes better match
182
+ if sym and str (dtype ) != sym ['dtype' ]:
183
+ raise Exception ('Dtype mismatch - cannot write chunk' )
180
184
records .append (r )
181
185
ranges .append ((start , end ))
182
186
@@ -266,6 +270,8 @@ def append(self, symbol, item):
266
270
sym = self ._get_symbol_info (symbol )
267
271
continue
268
272
r , dtype = serialize (record , string_max_len = self .STRING_MAX )
273
+ if str (dtype ) != sym ['dtype' ]:
274
+ raise Exception ("Dtype mismatch - cannot append" )
269
275
records .append (r )
270
276
ranges .append ((start , end ))
271
277
@@ -321,6 +327,7 @@ def update(self, symbol, item):
321
327
if not sym :
322
328
raise NoDataFoundException ("Symbol does not exist. Cannot update" )
323
329
330
+
324
331
records = []
325
332
ranges = []
326
333
orig_ranges = []
@@ -337,7 +344,9 @@ def update(self, symbol, item):
337
344
else :
338
345
orig_ranges .append ((None , None ))
339
346
340
- r , _ = serialize (record , string_max_len = self .STRING_MAX )
347
+ r , dtype = serialize (record , string_max_len = self .STRING_MAX )
348
+ if str (dtype ) != sym ['dtype' ]:
349
+ raise Exception ('Dtype mismatch - cannot update' )
341
350
records .append (r )
342
351
ranges .append ((start , end ))
343
352
0 commit comments