
Commit 4661d77

TomAugspurger authored and Josiah Baker committed
DOC: Add scaling to large datasets section (pandas-dev#28577)
* DOC: Add scaling to large datasets section

Closes pandas-dev#28315
1 parent 225d5e2 commit 4661d77

File tree

9 files changed: +481 -3 lines changed


doc/.gitignore

+4
@@ -0,0 +1,4 @@
+data/
+timeseries.csv
+timeseries.parquet
+timeseries_wide.parquet

doc/source/index.rst.template

+1
@@ -83,6 +83,7 @@ See the :ref:`overview` for more detail about what's in the library.
 * :doc:`user_guide/style`
 * :doc:`user_guide/options`
 * :doc:`user_guide/enhancingperf`
+* :doc:`user_guide/scale`
 * :doc:`user_guide/sparse`
 * :doc:`user_guide/gotchas`
 * :doc:`user_guide/cookbook`

doc/source/user_guide/index.rst

+1
@@ -38,6 +38,7 @@ Further information on any specific method can be obtained in the
 style
 options
 enhancingperf
+scale
 sparse
 gotchas
 cookbook

doc/source/user_guide/scale.rst

+373
@@ -0,0 +1,373 @@
.. _scale:

*************************
Scaling to large datasets
*************************

Pandas provides data structures for in-memory analytics, which makes using pandas
to analyze datasets that are larger than memory somewhat tricky. Even datasets
that are a sizable fraction of memory become unwieldy, as some pandas operations need
to make intermediate copies.

This document provides a few recommendations for scaling your analysis to larger datasets.
It's a complement to :ref:`enhancingperf`, which focuses on speeding up analysis
for datasets that fit in memory.

But first, it's worth considering *not using pandas*. Pandas isn't the right
tool for all situations. If you're working with very large datasets and a tool
like PostgreSQL fits your needs, then you should probably be using that.
Assuming you want or need the expressiveness and power of pandas, let's carry on.

.. ipython:: python

   import pandas as pd
   import numpy as np

.. ipython:: python
   :suppress:

   from pandas.util.testing import _make_timeseries

   # Make a random in-memory dataset
   ts = _make_timeseries(freq="30S", seed=0)
   ts.to_csv("timeseries.csv")
   ts.to_parquet("timeseries.parquet")

Load less data
--------------

.. ipython:: python
   :suppress:

   # make a similar dataset with many columns
   timeseries = [
       _make_timeseries(freq="1T", seed=i).rename(columns=lambda x: f"{x}_{i}")
       for i in range(10)
   ]
   ts_wide = pd.concat(timeseries, axis=1)
   ts_wide.to_parquet("timeseries_wide.parquet")

Suppose our raw dataset on disk has many columns::

                        id_0 name_0 x_0 y_0 id_1 name_1 x_1 ... name_8 x_8 y_8 id_9 name_9 x_9 y_9
   timestamp                                                 ...
   2000-01-01 00:00:00 1015 Michael -0.399453 0.095427 994 Frank -0.176842 ... Dan -0.315310 0.713892 1025 Victor -0.135779 0.346801
   2000-01-01 00:01:00 969 Patricia 0.650773 -0.874275 1003 Laura 0.459153 ... Ursula 0.913244 -0.630308 1047 Wendy -0.886285 0.035852
   2000-01-01 00:02:00 1016 Victor -0.721465 -0.584710 1046 Michael 0.524994 ... Ray -0.656593 0.692568 1064 Yvonne 0.070426 0.432047
   2000-01-01 00:03:00 939 Alice -0.746004 -0.908008 996 Ingrid -0.414523 ... Jerry -0.958994 0.608210 978 Wendy 0.855949 -0.648988
   2000-01-01 00:04:00 1017 Dan 0.919451 -0.803504 1048 Jerry -0.569235 ... Frank -0.577022 -0.409088 994 Bob -0.270132 0.335176
   ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...
   2000-12-30 23:56:00 999 Tim 0.162578 0.512817 973 Kevin -0.403352 ... Tim -0.380415 0.008097 1041 Charlie 0.191477 -0.599519
   2000-12-30 23:57:00 970 Laura -0.433586 -0.600289 958 Oliver -0.966577 ... Zelda 0.971274 0.402032 1038 Ursula 0.574016 -0.930992
   2000-12-30 23:58:00 1065 Edith 0.232211 -0.454540 971 Tim 0.158484 ... Alice -0.222079 -0.919274 1022 Dan 0.031345 -0.657755
   2000-12-30 23:59:00 1019 Ingrid 0.322208 -0.615974 981 Hannah 0.607517 ... Sarah -0.424440 -0.117274 990 George -0.375530 0.563312
   2000-12-31 00:00:00 937 Ursula -0.906523 0.943178 1018 Alice -0.564513 ... Jerry 0.236837 0.807650 985 Oliver 0.777642 0.783392

   [525601 rows x 40 columns]


To load the columns we want, we have two options.
Option 1 loads in all the data and then filters to what we need.

.. ipython:: python

   columns = ['id_0', 'name_0', 'x_0', 'y_0']

   pd.read_parquet("timeseries_wide.parquet")[columns]

Option 2 only loads the columns we request.

.. ipython:: python

   pd.read_parquet("timeseries_wide.parquet", columns=columns)

If we were to measure the memory usage of the two calls, we'd see that specifying
``columns`` uses about 1/10th the memory in this case.

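One way to check this yourself is with the optional ``memory_profiler`` package and
its ``%memit`` IPython magic (a sketch; ``memory_profiler`` is not a pandas dependency
and is assumed to be installed):

.. code-block:: ipython

   In [1]: %load_ext memory_profiler

   In [2]: %memit pd.read_parquet("timeseries_wide.parquet")[columns]

   In [3]: %memit pd.read_parquet("timeseries_wide.parquet", columns=columns)
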
With :func:`pandas.read_csv`, you can specify ``usecols`` to limit the columns
read into memory. Not all file formats that can be read by pandas provide an option
to read a subset of columns.

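For example, a minimal sketch with the ``timeseries.csv`` file written earlier
(whose columns are ``id``, ``name``, ``x`` and ``y``; note that the index column
must be included in ``usecols`` when it is also passed as ``index_col``):

.. code-block:: python

   # only parse the timestamp index and two of the data columns
   pd.read_csv(
       "timeseries.csv",
       index_col="timestamp",
       parse_dates=["timestamp"],
       usecols=["timestamp", "id", "name"],
   )
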
Use efficient datatypes
-----------------------

The default pandas data types are not the most memory efficient. This is
especially true for low-cardinality text data (columns with relatively few
unique values). By using more efficient data types you can store larger datasets
in memory.

.. ipython:: python

   ts = pd.read_parquet("timeseries.parquet")
   ts

Now, let's inspect the data types and memory usage to see where we should focus our
attention.

.. ipython:: python

   ts.dtypes

.. ipython:: python

   ts.memory_usage(deep=True)  # memory usage in bytes


The ``name`` column is taking up much more memory than any other. It has just a
few unique values, so it's a good candidate for converting to a
:class:`Categorical`. With a Categorical, we store each unique name once and use
space-efficient integers to know which specific name is used in each row.


.. ipython:: python

   ts2 = ts.copy()
   ts2['name'] = ts2['name'].astype('category')
   ts2.memory_usage(deep=True)

We can go a bit further and downcast the numeric columns to their smallest types
using :func:`pandas.to_numeric`.

.. ipython:: python

   ts2['id'] = pd.to_numeric(ts2['id'], downcast='unsigned')
   ts2[['x', 'y']] = ts2[['x', 'y']].apply(pd.to_numeric, downcast='float')
   ts2.dtypes

.. ipython:: python

   ts2.memory_usage(deep=True)

.. ipython:: python

   reduction = (ts2.memory_usage(deep=True).sum()
                / ts.memory_usage(deep=True).sum())
   print(f"{reduction:0.2f}")

In all, we've reduced the in-memory footprint of this dataset to 1/5 of its
original size.

See :ref:`categorical` for more on ``Categorical`` and :ref:`basics.dtypes`
for an overview of all of pandas' dtypes.

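As an aside, these conversions can often be requested up front when reading the
data, rather than applied afterwards. A minimal sketch with :func:`pandas.read_csv`
and the ``timeseries.csv`` file from above (the ``uint16`` choice assumes the ``id``
values stay within that range, as they do for this generated dataset):

.. code-block:: python

   ts_efficient = pd.read_csv(
       "timeseries.csv",
       index_col="timestamp",
       parse_dates=["timestamp"],
       # store the repetitive names as a Categorical and the ids as unsigned ints
       dtype={"name": "category", "id": "uint16"},
   )
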
Use chunking
------------

Some workloads can be achieved with chunking: splitting a large problem like "convert this
directory of CSVs to Parquet" into a bunch of small problems ("convert this individual CSV
file into a Parquet file. Now repeat that for each file in this directory."). As long as each chunk
fits in memory, you can work with datasets that are much larger than memory. A rough sketch of
this pattern follows the note below.

.. note::

   Chunking works well when the operation you're performing requires zero or minimal
   coordination between chunks. For more complicated workflows, you're better off
   :ref:`using another library <scale.other_libraries>`.

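Here is that sketch (the ``data/csvs`` directory is hypothetical and not part of
this guide's generated data):

.. code-block:: python

   import pathlib

   # convert one file at a time, so only a single DataFrame is in memory at once
   for path in pathlib.Path("data/csvs").glob("*.csv"):
       df = pd.read_csv(path)
       df.to_parquet(path.with_suffix(".parquet"))
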
Suppose we have an even larger "logical dataset" on disk that's a directory of parquet
files. Each file in the directory represents a different year of the entire dataset.

.. ipython:: python
   :suppress:

   import pathlib

   N = 12
   starts = [f'20{i:>02d}-01-01' for i in range(N)]
   ends = [f'20{i:>02d}-12-13' for i in range(N)]

   pathlib.Path("data/timeseries").mkdir(exist_ok=True)

   for i, (start, end) in enumerate(zip(starts, ends)):
       ts = _make_timeseries(start=start, end=end, freq='1T', seed=i)
       ts.to_parquet(f"data/timeseries/ts-{i:0>2d}.parquet")


::

   data
   └── timeseries
       ├── ts-00.parquet
       ├── ts-01.parquet
       ├── ts-02.parquet
       ├── ts-03.parquet
       ├── ts-04.parquet
       ├── ts-05.parquet
       ├── ts-06.parquet
       ├── ts-07.parquet
       ├── ts-08.parquet
       ├── ts-09.parquet
       ├── ts-10.parquet
       └── ts-11.parquet

Now we'll implement an out-of-core ``value_counts``. The peak memory usage of this
workflow is the single largest chunk, plus a small series storing the unique value
counts up to this point. As long as each individual file fits in memory, this will
work for arbitrary-sized datasets.

.. ipython:: python

   %%time
   files = pathlib.Path("data/timeseries/").glob("ts*.parquet")
   counts = pd.Series(dtype=int)
   for path in files:
       # Only one dataframe is in memory at a time...
       df = pd.read_parquet(path)
       # ... plus a small Series ``counts``, which is updated.
       counts = counts.add(df['name'].value_counts(), fill_value=0)
   counts.astype(int)

Some readers, like :func:`pandas.read_csv`, offer parameters to control the
``chunksize`` when reading a single file.

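For instance, a minimal sketch of chunked reading with the ``chunksize`` parameter,
using the ``timeseries.csv`` file written earlier (the chunk size of 100,000 rows is
arbitrary):

.. code-block:: python

   chunks = pd.read_csv(
       "timeseries.csv",
       index_col="timestamp",
       parse_dates=["timestamp"],
       chunksize=100_000,
   )
   counts = pd.Series(dtype=int)
   # each iteration yields a DataFrame with at most 100,000 rows
   for chunk in chunks:
       counts = counts.add(chunk["name"].value_counts(), fill_value=0)
   counts.astype(int)
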
Manual chunking is an OK option for workflows that don't
require overly sophisticated operations. Some operations, like ``groupby``, are
much harder to do chunkwise. In these cases, you may be better off switching to a
different library that implements these out-of-core algorithms for you.

.. _scale.other_libraries:

Use other libraries
-------------------

Pandas is just one library offering a DataFrame API. Because of its popularity,
pandas' API has become something of a standard that other libraries implement.
The pandas documentation maintains a list of libraries implementing a DataFrame API
in :ref:`our ecosystem page <ecosystem.out-of-core>`.

For example, `Dask`_, a parallel computing library, has `dask.dataframe`_, a
pandas-like API for working with larger-than-memory datasets in parallel. Dask
can use multiple threads or processes on a single machine, or a cluster of
machines to process data in parallel.


We'll import ``dask.dataframe`` and notice that the API feels similar to pandas.
We can use Dask's ``read_parquet`` function, but provide a globstring of files to read in.

.. ipython:: python

   import dask.dataframe as dd

   ddf = dd.read_parquet("data/timeseries/ts*.parquet", engine="pyarrow")
   ddf

Inspecting the ``ddf`` object, we see a few things:

* There are familiar attributes like ``.columns`` and ``.dtypes``
* There are familiar methods like ``.groupby``, ``.sum``, etc.
* There are new attributes like ``.npartitions`` and ``.divisions``

The partitions and divisions are how Dask parallelizes computation. A **Dask**
DataFrame is made up of many **pandas** DataFrames. A single method call on a
Dask DataFrame ends up making many pandas method calls, and Dask knows how to
coordinate everything to get the result.

.. ipython:: python

   ddf.columns
   ddf.dtypes
   ddf.npartitions

One major difference: the ``dask.dataframe`` API is *lazy*. If you look at the
repr above, you'll notice that the values aren't actually printed out; just the
column names and dtypes. That's because Dask hasn't actually read the data yet.
Rather than executing immediately, operations build up a **task graph**.

.. ipython:: python

   ddf
   ddf['name']
   ddf['name'].value_counts()

Each of these calls is instant because the result isn't being computed yet.
We're just building up a list of computations to do when someone needs the
result. Dask knows that the return type of a ``pandas.Series.value_counts``
is a pandas Series with a certain dtype and a certain name. So the Dask version
returns a Dask Series with the same dtype and the same name.

To get the actual result you can call ``.compute()``.

.. ipython:: python

   %time ddf['name'].value_counts().compute()

At that point, you get back the same thing you'd get with pandas, in this case
a concrete pandas Series with the count of each ``name``.

Calling ``.compute`` causes the full task graph to be executed. This includes
reading the data, selecting the columns, and doing the ``value_counts``. The
execution is done *in parallel* where possible, and Dask tries to keep the
overall memory footprint small. You can work with datasets that are much larger
than memory, as long as each partition (a regular pandas DataFrame) fits in memory.

By default, ``dask.dataframe`` operations use a threadpool to do operations in
parallel. We can also connect to a cluster to distribute the work on many
machines. In this case we'll connect to a local "cluster" made up of several
processes on this single machine.

.. code-block:: python

   >>> from dask.distributed import Client, LocalCluster

   >>> cluster = LocalCluster()
   >>> client = Client(cluster)
   >>> client
   <Client: 'tcp://127.0.0.1:53349' processes=4 threads=8, memory=17.18 GB>

Once this ``client`` is created, all of Dask's computation will take place on
the cluster (which is just processes in this case).

Dask implements the most used parts of the pandas API. For example, we can do
a familiar groupby aggregation.

.. ipython:: python

   %time ddf.groupby('name')[['x', 'y']].mean().compute().head()

The grouping and aggregation are done out-of-core and in parallel.

When Dask knows the ``divisions`` of a dataset, certain optimizations are
possible. When reading parquet datasets written by Dask, the divisions will be
known automatically. In this case, since we created the parquet files manually,
we need to supply the divisions manually.

.. ipython:: python

   N = 12
   starts = [f'20{i:>02d}-01-01' for i in range(N)]
   ends = [f'20{i:>02d}-12-13' for i in range(N)]

   divisions = tuple(pd.to_datetime(starts)) + (pd.Timestamp(ends[-1]),)
   ddf.divisions = divisions
   ddf

Now we can do things like fast random access with ``.loc``.

.. ipython:: python

   ddf.loc['2002-01-01 12:01':'2002-01-01 12:05'].compute()

Dask knows to just look in the 3rd partition for selecting values in 2002. It
doesn't need to look at any other data.

Many workflows involve a large amount of data and processing it in a way that
reduces the size to something that fits in memory. In this case, we'll resample
to daily frequency and take the mean. Once we've taken the mean, we know the
results will fit in memory, so we can safely call ``compute`` without running
out of memory. At that point it's just a regular pandas object.

.. ipython:: python

   @savefig dask_resample.png
   ddf[['x', 'y']].resample("1D").mean().cumsum().compute().plot()

These Dask examples have all been done using multiple processes on a single
machine. Dask can be `deployed on a cluster
<https://docs.dask.org/en/latest/setup.html>`_ to scale up to even larger
datasets.

You can see more Dask examples at https://examples.dask.org.

.. _Dask: https://dask.org
.. _dask.dataframe: https://docs.dask.org/en/latest/dataframe.html
