Parallel computing with dask
xarray integrates with dask to support parallel computation and streaming computation on datasets that don’t fit into memory.
Currently, dask is an entirely optional feature for xarray. However, the benefits of using dask are sufficiently strong that dask may become a required dependency in a future version of xarray.
For a full example of how to use xarray’s dask integration, read the blog post introducing xarray and dask.
What is a dask array?
Dask divides arrays into many small pieces, called chunks, each of which is presumed to be small enough to fit into memory.
Unlike NumPy, which has eager evaluation, operations on dask arrays are lazy. Operations queue up a series of tasks mapped over blocks, and no computation is performed until you actually ask values to be computed (e.g., to print results to your screen or write to disk). At that point, data is loaded into memory and computation proceeds in a streaming fashion, block-by-block.
The actual computation is controlled by a multi-processing or thread pool, which allows dask to take full advantage of multiple processors available on most modern computers.
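As a minimal sketch of this lazy behavior (using dask directly, with made-up array sizes), building an expression returns immediately, and only an explicit compute triggers the work:

import dask.array as da

# Build a lazy 2-D array split into 1000x1000 chunks; this allocates
# no data, it only constructs a task graph.
x = da.ones((4000, 6000), chunks=(1000, 1000))
y = (2 * x).sum(axis=0)  # still lazy: just more tasks in the graph

# Only now does dask run the tasks, streaming over the blocks.
result = y.compute()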
For more details on dask, read its documentation.
Reading and writing data
The usual way to create a dataset filled with dask arrays is to load the data from a netCDF file or files. You can do this by supplying a chunks argument to open_dataset() or using the open_mfdataset() function.
In [1]: ds = xr.open_dataset('example-data.nc', chunks={'time': 10})
In [2]: ds
Out[2]:
<xarray.Dataset>
Dimensions: (latitude: 180, longitude: 360, time: 365)
Coordinates:
* latitude (latitude) float64 89.5 88.5 87.5 86.5 85.5 84.5 83.5 82.5 ...
* time (time) datetime64[ns] 2015-01-01 2015-01-02 2015-01-03 ...
* longitude (longitude) int64 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 ...
Data variables:
temperature (time, latitude, longitude) float64 dask.array<shape=(365, 180, 360), chunksize=(10, 180, 360)>
In this example latitude and longitude do not appear in the chunks dict, so only one chunk will be used along those dimensions. It is also entirely equivalent to open a dataset using open_dataset and then chunk the data using the chunk method, e.g., xr.open_dataset('example-data.nc').chunk({'time': 10}).
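If you want to check how the data was divided, the chunks attribute on a dask-backed array reflects its block sizes (a quick sketch, assuming the dataset opened above):

# 'time' is split into blocks of 10 (with a final block of 5), while
# 'latitude' and 'longitude' each form a single block.
ds.temperature.chunks  # ((10, 10, ..., 10, 5), (180,), (360,))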
To open multiple files simultaneously, use open_mfdataset():
xr.open_mfdataset('my/files/*.nc')
This function will automatically concatenate and merge datasets into one in the simple cases that it understands (see auto_combine() for the full disclaimer). By default, open_mfdataset will chunk each netCDF file into a single dask array; again, supply the chunks argument to control the size of the resulting dask arrays. In more complex cases, you can open each file individually using open_dataset and merge the results, as described in Combining data and sketched below.
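A sketch of that manual approach, with a placeholder glob pattern and chunk size, might look like:

import glob

import xarray as xr

# Open each file lazily with an explicit chunking.
paths = sorted(glob.glob('my/files/*.nc'))
datasets = [xr.open_dataset(p, chunks={'time': 10}) for p in paths]

# xr.concat is one option when the files differ only along 'time';
# see Combining data for merge() and other strategies.
combined = xr.concat(datasets, dim='time')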
You’ll notice that printing a dataset still shows a preview of array values, even if they are actually dask arrays. We can do this quickly with dask because we only need to compute the first few values (typically from the first block). To reveal the true nature of an array, print a DataArray:
In [3]: ds.temperature
Out[3]:
<xarray.DataArray 'temperature' (time: 365, latitude: 180, longitude: 360)>
dask.array<shape=(365, 180, 360), dtype=float64, chunksize=(10, 180, 360)>
Coordinates:
* latitude (latitude) float64 89.5 88.5 87.5 86.5 85.5 84.5 83.5 82.5 ...
* time (time) datetime64[ns] 2015-01-01 2015-01-02 2015-01-03 ...
* longitude (longitude) int64 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 ...
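If you do want the actual values in memory, you can convert a dask-backed array explicitly; both of the following trigger the full computation, so the result must fit into memory (a sketch using the dataset above):

ds.temperature.load()   # compute in place, replacing the dask array with NumPy
ds.temperature.values   # compute and return the values as a NumPy array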
Once you’ve manipulated a dask array, you can still write a dataset too big to fit into memory back to disk by using to_netcdf() in the usual way.
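For instance, a lazily computed result can be written out without ever materializing the full array (a sketch; the reduction and the output filename are made up):

# The mean is lazy; to_netcdf() then evaluates it and streams the
# result to disk chunk by chunk.
ds_mean = ds.mean(dim='time')
ds_mean.to_netcdf('example-mean.nc')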
A dataset can also be converted to a dask DataFrame using to_dask_dataframe().
In [4]: df = ds.to_dask_dataframe()
In [5]: df