diff --git a/doc/.gitignore b/doc/.gitignore
new file mode 100644
index 0000000000000..e23892d6100e8
--- /dev/null
+++ b/doc/.gitignore
@@ -0,0 +1,4 @@
+data/
+timeseries.csv
+timeseries.parquet
+timeseries_wide.parquet
diff --git a/doc/source/index.rst.template b/doc/source/index.rst.template
index f5669626aa2b3..6ff42eee9dad2 100644
--- a/doc/source/index.rst.template
+++ b/doc/source/index.rst.template
@@ -83,6 +83,7 @@ See the :ref:`overview` for more detail about what's in the library.
 * :doc:`user_guide/style`
 * :doc:`user_guide/options`
 * :doc:`user_guide/enhancingperf`
+* :doc:`user_guide/scale`
 * :doc:`user_guide/sparse`
 * :doc:`user_guide/gotchas`
 * :doc:`user_guide/cookbook`
diff --git a/doc/source/user_guide/index.rst b/doc/source/user_guide/index.rst
index 05df83decbd7e..b86961a71433b 100644
--- a/doc/source/user_guide/index.rst
+++ b/doc/source/user_guide/index.rst
@@ -38,6 +38,7 @@ Further information on any specific method can be obtained in the
     style
     options
     enhancingperf
+    scale
     sparse
     gotchas
     cookbook
diff --git a/doc/source/user_guide/scale.rst b/doc/source/user_guide/scale.rst
new file mode 100644
index 0000000000000..7b590a3a1fcc8
--- /dev/null
+++ b/doc/source/user_guide/scale.rst
@@ -0,0 +1,373 @@
+.. _scale:
+
+*************************
+Scaling to large datasets
+*************************
+
+Pandas provides data structures for in-memory analytics, which makes using pandas
+to analyze datasets that are larger than memory somewhat tricky. Even datasets
+that are a sizable fraction of memory become unwieldy, as some pandas operations
+need to make intermediate copies.
+
+This document provides a few recommendations for scaling your analysis to larger datasets.
+It's a complement to :ref:`enhancingperf`, which focuses on speeding up analysis
+for datasets that fit in memory.
+
+But first, it's worth considering *not using pandas*. Pandas isn't the right
+tool for all situations. If you're working with very large datasets and a tool
+like PostgreSQL fits your needs, then you should probably be using that.
+Assuming you want or need the expressiveness and power of pandas, let's carry on.
+
+.. ipython:: python
+
+   import pandas as pd
+   import numpy as np
+
+.. ipython:: python
+   :suppress:
+
+   from pandas.util.testing import _make_timeseries
+
+   # Make a random in-memory dataset
+   ts = _make_timeseries(freq="30S", seed=0)
+   ts.to_csv("timeseries.csv")
+   ts.to_parquet("timeseries.parquet")
+
+
+Load less data
+--------------
+
+.. ipython:: python
+   :suppress:
+
+   # make a similar dataset with many columns
+   timeseries = [
+       _make_timeseries(freq="1T", seed=i).rename(columns=lambda x: f"{x}_{i}")
+       for i in range(10)
+   ]
+   ts_wide = pd.concat(timeseries, axis=1)
+   ts_wide.to_parquet("timeseries_wide.parquet")
+
+Suppose our raw dataset on disk has many columns::
+
+                         id_0    name_0       x_0       y_0  id_1   name_1       x_1  ...  name_8       x_8       y_8  id_9   name_9       x_9       y_9
+    timestamp                                                                         ...
+    2000-01-01 00:00:00  1015   Michael -0.399453  0.095427   994    Frank -0.176842  ...     Dan -0.315310  0.713892  1025   Victor -0.135779  0.346801
+    2000-01-01 00:01:00   969  Patricia  0.650773 -0.874275  1003    Laura  0.459153  ...  Ursula  0.913244 -0.630308  1047    Wendy -0.886285  0.035852
+    2000-01-01 00:02:00  1016    Victor -0.721465 -0.584710  1046  Michael  0.524994  ...     Ray -0.656593  0.692568  1064   Yvonne  0.070426  0.432047
+    2000-01-01 00:03:00   939     Alice -0.746004 -0.908008   996   Ingrid -0.414523  ...   Jerry -0.958994  0.608210   978    Wendy  0.855949 -0.648988
+    2000-01-01 00:04:00  1017       Dan  0.919451 -0.803504  1048    Jerry -0.569235  ...   Frank -0.577022 -0.409088   994      Bob -0.270132  0.335176
+    ...                   ...       ...       ...       ...   ...      ...       ...  ...     ...       ...       ...   ...      ...       ...       ...
+    2000-12-30 23:56:00   999       Tim  0.162578  0.512817   973    Kevin -0.403352  ...     Tim -0.380415  0.008097  1041  Charlie  0.191477 -0.599519
+    2000-12-30 23:57:00   970     Laura -0.433586 -0.600289   958   Oliver -0.966577  ...   Zelda  0.971274  0.402032  1038   Ursula  0.574016 -0.930992
+    2000-12-30 23:58:00  1065     Edith  0.232211 -0.454540   971      Tim  0.158484  ...   Alice -0.222079 -0.919274  1022      Dan  0.031345 -0.657755
+    2000-12-30 23:59:00  1019    Ingrid  0.322208 -0.615974   981   Hannah  0.607517  ...   Sarah -0.424440 -0.117274   990   George -0.375530  0.563312
+    2000-12-31 00:00:00   937    Ursula -0.906523  0.943178  1018    Alice -0.564513  ...   Jerry  0.236837  0.807650   985   Oliver  0.777642  0.783392
+
+    [525601 rows x 40 columns]
+
+
+To load the columns we want, we have two options.
+Option 1 loads in all the data and then filters to what we need.
+
+.. ipython:: python
+
+   columns = ['id_0', 'name_0', 'x_0', 'y_0']
+
+   pd.read_parquet("timeseries_wide.parquet")[columns]
+
+Option 2 only loads the columns we request.
+
+.. ipython:: python
+
+   pd.read_parquet("timeseries_wide.parquet", columns=columns)
+
+If we were to measure the memory usage of the two calls, we'd see that specifying
+``columns`` uses about 1/10th the memory in this case.
+
+With :func:`pandas.read_csv`, you can specify ``usecols`` to limit the columns
+read into memory. Not all file formats that can be read by pandas provide an option
+to read a subset of columns.
+
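+For example, here is a minimal sketch of what this looks like with the CSV file
+written above (the column names below are simply those of this example dataset):
+
+.. code-block:: python
+
+   # Only parse and keep three of the columns; the other columns are
+   # skipped entirely while reading and never take up memory.
+   df = pd.read_csv(
+       "timeseries.csv",
+       usecols=["timestamp", "id", "name"],
+       index_col="timestamp",
+       parse_dates=["timestamp"],
+   )
+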
+Use efficient datatypes
+-----------------------
+
+The default pandas data types are not the most memory efficient. This is
+especially true for low-cardinality text data (columns with relatively few
+unique values). By using more efficient data types you can store larger datasets
+in memory.
+
+.. ipython:: python
+
+   ts = pd.read_parquet("timeseries.parquet")
+   ts
+
+Now, let's inspect the data types and memory usage to see where we should focus
+our attention.
+
+.. ipython:: python
+
+   ts.dtypes
+
+.. ipython:: python
+
+   ts.memory_usage(deep=True)  # memory usage in bytes
+
+
+The ``name`` column is taking up much more memory than any other. It has just a
+few unique values, so it's a good candidate for converting to a
+:class:`Categorical`. With a Categorical, we store each unique name once and use
+space-efficient integers to know which specific name is used in each row.
+
+
+.. ipython:: python
+
+   ts2 = ts.copy()
+   ts2['name'] = ts2['name'].astype('category')
+   ts2.memory_usage(deep=True)
+
+We can go a bit further and downcast the numeric columns to their smallest types
+using :func:`pandas.to_numeric`.
+
+.. ipython:: python
+
+   ts2['id'] = pd.to_numeric(ts2['id'], downcast='unsigned')
+   ts2[['x', 'y']] = ts2[['x', 'y']].apply(pd.to_numeric, downcast='float')
+   ts2.dtypes
+
+.. ipython:: python
+
+   ts2.memory_usage(deep=True)
+
+.. ipython:: python
+
+   reduction = (ts2.memory_usage(deep=True).sum()
+                / ts.memory_usage(deep=True).sum())
+   print(f"{reduction:0.2f}")
+
+In all, we've reduced the in-memory footprint of this dataset to 1/5 of its
+original size.
+
+See :ref:`categorical` for more on ``Categorical`` and :ref:`basics.dtypes`
+for an overview of all of pandas' dtypes.
+
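+Where the reader supports it, you can also ask for the efficient types up front
+rather than converting after the fact, so the less efficient representation never
+has to exist in memory. Here is a rough sketch using :func:`pandas.read_csv` and
+the CSV file from above; the specific dtype choices are assumptions that happen
+to fit this example dataset:
+
+.. code-block:: python
+
+   df = pd.read_csv(
+       "timeseries.csv",
+       index_col="timestamp",
+       parse_dates=["timestamp"],
+       # Store the names as categories and the ids as small unsigned
+       # integers while parsing.
+       dtype={"name": "category", "id": "uint16"},
+   )
+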
+Use chunking
+------------
+
+Some workloads can be achieved with chunking: splitting a large problem like
+"convert this directory of CSVs to Parquet" into a bunch of small problems
+("convert this individual CSV file into a Parquet file. Now repeat that for
+each file in this directory."). As long as each chunk fits in memory, you can
+work with datasets that are much larger than memory.
+
+.. note::
+
+   Chunking works well when the operation you're performing requires zero or minimal
+   coordination between chunks. For more complicated workflows, you're better off
+   :ref:`using another library <scale.other_libraries>`.
+
+Suppose we have an even larger "logical dataset" on disk that's a directory of parquet
+files. Each file in the directory represents a different year of the entire dataset.
+
+.. ipython:: python
+   :suppress:
+
+   import pathlib
+
+   N = 12
+   starts = [f'20{i:>02d}-01-01' for i in range(N)]
+   ends = [f'20{i:>02d}-12-13' for i in range(N)]
+
+   pathlib.Path("data/timeseries").mkdir(parents=True, exist_ok=True)
+
+   for i, (start, end) in enumerate(zip(starts, ends)):
+       ts = _make_timeseries(start=start, end=end, freq='1T', seed=i)
+       ts.to_parquet(f"data/timeseries/ts-{i:0>2d}.parquet")
+
+
+::
+
+    data
+    └── timeseries
+        ├── ts-00.parquet
+        ├── ts-01.parquet
+        ├── ts-02.parquet
+        ├── ts-03.parquet
+        ├── ts-04.parquet
+        ├── ts-05.parquet
+        ├── ts-06.parquet
+        ├── ts-07.parquet
+        ├── ts-08.parquet
+        ├── ts-09.parquet
+        ├── ts-10.parquet
+        └── ts-11.parquet
+
+Now we'll implement an out-of-core ``value_counts``. The peak memory usage of this
+workflow is the single largest chunk, plus a small Series storing the unique value
+counts up to this point. As long as each individual file fits in memory, this will
+work for arbitrarily large datasets.
+
+.. ipython:: python
+
+   %%time
+   files = pathlib.Path("data/timeseries/").glob("ts*.parquet")
+   counts = pd.Series(dtype=int)
+   for path in files:
+       # Only one dataframe is in memory at a time...
+       df = pd.read_parquet(path)
+       # ... plus a small Series `counts`, which is updated.
+       counts = counts.add(df['name'].value_counts(), fill_value=0)
+   counts.astype(int)
+
+Some readers, like :func:`pandas.read_csv`, offer parameters to control the
+``chunksize`` when reading a single file.
+
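+For example, the out-of-core ``value_counts`` above could be written against a
+single large CSV file instead of a directory of Parquet files. This is only a
+sketch of the pattern, reusing the ``timeseries.csv`` file from earlier:
+
+.. code-block:: python
+
+   counts = pd.Series(dtype=int)
+   # Each iteration yields a DataFrame with up to `chunksize` rows,
+   # so only one chunk is in memory at a time.
+   for chunk in pd.read_csv("timeseries.csv", chunksize=100_000):
+       counts = counts.add(chunk["name"].value_counts(), fill_value=0)
+   counts = counts.astype(int)
+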
+Chunking manually is a fine option for workflows that don't require overly
+sophisticated operations. Some operations, like ``groupby``, are much harder to
+do chunkwise. In these cases, you may be better off switching to a different
+library that implements these out-of-core algorithms for you.
+
+.. _scale.other_libraries:
+
+Use other libraries
+-------------------
+
+Pandas is just one library offering a DataFrame API. Because of its popularity,
+pandas' API has become something of a standard that other libraries implement.
+The pandas documentation maintains a list of libraries implementing a DataFrame API
+in :ref:`our ecosystem page <ecosystem>`.
+
+For example, `Dask`_, a parallel computing library, has `dask.dataframe`_, a
+pandas-like API for working with larger-than-memory datasets in parallel. Dask
+can use multiple threads or processes on a single machine, or a cluster of
+machines to process data in parallel.
+
+
+We'll import ``dask.dataframe`` and notice that the API feels similar to pandas.
+We can use Dask's ``read_parquet`` function, but provide a globstring of files to read in.
+
+.. ipython:: python
+
+   import dask.dataframe as dd
+
+   ddf = dd.read_parquet("data/timeseries/ts*.parquet", engine="pyarrow")
+   ddf
+
+Inspecting the ``ddf`` object, we see a few things:
+
+* There are familiar attributes like ``.columns`` and ``.dtypes``
+* There are familiar methods like ``.groupby``, ``.sum``, etc.
+* There are new attributes like ``.npartitions`` and ``.divisions``
+
+The partitions and divisions are how Dask parallelizes computation. A **Dask**
+DataFrame is made up of many **pandas** DataFrames. A single method call on a
+Dask DataFrame ends up making many pandas method calls, and Dask knows how to
+coordinate everything to get the result.
+
+.. ipython:: python
+
+   ddf.columns
+   ddf.dtypes
+   ddf.npartitions
+
+One major difference: the ``dask.dataframe`` API is *lazy*. If you look at the
+repr above, you'll notice that the values aren't actually printed out; just the
+column names and dtypes. That's because Dask hasn't actually read the data yet.
+Rather than executing immediately, operations build up a **task graph**.
+
+.. ipython:: python
+
+   ddf
+   ddf['name']
+   ddf['name'].value_counts()
+
+Each of these calls is instant because the result isn't being computed yet.
+We're just building up a list of computations to do when someone needs the
+result. Dask knows that the return type of ``pandas.Series.value_counts``
+is a pandas Series with a certain dtype and a certain name. So the Dask version
+returns a Dask Series with the same dtype and the same name.
+
+To get the actual result you can call ``.compute()``.
+
+.. ipython:: python
+
+   %time ddf['name'].value_counts().compute()
+
+At that point, you get back the same thing you'd get with pandas, in this case
+a concrete pandas Series with the count of each ``name``.
+
+Calling ``.compute`` causes the full task graph to be executed. This includes
+reading the data, selecting the columns, and doing the ``value_counts``. The
+execution is done *in parallel* where possible, and Dask tries to keep the
+overall memory footprint small. You can work with datasets that are much larger
+than memory, as long as each partition (a regular pandas DataFrame) fits in memory.
+
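+If you need several results, it can be worth asking Dask for all of them in a
+single call, so that the shared parts of the task graph (such as reading the
+parquet files) are executed only once. A small sketch using the top-level
+``dask.compute`` function:
+
+.. code-block:: python
+
+   import dask
+
+   # Both results are derived from the same Dask DataFrame, so the
+   # file reads are shared between the two computations.
+   counts, means = dask.compute(
+       ddf["name"].value_counts(),
+       ddf[["x", "y"]].mean(),
+   )
+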
+By default, ``dask.dataframe`` uses a thread pool to run operations in
+parallel. We can also connect to a cluster to distribute the work on many
+machines. In this case we'll connect to a local "cluster" made up of several
+processes on this single machine.
+
+.. code-block:: python
+
+   >>> from dask.distributed import Client, LocalCluster
+
+   >>> cluster = LocalCluster()
+   >>> client = Client(cluster)
+   >>> client
+
+
+Once this ``client`` is created, all of Dask's computation will take place on
+the cluster (which is just processes in this case).
+
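+The cluster can also be sized explicitly and shut down when you're done with it.
+The arguments below are purely illustrative:
+
+.. code-block:: python
+
+   >>> cluster = LocalCluster(n_workers=4, threads_per_worker=1)
+   >>> client = Client(cluster)
+   >>> # ... do your work ...
+   >>> client.close()
+   >>> cluster.close()
+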
+Dask implements the most-used parts of the pandas API. For example, we can do
+a familiar groupby aggregation.
+
+.. ipython:: python
+
+   %time ddf.groupby('name')[['x', 'y']].mean().compute().head()
+
+The grouping and aggregation are done out-of-core and in parallel.
+
+When Dask knows the ``divisions`` of a dataset, certain optimizations are
+possible. When reading parquet datasets written by Dask, the divisions will be
+known automatically. In this case, since we created the parquet files manually,
+we need to supply the divisions manually.
+
+.. ipython:: python
+
+   N = 12
+   starts = [f'20{i:>02d}-01-01' for i in range(N)]
+   ends = [f'20{i:>02d}-12-13' for i in range(N)]
+
+   divisions = tuple(pd.to_datetime(starts)) + (pd.Timestamp(ends[-1]),)
+   ddf.divisions = divisions
+   ddf
+
+Now we can do things like fast random access with ``.loc``.
+
+.. ipython:: python
+
+   ddf.loc['2002-01-01 12:01':'2002-01-01 12:05'].compute()
+
+Dask knows to just look in the 3rd partition for selecting values in ``2002``.
+It doesn't need to look at any other data.
+
+Many workflows involve processing a large amount of data in a way that reduces
+it to something that fits in memory. In this case, we'll resample to daily
+frequency and take the mean. Once we've taken the mean, we know the results
+will fit in memory, so we can safely call ``compute`` without running out of
+memory. At that point it's just a regular pandas object.
+
+.. ipython:: python
+
+   @savefig dask_resample.png
+   ddf[['x', 'y']].resample("1D").mean().cumsum().compute().plot()
+
+These Dask examples have all been done using multiple processes on a single
+machine. Dask can be `deployed on a cluster
+<https://docs.dask.org/en/latest/setup.html>`_ to scale up to even larger
+datasets.
+
+You can see more Dask examples at https://examples.dask.org.
+
+.. _Dask: https://dask.org
+.. _dask.dataframe: https://docs.dask.org/en/latest/dataframe.html
diff --git a/doc/source/whatsnew/v1.0.0.rst b/doc/source/whatsnew/v1.0.0.rst
index a78bc07ac2715..a6abe39f24ac3 100644
--- a/doc/source/whatsnew/v1.0.0.rst
+++ b/doc/source/whatsnew/v1.0.0.rst
@@ -111,6 +111,13 @@ Other API changes
 - :meth:`MultiIndex.from_arrays` will no longer infer names from arrays if ``names=None`` is explicitly provided (:issue:`27292`)
 -
 
+.. _whatsnew_1000.api.documentation:
+
+Documentation Improvements
+^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+- Added new section on :ref:`scale` (:issue:`28315`).
+
 .. _whatsnew_1000.deprecations:
 
 Deprecations
diff --git a/environment.yml b/environment.yml
index 7629fa52e7829..7c3ec9064cba3 100644
--- a/environment.yml
+++ b/environment.yml
@@ -35,6 +35,12 @@ dependencies:
   - nbconvert>=5.4.1
   - nbsphinx
   - pandoc
+  # Dask and its dependencies
+  - dask-core
+  - toolz>=0.7.3
+  - fsspec>=0.5.1
+  - partd>=0.3.10
+  - cloudpickle>=0.2.1
 
   # web (jinja2 is also needed, but it's also an optional pandas dependency)
   - markdown
@@ -76,7 +82,7 @@ dependencies:
   - html5lib  # pandas.read_html
   - lxml  # pandas.read_html
   - openpyxl  # pandas.read_excel, DataFrame.to_excel, pandas.ExcelWriter, pandas.ExcelFile
-  - pyarrow>=0.9.0  # pandas.read_paquet, DataFrame.to_parquet, pandas.read_feather, DataFrame.to_feather
+  - pyarrow>=0.13.1  # pandas.read_parquet, DataFrame.to_parquet, pandas.read_feather, DataFrame.to_feather
   - pyqt>=5.9.2  # pandas.read_clipboard
   - pytables>=3.4.2  # pandas.read_hdf, DataFrame.to_hdf
   - python-snappy  # required by pyarrow
diff --git a/pandas/util/testing.py b/pandas/util/testing.py
index aee58f808d9e6..1c0a8dbc19ccd 100644
--- a/pandas/util/testing.py
+++ b/pandas/util/testing.py
@@ -1651,6 +1651,87 @@ def makeMultiIndex(k=10, names=None, **kwargs):
     return MultiIndex.from_product((("foo", "bar"), (1, 2)), names=names, **kwargs)
 
 
+_names = [
+    "Alice",
+    "Bob",
+    "Charlie",
+    "Dan",
+    "Edith",
+    "Frank",
+    "George",
+    "Hannah",
+    "Ingrid",
+    "Jerry",
+    "Kevin",
+    "Laura",
+    "Michael",
+    "Norbert",
+    "Oliver",
+    "Patricia",
+    "Quinn",
+    "Ray",
+    "Sarah",
+    "Tim",
+    "Ursula",
+    "Victor",
+    "Wendy",
+    "Xavier",
+    "Yvonne",
+    "Zelda",
+]
+
+
+def _make_timeseries(start="2000-01-01", end="2000-12-31", freq="1D", seed=None):
+    """
+    Make a DataFrame with a DatetimeIndex
+
+    Parameters
+    ----------
+    start : str or Timestamp, default "2000-01-01"
+        The start of the index. Passed to date_range with `freq`.
+    end : str or Timestamp, default "2000-12-31"
+        The end of the index. Passed to date_range with `freq`.
+    freq : str or Freq
+        The frequency to use for the DatetimeIndex
+    seed : int, optional
+        The random state seed.
+
+    Returns
+    -------
+    DataFrame
+        A DataFrame with a DatetimeIndex and the following columns:
+
+        * name : object dtype with string names
+        * id : int dtype with Poisson-distributed ids (mean 1000)
+        * x, y : float dtype values drawn uniformly from [-1, 1)
+
+    Examples
+    --------
+    >>> _make_timeseries()
+                  id    name         x         y
+    timestamp
+    2000-01-01   982   Frank  0.031261  0.986727
+    2000-01-02  1025   Edith -0.086358 -0.032920
+    2000-01-03   982   Edith  0.473177  0.298654
+    2000-01-04  1009   Sarah  0.534344 -0.750377
+    2000-01-05   963   Zelda -0.271573  0.054424
+    ...          ...     ...       ...       ...
+    2000-12-27   980  Ingrid -0.132333 -0.422195
+    2000-12-28   972   Frank -0.376007 -0.298687
+    2000-12-29  1009  Ursula -0.865047 -0.503133
+    2000-12-30  1000  Hannah -0.063757 -0.507336
+    2000-12-31   972     Tim -0.869120  0.531685
+    """
+    index = pd.date_range(start=start, end=end, freq=freq, name="timestamp")
+    n = len(index)
+    state = np.random.RandomState(seed)
+    columns = {
+        "name": state.choice(_names, size=n),
+        "id": state.poisson(1000, size=n),
+        "x": state.rand(n) * 2 - 1,
+        "y": state.rand(n) * 2 - 1,
+    }
+    df = pd.DataFrame(columns, index=index, columns=sorted(columns))
+    if df.index[-1] == end:
+        df = df.iloc[:-1]
+    return df
+
+
 def all_index_generator(k=10):
     """Generator which can be iterated over to get instances of all the various
     index classes.
diff --git a/requirements-dev.txt b/requirements-dev.txt
index fd8e6378240b4..698e4f3aea094 100644
--- a/requirements-dev.txt
+++ b/requirements-dev.txt
@@ -17,6 +17,11 @@ numpydoc>=0.9.0
 nbconvert>=5.4.1
 nbsphinx
 pandoc
+dask-core
+toolz>=0.7.3
+fsspec>=0.5.1
+partd>=0.3.10
+cloudpickle>=0.2.1
 markdown
 feedparser
 pyyaml
@@ -48,7 +53,7 @@ fastparquet>=0.2.1
 html5lib
 lxml
 openpyxl
-pyarrow>=0.9.0
+pyarrow>=0.13.1
 pyqt5>=5.9.2
 tables>=3.4.2
 python-snappy
diff --git a/scripts/generate_pip_deps_from_conda.py b/scripts/generate_pip_deps_from_conda.py
index 29fe8bf84c12b..44fe50b99560a 100755
--- a/scripts/generate_pip_deps_from_conda.py
+++ b/scripts/generate_pip_deps_from_conda.py
@@ -20,7 +20,7 @@
 import yaml
 
 EXCLUDE = {"python=3"}
-RENAME = {"pytables": "tables", "pyqt": "pyqt5"}
+RENAME = {"pytables": "tables", "pyqt": "pyqt5", "dask-core": "dask"}
 
 
 def conda_package_to_pip(package):