HelmCluster¶
HelmCluster is for managing an existing Dask cluster which has been deployed using Helm.
Quickstart¶
First you must install the Dask Helm chart with helm
and have the cluster running.
helm repo add dask https://helm.dask.org
helm repo update
helm install myrelease dask/dask
You can then create a HelmCluster
object in Python to scale the cluster and retrieve logs.
from dask_kubernetes import HelmCluster
cluster = HelmCluster(release_name="myrelease")
cluster.scale(10) # specify number of workers explicitly
With this cluster object you can conveniently connect a dask.distributed.Client
object to the cluster and perform your work.
Provided you have API access to Kubernetes and can run the kubectl command,
connectivity to the Dask cluster is handled automatically for you via services or port forwarding.
# Example usage
from dask.distributed import Client
import dask.array as da
# Connect Dask to the cluster
client = Client(cluster)
# Create a large array and calculate the mean
array = da.ones((1000, 1000, 1000))
print(array.mean().compute()) # Should print 1.0
For more information see the HelmCluster
API reference.
Warning
It is not possible to use HelmCluster
from the Jupyter session
which is deployed as part of the Helm Chart without first copying your
~/.kube/config
file to that Jupyter session.
API¶
- HelmCluster: Connect to a Dask cluster deployed via the Helm Chart.
- HelmCluster.scale: Scale cluster to n workers.
- HelmCluster.adapt: Turn on adaptivity (Not recommended).
- class dask_kubernetes.HelmCluster(release_name=None, auth=[<dask_kubernetes.common.auth.InCluster object>, <dask_kubernetes.common.auth.KubeConfig object>], namespace=None, port_forward_cluster_ip=False, scheduler_name='scheduler', worker_name='worker', node_host=None, node_port=None, name=None, **kwargs)[source]¶
Connect to a Dask cluster deployed via the Helm Chart.
This cluster manager connects to an existing Dask deployment that was created by the Dask Helm Chart, enabling you to perform basic cluster actions such as scaling and log retrieval.
- Parameters
- release_name: str
Name of the helm release to connect to.
- namespace: str (optional)
Namespace in which to launch the workers. Defaults to current namespace if available or “default”
- port_forward_cluster_ip: bool (optional)
If the chart uses ClusterIP type services, forward the ports locally. If you are using HelmCluster from the Jupyter session that was installed by the helm chart this should be False. If you are running it locally it should be the port you are forwarding to <port>.
- auth: List[ClusterAuth] (optional)
Configuration methods to attempt in order. Defaults to [InCluster(), KubeConfig()].
- scheduler_name: str (optional)
Name of the Dask scheduler deployment in the current release. Defaults to “scheduler”.
- worker_name: str (optional)
Name of the Dask worker deployment in the current release. Defaults to “worker”.
- node_host: str (optional)
A node address. Can be provided in case the scheduler service type is NodePort and you want to manually specify which node to connect to.
- node_port: int (optional)
A node port. Can be provided in case the scheduler service type is NodePort and you want to manually specify which port to connect to.
- **kwargs: dict
Additional keyword arguments to pass to Cluster. See the sketch after this list for an example of combining these options.
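For illustration, the options above might be combined as in the following sketch; the release name, namespace and deployment names are placeholder values and should be adjusted to match your own Helm installation.
from dask_kubernetes import HelmCluster

# All values below are illustrative, not chart requirements.
cluster = HelmCluster(
    release_name="myrelease",       # the release created in the Quickstart
    namespace="dask",               # assumed namespace; defaults to the current one
    port_forward_cluster_ip=True,   # forward ClusterIP services when running outside the cluster
    scheduler_name="scheduler",     # deployment name suffixes matching the chart defaults
    worker_name="worker",
)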
See also
HelmCluster.scale
HelmCluster.logs
Examples
>>> from dask_kubernetes import HelmCluster
>>> cluster = HelmCluster(release_name="myhelmrelease")
You can then resize the cluster with the scale method
>>> cluster.scale(10)
You can pass this cluster directly to a Dask client
>>> from dask.distributed import Client
>>> client = Client(cluster)
You can also access cluster logs
>>> cluster.get_logs()
- Attributes
asynchronous
Are we running in the event loop?
- dashboard_link
- loop
- name
- observed
- plan
- requested
- scheduler_address
Methods
- adapt(*args, **kwargs): Turn on adaptivity (Not recommended).
- get_client(): Return client for the cluster.
- get_logs(): Get logs for Dask scheduler and workers.
- scale(n_workers[, worker_group]): Scale cluster to n workers.
- sync(func, *args[, asynchronous, ...]): Call func with args synchronously or asynchronously depending on the calling context.
- wait_for_workers([n_workers, timeout]): Blocking call to wait for n workers before continuing.
- close
- from_name
- logs
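As a rough sketch of how these methods fit together, reusing the cluster object from the Quickstart:
# Minimal sketch; `cluster` is the HelmCluster created in the Quickstart.
cluster.scale(4)                  # resize the worker deployment to four workers
cluster.wait_for_workers(4)       # block until all four workers have registered
client = cluster.get_client()     # convenience wrapper returning a dask.distributed.Client
print(list(cluster.get_logs()))   # pod names for the scheduler and workers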
- get_logs()[source]¶
Get logs for Dask scheduler and workers.
Examples
>>> cluster.get_logs()
{'testdask-scheduler-5c8ffb6b7b-sjgrg': ...,
 'testdask-worker-64c8b78cc-992z8': ...,
 'testdask-worker-64c8b78cc-hzpdc': ...,
 'testdask-worker-64c8b78cc-wbk4f': ...}
Each log will be a string of all logs for that container. To view them it is recommended that you print each log, as in the sketch below the example.
>>> print(cluster.get_logs()["testdask-scheduler-5c8ffb6b7b-sjgrg"])
...
distributed.scheduler - INFO - -----------------------------------------------
distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO - Scheduler at: tcp://10.1.6.131:8786
distributed.scheduler - INFO - dashboard at: :8787
...
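For example, a small loop (a sketch, reusing the cluster object from the Quickstart) can print every pod's log in turn:
# Print the full log of every scheduler and worker pod in the release.
for pod_name, log in cluster.get_logs().items():
    print(f"---- {pod_name} ----")
    print(log)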
- scale(n_workers, worker_group=None)[source]¶
Scale cluster to n workers.
This sets the Dask worker deployment size to the requested number. It also allows you to set the worker deployment size of another worker group. Workers will not be terminated gracefully, so be sure to only scale down when all futures have been retrieved by the client and the cluster is idle, as in the sketch after the examples.
Examples
>>> cluster
HelmCluster(my-dask.default, 'tcp://localhost:51481', workers=4, threads=241, memory=2.95 TiB)
>>> cluster.scale(5)
>>> cluster
HelmCluster(my-dask.default, 'tcp://localhost:51481', workers=5, threads=321, memory=3.94 TiB)
>>> cluster.scale(5, worker_group="high-mem-workers")
>>> cluster
HelmCluster(my-dask.default, 'tcp://localhost:51481', workers=9, threads=325, memory=3.94 TiB)
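A cautious scale-down might therefore look like the following sketch, where the client and futures names are assumed to come from your own workload:
# Sketch: collect outstanding results before shrinking the cluster, since
# scaled-down workers are stopped without draining their tasks.
results = client.gather(futures)  # `client` and `futures` come from earlier work
cluster.scale(2)                  # safe to shrink once the cluster is idle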