Skip to content

Commit 882620b

Browse files
authored
* Feature pandas-dev#162 - rename method for ChunkStore to allow symbol renaming
1 parent 7624809 commit 882620b

File tree

3 files changed

+82
-16
lines changed

3 files changed

+82
-16
lines changed

CHANGES.md

+1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
* Bugfix: #187 Compatibility with latest version of pytest-dbfixtures
66
* Feature: #182 Improve ChunkStore read/write performance
7+
* Feature: #162 Rename API for ChunkStore
78

89
### 1.26 (2016-07-20)
910

arctic/chunkstore/chunkstore.py

+38-16
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
ROWS = 'r'
2525

2626

27-
2827
class ChunkStore(object):
2928
@classmethod
3029
def initialize_library(cls, arctic_lib, **kwargs):
@@ -73,12 +72,11 @@ def __str__(self):
7372
def __repr__(self):
7473
return str(self)
7574

76-
def _checksum(self, symbol, doc):
75+
def _checksum(self, doc):
7776
"""
7877
Checksum the passed in dictionary
7978
"""
8079
sha = hashlib.sha1()
81-
sha.update(symbol.encode('ascii'))
8280
sha.update(self.chunker.chunk_to_str(doc[START]).encode('ascii'))
8381
sha.update(self.chunker.chunk_to_str(doc[END]).encode('ascii'))
8482
for k in doc[DATA][COLUMNS]:
@@ -126,6 +124,31 @@ def list_symbols(self):
126124
def _get_symbol_info(self, symbol):
127125
return self._symbols.find_one({SYMBOL: symbol})
128126

127+
def rename(self, from_symbol, to_symbol):
128+
"""
129+
Rename a symbol
130+
131+
Parameters
132+
----------
133+
from_symbol: str
134+
the existing symbol that will be renamed
135+
to_symbol: str
136+
the new symbol name
137+
"""
138+
139+
sym = self._get_symbol_info(from_symbol)
140+
if not sym:
141+
raise NoDataFoundException('No data found for %s' % (from_symbol))
142+
143+
if self._get_symbol_info(to_symbol) is not None:
144+
raise Exception('Symbol %s already exists' % (to_symbol))
145+
146+
mongo_retry(self._collection.update_many)({SYMBOL: from_symbol},
147+
{'$set': {SYMBOL: to_symbol}})
148+
149+
mongo_retry(self._symbols.update_one)({SYMBOL: from_symbol},
150+
{'$set': {SYMBOL: to_symbol}})
151+
129152
def read(self, symbol, chunk_range=None, columns=None, filter_data=True):
130153
"""
131154
Reads data for a given symbol from the database.
@@ -160,7 +183,7 @@ def read(self, symbol, chunk_range=None, columns=None, filter_data=True):
160183
spec.update(self.chunker.to_mongo(chunk_range))
161184

162185
segments = []
163-
for _, x in enumerate(self._collection.find(spec, sort=[(START, pymongo.ASCENDING)],)):
186+
for x in self._collection.find(spec, sort=[(START, pymongo.ASCENDING)],):
164187
segments.append(x[DATA])
165188

166189
data = self.serializer.deserialize(segments, columns)
@@ -192,7 +215,7 @@ def write(self, symbol, item, chunk_size):
192215
doc[CHUNK_SIZE] = chunk_size
193216
doc[ROWS] = len(item)
194217
doc[TYPE] = 'dataframe' if isinstance(item, DataFrame) else 'series'
195-
218+
196219
sym = self._get_symbol_info(symbol)
197220
if sym:
198221
previous_shas = set([Binary(x[SHA]) for x in self._collection.find({SYMBOL: symbol},
@@ -212,7 +235,7 @@ def write(self, symbol, item, chunk_size):
212235
chunk[START] = start
213236
chunk[END] = end
214237
chunk[SYMBOL] = symbol
215-
chunk[SHA] = self._checksum(symbol, chunk)
238+
chunk[SHA] = self._checksum(chunk)
216239

217240
if chunk[SHA] not in previous_shas:
218241
op = True
@@ -248,13 +271,12 @@ def __update(self, symbol, item, combine_method=None):
248271
sym = self._get_symbol_info(symbol)
249272
if not sym:
250273
raise NoDataFoundException("Symbol does not exist.")
251-
274+
252275
if sym[TYPE] == 'series' and not isinstance(item, Series):
253276
raise Exception("Cannot combine Series and DataFrame")
254277
if sym[TYPE] == 'dataframe' and not isinstance(item, DataFrame):
255278
raise Exception("Cannot combine DataFrame and Series")
256279

257-
258280
bulk = self._collection.initialize_unordered_bulk_op()
259281
op = False
260282
for start, end, record in self.chunker.to_chunks(item, sym[CHUNK_SIZE]):
@@ -278,19 +300,19 @@ def __update(self, symbol, item, combine_method=None):
278300
data = self.serializer.serialize(record)
279301
op = True
280302

281-
segment = {DATA: data}
282-
segment[TYPE] = 'dataframe' if isinstance(record, DataFrame) else 'series'
283-
segment[START] = start
284-
segment[END] = end
285-
sha = self._checksum(symbol, segment)
286-
segment[SHA] = sha
303+
chunk = {DATA: data}
304+
chunk[TYPE] = 'dataframe' if isinstance(record, DataFrame) else 'series'
305+
chunk[START] = start
306+
chunk[END] = end
307+
sha = self._checksum(chunk)
308+
chunk[SHA] = sha
287309
if new_chunk:
288310
# new chunk
289311
bulk.find({SYMBOL: symbol, SHA: sha}
290-
).upsert().update_one({'$set': segment})
312+
).upsert().update_one({'$set': chunk})
291313
else:
292314
bulk.find({SYMBOL: symbol, START: start, END: end}
293-
).update_one({'$set': segment})
315+
).update_one({'$set': chunk})
294316

295317
if op:
296318
bulk.execute()

tests/integration/chunkstore/test_chunkstore.py

+43
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@
77
import numpy as np
88
import random
99
import pytest
10+
import pymongo
11+
12+
from arctic.chunkstore.chunkstore import START, SYMBOL
1013

1114

1215
def test_write_dataframe(chunkstore_lib):
@@ -902,3 +905,43 @@ def test_read_column_subset(chunkstore_lib):
902905
chunkstore_lib.write('test', df, 'D')
903906
r = chunkstore_lib.read('test', columns=['prev_close', 'volume'])
904907
assert_frame_equal(r, df[['prev_close', 'volume']])
908+
909+
910+
def test_rename(chunkstore_lib):
911+
df = DataFrame(data={'data': [1, 2, 3, 4, 5, 6, 7, 8, 9],
912+
'open': [1.1, 2.2, 3.3, 4.4, 5.5, 6.6, 7.7, 8.8, 9.9],
913+
'close': [1.2, 2.3, 3.4, 4.5, 5.6, 6.7, 7.8, 8.9, 9.0],
914+
'prev_close': [.1, .2, .3, .4, .5, .6, .7, .8, .8],
915+
'volume': [1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000]
916+
},
917+
index=MultiIndex.from_tuples([(dt(2016, 1, 1), 1),
918+
(dt(2016, 1, 2), 1),
919+
(dt(2016, 1, 3), 1),
920+
(dt(2016, 2, 1), 1),
921+
(dt(2016, 2, 2), 1),
922+
(dt(2016, 2, 3), 1),
923+
(dt(2016, 3, 1), 1),
924+
(dt(2016, 3, 2), 1),
925+
(dt(2016, 3, 3), 1)],
926+
names=['date', 'id'])
927+
)
928+
929+
chunkstore_lib.write('test', df, 'D')
930+
assert_frame_equal(chunkstore_lib.read('test'), df)
931+
chunkstore_lib.rename('test', 'new_name')
932+
assert_frame_equal(chunkstore_lib.read('new_name'), df)
933+
934+
with pytest.raises(Exception) as e:
935+
chunkstore_lib.rename('new_name', 'new_name')
936+
assert('already exists' in str(e))
937+
938+
assert('test' not in chunkstore_lib.list_symbols())
939+
940+
'''
941+
read out all chunks that have symbol set to 'test'. List should be empty
942+
'''
943+
chunks = []
944+
for x in chunkstore_lib._collection.find({SYMBOL: 'test'}, sort=[(START, pymongo.ASCENDING)],):
945+
chunks.append(x)
946+
947+
assert(len(chunks) == 0)

0 commit comments

Comments
 (0)