from typing import Optional
from requests import Response
from pykube.http import HTTPClient
from pykube.objects import (
ObjectManager,
APIObject as _APIObject,
NamespacedAPIObject as _NamespacedAPIObject,
ConfigMap as _ConfigMap,
CronJob as _CronJob,
DaemonSet as _DaemonSet,
Deployment as _Deployment,
Endpoint as _Endpoint,
Event as _Event,
LimitRange as _LimitRange,
ResourceQuota as _ResourceQuota,
ServiceAccount as _ServiceAccount,
Ingress as _Ingress,
Job as _Job,
Namespace as _Namespace,
Node as _Node,
Pod as _Pod,
ReplicationController as _ReplicationController,
ReplicaSet as _ReplicaSet,
Secret as _Secret,
Service as _Service,
PersistentVolume as _PersistentVolume,
PersistentVolumeClaim as _PersistentVolumeClaim,
HorizontalPodAutoscaler as _HorizontalPodAutoscaler,
StatefulSet as _StatefulSet,
Role as _Role,
RoleBinding as _RoleBinding,
ClusterRole as _ClusterRole,
ClusterRoleBinding as _ClusterRoleBinding,
PodSecurityPolicy as _PodSecurityPolicy,
PodDisruptionBudget as _PodDisruptionBudget,
CustomResourceDefinition as _CustomResourceDefinition,
)
from dask_kubernetes.aiopykube.query import Query
from dask_kubernetes.aiopykube.mixins import AsyncScalableMixin, AsyncMixin
class AsyncObjectManager(ObjectManager):
    """Object manager whose calls hand back a query cloned as ``Query``.

    ``Query`` here is the class imported from
    ``dask_kubernetes.aiopykube.query``.
    """

    def __call__(self, api: HTTPClient, namespace: Optional[str] = None):
        """Build the parent's query and clone it with the ``Query`` class.

        Args:
            api: HTTP client forwarded to ``ObjectManager.__call__``.
            namespace: Namespace forwarded to ``ObjectManager.__call__``;
                ``None`` is passed through unchanged.

        Returns:
            Whatever ``query._clone(Query)`` produces (presumably the same
            query re-typed as ``Query`` -- confirm against pykube).
        """
        query = super().__call__(api=api, namespace=namespace)
        return query._clone(Query)
class AsyncObjectMixin(AsyncMixin):
    """Mixin exposing the pykube object operations as coroutines.

    It is meant to be listed before a pykube object class in the bases, so
    that ``super()`` inside these methods resolves to the pykube
    implementation. Each coroutine passes the parent's bound method (plus its
    arguments) to ``self._sync`` and awaits the result. ``_sync`` is not
    defined in this class (presumably supplied by ``AsyncMixin``).
    """

    objects = AsyncObjectManager()

    # NOTE: the methods below carry no docstrings of their own on purpose;
    # their ``__doc__`` is copied from ``_APIObject`` at the end of the class.

    async def exists(self, ensure=False):
        return await self._sync(super().exists, ensure=ensure)

    async def create(self):
        return await self._sync(super().create)

    async def reload(self):
        return await self._sync(super().reload)

    def watch(self):
        # Not a coroutine: the parent's ``watch()`` result is returned as-is.
        return super().watch()

    async def patch(self, strategic_merge_patch, *, subresource=None):
        return await self._sync(
            super().patch, strategic_merge_patch, subresource=subresource
        )

    async def update(self, is_strategic=True, *, subresource=None):
        return await self._sync(super().update, is_strategic, subresource=subresource)

    async def delete(self, propagation_policy: Optional[str] = None):
        return await self._sync(super().delete, propagation_policy=propagation_policy)

    # Reuse the pykube docstrings so the wrappers document themselves the same
    # way as the methods they delegate to.
    exists.__doc__ = _APIObject.exists.__doc__
    create.__doc__ = _APIObject.create.__doc__
    reload.__doc__ = _APIObject.reload.__doc__
    watch.__doc__ = _APIObject.watch.__doc__
    patch.__doc__ = _APIObject.patch.__doc__
    update.__doc__ = _APIObject.update.__doc__
    delete.__doc__ = _APIObject.delete.__doc__
class APIObject(AsyncObjectMixin, _APIObject):
    """Coroutine-enabled counterpart of pykube's ``APIObject``.

    ``AsyncObjectMixin`` comes first in the bases, so its methods shadow the
    pykube ones of the same name.
    """
class NamespacedAPIObject(AsyncObjectMixin, _NamespacedAPIObject):
    """NamespacedAPIObject.

    ``AsyncObjectMixin`` layered over pykube's ``NamespacedAPIObject``; the
    mixin comes first in the bases, so its methods shadow pykube's.
    """
class ConfigMap(AsyncObjectMixin, _ConfigMap):
    """ConfigMap.

    ``AsyncObjectMixin`` layered over pykube's ``ConfigMap``; the mixin comes
    first in the bases, so its methods shadow pykube's.
    """
class CronJob(AsyncObjectMixin, _CronJob):
    """CronJob.

    ``AsyncObjectMixin`` layered over pykube's ``CronJob``; the mixin comes
    first in the bases, so its methods shadow pykube's.
    """
class DaemonSet(AsyncObjectMixin, _DaemonSet):
    """DaemonSet.

    ``AsyncObjectMixin`` layered over pykube's ``DaemonSet``; the mixin comes
    first in the bases, so its methods shadow pykube's.
    """
class Deployment(AsyncScalableMixin, AsyncObjectMixin, _Deployment):
    """Deployment.

    ``AsyncScalableMixin`` and ``AsyncObjectMixin`` layered over pykube's
    ``Deployment``; the mixins come first in the bases, so their methods
    shadow pykube's.
    """

    async def rollout_undo(self, target_revision=None):
        # No docstring here: ``__doc__`` is copied from pykube just below.
        # The pykube method and its argument are handed to ``self._sync``.
        return await self._sync(super().rollout_undo, target_revision)

    rollout_undo.__doc__ = _Deployment.rollout_undo.__doc__
class Endpoint(AsyncObjectMixin, _Endpoint):
    """Endpoint.

    ``AsyncObjectMixin`` layered over pykube's ``Endpoint``; the mixin comes
    first in the bases, so its methods shadow pykube's.
    """
class Event(AsyncObjectMixin, _Event):
    """Event.

    ``AsyncObjectMixin`` layered over pykube's ``Event``; the mixin comes
    first in the bases, so its methods shadow pykube's.
    """
class LimitRange(AsyncObjectMixin, _LimitRange):
    """LimitRange.

    ``AsyncObjectMixin`` layered over pykube's ``LimitRange``; the mixin comes
    first in the bases, so its methods shadow pykube's.
    """
class ResourceQuota(AsyncObjectMixin, _ResourceQuota):
    """ResourceQuota.

    ``AsyncObjectMixin`` layered over pykube's ``ResourceQuota``; the mixin
    comes first in the bases, so its methods shadow pykube's.
    """
class ServiceAccount(AsyncObjectMixin, _ServiceAccount):
    """ServiceAccount.

    ``AsyncObjectMixin`` layered over pykube's ``ServiceAccount``; the mixin
    comes first in the bases, so its methods shadow pykube's.
    """
class Ingress(AsyncObjectMixin, _Ingress):
    """Ingress.

    ``AsyncObjectMixin`` layered over pykube's ``Ingress``; the mixin comes
    first in the bases, so its methods shadow pykube's.
    """
class Job(AsyncScalableMixin, AsyncObjectMixin, _Job):
    """Job.

    ``AsyncScalableMixin`` and ``AsyncObjectMixin`` layered over pykube's
    ``Job``; the mixins come first in the bases, so their methods shadow
    pykube's.
    """
class Namespace(AsyncObjectMixin, _Namespace):
    """Namespace.

    ``AsyncObjectMixin`` layered over pykube's ``Namespace``; the mixin comes
    first in the bases, so its methods shadow pykube's.
    """
class Node(AsyncObjectMixin, _Node):
    """Node.

    ``AsyncObjectMixin`` layered over pykube's ``Node``; the mixin comes first
    in the bases, so its methods shadow pykube's.
    """
class Pod(AsyncObjectMixin, _Pod):
    """Pod.

    ``AsyncObjectMixin`` layered over pykube's ``Pod``; the mixin comes first
    in the bases, so its methods shadow pykube's.
    """

    async def logs(self, *args, **kwargs):
        # No docstring here: ``__doc__`` is copied from pykube just below.
        # All arguments are forwarded untouched alongside the pykube method.
        return await self._sync(super().logs, *args, **kwargs)

    logs.__doc__ = _Pod.logs.__doc__
class ReplicationController(
    AsyncScalableMixin, AsyncObjectMixin, _ReplicationController
):
    """ReplicationController.

    ``AsyncScalableMixin`` and ``AsyncObjectMixin`` layered over pykube's
    ``ReplicationController``; the mixins come first in the bases, so their
    methods shadow pykube's.
    """
class ReplicaSet(AsyncScalableMixin, AsyncObjectMixin, _ReplicaSet):
    """ReplicaSet.

    ``AsyncScalableMixin`` and ``AsyncObjectMixin`` layered over pykube's
    ``ReplicaSet``; the mixins come first in the bases, so their methods
    shadow pykube's.
    """
class Secret(AsyncObjectMixin, _Secret):
    """Secret.

    ``AsyncObjectMixin`` layered over pykube's ``Secret``; the mixin comes
    first in the bases, so its methods shadow pykube's.
    """
class Service(AsyncObjectMixin, _Service):
    """Service.

    ``AsyncObjectMixin`` layered over pykube's ``Service``; the mixin comes
    first in the bases, so its methods shadow pykube's. The ``proxy_http_*``
    methods are redefined here as coroutines.
    """

    # NOTE: the methods below carry no docstrings of their own on purpose;
    # their ``__doc__`` is copied from ``_Service`` at the end of the class.

    async def proxy_http_request(self, *args, **kwargs):
        # All arguments are forwarded untouched alongside the pykube method.
        return await self._sync(super().proxy_http_request, *args, **kwargs)

    # The verb-specific helpers await ``self.proxy_http_request`` with the
    # HTTP verb prepended to the caller's arguments.

    async def proxy_http_get(
        self, path: str, port: Optional[int] = None, **kwargs
    ) -> Response:
        return await self.proxy_http_request("GET", path, port, **kwargs)

    async def proxy_http_post(
        self, path: str, port: Optional[int] = None, **kwargs
    ) -> Response:
        return await self.proxy_http_request("POST", path, port, **kwargs)

    async def proxy_http_put(
        self, path: str, port: Optional[int] = None, **kwargs
    ) -> Response:
        return await self.proxy_http_request("PUT", path, port, **kwargs)

    async def proxy_http_delete(
        self, path: str, port: Optional[int] = None, **kwargs
    ) -> Response:
        return await self.proxy_http_request("DELETE", path, port, **kwargs)

    proxy_http_request.__doc__ = _Service.proxy_http_request.__doc__
    proxy_http_get.__doc__ = _Service.proxy_http_get.__doc__
    proxy_http_post.__doc__ = _Service.proxy_http_post.__doc__
    proxy_http_put.__doc__ = _Service.proxy_http_put.__doc__
    proxy_http_delete.__doc__ = _Service.proxy_http_delete.__doc__
class PersistentVolume(AsyncObjectMixin, _PersistentVolume):
    """PersistentVolume.

    ``AsyncObjectMixin`` layered over pykube's ``PersistentVolume``; the mixin
    comes first in the bases, so its methods shadow pykube's.
    """
class PersistentVolumeClaim(AsyncObjectMixin, _PersistentVolumeClaim):
    """PersistentVolumeClaim.

    ``AsyncObjectMixin`` layered over pykube's ``PersistentVolumeClaim``; the
    mixin comes first in the bases, so its methods shadow pykube's.
    """
class HorizontalPodAutoscaler(AsyncObjectMixin, _HorizontalPodAutoscaler):
    """HorizontalPodAutoscaler.

    ``AsyncObjectMixin`` layered over pykube's ``HorizontalPodAutoscaler``; the
    mixin comes first in the bases, so its methods shadow pykube's.
    """
class StatefulSet(AsyncScalableMixin, AsyncObjectMixin, _StatefulSet):
    """StatefulSet.

    ``AsyncScalableMixin`` and ``AsyncObjectMixin`` layered over pykube's
    ``StatefulSet``; the mixins come first in the bases, so their methods
    shadow pykube's.
    """
class Role(AsyncObjectMixin, _Role):
    """Role.

    ``AsyncObjectMixin`` layered over pykube's ``Role``; the mixin comes first
    in the bases, so its methods shadow pykube's.
    """
class RoleBinding(AsyncObjectMixin, _RoleBinding):
    """RoleBinding.

    ``AsyncObjectMixin`` layered over pykube's ``RoleBinding``; the mixin
    comes first in the bases, so its methods shadow pykube's.
    """
class ClusterRole(AsyncObjectMixin, _ClusterRole):
    """ClusterRole.

    ``AsyncObjectMixin`` layered over pykube's ``ClusterRole``; the mixin
    comes first in the bases, so its methods shadow pykube's.
    """
class ClusterRoleBinding(AsyncObjectMixin, _ClusterRoleBinding):
    """ClusterRoleBinding.

    ``AsyncObjectMixin`` layered over pykube's ``ClusterRoleBinding``; the
    mixin comes first in the bases, so its methods shadow pykube's.
    """
class PodSecurityPolicy(AsyncObjectMixin, _PodSecurityPolicy):
    """PodSecurityPolicy.

    ``AsyncObjectMixin`` layered over pykube's ``PodSecurityPolicy``; the
    mixin comes first in the bases, so its methods shadow pykube's.
    """
class PodDisruptionBudget(AsyncObjectMixin, _PodDisruptionBudget):
    """PodDisruptionBudget.

    ``AsyncObjectMixin`` layered over pykube's ``PodDisruptionBudget``; the
    mixin comes first in the bases, so its methods shadow pykube's.
    """
class CustomResourceDefinition(AsyncObjectMixin, _CustomResourceDefinition):
    """CustomResourceDefinition.

    ``AsyncObjectMixin`` layered over pykube's ``CustomResourceDefinition``; the
    mixin comes first in the bases, so its methods shadow pykube's.
    """