Parallel computing with Dask

xarray integrates with Dask to support parallel computations and streaming computation on datasets that don’t fit into memory. Currently, Dask is an entirely optional feature for xarray. However, the benefits of using Dask are sufficiently strong that Dask may become a required dependency in a future version of xarray.

For a full example of how to use xarray’s Dask integration, read the blog post introducing xarray and Dask. More up-to-date examples may be found at the Pangeo project’s use-cases and at the Dask examples website.

What is a Dask array?

[Figure: a Dask array, made up of many small NumPy arrays (chunks)]

Dask divides arrays into many small pieces, called chunks, each of which is presumed to be small enough to fit into memory.

Unlike NumPy, which has eager evaluation, operations on Dask arrays are lazy. Operations queue up a series of tasks mapped over blocks, and no computation is performed until you actually ask values to be computed (e.g., to print results to your screen or write to disk). At that point, data is loaded into memory and computation proceeds in a streaming fashion, block-by-block.
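This defer-then-stream pattern can be sketched with a toy pure-Python class (the name `LazyChunks` and its methods are invented for illustration; this is not Dask's API):

```python
class LazyChunks:
    """Toy stand-in for a chunked lazy array (illustration only, not Dask)."""

    def __init__(self, chunks, ops=()):
        self.chunks = chunks  # blocks of data, each small enough for memory
        self.ops = list(ops)  # queued per-element operations, not yet run

    def map(self, func):
        # Lazy: just record the operation and return a new deferred object.
        return LazyChunks(self.chunks, self.ops + [func])

    def compute(self):
        # Eager: stream block-by-block, applying every queued operation.
        result = []
        for block in self.chunks:
            for op in self.ops:
                block = [op(x) for x in block]
            result.extend(block)
        return result

lazy = LazyChunks([[1, 2], [3, 4]]).map(lambda x: x * 10)
# Nothing has been evaluated yet; values materialize only on .compute():
print(lazy.compute())  # [10, 20, 30, 40]
```

Real Dask arrays work the same way in spirit: `map` builds up a task graph, and `compute` walks it block by block.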

The actual computation is controlled by a multi-processing or thread pool, which allows Dask to take full advantage of multiple processors available on most modern computers.

For more details on Dask, read its documentation. Note that xarray only makes use of dask.array and dask.delayed.

Reading and writing data

The usual way to create a Dataset filled with Dask arrays is to load the data from a netCDF file or files. You can do this by supplying a chunks argument to open_dataset() or using the open_mfdataset() function.

In [1]: ds = xr.open_dataset("", chunks={"time": 10})

In [2]: ds
<xarray.Dataset>
Dimensions:      (latitude: 180, longitude: 180, time: 30)
Coordinates:
  * time         (time) datetime64[ns] 2015-01-01 2015-01-02 ... 2015-01-30
  * longitude    (longitude) int64 0 1 2 3 4 5 6 ... 173 174 175 176 177 178 179
  * latitude     (latitude) float64 89.5 88.5 87.5 86.5 ... -87.5 -88.5 -89.5
Data variables:
    temperature  (time, latitude, longitude) float64 dask.array<chunksize=(10, 180, 180), meta=np.ndarray>

In this example latitude and longitude do not appear in the chunks dict, so only one chunk will be used along those dimensions. It is also entirely equivalent to opening a dataset using open_dataset() and then chunking the data using the chunk method, e.g., xr.open_dataset('').chunk({'time': 10}).
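How a value in the chunks dict divides a dimension can be sketched with a small helper (`chunk_sizes` is a hypothetical function for illustration; Dask computes this internally):

```python
def chunk_sizes(length, chunk):
    """Split a dimension of `length` into pieces of at most `chunk` elements.

    Hypothetical helper for illustration only.
    """
    sizes = []
    remaining = length
    while remaining > 0:
        sizes.append(min(chunk, remaining))
        remaining -= sizes[-1]
    return tuple(sizes)

# chunks={"time": 10} splits the 30-step time axis into three blocks:
print(chunk_sizes(30, 10))    # (10, 10, 10)
# a dimension absent from the chunks dict stays as a single chunk:
print(chunk_sizes(180, 180))  # (180,)
# rechunking latitude to 100 leaves a smaller final block:
print(chunk_sizes(180, 100))  # (100, 80)
```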

To open multiple files simultaneously in parallel using Dask delayed, use open_mfdataset():

xr.open_mfdataset('my/files/*.nc', parallel=True)

This function will automatically concatenate and merge datasets into one in the simple cases that it understands (see combine_by_coords() for the full disclaimer). By default, open_mfdataset() will chunk each netCDF file into a single Dask array; again, supply the chunks argument to control the size of the resulting Dask arrays. In more complex cases, you can open each file individually using open_dataset() and merge the result, as described in Combining data. Passing the keyword argument parallel=True to open_mfdataset() will speed up the reading of large multi-file datasets by executing those read tasks in parallel using dask.delayed.

You’ll notice that printing a dataset still shows a preview of array values, even if they are actually Dask arrays. We can do this quickly with Dask because we only need to compute the first few values (typically from the first block). To reveal the true nature of an array, print a DataArray:

In [3]: ds.temperature
<xarray.DataArray 'temperature' (time: 30, latitude: 180, longitude: 180)>
dask.array<open_dataset-27feea6bc4439e113156e9cc98abe51ftemperature, shape=(30, 180, 180), dtype=float64, chunksize=(10, 180, 180), chunktype=numpy.ndarray>
Coordinates:
  * time       (time) datetime64[ns] 2015-01-01 2015-01-02 ... 2015-01-30
  * longitude  (longitude) int64 0 1 2 3 4 5 6 7 ... 173 174 175 176 177 178 179
  * latitude   (latitude) float64 89.5 88.5 87.5 86.5 ... -87.5 -88.5 -89.5

Once you’ve manipulated a Dask array, you can still write a dataset too big to fit into memory back to disk by using to_netcdf() in the usual way.

In [4]: ds.to_netcdf("")

By setting the compute argument to False, to_netcdf() will return a dask.delayed object that can be computed later.

In [5]: from dask.diagnostics import ProgressBar

# or distributed.progress when using the distributed scheduler
In [6]: delayed_obj = ds.to_netcdf("", compute=False)

In [7]: with ProgressBar():
   ...:     results = delayed_obj.compute()

[                                        ] | 0% Completed |  0.0s
[########################################] | 100% Completed |  0.1s


When using Dask’s distributed scheduler to write NETCDF4 files, it may be necessary to set the environment variable HDF5_USE_FILE_LOCKING=FALSE to avoid competing locks within the HDF5 SWMR file locking scheme. Note that writing netCDF files with Dask’s distributed scheduler is only supported for the netcdf4 backend.

A dataset can also be converted to a Dask DataFrame using to_dask_dataframe().

In [8]: df = ds.to_dask_dataframe()

In [9]: df
Dask DataFrame Structure:
              latitude longitude            time temperature
0              float64     int64  datetime64[ns]     float64
324000             ...       ...             ...         ...
648000             ...       ...             ...         ...
971999             ...       ...             ...         ...
Dask Name: concat-indexed, 115 tasks

Dask DataFrames do not support multi-indexes so the coordinate variables from the dataset are included as columns in the Dask DataFrame.
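As a sanity check on the partition index shown above: each combination of coordinate values becomes one DataFrame row, so the row count is simply the product of the dimension sizes of the example dataset.

```python
# Each coordinate combination becomes one row, so the number of rows is
# the product of the dimension sizes from the example dataset above.
dims = {"time": 30, "latitude": 180, "longitude": 180}

n_rows = 1
for size in dims.values():
    n_rows *= size

print(n_rows)  # 972000, i.e. an integer index running 0 ... 971999
```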

Using Dask with xarray

Nearly all existing xarray methods (including those for indexing, computation, concatenating and grouped operations) have been extended to work automatically with Dask arrays. When you load data as a Dask array in an xarray data structure, almost all xarray operations will keep it as a Dask array; when this is not possible, they will raise an exception rather than unexpectedly loading data into memory. Converting a Dask array into an in-memory NumPy array generally requires an explicit conversion step. One notable exception is indexing operations: to enable label-based indexing, xarray will automatically load coordinate labels into memory.


By default, Dask uses its multi-threaded scheduler, which distributes work across multiple cores and allows for processing some datasets that do not fit into memory. For running across a cluster, set up the distributed scheduler.

The easiest way to convert an xarray data structure from lazy Dask arrays into eager, in-memory NumPy arrays is to use the load() method:

In [10]: ds.load()
<xarray.Dataset>
Dimensions:      (latitude: 180, longitude: 180, time: 30)
Coordinates:
  * time         (time) datetime64[ns] 2015-01-01 2015-01-02 ... 2015-01-30
  * longitude    (longitude) int64 0 1 2 3 4 5 6 ... 173 174 175 176 177 178 179
  * latitude     (latitude) float64 89.5 88.5 87.5 86.5 ... -87.5 -88.5 -89.5
Data variables:
    temperature  (time, latitude, longitude) float64 0.4691 -0.2829 ... -0.2467

You can also access values, which will always be a NumPy array:

In [11]: ds.temperature.values
array([[[  4.691e-01,  -2.829e-01, ...,  -5.577e-01,   3.814e-01],
        [  1.337e+00,  -1.531e+00, ...,   8.726e-01,  -1.538e+00],
# truncated for brevity

Explicit conversion by wrapping a DataArray with np.asarray also works:

In [12]: np.asarray(ds.temperature)
array([[[  4.691e-01,  -2.829e-01, ...,  -5.577e-01,   3.814e-01],
        [  1.337e+00,  -1.531e+00, ...,   8.726e-01,  -1.538e+00],

Alternatively you can load the data into memory but keep the arrays as Dask arrays using the persist() method:

In [13]: ds = ds.persist()

persist() is particularly useful when using a distributed cluster because the data will be loaded into distributed memory across your machines and be much faster to use than reading repeatedly from disk.


On a single machine persist() will try to load all of your data into memory. You should make sure that your dataset is not larger than available memory.


For more on the differences between persist() and compute() see this Stack Overflow answer and the Dask documentation.

For performance you may wish to consider chunk sizes. The correct choice of chunk size depends both on your data and on the operations you want to perform. With xarray, both converting data to Dask arrays and changing the chunk sizes of existing Dask arrays is done with the chunk() method:

In [14]: rechunked = ds.chunk({"latitude": 100, "longitude": 100})

You can view the size of existing chunks on an array by viewing the chunks attribute:

In [15]: rechunked.chunks
Out[15]: Frozen(SortedKeysDict({'time': (10, 10, 10), 'latitude': (100, 80), 'longitude': (100, 80)}))

If the arrays in a dataset do not have consistent chunk sizes along a particular dimension, an exception is raised when you try to access .chunks.
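A minimal sketch of such a consistency check (illustrative only; this is not xarray's actual implementation):

```python
def consistent_chunks(variables):
    """Combine per-dimension chunk sizes, raising if two arrays disagree.

    `variables` maps array names to {dim: chunk_tuple} dicts. Illustrative
    sketch only; xarray's real check lives behind Dataset.chunks.
    """
    combined = {}
    for name, chunks in variables.items():
        for dim, sizes in chunks.items():
            if dim in combined and combined[dim] != sizes:
                raise ValueError(
                    f"inconsistent chunks along {dim!r}: "
                    f"{combined[dim]} vs {sizes} (from {name!r})"
                )
            combined[dim] = sizes
    return combined

print(consistent_chunks({
    "temperature": {"time": (10, 10, 10), "latitude": (100, 80)},
    "pressure": {"time": (10, 10, 10)},
}))  # {'time': (10, 10, 10), 'latitude': (100, 80)}
```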


In the future, we would like to enable automatic alignment of Dask chunksizes (but not the other way around). We might also require that all arrays in a dataset share the same chunking alignment. Neither of these are currently done.

NumPy ufuncs like np.sin currently only work on eagerly evaluated arrays (this will change with the next major NumPy release). We have provided replacements that also work on all xarray objects, including those that store lazy Dask arrays, in the xarray.ufuncs module:

In [16]: import xarray.ufuncs as xu

In [17]: xu.sin(rechunked)
<xarray.Dataset>
Dimensions:      (latitude: 180, longitude: 180, time: 30)
Coordinates:
  * time         (time) datetime64[ns] 2015-01-01 2015-01-02 ... 2015-01-30
  * longitude    (longitude) int64 0 1 2 3 4 5 6 ... 173 174 175 176 177 178 179
  * latitude     (latitude) float64 89.5 88.5 87.5 86.5 ... -87.5 -88.5 -89.5
Data variables:
    temperature  (time, latitude, longitude) float64 dask.array<chunksize=(10, 100, 100), meta=np.ndarray>
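A pure-NumPy illustration of why elementwise ufuncs like sin parallelize so cleanly over blocks: applying the function chunk-by-chunk and reassembling gives the same result as applying it to the whole array.

```python
import numpy as np

x = np.linspace(0, 10, 12)
blocks = np.split(x, 3)  # three chunks of 4 elements each

# Apply the ufunc per block and stitch the pieces back together:
per_block = np.concatenate([np.sin(b) for b in blocks])
whole = np.sin(x)

print(np.allclose(per_block, whole))  # True
```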

To access Dask arrays directly, use the DataArray.data attribute. This attribute exposes array data either as a Dask array or as a NumPy array, depending on whether it has been loaded into Dask or not:

In [18]: ds.temperature.data
Out[18]: dask.array<xarray-temperature, shape=(30, 180, 180), dtype=float64, chunksize=(10, 180, 180), chunktype=numpy.ndarray>


In the future, we may extend .data to support other “computable” array backends beyond Dask and NumPy (e.g., to support sparse arrays).

Automatic parallelization with apply_ufunc and map_blocks

Almost all of xarray’s built-in operations work on Dask arrays. If you want to use a function that isn’t wrapped by xarray, and have it applied in parallel on each block of your xarray object, you have three options:

  1. Extract Dask arrays from xarray objects (.data) and use Dask directly.

  2. Use apply_ufunc() to apply functions that consume and return NumPy arrays.

  3. Use map_blocks(), Dataset.map_blocks() or DataArray.map_blocks() to apply functions that consume and return xarray objects.


apply_ufunc() automates embarrassingly parallel “map” type operations where a function written for processing NumPy arrays can be repeatedly applied to xarray objects containing Dask arrays. It works similarly to dask.array.map_blocks() and dask.array.blockwise(), but without requiring an intermediate layer of abstraction.

For the best performance when using Dask’s multi-threaded scheduler, wrap a function that already releases the global interpreter lock; fortunately, this already includes most NumPy and SciPy functions. Here we show an example using NumPy operations and a fast function from bottleneck, which we use to calculate Spearman’s rank correlation coefficient:

import numpy as np
import xarray as xr
import bottleneck

def covariance_gufunc(x, y):
    return (
        (x - x.mean(axis=-1, keepdims=True)) * (y - y.mean(axis=-1, keepdims=True))
    ).mean(axis=-1)

def pearson_correlation_gufunc(x, y):
    return covariance_gufunc(x, y) / (x.std(axis=-1) * y.std(axis=-1))

def spearman_correlation_gufunc(x, y):
    x_ranks = bottleneck.rankdata(x, axis=-1)
    y_ranks = bottleneck.rankdata(y, axis=-1)
    return pearson_correlation_gufunc(x_ranks, y_ranks)

def spearman_correlation(x, y, dim):
    return xr.apply_ufunc(
        spearman_correlation_gufunc,
        x,
        y,
        input_core_dims=[[dim], [dim]],
        dask="parallelized",
        output_dtypes=[float],
    )

The only aspect of this example that is different from standard usage of apply_ufunc() is that we needed to supply the output_dtypes arguments. (Read up on Wrapping custom computation for an explanation of the “core dimensions” listed in input_core_dims.)

Our new spearman_correlation() function achieves near linear speedup when run on large arrays across the four cores on my laptop. It would also work as a streaming operation, when run on arrays loaded from disk:

In [19]: rs = np.random.RandomState(0)

In [20]: array1 = xr.DataArray(rs.randn(1000, 100000), dims=["place", "time"])  # 800MB

In [21]: array2 = array1 + 0.5 * rs.randn(1000, 100000)

# using one core, on NumPy arrays
In [22]: %time _ = spearman_correlation(array1, array2, 'time')
CPU times: user 21.6 s, sys: 2.84 s, total: 24.5 s
Wall time: 24.9 s

In [23]: chunked1 = array1.chunk({"place": 10})

In [24]: chunked2 = array2.chunk({"place": 10})

# using all my laptop's cores, with Dask
In [25]: r = spearman_correlation(chunked1, chunked2, "time")

In [26]: %time _ = r.compute()
CPU times: user 30.9 s, sys: 1.74 s, total: 32.6 s
Wall time: 4.59 s

One limitation of apply_ufunc() is that it cannot be applied to arrays with multiple chunks along a core dimension:

In [27]: spearman_correlation(chunked1, chunked2, "place")
ValueError: dimension 'place' on 0th function argument to apply_ufunc with
dask='parallelized' consists of multiple chunks, but is also a core
dimension. To fix, rechunk into a single Dask array chunk along this
dimension, i.e., ``.rechunk({'place': -1})``, but beware that this may
significantly increase memory usage.

This reflects the nature of core dimensions, in contrast to broadcast (non-core) dimensions that allow operations to be split into arbitrary chunks for application.
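A pure-NumPy sketch of this distinction, using mean-removal along a "time" core dimension: splitting along the broadcast "place" dimension is harmless, while splitting along the core dimension changes the answer, because each piece then sees only part of the axis being reduced.

```python
import numpy as np

rng = np.random.default_rng(0)
x = rng.standard_normal((4, 8))  # dims: (place, time); "time" is the core dim

def demean(a):
    # needs *all* of the core (last) axis at once
    return a - a.mean(axis=-1, keepdims=True)

full = demean(x)

# Splitting along the broadcast "place" axis and reassembling is safe:
by_place = np.concatenate([demean(x[:2]), demean(x[2:])], axis=0)

# Splitting along the core "time" axis gives a different (wrong) answer:
by_time = np.concatenate([demean(x[:, :4]), demean(x[:, 4:])], axis=1)

print(np.allclose(full, by_place))  # True
print(np.allclose(full, by_time))   # False
```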


For the majority of NumPy functions that are already wrapped by Dask, it’s usually a better idea to use the pre-existing dask.array function, either via pre-existing xarray methods or via apply_ufunc() with dask='allowed'. Dask can often have a more efficient implementation that makes use of the specialized structure of a problem, unlike the generic speedups offered by dask='parallelized'.


Functions that consume and return xarray objects can be easily applied in parallel using map_blocks(). Your function will receive an xarray Dataset or DataArray subset to one chunk along each chunked dimension.

In [28]: ds.temperature
<xarray.DataArray 'temperature' (time: 30, latitude: 180, longitude: 180)>
dask.array<xarray-temperature, shape=(30, 180, 180), dtype=float64, chunksize=(10, 180, 180), chunktype=numpy.ndarray>
Coordinates:
  * time       (time) datetime64[ns] 2015-01-01 2015-01-02 ... 2015-01-30
  * longitude  (longitude) int64 0 1 2 3 4 5 6 7 ... 173 174 175 176 177 178 179
  * latitude   (latitude) float64 89.5 88.5 87.5 86.5 ... -87.5 -88.5 -89.5

This DataArray has 3 chunks each with length 10 along the time dimension. At compute time, a function applied with map_blocks() will receive a DataArray corresponding to a single block of shape 10x180x180 (time x latitude x longitude) with values loaded. The following snippet illustrates how to check the shape of the object received by the applied function.

In [29]: def func(da):
   ....:     print(da.sizes)
   ....:     return da.time

In [30]: mapped = xr.map_blocks(func, ds.temperature)
Frozen({'time': 0, 'latitude': 0, 'longitude': 0})

In [31]: mapped
<xarray.DataArray 'time' (time: 30)>
dask.array<func-526c153bf7c5528ab18f3f3e50f64aed, shape=(30,), dtype=datetime64[ns], chunksize=(10,), chunktype=numpy.ndarray>
Coordinates:
  * time     (time) datetime64[ns] 2015-01-01 2015-01-02 ... 2015-01-30

Notice that the map_blocks() call printed Frozen({'time': 0, 'latitude': 0, 'longitude': 0}) to screen: func received 0-sized blocks! map_blocks() needs to know what the final result looks like in terms of dimensions, shapes, etc. It does so by running the provided function on 0-shaped inputs (automated inference). This works in many cases, but not all. If automatic inference does not work for your function, provide the template kwarg (see below).

In this case, automatic inference has worked so let’s check that the result is as expected.

In [32]: mapped.load(scheduler="single-threaded")
Frozen({'time': 10, 'latitude': 180, 'longitude': 180})
Frozen({'time': 10, 'latitude': 180, 'longitude': 180})
Frozen({'time': 10, 'latitude': 180, 'longitude': 180})
<xarray.DataArray 'time' (time: 30)>
array(['2015-01-01T00:00:00.000000000', '2015-01-02T00:00:00.000000000',
       '2015-01-03T00:00:00.000000000', '2015-01-04T00:00:00.000000000',
       '2015-01-05T00:00:00.000000000', '2015-01-06T00:00:00.000000000',
       '2015-01-07T00:00:00.000000000', '2015-01-08T00:00:00.000000000',
       '2015-01-09T00:00:00.000000000', '2015-01-10T00:00:00.000000000',
       '2015-01-11T00:00:00.000000000', '2015-01-12T00:00:00.000000000',
       '2015-01-13T00:00:00.000000000', '2015-01-14T00:00:00.000000000',
       '2015-01-15T00:00:00.000000000', '2015-01-16T00:00:00.000000000',
       '2015-01-17T00:00:00.000000000', '2015-01-18T00:00:00.000000000',
       '2015-01-19T00:00:00.000000000', '2015-01-20T00:00:00.000000000',
       '2015-01-21T00:00:00.000000000', '2015-01-22T00:00:00.000000000',
       '2015-01-23T00:00:00.000000000', '2015-01-24T00:00:00.000000000',
       '2015-01-25T00:00:00.000000000', '2015-01-26T00:00:00.000000000',
       '2015-01-27T00:00:00.000000000', '2015-01-28T00:00:00.000000000',
       '2015-01-29T00:00:00.000000000', '2015-01-30T00:00:00.000000000'], dtype='datetime64[ns]')
Coordinates:
  * time     (time) datetime64[ns] 2015-01-01 2015-01-02 ... 2015-01-30

In [33]: mapped.identical(ds.time)
Out[33]: True

Note that we use .load(scheduler="single-threaded") to execute the computation. This executes the Dask graph in serial using a for loop, but allows for printing to screen and other debugging techniques. We can easily see that our function is receiving blocks of shape 10x180x180 and the returned result is identical to ds.time as expected.

Here is a common example where automated inference will not work.

In [34]: def func(da):
   ....:     print(da.sizes)
   ....:     return da.isel(time=[1])

In [35]: mapped = xr.map_blocks(func, ds.temperature)
Frozen({'time': 0, 'latitude': 0, 'longitude': 0})
IndexError                                Traceback (most recent call last)
~/checkouts/ in infer_template(func, obj, *args, **kwargs)
    129     try:
--> 130         template = func(*meta_args, **kwargs)
    131     except Exception as e:

<ipython-input-34-06454b7a0fdb> in func(da)
      2     print(da.sizes)
----> 3     return da.isel(time=[1])

~/checkouts/ in isel(self, indexers, drop, missing_dims, **indexers_kwargs)
   1113             if coord_indexers:
-> 1114                 coord_value = coord_value.isel(coord_indexers)
   1115                 if drop and coord_value.ndim == 0:

~/checkouts/ in isel(self, indexers, missing_dims, **indexers_kwargs)
   1119         key = tuple(indexers.get(dim, slice(None)) for dim in self.dims)
-> 1120         return self[key]

~/checkouts/ in __getitem__(self, key)
    768         dims, indexer, new_order = self._broadcast_indexes(key)
--> 769         data = as_indexable(self._data)[indexer]
    770         if new_order:

~/checkouts/ in __getitem__(self, indexer)
-> 1442         result = self.array[key]

~/checkouts/ in __getitem__(self, key)
    214     def __getitem__(self, key):
--> 215         result = self._data[key]
    216         if isinstance(result, type(self._data)):

~/checkouts/ in __getitem__(self, key)
    558         freq = self._get_getitem_freq(key)
--> 559         result = self._data[key]
    560         if lib.is_scalar(result):

IndexError: index 1 is out of bounds for axis 0 with size 0

The above exception was the direct cause of the following exception:

Exception                                 Traceback (most recent call last)
<ipython-input-35-0c0dc80c25a9> in <module>
----> 1 mapped = xr.map_blocks(func, ds.temperature)

~/checkouts/ in map_blocks(func, obj, args, kwargs, template)
    368     if template is None:
    369         # infer template by providing zero-shaped arrays
--> 370         template = infer_template(func, aligned[0], *args, **kwargs)
    371         template_indexes = set(template.indexes)
    372         preserved_indexes = template_indexes & set(input_indexes)

~/checkouts/ in infer_template(func, obj, *args, **kwargs)
    130         template = func(*meta_args, **kwargs)
    131     except Exception as e:
--> 132         raise Exception(
    133             "Cannot infer object returned from running user provided function. "
    134             "Please supply the 'template' kwarg to map_blocks."

Exception: Cannot infer object returned from running user provided function. Please supply the 'template' kwarg to map_blocks.

func cannot be run on 0-shaped inputs because it is not possible to extract element 1 along a dimension of size 0. In this case we need to tell map_blocks() what the returned result looks like using the template kwarg. template must be an xarray Dataset or DataArray (depending on what the function returns) with dimensions, shapes, chunk sizes, attributes, coordinate variables and data variables that look exactly like the expected result. The variables should be dask-backed and hence not incur much memory cost.


Note that when template is provided, attrs from template are copied over to the result. Any attrs set in func will be ignored.

In [36]: template = ds.temperature.isel(time=[1, 11, 21])

In [37]: mapped = xr.map_blocks(func, ds.temperature, template=template)

Notice that the 0-shaped sizes were not printed to screen. Since template has been provided, map_blocks() does not need to infer it by running func on 0-shaped inputs.

In [38]: mapped.identical(template)
Frozen({'time': 10, 'latitude': 180, 'longitude': 180})
Frozen({'time': 10, 'latitude': 180, 'longitude': 180})
Frozen({'time': 10, 'latitude': 180, 'longitude': 180})
Out[38]: True

map_blocks() also allows passing args and kwargs down to the user function func. func will be executed as func(block_xarray, *args, **kwargs) so args must be a list and kwargs must be a dictionary.

In [39]: def func(obj, a, b=0):
   ....:     return obj + a + b

In [40]: mapped = ds.map_blocks(func, args=[10], kwargs={"b": 10})

In [41]: expected = ds + 10 + 10

In [42]: mapped.identical(expected)
Out[42]: True

Chunking and performance

The chunks parameter has critical performance implications when using Dask arrays. If your chunks are too small, queueing up operations will be extremely slow, because Dask will translate each operation into a huge number of operations mapped across chunks. Computation on Dask arrays with small chunks can also be slow, because each operation on a chunk has some fixed overhead from the Python interpreter and the Dask task executor.

Conversely, if your chunks are too big, some of your computation may be wasted, because Dask only computes results one chunk at a time.

A good rule of thumb is to create arrays with chunks of at least one million elements (e.g., a 1000x1000 matrix). With large arrays (10+ GB), the cost of queueing up Dask operations can be noticeable, and you may need even larger chunksizes.
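The arithmetic behind that rule of thumb, for float64 data:

```python
# One million float64 elements is about 8 MB per chunk: big enough to
# amortize per-task overhead, small enough to fit comfortably in memory.
itemsize = 8  # bytes per float64 element

elements = 1000 * 1000
print(elements * itemsize / 1e6)  # 8.0 (MB)

# For comparison, one (10, 180, 180) temperature block from the example above:
block = 10 * 180 * 180
print(block, "elements =", block * itemsize / 1e6, "MB")  # 324000 elements = 2.592 MB
```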


Check out the dask documentation on chunks.

Optimization Tips

With analysis pipelines involving both spatial subsetting and temporal resampling, Dask performance can become very slow in certain cases. Here are some optimization tips we have found through experience:

  1. Do your spatial and temporal indexing (e.g. .sel() or .isel()) early in the pipeline, especially before calling resample() or groupby(). Grouping and resampling trigger some computation on all the blocks, which in theory should commute with indexing, but this optimization hasn’t been implemented in Dask yet. (See Dask issue #746).

  2. Save intermediate results to disk as netCDF files (using to_netcdf()) and then load them again with open_dataset() for further computations. For example, if subtracting the temporal mean from a dataset, save the temporal mean to disk before subtracting. Again, in theory, Dask should be able to do the computation in a streaming fashion, but in practice this is a fail case for the Dask scheduler, because it tries to keep every chunk of an array that it computes in memory. (See Dask issue #874)

  3. Specify smaller chunks across space when using open_mfdataset() (e.g., chunks={'latitude': 10, 'longitude': 10}). This makes spatial subsetting easier, because there’s no risk you will load entire chunks of data covering regions you don’t need (probably not necessary if you follow suggestion 1).

  4. Using the h5netcdf package by passing engine='h5netcdf' to open_mfdataset() can be quicker than the default engine='netcdf4' that uses the netCDF4 package.

  5. Some dask-specific tips may be found here.

  6. The dask diagnostics can be useful in identifying performance bottlenecks.