KubeCluster (experimental)

Warning

The Dask Operator for Kubernetes is experimental, so any bug reports are appreciated!

Cluster manager

The operator has a new cluster manager called dask_kubernetes.experimental.KubeCluster that you can use to conveniently create and manage a Dask cluster in Python. You can then connect a Dask distributed.Client object to it directly and perform your work.

The goal of the cluster manager is to abstract away the complexity of the Kubernetes resources and provide a clean and simple Python API to manage clusters while still getting all the benefits of the operator.

Under the hood the Python cluster manager will interact with the Kubernetes API to create custom resources for us.

To create a cluster in the default namespace, run the following

from dask_kubernetes.experimental import KubeCluster

cluster = KubeCluster(name='foo')

You can change the default configuration of the cluster by passing additional keyword arguments (namespace, n_workers, etc.) to the Python class. See the API reference below for the full list of options.
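For example, a minimal sketch (the "my-namespace" value is a placeholder for a namespace that exists in your Kubernetes cluster):

from dask_kubernetes.experimental import KubeCluster

# Launch in a specific namespace with five workers to start
cluster = KubeCluster(name='foo', namespace='my-namespace', n_workers=5)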

You can scale the cluster

# Scale up the cluster
cluster.scale(5)

# Scale down the cluster
cluster.scale(1)

You can autoscale the cluster

# Allow cluster to autoscale between 1 and 10 workers
cluster.adapt(minimum=1, maximum=10)

# Disable autoscaling by explicitly scaling to your desired number of workers
cluster.scale(1)

You can connect a Dask client to the cluster

from dask.distributed import Client

# Connect Dask to the cluster
client = Client(cluster)

Finally, delete the cluster by running

cluster.close()

Additional worker groups

Additional worker groups can also be created via the cluster manager in Python.

from dask_kubernetes.experimental import KubeCluster

cluster = KubeCluster(name='foo')

cluster.add_worker_group(name="highmem", n_workers=2, resources={"requests": {"memory": "2Gi"}, "limits": {"memory": "64Gi"}})

We can also scale the worker groups by name from the cluster object.

cluster.scale(5, worker_group="highmem")

Additional worker groups can also be deleted in Python.

cluster.delete_worker_group(name="highmem")

Any additional worker groups you create will be deleted when the cluster is deleted.

Custom cluster spec

The KubeCluster class takes a selection of keyword arguments to make it quick and easy to get started; however, the underlying DaskCluster resource can be much more complex and configured in many ways. Rather than exposing every possibility via keyword arguments, you can instead pass a valid DaskCluster resource spec which will be used when creating the cluster. You can also generate a spec with make_cluster_spec(), which KubeCluster uses internally, and then modify it with your custom options.

from dask_kubernetes.experimental import KubeCluster, make_cluster_spec

config = {
    "name": "foo",
    "n_workers": 2,
    "resources": {"requests": {"memory": "2Gi"}, "limits": {"memory": "64Gi"}}
}

cluster = KubeCluster(**config)
# is equivalent to
cluster = KubeCluster(custom_cluster_spec=make_cluster_spec(**config))

You can also modify the spec before passing it to KubeCluster. For example, if you want to set nodeSelector on your worker pods you could do it like this:

from dask_kubernetes.experimental import KubeCluster, make_cluster_spec

spec = make_cluster_spec(name="selector-example", n_workers=2)
spec["spec"]["worker"]["spec"]["nodeSelector"] = {"disktype": "ssd"}

cluster = KubeCluster(custom_cluster_spec=spec)

The cluster.add_worker_group() method also supports passing a custom_spec keyword argument which can be generated with make_worker_spec().

from dask_kubernetes.experimental import KubeCluster, make_worker_spec

cluster = KubeCluster(name="example")

worker_spec = make_worker_spec(cluster_name=cluster.name, n_workers=2, resources={"limits": {"nvidia.com/gpu": 1}})
worker_spec["spec"]["nodeSelector"] = {"cloud.google.com/gke-nodepool": "gpu-node-pool"}

cluster.add_worker_group(custom_spec=worker_spec)

API

KubeCluster([name, namespace, image, ...])

Launch a Dask Cluster on Kubernetes using the Operator

KubeCluster.scale(n[, worker_group])

Scale cluster to n workers

KubeCluster.adapt([minimum, maximum])

Turn on adaptivity

KubeCluster.get_logs()

Get logs for Dask scheduler and workers.

KubeCluster.add_worker_group(name[, ...])

Create a dask worker group by name

KubeCluster.delete_worker_group(name)

Delete a dask worker group by name

KubeCluster.close([timeout])

Delete the dask cluster

class dask_kubernetes.experimental.KubeCluster(name=None, namespace=None, image='ghcr.io/dask/dask:latest', n_workers=3, resources={}, env=[], worker_command='dask-worker', auth=[<dask_kubernetes.common.auth.InCluster object>, <dask_kubernetes.common.auth.KubeConfig object>], port_forward_cluster_ip=None, create_mode=CreateMode.CREATE_OR_CONNECT, shutdown_on_close=None, resource_timeout=60, custom_cluster_spec=None, **kwargs)[source]

Launch a Dask Cluster on Kubernetes using the Operator

This cluster manager creates a Dask cluster by deploying the necessary kubernetes resources the Dask Operator needs to create pods. It can also connect to an existing cluster by providing the name of the cluster.

Parameters
name: str (required)

Name given to the Dask cluster.

namespace: str (optional)

Namespace in which to launch the workers. Defaults to the current namespace if available, or "default".

image: str (optional)

Image to run in Scheduler and Worker Pods.

n_workers: int

Number of workers on initial launch. Use scale to change this number in the future

resources: Dict[str, str]

Resources to be passed to the underlying pods.
env: List[dict] | Dict[str, str]

List of environment variables to pass to the worker pods. Can be a list of dicts using the same structure as k8s env vars or a single dictionary of key/value pairs (see the Examples below).

worker_command: List[str] | str

The command to use when starting the worker. If the command consists of multiple words it should be passed as a list of strings (see the Examples below). Defaults to "dask-worker".

auth: List[ClusterAuth] (optional)

Configuration methods to attempt in order. Defaults to [InCluster(), KubeConfig()].

port_forward_cluster_ip: bool (optional)

If the cluster uses ClusterIP type services, forward the ports locally so that the client can connect from outside the Kubernetes cluster.

create_mode: CreateMode (optional)

How to handle cluster creation if the cluster resource already exists. The default behaviour is to create a new cluster if one with that name doesn't exist, or connect to an existing one if it does. You can also set CreateMode.CREATE_ONLY to raise an exception if a cluster with that name already exists, or CreateMode.CONNECT_ONLY to raise an exception if a cluster with that name doesn't exist.

shutdown_on_close: bool (optional)

Whether or not to delete the cluster resource when this object is closed. Defaults to True when creating a cluster and False when connecting to an existing one.

resource_timeout: int (optional)

Time in seconds to wait for the controller to take action before giving up. If the DaskCluster resource that gets created isn't moved into a known status.phase by the controller, it is likely that the controller isn't running or is malfunctioning; we time out and clean up with a useful error. Defaults to 60 seconds.

custom_cluster_spec: dict (optional)

A dictionary representation of a DaskCluster resource object which will be used to create the cluster instead of generating one from the other keyword arguments.

**kwargs: dict

Additional keyword arguments to pass to LocalCluster

Examples

>>> from dask_kubernetes.experimental import KubeCluster
>>> cluster = KubeCluster(name="foo")

You can add another group of workers (default is 3 workers)

>>> cluster.add_worker_group('additional', n_workers=4)

You can then resize the cluster with the scale method

>>> cluster.scale(10)

And optionally scale a specific worker group

>>> cluster.scale(10, worker_group='additional')

You can also resize the cluster adaptively and give it a range of workers

>>> cluster.adapt(20, 50)

You can pass this cluster directly to a Dask client

>>> from dask.distributed import Client
>>> client = Client(cluster)

You can also access cluster logs

>>> cluster.get_logs()

You can also connect to an existing cluster

>>> existing_cluster = KubeCluster.from_name(name="ialreadyexist")
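The env keyword accepts either a plain dictionary or a list of k8s-style entries; a sketch of both forms (the variable name and value are placeholders)

>>> cluster = KubeCluster(name="env-example", env={"FOO": "bar"})
>>> cluster = KubeCluster(name="env-example", env=[{"name": "FOO", "value": "bar"}])

If the worker command consists of multiple words, pass it as a list of strings (the command shown is illustrative)

>>> cluster = KubeCluster(name="cmd-example", worker_command=["dask-worker", "--nthreads", "2"])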

Attributes
asynchronous

Are we running in the event loop?

dashboard_link
loop
name
observed
plan
requested
scheduler_address

Methods

adapt([minimum, maximum])

Turn on adaptivity

add_worker_group(name[, n_workers, image, ...])

Create a dask worker group by name

close([timeout])

Delete the dask cluster

delete_worker_group(name)

Delete a dask worker group by name

from_name(name, **kwargs)

Create an instance of this class to represent an existing cluster by name.

get_client()

Return client for the cluster

get_logs()

Get logs for Dask scheduler and workers.

scale(n[, worker_group])

Scale cluster to n workers

sync(func, *args[, asynchronous, ...])

Call func with args synchronously or asynchronously depending on the calling context

logs

adapt(minimum=None, maximum=None)[source]

Turn on adaptivity

Parameters
minimum: int

Minimum number of workers

maximum: int

Maximum number of workers

Examples

>>> cluster.adapt()  # Allow scheduler to add/remove workers within k8s cluster resource limits
>>> cluster.adapt(minimum=1, maximum=10) # Allow scheduler to add/remove workers within 1-10 range
add_worker_group(name, n_workers=3, image=None, resources=None, worker_command=None, env=None, custom_spec=None)[source]

Create a dask worker group by name

Parameters
name: str

Name of the worker group

n_workers: int

Number of workers on initial launch. Use .scale(n_workers, worker_group=name) to change this number in the future.

image: str (optional)

Image to run in Scheduler and Worker Pods. If omitted will use the cluster default.

resources: Dict[str, str]

Resources to be passed to the underlying pods. If omitted will use the cluster default.

env: List[dict]

List of environment variables to pass to the worker pods. If omitted will use the cluster default.

custom_spec: dict (optional)

A dictionary representation of a worker spec which will be used to create the DaskWorkerGroup instead of generating one from the other keyword arguments.

Examples

>>> cluster.add_worker_group("high-mem-workers", n_workers=5)
close(timeout=3600)[source]

Delete the dask cluster
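Examples

>>> cluster.close()  # deletes the DaskCluster resource, including any additional worker groups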

delete_worker_group(name)[source]

Delete a dask worker group by name

Parameters
name: str

Name of the worker group

Examples

>>> cluster.delete_worker_group("high-mem-workers")
classmethod from_name(name, **kwargs)[source]

Create an instance of this class to represent an existing cluster by name.

Will fail if a cluster with that name doesn’t already exist.

Parameters
name: str

Name of the cluster to connect to

Examples

>>> cluster = KubeCluster.from_name(name="simple-cluster")
get_logs()[source]

Get logs for Dask scheduler and workers.

Examples

>>> cluster.get_logs()
{'foo': ...,
'foo-default-worker-0269dbfa0cfd4a22bcd9d92ae032f4d2': ...,
'foo-default-worker-7c1ccb04cd0e498fb21babaedd00e5d4': ...,
'foo-default-worker-d65bee23bdae423b8d40c5da7a1065b6': ...}
Each log will be a string of all logs for that container. To view
them it is recommended that you print each log.
>>> print(cluster.get_logs()["testdask-scheduler-5c8ffb6b7b-sjgrg"])
...
distributed.scheduler - INFO - -----------------------------------------------
distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO -   Scheduler at:   tcp://10.244.0.222:8786
distributed.scheduler - INFO -   dashboard at:                     :8787
...
scale(n, worker_group='default')[source]

Scale cluster to n workers

Parameters
n: int

Target number of workers

worker_group: str

Worker group to scale

Examples

>>> cluster.scale(10)  # scale cluster to ten workers
>>> cluster.scale(7, worker_group="high-mem-workers") # scale worker group high-mem-workers to seven workers
dask_kubernetes.experimental.make_cluster_spec(name, image='ghcr.io/dask/dask:latest', n_workers=None, resources=None, env=None, worker_command='dask-worker')[source]

Generate a DaskCluster kubernetes resource.

Populate a template with some common options to generate a DaskCluster kubernetes resource.

Parameters
name: str

Name of the cluster

image: str (optional)

Container image to use for the scheduler and workers

n_workers: int (optional)

Number of workers in the default worker group

resources: dict (optional)

Resource limits to set on scheduler and workers

env: dict (optional)

Environment variables to set on scheduler and workers

worker_command: str (optional)

Worker command to use when starting the workers
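Examples

A sketch mirroring the Custom cluster spec section above:

>>> from dask_kubernetes.experimental import KubeCluster, make_cluster_spec
>>> spec = make_cluster_spec(name="foo", n_workers=2)
>>> spec["spec"]["worker"]["spec"]["nodeSelector"] = {"disktype": "ssd"}
>>> cluster = KubeCluster(custom_cluster_spec=spec)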

dask_kubernetes.experimental.make_worker_spec(cluster_name, image='ghcr.io/dask/dask:latest', n_workers=3, resources=None, env=None, worker_command='dask-worker')[source]

Generate a worker spec which can be passed to cluster.add_worker_group() via the custom_spec keyword argument.
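Examples

A sketch mirroring the Additional worker groups section above (assumes an existing cluster object):

>>> from dask_kubernetes.experimental import make_worker_spec
>>> worker_spec = make_worker_spec(cluster_name=cluster.name, n_workers=2)
>>> cluster.add_worker_group(custom_spec=worker_spec)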