KubeCluster (experimental)¶
Warning
The Dask Operator for Kubernetes is experimental, so please report any bugs you encounter!
Cluster manager¶
The operator has a new cluster manager called dask_kubernetes.experimental.KubeCluster
that you can use to conveniently create and manage a Dask cluster in Python. You can then connect a Dask distributed.Client
object to it directly and perform your work.
The goal of the cluster manager is to abstract away the complexity of the Kubernetes resources and provide a clean and simple Python API to manage clusters while still getting all the benefits of the operator.
Under the hood, the Python cluster manager interacts with the Kubernetes API to create custom resources for us.
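If you want to inspect what was created, you can list those custom resources with the Kubernetes API. The sketch below is illustrative only; it assumes the official kubernetes Python client is installed and that the operator registers its resources under the kubernetes.dask.org/v1 API group.
# Sketch: list the DaskCluster custom resources the cluster manager created.
# Assumes the `kubernetes` Python client and the kubernetes.dask.org/v1 CRD group.
from kubernetes import client, config

config.load_kube_config()  # use config.load_incluster_config() inside a pod
custom_api = client.CustomObjectsApi()
clusters = custom_api.list_namespaced_custom_object(
    group="kubernetes.dask.org",
    version="v1",
    namespace="default",
    plural="daskclusters",
)
for item in clusters["items"]:
    print(item["metadata"]["name"])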
To create a cluster in the default namespace, run the following
from dask_kubernetes.experimental import KubeCluster
cluster = KubeCluster(name='foo')
You can change the default configuration of the cluster by passing additional keyword arguments
(namespace, n_workers, etc.) to the Python class. See the API reference below for the full list of options.
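For example, a sketch with placeholder values (the namespace and worker count here are illustrative):
from dask_kubernetes.experimental import KubeCluster

# Placeholder values: adjust the namespace, worker count and image to your needs
cluster = KubeCluster(
    name="foo",
    namespace="my-namespace",
    n_workers=5,
    image="ghcr.io/dask/dask:latest",
)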
You can scale the cluster
# Scale up the cluster
cluster.scale(5)
# Scale down the cluster
cluster.scale(1)
You can connect a client to the cluster
from dask.distributed import Client
# Connect Dask to the cluster
client = Client(cluster)
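From here any Dask workload runs on the cluster as usual, for example a small sanity check with dask.array:
import dask.array as da

# Build a small chunked array and compute its mean on the cluster
arr = da.random.random((1000, 1000), chunks=(100, 100))
print(arr.mean().compute())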
Finally, delete the cluster by running
cluster.close()
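If you prefer automatic cleanup, here is a minimal sketch assuming KubeCluster follows the standard Dask cluster manager context-manager protocol:
from dask.distributed import Client
from dask_kubernetes.experimental import KubeCluster

# Assumption: like other Dask cluster managers, exiting the block closes
# the client and deletes the cluster.
with KubeCluster(name="foo") as cluster:
    with Client(cluster) as client:
        pass  # submit your work here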
Additional worker groups¶
Additional worker groups can also be created via the cluster manager in Python.
from dask_kubernetes.experimental import KubeCluster
cluster = KubeCluster(name='foo')
cluster.add_worker_group(name="highmem", n_workers=2, resources={"requests": {"memory": "2Gi"}, "limits": {"memory": "64Gi"}})
We can also scale the worker groups by name from the cluster object.
cluster.scale(5, worker_group="highmem")
Additional worker groups can also be deleted in Python.
cluster.delete_worker_group(name="highmem")
Any additional worker groups you create will be deleted when the cluster is deleted.
API¶
- KubeCluster: Launch a Dask Cluster on Kubernetes using the Operator
- KubeCluster.scale: Scale cluster to n workers
- KubeCluster.get_logs: Get logs for Dask scheduler and workers.
- KubeCluster.add_worker_group: Create a dask worker group by name
- KubeCluster.delete_worker_group: Delete a dask worker group by name
- KubeCluster.close: Delete the dask cluster
- class dask_kubernetes.experimental.KubeCluster(name, namespace=None, image='ghcr.io/dask/dask:latest', n_workers=3, resources={}, env=[], worker_command='dask-worker', auth=[<dask_kubernetes.common.auth.InCluster object>, <dask_kubernetes.common.auth.KubeConfig object>], port_forward_cluster_ip=None, create_mode=CreateMode.CREATE_OR_CONNECT, shutdown_on_close=None, **kwargs)[source]¶
Launch a Dask Cluster on Kubernetes using the Operator
This cluster manager creates a Dask cluster by deploying the necessary kubernetes resources the Dask Operator needs to create pods. It can also connect to an existing cluster by providing the name of the cluster.
- Parameters
- name: str (required)
  Name given to the Dask cluster.
- namespace: str (optional)
  Namespace in which to launch the workers. Defaults to the current namespace if available, or "default".
- image: str (optional)
  Image to run in Scheduler and Worker Pods.
- n_workers: int
  Number of workers on initial launch. Use scale to change this number in the future.
- resources: Dict[str, str]
  Resources to be passed to the underlying pods.
- env: List[dict] | Dict[str, str]
  List of environment variables to pass to the worker pod. Can be a list of dicts using the same structure as k8s envs or a single dictionary of key/value pairs.
- worker_command: List[str] | str
  The command to use when starting the worker. If the command consists of multiple words it should be passed as a list of strings. Defaults to "dask-worker".
- auth: List[ClusterAuth] (optional)
  Configuration methods to attempt in order. Defaults to [InCluster(), KubeConfig()].
- port_forward_cluster_ip: bool (optional)
  If the chart uses ClusterIP type services, forward the ports locally. If you are running it locally it should be the port you are forwarding to <port>.
- create_mode: CreateMode (optional)
  How to handle cluster creation if the cluster resource already exists. The default behaviour is to create a new cluster if one with that name doesn't exist, or connect to an existing one if it does. You can also set CreateMode.CREATE_ONLY to raise an exception if a cluster with that name already exists, or CreateMode.CONNECT_ONLY to raise an exception if a cluster with that name doesn't exist.
- shutdown_on_close: bool (optional)
  Whether or not to delete the cluster resource when this object is closed. Defaults to True when creating a cluster and False when connecting to an existing one.
- **kwargs: dict
  Additional keyword arguments to pass to LocalCluster
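As an illustration of create_mode and shutdown_on_close together, here is a hedged sketch for attaching to a cluster that must already exist and leaving it running afterwards (the CreateMode import path is an assumption, not confirmed by this page):
from dask_kubernetes.experimental import KubeCluster
from dask_kubernetes.experimental.kubecluster import CreateMode  # assumed import path

# Connect only: raise if no cluster named "foo" exists, and leave the
# cluster running when this object is closed.
cluster = KubeCluster(
    name="foo",
    create_mode=CreateMode.CONNECT_ONLY,
    shutdown_on_close=False,
)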
Examples
>>> from dask_kubernetes.experimental import KubeCluster
>>> cluster = KubeCluster(name="foo")
You can add another group of workers (default is 3 workers)
>>> cluster.add_worker_group("additional", n_workers=4)
You can then resize the cluster with the scale method
>>> cluster.scale(10)
And optionally scale a specific worker group
>>> cluster.scale(10, worker_group="additional")
You can also resize the cluster adaptively and give it a range of workers
>>> cluster.adapt(20, 50)
You can pass this cluster directly to a Dask client
>>> from dask.distributed import Client
>>> client = Client(cluster)
You can also access cluster logs
>>> cluster.get_logs()
You can also connect to an existing cluster
>>> existing_cluster = KubeCluster.from_name(name="ialreadyexist")
- Attributes
- asynchronous
  Are we running in the event loop?
- cluster_name
- dashboard_link
- name
- observed
- plan
- requested
- scheduler_address
Methods
- adapt(*args, **kwargs)
  Turn on adaptivity
- add_worker_group(name[, n_workers, image, ...])
  Create a dask worker group by name
- close([timeout])
  Delete the dask cluster
- delete_worker_group(name)
  Delete a dask worker group by name
- from_name(name, **kwargs)
  Create an instance of this class to represent an existing cluster by name.
- get_logs()
  Get logs for Dask scheduler and workers.
- scale(n[, worker_group])
  Scale cluster to n workers
- sync(func, *args[, asynchronous, ...])
  Call func with args synchronously or asynchronously depending on the calling context
- logs
- add_worker_group(name, n_workers=3, image=None, resources=None, env=None)[source]¶
Create a dask worker group by name
- Parameters
- name: str
  Name of the worker group
- n_workers: int
  Number of workers on initial launch. Use .scale(n_workers, worker_group=name) to change this number in the future.
- image: str (optional)
  Image to run in Scheduler and Worker Pods. If omitted will use the cluster default.
- resources: Dict[str, str]
  Resources to be passed to the underlying pods. If omitted will use the cluster default.
- env: List[dict]
  List of environment variables to pass to worker pod. If omitted will use the cluster default.
Examples
>>> cluster.add_worker_group("high-mem-workers", n_workers=5)
- delete_worker_group(name)[source]¶
Delete a dask worker group by name
- Parameters
- name: str
Name of the worker group
Examples
>>> cluster.delete_worker_group("high-mem-workers")
- classmethod from_name(name, **kwargs)[source]¶
Create an instance of this class to represent an existing cluster by name.
Will fail if a cluster with that name doesn’t already exist.
- Parameters
- name: str
Name of the cluster to connect to
Examples
>>> cluster = KubeCluster.from_name(name="simple-cluster")
- get_logs()[source]¶
Get logs for Dask scheduler and workers.
Examples
>>> cluster.get_logs()
{'foo-cluster-scheduler': ...,
 'foo-cluster-default-worker-group-worker-0269dbfa0cfd4a22bcd9d92ae032f4d2': ...,
 'foo-cluster-default-worker-group-worker-7c1ccb04cd0e498fb21babaedd00e5d4': ...,
 'foo-cluster-default-worker-group-worker-d65bee23bdae423b8d40c5da7a1065b6': ...}
Each log will be a string of all logs for that container. To view it is recommended that you print each log.
>>> print(cluster.get_logs()["testdask-scheduler-5c8ffb6b7b-sjgrg"])
...
distributed.scheduler - INFO - -----------------------------------------------
distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO - Scheduler at: tcp://10.244.0.222:8786
distributed.scheduler - INFO - dashboard at: :8787
...
- scale(n, worker_group='default')[source]¶
Scale cluster to n workers
- Parameters
- n: int
  Target number of workers
- worker_group: str
  Worker group to scale
Examples
>>> cluster.scale(10)  # scale cluster to ten workers
>>> cluster.scale(7, worker_group="high-mem-workers")  # scale worker group high-mem-workers to seven workers