aiopykube

Throughout the lifetime of this library we have used almost every Python Kubernetes library available. They all seem to have their own drawbacks but my favourite for clean code and readibility is pykube-ng.

However, pykube is built on requests and doesn’t natively support asyncio which we heavily use in Dask. To make it a little easier to work with we have a dask_kubernetes.aiopykube submodule which provides an async shim that has the same API as pykube and runs all IO calls in a threadpool executor.

The shim also includes a dask_kubernetes.aiopykube.dask submodule with custom objects to represent Dask custom resources.

Examples

Iterate over Pods

from dask_kubernetes.aiopykube import HTTPClient, KubeConfig
from dask_kubernetes.aiopykube.objects import Pod

api = HTTPClient(KubeConfig.from_env())

async for pod in Pod.objects(namespace="default"):
    # Do something

Create a new Pod

from dask_kubernetes.aiopykube import HTTPClient, KubeConfig
from dask_kubernetes.aiopykube.objects import Pod

api = HTTPClient(KubeConfig.from_env())

pod = Pod(
    api,
    {
        "apiVersion": "v1",
        "kind": "Pod",
        "metadata": {"name": name},
        "spec": {
            "containers": [
                {"name": "pause", "image": "gcr.io/google_containers/pause"}
            ]
        },
    },
)

await pod.create()

Scale an existing deployment

from dask_kubernetes.aiopykube import HTTPClient, KubeConfig
from dask_kubernetes.aiopykube.objects import Deployment

api = HTTPClient(KubeConfig.from_env())

deployment = await Deployment.objects(api).get_by_name("mydeployment")
await deployment.scale(5)

Delete a DaskCluster custom resource

from dask_kubernetes.aiopykube import HTTPClient, KubeConfig
from dask_kubernetes.aiopykube.dask import DaskCluster

api = HTTPClient(KubeConfig.from_env())
cluster = await DaskCluster.objects(api, namespace=namespace).get_by_name("mycluster")
await cluster.delete()

API

dask_kubernetes.aiopykube.query

dask_kubernetes.aiopykube.query.Query(api, ...)

Attributes

dask_kubernetes.aiopykube.query.WatchQuery(...)

Attributes

Query

class dask_kubernetes.aiopykube.query.Query(api: pykube.http.HTTPClient, api_obj_class, namespace: Optional[str] = None)[source]
Attributes
query_cache
response

Methods

as_table()

Execute query and return result as Table (similar to what kubectl does) See https://kubernetes.io/docs/reference/using-api/api-concepts/#receiving-resources-as-tables

filter([namespace, selector, field_selector])

Filter objects by namespace, labels, or fields

get(*args, **kwargs)

Get a single object by name, namespace, label, ..

get_by_name(name)

Get object by name, raises ObjectDoesNotExist if not found

get_or_none(*args, **kwargs)

Get object by name, return None if not found

iterator()

Execute the API request and return an iterator over the objects.

all

execute

watch

async as_table() pykube.query.Table[source]

Execute query and return result as Table (similar to what kubectl does) See https://kubernetes.io/docs/reference/using-api/api-concepts/#receiving-resources-as-tables

async get(*args, **kwargs)[source]

Get a single object by name, namespace, label, ..

async get_by_name(name: str)[source]

Get object by name, raises ObjectDoesNotExist if not found

async get_or_none(*args, **kwargs)[source]

Get object by name, return None if not found

async iterator()[source]

Execute the API request and return an iterator over the objects. This method does not use the query cache.

WatchQuery

class dask_kubernetes.aiopykube.query.WatchQuery(*args, **kwargs)[source]
Attributes
response

Methods

filter([namespace, selector, field_selector])

Filter objects by namespace, labels, or fields

all

object_stream

dask_kubernetes.aiopykube.dask

dask_kubernetes.aiopykube.dask.DaskCluster(...)

Attributes

dask_kubernetes.aiopykube.dask.DaskWorkerGroup(...)

Attributes

dask_kubernetes.aiopykube.dask.DaskJob(api, obj)

Attributes

dask_kubernetes.aiopykube.dask.DaskAutoscaler(...)

Attributes

DaskCluster

class dask_kubernetes.aiopykube.dask.DaskCluster(api: pykube.http.HTTPClient, obj: dict)[source]
Attributes
annotations

Annotations of the Kubernetes resource (metadata.annotations)

base
labels

Labels of the Kubernetes resource (metadata.labels)

metadata
name

Name of the Kubernetes resource (metadata.name)

namespace

Namespace scope of the Kubernetes resource (metadata.namespace)

Methods

delete([propagation_policy])

Delete the Kubernetes resource by calling the API.

patch(strategic_merge_patch, *[, subresource])

Patch the Kubernetes resource by calling the API with a "strategic merge" patch.

update([is_strategic, subresource])

Update the Kubernetes resource by calling the API (patch)

api_kwargs

create

exists

objects

reload

set_obj

watch

DaskWorkerGroup

class dask_kubernetes.aiopykube.dask.DaskWorkerGroup(api: pykube.http.HTTPClient, obj: dict)[source]
Attributes
annotations

Annotations of the Kubernetes resource (metadata.annotations)

base
labels

Labels of the Kubernetes resource (metadata.labels)

metadata
name

Name of the Kubernetes resource (metadata.name)

namespace

Namespace scope of the Kubernetes resource (metadata.namespace)

Methods

delete([propagation_policy])

Delete the Kubernetes resource by calling the API.

patch(strategic_merge_patch, *[, subresource])

Patch the Kubernetes resource by calling the API with a "strategic merge" patch.

update([is_strategic, subresource])

Update the Kubernetes resource by calling the API (patch)

api_kwargs

create

exists

objects

reload

set_obj

watch

DaskJob

class dask_kubernetes.aiopykube.dask.DaskJob(api: pykube.http.HTTPClient, obj: dict)[source]
Attributes
annotations

Annotations of the Kubernetes resource (metadata.annotations)

base
labels

Labels of the Kubernetes resource (metadata.labels)

metadata
name

Name of the Kubernetes resource (metadata.name)

namespace

Namespace scope of the Kubernetes resource (metadata.namespace)

Methods

delete([propagation_policy])

Delete the Kubernetes resource by calling the API.

patch(strategic_merge_patch, *[, subresource])

Patch the Kubernetes resource by calling the API with a "strategic merge" patch.

update([is_strategic, subresource])

Update the Kubernetes resource by calling the API (patch)

api_kwargs

create

exists

objects

reload

set_obj

watch

DaskAutoscaler

class dask_kubernetes.aiopykube.dask.DaskAutoscaler(api: pykube.http.HTTPClient, obj: dict)[source]
Attributes
annotations

Annotations of the Kubernetes resource (metadata.annotations)

base
labels

Labels of the Kubernetes resource (metadata.labels)

metadata
name

Name of the Kubernetes resource (metadata.name)

namespace

Namespace scope of the Kubernetes resource (metadata.namespace)

Methods

delete([propagation_policy])

Delete the Kubernetes resource by calling the API.

patch(strategic_merge_patch, *[, subresource])

Patch the Kubernetes resource by calling the API with a "strategic merge" patch.

update([is_strategic, subresource])

Update the Kubernetes resource by calling the API (patch)

api_kwargs

create

exists

objects

reload

set_obj

watch

dask_kubernetes.aiopykube.objects

dask_kubernetes.aiopykube.objects.ConfigMap(...)

ConfigMap.

dask_kubernetes.aiopykube.objects.CronJob(...)

CronJob.

dask_kubernetes.aiopykube.objects.DaemonSet(...)

DaemonSet.

dask_kubernetes.aiopykube.objects.Deployment(...)

Deployment.

dask_kubernetes.aiopykube.objects.Endpoint(...)

Endpoint.

dask_kubernetes.aiopykube.objects.Event(api, obj)

Event.

dask_kubernetes.aiopykube.objects.LimitRange(...)

LimitRange.

dask_kubernetes.aiopykube.objects.ResourceQuota(...)

ResourceQuota.

dask_kubernetes.aiopykube.objects.ServiceAccount(...)

ServiceAccount.

dask_kubernetes.aiopykube.objects.Ingress(...)

Ingress.

dask_kubernetes.aiopykube.objects.Job(api, obj)

Job.

dask_kubernetes.aiopykube.objects.Namespace(...)

Namespace.

dask_kubernetes.aiopykube.objects.Node(api, obj)

Node.

dask_kubernetes.aiopykube.objects.Pod(api, obj)

Attributes

dask_kubernetes.aiopykube.objects.ReplicationController(...)

ReplicationController.

dask_kubernetes.aiopykube.objects.ReplicaSet(...)

ReplicaSet.

dask_kubernetes.aiopykube.objects.Secret(...)

Secret.

dask_kubernetes.aiopykube.objects.Service(...)

Service.

dask_kubernetes.aiopykube.objects.PersistentVolume(...)

PersistentVolume.

dask_kubernetes.aiopykube.objects.PersistentVolumeClaim(...)

PersistentVolumeClaim.

dask_kubernetes.aiopykube.objects.HorizontalPodAutoscaler(...)

HorizontalPodAutoscaler.

dask_kubernetes.aiopykube.objects.StatefulSet(...)

StatefulSet.

dask_kubernetes.aiopykube.objects.Role(api, obj)

Role.

dask_kubernetes.aiopykube.objects.RoleBinding(...)

RoleBinding.

dask_kubernetes.aiopykube.objects.ClusterRole(...)

ClusterRole.

dask_kubernetes.aiopykube.objects.ClusterRoleBinding(...)

ClusterRoleBinding.

dask_kubernetes.aiopykube.objects.PodSecurityPolicy(...)

PodSecurityPolicy.

dask_kubernetes.aiopykube.objects.PodDisruptionBudget(...)

PodDisruptionBudget.

dask_kubernetes.aiopykube.objects.CustomResourceDefinition(...)

CustomResourceDefinition.

ConfigMap

class dask_kubernetes.aiopykube.objects.ConfigMap(api: pykube.http.HTTPClient, obj: dict)[source]

ConfigMap.

Attributes
annotations

Annotations of the Kubernetes resource (metadata.annotations)

base
labels

Labels of the Kubernetes resource (metadata.labels)

metadata
name

Name of the Kubernetes resource (metadata.name)

namespace

Namespace scope of the Kubernetes resource (metadata.namespace)

Methods

delete([propagation_policy])

Delete the Kubernetes resource by calling the API.

patch(strategic_merge_patch, *[, subresource])

Patch the Kubernetes resource by calling the API with a "strategic merge" patch.

update([is_strategic, subresource])

Update the Kubernetes resource by calling the API (patch)

api_kwargs

create

exists

objects

reload

set_obj

watch

CronJob

class dask_kubernetes.aiopykube.objects.CronJob(api: pykube.http.HTTPClient, obj: dict)[source]

CronJob.

Attributes
annotations

Annotations of the Kubernetes resource (metadata.annotations)

base
labels

Labels of the Kubernetes resource (metadata.labels)

metadata
name

Name of the Kubernetes resource (metadata.name)

namespace

Namespace scope of the Kubernetes resource (metadata.namespace)

Methods

delete([propagation_policy])

Delete the Kubernetes resource by calling the API.

patch(strategic_merge_patch, *[, subresource])

Patch the Kubernetes resource by calling the API with a "strategic merge" patch.

update([is_strategic, subresource])

Update the Kubernetes resource by calling the API (patch)

api_kwargs

create

exists

objects

reload

set_obj

watch

DaemonSet

class dask_kubernetes.aiopykube.objects.DaemonSet(api: pykube.http.HTTPClient, obj: dict)[source]

DaemonSet.

Attributes
annotations

Annotations of the Kubernetes resource (metadata.annotations)

base
labels

Labels of the Kubernetes resource (metadata.labels)

metadata
name

Name of the Kubernetes resource (metadata.name)

namespace

Namespace scope of the Kubernetes resource (metadata.namespace)

Methods

delete([propagation_policy])

Delete the Kubernetes resource by calling the API.

patch(strategic_merge_patch, *[, subresource])

Patch the Kubernetes resource by calling the API with a "strategic merge" patch.

update([is_strategic, subresource])

Update the Kubernetes resource by calling the API (patch)

api_kwargs

create

exists

objects

reload

set_obj

watch

Deployment

class dask_kubernetes.aiopykube.objects.Deployment(api: pykube.http.HTTPClient, obj: dict)[source]

Deployment.

Attributes
annotations

Annotations of the Kubernetes resource (metadata.annotations)

base
labels

Labels of the Kubernetes resource (metadata.labels)

metadata
name

Name of the Kubernetes resource (metadata.name)

namespace

Namespace scope of the Kubernetes resource (metadata.namespace)

ready
replicas
scalable

Methods

delete([propagation_policy])

Delete the Kubernetes resource by calling the API.

patch(strategic_merge_patch, *[, subresource])

Patch the Kubernetes resource by calling the API with a "strategic merge" patch.

rollout_undo([target_revision])

Produces same action as kubectl rollout undo deployment command.

update([is_strategic, subresource])

Update the Kubernetes resource by calling the API (patch)

api_kwargs

create

exists

objects

reload

scale

set_obj

watch

async rollout_undo(target_revision=None)[source]

Produces same action as kubectl rollout undo deployment command. Input variable is revision to rollback to (in kubectl, –to-revision)

Endpoint

class dask_kubernetes.aiopykube.objects.Endpoint(api: pykube.http.HTTPClient, obj: dict)[source]

Endpoint.

Attributes
annotations

Annotations of the Kubernetes resource (metadata.annotations)

base
labels

Labels of the Kubernetes resource (metadata.labels)

metadata
name

Name of the Kubernetes resource (metadata.name)

namespace

Namespace scope of the Kubernetes resource (metadata.namespace)

Methods

delete([propagation_policy])

Delete the Kubernetes resource by calling the API.

patch(strategic_merge_patch, *[, subresource])

Patch the Kubernetes resource by calling the API with a "strategic merge" patch.

update([is_strategic, subresource])

Update the Kubernetes resource by calling the API (patch)

api_kwargs

create

exists

objects

reload

set_obj

watch

Event

class dask_kubernetes.aiopykube.objects.Event(api: pykube.http.HTTPClient, obj: dict)[source]

Event.

Attributes
annotations

Annotations of the Kubernetes resource (metadata.annotations)

base
labels

Labels of the Kubernetes resource (metadata.labels)

metadata
name

Name of the Kubernetes resource (metadata.name)

namespace

Namespace scope of the Kubernetes resource (metadata.namespace)

Methods

delete([propagation_policy])

Delete the Kubernetes resource by calling the API.

patch(strategic_merge_patch, *[, subresource])

Patch the Kubernetes resource by calling the API with a "strategic merge" patch.

update([is_strategic, subresource])

Update the Kubernetes resource by calling the API (patch)

api_kwargs

create

exists

objects

reload

set_obj

watch

LimitRange

class dask_kubernetes.aiopykube.objects.LimitRange(api: pykube.http.HTTPClient, obj: dict)[source]

LimitRange.

Attributes
annotations

Annotations of the Kubernetes resource (metadata.annotations)

base
labels

Labels of the Kubernetes resource (metadata.labels)

metadata
name

Name of the Kubernetes resource (metadata.name)

namespace

Namespace scope of the Kubernetes resource (metadata.namespace)

Methods

delete([propagation_policy])

Delete the Kubernetes resource by calling the API.

patch(strategic_merge_patch, *[, subresource])

Patch the Kubernetes resource by calling the API with a "strategic merge" patch.

update([is_strategic, subresource])

Update the Kubernetes resource by calling the API (patch)

api_kwargs

create

exists

objects

reload

set_obj

watch

ResourceQuota

class dask_kubernetes.aiopykube.objects.ResourceQuota(api: pykube.http.HTTPClient, obj: dict)[source]

ResourceQuota.

Attributes
annotations

Annotations of the Kubernetes resource (metadata.annotations)

base
labels

Labels of the Kubernetes resource (metadata.labels)

metadata
name

Name of the Kubernetes resource (metadata.name)

namespace

Namespace scope of the Kubernetes resource (metadata.namespace)

Methods

delete([propagation_policy])

Delete the Kubernetes resource by calling the API.

patch(strategic_merge_patch, *[, subresource])

Patch the Kubernetes resource by calling the API with a "strategic merge" patch.

update([is_strategic, subresource])

Update the Kubernetes resource by calling the API (patch)

api_kwargs

create

exists

objects

reload

set_obj

watch

ServiceAccount

class dask_kubernetes.aiopykube.objects.ServiceAccount(api: pykube.http.HTTPClient, obj: dict)[source]

ServiceAccount.

Attributes
annotations

Annotations of the Kubernetes resource (metadata.annotations)

base
labels

Labels of the Kubernetes resource (metadata.labels)

metadata
name

Name of the Kubernetes resource (metadata.name)

namespace

Namespace scope of the Kubernetes resource (metadata.namespace)

Methods

delete([propagation_policy])

Delete the Kubernetes resource by calling the API.

patch(strategic_merge_patch, *[, subresource])

Patch the Kubernetes resource by calling the API with a "strategic merge" patch.

update([is_strategic, subresource])

Update the Kubernetes resource by calling the API (patch)

api_kwargs

create

exists

objects

reload

set_obj

watch

Ingress

class dask_kubernetes.aiopykube.objects.Ingress(api: pykube.http.HTTPClient, obj: dict)[source]

Ingress.

Attributes
annotations

Annotations of the Kubernetes resource (metadata.annotations)

base
labels

Labels of the Kubernetes resource (metadata.labels)

metadata
name

Name of the Kubernetes resource (metadata.name)

namespace

Namespace scope of the Kubernetes resource (metadata.namespace)

Methods

delete([propagation_policy])

Delete the Kubernetes resource by calling the API.

patch(strategic_merge_patch, *[, subresource])

Patch the Kubernetes resource by calling the API with a "strategic merge" patch.

update([is_strategic, subresource])

Update the Kubernetes resource by calling the API (patch)

api_kwargs

create

exists

objects

reload

set_obj

watch

Job

class dask_kubernetes.aiopykube.objects.Job(api: pykube.http.HTTPClient, obj: dict)[source]

Job.

Attributes
annotations

Annotations of the Kubernetes resource (metadata.annotations)

base
labels

Labels of the Kubernetes resource (metadata.labels)

metadata
name

Name of the Kubernetes resource (metadata.name)

namespace

Namespace scope of the Kubernetes resource (metadata.namespace)

parallelism
scalable

Methods

delete([propagation_policy])

Delete the Kubernetes resource by calling the API.

patch(strategic_merge_patch, *[, subresource])

Patch the Kubernetes resource by calling the API with a "strategic merge" patch.

update([is_strategic, subresource])

Update the Kubernetes resource by calling the API (patch)

api_kwargs

create

exists

objects

reload

scale

set_obj

watch

Namespace

class dask_kubernetes.aiopykube.objects.Namespace(api: pykube.http.HTTPClient, obj: dict)[source]

Namespace.

Attributes
annotations

Annotations of the Kubernetes resource (metadata.annotations)

base
labels

Labels of the Kubernetes resource (metadata.labels)

metadata
name

Name of the Kubernetes resource (metadata.name)

namespace

Methods

delete([propagation_policy])

Delete the Kubernetes resource by calling the API.

patch(strategic_merge_patch, *[, subresource])

Patch the Kubernetes resource by calling the API with a "strategic merge" patch.

update([is_strategic, subresource])

Update the Kubernetes resource by calling the API (patch)

api_kwargs

create

exists

objects

reload

set_obj

watch

Node

class dask_kubernetes.aiopykube.objects.Node(api: pykube.http.HTTPClient, obj: dict)[source]

Node.

Attributes
annotations

Annotations of the Kubernetes resource (metadata.annotations)

base
labels

Labels of the Kubernetes resource (metadata.labels)

metadata
name

Name of the Kubernetes resource (metadata.name)

namespace
unschedulable

Methods

delete([propagation_policy])

Delete the Kubernetes resource by calling the API.

patch(strategic_merge_patch, *[, subresource])

Patch the Kubernetes resource by calling the API with a "strategic merge" patch.

update([is_strategic, subresource])

Update the Kubernetes resource by calling the API (patch)

api_kwargs

cordon

create

exists

objects

reload

set_obj

uncordon

watch

Pod

class dask_kubernetes.aiopykube.objects.Pod(api: pykube.http.HTTPClient, obj: dict)[source]
Attributes
annotations

Annotations of the Kubernetes resource (metadata.annotations)

base
labels

Labels of the Kubernetes resource (metadata.labels)

metadata
name

Name of the Kubernetes resource (metadata.name)

namespace

Namespace scope of the Kubernetes resource (metadata.namespace)

ready

Methods

delete([propagation_policy])

Delete the Kubernetes resource by calling the API.

logs(*args, **kwargs)

Produces the same result as calling kubectl logs pod/<pod-name>.

patch(strategic_merge_patch, *[, subresource])

Patch the Kubernetes resource by calling the API with a "strategic merge" patch.

update([is_strategic, subresource])

Update the Kubernetes resource by calling the API (patch)

api_kwargs

create

exists

objects

reload

set_obj

watch

async logs(*args, **kwargs)[source]

Produces the same result as calling kubectl logs pod/<pod-name>. Check parameters meaning at http://kubernetes.io/docs/api-reference/v1/operations/, part ‘read log of the specified Pod’. The result is plain text.

ReplicationController

class dask_kubernetes.aiopykube.objects.ReplicationController(api: pykube.http.HTTPClient, obj: dict)[source]

ReplicationController.

Attributes
annotations

Annotations of the Kubernetes resource (metadata.annotations)

base
labels

Labels of the Kubernetes resource (metadata.labels)

metadata
name

Name of the Kubernetes resource (metadata.name)

namespace

Namespace scope of the Kubernetes resource (metadata.namespace)

ready
replicas
scalable

Methods

delete([propagation_policy])

Delete the Kubernetes resource by calling the API.

patch(strategic_merge_patch, *[, subresource])

Patch the Kubernetes resource by calling the API with a "strategic merge" patch.

update([is_strategic, subresource])

Update the Kubernetes resource by calling the API (patch)

api_kwargs

create

exists

objects

reload

scale

set_obj

watch

ReplicaSet

class dask_kubernetes.aiopykube.objects.ReplicaSet(api: pykube.http.HTTPClient, obj: dict)[source]

ReplicaSet.

Attributes
annotations

Annotations of the Kubernetes resource (metadata.annotations)

base
labels

Labels of the Kubernetes resource (metadata.labels)

metadata
name

Name of the Kubernetes resource (metadata.name)

namespace

Namespace scope of the Kubernetes resource (metadata.namespace)

replicas
scalable

Methods

delete([propagation_policy])

Delete the Kubernetes resource by calling the API.

patch(strategic_merge_patch, *[, subresource])

Patch the Kubernetes resource by calling the API with a "strategic merge" patch.

update([is_strategic, subresource])

Update the Kubernetes resource by calling the API (patch)

api_kwargs

create

exists

objects

reload

scale

set_obj

watch

Secret

class dask_kubernetes.aiopykube.objects.Secret(api: pykube.http.HTTPClient, obj: dict)[source]

Secret.

Attributes
annotations

Annotations of the Kubernetes resource (metadata.annotations)

base
labels

Labels of the Kubernetes resource (metadata.labels)

metadata
name

Name of the Kubernetes resource (metadata.name)

namespace

Namespace scope of the Kubernetes resource (metadata.namespace)

Methods

delete([propagation_policy])

Delete the Kubernetes resource by calling the API.

patch(strategic_merge_patch, *[, subresource])

Patch the Kubernetes resource by calling the API with a "strategic merge" patch.

update([is_strategic, subresource])

Update the Kubernetes resource by calling the API (patch)

api_kwargs

create

exists

objects

reload

set_obj

watch

Service

class dask_kubernetes.aiopykube.objects.Service(api: pykube.http.HTTPClient, obj: dict)[source]

Service.

Attributes
annotations

Annotations of the Kubernetes resource (metadata.annotations)

base
labels

Labels of the Kubernetes resource (metadata.labels)

metadata
name

Name of the Kubernetes resource (metadata.name)

namespace

Namespace scope of the Kubernetes resource (metadata.namespace)

Methods

delete([propagation_policy])

Delete the Kubernetes resource by calling the API.

patch(strategic_merge_patch, *[, subresource])

Patch the Kubernetes resource by calling the API with a "strategic merge" patch.

proxy_http_delete(path[, port])

Issue a HTTP DELETE request to proxy of a Service. Args: :param path: The URI path for the request. :param port: This value can be used to override the default (first defined) port used to connect to the Service. :param kwargs: Keyword arguments for the proxy_http_get function. They are the same as for requests.models.Request object plus the additional 'port' kwarg, which can be used to override the default (first defined) port used to connect to the Service. Returns: The requests.Response object.

proxy_http_get(path[, port])

Issue a HTTP GET request to proxy of a Service. Args: :param path: The URI path for the request. :param port: This value can be used to override the default (first defined) port used to connect to the Service. :param kwargs: Keyword arguments for the proxy_http_get function. They are the same as for requests.models.Request object plus the additional 'port' kwarg, which can be used to override the default (first defined) port used to connect to the Service. Returns: The requests.Response object.

proxy_http_post(path[, port])

Issue a HTTP POST request to proxy of a Service. Args: :param path: The URI path for the request. :param port: This value can be used to override the default (first defined) port used to connect to the Service. :param kwargs: Keyword arguments for the proxy_http_get function. They are the same as for requests.models.Request object plus the additional 'port' kwarg, which can be used to override the default (first defined) port used to connect to the Service. Returns: The requests.Response object.

proxy_http_put(path[, port])

Issue a HTTP PUT request to proxy of a Service. Args: :param path: The URI path for the request. :param port: This value can be used to override the default (first defined) port used to connect to the Service. :param kwargs: Keyword arguments for the proxy_http_get function. They are the same as for requests.models.Request object plus the additional 'port' kwarg, which can be used to override the default (first defined) port used to connect to the Service. Returns: The requests.Response object.

proxy_http_request(*args, **kwargs)

Issue a HTTP request with specific HTTP method to proxy of a Service. Args: :param method: The http request method e.g. 'GET', 'POST' etc. :param path: The URI path for the request. :param port: This value can be used to override the default (first defined) port used to connect to the Service. :param kwargs: Keyword arguments for the proxy_http_get function. They are the same as for requests.models.Request object. Returns: The requests.Response object.

update([is_strategic, subresource])

Update the Kubernetes resource by calling the API (patch)

api_kwargs

create

exists

objects

reload

set_obj

watch

async proxy_http_delete(path: str, port: Optional[int] = None, **kwargs) requests.models.Response[source]

Issue a HTTP DELETE request to proxy of a Service. Args:

param path

The URI path for the request.

param port

This value can be used to override the

default (first defined) port used to connect to the Service. :param kwargs: Keyword arguments for the proxy_http_get function. They are the same as for requests.models.Request object plus the additional ‘port’ kwarg, which can be used to override the default (first defined) port used to connect to the Service.

Returns:

The requests.Response object.

async proxy_http_get(path: str, port: Optional[int] = None, **kwargs) requests.models.Response[source]

Issue a HTTP GET request to proxy of a Service. Args:

param path

The URI path for the request.

param port

This value can be used to override the

default (first defined) port used to connect to the Service. :param kwargs: Keyword arguments for the proxy_http_get function. They are the same as for requests.models.Request object plus the additional ‘port’ kwarg, which can be used to override the default (first defined) port used to connect to the Service.

Returns:

The requests.Response object.

async proxy_http_post(path: str, port: Optional[int] = None, **kwargs) requests.models.Response[source]

Issue a HTTP POST request to proxy of a Service. Args:

param path

The URI path for the request.

param port

This value can be used to override the

default (first defined) port used to connect to the Service. :param kwargs: Keyword arguments for the proxy_http_get function. They are the same as for requests.models.Request object plus the additional ‘port’ kwarg, which can be used to override the default (first defined) port used to connect to the Service.

Returns:

The requests.Response object.

async proxy_http_put(path: str, port: Optional[int] = None, **kwargs) requests.models.Response[source]

Issue a HTTP PUT request to proxy of a Service. Args:

param path

The URI path for the request.

param port

This value can be used to override the

default (first defined) port used to connect to the Service. :param kwargs: Keyword arguments for the proxy_http_get function. They are the same as for requests.models.Request object plus the additional ‘port’ kwarg, which can be used to override the default (first defined) port used to connect to the Service.

Returns:

The requests.Response object.

async proxy_http_request(*args, **kwargs)[source]

Issue a HTTP request with specific HTTP method to proxy of a Service. Args:

param method

The http request method e.g. ‘GET’, ‘POST’ etc.

param path

The URI path for the request.

param port

This value can be used to override the

default (first defined) port used to connect to the Service. :param kwargs: Keyword arguments for the proxy_http_get function. They are the same as for requests.models.Request object.

Returns:

The requests.Response object.

PersistentVolume

class dask_kubernetes.aiopykube.objects.PersistentVolume(api: pykube.http.HTTPClient, obj: dict)[source]

PersistentVolume.

Attributes
annotations

Annotations of the Kubernetes resource (metadata.annotations)

base
labels

Labels of the Kubernetes resource (metadata.labels)

metadata
name

Name of the Kubernetes resource (metadata.name)

namespace

Methods

delete([propagation_policy])

Delete the Kubernetes resource by calling the API.

patch(strategic_merge_patch, *[, subresource])

Patch the Kubernetes resource by calling the API with a "strategic merge" patch.

update([is_strategic, subresource])

Update the Kubernetes resource by calling the API (patch)

api_kwargs

create

exists

objects

reload

set_obj

watch

PersistentVolumeClaim

class dask_kubernetes.aiopykube.objects.PersistentVolumeClaim(api: pykube.http.HTTPClient, obj: dict)[source]

PersistentVolumeClaim.

Attributes
annotations

Annotations of the Kubernetes resource (metadata.annotations)

base
labels

Labels of the Kubernetes resource (metadata.labels)

metadata
name

Name of the Kubernetes resource (metadata.name)

namespace

Namespace scope of the Kubernetes resource (metadata.namespace)

Methods

delete([propagation_policy])

Delete the Kubernetes resource by calling the API.

patch(strategic_merge_patch, *[, subresource])

Patch the Kubernetes resource by calling the API with a "strategic merge" patch.

update([is_strategic, subresource])

Update the Kubernetes resource by calling the API (patch)

api_kwargs

create

exists

objects

reload

set_obj

watch

HorizontalPodAutoscaler

class dask_kubernetes.aiopykube.objects.HorizontalPodAutoscaler(api: pykube.http.HTTPClient, obj: dict)[source]

HorizontalPodAutoscaler.

Attributes
annotations

Annotations of the Kubernetes resource (metadata.annotations)

base
labels

Labels of the Kubernetes resource (metadata.labels)

metadata
name

Name of the Kubernetes resource (metadata.name)

namespace

Namespace scope of the Kubernetes resource (metadata.namespace)

Methods

delete([propagation_policy])

Delete the Kubernetes resource by calling the API.

patch(strategic_merge_patch, *[, subresource])

Patch the Kubernetes resource by calling the API with a "strategic merge" patch.

update([is_strategic, subresource])

Update the Kubernetes resource by calling the API (patch)

api_kwargs

create

exists

objects

reload

set_obj

watch

StatefulSet

class dask_kubernetes.aiopykube.objects.StatefulSet(api: pykube.http.HTTPClient, obj: dict)[source]

StatefulSet.

Attributes
annotations

Annotations of the Kubernetes resource (metadata.annotations)

base
labels

Labels of the Kubernetes resource (metadata.labels)

metadata
name

Name of the Kubernetes resource (metadata.name)

namespace

Namespace scope of the Kubernetes resource (metadata.namespace)

replicas
scalable

Methods

delete([propagation_policy])

Delete the Kubernetes resource by calling the API.

patch(strategic_merge_patch, *[, subresource])

Patch the Kubernetes resource by calling the API with a "strategic merge" patch.

update([is_strategic, subresource])

Update the Kubernetes resource by calling the API (patch)

api_kwargs

create

exists

objects

reload

scale

set_obj

watch

Role

class dask_kubernetes.aiopykube.objects.Role(api: pykube.http.HTTPClient, obj: dict)[source]

Role.

Attributes
annotations

Annotations of the Kubernetes resource (metadata.annotations)

base
labels

Labels of the Kubernetes resource (metadata.labels)

metadata
name

Name of the Kubernetes resource (metadata.name)

namespace

Namespace scope of the Kubernetes resource (metadata.namespace)

Methods

delete([propagation_policy])

Delete the Kubernetes resource by calling the API.

patch(strategic_merge_patch, *[, subresource])

Patch the Kubernetes resource by calling the API with a "strategic merge" patch.

update([is_strategic, subresource])

Update the Kubernetes resource by calling the API (patch)

api_kwargs

create

exists

objects

reload

set_obj

watch

RoleBinding

class dask_kubernetes.aiopykube.objects.RoleBinding(api: pykube.http.HTTPClient, obj: dict)[source]

RoleBinding.

Attributes
annotations

Annotations of the Kubernetes resource (metadata.annotations)

base
labels

Labels of the Kubernetes resource (metadata.labels)

metadata
name

Name of the Kubernetes resource (metadata.name)

namespace

Namespace scope of the Kubernetes resource (metadata.namespace)

Methods

delete([propagation_policy])

Delete the Kubernetes resource by calling the API.

patch(strategic_merge_patch, *[, subresource])

Patch the Kubernetes resource by calling the API with a "strategic merge" patch.

update([is_strategic, subresource])

Update the Kubernetes resource by calling the API (patch)

api_kwargs

create

exists

objects

reload

set_obj

watch

ClusterRole

class dask_kubernetes.aiopykube.objects.ClusterRole(api: pykube.http.HTTPClient, obj: dict)[source]

ClusterRole.

Attributes
annotations

Annotations of the Kubernetes resource (metadata.annotations)

base
labels

Labels of the Kubernetes resource (metadata.labels)

metadata
name

Name of the Kubernetes resource (metadata.name)

namespace

Methods

delete([propagation_policy])

Delete the Kubernetes resource by calling the API.

patch(strategic_merge_patch, *[, subresource])

Patch the Kubernetes resource by calling the API with a "strategic merge" patch.

update([is_strategic, subresource])

Update the Kubernetes resource by calling the API (patch)

api_kwargs

create

exists

objects

reload

set_obj

watch

ClusterRoleBinding

class dask_kubernetes.aiopykube.objects.ClusterRoleBinding(api: pykube.http.HTTPClient, obj: dict)[source]

ClusterRoleBinding.

Attributes
annotations

Annotations of the Kubernetes resource (metadata.annotations)

base
labels

Labels of the Kubernetes resource (metadata.labels)

metadata
name

Name of the Kubernetes resource (metadata.name)

namespace

Methods

delete([propagation_policy])

Delete the Kubernetes resource by calling the API.

patch(strategic_merge_patch, *[, subresource])

Patch the Kubernetes resource by calling the API with a "strategic merge" patch.

update([is_strategic, subresource])

Update the Kubernetes resource by calling the API (patch)

api_kwargs

create

exists

objects

reload

set_obj

watch

PodSecurityPolicy

class dask_kubernetes.aiopykube.objects.PodSecurityPolicy(api: pykube.http.HTTPClient, obj: dict)[source]

PodSecurityPolicy.

Attributes
annotations

Annotations of the Kubernetes resource (metadata.annotations)

base
labels

Labels of the Kubernetes resource (metadata.labels)

metadata
name

Name of the Kubernetes resource (metadata.name)

namespace

Methods

delete([propagation_policy])

Delete the Kubernetes resource by calling the API.

patch(strategic_merge_patch, *[, subresource])

Patch the Kubernetes resource by calling the API with a "strategic merge" patch.

update([is_strategic, subresource])

Update the Kubernetes resource by calling the API (patch)

api_kwargs

create

exists

objects

reload

set_obj

watch

PodDisruptionBudget

class dask_kubernetes.aiopykube.objects.PodDisruptionBudget(api: pykube.http.HTTPClient, obj: dict)[source]

PodDisruptionBudget.

Attributes
annotations

Annotations of the Kubernetes resource (metadata.annotations)

base
labels

Labels of the Kubernetes resource (metadata.labels)

metadata
name

Name of the Kubernetes resource (metadata.name)

namespace

Namespace scope of the Kubernetes resource (metadata.namespace)

Methods

delete([propagation_policy])

Delete the Kubernetes resource by calling the API.

patch(strategic_merge_patch, *[, subresource])

Patch the Kubernetes resource by calling the API with a "strategic merge" patch.

update([is_strategic, subresource])

Update the Kubernetes resource by calling the API (patch)

api_kwargs

create

exists

objects

reload

set_obj

watch

CustomResourceDefinition

class dask_kubernetes.aiopykube.objects.CustomResourceDefinition(api: pykube.http.HTTPClient, obj: dict)[source]

CustomResourceDefinition.

Attributes
annotations

Annotations of the Kubernetes resource (metadata.annotations)

base
labels

Labels of the Kubernetes resource (metadata.labels)

metadata
name

Name of the Kubernetes resource (metadata.name)

namespace

Methods

delete([propagation_policy])

Delete the Kubernetes resource by calling the API.

patch(strategic_merge_patch, *[, subresource])

Patch the Kubernetes resource by calling the API with a "strategic merge" patch.

update([is_strategic, subresource])

Update the Kubernetes resource by calling the API (patch)

api_kwargs

create

exists

objects

reload

set_obj

watch

FAQ

Why roll our own wrapper?

There does appear to be an aiopykube package on PyPI but it hasn’t been updated for a long time and doesn’t link to any source on GitHub or other source repository. This probably fills the same role but we’re apprehensive to depend on it in this state.

Why not release this is a separate package?

In theory we could pull the implementation out of dask-kubernetes into another dependency. If there is demand from the community to do this we could definitely consider it.

Why not use kubernetes, kubernetes-asyncio, aiokubernetes, etc?

The most popular Kubernetes libraries for Python are swagger generated SDKs based on the Kubernetes API spec. This results in libraries that do not feel Pythonic and only have complex reference documentation without any explaination, how-to or tutorial content.

It’s true that pykube is also a little lacking in documentation, however the code is simple enough and written by a human so code-spelunking isn’t too unpleasant.

Why use a threadpool executor instead of aiohttp?

The way pykube leverages requests has a lot of status checking and error handling (which is great). However, this would mean rewriting far more of the code in our shim if we were to switch out the underlying IO library. To try and keep the shim as lightweight as possible we’ve simply wrapped all methods that make blocking IO calls in calls to run_in_executor.

If we were to ever spin this shim out into a separate package we would probably advocate for a deeper rewrite using aiohttp instead of requests.

Why are some methods not implemented?

There are a couple of design decisions in pykube which have limited how much of it can be converted to asyncio. There are a few properties, such as pukube.query.Query.query_cache, that make HTTP calls. Using any kind of IO in a property or dunder like __len__ is generally frowned upon and therefore there is no asyncio way to make or call a property asynchronously.

Instead of changing the API we’ve set these to raise NotImplementedError with useful exceptions that suggest an alternative way to achieve the same thing.