KubeCluster (classic)

Warning

This implementation of KubeCluster is being retired and we recommend migrating to the operator-based implementation.

KubeCluster deploys Dask clusters on Kubernetes clusters using native Kubernetes APIs. It is designed to dynamically launch ad-hoc deployments.

Quickstart

To launch a Dask cluster on Kubernetes with KubeCluster, first configure your worker pod specification and then create a cluster with that spec.

from dask_kubernetes.classic import KubeCluster, make_pod_spec

pod_spec = make_pod_spec(image='ghcr.io/dask/dask:latest',
                         memory_limit='4G', memory_request='4G',
                         cpu_limit=1, cpu_request=1)

cluster = KubeCluster(pod_spec)

cluster.scale(10)  # specify number of workers explicitly
cluster.adapt(minimum=1, maximum=100)  # or dynamically scale based on current workload

You can then connect a dask.distributed.Client object to the cluster and perform your work.

# Example usage
from dask.distributed import Client
import dask.array as da

# Connect Dask to the cluster
client = Client(cluster)

# Create a large array and calculate the mean
array = da.ones((1000, 1000, 1000))
print(array.mean().compute())  # Should print 1.0

You can alternatively define your worker specification via YAML by creating a pod manifest that will be used as a template.

# worker-spec.yml

kind: Pod
metadata:
  labels:
    foo: bar
spec:
  restartPolicy: Never
  containers:
  - image: ghcr.io/dask/dask:latest
    imagePullPolicy: IfNotPresent
    args: [dask-worker, --nthreads, '2', --no-dashboard, --memory-limit, 6GB, --death-timeout, '60']
    name: dask-worker
    env:
      - name: EXTRA_PIP_PACKAGES
        value: git+https://github.com/dask/distributed
    resources:
      limits:
        cpu: "2"
        memory: 6G
      requests:
        cpu: "2"
        memory: 6G

from dask_kubernetes.classic import KubeCluster

cluster = KubeCluster('worker-spec.yml')
cluster.scale(10)

For more information see the KubeCluster API reference.

Best Practices

  1. Your worker pod image should have a similar environment to your local environment, including versions of Python, dask, cloudpickle, and any libraries that you may wish to use (like NumPy, Pandas, or Scikit-Learn). See the dask_kubernetes.classic.KubeCluster docstring for guidance on how to check and modify this.

  2. Your Kubernetes resource limits and requests should match the --memory-limit and --nthreads parameters given to the dask-worker command. Otherwise your workers may get killed by Kubernetes as they pack into the same node and overwhelm that node’s available memory, leading to KilledWorker errors.

  3. We recommend adding the --death-timeout '60' arguments and the restartPolicy: Never attribute to your worker specification. This ensures that the pods clean themselves up if your Python process disappears unexpectedly.
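The second and third recommendations can be checked mechanically before a cluster is launched. The following is a minimal sketch, not part of dask-kubernetes: check_worker_pod, to_bytes and to_cores are hypothetical helpers that operate on a worker pod written as a plain Python dict (the same shape as the worker-spec.yml manifest), and the memory parser only understands a handful of common suffixes.

```python
import re

# Size suffixes this sketch understands: decimal (k, M, G, T, optionally with "B")
# and binary (Ki, Mi, Gi, Ti, optionally with "B"). Not an exhaustive list.
_MULTIPLIERS = {
    "": 1, "k": 10**3, "K": 10**3, "M": 10**6, "G": 10**9, "T": 10**12,
    "kB": 10**3, "KB": 10**3, "MB": 10**6, "GB": 10**9, "TB": 10**12,
    "Ki": 2**10, "Mi": 2**20, "Gi": 2**30, "Ti": 2**40,
    "KiB": 2**10, "MiB": 2**20, "GiB": 2**30, "TiB": 2**40,
}


def to_bytes(value) -> int:
    """Convert a memory value such as '6G', '6GB' or '4Gi' to a byte count."""
    match = re.fullmatch(r"\s*(\d+(?:\.\d+)?)\s*([A-Za-z]*)\s*", str(value))
    if match is None or match.group(2) not in _MULTIPLIERS:
        raise ValueError(f"unrecognised memory value: {value!r}")
    return int(float(match.group(1)) * _MULTIPLIERS[match.group(2)])


def to_cores(value) -> float:
    """Convert a Kubernetes CPU quantity such as '2' or '500m' to cores."""
    text = str(value)
    return float(text[:-1]) / 1000 if text.endswith("m") else float(text)


def check_worker_pod(pod: dict) -> list:
    """Return human-readable problems with a worker pod dict (empty if none)."""
    spec = pod.get("spec", {})
    container = spec.get("containers", [{}])[0]
    args = [str(a) for a in container.get("args") or []]
    limits = container.get("resources", {}).get("limits", {})
    problems = []

    def flag_value(flag):
        # Value that follows a flag such as --nthreads, or None if absent.
        return args[args.index(flag) + 1] if flag in args[:-1] else None

    nthreads = flag_value("--nthreads")
    if nthreads is not None and "cpu" in limits and float(nthreads) != to_cores(limits["cpu"]):
        problems.append(f"--nthreads {nthreads} does not match cpu limit {limits['cpu']}")

    memory = flag_value("--memory-limit")
    if memory is not None and "memory" in limits and to_bytes(memory) != to_bytes(limits["memory"]):
        problems.append(f"--memory-limit {memory} does not match memory limit {limits['memory']}")

    if "--death-timeout" not in args:
        problems.append("missing --death-timeout")
    if spec.get("restartPolicy") != "Never":
        problems.append(f"restartPolicy is {spec.get('restartPolicy')!r}, expected 'Never'")
    return problems


# The worker-spec.yml example from the Quickstart, written as a dict.
worker_pod = {
    "kind": "Pod",
    "spec": {
        "restartPolicy": "Never",
        "containers": [{
            "name": "dask-worker",
            "args": ["dask-worker", "--nthreads", "2", "--no-dashboard",
                     "--memory-limit", "6GB", "--death-timeout", "60"],
            "resources": {"limits": {"cpu": "2", "memory": "6G"},
                          "requests": {"cpu": "2", "memory": "6G"}},
        }],
    },
}
print(check_worker_pod(worker_pod))  # → []
```

Dask's 6GB and Kubernetes' 6G both mean 6 × 10⁹ bytes, which is why the example passes; mixing 6GB with 6Gi would be reported as a mismatch.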

GPUs

Because dask-kubernetes uses standard Kubernetes pod specifications, we can use Kubernetes device plugins and add resource limits that define the number of GPUs per pod/worker. We can also use tools like dask-cuda for optimized Dask/GPU interactions.

kind: Pod
metadata:
  labels:
    foo: bar
spec:
  restartPolicy: Never
  containers:
  - image: nvcr.io/nvidia/rapidsai/rapidsai-core:23.04-cuda11.8-runtime-ubuntu22.04-py3.10
    imagePullPolicy: IfNotPresent
    args: [dask-cuda-worker, $(DASK_SCHEDULER_ADDRESS), --rmm-pool-size, 10GB]
    name: dask-cuda
    resources:
      limits:
        cpu: "2"
        memory: 6G
        nvidia.com/gpu: 1 # requesting 1 GPU
      requests:
        cpu: "2"
        memory: 6G
        nvidia.com/gpu: 1 # requesting 1 GPU
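The worker args above pass $(DASK_SCHEDULER_ADDRESS) rather than a literal address. Kubernetes expands $(VAR_NAME) references in a container's command and args from that container's environment variables, leaving unresolvable references unchanged and reducing $$ to a literal $. The sketch below mimics that expansion in plain Python so you can see what the worker actually receives; expand_args is a hypothetical helper, it only accepts variable names made of letters, digits and underscores, and the scheduler address is an invented example value.

```python
import re

# Matches either an escaped "$$" or a "$(VAR_NAME)" reference.
_VAR_REF = re.compile(r"\$\$|\$\(([A-Za-z_][A-Za-z0-9_]*)\)")


def expand_args(args, env):
    """Expand $(VAR) references in container args roughly as Kubernetes does.

    Unknown variables are left untouched and "$$" collapses to a literal "$".
    """
    def substitute(match):
        if match.group(0) == "$$":
            return "$"
        return env.get(match.group(1), match.group(0))

    return [_VAR_REF.sub(substitute, arg) for arg in args]


# Hypothetical environment for a worker container.
env = {"DASK_SCHEDULER_ADDRESS": "tcp://dask-scheduler:8786"}
args = ["dask-cuda-worker", "$(DASK_SCHEDULER_ADDRESS)", "--rmm-pool-size", "10GB"]
print(expand_args(args, env))
# → ['dask-cuda-worker', 'tcp://dask-scheduler:8786', '--rmm-pool-size', '10GB']
```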

Configuration

You can use Dask’s configuration system to control the behavior of dask-kubernetes. The full set of configuration options is listed in the dask-kubernetes configuration reference. Some notable ones are described below:

  1. kubernetes.worker-template-path: a path to a YAML file that holds a Pod spec for the worker. If provided, this will be used when dask_kubernetes.classic.KubeCluster is called with no arguments:

    cluster = KubeCluster()  # reads provided yaml file
    
  2. distributed.dashboard.link: a Python format string that gives the location of Dask’s dashboard. This string will receive values for host, port, and all environment variables.

    For example, this is useful when using dask-kubernetes with JupyterHub and nbserverproxy to route the dashboard link to a proxied address as follows:

    "{JUPYTERHUB_SERVICE_PREFIX}proxy/{port}/status"
    
  3. kubernetes.worker-name: a Python format string to use when naming Dask worker pods. This string will receive values for user, uuid, and all environment variables. This is useful when you want control over the naming convention for your pods and want to use other tokens from the environment. For example, when using zero-to-jupyterhub every user is called jovyan, so you may wish to use dask-{JUPYTERHUB_USER}-{uuid} instead of dask-{user}-{uuid}. Ensure you keep the uuid somewhere in the template.
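Both templates are ordinary Python format strings. Assuming they are filled in with str.format using the documented values (host and port for the dashboard link; user and uuid for the worker name) plus the environment variables as keyword arguments, the substitution works roughly as follows. The env dict stands in for os.environ and its values are hypothetical, as is the 10-character uuid token.

```python
import uuid

dashboard_template = "{JUPYTERHUB_SERVICE_PREFIX}proxy/{port}/status"
worker_name_template = "dask-{JUPYTERHUB_USER}-{uuid}"

# Hypothetical stand-in for os.environ inside a JupyterHub single-user server.
env = {"JUPYTERHUB_SERVICE_PREFIX": "/user/alice/", "JUPYTERHUB_USER": "alice"}

# str.format ignores keyword arguments the template does not use (here: host, user).
dashboard = dashboard_template.format(host="10.0.0.5", port=8787, **env)
worker_name = worker_name_template.format(user="jovyan", uuid=uuid.uuid4().hex[:10], **env)

print(dashboard)    # → /user/alice/proxy/8787/status
print(worker_name)  # prints dask-alice- followed by 10 random hex characters
```

The random token is what keeps pod names unique between clusters, which is why the uuid should stay in the worker-name template.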

Role-Based Access Control (RBAC)

In order to spawn a Dask cluster, the service account creating those pods will require a set of RBAC permissions. Create a service account you will use for Dask, and then attach the following Role to that ServiceAccount via a RoleBinding:

kind: Role
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: daskKubernetes
rules:
- apiGroups:
  - ""  # indicates the core API group
  resources:
  - "pods"
  verbs:
  - "get"
  - "list"
  - "watch"
  - "create"
  - "delete"
- apiGroups:
  - ""  # indicates the core API group
  resources:
  - "pods/log"
  verbs:
  - "get"
  - "list"
- apiGroups:
  - "" # indicates the core API group
  resources:
  - "services"
  verbs:
  - "get"
  - "list"
  - "watch"
  - "create"
  - "delete"
- apiGroups:
  - "policy"  # indicates the policy API group
  resources:
  - "poddisruptionbudgets"
  verbs:
  - "get"
  - "list"
  - "watch"
  - "create"
  - "delete"
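The Role above still has to be bound to a service account. As a sketch, the hypothetical helper below builds a ServiceAccount and a RoleBinding (rbac.authorization.k8s.io/v1) that grants the daskKubernetes Role to it, and prints them as a JSON List manifest that kubectl can apply. The namespace and service account names are placeholders; the Role itself must be created in the same namespace as the RoleBinding.

```python
import json


def dask_rbac_bindings(namespace: str, service_account: str, role: str = "daskKubernetes") -> dict:
    """Return a v1 List holding a ServiceAccount and a RoleBinding to `role`."""
    return {
        "apiVersion": "v1",
        "kind": "List",
        "items": [
            {
                "apiVersion": "v1",
                "kind": "ServiceAccount",
                "metadata": {"name": service_account, "namespace": namespace},
            },
            {
                "apiVersion": "rbac.authorization.k8s.io/v1",
                "kind": "RoleBinding",
                "metadata": {"name": f"{service_account}-{role}".lower(), "namespace": namespace},
                "subjects": [{"kind": "ServiceAccount", "name": service_account, "namespace": namespace}],
                "roleRef": {"apiGroup": "rbac.authorization.k8s.io", "kind": "Role", "name": role},
            },
        ],
    }


# Hypothetical names; pipe the output to `kubectl apply -f -`.
manifest = dask_rbac_bindings(namespace="dask", service_account="dask")
print(json.dumps(manifest, indent=2))
```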

Docker Images

Example Dask Docker images ghcr.io/dask/dask and ghcr.io/dask/dask-notebook are available at https://github.com/orgs/dask/packages. More information about these images is available in the Dask documentation.

Note that these images can be further customized with extra packages using EXTRA_PIP_PACKAGES, EXTRA_APT_PACKAGES, and EXTRA_CONDA_PACKAGES as described in the Extensibility section.

Deployment Details

Scheduler

Before workers are created, a scheduler will be deployed with the following resources:

  • A pod with a scheduler running

  • A service (svc) to expose scheduler and dashboard ports

  • A PodDisruptionBudget to avoid voluntary disruptions of the scheduler pod

By default the Dask configuration option kubernetes.scheduler-service-type is set to ClusterIP. To connect to the scheduler, KubeCluster will first attempt to connect directly, but this will only succeed if dask-kubernetes is being run from within the Kubernetes cluster. If that fails, it will attempt to port-forward the service locally using the kubectl utility.
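The connection strategy described above (try the service directly, otherwise port-forward it with kubectl) can be sketched in plain Python as follows. This is not the dask-kubernetes implementation: the helper names are hypothetical, the sketch assumes the scheduler service is named after the cluster and listens on Dask's default scheduler port 8786, and it relies on the in-cluster DNS name <service>.<namespace> resolving only from inside Kubernetes.

```python
import socket
import subprocess


def can_connect(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # covers DNS failures, refusals and timeouts
        return False


def port_forward_command(service: str, namespace: str, local_port: int, remote_port: int = 8786) -> list:
    """Build a kubectl command that forwards a local port to the scheduler service."""
    return ["kubectl", "port-forward", "--namespace", namespace,
            f"service/{service}", f"{local_port}:{remote_port}"]


def scheduler_address(service: str, namespace: str, local_port: int = 8786):
    """Try the in-cluster DNS name first, then fall back to kubectl port-forward.

    Returns the address to connect to and the port-forward process (or None).
    A real implementation would also wait until the forward is ready.
    """
    in_cluster_host = f"{service}.{namespace}"
    if can_connect(in_cluster_host, 8786):
        return f"tcp://{in_cluster_host}:8786", None
    forwarder = subprocess.Popen(port_forward_command(service, namespace, local_port))
    return f"tcp://localhost:{local_port}", forwarder
```

Keeping the kubectl process handle lets the caller terminate the forward when the cluster is closed.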

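If you update the service type to NodePort, the scheduler will be exposed on the same random high port on all nodes in the cluster. In this case KubeCluster will attempt to list nodes in order to get an IP to connect on, which requires the additional permissions below. Because nodes are cluster-scoped, these rules must be granted through a ClusterRole and ClusterRoleBinding rather than the namespaced Role above.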

- apiGroups:
  - ""  # indicates the core API group
  resources:
  - "nodes"
  verbs:
  - "get"
  - "list"

If you set the service type to LoadBalancer, KubeCluster will connect to the external address of the assigned load balancer. This requires that your Kubernetes cluster has a controller capable of provisioning load balancers.
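Like any other Dask configuration value, kubernetes.scheduler-service-type can be set through an environment variable. Dask maps a dotted key to a variable by adding the DASK_ prefix, joining nesting levels with a double underscore, and writing hyphens as underscores (the same convention as the DASK_KUBERNETES__WORKER_TEMPLATE_PATH example in the KubeCluster docstring). The helper below is a hypothetical sketch of that mapping.

```python
import os


def dask_config_env_var(key: str) -> str:
    """Map a dotted Dask config key to its environment-variable name."""
    return "DASK_" + "__".join(part.upper().replace("-", "_") for part in key.split("."))


name = dask_config_env_var("kubernetes.scheduler-service-type")
print(name)  # → DASK_KUBERNETES__SCHEDULER_SERVICE_TYPE

# Set it in the shell, or in Python before dask is imported.
os.environ[name] = "LoadBalancer"
```

From within a running Python session you can instead call dask.config.set({"kubernetes.scheduler-service-type": "NodePort"}) before creating the cluster.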

Legacy mode

For backward compatibility with previous versions of dask-kubernetes, it is also possible to run the scheduler locally, in the same process where the Dask client is created.

from dask_kubernetes.classic import KubeCluster
from dask.distributed import Client

cluster = KubeCluster.from_yaml('worker-spec.yml', deploy_mode='local')
cluster.scale(10)
client = Client(cluster)

In this mode the Dask workers will attempt to connect to the machine where you are running dask-kubernetes. Generally this will need to be within the Kubernetes cluster in order for the workers to make a successful connection.

Workers

Workers are created directly as simple pods. These worker pods are configured to shut down if they are unable to connect to the scheduler for 60 seconds. The pods are cleaned up when close() is called or when the scheduler process exits.

The pods are created with two default tolerations:

  • k8s.dask.org/dedicated=worker:NoSchedule

  • k8s.dask.org_dedicated=worker:NoSchedule

If you have nodes with the corresponding taints, then the worker pods will schedule to those nodes (and no other pods will be able to schedule to those nodes).
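Whether a worker pod may be scheduled onto a tainted node comes down to Kubernetes' toleration matching rules. The function below is a simplified sketch of those rules (the effect must match unless the toleration leaves it empty; operator Equal compares key and value, Exists compares only the key). It writes the two default tolerations as Equal tolerations, which is an assumption about how dask-kubernetes encodes them, and it ignores details such as tolerationSeconds.

```python
def tolerates(toleration: dict, taint: dict) -> bool:
    """Return True if a pod toleration matches a node taint (simplified)."""
    # An empty toleration effect matches every taint effect.
    if toleration.get("effect") and toleration["effect"] != taint["effect"]:
        return False
    operator = toleration.get("operator", "Equal")
    if operator == "Exists":
        # Exists with an empty key tolerates every taint.
        return not toleration.get("key") or toleration["key"] == taint["key"]
    return (toleration.get("key") == taint["key"]
            and toleration.get("value", "") == taint.get("value", ""))


# The two default tolerations listed above, expressed as toleration objects.
DEFAULT_TOLERATIONS = [
    {"key": "k8s.dask.org/dedicated", "operator": "Equal", "value": "worker", "effect": "NoSchedule"},
    {"key": "k8s.dask.org_dedicated", "operator": "Equal", "value": "worker", "effect": "NoSchedule"},
]

# A taint as applied by: kubectl taint nodes <node> k8s.dask.org/dedicated=worker:NoSchedule
taint = {"key": "k8s.dask.org/dedicated", "value": "worker", "effect": "NoSchedule"}
print(any(tolerates(t, taint) for t in DEFAULT_TOLERATIONS))  # → True
```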

API

KubeCluster([pod_template, name, namespace, ...])

Launch a Dask cluster on Kubernetes

KubeCluster.adapt([Adaptive, minimum, ...])

Turn on adaptivity

KubeCluster.from_dict(pod_spec, **kwargs)

Create cluster with worker pod spec defined by Python dictionary

KubeCluster.from_yaml(yaml_path, **kwargs)

Create cluster with worker pod spec defined by a YAML file

KubeCluster.get_logs([cluster, scheduler, ...])

Return logs for the cluster, scheduler and workers

KubeCluster.scale(n)

Scale cluster to n workers

InCluster()

Configure the Kubernetes connection from a container's environment.

KubeConfig([config_file, context, ...])

Configure the Kubernetes connection from a kubeconfig file.

KubeAuth(host, **kwargs)

Configure the Kubernetes connection explicitly.

make_pod_spec(image[, labels, ...])

Create generic pod template from input parameters

class dask_kubernetes.KubeCluster(pod_template=None, name=None, namespace=None, n_workers=None, host=None, port=None, env=None, auth=[<dask_kubernetes.common.auth.InCluster object>, <dask_kubernetes.common.auth.KubeConfig object>], idle_timeout=None, deploy_mode=None, interface=None, protocol=None, dashboard_address=None, security=None, scheduler_service_wait_timeout=None, scheduler_service_name_resolution_retries=None, scheduler_pod_template=None, apply_default_affinity='preferred', **kwargs)

Launch a Dask cluster on Kubernetes

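This starts a Dask scheduler and then dynamically launches Dask workers on a Kubernetes cluster. By default (deploy_mode="remote") the scheduler runs in a pod on the Kubernetes cluster; with deploy_mode="local" it runs in the local Python process. The Kubernetes cluster is taken to be either the current one on which this code is running, or as a fallback, the default one configured in a kubeconfig file.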

Environments

Your worker pod image should have a similar environment to your local environment, including versions of Python, dask, cloudpickle, and any libraries that you may wish to use (like NumPy, Pandas, or Scikit-Learn). See examples below for suggestions on how to manage and check for this.

Network

When the Dask scheduler is launched locally (deploy_mode="local"), we need to be able to open network connections between this local node and all the worker nodes on the Kubernetes cluster. If the current process is not already on a Kubernetes node, some network configuration will likely be required to make this work.

Resources

Your Kubernetes resource limits and requests should match the --memory-limit and --nthreads parameters given to the dask-worker command.

Parameters
pod_template: (kubernetes.client.V1Pod, dict, str)

A Kubernetes specification for a Pod for a Dask worker. Can be either a V1Pod, a dict representation of a pod, or a path to a YAML file containing a pod specification.

scheduler_pod_template: kubernetes.client.V1Pod (optional)

A Kubernetes specification for a Pod for a dask scheduler. Defaults to the pod_template.

name: str (optional)

Name given to the pods. Defaults to dask-$USER-random

namespace: str (optional)

Namespace in which to launch the workers. Defaults to the current namespace if available, otherwise “default”.

n_workers: int

Number of workers on initial launch. Use scale to change this number in the future

env: Dict[str, str]

Dictionary of environment variables to pass to worker pod

host: str

Listen address for local scheduler. Defaults to 0.0.0.0

port: int

Port of local scheduler

auth: List[ClusterAuth] (optional)

Configuration methods to attempt in order. Defaults to [InCluster(), KubeConfig()].

idle_timeout: str (optional)

The scheduler task will exit after this amount of time if there are no requests from the client. Default is to never timeout.

scheduler_service_wait_timeout: int (optional)

Timeout, in seconds, to wait for the remote scheduler service to be ready. Defaults to 30 seconds. Set to 0 to disable the timeout (not recommended).

scheduler_service_name_resolution_retries: int (optional)

Number of retries to resolve scheduler service name when running from within the Kubernetes cluster. Defaults to 20. Must be set to 1 or greater.

deploy_mode: str (optional)

Run the scheduler as “local” or “remote”. Defaults to "remote".

apply_default_affinity: str (optional)

Apply a default affinity to pods: “required”, “preferred” or “none”. Defaults to "preferred".

**kwargs: dict

Additional keyword arguments to pass to SpecCluster

See also

KubeCluster.adapt

Examples

>>> from dask_kubernetes.classic import KubeCluster, make_pod_spec
>>> pod_spec = make_pod_spec(image='ghcr.io/dask/dask:latest',
...                          memory_limit='4G', memory_request='4G',
...                          cpu_limit=1, cpu_request=1,
...                          env={'EXTRA_PIP_PACKAGES': 'fastparquet git+https://github.com/dask/distributed'})
>>> cluster = KubeCluster(pod_spec)
>>> cluster.scale(10)

You can also create clusters with worker pod specifications as dictionaries or stored in YAML files:

>>> cluster = KubeCluster('worker-template.yml')
>>> cluster = KubeCluster({...})

Rather than explicitly setting a number of workers, you can also ask the cluster to allocate workers dynamically based on the current workload:

>>> cluster.adapt()

You can pass this cluster directly to a Dask client:

>>> from dask.distributed import Client
>>> client = Client(cluster)

You can verify that your local environment matches your worker environments by calling client.get_versions(check=True). This will raise an informative error if versions do not match.

>>> client.get_versions(check=True)
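client.get_versions(check=True) does this comparison for you. To illustrate what a mismatch check involves, the sketch below compares two {package: version} dicts; local_versions reads installed versions with importlib.metadata, while the worker versions are hypothetical values standing in for what a worker pod would report.

```python
from importlib import metadata


def local_versions(packages):
    """Return {package: version or None} for the current Python environment."""
    versions = {}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None
    return versions


def mismatches(local: dict, remote: dict) -> dict:
    """Return {package: (local, remote)} for every package whose versions differ."""
    return {
        name: (local.get(name), remote.get(name))
        for name in sorted(set(local) | set(remote))
        if local.get(name) != remote.get(name)
    }


# Hypothetical versions reported by a worker pod and by the local machine.
worker = {"dask": "2023.5.0", "cloudpickle": "2.2.1", "numpy": "1.24.3"}
local = {"dask": "2023.5.0", "cloudpickle": "2.2.1", "numpy": "1.25.0"}
print(mismatches(local, worker))  # → {'numpy': ('1.25.0', '1.24.3')}

# In practice the local side would come from the installed packages.
installed = local_versions(worker)
```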

The ghcr.io/dask/dask docker images support EXTRA_PIP_PACKAGES, EXTRA_APT_PACKAGES and EXTRA_CONDA_PACKAGES environment variables to help with small adjustments to the worker environments. We recommend the use of pip over conda in this case due to a much shorter startup time. These environment variables can be modified directly from the KubeCluster constructor methods using the env= keyword. You may list as many packages as you like in a single string like the following:

>>> pip = 'pyarrow gcsfs git+https://github.com/dask/distributed'
>>> conda = '-c conda-forge scikit-learn'
>>> KubeCluster(..., env={'EXTRA_PIP_PACKAGES': pip,
...                       'EXTRA_CONDA_PACKAGES': conda})
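When the package lists are built programmatically, a small hypothetical helper can assemble the single space-separated strings these variables expect. This is only a convenience sketch; the resulting dict is what you would pass as env= to KubeCluster.

```python
def extra_packages_env(pip=(), conda=(), apt=()):
    """Build an env dict of EXTRA_*_PACKAGES strings; empty lists are omitted."""
    env = {}
    for var, packages in (("EXTRA_PIP_PACKAGES", pip),
                          ("EXTRA_CONDA_PACKAGES", conda),
                          ("EXTRA_APT_PACKAGES", apt)):
        if packages:
            env[var] = " ".join(packages)  # one space-separated string per variable
    return env


env = extra_packages_env(
    pip=["pyarrow", "gcsfs", "git+https://github.com/dask/distributed"],
    conda=["-c", "conda-forge", "scikit-learn"],
)
print(env)
# → {'EXTRA_PIP_PACKAGES': 'pyarrow gcsfs git+https://github.com/dask/distributed', 'EXTRA_CONDA_PACKAGES': '-c conda-forge scikit-learn'}
# e.g. KubeCluster(pod_spec, env=env)
```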

You can also start a KubeCluster with no arguments if the worker template is specified in the Dask config files, either as a full template in kubernetes.worker-template or a path to a YAML file in kubernetes.worker-template-path.

See https://docs.dask.org/en/latest/configuration.html for more information about setting configuration values:

$ export DASK_KUBERNETES__WORKER_TEMPLATE_PATH=worker_template.yaml
>>> cluster = KubeCluster()  # automatically finds 'worker_template.yaml'

Attributes
asynchronous

Are we running in the event loop?

dashboard_link
loop
name
observed
plan
requested
scheduler_address

Methods

adapt([Adaptive, minimum, maximum, ...])

Turn on adaptivity

from_dict(pod_spec, **kwargs)

Create cluster with worker pod spec defined by Python dictionary

from_name(name)

Create an instance of this class to represent an existing cluster by name.

from_yaml(yaml_path, **kwargs)

Create cluster with worker pod spec defined by a YAML file

get_client()

Return client for the cluster

get_logs([cluster, scheduler, workers])

Return logs for the cluster, scheduler and workers

new_worker_spec()

Return name and spec for the next worker

scale(n)

Scale cluster to n workers

scale_up([n, memory, cores])

Scale cluster to n workers

sync(func, *args[, asynchronous, ...])

Call func with args synchronously or asynchronously depending on the calling context

wait_for_workers([n_workers, timeout])

Blocking call to wait for n workers before continuing

close

logs

scale_down

classmethod from_dict(pod_spec, **kwargs)

Create cluster with worker pod spec defined by Python dictionary

Deprecated, please use the KubeCluster constructor directly.

Examples

>>> spec = {
...     'metadata': {},
...     'spec': {
...         'containers': [{
...             'args': ['dask-worker', '$(DASK_SCHEDULER_ADDRESS)',
...                      '--nthreads', '1',
...                      '--death-timeout', '60'],
...             'command': None,
...             'image': 'ghcr.io/dask/dask:latest',
...             'name': 'dask-worker',
...         }],
...         'restartPolicy': 'Never',
...     }
... }
>>> cluster = KubeCluster.from_dict(spec, namespace='my-ns')

classmethod from_yaml(yaml_path, **kwargs)

Create cluster with worker pod spec defined by a YAML file

Deprecated, please use the KubeCluster constructor directly.

We can start a cluster with pods defined in an accompanying YAML file like the following:

kind: Pod
metadata:
  labels:
    foo: bar
    baz: quux
spec:
  containers:
  - image: ghcr.io/dask/dask:latest
    name: dask-worker
    args: [dask-worker, $(DASK_SCHEDULER_ADDRESS), --nthreads, '2', --memory-limit, 8GB]
  restartPolicy: Never

Examples

>>> cluster = KubeCluster.from_yaml('pod.yaml', namespace='my-ns')

scale(n)

Scale cluster to n workers

Parameters
n: int

Target number of workers

Examples

>>> cluster.scale(10)  # scale cluster to ten workers

class dask_kubernetes.ClusterAuth

An abstract base class for methods for configuring a connection to a Kubernetes API server.

Examples

>>> from dask_kubernetes import KubeConfig
>>> auth = KubeConfig(context='minikube')
>>> from dask_kubernetes import KubeAuth
>>> auth = KubeAuth(host='https://localhost', username='superuser', password='pass')

Methods

load()

Load Kubernetes configuration and set as default

load_first([auth])

Load the first valid configuration in the list auth.

async load()

Load Kubernetes configuration and set as default

Raises
kubernetes.client.KubeConfigException

async static load_first(auth=None)

Load the first valid configuration in the list auth. A single configuration method can be passed.

Parameters
auth: List[ClusterAuth] (optional)

Configuration methods to attempt in order. Defaults to [InCluster(), KubeConfig()].
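The documented behaviour of load_first (try each configuration method in order, and accept a single method as well as a list) follows a simple fallback pattern. The sketch below reproduces that pattern with a hypothetical stand-in class instead of the real InCluster and KubeConfig, and its error handling is an assumption rather than a description of the library.

```python
import asyncio


class HypotheticalAuth:
    """Stand-in for a ClusterAuth subclass; `ok` controls whether load() succeeds."""

    def __init__(self, name: str, ok: bool):
        self.name, self.ok = name, ok

    async def load(self):
        if not self.ok:
            raise RuntimeError(f"{self.name}: no usable configuration")


async def load_first(auth):
    """Try each auth method in order and return the first one that loads."""
    if not isinstance(auth, (list, tuple)):
        auth = [auth]  # a single configuration method is also accepted
    errors = []
    for method in auth:
        try:
            await method.load()
            return method
        except Exception as exc:  # remember why it failed and try the next one
            errors.append(str(exc))
    raise RuntimeError("no Kubernetes configuration could be loaded: " + "; ".join(errors))


chosen = asyncio.run(load_first([HypotheticalAuth("InCluster", ok=False),
                                 HypotheticalAuth("KubeConfig", ok=True)]))
print(chosen.name)  # → KubeConfig
```

This ordering is why the default [InCluster(), KubeConfig()] works both inside a pod and on a laptop with a kubeconfig file.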

class dask_kubernetes.InCluster

Configure the Kubernetes connection from a container’s environment.

This authentication method is intended for use when the client is running in a container started by Kubernetes with an authorized service account. This loads the mounted service account token and discovers the Kubernetes API via Kubernetes service discovery.

Methods

load()

Load Kubernetes configuration and set as default

load_first([auth])

Load the first valid configuration in the list auth.

class dask_kubernetes.KubeConfig(config_file=None, context=None, persist_config=True)

Configure the Kubernetes connection from a kubeconfig file.

Parameters
config_file: str (optional)

The path of the kubeconfig file to load. Defaults to the value of the KUBECONFIG environment variable, or the string ~/.kube/config.

context: str (optional)

The kubeconfig context to use. Defaults to the value of current-context in the configuration file.

persist_config: bool (optional)

Whether changes to the configuration will be saved back to disk (e.g. GCP token refresh). Defaults to True.

Methods

get_kube_config_loader_for_yaml_file()

load()

Load Kubernetes configuration and set as default

load_first([auth])

Load the first valid configuration in the list auth.

load_kube_config()

class dask_kubernetes.KubeAuth(host, **kwargs)

Configure the Kubernetes connection explicitly.

Parameters
host: str

The base URL of the Kubernetes host to connect to

username: str (optional)

Username for HTTP basic authentication

password: str (optional)

Password for HTTP basic authentication

debug: bool (optional)

Debug switch

verify_ssl: bool (optional)

Set this to False to skip verifying the SSL certificate when calling the API over HTTPS. Defaults to True.

ssl_ca_cert: str (optional)

Set this to customize the certificate file to verify the peer.

cert_file: str (optional)

Client certificate file

key_file: str (optional)

Client key file

assert_hostname: bool (optional)

Set this to True/False to enable/disable SSL hostname verification. Defaults to True.

proxy: str (optional)

URL for a proxy to connect through

Methods

load()

Load Kubernetes configuration and set as default

load_first([auth])

Load the first valid configuration in the list auth.

dask_kubernetes.make_pod_spec(image, labels={}, threads_per_worker=1, env={}, extra_container_config={}, extra_pod_config={}, resources=None, memory_limit=None, memory_request=None, cpu_limit=None, cpu_request=None, gpu_limit=None, annotations={})

Create generic pod template from input parameters

Parameters
image: str

Docker image name

labels: dict

Dict of labels to pass to V1ObjectMeta

threads_per_worker: int

Number of threads per each worker

env: dict

Dict of environment variables to pass to V1Container

extra_container_config: dict

Extra config attributes to set on the container object

extra_pod_config: dict

Extra config attributes to set on the pod object

resources: str

Resources for task constraints like “GPU=2 MEM=10e9”. Resources are applied separately to each worker process (only relevant when starting multiple worker processes). Passed to the --resources option of dask-worker.

memory_limit: int, float, or str

Bytes of memory per process that the worker can use (applied to both dask-worker --memory-limit and spec.containers[].resources.limits.memory). This can be:

  • an integer (bytes), note 0 is a special case for no memory management.

  • a float (bytes). Note: fraction of total system memory is not supported by k8s.

  • a string (like 5GiB or 5000M). Note: ‘GB’ is not supported by k8s.

  • ‘auto’ for automatically computing the memory limit. [default: auto]

memory_request: int, float, or str

Like memory_limit (applied only to spec.containers[].resources.requests.memory and ignored by dask-worker).

cpu_limit: float or str

CPU resource limits (applied to spec.containers[].resources.limits.cpu).

cpu_request: float or str

CPU resource requests (applied to spec.containers[].resources.requests.cpu).

gpu_limit: int

GPU resource limits (applied to spec.containers[].resources.limits."nvidia.com/gpu").

annotations: dict

Dict of annotations passed to V1ObjectMeta

Returns
pod: V1Pod

Examples

>>> make_pod_spec(image='ghcr.io/dask/dask:latest', memory_limit='4G', memory_request='4G')
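As noted under memory_limit, some size strings that Dask accepts (such as 6GB) are not valid Kubernetes quantities, which use suffixes like k, M, G, Ki, Mi and Gi. If you build pod specs yourself, a hypothetical converter like the one below can translate Dask-style sizes into Kubernetes quantities. Whether make_pod_spec performs any such conversion itself is not stated here, and the sketch deliberately rejects 'auto' and 0, which have no Kubernetes equivalent.

```python
import re

# Dask-style size suffixes mapped to the Kubernetes suffix with the same meaning.
_DASK_TO_K8S = {
    "": "", "B": "",
    "kB": "k", "KB": "k", "MB": "M", "GB": "G", "TB": "T",
    "KiB": "Ki", "MiB": "Mi", "GiB": "Gi", "TiB": "Ti",
    "k": "k", "M": "M", "G": "G", "T": "T",
    "Ki": "Ki", "Mi": "Mi", "Gi": "Gi", "Ti": "Ti",
}


def to_k8s_quantity(value) -> str:
    """Convert an int/float byte count or a Dask-style size string to a k8s quantity."""
    if isinstance(value, (int, float)):
        if value <= 0:
            raise ValueError("0 (no memory management) has no Kubernetes equivalent")
        return str(int(value))
    match = re.fullmatch(r"\s*(\d+(?:\.\d+)?)\s*([A-Za-z]*)\s*", value)
    if match is None or match.group(2) not in _DASK_TO_K8S:
        raise ValueError(f"cannot convert {value!r} to a Kubernetes quantity")
    number, suffix = match.groups()
    return number + _DASK_TO_K8S[suffix]


for size in ["5GiB", "5000M", "6GB", 4_000_000_000]:
    print(size, "->", to_k8s_quantity(size))
```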