# -*- coding: utf-8 -*-
"""
Tests the parsers' ability to read and parse non-local files,
which therefore require a network connection to be read.
"""
import logging

import numpy as np
import pytest

import pandas.util.testing as tm
import pandas.util._test_decorators as td
from pandas import DataFrame
from pandas.compat import BytesIO, StringIO
from pandas.io.parsers import read_csv, read_table


@pytest.mark.network
@pytest.mark.parametrize(
    "compress_type, extension", [
        ('gzip', '.gz'), ('bz2', '.bz2'), ('zip', '.zip'),
        pytest.param('xz', '.xz', marks=td.skip_if_no_lzma)
    ]
)
@pytest.mark.parametrize('mode', ['explicit', 'infer'])
@pytest.mark.parametrize('engine', ['python', 'c'])
def test_compressed_urls(salaries_table, compress_type, extension, mode,
                         engine):
    check_compressed_urls(salaries_table, compress_type, extension, mode,
                          engine)


@tm.network
def check_compressed_urls(salaries_table, compression, extension, mode,
                          engine):
    # Test reading compressed URLs with various engines and
    # extension inference.
    base_url = ('https://github.com/pandas-dev/pandas/raw/master/'
                'pandas/tests/io/parser/data/salaries.csv')
    url = base_url + extension

    if mode != 'explicit':
        compression = mode

    url_table = read_table(url, compression=compression, engine=engine)
    tm.assert_frame_equal(url_table, salaries_table)


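# A side note on what the 'infer' mode above exercises: with
# ``compression='infer'``, ``read_table``/``read_csv`` deduce the codec from
# the path or URL suffix ('.gz' -> gzip, '.bz2' -> bz2, '.zip' -> zip,
# '.xz' -> xz). A minimal local sketch of the same behavior, assuming pandas
# is importable (the file name is illustrative only):
#
#   import pandas as pd
#   pd.DataFrame({'a': [1, 2]}).to_csv('frame.csv.gz', index=False,
#                                      compression='gzip')
#   pd.read_csv('frame.csv.gz', compression='infer')  # inferred as gzip

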
@pytest.fixture
def tips_df(datapath):
    """DataFrame with the tips dataset."""
    return read_csv(datapath('io', 'parser', 'data', 'tips.csv'))


@pytest.mark.usefixtures("s3_resource")
class TestS3(object):

    def test_parse_public_s3_bucket(self, tips_df):
        pytest.importorskip('s3fs')

        # More of an integration test because of the not-public-contents
        # portion; this could probably be mocked instead.
        for ext, comp in [('', None), ('.gz', 'gzip'), ('.bz2', 'bz2')]:
            df = read_csv('s3://pandas-test/tips.csv' + ext,
                          compression=comp)
            assert isinstance(df, DataFrame)
            assert not df.empty
            tm.assert_frame_equal(df, tips_df)

        # Read a public file from a bucket with not-public contents.
        df = read_csv('s3://cant_get_it/tips.csv')
        assert isinstance(df, DataFrame)
        assert not df.empty
        tm.assert_frame_equal(df, tips_df)

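    # A sketch of how such reads could be exercised without touching AWS,
    # assuming the ``moto`` package (which backs the ``s3_resource`` fixture
    # in pandas' conftest); the bucket name and payload are illustrative:
    #
    #   import boto3
    #   import moto
    #   with moto.mock_s3():
    #       conn = boto3.resource('s3')
    #       conn.create_bucket(Bucket='pandas-test')
    #       conn.Bucket('pandas-test').put_object(
    #           Key='tips.csv', Body=b'a,b\n1,2\n')
    #       read_csv('s3://pandas-test/tips.csv')
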
    def test_parse_public_s3n_bucket(self, tips_df):
        # Read from AWS S3 as an "s3n" URL.
        df = read_csv('s3n://pandas-test/tips.csv', nrows=10)
        assert isinstance(df, DataFrame)
        assert not df.empty
        tm.assert_frame_equal(tips_df.iloc[:10], df)

    def test_parse_public_s3a_bucket(self, tips_df):
        # Read from AWS S3 as an "s3a" URL.
        df = read_csv('s3a://pandas-test/tips.csv', nrows=10)
        assert isinstance(df, DataFrame)
        assert not df.empty
        tm.assert_frame_equal(tips_df.iloc[:10], df)

    def test_parse_public_s3_bucket_nrows(self, tips_df):
        for ext, comp in [('', None), ('.gz', 'gzip'), ('.bz2', 'bz2')]:
            df = read_csv('s3://pandas-test/tips.csv' + ext,
                          nrows=10, compression=comp)
            assert isinstance(df, DataFrame)
            assert not df.empty
            tm.assert_frame_equal(tips_df.iloc[:10], df)

    def test_parse_public_s3_bucket_chunked(self, tips_df):
        # Read with a chunksize.
        chunksize = 5
        for ext, comp in [('', None), ('.gz', 'gzip'), ('.bz2', 'bz2')]:
            df_reader = read_csv('s3://pandas-test/tips.csv' + ext,
                                 chunksize=chunksize, compression=comp)
            assert df_reader.chunksize == chunksize
            for i_chunk in [0, 1, 2]:
                # Read a couple of chunks and make sure we see them
                # properly.
                df = df_reader.get_chunk()
                assert isinstance(df, DataFrame)
                assert not df.empty
                true_df = tips_df.iloc[
                    chunksize * i_chunk: chunksize * (i_chunk + 1)]
                tm.assert_frame_equal(true_df, df)

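    # For reference: ``read_csv(..., chunksize=n)`` returns a
    # ``TextFileReader`` rather than a ``DataFrame``; successive
    # ``get_chunk()`` calls (or plain iteration) yield ``n`` rows at a time.
    # A local illustration, assuming a CSV file on disk:
    #
    #   reader = read_csv('tips.csv', chunksize=5)
    #   first = reader.get_chunk()   # rows 0-4
    #   second = reader.get_chunk()  # rows 5-9
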
    def test_parse_public_s3_bucket_chunked_python(self, tips_df):
        # Read with a chunksize using the Python parser.
        chunksize = 5
        for ext, comp in [('', None), ('.gz', 'gzip'), ('.bz2', 'bz2')]:
            df_reader = read_csv('s3://pandas-test/tips.csv' + ext,
                                 chunksize=chunksize, compression=comp,
                                 engine='python')
            assert df_reader.chunksize == chunksize
            for i_chunk in [0, 1, 2]:
                # Read a couple of chunks and make sure we see them properly.
                df = df_reader.get_chunk()
                assert isinstance(df, DataFrame)
                assert not df.empty
                true_df = tips_df.iloc[
                    chunksize * i_chunk: chunksize * (i_chunk + 1)]
                tm.assert_frame_equal(true_df, df)

    def test_parse_public_s3_bucket_python(self, tips_df):
        for ext, comp in [('', None), ('.gz', 'gzip'), ('.bz2', 'bz2')]:
            df = read_csv('s3://pandas-test/tips.csv' + ext, engine='python',
                          compression=comp)
            assert isinstance(df, DataFrame)
            assert not df.empty
            tm.assert_frame_equal(df, tips_df)

    def test_infer_s3_compression(self, tips_df):
        for ext in ['', '.gz', '.bz2']:
            df = read_csv('s3://pandas-test/tips.csv' + ext,
                          engine='python', compression='infer')
            assert isinstance(df, DataFrame)
            assert not df.empty
            tm.assert_frame_equal(df, tips_df)

    def test_parse_public_s3_bucket_nrows_python(self, tips_df):
        for ext, comp in [('', None), ('.gz', 'gzip'), ('.bz2', 'bz2')]:
            df = read_csv('s3://pandas-test/tips.csv' + ext, engine='python',
                          nrows=10, compression=comp)
            assert isinstance(df, DataFrame)
            assert not df.empty
            tm.assert_frame_equal(tips_df.iloc[:10], df)

    def test_s3_fails(self):
        with pytest.raises(IOError):
            read_csv('s3://nyqpug/asdf.csv')

        # Receive a permission error when trying to read a private bucket.
        # It's irrelevant here that this isn't actually a table.
        with pytest.raises(IOError):
            read_csv('s3://cant_get_it/')

    def test_read_csv_handles_boto_s3_object(self,
                                             s3_resource,
                                             tips_file):
        # see gh-16135
        s3_object = s3_resource.meta.client.get_object(
            Bucket='pandas-test',
            Key='tips.csv')

        result = read_csv(BytesIO(s3_object["Body"].read()), encoding='utf8')
        assert isinstance(result, DataFrame)
        assert not result.empty

        expected = read_csv(tips_file)
        tm.assert_frame_equal(result, expected)

    def test_read_csv_chunked_download(self, s3_resource, caplog):
        # 8 MB of data; s3fs uses 5 MB chunks.
        df = DataFrame(np.random.randn(100000, 4), columns=list('abcd'))
        str_buf = StringIO()

        df.to_csv(str_buf)

        buf = BytesIO(str_buf.getvalue().encode('utf-8'))

        s3_resource.Bucket("pandas-test").put_object(
            Key="large-file.csv",
            Body=buf)

        with caplog.at_level(logging.DEBUG, logger='s3fs.core'):
            read_csv("s3://pandas-test/large-file.csv", nrows=5)
            # Log of fetch_range (start, stop).
            assert ((0, 5505024) in {x.args[-2:] for x in caplog.records})
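

# How one might run this module locally (an assumption: pytest is invoked
# from a pandas checkout with network access; adding ``-m network`` would
# restrict the run to tests carrying the ``network`` marker used above):
#
#   pytest pandas/tests/io/parser/test_network.py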