Commit 2e65722

Use cramjam and py3 speed for string decode (#580)
* Use cramjam and py3 speed for string decode
* update env
* include the code
* Add cramjam LZ4 (block) codec
* Omitted ttype attribute
* Remove .c file. This used to be the done thing ... before wheels! Now it's just bloat.
1 parent 39a9d7c commit 2e65722

21 files changed: +109, -8647 lines

.github/workflows/main.yaml

Lines changed: 2 additions & 2 deletions
@@ -13,10 +13,10 @@ jobs:
     strategy:
       fail-fast: false
       matrix:
-        CONDA_ENV: [py37, py38, py37z]
+        CONDA_ENV: [py37, py38, py39]
     steps:
       - name: APT
-        run: sudo apt-get install liblzo2-dev libsnappy-dev
+        run: sudo apt-get install liblzo2-dev

       - name: Checkout
         uses: actions/checkout@v2

ci/environment-py37.yml

Lines changed: 1 addition & 0 deletions
@@ -20,3 +20,4 @@ dependencies:
   - numpy
   - packaging
   - python-snappy
+  - cramjam

ci/environment-py38.yml

Lines changed: 1 addition & 0 deletions
@@ -20,3 +20,4 @@ dependencies:
   - numpy
   - packaging
   - python-snappy
+  - cramjam

ci/environment-py37z.yml renamed to ci/environment-py39.yml

Lines changed: 3 additions & 2 deletions
@@ -3,13 +3,13 @@ channels:
   - conda-forge
   - defaults
 dependencies:
-  - python=3.7
+  - python=3.9
   - brotli
   - bson
   - lz4
   - lzo
   - snappy
-  - zstd
+  - zstandard
   - pytest
   - numba
   - cython
@@ -20,3 +20,4 @@ dependencies:
   - numpy
   - packaging
   - python-snappy
+  - cramjam

docs/source/install.rst

Lines changed: 6 additions & 3 deletions
@@ -10,12 +10,15 @@ Required:
 - numpy
 - pandas
 - pytest
+- cramjam

-Optional (compression algorithms; gzip is always available):
+`cramjam`_ provides compression codecs: gzip, snappy, lz4, brotli, zstd
+
+.. _cramjam: https://github.com/milesgranger/pyrus-cramjam
+
+Optional compression codec:

-- python-snappy
 - python-lzo
-- brotli

 Installation
 ------------
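Since cramjam is now a required dependency, the cramjam-backed codecs are always registered. A quick check, sketched here rather than taken from the docs, that the codec table in fastparquet.compression reflects this:

from fastparquet import compression

# GZIP, SNAPPY, BROTLI, LZ4, LZ4_RAW and ZSTD come from cramjam; LZO shows
# up only if the optional python-lzo package is installed.
print(sorted(compression.compressions))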

fastparquet/api.py

Lines changed: 4 additions & 4 deletions
@@ -662,12 +662,12 @@ def filter_out_stats(rg, filters, schema):
             s = column.meta_data.statistics
             if s.max is not None:
                 b = ensure_bytes(s.max)
-                vmax = encoding.read_plain(b, column.meta_data.type, 1)
+                vmax = encoding.read_plain(b, column.meta_data.type, 1, stat=True)
                 if se.converted_type is not None:
                     vmax = converted_types.convert(vmax, se)
             if s.min is not None:
                 b = ensure_bytes(s.min)
-                vmin = encoding.read_plain(b, column.meta_data.type, 1)
+                vmin = encoding.read_plain(b, column.meta_data.type, 1, stat=True)
                 if se.converted_type is not None:
                     vmin = converted_types.convert(vmin, se)
             if filter_val(op, val, vmin, vmax):
@@ -708,7 +708,7 @@ def statistics(obj):
                     rv['max'] = ensure_bytes(s.max)
                 else:
                     rv['max'] = encoding.read_plain(ensure_bytes(s.max),
-                                                    md.type, 1)[0]
+                                                    md.type, 1, stat=True)[0]
             except:
                 rv['max'] = None
         if s.min is not None:
@@ -717,7 +717,7 @@ def statistics(obj):
                     rv['min'] = ensure_bytes(s.min)
                 else:
                     rv['min'] = encoding.read_plain(ensure_bytes(s.min),
-                                                    md.type, 1)[0]
+                                                    md.type, 1, stat=True)[0]
             except:
                 rv['min'] = None
         if s.null_count is not None:
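For context, filter_out_stats() and statistics() feed row-group statistics to callers; a minimal usage sketch (file name hypothetical) of the public entry point that now exercises the stat=True path:

from fastparquet import ParquetFile
from fastparquet.api import statistics

pf = ParquetFile("example.parquet")  # hypothetical file
# BYTE_ARRAY min/max now decode via read_plain(..., stat=True) instead of
# relying on the old RuntimeError fallback.
stats = statistics(pf)
print(stats["min"], stats["max"])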

fastparquet/benchmarks/columns.py

Lines changed: 4 additions & 4 deletions
@@ -37,7 +37,7 @@ def time_column():
         df = d[[col]]
         write(fn, df)
         with measure('%s: write, no nulls' % d.dtypes[col], result):
-            write(fn, df, has_nulls=False)
+            write(fn, df, has_nulls=False)#, compression="SNAPPY")

         pf = ParquetFile(fn)
         pf.to_pandas()  # warm-up
@@ -46,7 +46,7 @@ def time_column():
             pf.to_pandas()

         with measure('%s: write, no nulls, has_null=True' % d.dtypes[col], result):
-            write(fn, df, has_nulls=True)
+            write(fn, df, has_nulls=True)#, compression="SNAPPY")

         pf = ParquetFile(fn)
         pf.to_pandas()  # warm-up
@@ -63,7 +63,7 @@ def time_column():
         else:
             d.loc[n//2, col] = None
         with measure('%s: write, with null, has_null=True' % d.dtypes[col], result):
-            write(fn, df, has_nulls=True)
+            write(fn, df, has_nulls=True)#, compression="SNAPPY")

         pf = ParquetFile(fn)
         pf.to_pandas()  # warm-up
@@ -72,7 +72,7 @@ def time_column():
             pf.to_pandas()

         with measure('%s: write, with null, has_null=False' % d.dtypes[col], result):
-            write(fn, df, has_nulls=False)
+            write(fn, df, has_nulls=False)#, compression="SNAPPY")

         pf = ParquetFile(fn)
         pf.to_pandas()  # warm-up

fastparquet/compression.py

Lines changed: 29 additions & 56 deletions
@@ -1,5 +1,5 @@

-import gzip
+import cramjam
 from .thrift_structures import parquet_thrift

 # TODO: use stream/direct-to-buffer conversions instead of memcopy
@@ -16,24 +16,23 @@


 def gzip_compress_v3(data, compresslevel=COMPRESSION_LEVEL):
-    return gzip.compress(data, compresslevel=compresslevel)
+    return cramjam.gzip.compress(data, level=compresslevel)


 def gzip_decompress(data, uncompressed_size):
-    return gzip.decompress(data)
+    return cramjam.gzip.decompress(data, output_len=uncompressed_size)


 compressions['GZIP'] = gzip_compress_v3
 decompressions['GZIP'] = gzip_decompress

-try:
-    import snappy
-    def snappy_decompress(data, uncompressed_size):
-        return snappy.decompress(data)
-    compressions['SNAPPY'] = snappy.compress
-    decompressions['SNAPPY'] = snappy_decompress
-except ImportError:
-    pass
+
+def snappy_decompress(data, uncompressed_size):
+    return cramjam.snappy.decompress_raw(data)
+
+
+compressions['SNAPPY'] = cramjam.snappy.compress_raw
+decompressions['SNAPPY'] = snappy_decompress
 try:
     import lzo
     def lzo_decompress(data, uncompressed_size):
@@ -42,51 +41,24 @@ def lzo_decompress(data, uncompressed_size):
     decompressions['LZO'] = lzo_decompress
 except ImportError:
     pass
-try:
-    import brotli
-    def brotli_decompress(data, uncompressed_size):
-        return brotli.decompress(data)
-    compressions['BROTLI'] = brotli.compress
-    decompressions['BROTLI'] = brotli_decompress
-except ImportError:
-    pass
-try:
-    import lz4.block
-    def lz4_compress(data, **kwargs):
-        kwargs['store_size'] = False
-        return lz4.block.compress(data, **kwargs)
-    def lz4_decompress(data, uncompressed_size):
-        return lz4.block.decompress(data, uncompressed_size=uncompressed_size)
-    compressions['LZ4'] = lz4_compress
-    decompressions['LZ4'] = lz4_decompress
-except ImportError:
-    pass
-try:
-    import zstandard
-    def zstd_compress(data, **kwargs):
-        kwargs['write_content_size'] = False
-        cctx = zstandard.ZstdCompressor(**kwargs)
-        return cctx.compress(data)
-    def zstd_decompress(data, uncompressed_size):
-        dctx = zstandard.ZstdDecompressor()
-        return dctx.decompress(data, max_output_size=uncompressed_size)
-    compressions['ZSTD'] = zstd_compress
-    decompressions['ZSTD'] = zstd_decompress
-except ImportError:
-    pass
-if 'ZSTD' not in compressions:
-    try:
-        import zstd
-        def zstd_compress(data, level=None):
-            if level is not None:
-                return zstd.compress(data, level)
-            return zstd.compress(data)
-        def zstd_decompress(data, _uncompressed_size=None):
-            return zstd.decompress(data)
-        compressions['ZSTD'] = zstd_compress
-        decompressions['ZSTD'] = zstd_decompress
-    except ImportError:
-        pass
+compressions['BROTLI'] = cramjam.brotli.compress
+decompressions['BROTLI'] = cramjam.brotli.decompress
+
+
+def lz4_compress(data, **kwargs):
+    kwargs['store_size'] = False
+    return cramjam.lz4.compress_block(data, **kwargs)
+
+
+compressions['LZ4'] = lz4_compress
+decompressions['LZ4'] = cramjam.lz4.decompress_block
+
+# LZ4 is actually LZ4 block, aka "raw", see
+# https://github.com/apache/parquet-format/commit/7f06e838cbd1b7dbd722ff2580b9c2525e37fc46
+compressions['LZ4_RAW'] = lz4_compress
+decompressions['LZ4_RAW'] = cramjam.lz4.decompress_block
+compressions['ZSTD'] = cramjam.zstd.compress
+decompressions['ZSTD'] = cramjam.zstd.decompress

 compressions = {k.upper(): v for k, v in compressions.items()}
 decompressions = {k.upper(): v for k, v in decompressions.items()}
@@ -119,6 +91,7 @@ def compress_data(data, compression='gzip'):
             raise ValueError("args dict entry is not a dict")
     return compressions[algorithm.upper()](data, **args)

+
 def decompress_data(data, uncompressed_size, algorithm='gzip'):
     if isinstance(algorithm, int):
         algorithm = rev_map[algorithm]
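A minimal round-trip sketch of the cramjam calls the new tables map to, assuming the cramjam 2.x API used above; bytes(...) converts the returned cramjam buffers:

import cramjam

payload = b"parquet page bytes " * 64

# GZIP: the level passes straight through; output_len pre-sizes the result
gz = bytes(cramjam.gzip.compress(payload, level=6))
assert bytes(cramjam.gzip.decompress(gz, output_len=len(payload))) == payload

# SNAPPY: parquet uses the raw (block) format, not the framed one
sn = bytes(cramjam.snappy.compress_raw(payload))
assert bytes(cramjam.snappy.decompress_raw(sn)) == payload

# LZ4 block format without an embedded size, hence store_size=False
lz = bytes(cramjam.lz4.compress_block(payload, store_size=False))
assert bytes(cramjam.lz4.decompress_block(lz, output_len=len(payload))) == payload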

fastparquet/converted_types.py

Lines changed: 4 additions & 4 deletions
@@ -15,7 +15,6 @@
 import sys

 from .thrift_structures import parquet_thrift
-from .speedups import array_decode_utf8

 logger = logging.getLogger('parquet')  # pylint: disable=invalid-name

@@ -93,9 +92,10 @@ def convert(data, se, timestamp96=True):
     if ctype is None:
         return data
     if ctype == parquet_thrift.ConvertedType.UTF8:
-        if isinstance(data, list) or data.dtype != "O":
-            data = np.asarray(data, dtype="O")
-        return array_decode_utf8(data)
+        if data.dtype != "O":
+            # stats pairs
+            return np.array([o.decode() for o in data])
+        return np.array(data)  # was already converted in speedups
     if ctype == parquet_thrift.ConvertedType.DECIMAL:
         scale_factor = 10**-se.scale
         if data.dtype.kind in ['i', 'f']:
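To illustrate the two UTF8 branches, a small sketch: statistics min/max pairs arrive as a fixed-width bytes array, while data pages reach convert() already decoded to str by the speedups:

import numpy as np

stats_pair = np.array([b"apple", b"zebra"])  # dtype 'S5', not object
assert stats_pair.dtype != "O"
decoded = np.array([o.decode() for o in stats_pair])  # the "stats pairs" branch

page_values = np.array(["apple", "zebra"], dtype="O")  # already str
assert page_values.dtype == "O"  # the other branch just re-wraps the array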

fastparquet/core.py

Lines changed: 11 additions & 9 deletions
@@ -110,18 +110,20 @@ def read_data_page(f, helper, header, metadata, skip_nulls=False,
     definition_levels, num_nulls = read_def(io_obj, daph, helper, metadata)

     nval = daph.num_values - num_nulls
+    se = helper.schema_element(metadata.path_in_schema)
     if daph.encoding == parquet_thrift.Encoding.PLAIN:
+
         width = helper.schema_element(metadata.path_in_schema).type_length
-        values = encoding.read_plain(raw_bytes[io_obj.loc:],
+        values = encoding.read_plain(bytearray(raw_bytes)[io_obj.loc:],
                                      metadata.type,
                                      int(daph.num_values - num_nulls),
-                                     width=width)
+                                     width=width,
+                                     utf=se.converted_type == 0)
     elif daph.encoding in [parquet_thrift.Encoding.PLAIN_DICTIONARY,
                            parquet_thrift.Encoding.RLE]:
         # bit_width is stored as single byte.
         if daph.encoding == parquet_thrift.Encoding.RLE:
-            bit_width = helper.schema_element(
-                metadata.path_in_schema).type_length
+            bit_width = se.type_length
         else:
             bit_width = io_obj.read_byte()
         if bit_width in [8, 16, 32] and selfmade:
@@ -149,15 +151,15 @@ def skip_definition_bytes(io_obj, num):
         n //= 128


-def read_dictionary_page(file_obj, schema_helper, page_header, column_metadata):
+def read_dictionary_page(file_obj, schema_helper, page_header, column_metadata, utf=False):
     """Read a page containing dictionary data.

     Consumes data using the plain encoding and returns an array of values.
     """
     raw_bytes = _read_page(file_obj, page_header, column_metadata)
     if column_metadata.type == parquet_thrift.Type.BYTE_ARRAY:
-        values = np.array(unpack_byte_array(raw_bytes,
-            page_header.dictionary_page_header.num_values), dtype='object')
+        values = np.array(unpack_byte_array(bytearray(raw_bytes),
+            page_header.dictionary_page_header.num_values, utf=utf), dtype='object')
     else:
         width = schema_helper.schema_element(
             column_metadata.path_in_schema).type_length
@@ -196,7 +198,7 @@ def read_col(column, schema_helper, infile, use_cat=False,

     dic = None
     if ph.type == parquet_thrift.PageType.DICTIONARY_PAGE:
-        dic = read_dictionary_page(infile, schema_helper, ph, cmd)
+        dic = read_dictionary_page(infile, schema_helper, ph, cmd, utf=se.converted_type == 0)
         ph = read_thrift(infile, parquet_thrift.PageHeader)
         dic = convert(dic, se)
     if grab_dict:
@@ -229,7 +231,7 @@ def read_col(column, schema_helper, infile, use_cat=False,
     row_idx = 0
     while True:
         if ph.type == parquet_thrift.PageType.DICTIONARY_PAGE:
-            dic2 = np.array(read_dictionary_page(infile, schema_helper, ph, cmd))
+            dic2 = np.array(read_dictionary_page(infile, schema_helper, ph, cmd, utf=se.converted_type == 0))
             dic2 = convert(dic2, se)
             if use_cat and (dic2 != dic).any():
                 raise RuntimeError("Attempt to read as categorical a column"
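The repeated utf=se.converted_type == 0 comparison leans on UTF8 being the zeroth ConvertedType in the parquet thrift definitions; a one-line sanity sketch:

from fastparquet.thrift_structures import parquet_thrift

# ConvertedType.UTF8 == 0, so the flag asks unpack_byte_array to hand back
# str rather than bytes for string columns.
assert parquet_thrift.ConvertedType.UTF8 == 0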

fastparquet/encoding.py

Lines changed: 8 additions & 10 deletions
@@ -35,7 +35,7 @@ def read_plain_boolean(raw_bytes, count):
 }


-def read_plain(raw_bytes, type_, count, width=0):
+def read_plain(raw_bytes, type_, count, width=0, utf=False, stat=False):
     if type_ in DECODE_TYPEMAP:
         dtype = DECODE_TYPEMAP[type_]
         return np.frombuffer(memoryview(raw_bytes), dtype=dtype, count=count)
@@ -46,15 +46,13 @@ def read_plain(raw_bytes, type_, count, width=0):
         return np.frombuffer(memoryview(raw_bytes), dtype=dtype, count=count)
     if type_ == parquet_thrift.Type.BOOLEAN:
         return read_plain_boolean(raw_bytes, count)
-    # variable byte arrays (rare)
-    try:
-        return np.array(unpack_byte_array(raw_bytes, count), dtype='O')
-    except RuntimeError:
-        if count == 1:
-            # e.g., for statistics
-            return np.array([raw_bytes], dtype='O')
-        else:
-            raise
+    if type_ == parquet_thrift.Type.BYTE_ARRAY:
+        if stat:
+            if utf:
+                return np.array([bytes(raw_bytes).decode()], dtype='O')
+            else:
+                return np.array([bytes(raw_bytes)], dtype='O')
+        return np.array(unpack_byte_array(raw_bytes, count, utf=utf))


 @numba.jit(nogil=True)
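A sketch of the new branches: a statistics value is a single bare BYTE_ARRAY with no 4-byte length prefix, so stat=True bypasses unpack_byte_array entirely:

from fastparquet import encoding
from fastparquet.thrift_structures import parquet_thrift

raw = b"spam"  # bare statistics value, no length prefix
as_str = encoding.read_plain(raw, parquet_thrift.Type.BYTE_ARRAY, 1,
                             stat=True, utf=True)   # array(['spam'])
as_bytes = encoding.read_plain(raw, parquet_thrift.Type.BYTE_ARRAY, 1,
                               stat=True)           # array([b'spam'])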

fastparquet/parquet.thrift

Lines changed: 1 addition & 0 deletions
@@ -366,6 +366,7 @@ enum CompressionCodec {
   BROTLI = 4;
   LZ4 = 5;
   ZSTD = 6;
+  LZ4_RAW = 7;
 }

 enum PageType {
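Both codec ids resolve to the same cramjam block routines (see fastparquet/compression.py above); a hypothetical round trip through the public helpers:

from fastparquet.compression import compress_data, decompress_data

payload = b"abc" * 100
blob = bytes(compress_data(payload, compression="LZ4"))
out = bytes(decompress_data(blob, len(payload), algorithm="LZ4_RAW"))
assert out == payload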

fastparquet/parquet_thrift/parquet/ttypes.py

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default.
