HelmCluster

HelmCluster is for managing an existing Dask cluster which has been deployed using Helm.

Quickstart

First, install the Dask Helm chart with helm and ensure the cluster is running.

helm repo add dask https://helm.dask.org
helm repo update

helm install myrelease dask/dask

You can then create a HelmCluster object in Python to scale the cluster and retrieve logs.

from dask_kubernetes import HelmCluster

cluster = HelmCluster(release_name="myrelease")
cluster.scale(10)  # specify number of workers explicitly

With this cluster object you can conveniently connect a dask.distributed.Client to the cluster and perform your work. Provided you have API access to Kubernetes and can run the kubectl command, connectivity to the Dask cluster is handled automatically for you via services or port forwarding.

# Example usage
from dask.distributed import Client
import dask.array as da

# Connect Dask to the cluster
client = Client(cluster)

# Create a large array and calculate the mean
array = da.ones((1000, 1000, 1000))
print(array.mean().compute())  # Should print 1.0

For more information see the HelmCluster API reference.

Warning

It is not possible to use HelmCluster from the Jupyter session deployed as part of the Helm Chart without first copying your ~/.kube/config file into that Jupyter session.
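If you do copy the config file across, a minimal sketch of authenticating with it explicitly might look like this (the path below assumes the default location; adjust it to wherever you placed the file):

from dask_kubernetes import HelmCluster
from dask_kubernetes.common.auth import KubeConfig

# Assumes ~/.kube/config was copied into the Jupyter session;
# adjust the path if you placed it elsewhere.
auth = KubeConfig(config_file="~/.kube/config")

cluster = HelmCluster(release_name="myrelease", auth=[auth])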

API

HelmCluster([release_name, auth, namespace, ...])

Connect to a Dask cluster deployed via the Helm Chart.

HelmCluster.scale(n_workers[, worker_group])

Scale cluster to n workers.

HelmCluster.adapt(*args, **kwargs)

Turn on adaptivity (Not recommended).

HelmCluster.logs(*args, **kwargs)

class dask_kubernetes.HelmCluster(release_name=None, auth=[<dask_kubernetes.common.auth.InCluster object>, <dask_kubernetes.common.auth.KubeConfig object>], namespace=None, port_forward_cluster_ip=False, scheduler_name='scheduler', worker_name='worker', node_host=None, node_port=None, name=None, **kwargs)[source]

Connect to a Dask cluster deployed via the Helm Chart.

This cluster manager connects to an existing Dask deployment that was created by the Dask Helm Chart, enabling you to perform basic cluster actions such as scaling and log retrieval.

Parameters
release_name: str

Name of the helm release to connect to.

namespace: str (optional)

Namespace in which to launch the workers. Defaults to the current namespace if available, otherwise “default”.

port_forward_cluster_ip: bool (optional)

If the chart uses ClusterIP type services, forward the ports locally. If you are using HelmCluster from the Jupyter session that was installed by the helm chart this should be False. If you are running HelmCluster locally this should be True so that the service ports are forwarded to your machine (see the sketch after this parameter list).

auth: List[ClusterAuth] (optional)

Configuration methods to attempt in order. Defaults to [InCluster(), KubeConfig()].

scheduler_name: str (optional)

Name of the Dask scheduler deployment in the current release. Defaults to “scheduler”.

worker_name: str (optional)

Name of the Dask worker deployment in the current release. Defaults to “worker”.

node_host: str (optional)

A node address. Can be provided if the scheduler service type is NodePort and you want to specify which node to connect to (see the sketch after this parameter list).

node_port: int (optional)

A node port. Can be provided if the scheduler service type is NodePort and you want to specify which port to connect to.

**kwargs: dict

Additional keyword arguments to pass to Cluster.
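As a rough sketch of these connection options (the release name, node address, and port below are illustrative placeholders, not values from your cluster):

from dask_kubernetes import HelmCluster

# Running locally against ClusterIP services: forward the scheduler
# and dashboard ports to this machine.
cluster = HelmCluster(release_name="myrelease", port_forward_cluster_ip=True)

# If the scheduler service is of type NodePort instead, point at a
# specific node and port directly (placeholder values).
cluster = HelmCluster(
    release_name="myrelease",
    node_host="192.0.2.10",
    node_port=30087,
)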

See also

HelmCluster.scale
HelmCluster.logs

Examples

>>> from dask_kubernetes import HelmCluster
>>> cluster = HelmCluster(release_name="myhelmrelease")

You can then resize the cluster with the scale method

>>> cluster.scale(10)

You can pass this cluster directly to a Dask client

>>> from dask.distributed import Client
>>> client = Client(cluster)

You can also access cluster logs

>>> cluster.get_logs()

Attributes
asynchronous

Are we running in the event loop?

called_from_running_loop
dashboard_link
loop
name
observed
plan
requested
scheduler_address
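These attributes are inherited from the distributed.deploy.Cluster base class. A small illustrative sketch, assuming cluster was created as above:

>>> cluster.dashboard_link     # URL of the Dask dashboard
>>> cluster.scheduler_address  # e.g. 'tcp://localhost:51481'
>>> cluster.plan               # workers the cluster manager expects to exist
>>> cluster.observed           # workers the scheduler actually sees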

Methods

adapt(*args, **kwargs)

Turn on adaptivity (Not recommended).

get_client()

Return client for the cluster

get_logs()

Get logs for Dask scheduler and workers.

scale(n_workers[, worker_group])

Scale cluster to n workers.

sync(func, *args[, asynchronous, ...])

Call func with args synchronously or asynchronously depending on the calling context

wait_for_workers(n_workers[, timeout])

Blocking call to wait for n workers before continuing

close

from_name

logs

adapt(*args, **kwargs)[source]

Turn on adaptivity (Not recommended).

get_logs()[source]

Get logs for Dask scheduler and workers.

Examples

>>> cluster.get_logs()
{'testdask-scheduler-5c8ffb6b7b-sjgrg': ...,
'testdask-worker-64c8b78cc-992z8': ...,
'testdask-worker-64c8b78cc-hzpdc': ...,
'testdask-worker-64c8b78cc-wbk4f': ...}

Each log will be a string of all logs for that container. To view them it is recommended that you print each log.

>>> print(cluster.get_logs()["testdask-scheduler-5c8ffb6b7b-sjgrg"])
...
distributed.scheduler - INFO - -----------------------------------------------
distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO -   Scheduler at:     tcp://10.1.6.131:8786
distributed.scheduler - INFO -   dashboard at:                     :8787
...
scale(n_workers, worker_group=None)[source]

Scale cluster to n workers.

This sets the Dask worker deployment size to the requested number. It also allows you to set the deployment size of another worker group via the worker_group argument. Workers will not be terminated gracefully, so be sure to only scale down when all futures have been retrieved by the client and the cluster is idle.

Examples

>>> cluster
HelmCluster(my-dask.default, 'tcp://localhost:51481', workers=4, threads=241, memory=2.95 TiB)
>>> cluster.scale(5)
>>> cluster
HelmCluster(my-dask.default, 'tcp://localhost:51481', workers=5, threads=321, memory=3.94 TiB)
>>> cluster.scale(5, worker_group="high-mem-workers")
>>> cluster
HelmCluster(my-dask.default, 'tcp://localhost:51481', workers=9, threads=325, memory=3.94 TiB)
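
Because workers are not terminated gracefully, a cautious pattern is to wait for outstanding work and gather all results before scaling down. A minimal sketch (the computation here is purely illustrative):

>>> from dask.distributed import Client, wait
>>> client = Client(cluster)
>>> futures = client.map(lambda x: x + 1, range(100))
>>> wait(futures)  # block until all tasks finish
>>> results = client.gather(futures)  # retrieve results from the cluster
>>> cluster.scale(2)  # now safe to reduce the worker count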