kubex.client

Auto-generated reference for the kubex.client module.

BaseClient and factory

kubex.client.client

BaseClient

BaseClient(
    configuration: ClientConfiguration,
    options: ClientOptions | None = None,
)

Bases: ABC

Source code in kubex/client/client.py
def __init__(
    self,
    configuration: ClientConfiguration,
    options: ClientOptions | None = None,
) -> None:
    super().__init__()
    self._configuration = configuration
    self._options = options if options is not None else ClientOptions()
    self._inner_client: Any = self._create_inner_client()

connect_websocket async

connect_websocket(
    request: Request, subprotocols: Sequence[str]
) -> "WebSocketConnection"

Open a WebSocket connection for a streaming subresource.

Warning: Experimental. The WebSocket transport (used by exec, attach and portforward) is still under active development, and the surrounding API may change in future releases without notice.

Source code in kubex/client/client.py
async def connect_websocket(
    self,
    request: Request,
    subprotocols: Sequence[str],
) -> "WebSocketConnection":
    """Open a WebSocket connection for a streaming subresource.

    .. warning::

       **Experimental.** The WebSocket transport (used by ``exec``,
       ``attach`` and ``portforward``) is still under active development
       and the surrounding API may change in future releases without
       notice.
    """
    raise NotImplementedError("WebSocket not supported by this client")

Client options

kubex.client.options

ClientOptions

Bases: BaseModel

Operational options for a kubex HTTP client.

These settings are per-process choices about how the HTTP client should behave at request time. They do not come from kubeconfig or the in-cluster environment; use kubex.configuration.ClientConfiguration for those.

Example:

from kubex.client import ClientOptions, create_client
from kubex.core.params import Timeout

client = await create_client(
    configuration=...,
    options=ClientOptions(
        log_api_warnings=False,
        timeout=Timeout(total=30.0),
        proxy="http://proxy.corp.example.com:8080",
        keep_alive_timeout=60.0,
        pool_size=50,
        ws_max_message_size=8 * 1024 * 1024,
    ),
)

Backend asymmetries

Some fields are ignored or only partially honored on certain backends. In each case a UserWarning is emitted at client construction time:

  • buffer_size: httpx has no equivalent buffer-size knob; the value is ignored and a warning is emitted.
  • pool_size_per_host: httpx has no per-host pool limit; the value is ignored and a warning is emitted.
  • keep_alive_timeout=None: aiohttp has no "unlimited keep-alive" mode. The value is ignored (aiohttp falls back to its own default) and a warning is emitted.
  • proxy=dict: aiohttp supports only a single session-level proxy URL. The entry whose key matches the API server's URL scheme is used; all other entries are dropped with a warning. httpx applies all dict entries via mounts=.

buffer_size class-attribute instance-attribute

buffer_size: int | None | EllipsisType = Field(
    default_factory=lambda: ...
)

HTTP-response read buffer size in bytes.

Accepted values:

  • ... (default) — kubex default of 2**21 bytes (preserves current aiohttp behavior).
  • None — use the HTTP library's own default (aiohttp default: 2**16 bytes).
  • int > 0 — explicit buffer size in bytes.

Backend asymmetry: httpx has no equivalent buffer-size knob. On httpx this field is ignored, and a UserWarning is emitted for any value other than the default (...).

keep_alive class-attribute instance-attribute

keep_alive: bool = True

Whether to reuse idle connections (connection keep-alive).

Set to False to close each connection immediately after use. This maps to Limits(max_keepalive_connections=0) on httpx and TCPConnector(force_close=True) on aiohttp.

keep_alive_timeout class-attribute instance-attribute

keep_alive_timeout: float | None | EllipsisType = Field(
    default_factory=lambda: ...
)

Idle-connection lifetime in seconds.

Accepted values:

  • ... (default) — use the HTTP library's own default (httpx: 5 s; aiohttp: 15 s).
  • None — keep idle connections indefinitely. On httpx this passes keepalive_expiry=None; on aiohttp this is not supported (aiohttp has no "unlimited" mode) and a UserWarning is emitted.
  • float >= 0 — explicit lifetime in seconds.

log_api_warnings class-attribute instance-attribute

log_api_warnings: bool = True

Emit Python warnings (via the warnings module) for any Warning HTTP headers returned by the API server.

The Kubernetes API server emits Warning response headers to signal deprecated API usage (e.g. calling an API version that is scheduled for removal). When this is True (the default), kubex converts each header value into a UserWarning via warnings.warn. Set it to False to silence those warnings.
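The header-to-warning conversion can be sketched as follows. This is an illustration only: the function name, its signature and the parsing details are assumptions rather than kubex internals. It assumes the header follows the standard warning-value layout (three-digit code, agent, quoted text), for example 299 - "message".

```python
import re
import warnings

# warning-value layout: warn-code SP warn-agent SP quoted warn-text.
_WARNING_VALUE = re.compile(r'(\d{3}) (\S+) "((?:[^"\\]|\\.)*)"')


def emit_api_warnings(header_values: list[str], enabled: bool = True) -> list[str]:
    """Hypothetical helper: turn Warning header values into UserWarnings.

    Returns the extracted messages so callers (and tests) can inspect them.
    """
    messages: list[str] = []
    if not enabled:  # mirrors log_api_warnings=False
        return messages
    for raw in header_values:
        for match in _WARNING_VALUE.finditer(raw):
            # Undo backslash escapes inside the quoted warn-text.
            text = re.sub(r"\\(.)", r"\1", match.group(3))
            messages.append(text)
            warnings.warn(text, UserWarning, stacklevel=2)
    return messages
```

Because the result is an ordinary UserWarning, callers can also manage it with the standard warnings filters, for instance warnings.simplefilter("error", UserWarning) in a test suite to make deprecated API usage fail loudly.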

pool_size class-attribute instance-attribute

pool_size: int | None | EllipsisType = Field(
    default_factory=lambda: ...
)

Total connection pool size (all hosts combined).

Accepted values:

  • ... (default) — use the HTTP library's own default (httpx: 100; aiohttp: 100).
  • None — unlimited (passes None to httpx, 0 to aiohttp).
  • int > 0 — explicit connection limit.

pool_size_per_host class-attribute instance-attribute

pool_size_per_host: int | None | EllipsisType = Field(
    default_factory=lambda: ...
)

Per-host connection pool size.

Accepted values:

  • ... (default) — use the HTTP library's own default (aiohttp: 0, meaning no per-host limit).
  • None — unlimited (passes 0 to aiohttp).
  • int > 0 — explicit per-host connection limit.

Backend asymmetry: httpx has no per-host pool limit. On httpx this field is ignored, and a UserWarning is emitted for any value other than the default (...).

proxy class-attribute instance-attribute

proxy: str | dict[str, str] | None = None

Outbound HTTP proxy for all requests.

Accepted values:

  • None (default) — no proxy.
  • str — a single proxy URL applied to every request, e.g. "http://proxy.example.com:8080", or a URL with credentials embedded in the user:password@host form for basic auth.
  • dict[str, str] — per-scheme map with "http" and/or "https" keys, e.g. {"https": "http://proxy.example.com:8080"}. Only "http" and "https" scheme keys are accepted.

Backend asymmetry: httpx routes all dict entries via mounts=; aiohttp supports only a single session-level proxy URL, so only the entry matching the API server's URL scheme is used (others are dropped with a warning).

timeout class-attribute instance-attribute

timeout: TimeoutTypes | EllipsisType = Field(
    default_factory=lambda: ...
)

Default HTTP timeout for every request made by this client.

Accepted values:

  • ... (default) — use the HTTP library's own default timeout.
  • None — disable timeouts entirely.
  • int or float — treat the value as a total timeout in seconds; coerced to kubex.core.params.Timeout automatically.
  • kubex.core.params.Timeout — used as-is for fine-grained per-phase control.

Individual calls can override this via the request_timeout= parameter.

ws_max_message_size class-attribute instance-attribute

ws_max_message_size: int | None | EllipsisType = Field(
    default_factory=lambda: ...
)

Maximum WebSocket message size in bytes for exec/attach/portforward.

Accepted values:

  • ... (default) — kubex default of 2**21 bytes (preserves current behavior on both backends).
  • None — no cap on aiohttp (passes 0 to max_msg_size). On httpx, None is not supported; kubex falls back to the httpx-ws default (65536 bytes) and emits a UserWarning.
  • int > 0 — explicit cap in bytes.

resolve_ws_max_message_size

resolve_ws_max_message_size(
    ws_max_message_size: int | None | EllipsisType,
) -> int

Resolve ws_max_message_size to a concrete integer for HTTP backends.

  • ... → 2**21 (kubex default; preserves pre-option behavior on both backends)
  • None → 0 (aiohttp treats 0 as "no cap"; httpx does not pass this at all)
  • int → that int

Source code in kubex/client/options.py
def resolve_ws_max_message_size(ws_max_message_size: int | None | EllipsisType) -> int:
    """Resolve ``ws_max_message_size`` to a concrete integer for HTTP backends.

    - ``...`` → ``2**21`` (kubex default; preserves pre-option behavior on both backends)
    - ``None`` → ``0`` (aiohttp treats 0 as "no cap"; httpx does not pass this at all)
    - ``int``  → that int
    """
    if isinstance(ws_max_message_size, EllipsisType):
        return 2**21
    if ws_max_message_size is None:
        return 0
    return ws_max_message_size

WebSocket abstraction

kubex.client.websocket

WebSocketConnection

Bases: ABC

Async WebSocket connection abstraction used by exec/attach/port-forward.

Implementations adapt a concrete WebSocket client (e.g. aiohttp or httpx-ws) to a uniform binary-frame interface. receive_bytes raises StopAsyncIteration once the peer closes the connection. Non-binary frames (e.g. text frames) are a protocol violation for the v5 channel protocol; implementations must surface them as KubexClientException rather than leaking backend-specific exceptions or silently coercing them to bytes.

Httpx client

kubex.client.httpx

HttpxWebSocketConnection

HttpxWebSocketConnection(
    cm: AbstractAsyncContextManager[AsyncWebSocketSession],
    session: AsyncWebSocketSession,
)

Bases: WebSocketConnection

Adapter wrapping an httpx_ws.AsyncWebSocketSession.

Source code in kubex/client/httpx.py
def __init__(
    self,
    cm: AbstractAsyncContextManager[AsyncWebSocketSession],
    session: AsyncWebSocketSession,
) -> None:
    self._cm = cm
    self._session = session
    # ``_eof`` short-circuits further ``receive_bytes()`` calls once the
    # peer closes or the transport breaks. ``_cm_exited`` guards the
    # underlying ``httpx_ws`` context manager so its ``__aexit__`` runs
    # exactly once — the EOF path must not skip releasing the HTTP stream.
    self._eof = False
    self._cm_exited = False
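The "exactly once" guard mentioned in the constructor comment can be sketched in isolation. This is not the adapter's real close logic, which is not shown on this page; ExitOnce, close and the fake session below are hypothetical names used only to demonstrate the flag-before-exit pattern.

```python
import asyncio
from contextlib import AbstractAsyncContextManager, asynccontextmanager


class ExitOnce:
    """Hypothetical wrapper: run the wrapped ``__aexit__`` at most once."""

    def __init__(self, cm: AbstractAsyncContextManager) -> None:
        self._cm = cm
        self._cm_exited = False

    async def close(self) -> None:
        if self._cm_exited:
            return  # already released; later calls are no-ops
        # Set the flag first so a failing __aexit__ is never retried.
        self._cm_exited = True
        await self._cm.__aexit__(None, None, None)


async def demo() -> int:
    """Count how often the fake session's cleanup actually runs."""
    exits = 0

    @asynccontextmanager
    async def fake_session():
        nonlocal exits
        try:
            yield "session"
        finally:
            exits += 1  # stands in for releasing the HTTP stream

    cm = fake_session()
    await cm.__aenter__()
    guard = ExitOnce(cm)
    await guard.close()  # e.g. triggered by the EOF path
    await guard.close()  # e.g. triggered by an explicit close afterwards
    return exits
```

Running asyncio.run(demo()) shows the cleanup executing a single time even though close is called twice.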

Aiohttp client

kubex.client.aiohttp

AioHttpClient

AioHttpClient(
    configuration: ClientConfiguration,
    options: ClientOptions | None = None,
)

Bases: BaseClient

Source code in kubex/client/aiohttp.py
def __init__(
    self,
    configuration: ClientConfiguration,
    options: ClientOptions | None = None,
) -> None:
    self._default_headers = {
        constants.CONTENT_TYPE_HEADER: constants.APPLICATION_JSON_MIME_TYPE,
        constants.ACCEPT_HEADER: constants.APPLICATION_JSON_MIME_TYPE,
    }
    self._resolved_proxy: str | None = None
    super().__init__(configuration, options)

AioHttpWebSocketConnection

AioHttpWebSocketConnection(ws: ClientWebSocketResponse)

Bases: WebSocketConnection

Adapter wrapping an aiohttp.ClientWebSocketResponse.

Source code in kubex/client/aiohttp.py
def __init__(self, ws: ClientWebSocketResponse) -> None:
    self._ws = ws
    self._client_closed = False
    self._peer_closed_clean = False