@@ -156,7 +156,7 @@ fits in memory, you can work with datasets that are much larger than memory.
 
 Chunking works well when the operation you're performing requires zero or minimal
 coordination between chunks. For more complicated workflows, you're better off
-:ref:`using another library <scale.other_libraries>`.
+:ref:`using other libraries <scale.other_libraries>`.
 
 Suppose we have an even larger "logical dataset" on disk that's a directory of parquet
 files. Each file in the directory represents a different year of the entire dataset.
@@ -219,188 +219,10 @@ different library that implements these out-of-core algorithms for you.
 
 .. _scale.other_libraries:
 
-Use Dask
---------
+Use Other Libraries
+-------------------
 
-pandas is just one library offering a DataFrame API. Because of its popularity,
-pandas' API has become something of a standard that other libraries implement.
-The pandas documentation maintains a list of libraries implementing a DataFrame API
-in `the ecosystem page <https://pandas.pydata.org/community/ecosystem.html>`_.
-
-For example, `Dask`_, a parallel computing library, has `dask.dataframe`_, a
-pandas-like API for working with larger than memory datasets in parallel. Dask
-can use multiple threads or processes on a single machine, or a cluster of
-machines to process data in parallel.
-
-
-We'll import ``dask.dataframe`` and notice that the API feels similar to pandas.
-We can use Dask's ``read_parquet`` function, but provide a globstring of files to read in.
-
-.. ipython:: python
-   :okwarning:
-
-   import dask.dataframe as dd
-
-   ddf = dd.read_parquet("data/timeseries/ts*.parquet", engine="pyarrow")
-   ddf
-
-Inspecting the ``ddf`` object, we see a few things
-
-* There are familiar attributes like ``.columns`` and ``.dtypes``
-* There are familiar methods like ``.groupby``, ``.sum``, etc.
-* There are new attributes like ``.npartitions`` and ``.divisions``
-
-The partitions and divisions are how Dask parallelizes computation. A **Dask**
-DataFrame is made up of many pandas :class:`pandas.DataFrame`. A single method call on a
-Dask DataFrame ends up making many pandas method calls, and Dask knows how to
-coordinate everything to get the result.
-
-.. ipython:: python
-
-   ddf.columns
-   ddf.dtypes
-   ddf.npartitions
-
-One major difference: the ``dask.dataframe`` API is *lazy*. If you look at the
-repr above, you'll notice that the values aren't actually printed out; just the
-column names and dtypes. That's because Dask hasn't actually read the data yet.
-Rather than executing immediately, doing operations build up a **task graph**.
-
-.. ipython:: python
-   :okwarning:
-
-   ddf
-   ddf["name"]
-   ddf["name"].value_counts()
-
-Each of these calls is instant because the result isn't being computed yet.
-We're just building up a list of computation to do when someone needs the
-result. Dask knows that the return type of a :class:`pandas.Series.value_counts`
-is a pandas :class:`pandas.Series` with a certain dtype and a certain name. So the Dask version
-returns a Dask Series with the same dtype and the same name.
-
-To get the actual result you can call ``.compute()``.
-
-.. ipython:: python
-   :okwarning:
-
-   %time ddf["name"].value_counts().compute()
-
-At that point, you get back the same thing you'd get with pandas, in this case
-a concrete pandas :class:`pandas.Series` with the count of each ``name``.
-
-Calling ``.compute`` causes the full task graph to be executed. This includes
-reading the data, selecting the columns, and doing the ``value_counts``. The
-execution is done *in parallel* where possible, and Dask tries to keep the
-overall memory footprint small. You can work with datasets that are much larger
-than memory, as long as each partition (a regular pandas :class:`pandas.DataFrame`) fits in memory.
-
-By default, ``dask.dataframe`` operations use a threadpool to do operations in
-parallel. We can also connect to a cluster to distribute the work on many
-machines. In this case we'll connect to a local "cluster" made up of several
-processes on this single machine.
-
-.. code-block:: python
-
-   >>> from dask.distributed import Client, LocalCluster
-
-   >>> cluster = LocalCluster()
-   >>> client = Client(cluster)
-   >>> client
-   <Client: 'tcp://127.0.0.1:53349' processes=4 threads=8, memory=17.18 GB>
-
-Once this ``client`` is created, all of Dask's computation will take place on
-the cluster (which is just processes in this case).
-
-Dask implements the most used parts of the pandas API. For example, we can do
-a familiar groupby aggregation.
-
-.. ipython:: python
-   :okwarning:
-
-   %time ddf.groupby("name")[["x", "y"]].mean().compute().head()
-
-The grouping and aggregation is done out-of-core and in parallel.
-
-When Dask knows the ``divisions`` of a dataset, certain optimizations are
-possible. When reading parquet datasets written by dask, the divisions will be
-known automatically. In this case, since we created the parquet files manually,
-we need to supply the divisions manually.
-
-.. ipython:: python
-   :okwarning:
-
-   N = 12
-   starts = [f"20{i:>02d}-01-01" for i in range(N)]
-   ends = [f"20{i:>02d}-12-13" for i in range(N)]
-
-   divisions = tuple(pd.to_datetime(starts)) + (pd.Timestamp(ends[-1]),)
-   ddf.divisions = divisions
-   ddf
-
-Now we can do things like fast random access with ``.loc``.
-
-.. ipython:: python
-   :okwarning:
-
-   ddf.loc["2002-01-01 12:01":"2002-01-01 12:05"].compute()
-
-Dask knows to just look in the 3rd partition for selecting values in 2002. It
-doesn't need to look at any other data.
-
-Many workflows involve a large amount of data and processing it in a way that
-reduces the size to something that fits in memory. In this case, we'll resample
-to daily frequency and take the mean. Once we've taken the mean, we know the
-results will fit in memory, so we can safely call ``compute`` without running
-out of memory. At that point it's just a regular pandas object.
-
-.. ipython:: python
-   :okwarning:
-
-   @savefig dask_resample.png
-   ddf[["x", "y"]].resample("1D").mean().cumsum().compute().plot()
-
-.. ipython:: python
-   :suppress:
-
-   import shutil
-
-   shutil.rmtree("data/timeseries")
-
-These Dask examples have all be done using multiple processes on a single
-machine. Dask can be `deployed on a cluster
-<https://docs.dask.org/en/latest/setup.html>`_ to scale up to even larger
-datasets.
-
-You see more dask examples at https://examples.dask.org.
-
-Use Modin
----------
-
-Modin_ is a scalable dataframe library, which aims to be a drop-in replacement API for pandas and
-provides the ability to scale pandas workflows across nodes and CPUs available. It is also able
-to work with larger than memory datasets. To start working with Modin you just need
-to replace a single line of code, namely, the import statement.
-
-.. code-block:: ipython
-
-   # import pandas as pd
-   import modin.pandas as pd
-
-After you have changed the import statement, you can proceed using the well-known pandas API
-to scale computation. Modin distributes computation across nodes and CPUs available utilizing
-an execution engine it runs on. At the time of Modin 0.27.0 the following execution engines are supported
-in Modin: Ray_, Dask_, `MPI through unidist`_, HDK_. The partitioning schema of a Modin DataFrame partitions it
-along both columns and rows because it gives Modin flexibility and scalability in both the number of columns and
-the number of rows.
-
-For more information refer to `Modin's documentation`_ or the `Modin's tutorials`_.
-
-.. _Modin: https://github.com/modin-project/modin
-.. _`Modin's documentation`: https://modin.readthedocs.io/en/latest
-.. _`Modin's tutorials`: https://github.com/modin-project/modin/tree/master/examples/tutorial/jupyter/execution
-.. _Ray: https://github.com/ray-project/ray
-.. _Dask: https://dask.org
-.. _`MPI through unidist`: https://github.com/modin-project/unidist
-.. _HDK: https://github.com/intel-ai/hdk
-.. _dask.dataframe: https://docs.dask.org/en/latest/dataframe.html
+There are other libraries which provide similar APIs to pandas and work nicely with pandas DataFrame,
+and can give you the ability to scale your large dataset processing and analytics
+by parallel runtime, distributed memory, clustering, etc. You can find more information
+in `the ecosystem page <https://pandas.pydata.org/community/ecosystem.html#out-of-core>`_.