KubeCluster¶
Cluster manager¶
The operator has a cluster manager called dask_kubernetes.operator.KubeCluster
that you can use to conveniently create and manage a Dask cluster in Python. You can then connect a Dask distributed.Client
object to it directly and perform your work.
The goal of the cluster manager is to abstract away the complexity of the Kubernetes resources and provide a clean and simple Python API to manage clusters while still getting all the benefits of the operator.
Under the hood the Python cluster manager will interact with the Kubernetes API to create custom resources for us.
To create a cluster in the default namespace, run the following
from dask_kubernetes.operator import KubeCluster
cluster = KubeCluster(name='foo')
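Since this creates a DaskCluster custom resource, you can inspect it with kubectl once the cluster is up:
$ kubectl get daskclusters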
You can change the default configuration of the cluster by passing additional arguments (namespace, n_workers, etc.) to the Python class. See the API reference below for the full list of options.
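For example, you can set the namespace and initial worker count at creation time (a minimal sketch; it assumes a namespace called dask already exists):
from dask_kubernetes.operator import KubeCluster

# Create a cluster with three workers in the "dask" namespace
cluster = KubeCluster(name='foo', namespace='dask', n_workers=3)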
You can scale the cluster
# Scale up the cluster
cluster.scale(5)
# Scale down the cluster
cluster.scale(1)
You can autoscale the cluster
# Allow cluster to autoscale between 1 and 10 workers
cluster.adapt(minimum=1, maximum=10)
# Disable autoscaling by explicitly scaling to your desired number of workers
cluster.scale(1)
You can connect a Dask client to the cluster
from dask.distributed import Client
# Connect Dask to the cluster
client = Client(cluster)
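To verify that work is flowing through the cluster, you can submit a small task (an illustrative sketch):
# Submit a trivial task and fetch the result
future = client.submit(lambda x: x + 1, 10)
print(future.result())  # prints 11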
Finally, delete the cluster by running
cluster.close()
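KubeCluster can also be used as a context manager so that the cluster is cleaned up automatically when you are done with it (a minimal sketch):
from dask.distributed import Client
from dask_kubernetes.operator import KubeCluster

# Both the client and the cluster are closed automatically when the block exits
with KubeCluster(name='foo') as cluster:
    with Client(cluster) as client:
        ...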
Additional worker groups¶
Additional worker groups can also be created via the cluster manager in Python.
from dask_kubernetes.operator import KubeCluster
cluster = KubeCluster(name='foo')
cluster.add_worker_group(name="highmem", n_workers=2, resources={"requests": {"memory": "2Gi"}, "limits": {"memory": "64Gi"}})
We can also scale the worker groups by name from the cluster object.
cluster.scale(5, worker_group="highmem")
Additional worker groups can also be deleted in Python.
cluster.delete_worker_group(name="highmem")
Any additional worker groups you create will be deleted when the cluster is deleted.
Customising your cluster¶
The KubeCluster class takes a selection of keyword arguments to make it quick and easy to get started; however, the underlying DaskCluster resource can be much more complex and configured in many ways.
Rather than exposing every possibility via keyword arguments, you can pass a valid DaskCluster resource spec which will be used when creating the cluster.
You can also generate a spec with make_cluster_spec(), which KubeCluster uses internally, and then modify it with your custom options.
from dask_kubernetes.operator import KubeCluster, make_cluster_spec
config = {
"name": "foo",
"n_workers": 2,
"resources":{"requests": {"memory": "2Gi"}, "limits": {"memory": "64Gi"}}
}
cluster = KubeCluster(**config)
# is equivalent to
cluster = KubeCluster(custom_cluster_spec=make_cluster_spec(**config))
You can also modify the spec before passing it to KubeCluster. For example, if you want to set nodeSelector on your worker pods you could do it like this:
from dask_kubernetes.operator import KubeCluster, make_cluster_spec
spec = make_cluster_spec(name="selector-example", n_workers=2)
spec["spec"]["worker"]["spec"]["nodeSelector"] = {"disktype": "ssd"}
cluster = KubeCluster(custom_cluster_spec=spec)
You could also have the scheduler run a Jupyter server. With this configuration you can access a Jupyter server via the Dask dashboard.
from dask_kubernetes.operator import KubeCluster, make_cluster_spec
spec = make_cluster_spec(name="jupyter-example", n_workers=2, env={"EXTRA_PIP_PACKAGES": "jupyterlab"})
spec["spec"]["scheduler"]["spec"]["containers"][0]["args"].append("--jupyter")
cluster = KubeCluster(custom_cluster_spec=spec)
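The address of the Jupyter server can then be found via the cluster's jupyter_link attribute (listed under Attributes in the API reference below):
# Print the address of the Jupyter server running on the scheduler
print(cluster.jupyter_link)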
The cluster.add_worker_group() method also supports passing a custom_spec keyword argument, which can be generated with make_worker_spec().
from dask_kubernetes.operator import KubeCluster, make_worker_spec
cluster = KubeCluster(name="example")
worker_spec = make_worker_spec(cluster_name=cluster.name, n_workers=2, resources={"limits": {"nvidia.com/gpu": 1}})
worker_spec["spec"]["nodeSelector"] = {"cloud.google.com/gke-nodepool": "gpu-node-pool"}
cluster.add_worker_group(custom_spec=worker_spec)
Private container registry¶
One common use case where make_cluster_spec comes in handy is when pulling container images from a private registry.
The Kubernetes documentation suggests creating a Secret with your registry credentials and then setting the imagePullSecrets option in the Pod spec.
The KubeCluster class doesn't expose any way to set imagePullSecrets, so we will need to generate a spec and update it before creating the cluster.
Thankfully make_cluster_spec makes this quick and painless.
$ kubectl create secret docker-registry regcred \
--docker-server=<your-registry-server> \
--docker-username=<your-name> \
--docker-password=<your-pword> \
--docker-email=<your-email>
from dask_kubernetes.operator import KubeCluster, make_cluster_spec
# Generate the spec
spec = make_cluster_spec(name="custom", image="foo.com/jacobtomlinson/dask:latest")
# Set the imagePullSecrets for the scheduler and worker pods
spec["spec"]["worker"]["spec"]["imagePullSecrets"] = [{"name": "regcred"}]
spec["spec"]["scheduler"]["spec"]["imagePullSecrets"] = [{"name": "regcred"}]
# Create the cluster
cluster = KubeCluster(custom_cluster_spec=spec)
Role-Based Access Control (RBAC)¶
In order to spawn a Dask cluster from a pod that runs on the cluster, the service account creating that pod will require a set of RBAC permissions. Create a service account you will use for Dask, and then attach the following ClusterRole to that ServiceAccount via a ClusterRoleBinding:
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: dask-cluster-role
rules:
  # Application: watching & handling for the custom resources we declare.
  - apiGroups: [kubernetes.dask.org]
    resources: [daskclusters, daskworkergroups, daskworkergroups/scale, daskjobs, daskautoscalers]
    verbs: [get, list, watch, patch, create, delete]

  # Application: other resources it needs to watch and get information from.
  - apiGroups:
      - "" # indicates the core API group
    resources: [pods, pods/status]
    verbs:
      - "get"
      - "list"
      - "watch"

  - apiGroups:
      - "" # indicates the core API group
    resources: [services]
    verbs:
      - "get"
      - "list"
      - "watch"
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: dask-cluster-role-binding
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: dask-cluster-role
subjects:
  - kind: ServiceAccount
    name: dask-sa # adjust name based on the service account you created
    namespace: default # required for ServiceAccount subjects; adjust to your service account's namespace
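For example, assuming you saved the manifests above to a file called dask-cluster-role.yaml (an illustrative filename), you could create the service account and apply them like this:
$ kubectl create serviceaccount dask-sa
$ kubectl apply -f dask-cluster-role.yaml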
API¶
- KubeCluster: Launch a Dask Cluster on Kubernetes using the Operator
- KubeCluster.scale: Scale cluster to n workers
- KubeCluster.adapt: Turn on adaptivity
- KubeCluster.get_logs: Get logs for Dask scheduler and workers.
- KubeCluster.add_worker_group: Create a dask worker group by name
- KubeCluster.delete_worker_group: Delete a dask worker group by name
- KubeCluster.close: Delete the dask cluster
- class dask_kubernetes.operator.KubeCluster(*, name: Optional[str] = None, namespace: Optional[str] = None, image: Optional[str] = None, n_workers: Optional[int] = None, resources: Optional[Dict[str, str]] = None, env: Optional[Union[List[dict], Dict[str, str]]] = None, worker_command: Optional[List[str]] = None, port_forward_cluster_ip: Optional[bool] = None, create_mode: Optional[dask_kubernetes.operator.kubecluster.kubecluster.CreateMode] = None, shutdown_on_close: Optional[bool] = None, idle_timeout: Optional[int] = None, resource_timeout: Optional[int] = None, scheduler_service_type: Optional[str] = None, custom_cluster_spec: Optional[Union[str, dict]] = None, scheduler_forward_port: Optional[int] = None, jupyter: bool = False, loop: Optional[tornado.ioloop.IOLoop] = None, asynchronous: bool = False, quiet: bool = False, **kwargs)[source]¶
Launch a Dask Cluster on Kubernetes using the Operator
This cluster manager creates a Dask cluster by deploying the necessary Kubernetes resources the Dask Operator needs to create pods. It can also connect to an existing cluster by providing the name of the cluster.
- Parameters
- name: str
Name given to the Dask cluster. Required except when custom_cluster_spec is passed, in which case it's ignored in favor of custom_cluster_spec["metadata"]["name"].
- namespace: str (optional)
Namespace in which to launch the workers. Defaults to the current namespace if available, otherwise "default".
- image: str (optional)
Image to run in Scheduler and Worker Pods.
- n_workers: int
Number of workers on initial launch. Use scale to change this number in the future.
- resources: Dict[str, str]
Resources to be passed to the underlying pods.
- env: List[dict] | Dict[str, str]
List of environment variables to pass to the worker pod. Can be a list of dicts using the same structure as k8s envs or a single dictionary of key/value pairs.
- worker_command: List[str] | str
The command to use when starting the worker. If the command consists of multiple words it should be passed as a list of strings. Defaults to "dask-worker".
- port_forward_cluster_ip: bool (optional)
If the chart uses ClusterIP type services, forward the ports locally. If you are running it locally it should be the port you are forwarding to <port>.
- create_mode: CreateMode (optional)
How to handle cluster creation if the cluster resource already exists. Default behavior is to create a new cluster if one with that name doesn't exist, or connect to an existing one if it does. You can also set CreateMode.CREATE_ONLY to raise an exception if a cluster with that name already exists, or CreateMode.CONNECT_ONLY to raise an exception if a cluster with that name doesn't exist.
- shutdown_on_close: bool (optional)
Whether or not to delete the cluster resource when this object is closed. Defaults to True when creating a cluster and False when connecting to an existing one.
- idle_timeout: int (optional)
If set, Kubernetes will delete the cluster automatically if the scheduler is idle for longer than this timeout in seconds.
- resource_timeout: int (optional)
Time in seconds to wait for Kubernetes resources to enter their expected state. Example: if the DaskCluster resource that gets created isn't moved into a known status.phase by the controller, then the controller is likely not running or malfunctioning, so we time out and clean up with a useful error. Example 2: if the scheduler Pod enters a CrashLoopBackOff state for longer than this timeout, we give up with a useful error. Defaults to 60 seconds.
- scheduler_service_type: str (optional)
Kubernetes service type to use for the scheduler. Defaults to ClusterIP.
- jupyter: bool (optional)
Start Jupyter on the scheduler node.
- custom_cluster_spec: str | dict (optional)
Path to a YAML manifest or a dictionary representation of a DaskCluster resource object which will be used to create the cluster instead of generating one from the other keyword arguments.
- scheduler_forward_port: int (optional)
The port to use when forwarding the scheduler dashboard. A random port will be used by default.
- quiet: bool
If enabled, suppress all printed output. Defaults to False.
- **kwargs: dict
Additional keyword arguments to pass to LocalCluster
Examples
>>> from dask_kubernetes.operator import KubeCluster
>>> cluster = KubeCluster(name="foo")
You can add another group of workers (default is 3 workers)
>>> cluster.add_worker_group("additional", n_workers=4)
You can then resize the cluster with the scale method
>>> cluster.scale(10)
And optionally scale a specific worker group
>>> cluster.scale(10, worker_group="additional")
You can also resize the cluster adaptively and give it a range of workers
>>> cluster.adapt(20, 50)
You can pass this cluster directly to a Dask client
>>> from dask.distributed import Client
>>> client = Client(cluster)
You can also access cluster logs
>>> cluster.get_logs()
You can also connect to an existing cluster
>>> existing_cluster = KubeCluster.from_name(name="ialreadyexist")
- Attributes
asynchronous
Are we running in the event loop?
- called_from_running_loop
- dashboard_link
- jupyter_link
- loop
- name
- observed
- plan
- requested
- scheduler_address
Methods
adapt([minimum, maximum])
Turn on adaptivity
add_worker_group(name[, n_workers, image, ...])
Create a dask worker group by name
close([timeout])
Delete the dask cluster
delete_worker_group(name)
Delete a dask worker group by name
from_name(name, **kwargs)
Create an instance of this class to represent an existing cluster by name.
get_client()
Return client for the cluster
get_logs()
Get logs for Dask scheduler and workers.
scale(n[, worker_group])
Scale cluster to n workers
sync(func, *args[, asynchronous, ...])
Call func with args synchronously or asynchronously depending on the calling context
wait_for_workers(n_workers[, timeout])
Blocking call to wait for n workers before continuing
generate_rich_output
logs
- adapt(minimum=None, maximum=None)[source]¶
Turn on adaptivity
- Parameters
- minimum: int
Minimum number of workers
- maximum: int
Maximum number of workers
Examples
>>> cluster.adapt()  # Allow scheduler to add/remove workers within k8s cluster resource limits
>>> cluster.adapt(minimum=1, maximum=10)  # Allow scheduler to add/remove workers within 1-10 range
- add_worker_group(name, n_workers=3, image=None, resources=None, worker_command=None, env=None, custom_spec=None)[source]¶
Create a dask worker group by name
- Parameters
- name: str
Name of the worker group
- n_workers: int
Number of workers on initial launch. Use .scale(n_workers, worker_group=name) to change this number in the future.
- image: str (optional)
Image to run in Scheduler and Worker Pods. If omitted will use the cluster default.
- resources: Dict[str, str]
Resources to be passed to the underlying pods. If omitted will use the cluster default.
- env: List[dict]
List of environment variables to pass to worker pod. If omitted will use the cluster default.
- custom_spec: dict (optional)
A dictionary representation of a worker spec which will be used to create the DaskWorkerGroup instead of generating one from the other keyword arguments.
Examples
>>> cluster.add_worker_group("high-mem-workers", n_workers=5)
- delete_worker_group(name)[source]¶
Delete a dask worker group by name
- Parameters
- name: str
Name of the worker group
Examples
>>> cluster.delete_worker_group("high-mem-workers")
- classmethod from_name(name, **kwargs)[source]¶
Create an instance of this class to represent an existing cluster by name.
Will fail if a cluster with that name doesn’t already exist.
- Parameters
- name: str
Name of the cluster to connect to
Examples
>>> cluster = KubeCluster.from_name(name="simple-cluster")
- get_logs()[source]¶
Get logs for Dask scheduler and workers.
Examples
>>> cluster.get_logs()
{'foo': ...,
 'foo-default-worker-0269dbfa0cfd4a22bcd9d92ae032f4d2': ...,
 'foo-default-worker-7c1ccb04cd0e498fb21babaedd00e5d4': ...,
 'foo-default-worker-d65bee23bdae423b8d40c5da7a1065b6': ...}
Each log will be a string of all logs for that container. To view it is recommended that you print each log.
>>> print(cluster.get_logs()["testdask-scheduler-5c8ffb6b7b-sjgrg"])
...
distributed.scheduler - INFO - -----------------------------------------------
distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO - Scheduler at: tcp://10.244.0.222:8786
distributed.scheduler - INFO - dashboard at: :8787
...
- scale(n, worker_group='default')[source]¶
Scale cluster to n workers
- Parameters
- n: int
Target number of workers
- worker_group: str
Worker group to scale
Examples
>>> cluster.scale(10)  # scale cluster to ten workers
>>> cluster.scale(7, worker_group="high-mem-workers")  # scale worker group high-mem-workers to seven workers
- dask_kubernetes.operator.make_cluster_spec(name, image='ghcr.io/dask/dask:latest', n_workers=None, resources=None, env=None, worker_command='dask-worker', scheduler_service_type='ClusterIP', idle_timeout=0, jupyter=False)[source]¶
Generate a DaskCluster Kubernetes resource.
Populate a template with some common options to generate a DaskCluster Kubernetes resource.
- Parameters
- name: str
Name of the cluster
- image: str (optional)
Container image to use for the scheduler and workers
- n_workers: int (optional)
Number of workers in the default worker group
- resources: dict (optional)
Resource limits to set on scheduler and workers
- env: dict (optional)
Environment variables to set on scheduler and workers
- worker_command: str (optional)
Worker command to use when starting the workers
- idle_timeout: int (optional)
Timeout in seconds after which an idle cluster is cleaned up
- jupyter: bool (optional)
Start Jupyter on the Dask scheduler