Custom Resources¶
The Dask Operator has a few custom resources that can be used to create various Dask components.

- DaskCluster creates a full Dask cluster with a scheduler and workers.
- DaskWorkerGroup creates homogeneous groups of workers. DaskCluster creates one by default, but you can add more if you want multiple worker types.
- DaskJob creates a Pod that will run a script to completion, along with a DaskCluster that the script can leverage.
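These resources are usually created by applying YAML manifests with kubectl, as shown in the sections below, but the same operator can also be driven from Python via the dask_kubernetes package. A minimal sketch, assuming dask_kubernetes is installed and the operator controller is running in your Kubernetes cluster:

from dask_kubernetes.operator import KubeCluster

# Creates a DaskCluster resource behind the scenes.
cluster = KubeCluster(name="demo", n_workers=2)
cluster.scale(4)  # adjusts the replicas of the default DaskWorkerGroup
cluster.close()   # deletes the DaskCluster resource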
DaskCluster¶
The DaskCluster custom resource creates a Dask cluster by creating a scheduler Pod, a scheduler Service and a default DaskWorkerGroup, which in turn creates worker Pod resources.
Let’s create an example called cluster.yaml
with the following configuration:
# cluster.yaml
apiVersion: kubernetes.dask.org/v1
kind: DaskCluster
metadata:
  name: simple
spec:
  worker:
    replicas: 2
    spec:
      containers:
        - name: worker
          image: "ghcr.io/dask/dask:latest"
          imagePullPolicy: "IfNotPresent"
          args:
            - dask-worker
            - --name
            - $(DASK_WORKER_NAME)
            - --dashboard
            - --dashboard-address
            - "8788"
          ports:
            - name: http-dashboard
              containerPort: 8788
              protocol: TCP
  scheduler:
    spec:
      containers:
        - name: scheduler
          image: "ghcr.io/dask/dask:latest"
          imagePullPolicy: "IfNotPresent"
          args:
            - dask-scheduler
          ports:
            - name: tcp-comm
              containerPort: 8786
              protocol: TCP
            - name: http-dashboard
              containerPort: 8787
              protocol: TCP
          readinessProbe:
            httpGet:
              port: http-dashboard
              path: /health
            initialDelaySeconds: 5
            periodSeconds: 10
          livenessProbe:
            httpGet:
              port: http-dashboard
              path: /health
            initialDelaySeconds: 15
            periodSeconds: 20
    service:
      type: NodePort
      selector:
        dask.org/cluster-name: simple
        dask.org/component: scheduler
      ports:
        - name: tcp-comm
          protocol: TCP
          port: 8786
          targetPort: "tcp-comm"
        - name: http-dashboard
          protocol: TCP
          port: 8787
          targetPort: "http-dashboard"
Editing this file will change the default configuration of your Dask cluster. See the Full Configuration Reference below. Now apply cluster.yaml:
$ kubectl apply -f cluster.yaml
daskcluster.kubernetes.dask.org/simple created
We can list our clusters:
$ kubectl get daskclusters
NAME     AGE
simple   47s
To connect to this Dask cluster we can use the service that was created for us.
$ kubectl get svc -l dask.org/cluster-name=simple
NAME     TYPE        CLUSTER-IP     EXTERNAL-IP   PORT(S)             AGE
simple   ClusterIP   10.96.85.120   <none>        8786/TCP,8787/TCP   86s
We can see here that port 8786 has been exposed for Dask communication, along with 8787 for the dashboard.
How you access these service endpoints will vary depending on your Kubernetes cluster configuration.
For this quick example we could use kubectl to port-forward the service to your local machine.
$ kubectl port-forward svc/simple 8786:8786
Forwarding from 127.0.0.1:8786 -> 8786
Forwarding from [::1]:8786 -> 8786
Then we can connect to it from a Python session.
>>> from dask.distributed import Client
>>> client = Client("localhost:8786")
>>> print(client)
<Client: 'tcp://10.244.0.12:8786' processes=3 threads=12, memory=23.33 GiB>
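To sanity-check that the workers are doing the computation, we can submit a small workload (an illustrative example; any Dask computation would do):

>>> import dask.array as da
>>> arr = da.random.random((1000, 1000), chunks=(250, 250))
>>> arr.mean().compute()  # runs on the operator-managed workers; result is ~0.5 for uniform random data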
We can also list all of the pods created by the operator to run our cluster.
$ kubectl get po -l dask.org/cluster-name=simple
NAME                                                     READY   STATUS    RESTARTS   AGE
simple-default-worker-13f4f0d13bbc40a58cfb81eb374f26c3   1/1     Running   0          104s
simple-default-worker-aa79dfae83264321a79f1f0ffe91f700   1/1     Running   0          104s
simple-default-worker-f13c4f2103e14c2d86c1b272cd138fe6   1/1     Running   0          104s
simple-scheduler                                         1/1     Running   0          104s
The workers we see here are created by our cluster's default DaskWorkerGroup resource, which was also created by the operator.
You can scale the DaskWorkerGroup like you would a Deployment or ReplicaSet:
$ kubectl scale --replicas=5 daskworkergroup simple-default
daskworkergroup.kubernetes.dask.org/simple-default scaled
We can verify that new pods have been created.
$ kubectl get po -l dask.org/cluster-name=simple
NAME                                                     READY   STATUS    RESTARTS   AGE
simple-default-worker-13f4f0d13bbc40a58cfb81eb374f26c3   1/1     Running   0          5m26s
simple-default-worker-a52bf313590f432d9dc7395875583b52   1/1     Running   0          27s
simple-default-worker-aa79dfae83264321a79f1f0ffe91f700   1/1     Running   0          5m26s
simple-default-worker-f13c4f2103e14c2d86c1b272cd138fe6   1/1     Running   0          5m26s
simple-default-worker-f4223a45b49d49288195c540c32f0fc0   1/1     Running   0          27s
simple-scheduler                                         1/1     Running   0          5m26s
Finally, we can delete the cluster either by deleting the manifest we applied before, or directly by name:
$ kubectl delete -f cluster.yaml
daskcluster.kubernetes.dask.org "simple" deleted
$ kubectl delete daskcluster simple
daskcluster.kubernetes.dask.org "simple" deleted
DaskWorkerGroup¶
When we create a DaskCluster resource, a default worker group is created for us. But we can add more by creating additional manifests. This allows us to create workers of different shapes and sizes that Dask can leverage for different tasks.
Let’s create an example called highmemworkers.yaml
with the following configuration:
# highmemworkers.yaml
apiVersion: kubernetes.dask.org/v1
kind: DaskWorkerGroup
metadata:
  name: simple-highmem
spec:
  cluster: simple
  worker:
    replicas: 2
    spec:
      containers:
        - name: worker
          image: "ghcr.io/dask/dask:latest"
          imagePullPolicy: "IfNotPresent"
          resources:
            requests:
              memory: "32Gi"
            limits:
              memory: "32Gi"
          args:
            - dask-worker
            - --name
            - $(DASK_WORKER_NAME)
            - --resources
            - MEMORY=32e9
            - --dashboard
            - --dashboard-address
            - "8788"
          ports:
            - name: http-dashboard
              containerPort: 8788
              protocol: TCP
The main thing we need to ensure is that the cluster option matches the name of the cluster we created earlier. This will cause the workers to join that cluster.
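Because these workers are started with --resources MEMORY=32e9, Dask tasks can be pinned to them using the worker resources feature. An illustrative sketch, assuming you are connected via the port-forwarded Service from earlier:

from dask.distributed import Client

client = Client("localhost:8786")

# Only workers advertising the custom "MEMORY" resource (i.e. the
# simple-highmem group) are eligible to run this task.
future = client.submit(sum, [1, 2, 3], resources={"MEMORY": 32e9})
print(future.result())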
See the Full Configuration Reference below. Now apply highmemworkers.yaml:
$ kubectl apply -f highmemworkers.yaml
daskworkergroup.kubernetes.dask.org/simple-highmem created
We can list our worker groups:
$ kubectl get daskworkergroups
NAME             AGE
simple-default   2 hours
simple-highmem   47s
We don’t need to worry about deleting this worker group separately. Because it has joined the existing cluster, Kubernetes will delete it when the DaskCluster resource is deleted.
Scaling works the same way as for the default worker group and can be done with the kubectl scale command.
DaskJob¶
The DaskJob custom resource behaves similarly to other Kubernetes batch resources. It creates a Pod that executes a command to completion. The difference is that the DaskJob also creates a DaskCluster alongside it and injects the appropriate configuration into the job Pod for it to automatically connect to and leverage the Dask cluster.
Let’s create an example called job.yaml
with the following configuration:
# job.yaml
apiVersion: kubernetes.dask.org/v1
kind: DaskJob
metadata:
  name: simple-job
  namespace: default
spec:
  job:
    spec:
      containers:
        - name: job
          image: "ghcr.io/dask/dask:latest"
          imagePullPolicy: "IfNotPresent"
          args:
            - python
            - -c
            - "from dask.distributed import Client; client = Client(); # Do some work..."
  cluster:
    spec:
      worker:
        replicas: 2
        spec:
          containers:
            - name: worker
              image: "ghcr.io/dask/dask:latest"
              imagePullPolicy: "IfNotPresent"
              args:
                - dask-worker
                - --name
                - $(DASK_WORKER_NAME)
                - --dashboard
                - --dashboard-address
                - "8788"
              ports:
                - name: http-dashboard
                  containerPort: 8788
                  protocol: TCP
              env:
                - name: WORKER_ENV
                  value: hello-world # We don't test the value, just the name
      scheduler:
        spec:
          containers:
            - name: scheduler
              image: "ghcr.io/dask/dask:latest"
              imagePullPolicy: "IfNotPresent"
              args:
                - dask-scheduler
              ports:
                - name: tcp-comm
                  containerPort: 8786
                  protocol: TCP
                - name: http-dashboard
                  containerPort: 8787
                  protocol: TCP
              readinessProbe:
                httpGet:
                  port: http-dashboard
                  path: /health
                initialDelaySeconds: 5
                periodSeconds: 10
              livenessProbe:
                httpGet:
                  port: http-dashboard
                  path: /health
                initialDelaySeconds: 15
                periodSeconds: 20
              env:
                - name: SCHEDULER_ENV
                  value: hello-world
        service:
          type: ClusterIP
          selector:
            dask.org/cluster-name: simple-job
            dask.org/component: scheduler
          ports:
            - name: tcp-comm
              protocol: TCP
              port: 8786
              targetPort: "tcp-comm"
            - name: http-dashboard
              protocol: TCP
              port: 8787
              targetPort: "http-dashboard"
Editing this file will change the default configuration of your Dask job. See the Full Configuration Reference below. Now apply job.yaml:
$ kubectl apply -f job.yaml
daskjob.kubernetes.dask.org/simple-job created
Now if we check our cluster resources we should see our job and cluster pods being created.
$ kubectl get pods
NAME                                   READY   STATUS    RESTARTS   AGE
simple-job-scheduler                   1/1     Running   0          8s
simple-job-runner                      1/1     Running   0          8s
simple-job-default-worker-1f6c670fba   1/1     Running   0          8s
simple-job-default-worker-791f93d9ec   1/1     Running   0          8s
Our runner pod will be doing whatever we configured it to do. In our example you can see we just create a simple dask.distributed.Client object like this:
from dask.distributed import Client
client = Client()
# Do some work...
We can do this because the job pod gets some additional environment variables set at runtime which tell the Client how to connect to the cluster, so the user doesn’t need to worry about it.
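For example, the scheduler address is injected as an environment variable (DASK_SCHEDULER_ADDRESS in the operator's default behaviour), which Client() resolves through Dask's configuration system:

import os
from dask.distributed import Client

# Set by the operator on the job Pod; printed here only for illustration.
print(os.environ.get("DASK_SCHEDULER_ADDRESS"))

client = Client()  # no address needed, it is picked up from the environment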
The job pod has a default restart policy of OnFailure, so if it exits with anything other than a 0 return code it will be restarted automatically until it completes successfully. When it does return a 0 it will go into a Completed state and the Dask cluster will be cleaned up automatically, freeing up Kubernetes cluster resources.
$ kubectl get pods
NAME                                   READY   STATUS        RESTARTS   AGE
simple-job-runner                      0/1     Completed     0          14s
simple-job-scheduler                   1/1     Terminating   0          14s
simple-job-default-worker-1f6c670fba   1/1     Terminating   0          14s
simple-job-default-worker-791f93d9ec   1/1     Terminating   0          14s
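If your job script can fail, make sure it exits non-zero so that Kubernetes retries it. A minimal sketch of such a script (the workload itself is hypothetical):

import sys
from dask.distributed import Client

client = Client()  # connects using the injected configuration

try:
    total = client.submit(sum, range(100)).result()
    print(total)
except Exception:
    sys.exit(1)  # non-zero exit triggers a restart under restartPolicy OnFailure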
When you delete the DaskJob resource everything is deleted automatically, whether that’s just the Completed runner pod left over after a successful run, or a full Dask cluster and runner that are still running.
$ kubectl delete -f job.yaml
daskjob.kubernetes.dask.org "simple-job" deleted
DaskAutoscaler¶
The DaskAutoscaler resource allows the scheduler to scale the number of workers up and down using Dask's adaptive mode.

When you create the resource, the operator controller will periodically poll the scheduler and request the desired number of workers. The scheduler calculates this number by profiling the tasks it is processing and then extrapolating how many workers it would need to complete the current graph within 5 seconds.
The controller will constrain this number between the minimum and maximum values configured in the DaskAutoscaler resource and then update the number of replicas in the default DaskWorkerGroup.
# autoscaler.yaml
apiVersion: kubernetes.dask.org/v1
kind: DaskAutoscaler
metadata:
  name: simple
spec:
  cluster: "simple"
  minimum: 1  # we recommend always having a minimum of 1 worker so that an idle cluster can start working on tasks immediately
  maximum: 10 # you can place a hard limit on the number of workers regardless of what the scheduler requests
$ kubectl apply -f autoscaler.yaml
daskautoscaler.kubernetes.dask.org/simple created
You can end the autoscaling at any time by deleting the resource. The number of workers will remain at whatever the autoscaler last set it to.
$ kubectl delete -f autoscaler.yaml
daskautoscaler.kubernetes.dask.org/simple deleted
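If you manage the cluster from Python, the equivalent of creating and deleting this resource is the adapt method (a sketch assuming the dask_kubernetes.operator API):

from dask_kubernetes.operator import KubeCluster

cluster = KubeCluster.from_name("simple")  # attach to the existing cluster
cluster.adapt(minimum=1, maximum=10)       # creates a DaskAutoscaler resource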
Note
The autoscaler will only scale the default WorkerGroup. If you have additional worker groups configured they will not be taken into account.
Labels and Annotations¶
Labels and annotations are propagated to child resources, so labels applied to a DaskCluster will also be present on the Pod and Service resources it creates.
- Labels/annotations on DaskCluster are propagated to the DaskWorkerGroup, scheduler Pod and scheduler Service.
- Labels/annotations on DaskWorkerGroup are propagated to the worker Pods.
- Labels/annotations on DaskJob are propagated to the job Pod and DaskCluster.
Some resources also have subresource metadata options for setting labels and annotations on the resources they create.
- DaskCluster has spec.worker.metadata which is merged into the labels/annotations for the DaskWorkerGroup.
- DaskCluster has spec.scheduler.metadata which is merged into the labels/annotations for the scheduler Pod and scheduler Service.
- DaskJob has spec.job.metadata which is merged into the labels/annotations for the job Pod.
The order of label/annotation application is top_level <= subresource <= base.
So if the DaskCluster has a label of foo=bar but spec.worker.metadata.labels has a label of foo=baz, then the worker Pod would have foo=baz.
Equally, if the reserved base label dask.org/component is set at either the top level or the subresource level, it will be overridden by the controller. So setting dask.org/component=superworker in DaskCluster.spec.worker.metadata.labels will have no effect and the worker Pod will still have the expected label of dask.org/component=worker.
Example¶
The following DaskCluster has top-level annotations as well as worker and scheduler subresource annotations.
apiVersion: kubernetes.dask.org/v1
kind: DaskCluster
metadata:
  name: example
  annotations:
    hello: world
spec:
  worker:
    replicas: 2
    metadata:
      annotations:
        foo: bar
    spec:
      ...
  scheduler:
    metadata:
      annotations:
        fizz: buzz
    spec:
      ...
The resulting scheduler Pod metadata annotations would be:
apiVersion: v1
kind: Pod
metadata:
  name: example-scheduler
  annotations:
    fizz: buzz
    hello: world
...
Full Configuration Reference¶
Full DaskCluster spec reference.
apiVersion: kubernetes.dask.org/v1
kind: DaskCluster
metadata:
  name: example
spec:
  worker:
    replicas: 2 # number of replica workers to spawn
    spec: ... # PodSpec, standard k8s pod - https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.21/#podspec-v1-core
  scheduler:
    spec: ... # PodSpec, standard k8s pod - https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.21/#podspec-v1-core
    service: ... # ServiceSpec, standard k8s service - https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.21/#servicespec-v1-core
Full DaskWorkerGroup spec reference.
apiVersion: kubernetes.dask.org/v1
kind: DaskWorkerGroup
metadata:
  name: example
spec:
  cluster: "name of DaskCluster to associate worker group with"
  worker:
    replicas: 2 # number of replica workers to spawn
    spec: ... # PodSpec, standard k8s pod - https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.21/#podspec-v1-core
Full DaskJob spec reference.
apiVersion: kubernetes.dask.org/v1
kind: DaskJob
metadata:
  name: example
spec:
  job:
    spec: ... # PodSpec, standard k8s pod - https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.21/#podspec-v1-core
  cluster:
    spec: ... # ClusterSpec, DaskCluster resource spec
Full DaskAutoscaler spec reference.
apiVersion: kubernetes.dask.org/v1
kind: DaskAutoscaler
metadata:
  name: example
spec:
  cluster: "name of DaskCluster to autoscale"
  minimum: 0 # minimum number of workers to create
  maximum: 10 # maximum number of workers to create