HelmCluster

Quickstart

helm repo add dask https://helm.dask.org
helm repo update

helm install myrelease dask/dask

Then connect to the deployment from Python:

from dask_kubernetes import HelmCluster

cluster = HelmCluster(release_name="myrelease")
cluster.scale(10)  # specify number of workers explicitly
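
To verify the connection you can attach a Dask client and open the dashboard (a minimal sketch; dashboard_link is a standard attribute of the cluster object, listed under Attributes below):

from dask.distributed import Client

client = Client(cluster)       # connect a client to the scheduler in the release
print(cluster.dashboard_link)  # URL of the Dask dashboard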

For more information see the HelmCluster API reference.

Warning

It is not possible to use HelmCluster from the Jupyter session deployed as part of the Helm Chart without first copying your ~/.kube/config file into that session.
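
One way to do this is with kubectl cp (a sketch; the pod name, namespace, and home directory are illustrative and depend on your deployment):

# find the Jupyter pod for your release
kubectl get pods -n default
# copy your local kubeconfig into the pod's home directory
kubectl cp ~/.kube/config default/myrelease-jupyter-0:/home/jovyan/.kube/config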

API

HelmCluster([release_name, auth, namespace, …])
    Connect to a Dask cluster deployed via the Helm Chart.
HelmCluster.scale(n_workers)
    Scale cluster to n workers.
HelmCluster.adapt(*args, **kwargs)
    Turn on adaptivity (Not recommended).
HelmCluster.logs(*args, **kwargs)
class dask_kubernetes.HelmCluster(release_name=None, auth=[<dask_kubernetes.auth.InCluster object>, <dask_kubernetes.auth.KubeConfig object>], namespace=None, port_forward_cluster_ip=False, loop=None, asynchronous=False, scheduler_name='scheduler', worker_name='worker', node_host=None, node_port=None, **kwargs)[source]

Connect to a Dask cluster deployed via the Helm Chart.

This cluster manager connects to an existing Dask deployment that was created by the Dask Helm Chart, enabling you to perform basic cluster actions such as scaling and log retrieval.

Parameters:
release_name: str

Name of the helm release to connect to.

namespace: str (optional)

Namespace in which to launch the workers. Defaults to the current namespace if available, otherwise “default”.

port_forward_cluster_ip: bool (optional)

If the chart uses ClusterIP type services, forward the ports locally. If you are using HelmCluster from the Jupyter session that was installed by the Helm Chart this should be False. If you are running it locally, set it to True so that the scheduler and dashboard ports are forwarded to your machine (see the example after this parameter list).

auth: List[ClusterAuth] (optional)

Configuration methods to attempt in order. Defaults to [InCluster(), KubeConfig()].

scheduler_name: str (optional)

Name of the Dask scheduler deployment in the current release. Defaults to “scheduler”.

worker_name: str (optional)

Name of the Dask worker deployment in the current release. Defaults to “worker”.

node_host: str (optional)

A node address. Can be provided in case the scheduler service type is NodePort and you want to manually specify which node to connect to.

node_port: int (optional)

A node port. Can be provided in case the scheduler service type is NodePort and you want to manually specify which port to connect to.

**kwargs: dict

Additional keyword arguments to pass to Cluster.
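
As an illustration, connecting from a local machine to a release in a non-default namespace might look like the following (a sketch; the release name and namespace are hypothetical):

>>> cluster = HelmCluster(
...     release_name="myrelease",
...     namespace="dask",
...     port_forward_cluster_ip=True,  # forward ClusterIP services locally
... )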

See also

HelmCluster.scale
HelmCluster.logs

Examples

>>> from dask_kubernetes import HelmCluster
>>> cluster = HelmCluster(release_name="myhelmrelease")

You can then resize the cluster with the scale method

>>> cluster.scale(10)

You can pass this cluster directly to a Dask client

>>> from dask.distributed import Client
>>> client = Client(cluster)

You can also access cluster logs

>>> cluster.get_logs()

Attributes:
asynchronous
dashboard_link
name
observed
plan
requested
scheduler_address

Methods

adapt(*args, **kwargs)
    Turn on adaptivity (Not recommended).
get_logs()
    Get logs for Dask scheduler and workers.
scale(n_workers)
    Scale cluster to n workers.
close
from_name
logs
sync

adapt(*args, **kwargs)[source]

Turn on adaptivity (Not recommended).

get_logs()[source]

Get logs for Dask scheduler and workers.

Examples

>>> cluster.get_logs()
{'testdask-scheduler-5c8ffb6b7b-sjgrg': ...,
'testdask-worker-64c8b78cc-992z8': ...,
'testdask-worker-64c8b78cc-hzpdc': ...,
'testdask-worker-64c8b78cc-wbk4f': ...}

Each log will be a string containing all logs for that container. To view a log it is recommended that you print it.

>>> print(cluster.get_logs()["testdask-scheduler-5c8ffb6b7b-sjgrg"])
...
distributed.scheduler - INFO - -----------------------------------------------
distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO -   Scheduler at:     tcp://10.1.6.131:8786
distributed.scheduler - INFO -   dashboard at:                     :8787
...
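
To print every pod's log at once you can iterate over the returned mapping (a minimal sketch):

>>> for pod, log in cluster.get_logs().items():
...     print(f"---- {pod} ----")
...     print(log)
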
scale(n_workers)[source]

Scale cluster to n workers.

This sets the Dask worker deployment size to the requested number. Workers will not be terminated gracefully, so be sure to only scale down when all futures have been retrieved by the client and the cluster is idle.

Examples

>>> cluster
HelmCluster('tcp://localhost:8786', workers=3, threads=18, memory=18.72 GB)
>>> cluster.scale(4)
>>> cluster
HelmCluster('tcp://localhost:8786', workers=4, threads=24, memory=24.96 GB)
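
Because workers are not terminated gracefully, a cautious pattern when scaling down is to retrieve all outstanding results first (a sketch, assuming a connected client):

>>> from dask.distributed import Client
>>> client = Client(cluster)
>>> futures = client.map(lambda x: x + 1, range(100))
>>> results = client.gather(futures)  # retrieve results before scaling down
>>> cluster.scale(2)  # now safe to reduce the worker count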