Dask Kubernetes¶
Welcome to the documentation for dask-kubernetes.
Note
If you are looking for general documentation on deploying Dask on Kubernetes, head to the Dask documentation page on Kubernetes.
The dask-kubernetes package provides cluster managers for Kubernetes. It is one of many options for deploying Dask clusters; see Deploying Dask in the Dask documentation for an overview of additional options.
KubeCluster¶
KubeCluster deploys Dask clusters on Kubernetes clusters using custom Kubernetes resources. It is designed to dynamically launch ad-hoc deployments.
$ # Install operator CRDs and controller, needs to be done once on your Kubernetes cluster
$ helm install --repo https://helm.dask.org --create-namespace -n dask-operator --generate-name dask-kubernetes-operator
from dask_kubernetes.operator import KubeCluster
cluster = KubeCluster(name="my-dask-cluster", image='ghcr.io/dask/dask:latest')
cluster.scale(10)
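You can then connect a Dask client to the running cluster and submit work, the same pattern shown in more detail later in these docs:
from dask.distributed import Client

# Connect a client to the cluster created above and submit a trivial task
client = Client(cluster)
print(client.submit(lambda x: x + 1, 10).result())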
HelmCluster¶
HelmCluster is for managing an existing Dask cluster which has been deployed using Helm. You must have already installed the Dask Helm chart and have the cluster running. You can then use it to manage scaling and retrieve logs.
from dask_kubernetes import HelmCluster
cluster = HelmCluster(release_name="myrelease")
cluster.scale(10)
Installing¶
You can install dask-kubernetes with pip, conda, or from source.
Dependencies¶
To use KubeCluster you may need to have kubectl installed (official install guide).
To use HelmCluster you will need to have helm installed (official install guide).
Pip¶
Pip can be used to install dask-kubernetes and its Python dependencies:
pip install dask-kubernetes --upgrade # Install everything from the latest released version
Conda¶
To install the latest version of dask-kubernetes from the conda-forge repository using conda:
conda install dask-kubernetes -c conda-forge
Install from Source¶
To install dask-kubernetes from source, clone the repository from GitHub:
git clone https://github.com/dask/dask-kubernetes.git
cd dask-kubernetes
python setup.py install
or use pip locally if you want to install all dependencies as well:
pip install -e .
You can also install directly from the git main branch:
pip install git+https://github.com/dask/dask-kubernetes
Supported Versions¶
Python¶
All Dask projects generally follow the NEP 29 deprecation policy for Python, where each Python minor version is supported for 42 months. Due to Python’s 12-month release cycle this ensures that at least the current version and two previous versions are supported.
The Dask Kubernetes CI tests all PRs against all supported Python versions.
Kubernetes¶
For Kubernetes we follow the yearly support KEP. Due to the 4-6 month release cycle this also ensures that at least the current and two previous versions are supported.
The Dask Kubernetes CI tests all PRs against all supported Kubernetes versions.
Note
To keep the CI matrix smaller we test all Kubernetes versions against the latest Python, and all Python versions against the latest Kubernetes. We do not test older versions of Python and Kubernetes together. See dask/dask-kubernetes#559 for more information.
KubeCluster¶
Note
As of 2022.10.0 the default KubeCluster class requires the Dask Kubernetes Operator. For documentation on the classic KubeCluster implementation see here.
Cluster manager¶
The operator has a new cluster manager called dask_kubernetes.operator.KubeCluster that you can use to conveniently create and manage a Dask cluster in Python. You can then connect a Dask distributed.Client object to it directly and perform your work.
The goal of the cluster manager is to abstract away the complexity of the Kubernetes resources and provide a clean and simple Python API to manage clusters while still getting all the benefits of the operator.
Under the hood the Python cluster manager will interact with the Kubernetes API to create custom resources for us.
To create a cluster in the default namespace, run the following
from dask_kubernetes.operator import KubeCluster
cluster = KubeCluster(name='foo')
You can change the default configuration of the cluster by passing additional arguments to the Python class (namespace, n_workers, etc.). See the API reference for the full list of options.
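For example, a minimal sketch (the namespace, worker count and image below are illustrative values rather than defaults):
from dask_kubernetes.operator import KubeCluster

cluster = KubeCluster(
    name="foo",
    namespace="dask",  # assumes this namespace already exists
    n_workers=4,
    image="ghcr.io/dask/dask:latest",
)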
You can scale the cluster
# Scale up the cluster
cluster.scale(5)
# Scale down the cluster
cluster.scale(1)
You can autoscale the cluster
# Allow cluster to autoscale between 1 and 10 workers
cluster.adapt(minimum=1, maximum=10)
# Disable autoscaling by explicitly scaling to your desired number of workers
cluster.scale(1)
You can connect a Dask client to the cluster
from dask.distributed import Client
# Connect Dask to the cluster
client = Client(cluster)
Finally delete the cluster by running
cluster.close()
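KubeCluster also works as a context manager, which is a convenient way to make sure the cluster is cleaned up; a short sketch:
from dask.distributed import Client
from dask_kubernetes.operator import KubeCluster

with KubeCluster(name="foo") as cluster:
    with Client(cluster) as client:
        # Both the client and the cluster are closed automatically on exit
        print(client.submit(sum, [1, 2, 3]).result())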
Additional worker groups¶
Additional worker groups can also be created via the cluster manager in Python.
from dask_kubernetes.operator import KubeCluster
cluster = KubeCluster(name='foo')
cluster.add_worker_group(name="highmem", n_workers=2, resources={"requests": {"memory": "2Gi"}, "limits": {"memory": "64Gi"}})
We can also scale the worker groups by name from the cluster object.
cluster.scale(5, worker_group="highmem")
Additional worker groups can also be deleted in Python.
cluster.delete_worker_group(name="highmem")
Any additional worker groups you create will be deleted when the cluster is deleted.
Customising your cluster¶
The KubeCluster class can take a selection of keyword arguments to make it quick and easy to get started, however the underlying DaskCluster resource can be much more complex and configured in many ways.
Rather than exposing every possibility via keyword arguments, you can instead pass a valid DaskCluster resource spec which will be used when creating the cluster.
You can also generate a spec with make_cluster_spec(), which KubeCluster uses internally, and then modify it with your custom options.
from dask_kubernetes.operator import KubeCluster, make_cluster_spec
config = {
"name": "foo",
"n_workers": 2,
"resources":{"requests": {"memory": "2Gi"}, "limits": {"memory": "64Gi"}}
}
cluster = KubeCluster(**config)
# is equivalent to
cluster = KubeCluster(custom_cluster_spec=make_cluster_spec(**config))
You can also modify the spec before passing it to KubeCluster. For example, if you want to set nodeSelector on your worker pods you could do it like this:
from dask_kubernetes.operator import KubeCluster, make_cluster_spec
spec = make_cluster_spec(name="selector-example", n_workers=2)
spec["spec"]["worker"]["spec"]["nodeSelector"] = {"disktype": "ssd"}
cluster = KubeCluster(custom_cluster_spec=spec)
You could also have the scheduler run a Jupyter server. With this configuration you can access a Jupyter server via the Dask dashboard.
from dask_kubernetes.operator import KubeCluster, make_cluster_spec
spec = make_cluster_spec(name="jupyter-example", n_workers=2, env={"EXTRA_PIP_PACKAGES": "jupyterlab"})
spec["spec"]["scheduler"]["spec"]["containers"][0]["args"].append("--jupyter")
cluster = KubeCluster(custom_cluster_spec=spec)
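Once the cluster is up you can grab both links from the cluster object (a small sketch; the exact URLs depend on how the scheduler service is exposed):
print(cluster.dashboard_link)  # Dask dashboard
print(cluster.jupyter_link)    # Jupyter server running alongside the scheduler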
The cluster.add_worker_group() method also supports passing a custom_spec keyword argument which can be generated with make_worker_spec().
from dask_kubernetes.operator import KubeCluster, make_worker_spec
cluster = KubeCluster(name="example")
worker_spec = make_worker_spec(cluster_name=cluster.name, n_workers=2, resources={"limits": {"nvidia.com/gpu": 1}})
worker_spec["spec"]["nodeSelector"] = {"cloud.google.com/gke-nodepool": "gpu-node-pool"}
cluster.add_worker_group(custom_spec=worker_spec)
Private container registry¶
One common use case where make_cluster_spec comes in handy is when pulling container images from a private registry.
The Kubernetes documentation suggests creating a Secret with your registry credentials and then setting the imagePullSecrets option in the Pod spec.
The KubeCluster class doesn’t expose any way to set imagePullSecrets, so we will need to generate a spec and update it before creating the cluster.
Thankfully make_cluster_spec makes this quick and painless.
$ kubectl create secret docker-registry regcred \
--docker-server=<your-registry-server> \
--docker-username=<your-name> \
--docker-password=<your-pword> \
--docker-email=<your-email>
from dask_kubernetes.operator import KubeCluster, make_cluster_spec
# Generate the spec
spec = make_cluster_spec(name="custom", image="foo.com/jacobtomlinson/dask:latest")
# Set the imagePullSecrets for the scheduler and worker pods
spec["spec"]["worker"]["spec"]["imagePullSecrets"] = [{"name": "regcred"}]
spec["spec"]["scheduler"]["spec"]["imagePullSecrets"] = [{"name": "regcred"}]
# Create the cluster
cluster = KubeCluster(custom_cluster_spec=spec)
Role-Based Access Control (RBAC)¶
In order to spawn a Dask cluster from a pod that runs on the cluster, the service account creating that pod will require a set of RBAC permissions. Create a service account you will use for Dask, and then attach the following ClusterRole to that ServiceAccount via a ClusterRoleBinding:
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: dask-cluster-role
rules:
# Application: watching & handling for the custom resource we declare.
- apiGroups: [kubernetes.dask.org]
resources: [daskclusters, daskworkergroups, daskworkergroups/scale, daskjobs, daskautoscalers]
verbs: [get, list, watch, patch, create, delete]
# Application: other resources it needs to watch and get information from.
- apiGroups:
- "" # indicates the core API group
resources: [pods, pods/status]
verbs:
- "get"
- "list"
- "watch"
- apiGroups:
- "" # indicates the core API group
resources: [services]
verbs:
- "get"
- "list"
- "watch"
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: dask-cluster-role-binding
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: dask-cluster-role
subjects:
- kind: ServiceAccount
name: dask-sa # adjust name based on the service account you created
API¶
- KubeCluster: Launch a Dask Cluster on Kubernetes using the Operator
- KubeCluster.scale: Scale cluster to n workers
- KubeCluster.adapt: Turn on adaptivity
- KubeCluster.get_logs: Get logs for Dask scheduler and workers.
- KubeCluster.add_worker_group: Create a dask worker group by name
- KubeCluster.delete_worker_group: Delete a dask worker group by name
- KubeCluster.close: Delete the dask cluster
- class dask_kubernetes.operator.KubeCluster(*, name: Optional[str] = None, namespace: Optional[str] = None, image: Optional[str] = None, n_workers: Optional[int] = None, resources: Optional[Dict[str, str]] = None, env: Optional[Union[List[dict], Dict[str, str]]] = None, worker_command: Optional[List[str]] = None, port_forward_cluster_ip: Optional[bool] = None, create_mode: Optional[dask_kubernetes.operator.kubecluster.kubecluster.CreateMode] = None, shutdown_on_close: Optional[bool] = None, idle_timeout: Optional[int] = None, resource_timeout: Optional[int] = None, scheduler_service_type: Optional[str] = None, custom_cluster_spec: Optional[Union[str, dict]] = None, scheduler_forward_port: Optional[int] = None, jupyter: bool = False, loop: Optional[tornado.ioloop.IOLoop] = None, asynchronous: bool = False, quiet: bool = False, **kwargs)[source]¶
Launch a Dask Cluster on Kubernetes using the Operator
This cluster manager creates a Dask cluster by deploying the necessary kubernetes resources the Dask Operator needs to create pods. It can also connect to an existing cluster by providing the name of the cluster.
- Parameters
- name: str
Name given to the Dask cluster. Required except when custom_cluster_spec is passed, in which case it's ignored in favor of custom_cluster_spec["metadata"]["name"].
- namespace: str (optional)
Namespace in which to launch the workers. Defaults to the current namespace if available or "default".
- image: str (optional)
Image to run in Scheduler and Worker Pods.
- n_workers: int
Number of workers on initial launch. Use scale to change this number in the future.
- resources: Dict[str, str]
Resources to be passed to the underlying pods.
- env: List[dict] | Dict[str, str]
List of environment variables to pass to the worker pod. Can be a list of dicts using the same structure as k8s envs or a single dictionary of key/value pairs.
- worker_command: List[str] | str
The command to use when starting the worker. If the command consists of multiple words it should be passed as a list of strings. Defaults to "dask-worker".
- port_forward_cluster_ip: bool (optional)
If the chart uses ClusterIP type services, forward the ports locally. If you are running it locally it should be the port you are forwarding to <port>.
- create_mode: CreateMode (optional)
How to handle cluster creation if the cluster resource already exists. Default behavior is to create a new cluster if one with that name doesn't exist, or connect to an existing one if it does. You can also set CreateMode.CREATE_ONLY to raise an exception if a cluster with that name already exists, or CreateMode.CONNECT_ONLY to raise an exception if a cluster with that name doesn't exist.
- shutdown_on_close: bool (optional)
Whether or not to delete the cluster resource when this object is closed. Defaults to True when creating a cluster and False when connecting to an existing one.
- idle_timeout: int (optional)
If set, Kubernetes will delete the cluster automatically if the scheduler is idle for longer than this timeout in seconds.
- resource_timeout: int (optional)
Time in seconds to wait for Kubernetes resources to enter their expected state. Example: if the DaskCluster resource that gets created isn't moved into a known status.phase by the controller, then it is likely the controller isn't running or is malfunctioning and we time out and clean up with a useful error. Example 2: if the scheduler Pod enters a CrashLoopBackOff state for longer than this timeout we give up with a useful error. Defaults to 60 seconds.
- scheduler_service_type: str (optional)
Kubernetes service type to use for the scheduler. Defaults to ClusterIP.
- jupyter: bool (optional)
Start Jupyter on the scheduler node.
- custom_cluster_spec: str | dict (optional)
Path to a YAML manifest or a dictionary representation of a DaskCluster resource object which will be used to create the cluster instead of generating one from the other keyword arguments.
- scheduler_forward_port: int (optional)
The port to use when forwarding the scheduler dashboard. Uses a random port by default.
- quiet: bool
If enabled, suppress all printed output. Defaults to False.
- **kwargs: dict
Additional keyword arguments to pass to LocalCluster
See also
Examples
>>> from dask_kubernetes.operator import KubeCluster
>>> cluster = KubeCluster(name="foo")
You can add another group of workers (default is 3 workers)
>>> cluster.add_worker_group('additional', n=4)
You can then resize the cluster with the scale method
>>> cluster.scale(10)
And optionally scale a specific worker group
>>> cluster.scale(10, worker_group='additional')
You can also resize the cluster adaptively and give it a range of workers
>>> cluster.adapt(20, 50)
You can pass this cluster directly to a Dask client
>>> from dask.distributed import Client
>>> client = Client(cluster)
You can also access cluster logs
>>> cluster.get_logs()
You can also connect to an existing cluster
>>> existing_cluster = KubeCluster.from_name(name="ialreadyexist")
- Attributes
asynchronous
Are we running in the event loop?
- called_from_running_loop
- dashboard_link
- jupyter_link
- loop
- name
- observed
- plan
- requested
- scheduler_address
Methods
adapt([minimum, maximum]): Turn on adaptivity
add_worker_group(name[, n_workers, image, ...]): Create a dask worker group by name
close([timeout]): Delete the dask cluster
delete_worker_group(name): Delete a dask worker group by name
from_name(name, **kwargs): Create an instance of this class to represent an existing cluster by name.
get_client(): Return client for the cluster
get_logs(): Get logs for Dask scheduler and workers.
scale(n[, worker_group]): Scale cluster to n workers
sync(func, *args[, asynchronous, ...]): Call func with args synchronously or asynchronously depending on the calling context
wait_for_workers(n_workers[, timeout]): Blocking call to wait for n workers before continuing
generate_rich_output
logs
- adapt(minimum=None, maximum=None)[source]¶
Turn on adaptivity
- Parameters
- minimum: int
Minimum number of workers
- maximum: int
Maximum number of workers
Examples
>>> cluster.adapt()  # Allow scheduler to add/remove workers within k8s cluster resource limits
>>> cluster.adapt(minimum=1, maximum=10)  # Allow scheduler to add/remove workers within 1-10 range
- add_worker_group(name, n_workers=3, image=None, resources=None, worker_command=None, env=None, custom_spec=None)[source]¶
Create a dask worker group by name
- Parameters
- name: str
Name of the worker group
- n_workers: int
Number of workers on initial launch. Use .scale(n_workers, worker_group=name) to change this number in the future.
- image: str (optional)
Image to run in Scheduler and Worker Pods. If omitted will use the cluster default.
- resources: Dict[str, str]
Resources to be passed to the underlying pods. If omitted will use the cluster default.
- env: List[dict]
List of environment variables to pass to the worker pod. If omitted will use the cluster default.
- custom_spec: dict (optional)
A dictionary representation of a worker spec which will be used to create the DaskWorkerGroup instead of generating one from the other keyword arguments.
Examples
>>> cluster.add_worker_group("high-mem-workers", n_workers=5)
- delete_worker_group(name)[source]¶
Delete a dask worker group by name
- Parameters
- name: str
Name of the worker group
Examples
>>> cluster.delete_worker_group("high-mem-workers")
- classmethod from_name(name, **kwargs)[source]¶
Create an instance of this class to represent an existing cluster by name.
Will fail if a cluster with that name doesn’t already exist.
- Parameters
- name: str
Name of the cluster to connect to
Examples
>>> cluster = KubeCluster.from_name(name="simple-cluster")
- get_logs()[source]¶
Get logs for Dask scheduler and workers.
Examples
>>> cluster.get_logs()
{'foo': ...,
 'foo-default-worker-0269dbfa0cfd4a22bcd9d92ae032f4d2': ...,
 'foo-default-worker-7c1ccb04cd0e498fb21babaedd00e5d4': ...,
 'foo-default-worker-d65bee23bdae423b8d40c5da7a1065b6': ...}
Each log will be a string of all logs for that container. To view it is recommended that you print each log.
>>> print(cluster.get_logs()["testdask-scheduler-5c8ffb6b7b-sjgrg"])
...
distributed.scheduler - INFO - -----------------------------------------------
distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO -   Scheduler at: tcp://10.244.0.222:8786
distributed.scheduler - INFO -   dashboard at:                    :8787
...
- scale(n, worker_group='default')[source]¶
Scale cluster to n workers
- Parameters
- n: int
Target number of workers
- worker_group: str
Worker group to scale
Examples
>>> cluster.scale(10)  # scale cluster to ten workers
>>> cluster.scale(7, worker_group="high-mem-workers")  # scale worker group high-mem-workers to seven workers
- dask_kubernetes.operator.make_cluster_spec(name, image='ghcr.io/dask/dask:latest', n_workers=None, resources=None, env=None, worker_command='dask-worker', scheduler_service_type='ClusterIP', idle_timeout=0, jupyter=False)[source]¶
Generate a DaskCluster kubernetes resource.
Populate a template with some common options to generate a DaskCluster kubernetes resource.
- Parameters
- name: str
Name of the cluster
- image: str (optional)
Container image to use for the scheduler and workers
- n_workers: int (optional)
Number of workers in the default worker group
- resources: dict (optional)
Resource limits to set on scheduler and workers
- env: dict (optional)
Environment variables to set on scheduler and workers
- worker_command: str (optional)
Worker command to use when starting the workers
- idle_timeout: int (optional)
Timeout to cleanup idle cluster
- jupyter: bool (optional)
Start Jupyter on the Dask scheduler
HelmCluster¶
HelmCluster is for managing an existing Dask cluster which has been deployed using Helm.
Quickstart¶
First you must install the Dask Helm chart with helm and have the cluster running.
helm repo add dask https://helm.dask.org
helm repo update
helm install myrelease dask/dask
You can then create a HelmCluster object in Python to manage scaling the cluster and retrieve logs.
from dask_kubernetes import HelmCluster
cluster = HelmCluster(release_name="myrelease")
cluster.scale(10) # specify number of workers explicitly
With this cluster object you can conveniently connect a Dask dask.distributed.Client object to the cluster and perform your work. Provided you have API access to Kubernetes and can run the kubectl command, connectivity to the Dask cluster is handled automatically for you via services or port forwarding.
# Example usage
from dask.distributed import Client
import dask.array as da
# Connect Dask to the cluster
client = Client(cluster)
# Create a large array and calculate the mean
array = da.ones((1000, 1000, 1000))
print(array.mean().compute()) # Should print 1.0
For more information see the HelmCluster API reference.
Warning
It is not possible to use HelmCluster from the Jupyter session which is deployed as part of the Helm Chart without first copying your ~/.kube/config file to that Jupyter session.
API¶
- HelmCluster: Connect to a Dask cluster deployed via the Helm Chart.
- HelmCluster.scale: Scale cluster to n workers.
- HelmCluster.adapt: Turn on adaptivity (Not recommended).
- class dask_kubernetes.HelmCluster(release_name=None, auth=[<dask_kubernetes.common.auth.InCluster object>, <dask_kubernetes.common.auth.KubeConfig object>], namespace=None, port_forward_cluster_ip=False, scheduler_name='scheduler', worker_name='worker', node_host=None, node_port=None, name=None, **kwargs)[source]¶
Connect to a Dask cluster deployed via the Helm Chart.
This cluster manager connects to an existing Dask deployment that was created by the Dask Helm Chart. Enabling you to perform basic cluster actions such as scaling and log retrieval.
- Parameters
- release_name: str
Name of the helm release to connect to.
- namespace: str (optional)
Namespace in which to launch the workers. Defaults to the current namespace if available or "default".
- port_forward_cluster_ip: bool (optional)
If the chart uses ClusterIP type services, forward the ports locally. If you are using HelmCluster from the Jupyter session that was installed by the helm chart this should be False. If you are running it locally it should be the port you are forwarding to <port>.
- auth: List[ClusterAuth] (optional)
Configuration methods to attempt in order. Defaults to [InCluster(), KubeConfig()].
- scheduler_name: str (optional)
Name of the Dask scheduler deployment in the current release. Defaults to "scheduler".
- worker_name: str (optional)
Name of the Dask worker deployment in the current release. Defaults to "worker".
- node_host: str (optional)
A node address. Can be provided in case the scheduler service type is NodePort and you want to manually specify which node to connect to.
- node_port: int (optional)
A node port. Can be provided in case the scheduler service type is NodePort and you want to manually specify which port to connect to.
- **kwargs: dict
Additional keyword arguments to pass to Cluster.
See also
HelmCluster.scale
HelmCluster.logs
Examples
>>> from dask_kubernetes import HelmCluster >>> cluster = HelmCluster(release_name="myhelmrelease")
You can then resize the cluster with the scale method
>>> cluster.scale(10)
You can pass this cluster directly to a Dask client
>>> from dask.distributed import Client >>> client = Client(cluster)
You can also access cluster logs
>>> cluster.get_logs()
- Attributes
asynchronous
Are we running in the event loop?
- called_from_running_loop
- dashboard_link
- loop
- name
- observed
- plan
- requested
- scheduler_address
Methods
adapt(*args, **kwargs): Turn on adaptivity (Not recommended).
get_client(): Return client for the cluster
get_logs(): Get logs for Dask scheduler and workers.
scale(n_workers[, worker_group]): Scale cluster to n workers.
sync(func, *args[, asynchronous, ...]): Call func with args synchronously or asynchronously depending on the calling context
wait_for_workers(n_workers[, timeout]): Blocking call to wait for n workers before continuing
close
from_name
logs
- get_logs()[source]¶
Get logs for Dask scheduler and workers.
Examples
>>> cluster.get_logs()
{'testdask-scheduler-5c8ffb6b7b-sjgrg': ...,
 'testdask-worker-64c8b78cc-992z8': ...,
 'testdask-worker-64c8b78cc-hzpdc': ...,
 'testdask-worker-64c8b78cc-wbk4f': ...}
Each log will be a string of all logs for that container. To view it is recommended that you print each log.
>>> print(cluster.get_logs()["testdask-scheduler-5c8ffb6b7b-sjgrg"])
...
distributed.scheduler - INFO - -----------------------------------------------
distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO -   Scheduler at: tcp://10.1.6.131:8786
distributed.scheduler - INFO -   dashboard at:                    :8787
...
- scale(n_workers, worker_group=None)[source]¶
Scale cluster to n workers.
This sets the Dask worker deployment size to the requested number. It also allows you to set the worker deployment size of another worker group. Workers will not be terminated gracefully, so be sure to only scale down when all futures have been retrieved by the client and the cluster is idle.
Examples
>>> cluster
HelmCluster(my-dask.default, 'tcp://localhost:51481', workers=4, threads=241, memory=2.95 TiB)
>>> cluster.scale(4)
>>> cluster
HelmCluster(my-dask.default, 'tcp://localhost:51481', workers=5, threads=321, memory=3.94 TiB)
>>> cluster.scale(5, worker_group="high-mem-workers")
>>> cluster
HelmCluster(my-dask.default, 'tcp://localhost:51481', workers=9, threads=325, memory=3.94 TiB)
Overview¶
What is the operator?¶
The Dask Operator is a small service that runs on your Kubernetes cluster and allows you to create and manage your Dask clusters as Kubernetes resources. Creating clusters can either be done via the Kubernetes API with kubectl or the Python API with KubeCluster.
To install the operator you need to apply some custom resource definitions that allow us to describe Dask resources, and the operator itself, which is a small Python application that watches the Kubernetes API for events related to our custom resources and creates other resources such as Pods and Services accordingly.
What resources does the operator manage?¶
The operator manages a hierarchy of resources, some custom resources to represent Dask primitives like clusters and worker groups, and native Kubernetes resources such as pods and services to run the cluster processes and facilitate communication.
Worker Groups¶
A DaskWorkerGroup represents a homogeneous group of workers that can be scaled. The resource is similar to a native Kubernetes Deployment in that it manages a group of workers with some intelligence around the Pod lifecycle. A worker group must be attached to a Dask Cluster resource in order to function.
All Kubernetes annotations on the DaskWorkerGroup resource will be passed on to the worker Pod resources. Annotations created by kopf or kubectl (i.e. starting with "kopf.zalando.org" or "kubectl.kubernetes.io") will not be passed on.
Clusters¶
The DaskCluster custom resource creates a Dask cluster by creating a scheduler Pod, scheduler Service and default DaskWorkerGroup, which in turn creates worker Pod resources.
Workers connect to the scheduler via the scheduler Service, and that service can also be exposed to the user in order to connect clients and perform work.
The operator also has support for creating additional worker groups. These are extra groups of workers with different configuration settings that can be scaled separately. You can then use resource annotations to schedule different tasks to different groups. For example you may wish to have a smaller pool of workers that have more memory for memory intensive tasks, or GPUs for compute intensive tasks.
All Kubernetes annotations on the DaskCluster resource will be passed on to the scheduler Pod and Service as well as the DaskWorkerGroup resources. Annotations created by kopf or kubectl (i.e. starting with "kopf.zalando.org" or "kubectl.kubernetes.io") will not be passed on.
Jobs¶
A DaskJob is a batch-style resource that creates a Pod to perform some specific task from start to finish, alongside a DaskCluster that can be leveraged to perform the work.
All Kubernetes annotations on the DaskJob resource will be passed on to the job-runner Pod resource. If you also want to set Kubernetes annotations on the cluster-related resources (scheduler and worker Pods), these can be set as spec.cluster.metadata in the DaskJob resource. Annotations created by kopf or kubectl (i.e. starting with "kopf.zalando.org" or "kubectl.kubernetes.io") will not be passed on.
Once the job Pod runs to completion the cluster is removed automatically to save resources. This is great for workflows like training a distributed machine learning model with Dask.
Autoscalers¶
A DaskAutoscaler resource will communicate with the scheduler periodically and autoscale the default DaskWorkerGroup to the desired number of workers.
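From Python, adaptive scaling is typically requested with the cluster manager's adapt() method shown earlier; a brief sketch:
from dask_kubernetes.operator import KubeCluster

cluster = KubeCluster(name="foo")
cluster.adapt(minimum=1, maximum=10)  # autoscale the default worker group between 1 and 10 workers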
Installing¶
To use the Dask Operator you must install the custom resource definitions, service account, roles, and the operator controller deployment.
Quickstart¶
$ helm install --repo https://helm.dask.org --create-namespace -n dask-operator --generate-name dask-kubernetes-operator

Installing with Helm¶
The operator has a Helm chart which can be used to manage the installation of the operator. The chart is published in the Dask Helm repo, and can be installed via:
$ helm repo add dask https://helm.dask.org
"dask" has been added to your repositories
$ helm repo update
Hang tight while we grab the latest from your chart repositories...
...Successfully got an update from the "dask" chart repository
Update Complete. ⎈Happy Helming!⎈
$ helm install --create-namespace -n dask-operator --generate-name dask/dask-kubernetes-operator
NAME: dask-kubernetes-operator-1666875935
NAMESPACE: dask-operator
STATUS: deployed
REVISION: 1
TEST SUITE: None
NOTES:
Operator has been installed successfully.
Then you should be able to list your Dask clusters via kubectl.
$ kubectl get daskclusters
No resources found in default namespace.
We can also check the operator pod is running:
$ kubectl get pods -A -l app.kubernetes.io/name=dask-kubernetes-operator
NAMESPACE NAME READY STATUS RESTARTS AGE
dask-operator dask-kubernetes-operator-775b8bbbd5-zdrf7 1/1 Running 0 74s
Warning
Please note that Helm does not support updating or deleting CRDs. If updates are made to the CRD templates in future releases (to support future k8s releases, for example) you may have to manually update the CRDs or delete/reinstall the Dask Operator.
Single namespace¶
By default the controller is installed with a ClusterRole and watches all namespaces.
You can also just install it into a single namespace by setting the following options.
$ helm install -n my-namespace --generate-name dask/dask-kubernetes-operator --set rbac.cluster=false --set kopfArgs="{--namespace=my-namespace}"
NAME: dask-kubernetes-operator-1749875935
NAMESPACE: my-namespace
STATUS: deployed
REVISION: 1
TEST SUITE: None
NOTES:
Operator has been installed successfully.
Prometheus¶
The operator helm chart also contains some optional ServiceMonitor and PodMonitor resources to enable Prometheus scraping of Dask components. As not all clusters have the Prometheus operator installed these are disabled by default. You can enable them with the following config options.
metrics:
scheduler:
enabled: true
serviceMonitor:
enabled: true
worker:
enabled: true
serviceMonitor:
enabled: true
You’ll also need to ensure the container images you choose for your Dask components have the prometheus_client library installed.
If you’re using the official Dask images you can install this at runtime.
from dask_kubernetes.operator import KubeCluster
cluster = KubeCluster(name="monitored", env={"EXTRA_PIP_PACKAGES": "prometheus_client"})
Chart Configuration Reference¶
Dask-kubernetes-operator¶
A helm chart for managing the deployment of the dask kubernetes operator and CRDs
The Dask-kubernetes-operator chart exposes configurable parameters for the following (see the chart's values.yaml for parameter names and default values):
- Docker image for the operator
- Release version
- Pull policy
- Image pull secrets for private registries
- Override release name (not including random UUID)
- Override full release name
- Create a service account for the operator to use
- Annotations to add to the service account
- The name of the service account to use (if not set and create is true, a name is generated using the fullname template)
- Create a Role/ClusterRole needed by the operator and bind it to the service account
- Creates a ClusterRole if true, else create a namespaced Role
- Extra annotations for the operator pod
- Security context for the operator pod
- Resources for the operator pod
- Volumes for the operator pod
- Volume mounts for the operator container
- Node selector
- Tolerations
- Affinity
- Priority class
- Command line flags to pass to kopf on start up
- Enable scheduler metrics (the pip package prometheus-client should be present on the scheduler)
- Enable the scheduler ServiceMonitor
- Deploy the ServiceMonitor in a different namespace, e.g. monitoring
- Selector to select which namespaces the Endpoints objects are discovered from
- Additional labels to add to the ServiceMonitor metadata
- Interval at which metrics should be scraped
- The label to use to retrieve the job name from
- TargetLabels transfers labels on the Kubernetes Service onto the target
- MetricRelabelConfigs to apply to samples before ingestion
- Enable worker metrics (the pip package prometheus-client should be present on workers)
- Enable the worker PodMonitor
- Deploy the PodMonitor in a different namespace, e.g. monitoring
- Selector to select which namespaces the Endpoints objects are discovered from
- Additional labels to add to the PodMonitor metadata
- Interval at which metrics should be scraped
- The label to use to retrieve the job name from
- PodTargetLabels transfers labels on the Kubernetes Pod onto the target
- MetricRelabelConfigs to apply to samples before ingestion
Documentation generated by Frigate.
Installing with Manifests¶
If you prefer to install the operator from static manifests with kubectl and set configuration options with tools like kustomize, you can generate the default manifests with:
$ helm template --include-crds --repo https://helm.dask.org release dask-kubernetes-operator | kubectl apply -f -
Kubeflow¶
In order to use the Dask Operator with Kubeflow you need to perform some extra installation steps.
User permissions¶
Kubeflow doesn’t know anything about our Dask custom resource definitions, so we need to update the kubeflow-kubernetes-edit cluster role. This role allows users with cluster edit permissions to create pods, jobs and other resources, and we need to add the Dask custom resources to that list. Edit the existing clusterrole and add a new rule to the rules section for kubernetes.dask.org that allows all operations on all custom resources in our API group.
$ kubectl patch clusterrole kubeflow-kubernetes-edit --type="json" --patch '[{"op": "add", "path": "/rules/-", "value": {"apiGroups": ["kubernetes.dask.org"],"resources": ["*"],"verbs": ["*"]}}]'
clusterrole.rbac.authorization.k8s.io/kubeflow-kubernetes-edit patched
Dashboard access¶
If you are using the Jupyter Notebook service in KubeFlow there are a couple of extra steps you need to do to be able to access the Dask dashboard. The dashboard will be running on the scheduler pod and accessible via the scheduler service, so to access that your Jupyter container will need to have the jupyter-server-proxy extension installed. If you are using the Dask JupyterLab extension this will be installed automatically for you.
By default the proxy will only allow proxying other services running on the same host as the Jupyter server, which means you can’t access the scheduler running in another pod. So you need to set some extra config to tell the proxy which hosts to allow. Given that we can already execute arbitrary code in Jupyter (and therefore interact with other services within the Kubernetes cluster) we can allow all hosts in the proxy settings with c.ServerProxy.host_allowlist = lambda app, host: True.
The dask_kubernetes.operator.KubeCluster and distributed.Client objects both have a dashboard_link attribute that you can view to find the URL of the dashboard, and this is also used in the widgets shown in Jupyter. The default link will not work on KubeFlow, so you need to change this to "{NB_PREFIX}/proxy/{host}:{port}/status" to ensure it uses the Jupyter proxy.
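If you want to set this from Python in a running session rather than via an environment variable, a minimal sketch using Dask's configuration system (the config key corresponds to the DASK_DISTRIBUTED__DASHBOARD__LINK variable used below) looks like this:
import dask

dask.config.set({"distributed.dashboard.link": "{NB_PREFIX}/proxy/{host}:{port}/status"})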
To apply these configuration options to the Jupyter pod you can create a PodDefault configuration object that can be selected when launching the notebook. Create a new file with the following contents.
# configure-dask-dashboard.yaml
apiVersion: "kubeflow.org/v1alpha1"
kind: PodDefault
metadata:
name: configure-dask-dashboard
spec:
selector:
matchLabels:
configure-dask-dashboard: "true"
desc: "configure dask dashboard"
env:
- name: DASK_DISTRIBUTED__DASHBOARD__LINK
value: "{NB_PREFIX}/proxy/{host}:{port}/status"
volumeMounts:
- name: jupyter-server-proxy-config
mountPath: /root/.jupyter/jupyter_server_config.py
subPath: jupyter_server_config.py
volumes:
- name: jupyter-server-proxy-config
configMap:
name: jupyter-server-proxy-config
---
apiVersion: v1
kind: ConfigMap
metadata:
name: jupyter-server-proxy-config
data:
jupyter_server_config.py: |
c.ServerProxy.host_allowlist = lambda app, host: True
Then apply this to your KubeFlow user’s namespace with kubectl. For example, with the default user@example.com user it would be:
$ kubectl apply -n kubeflow-user-example-com -f configure-dask-dashboard.yaml
Then when you launch your Jupyter Notebook server be sure to check the configure dask dashboard configuration option.

Custom Resources¶
The Dask Operator has a few custom resources that can be used to create various Dask components.
- DaskCluster creates a full Dask cluster with a scheduler and workers.
- DaskWorkerGroup creates homogeneous groups of workers; DaskCluster creates one by default but you can add more if you want multiple worker types.
- DaskJob creates a Pod that will run a script to completion along with a DaskCluster that the script can leverage.
DaskCluster¶
The DaskCluster custom resource creates a Dask cluster by creating a scheduler Pod, scheduler Service and default DaskWorkerGroup, which in turn creates worker Pod resources.
Let’s create an example called cluster.yaml with the following configuration:
# cluster.yaml
apiVersion: kubernetes.dask.org/v1
kind: DaskCluster
metadata:
name: simple
spec:
worker:
replicas: 2
spec:
containers:
- name: worker
image: "ghcr.io/dask/dask:latest"
imagePullPolicy: "IfNotPresent"
args:
- dask-worker
- --name
- $(DASK_WORKER_NAME)
- --dashboard
- --dashboard-address
- "8788"
ports:
- name: http-dashboard
containerPort: 8788
protocol: TCP
scheduler:
spec:
containers:
- name: scheduler
image: "ghcr.io/dask/dask:latest"
imagePullPolicy: "IfNotPresent"
args:
- dask-scheduler
ports:
- name: tcp-comm
containerPort: 8786
protocol: TCP
- name: http-dashboard
containerPort: 8787
protocol: TCP
readinessProbe:
httpGet:
port: http-dashboard
path: /health
initialDelaySeconds: 5
periodSeconds: 10
livenessProbe:
httpGet:
port: http-dashboard
path: /health
initialDelaySeconds: 15
periodSeconds: 20
service:
type: NodePort
selector:
dask.org/cluster-name: simple
dask.org/component: scheduler
ports:
- name: tcp-comm
protocol: TCP
port: 8786
targetPort: "tcp-comm"
- name: http-dashboard
protocol: TCP
port: 8787
targetPort: "http-dashboard"
Editing this file will change the default configuration of your Dask cluster. See the Full Configuration Reference for all the available options. Now apply cluster.yaml:
$ kubectl apply -f cluster.yaml
daskcluster.kubernetes.dask.org/simple created
We can list our clusters:
$ kubectl get daskclusters
NAME AGE
simple 47s
To connect to this Dask cluster we can use the service that was created for us.
$ kubectl get svc -l dask.org/cluster-name=simple
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
simple ClusterIP 10.96.85.120 <none> 8786/TCP,8787/TCP 86s
We can see here that port 8786 has been exposed for the Dask communication along with 8787 for the Dashboard.
How you access these service endpoints will vary depending on your Kubernetes cluster configuration.
For this quick example we could use kubectl to port forward the service to your local machine.
$ kubectl port-forward svc/simple 8786:8786
Forwarding from 127.0.0.1:8786 -> 8786
Forwarding from [::1]:8786 -> 8786
Then we can connect to it from a Python session.
>>> from dask.distributed import Client
>>> client = Client("localhost:8786")
>>> print(client)
<Client: 'tcp://10.244.0.12:8786' processes=3 threads=12, memory=23.33 GiB>
We can also list all of the pods created by the operator to run our cluster.
$ kubectl get po -l dask.org/cluster-name=simple
NAME READY STATUS RESTARTS AGE
simple-default-worker-13f4f0d13bbc40a58cfb81eb374f26c3 1/1 Running 0 104s
simple-default-worker-aa79dfae83264321a79f1f0ffe91f700 1/1 Running 0 104s
simple-default-worker-f13c4f2103e14c2d86c1b272cd138fe6 1/1 Running 0 104s
simple-scheduler 1/1 Running 0 104s
The workers we see here are created by our cluster’s default workergroup resource that was also created by the operator.
You can scale the workergroup like you would a Deployment or ReplicaSet:
$ kubectl scale --replicas=5 daskworkergroup simple-default
daskworkergroup.kubernetes.dask.org/simple-default
We can verify that new pods have been created.
$ kubectl get po -l dask.org/cluster-name=simple
NAME READY STATUS RESTARTS AGE
simple-default-worker-13f4f0d13bbc40a58cfb81eb374f26c3 1/1 Running 0 5m26s
simple-default-worker-a52bf313590f432d9dc7395875583b52 1/1 Running 0 27s
simple-default-worker-aa79dfae83264321a79f1f0ffe91f700 1/1 Running 0 5m26s
simple-default-worker-f13c4f2103e14c2d86c1b272cd138fe6 1/1 Running 0 5m26s
simple-default-worker-f4223a45b49d49288195c540c32f0fc0 1/1 Running 0 27s
simple-scheduler 1/1 Running 0 5m26s
Finally we can delete the cluster either by deleting the manifest we applied before, or directly by name:
$ kubectl delete -f cluster.yaml
daskcluster.kubernetes.dask.org "simple" deleted
$ kubectl delete daskcluster simple
daskcluster.kubernetes.dask.org "simple" deleted
DaskWorkerGroup¶
When we create a DaskCluster resource a default worker group is created for us. But we can add more by creating more manifests. This allows us to create workers of different shapes and sizes that Dask can leverage for different tasks.
Let’s create an example called highmemworkers.yaml with the following configuration:
# highmemworkers.yaml
apiVersion: kubernetes.dask.org/v1
kind: DaskWorkerGroup
metadata:
name: simple-highmem
spec:
cluster: simple
worker:
replicas: 2
spec:
containers:
- name: worker
image: "ghcr.io/dask/dask:latest"
imagePullPolicy: "IfNotPresent"
resources:
requests:
memory: "32Gi"
limits:
memory: "32Gi"
args:
- dask-worker
- --name
- $(DASK_WORKER_NAME)
- --resources
- MEMORY=32e9
- --dashboard
- --dashboard-address
- "8788"
ports:
- name: http-dashboard
containerPort: 8788
protocol: TCP
The main thing we need to ensure is that the cluster option matches the name of the cluster we created earlier. This will cause the workers to join that cluster.
See the Full Configuration Reference for all the available options. Now apply highmemworkers.yaml:
$ kubectl apply -f highmemworkers.yaml
daskworkergroup.kubernetes.dask.org/simple-highmem created
We can list our clusters:
$ kubectl get daskworkergroups
NAME AGE
simple-default 2 hours
simple-highmem 47s
We don’t need to worry about deleting this worker group separately; because it has joined the existing cluster, Kubernetes will delete it when the DaskCluster resource is deleted.
Scaling works the same way as the default worker group and can be done with the kubectl scale command.
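Because the manifest above advertises a MEMORY worker resource (via --resources MEMORY=32e9), you can steer memory-hungry tasks at this group from a client. A hedged sketch, assuming you are connected to the cluster created earlier:
from dask.distributed import Client

client = Client("localhost:8786")  # e.g. via the kubectl port-forward shown earlier

# Ask the scheduler to run this task only on workers advertising the MEMORY resource
future = client.submit(lambda: sum(range(10_000_000)), resources={"MEMORY": 32e9})
print(future.result())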
DaskJob¶
The DaskJob custom resource behaves similarly to other Kubernetes batch resources. It creates a Pod that executes a command to completion. The difference is that the DaskJob also creates a DaskCluster alongside it and injects the appropriate configuration into the job Pod for it to automatically connect to and leverage the Dask cluster.
Let’s create an example called job.yaml with the following configuration:
# job.yaml
apiVersion: kubernetes.dask.org/v1
kind: DaskJob
metadata:
name: simple-job
namespace: default
spec:
job:
spec:
containers:
- name: job
image: "ghcr.io/dask/dask:latest"
imagePullPolicy: "IfNotPresent"
args:
- python
- -c
- "from dask.distributed import Client; client = Client(); # Do some work..."
cluster:
spec:
worker:
replicas: 2
spec:
containers:
- name: worker
image: "ghcr.io/dask/dask:latest"
imagePullPolicy: "IfNotPresent"
args:
- dask-worker
- --name
- $(DASK_WORKER_NAME)
- --dashboard
- --dashboard-address
- "8788"
ports:
- name: http-dashboard
containerPort: 8788
protocol: TCP
env:
- name: WORKER_ENV
value: hello-world # We dont test the value, just the name
scheduler:
spec:
containers:
- name: scheduler
image: "ghcr.io/dask/dask:latest"
imagePullPolicy: "IfNotPresent"
args:
- dask-scheduler
ports:
- name: tcp-comm
containerPort: 8786
protocol: TCP
- name: http-dashboard
containerPort: 8787
protocol: TCP
readinessProbe:
httpGet:
port: http-dashboard
path: /health
initialDelaySeconds: 5
periodSeconds: 10
livenessProbe:
httpGet:
port: http-dashboard
path: /health
initialDelaySeconds: 15
periodSeconds: 20
env:
- name: SCHEDULER_ENV
value: hello-world
service:
type: ClusterIP
selector:
dask.org/cluster-name: simple-job
dask.org/component: scheduler
ports:
- name: tcp-comm
protocol: TCP
port: 8786
targetPort: "tcp-comm"
- name: http-dashboard
protocol: TCP
port: 8787
targetPort: "http-dashboard"
Editing this file will change the default configuration of your Dask job. See the Full Configuration Reference for all the available options. Now apply job.yaml:
$ kubectl apply -f job.yaml
daskjob.kubernetes.dask.org/simple-job created
Now if we check our cluster resources we should see our job and cluster pods being created.
$ kubectl get pods
NAME READY STATUS RESTARTS AGE
simple-job-scheduler 1/1 Running 0 8s
simple-job-runner 1/1 Running 0 8s
simple-job-default-worker-1f6c670fba 1/1 Running 0 8s
simple-job-default-worker-791f93d9ec 1/1 Running 0 8s
Our runner pod will be doing whatever we configured it to do. In our example you can see we just create a simple dask.distributed.Client object like this:
from dask.distributed import Client
client = Client()
# Do some work...
We can do this because the job pod gets some additional environment variables set at runtime which tell the Client how to connect to the cluster, so the user doesn’t need to worry about it.
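For example, the scheduler address is expected to be injected into the runner container's environment, which is why Client() needs no arguments. A hedged sketch of inspecting this from the runner script (the exact variable name is an implementation detail of the operator):
import os
from dask.distributed import Client

print(os.environ.get("DASK_SCHEDULER_ADDRESS"))  # assumption: set by the operator on the runner pod
client = Client()  # picks the scheduler address up from the environment/config
print(client)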
The job pod has a default restart policy of OnFailure, so if it exits with anything other than a 0 return code it will be restarted automatically until it completes successfully. When it does return a 0 it will go into a Completed state and the Dask cluster will be cleaned up automatically, freeing up Kubernetes cluster resources.
$ kubectl get pods
NAME READY STATUS RESTARTS AGE
simple-job-runner 0/1 Completed 0 14s
simple-job-scheduler 1/1 Terminating 0 14s
simple-job-default-worker-1f6c670fba 1/1 Terminating 0 14s
simple-job-default-worker-791f93d9ec 1/1 Terminating 0 14s
When you delete the DaskJob resource everything is deleted automatically, whether that’s just the Completed runner pod left over after a successful run or a full Dask cluster and runner that is still running.
$ kubectl delete -f job.yaml
daskjob.kubernetes.dask.org "simple-job" deleted
DaskAutoscaler¶
The DaskAutoscaler resource allows the scheduler to scale up and down the number of workers using Dask’s adaptive mode.
By creating the resource the operator controller will periodically poll the scheduler and request the desired number of workers. The scheduler calculates this number by profiling the tasks it is processing and then extrapolating how many workers it would need to complete the current graph within 5 seconds.
The controller will constrain this number between the minimum and maximum values configured in the DaskAutoscaler resource and then update the number of replicas in the default DaskWorkerGroup.
# autoscaler.yaml
apiVersion: kubernetes.dask.org/v1
kind: DaskAutoscaler
metadata:
name: simple
spec:
cluster: "simple"
minimum: 1 # we recommend always having a minimum of 1 worker so that an idle cluster can start working on tasks immediately
maximum: 10 # you can place a hard limit on the number of workers regardless of what the scheduler requests
$ kubectl apply -f autoscaler.yaml
daskautoscaler.kubernetes.dask.org/simple created
You can end the autoscaling at any time by deleting the resource. The number of workers will remain at whatever the autoscaler last set it to.
$ kubectl delete -f autoscaler.yaml
daskautoscaler.kubernetes.dask.org/simple deleted
Note
The autoscaler will only scale the default WorkerGroup. If you have additional worker groups configured they will not be taken into account.
Labels and Annotations¶
Labels and annotations are propagated to child resources, so labels applied to a DaskCluster will also be present on the Pod and Service resources it creates.
- Labels/annotations on DaskCluster are propagated to the DaskWorkerGroup, scheduler Pod and scheduler Service.
- Labels/annotations on DaskWorkerGroup are propagated to the worker Pod.
- Labels/annotations on DaskJob are propagated to the job Pod and DaskCluster.
Some resources also have subresource metadata options for setting labels and annotations on the resources they create.
- DaskCluster has spec.worker.metadata which is merged into the labels/annotations for the DaskWorkerGroup.
- DaskCluster has spec.scheduler.metadata which is merged into the labels/annotations for the scheduler Pod and scheduler Service.
- DaskJob has spec.job.metadata which is merged into the labels/annotations for the job Pod.
The order of label/annotation application is top_level <= subresource <= base.
So if the DaskCluster has a label of foo=bar but spec.worker.metadata.labels has a label of foo=baz then the worker Pod would have foo=baz.
Equally, if the reserved base label dask.org/component is set at either the top level or subresource level it will be overridden by the controller.
So setting dask.org/component=superworker in DaskCluster.spec.worker.metadata.labels will have no effect and the worker Pod will still have the expected label of dask.org/component=worker.
Example¶
The following DaskCluster has top-level annotations as well as worker and scheduler subresource annotations.
apiVersion: kubernetes.dask.org/v1
kind: DaskCluster
metadata:
name: example
annotations:
hello: world
spec:
worker:
replicas: 2
metadata:
annotations:
foo: bar
spec:
...
scheduler:
metadata:
annotations:
fizz: buzz
spec:
...
The resulting scheduler Pod metadata annotations would be:
apiVersion: v1
kind: Pod
metadata:
name: example-scheduler
annotations:
fizz: buzz
hello: world
...
Full Configuration Reference¶
Full DaskCluster spec reference.
apiVersion: kubernetes.dask.org/v1
kind: DaskCluster
metadata:
name: example
spec:
worker:
replicas: 2 # number of replica workers to spawn
spec: ... # PodSpec, standard k8s pod - https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.21/#podspec-v1-core
scheduler:
spec: ... # PodSpec, standard k8s pod - https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.21/#podspec-v1-core
service: ... # ServiceSpec, standard k8s service - https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.21/#servicespec-v1-core
Full DaskWorkerGroup spec reference.
apiVersion: kubernetes.dask.org/v1
kind: DaskWorkerGroup
metadata:
name: example
spec:
cluster: "name of DaskCluster to associate worker group with"
worker:
replicas: 2 # number of replica workers to spawn
spec: ... # PodSpec, standard k8s pod - https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.21/#podspec-v1-core
Full DaskJob spec reference.
apiVersion: kubernetes.dask.org/v1
kind: DaskJob
metadata:
name: example
spec:
job:
spec: ... # PodSpec, standard k8s pod - https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.21/#podspec-v1-core
cluster:
spec: ... # ClusterSpec, DaskCluster resource spec
Full DaskAutoscaler spec reference.
apiVersion: kubernetes.dask.org/v1
kind: DaskAutoscaler
metadata:
name: example
spec:
cluster: "name of DaskCluster to autoscale"
minimum: 0 # minimum number of workers to create
maximum: 10 # maximum number of workers to create
Extending (advanced)¶
You can extend the functionality of the Dask Operator controller by writing plugins.
You may wish to do this if you want the operator to create other resources like Istio VirtualService, Gateway and Certificate resources.
Extra resources like this may end up being a common requirement, but given the endless possibilities of k8s cluster setups it’s hard to make this configurable.
To help cluster administrators ensure the Dask Operator does exactly what they need we support extending the controller via plugins.
Controller Design Overview¶
The Dask Operator’s controller is built using kopf which allows you to write custom handler functions in Python for any Kubernetes event.
The Dask Operator has a selection of Custom Resources and the controller handles create/update/delete events for these resources.
For example whenever a DaskCluster resource is created the controller sets the status.phase attribute to Created.
@kopf.on.create("daskcluster.kubernetes.dask.org")
async def daskcluster_create(name, namespace, logger, patch, **kwargs):
"""When DaskCluster resource is created set the status.phase.
This allows us to track that the operator is running.
"""
logger.info(f"DaskCluster {name} created in {namespace}.")
patch.status["phase"] = "Created"
Then there is another handler that watches for DaskCluster resources that have been put into this Created phase.
This handler creates the Pod, Service and DaskWorkerGroup subresources of the cluster and then puts it into a Running phase.
@kopf.on.field("daskcluster.kubernetes.dask.org", field="status.phase", new="Created")
async def daskcluster_create_components(spec, name, namespace, logger, patch, **kwargs):
"""When the DaskCluster status.phase goes into Pending create the cluster components."""
async with kubernetes.client.api_client.ApiClient() as api_client:
api = kubernetes.client.CoreV1Api(api_client)
custom_api = kubernetes.client.CustomObjectsApi(api_client)
# Create scheduler Pod
data = build_scheduler_pod_spec(...)
kopf.adopt(data)
await api.create_namespaced_pod(namespace=namespace, body=data)
# Create scheduler Service
data = build_scheduler_service_spec(...)
kopf.adopt(data)
await api.create_namespaced_service(namespace=namespace, body=data)
# Create DaskWorkerGroup
data = build_worker_group_spec(...)
kopf.adopt(data)
await custom_api.create_namespaced_custom_object(group="kubernetes.dask.org", version="v1", plural="daskworkergroups", namespace=namespace, body=data)
# Set DaskCluster to Running phase
patch.status["phase"] = "Running"
Then when the DaskWorkerGroup resource is created that triggers the worker creation event handler, which creates more Pod resources.
In turn the creation of Pod and Service resources will trigger internal event handlers in Kubernetes which will create containers, set iptables rules, etc.
This model of writing small handlers that are triggered by events in Kubernetes allows you to create powerful tools with simple building blocks.
Writing your own handlers¶
To avoid users having to write their own controllers the Dask Operator controller supports loading additional handlers from other packages via entry_points.
Custom handlers must be packaged as a Python module and be importable.
For example let’s say you have a minimal Python package with the following structure:
my_controller_plugin/
├── pyproject.toml
└── my_controller_plugin/
├── __init__.py
└── plugin.py
If you wanted to write a custom handler that would be triggered when the scheduler Service is created then plugin.py would look like this:
import kopf

@kopf.on.create("service", labels={"dask.org/component": "scheduler"})
async def handle_scheduler_service_create(meta, new, namespace, logger, **kwargs):
    # Do something here
    # See https://kopf.readthedocs.io/en/stable/handlers for documentation on what is possible here
    ...
Then you need to ensure that your pyproject.toml registers the plugin as a dask_operator_plugin.
...
[project.entry-points.dask_operator_plugin]
my_controller_plugin = "my_controller_plugin.plugin"
Then you can package this up and push it to your preferred Python package repository.
Installing your plugin¶
When the Dask Operator controller starts up it checks for any plugins registered via the dask_operator_plugin
entry point and loads those too.
This means that installing your plugin is as simple as ensuring your plugin package is installed in the controller container image.
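Under the hood this is just standard Python entry point discovery. A rough sketch of the idea (not the controller's actual code) using importlib.metadata, which requires Python 3.10+ for the group keyword:
from importlib.metadata import entry_points

# Sketch: enumerate plugin modules registered under the dask_operator_plugin
# entry point group and import them so their kopf handlers get registered.
for entry_point in entry_points(group="dask_operator_plugin"):
    plugin_module = entry_point.load()
    print(f"Loaded controller plugin {entry_point.name} from {plugin_module.__name__}")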
The controller uses the ghcr.io/dask/dask-kubernetes-operator:latest
container image by default so your custom container Dockerfile
would look something like this:
FROM ghcr.io/dask/dask-kubernetes-operator:latest
RUN pip install my-controller-plugin
Then when you install the controller deployment either via the manifest or with helm you would specify your custom container image instead.
helm install --set image.name=my_controller_image myrelease dask/dask-kubernetes-operator
Troubleshooting¶
This page contains common problems and resolutions.
Why am I losing data during scale down?¶
When scaling down a cluster the controller will attempt to coordinate with the Dask scheduler and decide which workers to remove. If the controller cannot communicate with the scheduler it will fall back to last-in-first-out scaling and will remove the worker with the lowest uptime, even if that worker is actively processing data. This can result in loss of data and recalculation of a graph.
This commonly happens if the version of Dask on the scheduler is very different from the version on the controller.
To mitigate this, Dask has an optional HTTP API which is less tightly coupled than the RPC interface and tolerates version differences between the scheduler and the controller better.
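As a sketch of how the HTTP API can be switched on through Dask's configuration (the route module name and config key follow distributed's defaults; double-check them against the version of distributed you deploy):
import dask.config
import distributed  # noqa: F401  registers distributed's default configuration

# Sketch: enable the scheduler's optional HTTP API by adding the API route module
# to the scheduler's HTTP routes before the scheduler process starts.
routes = dask.config.get("distributed.scheduler.http.routes")
if "distributed.http.scheduler.api" not in routes:
    routes.append("distributed.http.scheduler.api")
dask.config.set({"distributed.scheduler.http.routes": routes})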
KubeCluster (classic)¶
Warning
This implementation of KubeCluster
is being retired and we recommend migrating to the operator based implementation.
KubeCluster
deploys Dask clusters on Kubernetes clusters using native
Kubernetes APIs. It is designed to dynamically launch ad-hoc deployments.
Quickstart¶
To launch a Dask cluster on Kubernetes with KubeCluster
you need to first configure your worker
pod specification. Then create a cluster with that spec.
from dask_kubernetes.classic import KubeCluster, make_pod_spec
pod_spec = make_pod_spec(image='ghcr.io/dask/dask:latest',
memory_limit='4G', memory_request='4G',
cpu_limit=1, cpu_request=1)
cluster = KubeCluster(pod_spec)
cluster.scale(10) # specify number of workers explicitly
cluster.adapt(minimum=1, maximum=100) # or dynamically scale based on current workload
You can then connect a Dask dask.distributed.Client
object to the cluster and perform your work.
# Example usage
from dask.distributed import Client
import dask.array as da
# Connect Dask to the cluster
client = Client(cluster)
# Create a large array and calculate the mean
array = da.ones((1000, 1000, 1000))
print(array.mean().compute()) # Should print 1.0
You can alternatively define your worker specification via YAML by creating a pod manifest that will be used as a template.
# worker-spec.yml
kind: Pod
metadata:
labels:
foo: bar
spec:
restartPolicy: Never
containers:
- image: ghcr.io/dask/dask:latest
imagePullPolicy: IfNotPresent
args: [dask-worker, --nthreads, '2', --no-dashboard, --memory-limit, 6GB, --death-timeout, '60']
name: dask-worker
env:
- name: EXTRA_PIP_PACKAGES
value: git+https://github.com/dask/distributed
resources:
limits:
cpu: "2"
memory: 6G
requests:
cpu: "2"
memory: 6G
from dask_kubernetes.classic import KubeCluster
cluster = KubeCluster('worker-spec.yml')
cluster.scale(10)
For more information see the KubeCluster
API reference.
Best Practices¶
- Your worker pod image should have a similar environment to your local environment, including versions of Python, dask, cloudpickle, and any libraries that you may wish to use (like NumPy, Pandas, or Scikit-Learn). See the dask_kubernetes.classic.KubeCluster docstring for guidance on how to check and modify this (and see the version check sketch after this list).
- Your Kubernetes resource limits and requests should match the --memory-limit and --nthreads parameters given to the dask-worker command. Otherwise your workers may get killed by Kubernetes as they pack into the same node and overwhelm that node's available memory, leading to KilledWorker errors.
- We recommend adding the --death-timeout, '60' arguments and the restartPolicy: Never attribute to your worker specification. This ensures that these pods will clean themselves up if your Python process disappears unexpectedly.
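For instance, a minimal way to confirm that client, scheduler and worker environments line up is to check package versions once the cluster is up (assuming a cluster object like the one from the quickstart above):
from dask.distributed import Client

# Sketch: `cluster` is assumed to be an existing KubeCluster from the quickstart above.
client = Client(cluster)
client.get_versions(check=True)  # raises an informative error if versions differ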
GPUs¶
Because dask-kubernetes
uses standard kubernetes pod specifications, we can
use kubernetes device plugins
and add resource limits defining the number of GPUs per pod/worker.
Additionally, we can also use tools like dask-cuda for optimized Dask/GPU interactions.
kind: Pod
metadata:
labels:
foo: bar
spec:
restartPolicy: Never
containers:
- image: nvcr.io/nvidia/rapidsai/rapidsai-core:23.04-cuda11.8-runtime-ubuntu22.04-py3.10
imagePullPolicy: IfNotPresent
args: [dask-cuda-worker, $(DASK_SCHEDULER_ADDRESS), --rmm-pool-size, 10GB]
name: dask-cuda
resources:
limits:
cpu: "2"
memory: 6G
nvidia.com/gpu: 1 # requesting 1 GPU
requests:
cpu: "2"
memory: 6G
nvidia.com/gpu: 1 # requesting 1 GPU
Configuration¶
You can use Dask's configuration to control the behavior of dask-kubernetes. You can see a full set of configuration options here. Some notable ones are described below:
- kubernetes.worker-template-path: a path to a YAML file that holds a Pod spec for the worker. If provided then this will be used when dask_kubernetes.classic.KubeCluster is called with no arguments:
cluster = KubeCluster()  # reads provided yaml file
- distributed.dashboard.link: a Python pre-formatted string that shows the location of Dask's dashboard. This string will receive values for host, port, and all environment variables. For example this is useful when using dask-kubernetes with JupyterHub and nbserverproxy to route the dashboard link to a proxied address as follows:
"{JUPYTERHUB_SERVICE_PREFIX}proxy/{port}/status"
- kubernetes.worker-name: a Python pre-formatted string to use when naming dask worker pods. This string will receive values for user, uuid, and all environment variables. This is useful when you want to have control over the naming convention for your pods and use other tokens from the environment. For example when using zero-to-jupyterhub every user is called jovyan and so you may wish to use dask-{JUPYTERHUB_USER}-{uuid} instead of dask-{user}-{uuid}. Ensure you keep the uuid somewhere in the template.
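These options can also be set from Python with dask.config.set before creating a cluster; a small sketch using the keys described above (the values are illustrative placeholders):
import dask.config

# Sketch: set the dask-kubernetes options described above. Values are placeholders.
dask.config.set({
    "kubernetes.worker-template-path": "worker-spec.yml",
    "kubernetes.worker-name": "dask-{JUPYTERHUB_USER}-{uuid}",
    "distributed.dashboard.link": "{JUPYTERHUB_SERVICE_PREFIX}proxy/{port}/status",
})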
Role-Based Access Control (RBAC)¶
In order to spawn a Dask cluster, the service account creating those pods will require a set of RBAC permissions. Create a service account you will use for Dask, and then attach the following Role to that ServiceAccount via a RoleBinding:
kind: Role
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: daskKubernetes
rules:
- apiGroups:
- "" # indicates the core API group
resources:
- "pods"
verbs:
- "get"
- "list"
- "watch"
- "create"
- "delete"
- apiGroups:
- "" # indicates the core API group
resources:
- "pods/log"
verbs:
- "get"
- "list"
- apiGroups:
- "" # indicates the core API group
resources:
- "services"
verbs:
- "get"
- "list"
- "watch"
- "create"
- "delete"
- apiGroups:
- "policy" # indicates the policy API group
resources:
- "poddisruptionbudgets"
verbs:
- "get"
- "list"
- "watch"
- "create"
- "delete"
Docker Images¶
Example Dask docker images ghcr.io/dask/dask and ghcr.io/dask/dask-notebook are available at https://github.com/orgs/dask/packages. More information about these images is available in the Dask documentation.
Note that these images can be further customized with extra packages using
EXTRA_PIP_PACKAGES
, EXTRA_APT_PACKAGES
, and EXTRA_CONDA_PACKAGES
as described in the
Extensibility section.
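For example, extra packages can be passed to worker pods through the env= keyword of make_pod_spec; a sketch with illustrative package names (installing at container start-up slows pod boot, so prefer baking a custom image for anything permanent):
from dask_kubernetes.classic import KubeCluster, make_pod_spec

# Sketch: install extra packages when each worker container starts via the
# image's EXTRA_PIP_PACKAGES hook. The package names are only examples.
pod_spec = make_pod_spec(
    image="ghcr.io/dask/dask:latest",
    env={"EXTRA_PIP_PACKAGES": "pyarrow s3fs"},
)
cluster = KubeCluster(pod_spec)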
Deployment Details¶
Scheduler¶
Before workers are created a scheduler will be deployed with the following resources:
A pod with a scheduler running
A service (svc) to expose scheduler and dashboard ports
A PodDisruptionBudget to avoid voluntary disruptions of the scheduler pod
By default the Dask configuration option kubernetes.scheduler-service-type is set to ClusterIP. In order to connect to the scheduler, KubeCluster will first attempt to connect directly, but this will only be successful if dask-kubernetes is being run from within the Kubernetes cluster. If it is unsuccessful it will attempt to port forward the service locally using the kubectl utility.
If you update the service type to NodePort, the scheduler will be exposed on the same random high port on all nodes in the cluster. In this case KubeCluster will attempt to list nodes in order to get an IP to connect to, and requires additional permissions to do so.
- apiGroups:
- "" # indicates the core API group
resources:
- "nodes"
verbs:
- "get"
- "list"
If you set the service type to LoadBalancer then KubeCluster will connect to the external address of the assigned load balancer, but this does require that your Kubernetes cluster has the appropriate controller to provision load balancers.
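For example, a sketch that switches the scheduler service to a LoadBalancer before creating the cluster (this assumes your Kubernetes cluster can actually provision LoadBalancer services):
import dask.config
from dask_kubernetes.classic import KubeCluster

# Sketch: expose the scheduler through a LoadBalancer service instead of the
# default ClusterIP. Requires a cluster that can provision load balancers.
dask.config.set({"kubernetes.scheduler-service-type": "LoadBalancer"})
cluster = KubeCluster("worker-spec.yml")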
Legacy mode¶
For backward compatibility with previous versions of dask-kubernetes
it is also possible to run the scheduler locally.
A local scheduler is created on the machine where the Dask client is created.
from dask_kubernetes.classic import KubeCluster
from dask.distributed import Client
cluster = KubeCluster.from_yaml('worker-spec.yml', deploy_mode='local')
cluster.scale(10)
client = Client(cluster)
In this mode the Dask workers will attempt to connect to the machine where you are running dask-kubernetes
.
Generally this will need to be within the Kubernetes cluster in order for the workers to make a successful connection.
Workers¶
Workers are created directly as simple pods. These worker pods are configured
to shut down if they are unable to connect to the scheduler for 60 seconds.
The pods are cleaned up when close()
is called,
or the scheduler process exits.
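If you want to be sure the worker pods are removed even when your script fails part way through, one option is to use the cluster as a context manager so close() always runs (a sketch; Dask cluster managers generally support the with statement):
from dask.distributed import Client
from dask_kubernetes.classic import KubeCluster

# Sketch: leaving the with block calls cluster.close(), which removes the worker
# pods even if an exception was raised while working.
with KubeCluster("worker-spec.yml") as cluster:
    cluster.scale(5)
    with Client(cluster) as client:
        pass  # submit work with `client` here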
The pods are created with two default tolerations:
k8s.dask.org/dedicated=worker:NoSchedule
k8s.dask.org_dedicated=worker:NoSchedule
If you have nodes with the corresponding taints, then the worker pods will schedule to those nodes (and no other pods will be able to schedule to those nodes).
API¶
KubeCluster: Launch a Dask cluster on Kubernetes
KubeCluster.adapt: Turn on adaptivity
KubeCluster.from_dict: Create cluster with worker pod spec defined by Python dictionary
KubeCluster.from_yaml: Create cluster with worker pod spec defined by a YAML file
KubeCluster.get_logs: Return logs for the cluster, scheduler and workers
KubeCluster.scale: Scale cluster to n workers
InCluster: Configure the Kubernetes connection from a container's environment.
KubeConfig: Configure the Kubernetes connection from a kubeconfig file.
KubeAuth: Configure the Kubernetes connection explicitly.
make_pod_spec: Create generic pod template from input parameters
- class dask_kubernetes.KubeCluster(pod_template=None, name=None, namespace=None, n_workers=None, host=None, port=None, env=None, auth=[<dask_kubernetes.common.auth.InCluster object>, <dask_kubernetes.common.auth.KubeConfig object>], idle_timeout=None, deploy_mode=None, interface=None, protocol=None, dashboard_address=None, security=None, scheduler_service_wait_timeout=None, scheduler_service_name_resolution_retries=None, scheduler_pod_template=None, apply_default_affinity='preferred', **kwargs)[source]¶
Launch a Dask cluster on Kubernetes
This starts a local Dask scheduler and then dynamically launches Dask workers on a Kubernetes cluster. The Kubernetes cluster is taken to be either the current one on which this code is running, or as a fallback, the default one configured in a kubeconfig file.
Environments
Your worker pod image should have a similar environment to your local environment, including versions of Python, dask, cloudpickle, and any libraries that you may wish to use (like NumPy, Pandas, or Scikit-Learn). See examples below for suggestions on how to manage and check for this.
Network
Since the Dask scheduler is launched locally, for it to work, we need to be able to open network connections between this local node and all the workers nodes on the Kubernetes cluster. If the current process is not already on a Kubernetes node, some network configuration will likely be required to make this work.
Resources
Your Kubernetes resource limits and requests should match the --memory-limit and --nthreads parameters given to the dask-worker command.
- Parameters
- pod_template: (kubernetes.client.V1Pod, dict, str)
A Kubernetes specification for a Pod for a dask worker. Can be either a V1Pod, a dict representation of a pod, or a path to a yaml file containing a pod specification.
- scheduler_pod_template: kubernetes.client.V1Pod (optional)
A Kubernetes specification for a Pod for a dask scheduler. Defaults to the pod_template.
- name: str (optional)
Name given to the pods. Defaults to
dask-$USER-random
- namespace: str (optional)
Namespace in which to launch the workers. Defaults to current namespace if available or “default”
- n_workers: int
Number of workers on initial launch. Use scale to change this number in the future.
- env: Dict[str, str]
Dictionary of environment variables to pass to worker pod
- host: str
Listen address for local scheduler. Defaults to 0.0.0.0
- port: int
Port of local scheduler
- auth: List[ClusterAuth] (optional)
Configuration methods to attempt in order. Defaults to [InCluster(), KubeConfig()].
- idle_timeout: str (optional)
The scheduler task will exit after this amount of time if there are no requests from the client. Default is to never timeout.
- scheduler_service_wait_timeout: int (optional)
Timeout, in seconds, to wait for the remote scheduler service to be ready. Defaults to 30 seconds. Set to 0 to disable the timeout (not recommended).
- scheduler_service_name_resolution_retries: int (optional)
Number of retries to resolve scheduler service name when running from within the Kubernetes cluster. Defaults to 20. Must be set to 1 or greater.
- deploy_mode: str (optional)
Run the scheduler as "local" or "remote". Defaults to "remote".
- apply_default_affinity: str (optional)
Apply a default affinity to pods: "required", "preferred" or "none". Defaults to "preferred".
Additional keyword arguments to pass to SpecCluster
See also
KubeCluster.adapt
Examples
>>> from dask_kubernetes.classic import KubeCluster, make_pod_spec
>>> pod_spec = make_pod_spec(image='ghcr.io/dask/dask:latest',
...                          memory_limit='4G', memory_request='4G',
...                          cpu_limit=1, cpu_request=1,
...                          env={'EXTRA_PIP_PACKAGES': 'fastparquet git+https://github.com/dask/distributed'})
>>> cluster = KubeCluster(pod_spec)
>>> cluster.scale(10)
You can also create clusters with worker pod specifications as dictionaries or stored in YAML files
>>> cluster = KubeCluster('worker-template.yml')
>>> cluster = KubeCluster({...})
Rather than explicitly setting a number of workers you can also ask the cluster to allocate workers dynamically based on current workload
>>> cluster.adapt()
You can pass this cluster directly to a Dask client
>>> from dask.distributed import Client
>>> client = Client(cluster)
You can verify that your local environment matches your worker environments by calling client.get_versions(check=True). This will raise an informative error if versions do not match.
>>> client.get_versions(check=True)
The ghcr.io/dask/dask docker images support EXTRA_PIP_PACKAGES, EXTRA_APT_PACKAGES and EXTRA_CONDA_PACKAGES environment variables to help with small adjustments to the worker environments. We recommend the use of pip over conda in this case due to a much shorter startup time. These environment variables can be modified directly from the KubeCluster constructor methods using the env= keyword. You may list as many packages as you like in a single string like the following:
>>> pip = 'pyarrow gcsfs git+https://github.com/dask/distributed'
>>> conda = '-c conda-forge scikit-learn'
>>> KubeCluster(..., env={'EXTRA_PIP_PACKAGES': pip,
...                       'EXTRA_CONDA_PACKAGES': conda})
You can also start a KubeCluster with no arguments if the worker template is specified in the Dask config files, either as a full template in kubernetes.worker-template or a path to a YAML file in kubernetes.worker-template-path. See https://docs.dask.org/en/latest/configuration.html for more information about setting configuration values:
$ export DASK_KUBERNETES__WORKER_TEMPLATE_PATH=worker_template.yaml
>>> cluster = KubeCluster() # automatically finds 'worker_template.yaml'
- Attributes
asynchronous
Are we running in the event loop?
- called_from_running_loop
- dashboard_link
- loop
- name
- observed
- plan
- requested
- scheduler_address
Methods
adapt([Adaptive, minimum, maximum, ...]): Turn on adaptivity
from_dict(pod_spec, **kwargs): Create cluster with worker pod spec defined by Python dictionary
from_name(name): Create an instance of this class to represent an existing cluster by name.
from_yaml(yaml_path, **kwargs): Create cluster with worker pod spec defined by a YAML file
get_client(): Return client for the cluster
get_logs([cluster, scheduler, workers]): Return logs for the cluster, scheduler and workers
new_worker_spec(): Return name and spec for the next worker
scale(n): Scale cluster to n workers
scale_up([n, memory, cores]): Scale cluster to n workers
sync(func, *args[, asynchronous, ...]): Call func with args synchronously or asynchronously depending on the calling context
wait_for_workers(n_workers[, timeout]): Blocking call to wait for n workers before continuing
close
logs
scale_down
- classmethod from_dict(pod_spec, **kwargs)[source]¶
Create cluster with worker pod spec defined by Python dictionary
Deprecated, please use the KubeCluster constructor directly.
See also
Examples
>>> spec = {
...     'metadata': {},
...     'spec': {
...         'containers': [{
...             'args': ['dask-worker', '$(DASK_SCHEDULER_ADDRESS)',
...                      '--nthreads', '1',
...                      '--death-timeout', '60'],
...             'command': None,
...             'image': 'ghcr.io/dask/dask:latest',
...             'name': 'dask-worker',
...         }],
...         'restartPolicy': 'Never',
...     }
... }
>>> cluster = KubeCluster.from_dict(spec, namespace='my-ns')
- classmethod from_yaml(yaml_path, **kwargs)[source]¶
Create cluster with worker pod spec defined by a YAML file
Deprecated, please use the KubeCluster constructor directly.
We can start a cluster with pods defined in an accompanying YAML file like the following:
kind: Pod
metadata:
  labels:
    foo: bar
    baz: quux
spec:
  containers:
  - image: ghcr.io/dask/dask:latest
    name: dask-worker
    args: [dask-worker, $(DASK_SCHEDULER_ADDRESS), --nthreads, '2', --memory-limit, 8GB]
  restartPolicy: Never
See also
Examples
>>> cluster = KubeCluster.from_yaml('pod.yaml', namespace='my-ns')
- class dask_kubernetes.ClusterAuth[source]¶
An abstract base class for methods for configuring a connection to a Kubernetes API server.
Examples
>>> from dask_kubernetes import KubeConfig
>>> auth = KubeConfig(context='minikube')
>>> from dask_kubernetes import KubeAuth
>>> auth = KubeAuth(host='https://localhost', username='superuser', password='pass')
Methods
load(): Load Kubernetes configuration and set as default
load_first([auth]): Load the first valid configuration in the list auth.
- class dask_kubernetes.InCluster[source]¶
Configure the Kubernetes connection from a container’s environment.
This authentication method is intended for use when the client is running in a container started by Kubernetes with an authorized service account. This loads the mounted service account token and discovers the Kubernetes API via Kubernetes service discovery.
Methods
load(): Load Kubernetes configuration and set as default
load_first([auth]): Load the first valid configuration in the list auth.
- class dask_kubernetes.KubeConfig(config_file=None, context=None, persist_config=True)[source]¶
Configure the Kubernetes connection from a kubeconfig file.
- Parameters
- config_file: str (optional)
The path of the kubeconfig file to load. Defaults to the value of the KUBECONFIG environment variable, or the string ~/.kube/config.
- context: str (optional)
The kubeconfig context to use. Defaults to the value of current-context in the configuration file.
- persist_config: bool (optional)
Whether changes to the configuration will be saved back to disk (e.g. GCP token refresh). Defaults to True.
Methods
get_kube_config_loader_for_yaml_file()
load(): Load Kubernetes configuration and set as default
load_first([auth]): Load the first valid configuration in the list auth.
load_kube_config()
- class dask_kubernetes.KubeAuth(host, **kwargs)[source]¶
Configure the Kubernetes connection explicitly.
- Parameters
- host: str
The base URL of the Kubernetes host to connect
- username: str (optional)
Username for HTTP basic authentication
- password: str (optional)
Password for HTTP basic authentication
- debug: bool (optional)
Debug switch
- verify_ssl: bool (optional)
Set this to false to skip verifying SSL certificate when calling API from https server. Defaults to True.
- ssl_ca_cert: str (optional)
Set this to customize the certificate file to verify the peer.
- cert_file: str (optional)
Client certificate file
- key_file: str (optional)
Client key file
- assert_hostname: bool (optional)
Set this to True/False to enable/disable SSL hostname verification. Defaults to True.
- proxy: str (optional)
URL for a proxy to connect through
Methods
load(): Load Kubernetes configuration and set as default
load_first([auth]): Load the first valid configuration in the list auth.
- dask_kubernetes.make_pod_spec(image, labels={}, threads_per_worker=1, env={}, extra_container_config={}, extra_pod_config={}, resources=None, memory_limit=None, memory_request=None, cpu_limit=None, cpu_request=None, gpu_limit=None, annotations={})[source]¶
Create generic pod template from input parameters
- Parameters
- image: str
Docker image name
- labels: dict
Dict of labels to pass to
V1ObjectMeta
- threads_per_worker: int
Number of threads per each worker
- env: dict
Dict of environment variables to pass to
V1Container
- extra_container_config: dict
Extra config attributes to set on the container object
- extra_pod_config: dict
Extra config attributes to set on the pod object
- resources: str
Resources for task constraints like “GPU=2 MEM=10e9”. Resources are applied separately to each worker process (only relevant when starting multiple worker processes. Passed to the –resources option in
dask-worker
.- memory_limitint, float, or str
Bytes of memory per process that the worker can use (applied to both
dask-worker --memory-limit
andspec.containers[].resources.limits.memory
). This can be:an integer (bytes), note 0 is a special case for no memory management.
a float (bytes). Note: fraction of total system memory is not supported by k8s.
a string (like 5GiB or 5000M). Note: ‘GB’ is not supported by k8s.
‘auto’ for automatically computing the memory limit. [default: auto]
- memory_requestint, float, or str
Like
memory_limit
(applied only tospec.containers[].resources.requests.memory
and ignored bydask-worker
).- cpu_limitfloat or str
CPU resource limits (applied to
spec.containers[].resources.limits.cpu
).- cpu_requestfloat or str
CPU resource requests (applied to
spec.containers[].resources.requests.cpu
).- gpu_limitint
GPU resource limits (applied to
spec.containers[].resources.limits."nvidia.com/gpu"
).- annotationsdict
Dict of annotations passed to
V1ObjectMeta
- Returns
- pod: V1PodSpec
Examples
>>> make_pod_spec(image='ghcr.io/dask/dask:latest', memory_limit='4G', memory_request='4G')
Migrating to the operator¶
The KubeCluster
class is being replaced with a new version that is built using the Kubernetes Operator pattern.
We encourage all users of the classic implementation to migrate to the new version, as the classic implementation is now unmaintained and will be removed in a future release.
Why should you migrate?¶
You may be thinking “why do I have to do this?” and “the old version works just fine” so let’s take a moment to unpack why we have rebuilt KubeCluster
and to hopefully convince you that this is a good decision and worth the effort.
The original implementation of KubeCluster
was created shortly after Kubernetes went 1.0
and best practice design patterns were still emerging.
While it has been updated over time, it has become more and more difficult to maintain due to the way it was designed.
We decided to completely rebuild dask-kubernetes
with the operator pattern in mind which is now an established design pattern for building composable Kubernetes applications.
Here are some reasons why we decided to make this change:
- Top level DaskCluster resource - Cascade deletion
- Simpler Python API
- More powerful YAML API - Create, scale and delete clusters with kubectl
- Detach and reattach from running clusters
- New resource types like DaskJob
- Multiple worker groups
- Autoscaling handled by the controller and not the cluster manager
For more information watch the Dask blog for the Dask Kubernetes Operator announcement blog post.
Installing the operator¶
To use the new implementation of KubeCluster
you need to install the Dask operator custom resources and controller.
The custom resources allow us to describe our Dask cluster components as native Kubernetes resources rather than directly creating Pod
and Service
resources like the classic implementation does.
Unfortunately this requires a small amount of first time setup on your Kubernetes cluster before you can start using dask-kubernetes. This is a key reason why the new implementation has breaking changes.
The quickest way to install things is with helm.
$ helm repo add dask https://helm.dask.org
"dask" has been added to your repositories
$ helm repo update
Hang tight while we grab the latest from your chart repositories...
...Successfully got an update from the "dask" chart repository
Update Complete. ⎈Happy Helming!⎈
$ helm install --create-namespace -n dask-operator --generate-name dask/dask-kubernetes-operator
NAME: dask-kubernetes-operator-1666875935
NAMESPACE: dask-operator
STATUS: deployed
REVISION: 1
TEST SUITE: None
NOTES:
Operator has been installed successfully.
Now that you have the controller and CRDs installed on your cluster you can start using the new dask_kubernetes.operator.KubeCluster.
Using the new KubeCluster¶
The way you create clusters with KubeCluster
has changed so let’s look at some comparisons and explore how to migrate from the classic to the new.
Simplified Python API¶
One of the first big changes we’ve made is making simple use cases simpler. The only thing you need to create a minimal cluster is to give it a name.
from dask_kubernetes.operator import KubeCluster
cluster = KubeCluster(name="mycluster")
The first step we see folks take in customising their clusters is to modify things like the container image, environment variables, resources, etc. We’ve made all of the most common options available as keyword arguments to make small changes easier.
from dask_kubernetes.operator import KubeCluster
cluster = KubeCluster(name="mycluster",
                      image='ghcr.io/dask/dask:latest',
                      n_workers=3,
                      env={"FOO": "bar"},
                      resources={"requests": {"memory": "2Gi"}, "limits": {"memory": "64Gi"}})
Advanced YAML API¶
We’ve taken care to simplify the API for new users, but we have also worked hard to ensure the new implementation provides even more flexibility for advanced users.
Users of the classic implementation of KubeCluster
have a lot of control over what the worker pods look like because you are required
to provide a full YAML Pod
spec. Instead of creating a loose collection of Pod
resources directly the new implementation groups everything together into a DaskCluster
custom resource.
This resource contains some cluster configuration options and nested specs for the worker pods and scheduler pod/service.
This way things are infinitely configurable, just be careful not to shoot yourself in the foot.
The classic getting started page had the following pod spec example:
# worker-spec.yml
kind: Pod
metadata:
labels:
foo: bar
spec:
restartPolicy: Never
containers:
- image: ghcr.io/dask/dask:latest
imagePullPolicy: IfNotPresent
args: [dask-worker, --nthreads, '2', --no-dashboard, --memory-limit, 6GB, --death-timeout, '60']
name: dask-worker
env:
- name: EXTRA_PIP_PACKAGES
value: git+https://github.com/dask/distributed
resources:
limits:
cpu: "2"
memory: 6G
requests:
cpu: "2"
memory: 6G
In the new implementation a cluster spec with the same options would look like this:
# cluster-spec.yml
apiVersion: kubernetes.dask.org/v1
kind: DaskCluster
metadata:
name: example
labels:
foo: bar
spec:
worker:
replicas: 2
spec:
restartPolicy: Never
containers:
- name: worker
image: "ghcr.io/dask/dask:latest"
imagePullPolicy: "IfNotPresent"
args: [dask-worker, --nthreads, '2', --no-dashboard, --memory-limit, 6GB, --death-timeout, '60', '--name', $(DASK_WORKER_NAME)]
env:
- name: EXTRA_PIP_PACKAGES
value: git+https://github.com/dask/distributed
resources:
limits:
cpu: "2"
memory: 6G
requests:
cpu: "2"
memory: 6G
scheduler:
spec:
containers:
- name: scheduler
image: "ghcr.io/dask/dask:latest"
imagePullPolicy: "IfNotPresent"
args:
- dask-scheduler
ports:
- name: tcp-comm
containerPort: 8786
protocol: TCP
- name: http-dashboard
containerPort: 8787
protocol: TCP
readinessProbe:
httpGet:
port: http-dashboard
path: /health
initialDelaySeconds: 5
periodSeconds: 10
livenessProbe:
httpGet:
port: http-dashboard
path: /health
initialDelaySeconds: 15
periodSeconds: 20
service:
type: ClusterIP
selector:
dask.org/cluster-name: example
dask.org/component: scheduler
ports:
- name: tcp-comm
protocol: TCP
port: 8786
targetPort: "tcp-comm"
- name: http-dashboard
protocol: TCP
port: 8787
targetPort: "http-dashboard"
Note that the spec.worker.spec
section of the new cluster spec matches the spec
of the old pod spec. But as you can see there is a lot more configuration available in this example including first-class control over the scheduler pod and service.
One powerful difference of using our own custom resources is that everything about our cluster is contained in the DaskCluster
spec and all of the cluster lifecycle logic is handled by our custom controller in Kubernetes.
This means we can equally create our cluster with Python or via the kubectl
CLI.
You don’t even need to have dask-kubernetes
installed to manage your clusters if you have other Kubernetes tooling that you would like to integrate with natively.
from dask_kubernetes.operator import KubeCluster
cluster = KubeCluster(custom_cluster_spec="cluster-spec.yml")
Is the same as:
$ kubectl apply -f cluster-spec.yml
You can still connect to the cluster created via kubectl
back in Python by name and have all of the convenience of using a cluster manager object.
from dask.distributed import Client
from dask_kubernetes.operator import KubeCluster
cluster = KubeCluster.from_name("example")
cluster.scale(5)
client = Client(cluster)
Middle ground¶
There is also a middle ground for users who would prefer to stay in Python and have much of the spec generated for them, but still want to be able to make complex customisations.
When creating a new KubeCluster
with keyword arguments those arguments are passed to a call to dask_kubernetes.operator.make_cluster_spec
which is similar to dask_kubernetes.make_pod_spec
that you may have used in the past.
This function generates a dictionary representation of your DaskCluster
spec which you can modify and pass to KubeCluster
yourself.
from dask_kubernetes.operator import KubeCluster, make_cluster_spec
cluster = KubeCluster(name="foo", n_workers=2, env={"FOO": "bar"})
# is equivalent to
spec = make_cluster_spec(name="foo", n_workers=2, env={"FOO": "bar"})
cluster = KubeCluster(custom_cluster_spec=spec)
This is useful if you want the convenience of keyword arguments for common options but still have the ability to make advanced tweaks like setting nodeSelector
options on the worker pods.
from dask_kubernetes.operator import KubeCluster, make_cluster_spec
spec = make_cluster_spec(name="selector-example", n_workers=2)
spec["spec"]["worker"]["spec"]["nodeSelector"] = {"disktype": "ssd"}
cluster = KubeCluster(custom_cluster_spec=spec)
This can also enable you to migrate smoothly over from the existing tooling if you are using make_pod_spec
as the classic pod spec is a subset of the new cluster spec.
from dask_kubernetes.operator import KubeCluster, make_cluster_spec
from dask_kubernetes.classic import make_pod_spec
# generate your existing classic pod spec
pod_spec = make_pod_spec(**your_custom_options)
pod_spec[...] = ... # Your existing tweaks to the pod spec
# generate a new cluster spec and merge in the existing pod spec
cluster_spec = make_cluster_spec(name="merge-example")
cluster_spec["spec"]["worker"]["spec"] = pod_spec["spec"]
cluster = KubeCluster(custom_cluster_spec=cluster_spec)
Troubleshooting¶
Moving from the classic implementation to the new operator based implementation will require some effort on your part. Sorry about that.
Hopefully this guide has given you enough information that you are motivated and able to make the change. However if you get stuck or you would like input from a Dask maintainer please don’t hesitate to reach out to us via the Dask Forum.
Testing¶
Running the test suite for dask-kubernetes
doesn’t require an existing Kubernetes cluster but does require
Docker, kubectl and helm.
Start by installing dask-kubernetes in editable mode - this will ensure that pytest can import dask-kubernetes:
$ pip install -e .
You will also need to install the test dependencies:
$ pip install -r requirements-test.txt
Tests are run using pytest:
$ pytest
============================================== test session starts ==============================================
platform darwin -- Python 3.8.8, pytest-6.2.2, py-1.10.0, pluggy-0.13.1 --
cachedir: .pytest_cache
rootdir: /Users/jtomlinson/Projects/dask/dask-kubernetes, configfile: setup.cfg
plugins: anyio-2.2.0, asyncio-0.14.0, kind-21.1.3
collected 64 items
...
================= 56 passed, 1 skipped, 6 xfailed, 1 xpassed, 53 warnings in 404.19s (0:06:44) ==================
Note
Running pytest compiles the Custom Resource Definitions from source using k8s-crd-resolver, tests against them and then uninstalls them. You may have to install them again manually.
Kind¶
To test dask-kubernetes
against a real Kubernetes cluster we use the pytest-kind plugin.
Kind stands for Kubernetes in Docker and will create a full Kubernetes cluster within a single Docker container on your system.
Kubernetes will then make use of the lower level containerd runtime to start additional containers, so your Kubernetes pods will not
appear in your docker ps
output.
By default we set the --keep-cluster flag in setup.cfg which means the Kubernetes container will persist between pytest runs to avoid creation/teardown time. Therefore you may want to manually remove the container when you are done working on dask-kubernetes:
$ docker stop pytest-kind-control-plane
$ docker rm pytest-kind-control-plane
When you run the tests for the first time a config file will be created called .pytest-kind/pytest-kind/kubeconfig
which is used for authenticating
with the Kubernetes cluster running in the container. If you wish to inspect the cluster yourself for debugging purposes you can set the environment
variable KUBECONFIG
to point to that file, then use kubectl
or helm
as normal:
$ export KUBECONFIG=.pytest-kind/pytest-kind/kubeconfig
$ kubectl get nodes
NAME STATUS ROLES AGE VERSION
pytest-kind-control-plane Ready control-plane,master 10m v1.20.2
$ helm list
NAME NAMESPACE REVISION UPDATED STATUS CHART APP VERSION
Docker image¶
Within the test suite there is a fixture which creates a Docker image called dask-kubernetes:dev
from this Dockerfile.
This image will be imported into the kind cluster and then be used in all Dask clusters created.
This is the official Dask Docker image but with the very latest development versions of dask and distributed installed. It is recommended that you also have the latest development install of those projects in your local development environment too.
This image may go stale over time so you might want to periodically delete it to ensure it gets recreated with the latest code changes:
$ docker rmi dask-kubernetes:dev
Linting and formatting¶
To accept Pull Requests to dask-kubernetes
we require that they pass black
formatting and flake8
linting.
To save developer time we support using pre-commit which runs black
and flake8
every time
you attempt to locally commit code:
$ pip install pre-commit
$ pre-commit install
Testing Operator Controller PRs¶
Sometimes you may want to try out a PR of changes made to the operator controller before it has been merged.
To do this you’ll need to build a custom Docker image and push it to a registry that your k8s cluster can pull from.
The custom image needs to take the latest stable release of the controller and install the development branch into it. You can do this directly from GitHub repos with pip or you can copy your local files in and install that.
FROM ghcr.io/dask/dask-kubernetes-operator:<latest stable release>
RUN pip install git+https://github.com/dask/dask-kubernetes.git@refs/pull/<PR>/head
$ docker build -t <image>:<tag> .
$ docker push <image>:<tag>
Then you can use helm
to install the controller with your custom image.
$ helm install --repo https://helm.dask.org \
--create-namespace \
-n dask-operator \
--generate-name \
dask-kubernetes-operator \
--set image.name=<image> \
--set image.tag=<tag>
Releasing¶
Releases are published automatically when a tag is pushed to GitHub.
# Set next version number
export RELEASE=x.x.x
# Create tags
git commit --allow-empty -m "Release $RELEASE"
git tag -a $RELEASE -m "Version $RELEASE"
# Push
git push upstream --tags
History¶
This repository was originally inspired by a Dask+Kubernetes solution within the Jade (Jupyter and Dask Environment) project out of the UK Met Office Informatics Lab. This Dask + Kubernetes solution was primarily developed by Jacob Tomlinson of the Informatics Lab and Matt Pryor of the STFC and funded by NERC.
It was then adapted by Yuvi Panda at the UC Berkeley Institute for Data Science (BIDS) and DSEP programs while using it with JupyterHub on the Pangeo project. It was later brought under the Dask GitHub organization where it lives today.
This repository was originally named daskernetes to avoid conflict with an older, Google Cloud Platform-specific solution named dask-kubernetes. Eventually this package superseded that one and took on the name dask-kubernetes.