Dask Operator

Warning

The Dask Operator for Kubernetes is experimental, so any bug reports are appreciated!

The Dask Operator is a small service that runs on your Kubernetes cluster and allows you to create and manage Dask clusters as native Kubernetes resources. Clusters can be created either via the Kubernetes API (kubectl) or the Python API (dask_kubernetes.experimental.KubeCluster).

Installing the Operator

To install the operator, first we need to create the Dask custom resources:

$ kubectl apply -f https://raw.githubusercontent.com/dask/dask-kubernetes/main/dask_kubernetes/operator/deployment/manifests/daskcluster.yaml
$ kubectl apply -f https://raw.githubusercontent.com/dask/dask-kubernetes/main/dask_kubernetes/operator/deployment/manifests/daskworkergroup.yaml

Then you should be able to list your Dask clusters via kubectl.

$ kubectl get daskclusters
No resources found in default namespace.

Next we need to install the operator. The operator watches for new daskcluster resources being created and adds/removes the pods, services, etc. that make up the cluster.

$ kubectl apply -f https://raw.githubusercontent.com/dask/dask-kubernetes/main/dask_kubernetes/operator/deployment/manifests/operator.yaml

This will create the appropriate roles, service accounts and a deployment for the operator. We can check that the operator pod is running:

$ kubectl get pods -A -l application=dask-kubernetes-operator
NAMESPACE     NAME                                        READY   STATUS    RESTARTS   AGE
kube-system   dask-kubernetes-operator-775b8bbbd5-zdrf7   1/1     Running   0          74s

Installing the operator with Helm

Along with a set of Kubernetes manifests, the operator has a basic Helm chart which can be used to manage its installation. The chart is published in the Dask Helm repository and can be installed via:

$ helm repo add dask https://helm.dask.org
$ helm repo update
$ helm install myrelease dask/dask-kubernetes-operator

This will install the custom resource definitions, service account, roles, and the operator deployment.

Warning

Please note that Helm does not support updating or deleting CRDs. If updates are made to the CRD templates in future releases (to support future k8s releases, for example) you may have to manually update the CRDs.

Creating a Dask cluster via kubectl

Now we can create Dask clusters.

Let’s create an example called cluster.yaml with the following configuration:

# cluster.yaml
apiVersion: kubernetes.dask.org/v1
kind: DaskCluster
metadata:
  name: simple-cluster
spec:
  worker:
    replicas: 2
    spec:
      containers:
      - name: worker
        image: "ghcr.io/dask/dask:latest"
        imagePullPolicy: "IfNotPresent"
        args:
          - dask-worker
          # Note the name of the cluster service, which adds "-service" to the end
          - tcp://simple-cluster-service.default.svc.cluster.local:8786
  scheduler:
    spec:
      containers:
      - name: scheduler
        image: "ghcr.io/dask/dask:latest"
        imagePullPolicy: "IfNotPresent"
        args:
          - dask-scheduler
        ports:
          - name: comm
            containerPort: 8786
            protocol: TCP
          - name: dashboard
            containerPort: 8787
            protocol: TCP
        readinessProbe:
          tcpSocket:
            port: comm
          initialDelaySeconds: 5
          periodSeconds: 10
        livenessProbe:
          tcpSocket:
            port: comm
          initialDelaySeconds: 15
          periodSeconds: 20
    service:
      type: NodePort
      selector:
        dask.org/cluster-name: simple-cluster
        dask.org/component: scheduler
      ports:
      - name: comm
        protocol: TCP
        port: 8786
        targetPort: "comm"
      - name: dashboard
        protocol: TCP
        port: 8787
        targetPort: "dashboard"

Editing this file will change the default configuration of your Dask cluster. See the Configuration Reference and Additional Worker Groups sections for more options. Now apply cluster.yaml:

$ kubectl apply -f cluster.yaml
daskcluster.kubernetes.dask.org/simple-cluster created

We can list our clusters:

$ kubectl get daskclusters
NAME             AGE
simple-cluster   47s

To connect to this Dask cluster we can use the service that was created for us.

$ kubectl get svc -l dask.org/cluster-name=simple-cluster
NAME                     TYPE        CLUSTER-IP     EXTERNAL-IP   PORT(S)             AGE
simple-cluster-service   ClusterIP   10.96.85.120   <none>        8786/TCP,8787/TCP   86s

We can see here that port 8786 has been exposed for the Dask communication along with 8787 for the Dashboard.

How you access these service endpoints will vary depending on your Kubernetes cluster configuration. For this quick example we can use kubectl to port forward the service to our local machine.

$ kubectl port-forward svc/simple-cluster-service 8786:8786
Forwarding from 127.0.0.1:8786 -> 8786
Forwarding from [::1]:8786 -> 8786

Then we can connect to it from a Python session.

>>> from dask.distributed import Client
>>> client = Client("localhost:8786")
>>> print(client)
<Client: 'tcp://10.244.0.12:8786' processes=3 threads=12, memory=23.33 GiB>
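
With the client connected we can run a quick computation to confirm that work executes on the cluster. This is a minimal illustrative sketch using dask.array; any Dask workload would do:

>>> import dask.array as da
>>> x = da.random.random((10_000, 10_000), chunks=(1_000, 1_000))  # builds a lazy task graph
>>> x.mean().compute()  # runs on the workers in Kubernetes, roughly 0.5 for uniform random data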

We can also list all of the pods created by the operator to run our cluster.

$ kubectl get po -l dask.org/cluster-name=simple-cluster
NAME                                                                          READY   STATUS    RESTARTS   AGE
simple-cluster-default-worker-group-worker-13f4f0d13bbc40a58cfb81eb374f26c3   1/1     Running   0          104s
simple-cluster-default-worker-group-worker-aa79dfae83264321a79f1f0ffe91f700   1/1     Running   0          104s
simple-cluster-default-worker-group-worker-f13c4f2103e14c2d86c1b272cd138fe6   1/1     Running   0          104s
simple-cluster-scheduler                                                      1/1     Running   0          104s

The workers we see here were created by our cluster's default worker group resource, which was also created by the operator.

You can scale the workergroup like you would a Deployment or ReplicaSet:

$ kubectl scale --replicas=5 daskworkergroup simple-cluster-default-worker-group
daskworkergroup.kubernetes.dask.org/simple-cluster-default-worker-group scaled

We can verify that new pods have been created.

$ kubectl get po -l dask.org/cluster-name=simple-cluster
NAME                                                                          READY   STATUS    RESTARTS   AGE
simple-cluster-default-worker-group-worker-13f4f0d13bbc40a58cfb81eb374f26c3   1/1     Running   0          5m26s
simple-cluster-default-worker-group-worker-a52bf313590f432d9dc7395875583b52   1/1     Running   0          27s
simple-cluster-default-worker-group-worker-aa79dfae83264321a79f1f0ffe91f700   1/1     Running   0          5m26s
simple-cluster-default-worker-group-worker-f13c4f2103e14c2d86c1b272cd138fe6   1/1     Running   0          5m26s
simple-cluster-default-worker-group-worker-f4223a45b49d49288195c540c32f0fc0   1/1     Running   0          27s
simple-cluster-scheduler                                                      1/1     Running   0          5m26s

Finally we can delete the cluster either by deleting the manifest we applied before, or directly by name:

$ kubectl delete -f cluster.yaml
daskcluster.kubernetes.dask.org "simple-cluster" deleted

$ kubectl delete daskcluster simple-cluster
daskcluster.kubernetes.dask.org "simple-cluster" deleted

Creating a Dask cluster via the cluster manager

Alternatively, you can use the cluster manager to create and manage a Dask cluster in Python, then connect a dask.distributed.Client object to it directly and perform your work.

Under the hood the Python cluster manager will interact with the Kubernetes API to create the resources for us, just as we did above.

To create a cluster in the default namespace, run the following:

from dask_kubernetes.experimental import KubeCluster

cluster = KubeCluster(name='foo')

You can change the default configuration of the cluster by passing additional keyword arguments (namespace, n_workers, etc.) to the Python class. See the API reference below.
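
For example (the values below are placeholders for illustration, not recommendations):

from dask_kubernetes.experimental import KubeCluster

# All of these keyword arguments are described in the API section below
cluster = KubeCluster(
    name="foo",
    namespace="default",
    n_workers=5,
    image="ghcr.io/dask/dask:latest",
)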

You can scale the cluster:

# Scale up the cluster
cluster.scale(5)

# Scale down the cluster
cluster.scale(1)
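
The cluster manager also supports adaptive scaling, mirroring the adapt example in the API reference below; the bounds here are illustrative:

# Let the scheduler scale between 1 and 10 workers depending on load
cluster.adapt(1, 10)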

You can connect a client to the cluster:

from dask.distributed import Client

# Connect Dask to the cluster
client = Client(cluster)
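
The cluster manager also exposes the dashboard address via its dashboard_link attribute (listed in the API section below):

# Address of the Dask dashboard for this cluster
print(cluster.dashboard_link)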

Finally, delete the cluster by running:

cluster.close()

Additional Worker Groups

The operator also has support for creating additional worker groups. These are extra groups of workers with different configuration settings and can be scaled separately. You can then use resource annotations to schedule different tasks to different groups.

For example you may wish to have a smaller pool of workers that have more memory for memory intensive tasks, or GPUs for compute intensive tasks.
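
For example, if the workers in a high-memory group declare an abstract worker resource (say by adding --resources "MEMORY=32e9" to that group's dask-worker args, which is not shown in the manifests here and is purely an assumption for this sketch), tasks can be pinned to those workers with Dask's resource annotations:

from dask.distributed import Client

client = Client("localhost:8786")  # assumes the scheduler is reachable, e.g. via port-forward

def memory_hungry_task():
    # Stand-in for work that needs a lot of memory
    return sum(range(10_000_000))

# Only workers that declared the "MEMORY" resource (i.e. the high-memory group)
# are eligible to run this task.
future = client.submit(memory_hungry_task, resources={"MEMORY": 32e9})
result = future.result()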

Creating a Worker Group via kubectl

When we create a DaskCluster resource, a default worker group is created for us, but we can add more by creating additional manifests.

Let’s create an example called highmemworkers.yaml with the following configuration:

# highmemworkers.yaml
apiVersion: kubernetes.dask.org/v1
kind: DaskWorkerGroup
metadata:
  name: simple-cluster-highmem-worker-group
spec:
  cluster: simple-cluster
  worker:
    replicas: 2
    spec:
      containers:
      - name: worker
        image: "ghcr.io/dask/dask:latest"
        imagePullPolicy: "IfNotPresent"
        resources:
          requests:
            memory: "2Gi"
          limits:
            memory: "32Gi"
        args:
          - dask-worker
          # Note the name of the cluster service, which adds "-service" to the end
          - tcp://simple-cluster-service.default.svc.cluster.local:8786

The main thing we need to ensure is that the cluster option matches the name of the cluster we created earlier. This will cause the workers to join that cluster.

See the Configuration Reference for more options. Now apply highmemworkers.yaml:

$ kubectl apply -f highmemworkers.yaml
daskworkergroup.kubernetes.dask.org/simple-cluster-highmem-worker-group created

We can list our worker groups:

$ kubectl get daskworkergroups
NAME                                  AGE
simple-cluster-default-worker-group   2 hours
simple-cluster-highmem-worker-group   47s

We don't need to worry about deleting this worker group separately; because it has joined the existing cluster, it will be deleted when the DaskCluster resource is deleted.

Scaling works the same way as the default worker group and can be done with the kubectl scale command.

Creating an additional worker group via the cluster manager

Additional worker groups can also be created via the cluster manager in Python.

from dask_kubernetes.experimental import KubeCluster

cluster = KubeCluster(name='foo')

cluster.add_worker_group(name="highmem", n_workers=2, resources={"requests": {"memory": "2Gi"}, "limits": {"memory": "64Gi"}})

We can also scale the worker groups by name from the cluster object.

cluster.scale(5, worker_group="highmem")

Additional worker groups can also be deleted in Python.

cluster.delete_worker_group(name="highmem")

Any additional worker groups you create will be deleted when the cluster is deleted.

Configuration Reference

Full DaskCluster spec reference.

apiVersion: kubernetes.dask.org/v1
kind: DaskCluster
metadata:
  name: example
spec:
  worker:
    replicas: 2 # number of replica workers to spawn
    spec: ... # PodSpec, standard k8s pod - https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.21/#podspec-v1-core
  scheduler:
    spec: ... # PodSpec, standard k8s pod - https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.21/#podspec-v1-core
    service: ... # ServiceSpec, standard k8s service - https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.21/#servicespec-v1-core

Full DaskWorkerGroup spec reference.

apiVersion: kubernetes.dask.org/v1
kind: DaskWorkerGroup
metadata:
  name: example
spec:
  cluster: "name of DaskCluster to associate worker group with"
  worker:
    replicas: 2 # number of replica workers to spawn
    spec: ... # PodSpec, standard k8s pod - https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.21/#podspec-v1-core

API

KubeCluster(name[, namespace, image, ...])

Launch a Dask Cluster on Kubernetes using the Operator

KubeCluster.scale(n[, worker_group])

Scale cluster to n workers

KubeCluster.get_logs()

Get logs for Dask scheduler and workers.

KubeCluster.add_worker_group(name[, ...])

Create a dask worker group by name

KubeCluster.delete_worker_group(name)

Delete a dask worker group by name

KubeCluster.close([timeout])

Delete the dask cluster

class dask_kubernetes.experimental.KubeCluster(name, namespace='default', image='ghcr.io/dask/dask:latest', n_workers=3, resources={}, env=[], loop=None, asynchronous=False, auth=[<dask_kubernetes.common.auth.InCluster object>, <dask_kubernetes.common.auth.KubeConfig object>], port_forward_cluster_ip=None, create_mode=CreateMode.CREATE_OR_CONNECT, shutdown_on_close=None, **kwargs)[source]

Launch a Dask Cluster on Kubernetes using the Operator

This cluster manager creates a Dask cluster by deploying the necessary kubernetes resources the Dask Operator needs to create pods. It can also connect to an existing cluster by providing the name of the cluster.

Parameters
name: str (required)

Name given to the Dask cluster.

namespace: str (optional)

Namespace in which to launch the workers. Defaults to the current namespace if available, otherwise "default".

image: str (optional)

Image to run in Scheduler and Worker Pods.

n_workers: int

Number of workers on initial launch. Use scale to change this number in the future.

resources: Dict[str, str]

Resources to be passed to the underlying pods.
env: List[dict] | Dict[str, str]

List of environment variables to pass to the worker pods. Can be a list of dicts using the same structure as Kubernetes env vars, or a single dictionary of key/value pairs (see the sketch after this parameter list).

auth: List[ClusterAuth] (optional)

Configuration methods to attempt in order. Defaults to [InCluster(), KubeConfig()].

port_forward_cluster_ip: bool (optional)

If the chart uses ClusterIP type services, forward the ports locally. If you are running it locally it should be the port you are forwarding to <port>.

create_mode: CreateMode (optional)

How to handle cluster creation if the cluster resource already exists. Default behaviour is to create a new cluster if one with that name doesn't exist, or connect to an existing one if it does. You can also set CreateMode.CREATE_ONLY to raise an exception if a cluster with that name already exists, or CreateMode.CONNECT_ONLY to raise an exception if a cluster with that name doesn't exist (see the sketch after this parameter list).

shutdown_on_close: bool (optional)

Whether or not to delete the cluster resource when this object is closed. Defaults to True when creating a cluster and False when connecting to an existing one.

**kwargs: dict

Additional keyword arguments to pass to LocalCluster
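
For illustration, here is a minimal sketch of the env and create_mode options described above. The import location of CreateMode is an assumption; check your installed dask_kubernetes version for the correct path.

from dask_kubernetes.experimental import KubeCluster

# env as a plain dictionary of key/value pairs
cluster = KubeCluster(name="foo", env={"MY_VARIABLE": "some-value"})

# The same variables expressed as a list of dicts in the Kubernetes env structure
cluster = KubeCluster(name="foo", env=[{"name": "MY_VARIABLE", "value": "some-value"}])

# Hypothetical import path for CreateMode; adjust for your installed version
from dask_kubernetes.experimental import CreateMode

# Connect only; raise an exception if no cluster called "foo" already exists
cluster = KubeCluster(name="foo", create_mode=CreateMode.CONNECT_ONLY)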

Examples

>>> from dask_kubernetes import KubeCluster
>>> cluster = KubeCluster(name="foo")

You can add another group of workers (default is 3 workers)

>>> cluster.add_worker_group('additional', n_workers=4)

You can then resize the cluster with the scale method

>>> cluster.scale(10)

And optionally scale a specific worker group

>>> cluster.scale(10, worker_group='additional')

You can also resize the cluster adaptively and give it a range of workers

>>> cluster.adapt(20, 50)

You can pass this cluster directly to a Dask client

>>> from dask.distributed import Client
>>> client = Client(cluster)

You can also access cluster logs

>>> cluster.get_logs()

You can also connect to an existing cluster

>>> existing_cluster = KubeCluster.from_name(name="ialreadyexist")

Attributes
asynchronous

Are we running in the event loop?

cluster_name
dashboard_link
name
observed
plan
requested
scheduler_address

Methods

adapt(*args, **kwargs)

Turn on adaptivity

add_worker_group(name[, n_workers, image, ...])

Create a dask worker group by name

close([timeout])

Delete the dask cluster

delete_worker_group(name)

Delete a dask worker group by name

from_name(name, **kwargs)

Create an instance of this class to represent an existing cluster by name.

get_logs()

Get logs for Dask scheduler and workers.

scale(n[, worker_group])

Scale cluster to n workers

sync(func, *args[, asynchronous, ...])

Call func with args synchronously or asynchronously depending on the calling context

logs

adapt(*args, **kwargs)[source]

Turn on adaptivity

add_worker_group(name, n_workers=3, image=None, resources=None, env=None)[source]

Create a dask worker group by name

Parameters
name: str

Name of the worker group

n_workers: int

Number of workers on initial launch. Use .scale(n_workers, worker_group=name) to change this number in the future.

image: str (optional)

Image to run in Scheduler and Worker Pods. If omitted, the cluster default will be used.

resources: Dict[str, str]

Resources to be passed to the underlying pods. If omitted, the cluster default will be used.

env: List[dict]

List of environment variables to pass to the worker pods. If omitted, the cluster default will be used.

Examples

>>> cluster.add_worker_group("high-mem-workers", n_workers=5)
close(timeout=3600)[source]

Delete the dask cluster

delete_worker_group(name)[source]

Delete a dask worker group by name

Parameters
name: str

Name of the worker group

Examples

>>> cluster.delete_worker_group("high-mem-workers")
classmethod from_name(name, **kwargs)[source]

Create an instance of this class to represent an existing cluster by name.

Will fail if a cluster with that name doesn’t already exist.

Parameters
name: str

Name of the cluster to connect to

Examples

>>> cluster = KubeCluster.from_name(name="simple-cluster")
get_logs()[source]

Get logs for Dask scheduler and workers.

Examples

>>> cluster.get_logs()
{'foo-cluster-scheduler': ...,
'foo-cluster-default-worker-group-worker-0269dbfa0cfd4a22bcd9d92ae032f4d2': ...,
'foo-cluster-default-worker-group-worker-7c1ccb04cd0e498fb21babaedd00e5d4': ...,
'foo-cluster-default-worker-group-worker-d65bee23bdae423b8d40c5da7a1065b6': ...}

Each log will be a string of all logs for that container. To view it is recommended that you print each log.

>>> print(cluster.get_logs()["testdask-scheduler-5c8ffb6b7b-sjgrg"])
...
distributed.scheduler - INFO - -----------------------------------------------
distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO -   Scheduler at:   tcp://10.244.0.222:8786
distributed.scheduler - INFO -   dashboard at:                     :8787
...
scale(n, worker_group='default')[source]

Scale cluster to n workers

Parameters
n: int

Target number of workers

worker_group: str

Worker group to scale

Examples

>>> cluster.scale(10)  # scale cluster to ten workers
>>> cluster.scale(7, worker_group="high-mem-workers") # scale worker group high-mem-workers to seven workers