Kube Cluster¶
Quickstart¶
from dask_kubernetes import KubeCluster
cluster = KubeCluster.from_yaml('worker-spec.yml')
cluster.scale(10) # specify number of workers explicitly
cluster.adapt(minimum=1, maximum=100) # or dynamically scale based on current workload
# worker-spec.yml
kind: Pod
metadata:
  labels:
    foo: bar
spec:
  restartPolicy: Never
  containers:
  - image: daskdev/dask:latest
    imagePullPolicy: IfNotPresent
    args: [dask-worker, --nthreads, '2', --no-dashboard, --memory-limit, 6GB, --death-timeout, '60']
    name: dask
    env:
      - name: EXTRA_PIP_PACKAGES
        value: git+https://github.com/dask/distributed
    resources:
      limits:
        cpu: "2"
        memory: 6G
      requests:
        cpu: "2"
        memory: 6G
# Example usage
from dask.distributed import Client
import dask.array as da
# Connect Dask to the cluster
client = Client(cluster)
# Create a large array and calculate the mean
array = da.ones((1000, 1000, 1000))
print(array.mean().compute()) # Should print 1.0
For more information see the KubeCluster
API reference.
Best Practices¶
- Your worker pod image should have a similar environment to your local
  environment, including versions of Python, dask, cloudpickle, and any
  libraries that you may wish to use (like NumPy, Pandas, or Scikit-Learn).
  See the dask_kubernetes.KubeCluster docstring for guidance on how to check
  and modify this; a quick version check is also sketched after this list.
- Your Kubernetes resource limits and requests should match the
  --memory-limit and --nthreads parameters given to the dask-worker command.
  Otherwise your workers may get killed by Kubernetes as they pack into the
  same node and overwhelm that node's available memory, leading to
  KilledWorker errors.
- We recommend adding the --death-timeout, '60' arguments and the
  restartPolicy: Never attribute to your worker specification. This ensures
  that these pods will clean themselves up if your Python process disappears
  unexpectedly.
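As a rough sketch of the first point, you can compare package versions across the client, scheduler, and workers with Client.get_versions (this assumes the cluster object from the quickstart above):
from dask.distributed import Client
client = Client(cluster)
# Collect Python and package versions from the client, scheduler, and workers;
# check=True raises an error if they are mismatched.
client.get_versions(check=True)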
GPUs¶
Because dask-kubernetes
uses standard kubernetes pod specifications, we can
use kubernetes device plugins
and add resource limits defining the number of GPUs per pod/worker.
Additionally, we can use tools like dask-cuda for optimized Dask/GPU interactions.
kind: Pod
metadata:
  labels:
    foo: bar
spec:
  restartPolicy: Never
  containers:
  - image: rapidsai/rapidsai:cuda11.0-runtime-ubuntu18.04-py3.8
    imagePullPolicy: IfNotPresent
    args: [dask-cuda-worker, $(DASK_SCHEDULER_ADDRESS), --rmm-pool-size, 10GB]
    name: dask-cuda
    resources:
      limits:
        cpu: "2"
        memory: 6G
        nvidia.com/gpu: 1 # requesting 1 GPU
      requests:
        cpu: "2"
        memory: 6G
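As a minimal sketch, assuming the spec above is saved to a file named gpu-worker-spec.yml (a hypothetical filename), a GPU cluster can be started the same way as in the quickstart:
from dask_kubernetes import KubeCluster
from dask.distributed import Client
cluster = KubeCluster.from_yaml('gpu-worker-spec.yml')
cluster.scale(2)  # two GPU workers, one GPU each per the resource limits above
client = Client(cluster)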
Configuration¶
You can use Dask’s configuration to control the behavior of Dask-kubernetes. You can see a full set of configuration options here. Some notable ones are described below:
- kubernetes.worker-template-path: a path to a YAML file that holds a Pod
  spec for the worker. If provided then this will be used when
  dask_kubernetes.KubeCluster is called with no arguments:
      cluster = KubeCluster()  # reads the provided yaml file
- distributed.dashboard.link: a Python pre-formatted string that shows the
  location of Dask's dashboard. This string will receive values for host,
  port, and all environment variables. For example this is useful when using
  dask-kubernetes with JupyterHub and nbserverproxy to route the dashboard
  link to a proxied address as follows:
      "{JUPYTERHUB_SERVICE_PREFIX}proxy/{port}/status"
- kubernetes.worker-name: a Python pre-formatted string to use when naming
  dask worker pods. This string will receive values for user, uuid, and all
  environment variables. This is useful when you want to have control over
  the naming convention for your pods and use other tokens from the
  environment. For example when using zero-to-jupyterhub every user is
  called jovyan and so you may wish to use dask-{JUPYTERHUB_USER}-{uuid}
  instead of dask-{user}-{uuid}. Ensure you keep the uuid somewhere in the
  template.
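As a rough sketch, these options can also be set programmatically with dask.config.set before creating the cluster (the values below are illustrative, not defaults):
import dask
dask.config.set({
    "kubernetes.worker-template-path": "worker-spec.yml",
    "distributed.dashboard.link": "{JUPYTERHUB_SERVICE_PREFIX}proxy/{port}/status",
    "kubernetes.worker-name": "dask-{JUPYTERHUB_USER}-{uuid}",
})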
Role-Based Access Control (RBAC)¶
In order to spawn worker (or separate scheduler) Dask pods, the service account creating those pods will require a set of RBAC permissions. Create a service account you will use for Dask, and then attach the following Role to that ServiceAccount via a RoleBinding:
kind: Role
apiVersion: rbac.authorization.k8s.io/v1beta1
metadata:
  name: daskKubernetes
rules:
- apiGroups:
  - ""  # indicates the core API group
  resources:
  - "pods"
  verbs:
  - "get"
  - "list"
  - "watch"
  - "create"
  - "delete"
- apiGroups:
  - ""  # indicates the core API group
  resources:
  - "pods/log"
  verbs:
  - "get"
  - "list"
If you intend to use the newer Dask functionality in which the scheduler is created in its own pod and accessed via a service, you will also need:
- apiGroups:
  - ""  # indicates the core API group
  resources:
  - "services"
  verbs:
  - "get"
  - "list"
  - "watch"
  - "create"
  - "delete"
Docker Images¶
Example Dask docker images daskdev/dask and daskdev/dask-notebook are available on https://hub.docker.com/r/daskdev. More information about these images is available at the Dask documentation.
Note that these images can be further customized with extra packages using EXTRA_PIP_PACKAGES, EXTRA_APT_PACKAGES, and EXTRA_CONDA_PACKAGES as described in the Extensibility section.
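These environment variables can also be set from Python when building the worker pod spec; a minimal sketch using dask_kubernetes.make_pod_spec (keyword arguments may vary between dask-kubernetes versions) might look like:
from dask_kubernetes import KubeCluster, make_pod_spec
# Build a worker pod spec in Python and install an extra package at startup.
pod_spec = make_pod_spec(
    image='daskdev/dask:latest',
    memory_limit='4G', memory_request='4G',
    cpu_limit=1, cpu_request=1,
    env={'EXTRA_PIP_PACKAGES': 'git+https://github.com/dask/distributed'},
)
cluster = KubeCluster(pod_spec)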
Deployment Details¶
Workers¶
Workers are created directly as simple pods. These worker pods are configured
to shut down if they are unable to connect to the scheduler for 60 seconds.
The pods are cleaned up when close() is called, or when the scheduler process
exits.
The pods are created with two default tolerations:
- k8s.dask.org/dedicated=worker:NoSchedule
- k8s.dask.org_dedicated=worker:NoSchedule
If you have nodes with the corresponding taints, then the worker pods will schedule to those nodes (and no other pods will be able to schedule to those nodes).
Scheduler¶
The scheduler can be deployed locally (default) or remotely. A local
scheduler runs in the same Python process where the Dask client is created.
from dask_kubernetes import KubeCluster
from dask.distributed import Client
cluster = KubeCluster.from_yaml('worker-spec.yml', deploy_mode='local')
cluster.scale(10)
client = Client(cluster)
The scheduler can also be deployed on the Kubernetes cluster with deploy_mode='remote':
import dask
from dask_kubernetes import KubeCluster
from dask.distributed import Client
cluster = KubeCluster.from_yaml('worker-spec.yml', deploy_mode='remote')
cluster.scale(10)
client = Client(cluster)
When deploying remotely, the following k8s resources are created:
- A pod with a scheduler running
- (optional) A LoadBalancer and complementary service (svc) to expose the scheduler and dashboard ports
- A PodDisruptionBudget to avoid voluntary disruptions of the scheduler pod
By default, the configuration option scheduler-service-type is set to
ClusterIP. To use a LoadBalancer instead, change scheduler-service-type to
LoadBalancer. This change can be made either in the dask-kubernetes
configuration file or programmatically with dask.config.set:
dask.config.set({"kubernetes.scheduler-service-type": "LoadBalancer"})