API

KubeCluster([pod_template, name, namespace, …]) Launch a Dask cluster on Kubernetes
KubeCluster.adapt(*args[, minimum, maximum]) Turn on adaptivity
KubeCluster.from_dict(pod_spec, **kwargs) Create cluster with worker pod spec defined by Python dictionary
KubeCluster.from_yaml(yaml_path, **kwargs) Create cluster with worker pod spec defined by a YAML file
KubeCluster.logs(*args, **kwargs)
KubeCluster.pods
KubeCluster.scale(n) Scale cluster to n workers
HelmCluster([release_name, auth, namespace, …]) Connect to a Dask cluster deployed via the Helm Chart.
HelmCluster.scale(n_workers) Scale cluster to n workers.
HelmCluster.adapt(*args, **kwargs) Turn on adaptivity (Not recommended).
HelmCluster.logs(*args, **kwargs)
InCluster Configure the Kubernetes connection from a container’s environment.
KubeConfig([config_file, context, …]) Configure the Kubernetes connection from a kubeconfig file.
KubeAuth(host, **kwargs) Configure the Kubernetes connection explicitly.
class dask_kubernetes.KubeCluster(pod_template=None, name=None, namespace=None, n_workers=None, host=None, port=None, env=None, auth=[<dask_kubernetes.auth.InCluster object>, <dask_kubernetes.auth.KubeConfig object>], idle_timeout=None, deploy_mode=None, interface=None, protocol=None, dashboard_address=None, security=None, scheduler_service_wait_timeout=None, scheduler_pod_template=None, **kwargs)[source]

Launch a Dask cluster on Kubernetes

This starts a local Dask scheduler and then dynamically launches Dask workers on a Kubernetes cluster. The Kubernetes cluster is taken to be either the current one on which this code is running, or as a fallback, the default one configured in a kubeconfig file.

Environments

Your worker pod image should have a similar environment to your local environment, including versions of Python, dask, cloudpickle, and any libraries that you may wish to use (like NumPy, Pandas, or Scikit-Learn). See examples below for suggestions on how to manage and check for this.

Network

Since the Dask scheduler is launched locally, for it to work we need to be able to open network connections between this local node and all the worker nodes on the Kubernetes cluster. If the current process is not already on a Kubernetes node, some network configuration will likely be required to make this work.
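
If opening connections from the local machine is not possible, one option is to run the scheduler as a pod on the cluster instead, via the deploy_mode parameter described below (pod_spec here stands for a worker pod specification as in the examples further down):

>>> cluster = KubeCluster(pod_spec, deploy_mode='remote')  # doctest: +SKIP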

Resources

Your Kubernetes resource limits and requests should match the --memory-limit and --nthreads parameters given to the dask-worker command.
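
For example, a worker given two threads and an 8GB memory limit should declare matching container resources (an illustrative sketch in the style of the YAML example below; values are placeholders):

spec:
  containers:
  - image: daskdev/dask:latest
    name: dask-worker
    args: [dask-worker, $(DASK_SCHEDULER_ADDRESS), --nthreads, '2', --memory-limit, 8GB]
    resources:
      limits:
        cpu: "2"
        memory: 8G
      requests:
        cpu: "2"
        memory: 8G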

Parameters:
pod_template: kubernetes.client.V1Pod

A Kubernetes specification for a Pod for a dask worker.

scheduler_pod_template: kubernetes.client.V1Pod (optional)

A Kubernetes specification for a Pod for a dask scheduler. Defaults to the pod_template.

name: str (optional)

Name given to the pods. Defaults to dask-$USER-random

namespace: str (optional)

Namespace in which to launch the workers. Defaults to the current namespace if available, or “default”.

n_workers: int

Number of workers on initial launch. Use scale to change this number in the future

env: Dict[str, str]

Dictionary of environment variables to pass to worker pod

host: str

Listen address for local scheduler. Defaults to 0.0.0.0

port: int

Port of local scheduler

auth: List[ClusterAuth] (optional)

Configuration methods to attempt in order. Defaults to [InCluster(), KubeConfig()].

idle_timeout: str (optional)

The scheduler task will exit after this amount of time if there are no requests from the client. Default is to never timeout.

scheduler_service_wait_timeout: int (optional)

Timeout, in seconds, to wait for the remote scheduler service to be ready. Defaults to 30 seconds. Set to 0 to disable the timeout (not recommended).

deploy_mode: str (optional)

Run the scheduler as “local” or “remote”. Defaults to "local".

**kwargs: dict

Additional keyword arguments to pass to LocalCluster

See also

KubeCluster.from_yaml
KubeCluster.from_dict
KubeCluster.adapt

Examples

>>> from dask_kubernetes import KubeCluster, make_pod_spec
>>> pod_spec = make_pod_spec(image='daskdev/dask:latest',
...                          memory_limit='4G', memory_request='4G',
...                          cpu_limit=1, cpu_request=1,
...                          env={'EXTRA_PIP_PACKAGES': 'fastparquet git+https://github.com/dask/distributed'})
>>> cluster = KubeCluster(pod_spec)
>>> cluster.scale(10)

You can also create clusters with worker pod specifications defined as Python dictionaries or stored in YAML files

>>> cluster = KubeCluster.from_yaml('worker-template.yml')
>>> cluster = KubeCluster.from_dict({...})

Rather than explicitly setting a number of workers you can also ask the cluster to allocate workers dynamically based on current workload

>>> cluster.adapt()
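
You can also pass bounds to adaptive mode using the minimum and maximum parameters shown in the method summary:

>>> cluster.adapt(minimum=1, maximum=10)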

You can pass this cluster directly to a Dask client

>>> from dask.distributed import Client
>>> client = Client(cluster)

You can verify that your local environment matches your worker environments by calling client.get_versions(check=True). This will raise an informative error if versions do not match.

>>> client.get_versions(check=True)

The daskdev/dask docker images support EXTRA_PIP_PACKAGES, EXTRA_APT_PACKAGES and EXTRA_CONDA_PACKAGES environment variables to help with small adjustments to the worker environments. We recommend the use of pip over conda in this case due to a much shorter startup time. These environment variables can be modified directly from the KubeCluster constructor methods using the env= keyword. You may list as many packages as you like in a single string like the following:

>>> pip = 'pyarrow gcsfs git+https://github.com/dask/distributed'
>>> conda = '-c conda-forge scikit-learn'
>>> KubeCluster.from_yaml(..., env={'EXTRA_PIP_PACKAGES': pip,
...                                 'EXTRA_CONDA_PACKAGES': conda})

You can also start a KubeCluster with no arguments if the worker template is specified in the Dask config files, either as a full template in kubernetes.worker-template or a path to a YAML file in kubernetes.worker-template-path.

See https://docs.dask.org/en/latest/configuration.html for more information about setting configuration values:

$ export DASK_KUBERNETES__WORKER_TEMPLATE_PATH=worker_template.yaml
>>> cluster = KubeCluster()  # automatically finds 'worker_template.yaml'
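
Equivalently, a sketch using the standard dask.config API from Python, with the kubernetes.worker-template-path key documented above:

>>> import dask.config
>>> dask.config.set({'kubernetes.worker-template-path': 'worker_template.yaml'})
>>> cluster = KubeCluster()  # doctest: +SKIP
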
Attributes:
asynchronous
dashboard_link
name
namespace
observed
plan
requested
scheduler_address

Methods

adapt(*args[, minimum, maximum]) Turn on adaptivity
from_dict(pod_spec, **kwargs) Create cluster with worker pod spec defined by Python dictionary
from_yaml(yaml_path, **kwargs) Create cluster with worker pod spec defined by a YAML file
get_logs([cluster, scheduler, workers]) Return logs for the cluster, scheduler and workers
new_worker_spec() Return name and spec for the next worker
scale(n) Scale cluster to n workers
scale_up([n, memory, cores]) Scale cluster to n workers
close  
logs  
scale_down  
sync  
classmethod from_dict(pod_spec, **kwargs)[source]

Create cluster with worker pod spec defined by Python dictionary

Examples

>>> spec = {
...     'metadata': {},
...     'spec': {
...         'containers': [{
...             'args': ['dask-worker', '$(DASK_SCHEDULER_ADDRESS)',
...                      '--nthreads', '1',
...                      '--death-timeout', '60'],
...             'command': None,
...             'image': 'daskdev/dask:latest',
...             'name': 'dask-worker',
...         }],
...         'restartPolicy': 'Never',
...     }
... }
>>> cluster = KubeCluster.from_dict(spec, namespace='my-ns')  # doctest: +SKIP
classmethod from_yaml(yaml_path, **kwargs)[source]

Create cluster with worker pod spec defined by a YAML file

We can start a cluster with pods defined in an accompanying YAML file like the following:

kind: Pod
metadata:
  labels:
    foo: bar
    baz: quux
spec:
  containers:
  - image: daskdev/dask:latest
    name: dask-worker
    args: [dask-worker, $(DASK_SCHEDULER_ADDRESS), --nthreads, '2', --memory-limit, 8GB]
  restartPolicy: Never

Examples

>>> cluster = KubeCluster.from_yaml('pod.yaml', namespace='my-ns')  # doctest: +SKIP
scale(n)[source]

Scale cluster to n workers

Parameters:
n: int

Target number of workers

Examples

>>> cluster.scale(10)  # scale cluster to ten workers
class dask_kubernetes.HelmCluster(release_name=None, auth=[<dask_kubernetes.auth.InCluster object>, <dask_kubernetes.auth.KubeConfig object>], namespace=None, port_forward_cluster_ip=False, loop=None, asynchronous=False)[source]

Connect to a Dask cluster deployed via the Helm Chart.

This cluster manager connects to an existing Dask deployment that was created by the Dask Helm Chart, enabling you to perform basic cluster actions such as scaling and log retrieval.

Parameters:
release_name: str

Name of the helm release to connect to.

namespace: str (optional)

Namespace in which the release was deployed. Defaults to the current namespace if available, or “default”.

port_forward_cluster_ip: bool (optional)

If the chart uses ClusterIP type services, forward the ports locally. If you are using HelmCluster from the Jupyter session that was installed by the helm chart, this should be False. If you are running it locally, it should be True.

auth: List[ClusterAuth] (optional)

Configuration methods to attempt in order. Defaults to [InCluster(), KubeConfig()].

**kwargs: dict

Additional keyword arguments to pass to Cluster

See also

HelmCluster.scale
HelmCluster.logs

Examples

>>> from dask_kubernetes import HelmCluster
>>> cluster = HelmCluster(release_name="myhelmrelease")

You can then resize the cluster with the scale method

>>> cluster.scale(10)

You can pass this cluster directly to a Dask client

>>> from dask.distributed import Client
>>> client = Client(cluster)

You can also access cluster logs

>>> cluster.get_logs()
Attributes:
asynchronous
dashboard_link
observed
plan
requested
scheduler_address

Methods

adapt(*args, **kwargs) Turn on adaptivity (Not recommended).
get_logs() Get logs for Dask scheduler and workers.
scale(n_workers) Scale cluster to n workers.
check_helm_dependency  
close  
logs  
sync  
adapt(*args, **kwargs)[source]

Turn on adaptivity (Not recommended).

get_logs()[source]

Get logs for Dask scheduler and workers.

Examples

>>> cluster.get_logs()
{'testdask-scheduler-5c8ffb6b7b-sjgrg': ...,
'testdask-worker-64c8b78cc-992z8': ...,
'testdask-worker-64c8b78cc-hzpdc': ...,
'testdask-worker-64c8b78cc-wbk4f': ...}

Each log will be a single string containing all logs for that container. To view a log it is recommended that you print it.

>>> print(cluster.get_logs()["testdask-scheduler-5c8ffb6b7b-sjgrg"])
...
distributed.scheduler - INFO - -----------------------------------------------
distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO -   Scheduler at:     tcp://10.1.6.131:8786
distributed.scheduler - INFO -   dashboard at:                     :8787
...
scale(n_workers)[source]

Scale cluster to n workers.

This sets the Dask worker deployment size to the requested number. Workers will not be terminated gracefully, so be sure to only scale down when all futures have been retrieved by the client and the cluster is idle.

Examples

>>> cluster
HelmCluster('tcp://localhost:8786', workers=3, threads=18, memory=18.72 GB)
>>> cluster.scale(4)
>>> cluster
HelmCluster('tcp://localhost:8786', workers=4, threads=24, memory=24.96 GB)
class dask_kubernetes.ClusterAuth[source]

An abstract base class for methods of configuring a connection to a Kubernetes API server.

Examples

>>> from dask_kubernetes import KubeConfig
>>> auth = KubeConfig(context='minikube')
>>> from dask_kubernetes import KubeAuth
>>> auth = KubeAuth(host='https://localhost', username='superuser', password='pass')
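
These auth objects can then be passed to a cluster manager via the auth= keyword documented above (a sketch; the worker template file name is illustrative):

>>> from dask_kubernetes import KubeCluster
>>> cluster = KubeCluster.from_yaml('worker-template.yml', auth=[auth])  # doctest: +SKIP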

Methods

load() Load Kubernetes configuration and set as default
load_first([auth]) Load the first valid configuration in the list auth.
load()[source]

Load Kubernetes configuration and set as default

Raises:
kubernetes.client.KubeConfigException
static load_first(auth=None)[source]

Load the first valid configuration in the list auth. A single configuration method can be passed.

Parameters:
auth: List[ClusterAuth] (optional)

Configuration methods to attempt in order. Defaults to [InCluster(), KubeConfig()].

class dask_kubernetes.InCluster[source]

Configure the Kubernetes connection from a container’s environment.

This authentication method is intended for use when the client is running in a container started by Kubernetes with an authorized service account. This loads the mounted service account token and discovers the Kubernetes API via Kubernetes service discovery.

Methods

load() Load Kubernetes configuration and set as default
load_first([auth]) Load the first valid configuration in the list auth.
class dask_kubernetes.KubeConfig(config_file=None, context=None, persist_config=True)[source]

Configure the Kubernetes connection from a kubeconfig file.

Parameters:
config_file: str (optional)

The path of the kubeconfig file to load. Defaults to the value of the KUBECONFIG environment variable, or the string ~/.kube/config.

context: str (optional)

The kubeconfig context to use. Defaults to the value of current-context in the configuration file.

persist_config: bool (optional)

Whether changes to the configuration will be saved back to disk (e.g. GCP token refresh). Defaults to True.
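
For example:

>>> from dask_kubernetes import KubeConfig
>>> auth = KubeConfig(config_file='~/.kube/config', context='minikube')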

Methods

load() Load Kubernetes configuration and set as default
load_first([auth]) Load the first valid configuration in the list auth.
class dask_kubernetes.KubeAuth(host, **kwargs)[source]

Configure the Kubernetes connection explicitly.

Parameters:
host: str

The base URL of the Kubernetes host to connect to

username: str (optional)

Username for HTTP basic authentication

password: str (optional)

Password for HTTP basic authentication

debug: bool (optional)

Debug switch

verify_ssl: bool (optional)

Set this to False to skip verifying the SSL certificate when calling the API from an HTTPS server. Defaults to True.

ssl_ca_cert: str (optional)

Set this to customize the certificate file to verify the peer.

cert_file: str (optional)

Client certificate file

key_file: str (optional)

Client key file

assert_hostname: bool (optional)

Set this to True/False to enable/disable SSL hostname verification. Defaults to True.

proxy: str (optional)

URL for a proxy to connect through
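
For example, a sketch of client certificate authentication (host and file paths are illustrative placeholders):

>>> from dask_kubernetes import KubeAuth
>>> auth = KubeAuth(host='https://localhost:6443',
...                 cert_file='client.crt',
...                 key_file='client.key',
...                 ssl_ca_cert='ca.crt')  # doctest: +SKIP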

Methods

load() Load Kubernetes configuration and set as default
load_first([auth]) Load the first valid configuration in the list auth.