@@ -192,31 +192,30 @@ def write(self, symbol, item, chunk_size):
192
192
doc ['dtype_metadata' ] = dict (dtype .metadata or {})
193
193
doc ['len' ] = len (item )
194
194
195
- seg_count = 0
196
-
197
195
chunks = [r .tostring () for r in records ]
198
196
chunks = compress_array (chunks )
199
197
198
+ op = False
200
199
bulk = self ._collection .initialize_unordered_bulk_op ()
201
200
for chunk , rng in zip (chunks , ranges ):
202
201
start = rng [0 ]
203
202
end = rng [1 ]
204
- seg_count += 1
205
- chunk = {'data' : Binary (chunk ), 'compressed' : True }
203
+ chunk = {'data' : Binary (chunk )}
206
204
chunk ['start' ] = start
207
205
chunk ['end' ] = end
208
206
chunk ['symbol' ] = symbol
209
207
chunk ['sha' ] = checksum (symbol , chunk )
210
208
if chunk ['sha' ] not in previous_shas :
209
+ op = True
211
210
bulk .find ({'symbol' : symbol , 'sha' : chunk ['sha' ]},
212
211
).upsert ().update_one ({'$set' : chunk })
213
212
else :
214
213
# already exists, dont need to update in mongo
215
- previous_shas = previous_shas .remove (chunk ['sha' ])
216
- if seg_count != 0 :
214
+ previous_shas .remove (chunk ['sha' ])
215
+ if op :
217
216
bulk .execute ()
218
217
219
- doc ['chunk_count' ] = seg_count
218
+ doc ['chunk_count' ] = len ( chunks )
220
219
doc ['append_size' ] = 0
221
220
doc ['append_count' ] = 0
222
221
@@ -296,7 +295,7 @@ def append(self, symbol, item):
296
295
start = rng [0 ]
297
296
end = rng [- 1 ]
298
297
299
- segment = {'data' : Binary (chunk ), 'compressed' : True }
298
+ segment = {'data' : Binary (chunk )}
300
299
segment ['start' ] = start
301
300
segment ['end' ] = end
302
301
self ._collection .update_one ({'symbol' : symbol , 'sha' : checksum (symbol , segment )},
@@ -360,7 +359,7 @@ def update(self, symbol, item):
360
359
sym ['len' ] += rec_len
361
360
seg_count += 1
362
361
seg_len += rec_len
363
- segment = {'data' : Binary (chunk ), 'compressed' : True }
362
+ segment = {'data' : Binary (chunk )}
364
363
segment ['start' ] = start
365
364
segment ['end' ] = end
366
365
sha = checksum (symbol , segment )
0 commit comments