KubeCluster([pod_template, name, namespace, …]) Launch a Dask cluster on Kubernetes
KubeCluster.adapt(**kwargs) Turn on adaptivity
KubeCluster.from_dict(pod_spec, **kwargs) Create cluster with worker pod spec defined by Python dictionary
KubeCluster.from_yaml(yaml_path, **kwargs) Create cluster with worker pod spec defined by a YAML file
KubeCluster.logs([pod]) Logs from a worker pod
KubeCluster.pods() A list of kubernetes pods corresponding to current workers
KubeCluster.scale(n) Scale cluster to n workers
class dask_kubernetes.KubeCluster(pod_template=None, name=None, namespace=None, n_workers=None, host=None, port=None, env=None, **kwargs)[source]

Launch a Dask cluster on Kubernetes

This starts a local Dask scheduler and then dynamically launches Dask workers on a Kubernetes cluster. The Kubernetes cluster is taken to be either the current one on which this code is running, or as a fallback, the default one configured in a kubeconfig file.


Your worker pod image should have a similar environment to your local environment, including versions of Python, dask, cloudpickle, and any libraries that you may wish to use (like NumPy, Pandas, or Scikit-Learn). See examples below for suggestions on how to manage and check for this.


Since the Dask scheduler is launched locally, for it to work, we need to be able to open network connections between this local node and all the workers nodes on the Kubernetes cluster. If the current process is not already on a Kubernetes node, some network configuration will likely be required to make this work.


Your Kubernetes resource limits and requests should match the --memory-limit and --nthreads parameters given to the dask-worker command.

pod_template: kubernetes.client.V1PodSpec

A Kubernetes specification for a Pod for a dask worker.

name: str (optional)

Name given to the pods. Defaults to dask-$USER-random

namespace: str (optional)

Namespace in which to launch the workers. Defaults to current namespace if available or “default”

n_workers: int

Number of workers on initial launch. Use scale_up to increase this number in the future

env: Dict[str, str]

Dictionary of environment variables to pass to worker pod

host: str

Listen address for local scheduler. Defaults to

port: int

Port of local scheduler

**kwargs: dict

Additional keyword arguments to pass to LocalCluster

See also

KubeCluster.from_yaml, KubeCluster.from_dict, KubeCluster.adapt


>>> from dask_kubernetes import KubeCluster, make_pod_spec
>>> pod_spec = make_pod_spec(image='daskdev/dask:latest',
...                          memory_limit='4G', memory_request='4G',
...                          cpu_limit=1, cpu_request=1,
...                          env={'EXTRA_PIP_PACKAGES': 'fastparquet git+https://github.com/dask/distributed'})
>>> cluster = KubeCluster(pod_spec)
>>> cluster.scale_up(10)

You can also create clusters with worker pod specifications as dictionaries or stored in YAML files

>>> cluster = KubeCluster.from_yaml('worker-template.yml')
>>> cluster = KubeCluster.from_dict({...})

Rather than explicitly setting a number of workers you can also ask the cluster to allocate workers dynamically based on current workload

>>> cluster.adapt()

You can pass this cluster directly to a Dask client

>>> from dask.distributed import Client
>>> client = Client(cluster)

You can verify that your local environment matches your worker environments by calling client.get_versions(check=True). This will raise an informative error if versions do not match.

>>> client.get_versions(check=True)

The daskdev/dask docker images support EXTRA_PIP_PACKAGES, EXTRA_APT_PACKAGES and EXTRA_CONDA_PACKAGES environment variables to help with small adjustments to the worker environments. We recommend the use of pip over conda in this case due to a much shorter startup time. These environment variables can be modified directly from the KubeCluster constructor methods using the env= keyword. You may list as many packages as you like in a single string like the following:

>>> pip = 'pyarrow gcsfs git+https://github.com/dask/distributed'
>>> conda = '-c conda-forge scikit-learn'
>>> KubeCluster.from_yaml(..., env={'EXTRA_PIP_PACKAGES': pip,
...                                 'EXTRA_CONDA_PACKAGES': conda})

You can also start a KubeCluster with no arguments if the worker template is specified in the Dask config files, either as a full template in kubernetes.worker-template or a path to a YAML file in kubernetes.worker-template-path.

See https://docs.dask.org/en/latest/configuration.html for more information about setting configuration values.:

$ export DASK_KUBERNETES__WORKER_TEMPLATE_PATH=worker_template.yaml
>>> cluster = KubeCluster()  # automatically finds 'worker_template.yaml'


adapt(**kwargs) Turn on adaptivity
close() Close this cluster
from_dict(pod_spec, **kwargs) Create cluster with worker pod spec defined by Python dictionary
from_yaml(yaml_path, **kwargs) Create cluster with worker pod spec defined by a YAML file
logs([pod]) Logs from a worker pod
pods() A list of kubernetes pods corresponding to current workers
scale(n) Scale cluster to n workers
scale_down(workers[, pods]) Remove the pods for the requested list of workers
scale_up(n[, pods]) Make sure we have n dask-workers available for this cluster

Close this cluster

classmethod from_dict(pod_spec, **kwargs)[source]

Create cluster with worker pod spec defined by Python dictionary


>>> spec = {
...     'metadata': {},
...     'spec': {
...         'containers': [{
...             'args': ['dask-worker', '$(DASK_SCHEDULER_ADDRESS)',
...                      '--nthreads', '1',
...                      '--death-timeout', '60'],
...             'command': None,
...             'image': 'daskdev/dask:latest',
...             'name': 'dask-worker',
...         }],
...     'restartPolicy': 'Never',
...     }
... }
>>> cluster = KubeCluster.from_dict(spec, namespace='my-ns')  
classmethod from_yaml(yaml_path, **kwargs)[source]

Create cluster with worker pod spec defined by a YAML file

We can start a cluster with pods defined in an accompanying YAML file like the following:

kind: Pod
    foo: bar
    baz: quux
  - image: daskdev/dask:latest
    name: dask-worker
    args: [dask-worker, $(DASK_SCHEDULER_ADDRESS), --nthreads, '2', --memory-limit, 8GB]
  restartPolicy: Never


>>> cluster = KubeCluster.from_yaml('pod.yaml', namespace='my-ns')  

Logs from a worker pod

You can get this pod object from the pods method.

If no pod is specified all pod logs will be returned. On large clusters this could end up being rather large.

pod: kubernetes.client.V1Pod

The pod from which we want to collect logs.

See also

KubeCluster.pods, Client.get_worker_logs


A list of kubernetes pods corresponding to current workers

See also



Scale cluster to n workers

n: int

Target number of workers

scale_down(workers, pods=None)[source]

Remove the pods for the requested list of workers

When scale_down is called by the _adapt async loop, the workers are assumed to have been cleanly closed first and in-memory data has been migrated to the remaining workers.

Note that when the worker process exits, Kubernetes leaves the pods in a ‘Succeeded’ state that we collect here.

If some workers have not been closed, we just delete the pods with matching ip addresses.

workers: List[str] List of addresses of workers to close
scale_up(n, pods=None, **kwargs)[source]

Make sure we have n dask-workers available for this cluster


>>> cluster.scale_up(20)  # ask for twenty workers