Out of core computation with dask
xarray integrates with dask to support streaming computation on datasets that don’t fit into memory.
Currently, dask is an entirely optional feature for xarray. However, the benefits of using dask are sufficiently strong that dask may become a required dependency in a future version of xarray.
For a full example of how to use xarray’s dask integration, read the blog post introducing xarray and dask.
What is a dask array?
Dask divides arrays into many small pieces, called chunks, each of which is presumed to be small enough to fit into memory.
Unlike NumPy, which has eager evaluation, operations on dask arrays are lazy. Operations queue up a series of tasks mapped over blocks, and no computation is performed until you actually ask values to be computed (e.g., to print results to your screen or write to disk). At that point, data is loaded into memory and computation proceeds in a streaming fashion, block-by-block.
The actual computation is controlled by a multiprocessing or thread pool, which allows dask to take full advantage of the multiple processors available on most modern computers.
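To make this lazy-evaluation model concrete, here is a minimal sketch using dask directly (the shape and chunk sizes are arbitrary and simply mirror the example dataset used below):
import dask.array as da

x = da.ones((365, 180, 360), chunks=(10, 180, 360))  # 37 blocks along the first axis; no data allocated yet
y = (x + 1).sum()  # queues up tasks lazily; still no computation
y.compute()  # only now is each block generated, incremented and reduced, block by block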
For more details on dask, read its documentation.
Reading and writing data
The usual way to create a dataset filled with dask arrays is to load the data from a netCDF file or files. You can do this by supplying a chunks argument to open_dataset() or by using the open_mfdataset() function.
In [1]: ds = xr.open_dataset('example-data.nc', chunks={'time': 10})
In [2]: ds
Out[2]:
<xarray.Dataset>
Dimensions: (latitude: 180, longitude: 360, time: 365)
Coordinates:
* longitude (longitude) int64 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 ...
* latitude (latitude) float64 89.5 88.5 87.5 86.5 85.5 84.5 83.5 82.5 ...
* time (time) datetime64[ns] 2015-01-01 2015-01-02 2015-01-03 ...
Data variables:
temperature (time, latitude, longitude) float64 0.4691 -0.2829 -1.509 ...
In this example latitude and longitude do not appear in the chunks dict, so only one chunk will be used along those dimensions. It is also entirely equivalent to open a dataset using open_dataset() and then chunk the data using the chunk() method, e.g., xr.open_dataset('example-data.nc').chunk({'time': 10}).
To open multiple files simultaneously, use open_mfdataset():
xr.open_mfdataset('my/files/*.nc')
This function will automatically concatenate and merge datasets into one in the simple cases that it understands (see auto_combine() for the full disclaimer). By default, open_mfdataset will chunk each netCDF file into a single dask array; again, supply the chunks argument to control the size of the resulting dask arrays. In more complex cases, you can open each file individually using open_dataset and merge the results, as described in Combining data.
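For example, to control the chunk sizes of the resulting arrays when opening many files at once (the glob pattern and chunk sizes here are only illustrative):
xr.open_mfdataset('my/files/*.nc', chunks={'time': 10})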
You’ll notice that printing a dataset still shows a preview of array values, even if they are actually dask arrays. We can do this quickly with dask because we only need to compute the first few values (typically from the first block). To reveal the true nature of an array, print a DataArray:
In [3]: ds.temperature
Out[3]:
<xarray.DataArray 'temperature' (time: 365, latitude: 180, longitude: 360)>
dask.array<open_da..., shape=(365, 180, 360), dtype=float64, chunksize=(10, 180, 360)>
Coordinates:
* longitude (longitude) int64 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 ...
* latitude (latitude) float64 89.5 88.5 87.5 86.5 85.5 84.5 83.5 82.5 ...
* time (time) datetime64[ns] 2015-01-01 2015-01-02 2015-01-03 ...
Once you’ve manipulated a dask array, you can still write a dataset too big to fit into memory back to disk by using to_netcdf() in the usual way.
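For example, a minimal sketch (the output filename is hypothetical):
ds.to_netcdf('manipulated-example-data.nc')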
Using dask with xarray
Nearly all existing xarray methods (including those for indexing, computation, concatenating and grouped operations) have been extended to work automatically with dask arrays. When you load data as a dask array in an xarray data structure, almost all xarray operations will keep it as a dask array; when this is not possible, they will raise an exception rather than unexpectedly loading data into memory. Converting a dask array into memory generally requires an explicit conversion step. One notable exception is indexing operations: to enable label-based indexing, xarray will automatically load coordinate labels into memory.
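For instance, a hedged sketch using the example dataset from above (the selection date is arbitrary):
temps = ds.temperature.sel(time='2015-01-15')  # label-based indexing; the result is still backed by a dask array
temps.mean(dim='longitude')  # also lazy; nothing has been computed yet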
The easiest way to convert an xarray data structure from lazy dask arrays into eager, in-memory numpy arrays is to use the load() method:
In [4]: ds.load()
Out[4]:
<xarray.Dataset>
Dimensions: (latitude: 180, longitude: 360, time: 365)
Coordinates:
* longitude (longitude) int64 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 ...
* latitude (latitude) float64 89.5 88.5 87.5 86.5 85.5 84.5 83.5 82.5 ...
* time (time) datetime64[ns] 2015-01-01 2015-01-02 2015-01-03 ...
Data variables:
temperature (time, latitude, longitude) float64 0.4691 -0.2829 -1.509 ...
You can also access .values, which will always be a numpy array:
In [5]: ds.temperature.values
Out[5]:
array([[[ 4.691e-01, -2.829e-01, ..., -5.577e-01, 3.814e-01],
[ 1.337e+00, -1.531e+00, ..., 8.726e-01, -1.538e+00],
...
# truncated for brevity
Explicit conversion by wrapping a DataArray with np.asarray also works:
In [6]: np.asarray(ds.temperature)
Out[6]:
array([[[ 4.691e-01, -2.829e-01, ..., -5.577e-01, 3.814e-01],
[ 1.337e+00, -1.531e+00, ..., 8.726e-01, -1.538e+00],
...
Alternatively, you can load the data into memory but keep the arrays as dask arrays using the persist() method:
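A minimal sketch (reassigning ds keeps the persisted version, whose dask arrays now hold their data in memory):
ds = ds.persist()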
This is particularly useful when using a distributed cluster, because the data will be loaded into distributed memory across your machines and will be much faster to use than reading it repeatedly from disk. Be warned that on a single machine this operation will try to load all of your data into memory, so you should make sure that your dataset is not larger than available memory.
For performance, you may wish to consider chunk sizes. The correct choice of chunk size depends both on your data and on the operations you want to perform. With xarray, both converting data to dask arrays and changing the chunk sizes of dask arrays are done with the chunk() method:
In [7]: rechunked = ds.chunk({'latitude': 100, 'longitude': 100})
You can view the size of existing chunks on an array by viewing the chunks attribute:
In [8]: rechunked.chunks
Out[8]: Frozen(SortedKeysDict({'longitude': (100, 100, 100, 60), 'latitude': (100, 80), 'time': (10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 5)}))
If chunk sizes are not consistent between all the arrays in a dataset along a particular dimension, an exception is raised when you try to access .chunks.
Note
In the future, we would like to enable automatic alignment of dask chunksizes (but not the other way around). We might also require that all arrays in a dataset share the same chunking alignment. Neither of these is currently done.
NumPy ufuncs like np.sin currently only work on eagerly evaluated arrays (this will change with the next major NumPy release). We have provided replacements that also work on all xarray objects, including those that store lazy dask arrays, in the xarray.ufuncs module:
In [9]: import xarray.ufuncs as xu
In [10]: xu.sin(rechunked)
Out[10]:
<xarray.Dataset>
Dimensions: (latitude: 180, longitude: 360, time: 365)
Coordinates:
* longitude (longitude) int64 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 ...
* latitude (latitude) float64 89.5 88.5 87.5 86.5 85.5 84.5 83.5 82.5 ...
* time (time) datetime64[ns] 2015-01-01 2015-01-02 2015-01-03 ...
Data variables:
temperature (time, latitude, longitude) float64 0.4521 -0.2791 -0.9981 ...
To access dask arrays directly, use the new DataArray.data attribute. This attribute exposes array data either as a dask array or as a numpy array, depending on whether it has been loaded into dask or not:
In [11]: ds.temperature.data
Out[11]: dask.array<xarray-..., shape=(365, 180, 360), dtype=float64, chunksize=(10, 180, 360)>
Note
In the future, we may extend .data to support other “computable” array backends beyond dask and numpy (e.g., to support sparse arrays).
Chunking and performance
The chunks parameter has critical performance implications when using dask arrays. If your chunks are too small, queueing up operations will be extremely slow, because dask will translate each operation into a huge number of operations mapped across chunks. Computation on dask arrays with small chunks can also be slow, because each operation on a chunk has some fixed overhead from the Python interpreter and the dask task executor.
Conversely, if your chunks are too big, some of your computation may be wasted, because dask only computes results one chunk at a time.
A good rule of thumb is to create arrays with a minimum chunksize of at least one million elements (e.g., a 1000x1000 matrix). With large arrays (10+ GB), the cost of queueing up dask operations can be noticeable, and you may need even larger chunksizes.
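As a rough illustration with the example dataset above (the figures are back-of-the-envelope estimates assuming float64 values, as shown in the repr):
# each (10, 180, 360) chunk holds 10 * 180 * 360 = 648,000 values, roughly 5 MB of float64 data
# doubling the time chunk size clears the one-million-element rule of thumb
ds = xr.open_dataset('example-data.nc', chunks={'time': 20})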
Optimization Tips
With analysis pipelines involving both spatial subsetting and temporal resampling, dask performance can become very slow in certain cases. Here are some optimization tips we have found through experience:
- Do your spatial and temporal indexing (e.g. .sel() or .isel()) early in the pipeline, especially before calling resample() or groupby(). Grouping and resampling trigger some computation on all the blocks, which in theory should commute with indexing, but this optimization hasn’t been implemented in dask yet. (See dask issue #746.)
- Save intermediate results to disk as netCDF files (using to_netcdf()) and then load them again with open_dataset() for further computations. For example, if subtracting the temporal mean from a dataset, save the temporal mean to disk before subtracting (see the sketch after this list). Again, in theory, dask should be able to do the computation in a streaming fashion, but in practice this is a fail case for the dask scheduler, because it tries to keep every chunk of an array that it computes in memory. (See dask issue #874.)
- Specify smaller chunks across space when using open_mfdataset() (e.g., chunks={'latitude': 10, 'longitude': 10}). This makes spatial subsetting easier, because there’s no risk you will load chunks of data referring to different chunks (probably not necessary if you follow suggestion 1).
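To illustrate the second suggestion, here is a hedged sketch (the filename and chunk sizes are hypothetical):
# compute the temporal mean once, write it to disk, then reload it lazily
ds.mean(dim='time').to_netcdf('temporal-mean.nc')
mean = xr.open_dataset('temporal-mean.nc', chunks={'latitude': 10, 'longitude': 10})
anomalies = ds - mean  # subtracting the reloaded mean avoids the scheduler problem described above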