Source code for dask_kubernetes.aiopykube.objects

from typing import Optional

from requests import Response


from pykube.http import HTTPClient
from pykube.objects import (
    ObjectManager,
    APIObject as _APIObject,
    NamespacedAPIObject as _NamespacedAPIObject,
    ConfigMap as _ConfigMap,
    CronJob as _CronJob,
    DaemonSet as _DaemonSet,
    Deployment as _Deployment,
    Endpoint as _Endpoint,
    Event as _Event,
    LimitRange as _LimitRange,
    ResourceQuota as _ResourceQuota,
    ServiceAccount as _ServiceAccount,
    Ingress as _Ingress,
    Job as _Job,
    Namespace as _Namespace,
    Node as _Node,
    Pod as _Pod,
    ReplicationController as _ReplicationController,
    ReplicaSet as _ReplicaSet,
    Secret as _Secret,
    Service as _Service,
    PersistentVolume as _PersistentVolume,
    PersistentVolumeClaim as _PersistentVolumeClaim,
    HorizontalPodAutoscaler as _HorizontalPodAutoscaler,
    StatefulSet as _StatefulSet,
    Role as _Role,
    RoleBinding as _RoleBinding,
    ClusterRole as _ClusterRole,
    ClusterRoleBinding as _ClusterRoleBinding,
    PodSecurityPolicy as _PodSecurityPolicy,
    PodDisruptionBudget as _PodDisruptionBudget,
    CustomResourceDefinition as _CustomResourceDefinition,
)
from dask_kubernetes.aiopykube.query import Query
from dask_kubernetes.aiopykube.mixins import AsyncScalableMixin, AsyncMixin


class AsyncObjectManager(ObjectManager):
    def __call__(self, api: HTTPClient, namespace: str = None):
        query = super().__call__(api=api, namespace=namespace)
        return query._clone(Query)


class AsyncObjectMixin(AsyncMixin):
    objects = AsyncObjectManager()

    async def exists(self, ensure=False):
        return await self._sync(super().exists, ensure=ensure)

    async def create(self):
        return await self._sync(super().create)

    async def reload(self):
        return await self._sync(super().reload)

    def watch(self):
        return super().watch()

    async def patch(self, strategic_merge_patch, *, subresource=None):
        return await self._sync(
            super().patch, strategic_merge_patch, subresource=subresource
        )

    async def update(self, is_strategic=True, *, subresource=None):
        return await self._sync(super().update, is_strategic, subresource=subresource)

    async def delete(self, propagation_policy: str = None):
        return await self._sync(super().delete, propagation_policy=propagation_policy)

    exists.__doc__ = _APIObject.exists.__doc__
    create.__doc__ = _APIObject.create.__doc__
    reload.__doc__ = _APIObject.reload.__doc__
    watch.__doc__ = _APIObject.watch.__doc__
    patch.__doc__ = _APIObject.patch.__doc__
    update.__doc__ = _APIObject.update.__doc__
    delete.__doc__ = _APIObject.delete.__doc__


class APIObject(AsyncObjectMixin, _APIObject):
    """APIObject."""


class NamespacedAPIObject(AsyncObjectMixin, _NamespacedAPIObject):
    """APIObject."""


[docs]class ConfigMap(AsyncObjectMixin, _ConfigMap): """ConfigMap."""
[docs]class CronJob(AsyncObjectMixin, _CronJob): """CronJob."""
[docs]class DaemonSet(AsyncObjectMixin, _DaemonSet): """DaemonSet."""
[docs]class Deployment(AsyncScalableMixin, AsyncObjectMixin, _Deployment): """Deployment."""
[docs] async def rollout_undo(self, target_revision=None): return await self._sync(super().rollout_undo, target_revision)
rollout_undo.__doc__ = _Deployment.rollout_undo.__doc__
[docs]class Endpoint(AsyncObjectMixin, _Endpoint): """Endpoint."""
[docs]class Event(AsyncObjectMixin, _Event): """Event."""
[docs]class LimitRange(AsyncObjectMixin, _LimitRange): """LimitRange."""
[docs]class ResourceQuota(AsyncObjectMixin, _ResourceQuota): """ResourceQuota."""
[docs]class ServiceAccount(AsyncObjectMixin, _ServiceAccount): """ServiceAccount."""
[docs]class Ingress(AsyncObjectMixin, _Ingress): """Ingress."""
[docs]class Job(AsyncScalableMixin, AsyncObjectMixin, _Job): """Job."""
[docs]class Namespace(AsyncObjectMixin, _Namespace): """Namespace."""
[docs]class Node(AsyncObjectMixin, _Node): """Node."""
[docs]class Pod(AsyncObjectMixin, _Pod): """Pod"""
[docs] async def logs(self, *args, **kwargs): return await self._sync(super().logs, *args, **kwargs)
logs.__doc__ = _Pod.logs.__doc__
[docs]class ReplicationController( AsyncScalableMixin, AsyncObjectMixin, _ReplicationController ): """ReplicationController."""
[docs]class ReplicaSet(AsyncScalableMixin, AsyncObjectMixin, _ReplicaSet): """ReplicaSet."""
[docs]class Secret(AsyncObjectMixin, _Secret): """Secret."""
[docs]class Service(AsyncObjectMixin, _Service): """Service."""
[docs] async def proxy_http_request(self, *args, **kwargs): return await self._sync(super().proxy_http_request, *args, **kwargs)
[docs] async def proxy_http_get( self, path: str, port: Optional[int] = None, **kwargs ) -> Response: return await self.proxy_http_request("GET", path, port, **kwargs)
[docs] async def proxy_http_post( self, path: str, port: Optional[int] = None, **kwargs ) -> Response: return await self.proxy_http_request("POST", path, port, **kwargs)
[docs] async def proxy_http_put( self, path: str, port: Optional[int] = None, **kwargs ) -> Response: return await self.proxy_http_request("PUT", path, port, **kwargs)
[docs] async def proxy_http_delete( self, path: str, port: Optional[int] = None, **kwargs ) -> Response: return await self.proxy_http_request("DELETE", path, port, **kwargs)
proxy_http_request.__doc__ = _Service.proxy_http_request.__doc__ proxy_http_get.__doc__ = _Service.proxy_http_get.__doc__ proxy_http_post.__doc__ = _Service.proxy_http_post.__doc__ proxy_http_put.__doc__ = _Service.proxy_http_put.__doc__ proxy_http_delete.__doc__ = _Service.proxy_http_delete.__doc__
[docs]class PersistentVolume(AsyncObjectMixin, _PersistentVolume): """PersistentVolume."""
[docs]class PersistentVolumeClaim(AsyncObjectMixin, _PersistentVolumeClaim): """PersistentVolumeClaim."""
[docs]class HorizontalPodAutoscaler(AsyncObjectMixin, _HorizontalPodAutoscaler): """HorizontalPodAutoscaler."""
[docs]class StatefulSet(AsyncScalableMixin, AsyncObjectMixin, _StatefulSet): """StatefulSet."""
[docs]class Role(AsyncObjectMixin, _Role): """Role."""
[docs]class RoleBinding(AsyncObjectMixin, _RoleBinding): """RoleBinding."""
[docs]class ClusterRole(AsyncObjectMixin, _ClusterRole): """ClusterRole."""
[docs]class ClusterRoleBinding(AsyncObjectMixin, _ClusterRoleBinding): """ClusterRoleBinding."""
[docs]class PodSecurityPolicy(AsyncObjectMixin, _PodSecurityPolicy): """PodSecurityPolicy."""
[docs]class PodDisruptionBudget(AsyncObjectMixin, _PodDisruptionBudget): """PodDisruptionBudget."""
[docs]class CustomResourceDefinition(AsyncObjectMixin, _CustomResourceDefinition): """CustomResourceDefinition."""