HelmCluster
Quickstart
helm repo add dask https://helm.dask.org
helm repo update
helm install myrelease dask/dask
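The chart's defaults can be overridden at install time. As a sketch, assuming your chart version exposes a worker.replicas value (recent versions of the dask/dask chart do, but check your chart's values):
# Hypothetical override: start the deployment with five workers
helm install myrelease dask/dask --set worker.replicas=5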
from dask_kubernetes import HelmCluster
cluster = HelmCluster(release_name="myrelease")
cluster.scale(10) # specify number of workers explicitly
For more information see the HelmCluster API reference below.
Warning
It is not possible to use HelmCluster from the Jupyter session which is deployed as part of the Helm Chart without first copying your ~/.kube/config file to that Jupyter session.
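One rough way to do that copy is with kubectl cp. The pod name, label selector, and home directory below are all illustrative; look up the real pod name for your release first:
# Find the Jupyter pod for your release (label selector may vary by chart version)
kubectl get pods -l app=dask,component=jupyter
# Copy your local kubeconfig into the pod (pod name and path are hypothetical)
kubectl cp ~/.kube/config myrelease-dask-jupyter-abc123:/home/jovyan/.kube/config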
API
| HelmCluster([release_name, auth, namespace, …]) | Connect to a Dask cluster deployed via the Helm Chart. |
| HelmCluster.scale(n_workers) | Scale cluster to n workers. |
| HelmCluster.adapt(*args, **kwargs) | Turn on adaptivity (Not recommended). |
| HelmCluster.logs(*args, **kwargs) | - |
class dask_kubernetes.HelmCluster(release_name=None, auth=[<dask_kubernetes.auth.InCluster object>, <dask_kubernetes.auth.KubeConfig object>], namespace=None, port_forward_cluster_ip=False, loop=None, asynchronous=False, scheduler_name='scheduler', worker_name='worker', node_host=None, node_port=None, **kwargs)
Connect to a Dask cluster deployed via the Helm Chart.
This cluster manager connects to an existing Dask deployment that was created by the Dask Helm Chart, enabling you to perform basic cluster actions such as scaling and log retrieval.
Parameters:
- release_name: str
  Name of the Helm release to connect to.
- namespace: str (optional)
  Namespace in which to launch the workers. Defaults to the current namespace if available, or "default".
- port_forward_cluster_ip: bool (optional)
  If the chart uses ClusterIP type services, forward the ports locally. If you are using HelmCluster from the Jupyter session that was installed by the Helm Chart this should be False. If you are running it locally it should be the port you are forwarding to <port>. (See the sketch after this list.)
- auth: List[ClusterAuth] (optional)
  Configuration methods to attempt in order. Defaults to [InCluster(), KubeConfig()].
- scheduler_name: str (optional)
  Name of the Dask scheduler deployment in the current release. Defaults to "scheduler".
- worker_name: str (optional)
  Name of the Dask worker deployment in the current release. Defaults to "worker".
- node_host: str (optional)
  A node address. Can be provided in case the scheduler service type is NodePort and you want to manually specify which node to connect to.
- node_port: int (optional)
  A node port. Can be provided in case the scheduler service type is NodePort and you want to manually specify which port to connect to.
- **kwargs: dict
  Additional keyword arguments to pass to Cluster.
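As a sketch of the port_forward_cluster_ip case, assuming you are running HelmCluster from your local machine against a ClusterIP scheduler service:
>>> from dask_kubernetes import HelmCluster
>>> cluster = HelmCluster(release_name="myrelease",
...                       port_forward_cluster_ip=True)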
See also
HelmCluster.scale
HelmCluster.logs
Examples
>>> from dask_kubernetes import HelmCluster
>>> cluster = HelmCluster(release_name="myhelmrelease")
You can then resize the cluster with the scale method
>>> cluster.scale(10)
You can pass this cluster directly to a Dask client
>>> from dask.distributed import Client
>>> client = Client(cluster)
You can also access cluster logs
>>> cluster.get_logs()
Attributes:
- asynchronous
- dashboard_link
- name
- observed
- plan
- requested
- scheduler_address
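For example, once connected you can look up the dashboard for the existing deployment via the inherited dashboard_link attribute (the URL below is illustrative):
>>> cluster.dashboard_link
'http://localhost:8787/status'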
Methods
| adapt(*args, **kwargs) | Turn on adaptivity (Not recommended). |
| get_logs() | Get logs for Dask scheduler and workers. |
| scale(n_workers) | Scale cluster to n workers. |
| close | - |
| from_name | - |
| logs | - |
| sync | - |
get_logs()
Get logs for Dask scheduler and workers.
Examples
>>> cluster.get_logs()
{'testdask-scheduler-5c8ffb6b7b-sjgrg': ...,
 'testdask-worker-64c8b78cc-992z8': ...,
 'testdask-worker-64c8b78cc-hzpdc': ...,
 'testdask-worker-64c8b78cc-wbk4f': ...}
Each log will be a string of all logs for that container. To view a log it is recommended that you print it.
>>> print(cluster.get_logs()["testdask-scheduler-5c8ffb6b7b-sjgrg"])
...
distributed.scheduler - INFO - -----------------------------------------------
distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO -   Scheduler at:  tcp://10.1.6.131:8786
distributed.scheduler - INFO -   dashboard at:                  :8787
...
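Since get_logs() returns a mapping of pod names to log strings, one way to follow that recommendation is to print each entry in a loop (a sketch; pod names will match your own release):
>>> for pod_name, log in cluster.get_logs().items():
...     print(pod_name)
...     print(log)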
scale(n_workers)
Scale cluster to n workers.
This sets the Dask worker deployment size to the requested number. Workers will not be terminated gracefully, so be sure to only scale down when all futures have been retrieved by the client and the cluster is idle.
Examples
>>> cluster
HelmCluster('tcp://localhost:8786', workers=3, threads=18, memory=18.72 GB)
>>> cluster.scale(4)
>>> cluster
HelmCluster('tcp://localhost:8786', workers=4, threads=24, memory=24.96 GB)
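Because scaling down kills workers abruptly, a cautious pattern is to gather all outstanding results before shrinking the deployment. A minimal sketch:
>>> from dask.distributed import Client
>>> client = Client(cluster)
>>> futures = client.map(lambda x: x + 1, range(100))
>>> results = client.gather(futures)  # retrieve everything first
>>> cluster.scale(2)  # now safe to scale down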