KubeCluster

Note

As of 2022.10.0, the default KubeCluster class requires the Dask Kubernetes Operator. For documentation on the classic KubeCluster implementation, see here.

Cluster manager

The operator has a new cluster manager called dask_kubernetes.operator.KubeCluster that you can use to conveniently create and manage a Dask cluster in Python. You can then connect a Dask distributed.Client object to it directly and perform your work.

The goal of the cluster manager is to abstract away the complexity of the Kubernetes resources and provide a clean and simple Python API to manage clusters while still getting all the benefits of the operator.

Under the hood, the Python cluster manager interacts with the Kubernetes API to create custom resources for you.

To create a cluster in the default namespace, run the following

from dask_kubernetes.operator import KubeCluster

cluster = KubeCluster(name='foo')

You can change the default configuration of the cluster by passing additional keyword arguments (namespace, n_workers, etc.) to the Python class. See the API reference below.

You can scale the cluster

# Scale up the cluster
cluster.scale(5)

# Scale down the cluster
cluster.scale(1)

You can autoscale the cluster

# Allow cluster to autoscale between 1 and 10 workers
cluster.adapt(minimum=1, maximum=10)

# Disable autoscaling by explicitly scaling to your desired number of workers
cluster.scale(1)

You can connect a Dask client to the cluster

from dask.distributed import Client

# Connect Dask to the cluster
client = Client(cluster)
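
Once connected, any work you submit through the client runs on the cluster. As a quick sanity check, here is a minimal sketch using dask.array (nothing in it is cluster-specific):

import dask.array as da

# Build a small random array and compute its mean on the cluster
arr = da.random.random((1000, 1000), chunks=(100, 100))
print(arr.mean().compute())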

Finally, delete the cluster by running

cluster.close()

Additional worker groups

Additional worker groups can also be created via the cluster manager in Python.

from dask_kubernetes.operator import KubeCluster

cluster = KubeCluster(name='foo')

cluster.add_worker_group(
    name="highmem",
    n_workers=2,
    resources={"requests": {"memory": "2Gi"}, "limits": {"memory": "64Gi"}},
)

We can also scale the worker groups by name from the cluster object.

cluster.scale(5, worker_group="highmem")

Additional worker groups can also be deleted in Python.

cluster.delete_worker_group(name="highmem")

Any additional worker groups you create will be deleted when the cluster is deleted.

Customising your cluster

The KubeCluster class takes a selection of keyword arguments to make it quick and easy to get started; however, the underlying DaskCluster resource can be much more complex and configured in many ways. Rather than exposing every possibility via keyword arguments, you can instead pass a valid DaskCluster resource spec which will be used when creating the cluster. You can also generate a spec with make_cluster_spec(), which KubeCluster uses internally, and then modify it with your custom options.

from dask_kubernetes.operator import KubeCluster, make_cluster_spec

config = {
   "name": "foo",
   "n_workers": 2,
   "resources":{"requests": {"memory": "2Gi"}, "limits": {"memory": "64Gi"}}
}

cluster = KubeCluster(**config)
# is equivalent to
cluster = KubeCluster(custom_cluster_spec=make_cluster_spec(**config))
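
The spec returned by make_cluster_spec() is a plain dictionary, so you can inspect it before customising anything. A minimal sketch (the exact fields depend on your dask_kubernetes version):

import json

from dask_kubernetes.operator import make_cluster_spec

# Pretty-print the generated DaskCluster resource
spec = make_cluster_spec(name="foo", n_workers=2)
print(json.dumps(spec, indent=2))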

You can also modify the spec before passing it to KubeCluster, for example if you want to set nodeSelector on your worker pods you could do it like this:

from dask_kubernetes.operator import KubeCluster, make_cluster_spec

spec = make_cluster_spec(name="selector-example", n_workers=2)
spec["spec"]["worker"]["spec"]["nodeSelector"] = {"disktype": "ssd"}

cluster = KubeCluster(custom_cluster_spec=spec)
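
The same pattern works for any other field in the Pod spec. For example, to add tolerations to the worker pods (a sketch; the dedicated=dask taint below is hypothetical and should match a taint on your own nodes):

from dask_kubernetes.operator import KubeCluster, make_cluster_spec

spec = make_cluster_spec(name="toleration-example", n_workers=2)
# Tolerate a hypothetical taint so workers can schedule onto dedicated nodes
spec["spec"]["worker"]["spec"]["tolerations"] = [
    {"key": "dedicated", "operator": "Equal", "value": "dask", "effect": "NoSchedule"}
]

cluster = KubeCluster(custom_cluster_spec=spec)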

You could also have the scheduler run a Jupyter server. With this configuration you can access the Jupyter server via the Dask dashboard.

from dask_kubernetes.operator import KubeCluster, make_cluster_spec

spec = make_cluster_spec(name="jupyter-example", n_workers=2, env={"EXTRA_PIP_PACKAGES": "jupyterlab"})
spec["spec"]["scheduler"]["spec"]["containers"][0]["args"].append("--jupyter")

cluster = KubeCluster(custom_cluster_spec=spec)
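
Once the cluster is running, the cluster manager exposes the Jupyter server's address via the jupyter_link attribute (listed under Attributes in the API reference below):

# Print the address of the Jupyter server running alongside the scheduler
print(cluster.jupyter_link)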

The cluster.add_worker_group() method also supports passing a custom_spec keyword argument which can be generated with make_worker_spec().

from dask_kubernetes.operator import KubeCluster, make_worker_spec

cluster = KubeCluster(name="example")

worker_spec = make_worker_spec(
    cluster_name=cluster.name,
    n_workers=2,
    resources={"limits": {"nvidia.com/gpu": 1}},
)
worker_spec["spec"]["nodeSelector"] = {"cloud.google.com/gke-nodepool": "gpu-node-pool"}

cluster.add_worker_group(custom_spec=worker_spec)

Private container registry

One common use case where make_cluster_spec comes in handy is when pulling container images from a private registry. The Kubernetes documentation suggests creating a Secret with your registry credentials and then setting the imagePullSecrets option in the Pod spec. The KubeCluster class doesn’t expose any way to set imagePullSecrets, so we need to generate a spec and update it before creating the cluster. Thankfully make_cluster_spec makes this quick and painless.

$ kubectl create secret docker-registry regcred \
      --docker-server=<your-registry-server> \
      --docker-username=<your-name> \
      --docker-password=<your-pword> \
      --docker-email=<your-email>

from dask_kubernetes.operator import KubeCluster, make_cluster_spec

# Generate the spec
spec = make_cluster_spec(name="custom", image="foo.com/jacobtomlinson/dask:latest")

# Set the imagePullSecrets for the scheduler and worker pods
spec["spec"]["worker"]["spec"]["imagePullSecrets"] = [{"name": "regcred"}]
spec["spec"]["scheduler"]["spec"]["imagePullSecrets"] = [{"name": "regcred"}]

# Create the cluster
cluster = KubeCluster(custom_cluster_spec=spec)

Role-Based Access Control (RBAC)

In order to spawn a Dask cluster from a pod that runs on the cluster, the service account creating that pod will require a set of RBAC permissions. Create a service account you will use for Dask, and then attach the following ClusterRole to that ServiceAccount via a ClusterRoleBinding:

kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: dask-cluster-role
rules:
  # Application: watching & handling for the custom resource we declare.
  - apiGroups: [kubernetes.dask.org]
    resources: [daskclusters, daskworkergroups, daskworkergroups/scale, daskjobs, daskautoscalers]
    verbs: [get, list, watch, patch, create, delete]

  # Application: other resources it needs to watch and get information from.
  - apiGroups:
    - ""  # indicates the core API group
    resources: [pods, pods/status]
    verbs:
    - "get"
    - "list"
    - "watch"

  - apiGroups:
    - ""  # indicates the core API group
    resources: [services]
    verbs:
    - "get"
    - "list"
    - "watch"
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: dask-cluster-role-binding
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: dask-cluster-role
subjects:
  - kind: ServiceAccount
    name: dask-sa  # adjust name based on the service account you created
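
For example, you could create the ServiceAccount with kubectl before applying the manifest above (the name dask-sa matches the ClusterRoleBinding subject; adjust both if you use a different name):

$ kubectl create serviceaccount dask-sa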

API

KubeCluster(*[, name, namespace, image, ...])

Launch a Dask Cluster on Kubernetes using the Operator

KubeCluster.scale(n[, worker_group])

Scale cluster to n workers

KubeCluster.adapt([minimum, maximum])

Turn on adaptivity

KubeCluster.get_logs()

Get logs for Dask scheduler and workers.

KubeCluster.add_worker_group(name[, ...])

Create a dask worker group by name

KubeCluster.delete_worker_group(name)

Delete a dask worker group by name

KubeCluster.close([timeout])

Delete the dask cluster

class dask_kubernetes.operator.KubeCluster(*, name: Optional[str] = None, namespace: Optional[str] = None, image: Optional[str] = None, n_workers: Optional[int] = None, resources: Optional[Dict[str, str]] = None, env: Optional[Union[List[dict], Dict[str, str]]] = None, worker_command: Optional[List[str]] = None, port_forward_cluster_ip: Optional[bool] = None, create_mode: Optional[dask_kubernetes.operator.kubecluster.kubecluster.CreateMode] = None, shutdown_on_close: Optional[bool] = None, idle_timeout: Optional[int] = None, resource_timeout: Optional[int] = None, scheduler_service_type: Optional[str] = None, custom_cluster_spec: Optional[Union[str, dict]] = None, scheduler_forward_port: Optional[int] = None, jupyter: bool = False, loop: Optional[tornado.ioloop.IOLoop] = None, asynchronous: bool = False, quiet: bool = False, **kwargs)[source]

Launch a Dask Cluster on Kubernetes using the Operator

This cluster manager creates a Dask cluster by deploying the necessary kubernetes resources the Dask Operator needs to create pods. It can also connect to an existing cluster by providing the name of the cluster.

Parameters
name: str

Name given to the Dask cluster. Required except when custom_cluster_spec is passed, in which case it’s ignored in favor of custom_cluster_spec["metadata"]["name"].

namespace: str (optional)

Namespace in which to launch the workers. Defaults to the current namespace if available, or "default".

image: str (optional)

Image to run in Scheduler and Worker Pods.

n_workers: int

Number of workers on initial launch. Use scale to change this number in the future

resources: Dict[str, str]

Resources to be passed to the underlying pods.

env: List[dict] | Dict[str, str]

List of environment variables to pass to the worker pods. Can be a list of dicts using the same structure as Kubernetes env vars, or a single dictionary of key/value pairs.
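
For example, both of these are equivalent ways to set a single variable (a minimal sketch):

>>> cluster = KubeCluster(name="foo", env={"FOO": "bar"})
>>> cluster = KubeCluster(name="foo", env=[{"name": "FOO", "value": "bar"}])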

worker_command: List[str] | str

The command to use when starting the worker. If the command consists of multiple words it should be passed as a list of strings. Defaults to "dask-worker".

port_forward_cluster_ip: bool (optional)

If the scheduler service is of type ClusterIP, forward its ports locally so the cluster can be reached from outside Kubernetes.

create_mode: CreateMode (optional)

How to handle cluster creation if the cluster resource already exists. Default behavior is to create a new cluster if one with that name doesn’t exist, or connect to an existing one if it does. You can also set CreateMode.CREATE_ONLY to raise an exception if a cluster with that name already exists. Or CreateMode.CONNECT_ONLY to raise an exception if a cluster with that name doesn’t exist.
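
For example, to insist on creating a fresh cluster rather than connecting to an existing one (a sketch; CreateMode is imported from the module path shown in the signature above):

>>> from dask_kubernetes.operator.kubecluster.kubecluster import CreateMode
>>> cluster = KubeCluster(name="foo", create_mode=CreateMode.CREATE_ONLY)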

shutdown_on_close: bool (optional)

Whether or not to delete the cluster resource when this object is closed. Defaults to True when creating a cluster and False when connecting to an existing one.

idle_timeout: int (optional)

If set Kubernetes will delete the cluster automatically if the scheduler is idle for longer than this timeout in seconds.

resource_timeout: int (optional)

Time in seconds to wait for Kubernetes resources to enter their expected state. Example: If the DaskCluster resource that gets created isn’t moved into a known status.phase by the controller then it is likely the controller isn’t running or is malfunctioning and we time out and clean up with a useful error. Example 2: If the scheduler Pod enters a CrashBackoffLoop state for longer than this timeout we give up with a useful error. Defaults to 60 seconds.

scheduler_service_type: str (optional)

Kubernetes service type to use for the scheduler. Defaults to ClusterIP.
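
For example, to expose the scheduler outside the cluster (assuming your Kubernetes provider supports LoadBalancer services):

>>> cluster = KubeCluster(name="foo", scheduler_service_type="LoadBalancer")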

jupyter: bool (optional)

Start Jupyter on the scheduler node.

custom_cluster_spec: str | dict (optional)

Path to a YAML manifest or a dictionary representation of a DaskCluster resource object which will be used to create the cluster instead of generating one from the other keyword arguments.
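
For example, either form works (the path below is hypothetical):

>>> cluster = KubeCluster(custom_cluster_spec="/path/to/cluster.yaml")
>>> cluster = KubeCluster(custom_cluster_spec=make_cluster_spec(name="foo"))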

scheduler_forward_port: int (optional)

The port to use when forwarding the scheduler dashboard. Will utilize a random port by default

quiet: bool

If enabled, suppress all printed output. Defaults to False.

**kwargs: dict

Additional keyword arguments to pass to LocalCluster

Examples

>>> from dask_kubernetes.operator import KubeCluster
>>> cluster = KubeCluster(name="foo")

You can add another group of workers (default is 3 workers)

>>> cluster.add_worker_group('additional', n_workers=4)

You can then resize the cluster with the scale method

>>> cluster.scale(10)

And optionally scale a specific worker group

>>> cluster.scale(10, worker_group='additional')

You can also resize the cluster adaptively and give it a range of workers

>>> cluster.adapt(20, 50)

You can pass this cluster directly to a Dask client

>>> from dask.distributed import Client
>>> client = Client(cluster)

You can also access cluster logs

>>> cluster.get_logs()

You can also connect to an existing cluster

>>> existing_cluster = KubeCluster.from_name(name="ialreadyexist")

Attributes
asynchronous

Are we running in the event loop?

called_from_running_loop
dashboard_link
jupyter_link
loop
name
observed
plan
requested
scheduler_address

Methods

adapt([minimum, maximum])

Turn on adaptivity

add_worker_group(name[, n_workers, image, ...])

Create a dask worker group by name

close([timeout])

Delete the dask cluster

delete_worker_group(name)

Delete a dask worker group by name

from_name(name, **kwargs)

Create an instance of this class to represent an existing cluster by name.

get_client()

Return client for the cluster

get_logs()

Get logs for Dask scheduler and workers.

scale(n[, worker_group])

Scale cluster to n workers

sync(func, *args[, asynchronous, ...])

Call func with args synchronously or asynchronously depending on the calling context

wait_for_workers(n_workers[, timeout])

Blocking call to wait for n workers before continuing

generate_rich_output

logs

adapt(minimum=None, maximum=None)[source]

Turn on adaptivity

Parameters
minimum: int

Minimum number of workers

maximum: int

Maximum number of workers

Examples

>>> cluster.adapt()  # Allow scheduler to add/remove workers within k8s cluster resource limits
>>> cluster.adapt(minimum=1, maximum=10) # Allow scheduler to add/remove workers within 1-10 range

add_worker_group(name, n_workers=3, image=None, resources=None, worker_command=None, env=None, custom_spec=None)[source]

Create a dask worker group by name

Parameters
name: str

Name of the worker group

n_workers: int

Number of workers on initial launch. Use .scale(n_workers, worker_group=name) to change this number in the future.

image: str (optional)

Image to run in the worker Pods. If omitted will use the cluster default.

resources: Dict[str, str]

Resources to be passed to the underlying pods. If omitted will use the cluster default.

env: List[dict]

List of environment variables to pass to the worker pods. If omitted will use the cluster default.

custom_spec: dict (optional)

A dictionary representation of a worker spec which will be used to create the DaskWorkerGroup instead of generating one from the other keyword arguments.

Examples

>>> cluster.add_worker_group("high-mem-workers", n_workers=5)

close(timeout=3600)[source]

Delete the dask cluster

delete_worker_group(name)[source]

Delete a dask worker group by name

Parameters
name: str

Name of the worker group

Examples

>>> cluster.delete_worker_group("high-mem-workers")

classmethod from_name(name, **kwargs)[source]

Create an instance of this class to represent an existing cluster by name.

Will fail if a cluster with that name doesn’t already exist.

Parameters
name: str

Name of the cluster to connect to

Examples

>>> cluster = KubeCluster.from_name(name="simple-cluster")

get_logs()[source]

Get logs for Dask scheduler and workers.

Examples

>>> cluster.get_logs()
{'foo': ...,
'foo-default-worker-0269dbfa0cfd4a22bcd9d92ae032f4d2': ...,
'foo-default-worker-7c1ccb04cd0e498fb21babaedd00e5d4': ...,
'foo-default-worker-d65bee23bdae423b8d40c5da7a1065b6': ...}

Each log will be a string of all logs for that container. To view them it is recommended that you print each log.

>>> print(cluster.get_logs()["testdask-scheduler-5c8ffb6b7b-sjgrg"])
...
distributed.scheduler - INFO - -----------------------------------------------
distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO -   Scheduler at:   tcp://10.244.0.222:8786
distributed.scheduler - INFO -   dashboard at:                     :8787
...

scale(n, worker_group='default')[source]

Scale cluster to n workers

Parameters
n: int

Target number of workers

worker_group: str

Worker group to scale

Examples

>>> cluster.scale(10)  # scale cluster to ten workers
>>> cluster.scale(7, worker_group="high-mem-workers") # scale worker group high-mem-workers to seven workers

dask_kubernetes.operator.make_cluster_spec(name, image='ghcr.io/dask/dask:latest', n_workers=None, resources=None, env=None, worker_command='dask-worker', scheduler_service_type='ClusterIP', idle_timeout=0, jupyter=False)[source]

Generate a DaskCluster kubernetes resource.

Populate a template with some common options to generate a DaskCluster kubernetes resource.

Parameters
name: str

Name of the cluster

image: str (optional)

Container image to use for the scheduler and workers

n_workers: int (optional)

Number of workers in the default worker group

resources: dict (optional)

Resource limits to set on scheduler and workers

env: dict (optional)

Environment variables to set on scheduler and workers

worker_command: str (optional)

Worker command to use when starting the workers

scheduler_service_type: str (optional)

Kubernetes service type to use for the scheduler. Defaults to ClusterIP.

idle_timeout: int (optional)

Timeout in seconds after which an idle cluster is cleaned up

jupyter: bool (optional)

Start Jupyter on the Dask scheduler

dask_kubernetes.operator.make_worker_spec(image='ghcr.io/dask/dask:latest', n_workers=3, resources=None, env=None, worker_command='dask-worker')[source]
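
Generate a worker spec which can be passed to KubeCluster.add_worker_group() via the custom_spec keyword argument.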