Skip to content

Commit b52c8ab

Browse files
authored
Merge branch 'master' into add-concat-flag
2 parents 4505920 + 5621ce4 commit b52c8ab

File tree

12 files changed

+270
-77
lines changed

12 files changed

+270
-77
lines changed

CHANGES.md

+7-2
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,14 @@
11
## Changelog
22

3-
### 1.43
3+
### 1.44
4+
* Feature: Expose compressHC from internal arctic LZ4 and remove external LZ4 dependency
45
* Feature: Appending older data (compare to what's exist in library) will raise. Use `concat=True` to append only the
56
new bits
6-
* Bugfix: #350 remove deprecated pandas calls
7+
8+
### 1.43 (2017-05-30)
9+
* Bugfix: #350 remove deprecated pandas calls
10+
* Bugfix: #360 version incorrect in empty append in VersionStore
11+
* Feature: #365 add generic BSON store
712

813
### 1.42 (2017-05-12)
914
* Bugfix: #346 fixed daterange subsetting error on very large dataframes in version store

arctic/_compression.py

+14-21
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,14 @@
1-
import lz4
21
import os
32
import logging
3+
from . import _compress as clz4
4+
45

56
logger = logging.getLogger(__name__)
67

78
# switch to parallel LZ4 compress (and potentially other parallel stuff), Default True
89
ENABLE_PARALLEL = not os.environ.get('DISABLE_PARALLEL')
910
LZ4_N_PARALLEL = 50 # No. of elements to use parellel compression in LZ4 mode
1011

11-
try:
12-
from . import _compress as clz4
13-
except ImportError:
14-
logger.warning("Couldn't import cython lz4")
15-
import lz4 as clz4
16-
ENABLE_PARALLEL = False
17-
1812

1913
def enable_parallel_lz4(mode):
2014
"""
@@ -33,12 +27,9 @@ def enable_parallel_lz4(mode):
3327
def compress_array(str_list):
3428
"""
3529
Compress an array of strings
36-
37-
By default LZ4 mode is standard in interactive mode,
38-
and high compresion in applications/scripts
3930
"""
4031
if not ENABLE_PARALLEL:
41-
return [lz4.compress(s) for s in str_list]
32+
return [clz4.compress(s) for s in str_list]
4233

4334
# Less than 50 chunks its quicker to compress sequentially..
4435
if len(str_list) > LZ4_N_PARALLEL:
@@ -47,27 +38,21 @@ def compress_array(str_list):
4738
return [clz4.compress(s) for s in str_list]
4839

4940

50-
def _get_lib():
51-
if ENABLE_PARALLEL:
52-
return clz4
53-
return lz4
54-
55-
5641
def compress(_str):
5742
"""
5843
Compress a string
5944
6045
By default LZ4 mode is standard in interactive mode,
6146
and high compresion in applications/scripts
6247
"""
63-
return _get_lib().compress(_str)
48+
return clz4.compress(_str)
6449

6550

6651
def decompress(_str):
6752
"""
6853
Decompress a string
6954
"""
70-
return _get_lib().decompress(_str)
55+
return clz4.decompress(_str)
7156

7257

7358
def decompress_array(str_list):
@@ -76,4 +61,12 @@ def decompress_array(str_list):
7661
"""
7762
if ENABLE_PARALLEL:
7863
return clz4.decompressarr(str_list)
79-
return [lz4.decompress(chunk) for chunk in str_list]
64+
return [clz4.decompress(chunk) for chunk in str_list]
65+
66+
67+
def compressHC(_str):
68+
"""
69+
HC compression
70+
"""
71+
return clz4.compressHC(_str)
72+

arctic/arctic.py

+3-2
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from .decorators import mongo_retry
1010
from .exceptions import LibraryNotFoundException, ArcticException, QuotaExceededException
1111
from .hooks import get_mongodb_uri
12-
from .store import version_store
12+
from .store import version_store, bson_store
1313
from .tickstore import tickstore, toplevel
1414
from .chunkstore import chunkstore
1515
from six import string_types
@@ -27,7 +27,8 @@
2727
LIBRARY_TYPES = {version_store.VERSION_STORE_TYPE: version_store.VersionStore,
2828
tickstore.TICK_STORE_TYPE: tickstore.TickStore,
2929
toplevel.TICK_STORE_TYPE: toplevel.TopLevelTickStore,
30-
chunkstore.CHUNK_STORE_TYPE: chunkstore.ChunkStore
30+
chunkstore.CHUNK_STORE_TYPE: chunkstore.ChunkStore,
31+
bson_store.BSON_STORE_TYPE: bson_store.BSONStore
3132
}
3233

3334

arctic/store/bson_store.py

+107
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
import logging
2+
from pymongo.errors import OperationFailure
3+
from ..decorators import mongo_retry
4+
from .._util import enable_sharding
5+
6+
logger = logging.getLogger(__name__)
7+
8+
BSON_STORE_TYPE = 'BSONStore'
9+
10+
class BSONStore(object):
11+
"""
12+
BSON Data Store. This stores any Python object that encodes to BSON correctly,
13+
and offers a vanilla pymongo interface. Note that strings myst be valid UTF-8.
14+
15+
See: https://api.mongodb.com/python/3.4.0/api/bson/index.html
16+
17+
Note that this neither defines nor ensures any indices, they are left to the user
18+
to create and manage according to the effective business schema applicable to their data.
19+
20+
Likewise, _id is left to the user to populate if they wish, and is exposed in documents. As
21+
is normally the case with pymongo, _id is set to unique ObjectId if left unspecified at
22+
document insert time.
23+
"""
24+
25+
def __init__(self, arctic_lib):
26+
self._arctic_lib = arctic_lib
27+
self._collection = self._arctic_lib.get_top_level_collection()
28+
29+
@classmethod
30+
def initialize_library(cls, arctic_lib, hashed=True, **kwargs):
31+
logger.info("Trying to enable sharding...")
32+
try:
33+
enable_sharding(arctic_lib.arctic, arctic_lib.get_name(), hashed=hashed)
34+
except OperationFailure as exception:
35+
logger.warning(("Library created, but couldn't enable sharding: "
36+
"%s. This is OK if you're not 'admin'"), exception)
37+
38+
@mongo_retry
39+
def stats(self):
40+
"""
41+
Store stats, necessary for quota to work.
42+
"""
43+
res = {}
44+
db = self._collection.database
45+
res['dbstats'] = db.command('dbstats')
46+
res['data'] = db.command('collstats', self._collection.name)
47+
res['totals'] = {'count': res['data']['count'],
48+
'size': res['data']['size']}
49+
return res
50+
51+
@mongo_retry
52+
def find(self, *args, **kwargs):
53+
"""
54+
See http://api.mongodb.com/python/current/api/pymongo/collection.html#pymongo.collection.Collection.find
55+
"""
56+
return self._collection.find(*args, **kwargs)
57+
58+
@mongo_retry
59+
def insert_one(self, value):
60+
"""
61+
See http://api.mongodb.com/python/current/api/pymongo/collection.html#pymongo.collection.Collection.insert_one
62+
"""
63+
self._arctic_lib.check_quota()
64+
return self._collection.insert_one(value)
65+
66+
@mongo_retry
67+
def insert_many(self, values):
68+
"""
69+
See http://api.mongodb.com/python/current/api/pymongo/collection.html#pymongo.collection.Collection.insert_many
70+
"""
71+
self._arctic_lib.check_quota()
72+
return self._collection.insert_many(values)
73+
74+
@mongo_retry
75+
def delete_one(self, query):
76+
"""
77+
See http://api.mongodb.com/python/current/api/pymongo/collection.html#pymongo.collection.Collection.delete_one
78+
"""
79+
return self._collection.delete_one(query)
80+
81+
@mongo_retry
82+
def delete_many(self, query):
83+
"""
84+
See http://api.mongodb.com/python/current/api/pymongo/collection.html#pymongo.collection.Collection.delete_many
85+
"""
86+
return self._collection.delete_many(query)
87+
88+
@mongo_retry
89+
def create_index(self, keys, **kwargs):
90+
"""
91+
See http://api.mongodb.com/python/current/api/pymongo/collection.html#pymongo.collection.Collection.create_index
92+
"""
93+
return self._collection.create_index(keys, **kwargs)
94+
95+
@mongo_retry
96+
def drop_index(self, index_or_name):
97+
"""
98+
See http://api.mongodb.com/python/current/api/pymongo/collection.html#pymongo.collection.Collection.drop_index
99+
"""
100+
return self._collection.drop_index(index_or_name)
101+
102+
@mongo_retry
103+
def index_information(self):
104+
"""
105+
See http://api.mongodb.com/python/current/api/pymongo/collection.html#pymongo.collection.Collection.index_information
106+
"""
107+
return self._collection.index_information()

arctic/store/version_store.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -465,7 +465,7 @@ def append(self, symbol, data, metadata=None, prune_previous_version=True, upser
465465
sort=[('version', pymongo.DESCENDING)])
466466

467467
if len(data) == 0 and previous_version is not None:
468-
return VersionedItem(symbol=symbol, library=self._arctic_lib.get_name(), version=previous_version,
468+
return VersionedItem(symbol=symbol, library=self._arctic_lib.get_name(), version=previous_version['version'],
469469
metadata=version.pop('metadata', None), data=None)
470470

471471
if upsert and previous_version is None:

arctic/tickstore/tickstore.py

+12-13
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
from bson.binary import Binary
55
import copy
66
from datetime import datetime as dt, timedelta
7-
import lz4
87
import numpy as np
98
import pandas as pd
109
from pandas.core.frame import _arrays_to_mgr
@@ -17,7 +16,7 @@
1716
from ..decorators import mongo_retry
1817
from ..exceptions import OverlappingDataException, NoDataFoundException, UnorderedDataException, UnhandledDtypeException, ArcticException
1918
from .._util import indent
20-
19+
from arctic._compression import compress, compressHC, decompress
2120

2221
logger = logging.getLogger(__name__)
2322

@@ -413,7 +412,7 @@ def _read_bucket(self, doc, column_set, column_dtypes, include_symbol, include_i
413412
rtn = {}
414413
if doc[VERSION] != 3:
415414
raise ArcticException("Unhandled document version: %s" % doc[VERSION])
416-
rtn[INDEX] = np.cumsum(np.fromstring(lz4.decompress(doc[INDEX]), dtype='uint64'))
415+
rtn[INDEX] = np.cumsum(np.fromstring(decompress(doc[INDEX]), dtype='uint64'))
417416
doc_length = len(rtn[INDEX])
418417
column_set.update(doc[COLUMNS].keys())
419418

@@ -422,7 +421,7 @@ def _read_bucket(self, doc, column_set, column_dtypes, include_symbol, include_i
422421
for c in column_set:
423422
try:
424423
coldata = doc[COLUMNS][c]
425-
mask = np.fromstring(lz4.decompress(coldata[ROWMASK]), dtype='uint8')
424+
mask = np.fromstring(decompress(coldata[ROWMASK]), dtype='uint8')
426425
union_mask = union_mask | mask
427426
except KeyError:
428427
rtn[c] = None
@@ -438,11 +437,11 @@ def _read_bucket(self, doc, column_set, column_dtypes, include_symbol, include_i
438437
try:
439438
coldata = doc[COLUMNS][c]
440439
dtype = np.dtype(coldata[DTYPE])
441-
values = np.fromstring(lz4.decompress(coldata[DATA]), dtype=dtype)
440+
values = np.fromstring(decompress(coldata[DATA]), dtype=dtype)
442441
self._set_or_promote_dtype(column_dtypes, c, dtype)
443442
rtn[c] = self._empty(rtn_length, dtype=column_dtypes[c])
444-
rowmask = np.unpackbits(np.fromstring(lz4.decompress(coldata[ROWMASK]),
445-
dtype='uint8'))[:doc_length].astype('bool')
443+
rowmask = np.unpackbits(np.fromstring(decompress(coldata[ROWMASK]),
444+
dtype='uint8'))[:doc_length].astype('bool')
446445
rowmask = rowmask[union_mask]
447446
rtn[c][rowmask] = values
448447
except KeyError:
@@ -644,18 +643,18 @@ def _pandas_to_bucket(df, symbol, initial_image):
644643
rtn[START] = start
645644

646645
logger.warning("NB treating all values as 'exists' - no longer sparse")
647-
rowmask = Binary(lz4.compressHC(np.packbits(np.ones(len(df), dtype='uint8'))))
646+
rowmask = Binary(compressHC(np.packbits(np.ones(len(df), dtype='uint8')).tostring()))
648647

649648
index_name = df.index.names[0] or "index"
650649
recs = df.to_records(convert_datetime64=False)
651650
for col in df:
652651
array = TickStore._ensure_supported_dtypes(recs[col])
653652
col_data = {}
654-
col_data[DATA] = Binary(lz4.compressHC(array.tostring()))
653+
col_data[DATA] = Binary(compressHC(array.tostring()))
655654
col_data[ROWMASK] = rowmask
656655
col_data[DTYPE] = TickStore._str_dtype(array.dtype)
657656
rtn[COLUMNS][col] = col_data
658-
rtn[INDEX] = Binary(lz4.compressHC(np.concatenate(([recs[index_name][0].astype('datetime64[ms]').view('uint64')],
657+
rtn[INDEX] = Binary(compressHC(np.concatenate(([recs[index_name][0].astype('datetime64[ms]').view('uint64')],
659658
np.diff(recs[index_name].astype('datetime64[ms]').view('uint64')))).tostring()))
660659
return rtn, final_image
661660

@@ -686,13 +685,13 @@ def _to_bucket(ticks, symbol, initial_image):
686685
rowmask[k][i] = 1
687686
data[k] = [v]
688687

689-
rowmask = dict([(k, Binary(lz4.compressHC(np.packbits(v).tostring())))
688+
rowmask = dict([(k, Binary(compressHC(np.packbits(v).tostring())))
690689
for k, v in iteritems(rowmask)])
691690
for k, v in iteritems(data):
692691
if k != 'index':
693692
v = np.array(v)
694693
v = TickStore._ensure_supported_dtypes(v)
695-
rtn[COLUMNS][k] = {DATA: Binary(lz4.compressHC(v.tostring())),
694+
rtn[COLUMNS][k] = {DATA: Binary(compressHC(v.tostring())),
696695
DTYPE: TickStore._str_dtype(v.dtype),
697696
ROWMASK: rowmask[k]}
698697

@@ -705,7 +704,7 @@ def _to_bucket(ticks, symbol, initial_image):
705704
rtn[IMAGE_DOC] = {IMAGE_TIME: image_start, IMAGE: initial_image}
706705
rtn[END] = end
707706
rtn[START] = start
708-
rtn[INDEX] = Binary(lz4.compressHC(np.concatenate(([data['index'][0]], np.diff(data['index']))).tostring()))
707+
rtn[INDEX] = Binary(compressHC(np.concatenate(([data['index'][0]], np.diff(data['index']))).tostring()))
709708
return rtn, final_image
710709

711710
def max_date(self, symbol):

setup.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ def run_tests(self):
7777

7878
setup(
7979
name="arctic",
80-
version="1.43.0",
80+
version="1.44.0",
8181
author="Man AHL Technology",
8282
author_email="[email protected]",
8383
description=("AHL Research Versioned TimeSeries and Tick store"),
@@ -94,7 +94,6 @@ def run_tests(self):
9494
],
9595
install_requires=["decorator",
9696
"enum34",
97-
"lz4<=0.8.2",
9897
"mockextras",
9998
"pandas",
10099
"pymongo>=3.0",
@@ -109,6 +108,7 @@ def run_tests(self):
109108
"pytest-server-fixtures",
110109
"pytest-timeout",
111110
"pytest-xdist",
111+
"lz4"
112112
],
113113
entry_points={'console_scripts': [
114114
'arctic_init_library = arctic.scripts.arctic_init_library:main',

tests/integration/store/test_version_store.py

+4-2
Original file line numberDiff line numberDiff line change
@@ -274,8 +274,10 @@ def test_append_should_overwrite_after_delete(library):
274274

275275

276276
def test_append_empty_ts(library):
277-
library.append(symbol, ts1, upsert=True)
278-
library.append(symbol, pd.DataFrame(), upsert=True)
277+
data = library.append(symbol, ts1, upsert=True)
278+
assert(data.version == 1)
279+
data = library.append(symbol, pd.DataFrame(), upsert=True)
280+
assert(data.version == 1)
279281
assert len(library.read(symbol).data) == len(ts1)
280282

281283

0 commit comments

Comments
 (0)