Skip to content

kubex.api

Auto-generated reference for the kubex.api module. Private descriptors (e.g. _LogsDescriptor) are excluded; they exist to wire the subresource accessors onto Api[T] and are not part of the public API.

Api class

kubex.api.api

Api

Api(
    resource_type: Type[ResourceType],
    client: BaseClient,
    *,
    namespace: NamespaceTypes = None,
)

Bases: Generic[ResourceType]

API for interacting with Kubernetes resource.

Source code in kubex/api/api.py
def __init__(
    self,
    resource_type: Type[ResourceType],
    client: BaseClient,
    *,
    namespace: NamespaceTypes = None,
) -> None:
    self._resource = resource_type
    self._client = client
    self._request_builder = RequestBuilder(
        resource_config=resource_type.__RESOURCE_CONFIG__,
    )
    self._namespace = namespace
    ensure_optional_namespace(
        namespace, self._namespace, self._resource.__RESOURCE_CONFIG__.scope
    )
    self.metadata = MetadataAccessor(
        client=self._client,
        request_builder=self._request_builder,
        namespace=self._namespace,
        scope=self._resource.__RESOURCE_CONFIG__.scope,
        resource_type=self._resource,
    )

create async

create(
    data: ResourceType,
    *,
    namespace: ApiNamespaceTypes = Ellipsis,
    dry_run: DryRunTypes = None,
    field_manager: str | None = None,
    request_timeout: ApiRequestTimeoutTypes = Ellipsis,
) -> ResourceType

Create a resource.

Parameters:

Name Type Description Default
data ResourceType

The resource instance to create.

required
namespace ApiNamespaceTypes

The namespace to create the resource in. If not provided, the namespace provided when creating the API will be used. The namespace is required for namespaced resources. If namespace is provided for cluster-scoped resources, an error will be raised.

Ellipsis
dry_run DryRunTypes

Whether to perform a dry run of the operation.

None
field_manager str

The value to use for the fieldManager attribute of the created resource.

None
request_timeout ApiRequestTimeoutTypes

HTTP-level timeout override for this call. A number is interpreted as the total timeout in seconds. Pass None to disable timeouts entirely for this call. Omit to use the client default.

Ellipsis

Returns: ResourceType: the created resource instance.

Source code in kubex/api/api.py
async def create(
    self,
    data: ResourceType,
    *,
    namespace: ApiNamespaceTypes = Ellipsis,
    dry_run: DryRunTypes = None,
    field_manager: str | None = None,
    request_timeout: ApiRequestTimeoutTypes = Ellipsis,
) -> ResourceType:
    """Create a resource.

    Args:
        data (ResourceType): The resource instance to create.
        namespace: The namespace to create the resource in. If not provided,
            the namespace provided when creating the API will be used.
            The namespace is required for namespaced resources.
            If namespace is provided for cluster-scoped resources, an error will be raised.
        dry_run: Whether to perform a dry run of the operation.
        field_manager (str): The value to use for the fieldManager attribute of the created resource.
        request_timeout: HTTP-level timeout override for this call. A number is
            interpreted as the total timeout in seconds. Pass ``None`` to disable
            timeouts entirely for this call. Omit to use the client default.
    Returns:
        ResourceType: the created resource instance.
    """
    _namespace = ensure_required_namespace(
        namespace, self._namespace, self._resource.__RESOURCE_CONFIG__.scope
    )
    options = PostOptions(dry_run=dry_run, field_manager=field_manager)
    request = self._request_builder.create(
        _namespace,
        options,
        data.model_dump_json(by_alias=True, exclude_unset=True, exclude_none=True),
        request_timeout=request_timeout,
    )
    response = await self._client.request(request)
    return self._resource.model_validate_json(response.content)

delete async

delete(
    name: str,
    *,
    namespace: ApiNamespaceTypes = Ellipsis,
    dry_run: DryRunTypes = None,
    grace_period_seconds: int | None = None,
    propagation_policy: PropagationPolicyTypes = None,
    preconditions: Precondition | None = None,
    request_timeout: ApiRequestTimeoutTypes = Ellipsis,
) -> Status | ResourceType

Delete the specified resource.

Parameters:

Name Type Description Default
name str

The name of the resource to delete.

required
namespace ApiNamespaceTypes

The namespace of the namespaced resource to delete. If not provided, the namespace provided when creating the API will be used. The namespace is required for namespaced resources. If namespace is provided for cluster-scoped resources, an error will be raised.

Ellipsis
dry_run DryRunTypes

Whether to perform a dry run of the operation.

None
grace_period_seconds int | None

The duration in seconds before the object should be deleted.

None
propagation_policy PropagationPolicyTypes

Whether and how garbage collection will be performed.

None
preconditions Precondition | None

Preconditions for the operation.

None
request_timeout ApiRequestTimeoutTypes

HTTP-level timeout override for this call. A number is interpreted as the total timeout in seconds. Pass None to disable timeouts entirely for this call. Omit to use the client default.

Ellipsis

Returns: Status: the resource has been fully deleted. ResourceType: the resource instance deletion process has started, but the resource is not gone yet due to finalization process. For details see Resource deletion documentation.

Source code in kubex/api/api.py
async def delete(
    self,
    name: str,
    *,
    namespace: ApiNamespaceTypes = Ellipsis,
    dry_run: DryRunTypes = None,
    grace_period_seconds: int | None = None,
    propagation_policy: PropagationPolicyTypes = None,
    preconditions: Precondition | None = None,
    request_timeout: ApiRequestTimeoutTypes = Ellipsis,
) -> Status | ResourceType:
    """Delete the specified resource.

    Args:
        name: The name of the resource to delete.
        namespace: The namespace of the namespaced resource to delete. If not provided,
            the namespace provided when creating the API will be used.
            The namespace is required for namespaced resources.
            If namespace is provided for cluster-scoped resources, an error will be raised.
        dry_run: Whether to perform a dry run of the operation.
        grace_period_seconds: The duration in seconds before the object should be deleted.
        propagation_policy: Whether and how garbage collection will be performed.
        preconditions: Preconditions for the operation.
        request_timeout: HTTP-level timeout override for this call. A number is
            interpreted as the total timeout in seconds. Pass ``None`` to disable
            timeouts entirely for this call. Omit to use the client default.
    Returns:
        Status: the resource has been fully deleted.
        ResourceType: the resource instance deletion process has started,
            but the resource is not gone yet due to finalization process.
            For details see
            [Resource deletion documentation](https://kubernetes.io/docs/reference/using-api/api-concepts/#resource-deletion).
    """
    _namespace = ensure_required_namespace(
        namespace, self._namespace, self._resource.__RESOURCE_CONFIG__.scope
    )
    options = DeleteOptions(
        dry_run=dry_run,
        grace_period_seconds=grace_period_seconds,
        propagation_policy=propagation_policy,
        preconditions=preconditions,
    )
    request = self._request_builder.delete(
        name, _namespace, options, request_timeout=request_timeout
    )
    response = await self._client.request(request)
    try:
        return Status.model_validate_json(response.content)
    except ValidationError:
        return self._resource.model_validate_json(response.content)

delete_collection async

delete_collection(
    *,
    namespace: ApiNamespaceTypes = Ellipsis,
    label_selector: str | None = None,
    field_selector: str | None = None,
    timeout_seconds: int | None = None,
    limit: int | None = None,
    continue_token: str | None = None,
    version_match: VersionMatch | None = None,
    resource_version: ResourceVersionTypes = None,
    dry_run: DryRunTypes = None,
    grace_period_seconds: int | None = None,
    propagation_policy: PropagationPolicyTypes = None,
    preconditions: Precondition | None = None,
    request_timeout: ApiRequestTimeoutTypes = Ellipsis,
) -> Status | ListEntity[ResourceType]

Delete collection of resources.

Parameters:

Name Type Description Default
timeout_seconds int | None

Server-side timeout (in seconds) for the call; sent as the Kubernetes timeoutSeconds query parameter. For an HTTP client-side timeout, use request_timeout.

None
request_timeout ApiRequestTimeoutTypes

HTTP-level timeout override for this call. A number is interpreted as the total timeout in seconds. Pass None to disable timeouts entirely for this call. Omit to use the client default.

Ellipsis
Source code in kubex/api/api.py
async def delete_collection(
    self,
    *,
    namespace: ApiNamespaceTypes = Ellipsis,
    label_selector: str | None = None,
    field_selector: str | None = None,
    timeout_seconds: int | None = None,
    limit: int | None = None,
    continue_token: str | None = None,
    version_match: VersionMatch | None = None,
    resource_version: ResourceVersionTypes = None,
    dry_run: DryRunTypes = None,
    grace_period_seconds: int | None = None,
    propagation_policy: PropagationPolicyTypes = None,
    preconditions: Precondition | None = None,
    request_timeout: ApiRequestTimeoutTypes = Ellipsis,
) -> Status | ListEntity[ResourceType]:
    """Delete collection of resources.

    Args:
        timeout_seconds: Server-side timeout (in seconds) for the call;
            sent as the Kubernetes ``timeoutSeconds`` query parameter. For an
            HTTP client-side timeout, use ``request_timeout``.
        request_timeout: HTTP-level timeout override for this call. A number is
            interpreted as the total timeout in seconds. Pass ``None`` to disable
            timeouts entirely for this call. Omit to use the client default.
    """
    _namespace = ensure_optional_namespace(
        namespace, self._namespace, self._resource.__RESOURCE_CONFIG__.scope
    )
    list_options = ListOptions(
        label_selector=label_selector,
        field_selector=field_selector,
        timeout_seconds=timeout_seconds,
        limit=limit,
        continue_token=continue_token,
        version_match=version_match,
        resource_version=resource_version,
    )
    delete_options = DeleteOptions(
        dry_run=dry_run,
        grace_period_seconds=grace_period_seconds,
        propagation_policy=propagation_policy,
        preconditions=preconditions,
    )
    request = self._request_builder.delete_collection(
        _namespace,
        list_options,
        delete_options,
        request_timeout=request_timeout,
    )
    response = await self._client.request(request)
    list_model = self._resource.__RESOURCE_CONFIG__.list_model
    try:
        return Status.model_validate_json(response.content)
    except ValidationError:
        return list_model.model_validate_json(response.content)

get async

get(
    name: str,
    *,
    namespace: ApiNamespaceTypes = Ellipsis,
    resource_version: ResourceVersionTypes = None,
    request_timeout: ApiRequestTimeoutTypes = Ellipsis,
) -> ResourceType

Read the specified resource.

Parameters:

Name Type Description Default
name str

The name of the resource to read.

required
namespace ApiNamespaceTypes

The namespace of the namespaced resource to read. If not provided, the namespace provided when creating the API will be used. The namespace is required for namespaced resources. If namespace is provided for cluster-scoped resources, an error will be raised.

Ellipsis
resource_version ResourceVersionTypes

The resource version to read. If not provided, the current resource version will be read. For details look at Resource Version Semantics documentation,

None
request_timeout ApiRequestTimeoutTypes

HTTP-level timeout override for this call. A number is interpreted as the total timeout in seconds. Pass None to disable timeouts entirely for this call. Omit to use the client default.

Ellipsis

Returns: ResourceType: the resource instance.

Source code in kubex/api/api.py
async def get(
    self,
    name: str,
    *,
    namespace: ApiNamespaceTypes = Ellipsis,
    resource_version: ResourceVersionTypes = None,
    request_timeout: ApiRequestTimeoutTypes = Ellipsis,
) -> ResourceType:
    """Read the specified resource.

    Args:
        name: The name of the resource to read.
        namespace: The namespace of the namespaced resource to read. If not provided,
            the namespace provided when creating the API will be used.
            The namespace is required for namespaced resources.
            If namespace is provided for cluster-scoped resources, an error will be raised.
        resource_version: The resource version to read. If not provided,
            the current resource version will be read. For details look at
            [Resource Version Semantics documentation](https://kubernetes.io/docs/reference/using-api/api-concepts/#semantics-for-get-and-list),
        request_timeout: HTTP-level timeout override for this call. A number is
            interpreted as the total timeout in seconds. Pass ``None`` to disable
            timeouts entirely for this call. Omit to use the client default.
    Returns:
        ResourceType: the resource instance.
    """
    _namespace = ensure_required_namespace(
        namespace, self._namespace, self._resource.__RESOURCE_CONFIG__.scope
    )
    options = GetOptions(resource_version=resource_version)
    request = self._request_builder.get(
        name, _namespace, options, request_timeout=request_timeout
    )
    response = await self._client.request(request)
    return self._resource.model_validate_json(response.content)

list async

list(
    *,
    namespace: ApiNamespaceTypes = Ellipsis,
    label_selector: str | None = None,
    field_selector: str | None = None,
    timeout_seconds: int | None = None,
    limit: int | None = None,
    continue_token: str | None = None,
    version_match: VersionMatch | None = None,
    resource_version: ResourceVersionTypes = None,
    request_timeout: ApiRequestTimeoutTypes = Ellipsis,
) -> ListEntity[ResourceType]

List objects of kind.

Parameters:

Name Type Description Default
namespace ApiNamespaceTypes

The namespace of the namespaced resource to list. If not provided, the namespace provided when creating the API will be used. If namespace is provided for cluster-scoped resources, an error will be raised.

Ellipsis
label_selector str | None

A selector to restrict the list of returned objects by their labels. For details look at Label Selectors documentation.

None
field_selector str | None

A selector to restrict the list of returned objects by their fields. For details look at Field Selectors documentation.

None
timeout_seconds int | None

Server-side timeout (in seconds) for the list/watch call; sent as the Kubernetes timeoutSeconds query parameter. For an HTTP client-side timeout, use request_timeout.

None
limit int | None

The maximum number of items to return.

None
continue_token str | None

The continue token for the list call.

None
version_match VersionMatch | None

Whether to watch for changes to a resource.

None
resource_version ResourceVersionTypes

The resource version to list. If not provided, the current resource version will be listed. For details look at Resource Version Semantics documentation,

None
request_timeout ApiRequestTimeoutTypes

HTTP-level timeout override for this call. A number is interpreted as the total timeout in seconds. Pass None to disable timeouts entirely for this call. Omit to use the client default.

Ellipsis

Returns: ListEntity[ResourceType]: the list of resource.

Source code in kubex/api/api.py
async def list(
    self,
    *,
    namespace: ApiNamespaceTypes = Ellipsis,
    label_selector: str | None = None,
    field_selector: str | None = None,
    timeout_seconds: int | None = None,
    limit: int | None = None,
    continue_token: str | None = None,
    version_match: VersionMatch | None = None,
    resource_version: ResourceVersionTypes = None,
    request_timeout: ApiRequestTimeoutTypes = Ellipsis,
) -> ListEntity[ResourceType]:
    """List objects of kind.

    Args:
        namespace: The namespace of the namespaced resource to list. If not provided,
            the namespace provided when creating the API will be used.
            If namespace is provided for cluster-scoped resources, an error will be raised.
        label_selector: A selector to restrict the list of returned objects by their labels.
            For details look at [Label Selectors documentation](https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#label-selectors).
        field_selector: A selector to restrict the list of returned objects by their fields.
            For details look at [Field Selectors documentation](https://kubernetes.io/docs/concepts/overview/working-with-objects/field-selectors/).
        timeout_seconds: Server-side timeout (in seconds) for the list/watch call;
            sent as the Kubernetes ``timeoutSeconds`` query parameter. For an
            HTTP client-side timeout, use ``request_timeout``.
        limit: The maximum number of items to return.
        continue_token: The continue token for the list call.
        version_match: Whether to watch for changes to a resource.
        resource_version: The resource version to list. If not provided,
            the current resource version will be listed. For details look at
            [Resource Version Semantics documentation](https://kubernetes.io/docs/reference/using-api/api-concepts/#semantics-for-get-and-list),
        request_timeout: HTTP-level timeout override for this call. A number is
            interpreted as the total timeout in seconds. Pass ``None`` to disable
            timeouts entirely for this call. Omit to use the client default.
    Returns:
        ListEntity[ResourceType]: the list of resource.
    """
    _namespace = ensure_optional_namespace(
        namespace, self._namespace, self._resource.__RESOURCE_CONFIG__.scope
    )
    options = ListOptions(
        label_selector=label_selector,
        field_selector=field_selector,
        timeout_seconds=timeout_seconds,
        limit=limit,
        continue_token=continue_token,
        version_match=version_match,
        resource_version=resource_version,
    )
    request = self._request_builder.list(
        _namespace, options, request_timeout=request_timeout
    )
    response = await self._client.request(request)
    list_model = self._resource.__RESOURCE_CONFIG__.list_model
    return list_model.model_validate_json(response.content)

patch async

patch(
    name: str,
    patch: Patch,
    *,
    namespace: ApiNamespaceTypes = Ellipsis,
    dry_run: DryRunTypes = None,
    field_manager: str | None = None,
    force: bool | None = None,
    field_validation: FieldValidation | None = None,
    request_timeout: ApiRequestTimeoutTypes = Ellipsis,
) -> ResourceType

Patch the specified resource.

Parameters:

Name Type Description Default
request_timeout ApiRequestTimeoutTypes

HTTP-level timeout override for this call. A number is interpreted as the total timeout in seconds. Pass None to disable timeouts entirely for this call. Omit to use the client default.

Ellipsis
Source code in kubex/api/api.py
async def patch(
    self,
    name: str,
    patch: Patch,
    *,
    namespace: ApiNamespaceTypes = Ellipsis,
    dry_run: DryRunTypes = None,
    field_manager: str | None = None,
    force: bool | None = None,
    field_validation: FieldValidation | None = None,
    request_timeout: ApiRequestTimeoutTypes = Ellipsis,
) -> ResourceType:
    """Patch the specified resource.

    Args:
        request_timeout: HTTP-level timeout override for this call. A number is
            interpreted as the total timeout in seconds. Pass ``None`` to disable
            timeouts entirely for this call. Omit to use the client default.
    """
    _namespace = ensure_required_namespace(
        namespace, self._namespace, self._resource.__RESOURCE_CONFIG__.scope
    )
    options = PatchOptions(
        dry_run=dry_run,
        field_manager=field_manager,
        force=force,
        field_validation=field_validation,
    )
    request = self._request_builder.patch(
        name, _namespace, options, patch, request_timeout=request_timeout
    )
    response = await self._client.request(request)
    return self._resource.model_validate_json(response.content)

replace async

replace(
    name: str,
    data: ResourceType,
    *,
    namespace: ApiNamespaceTypes = Ellipsis,
    dry_run: DryRunTypes = None,
    field_manager: str | None = None,
    request_timeout: ApiRequestTimeoutTypes = Ellipsis,
) -> ResourceType

Replace the specified resource.

Parameters:

Name Type Description Default
request_timeout ApiRequestTimeoutTypes

HTTP-level timeout override for this call. A number is interpreted as the total timeout in seconds. Pass None to disable timeouts entirely for this call. Omit to use the client default.

Ellipsis
Source code in kubex/api/api.py
async def replace(
    self,
    name: str,
    data: ResourceType,
    *,
    namespace: ApiNamespaceTypes = Ellipsis,
    dry_run: DryRunTypes = None,
    field_manager: str | None = None,
    request_timeout: ApiRequestTimeoutTypes = Ellipsis,
) -> ResourceType:
    """Replace the specified resource.

    Args:
        request_timeout: HTTP-level timeout override for this call. A number is
            interpreted as the total timeout in seconds. Pass ``None`` to disable
            timeouts entirely for this call. Omit to use the client default.
    """
    _namespace = ensure_required_namespace(
        namespace, self._namespace, self._resource.__RESOURCE_CONFIG__.scope
    )
    options = PostOptions(dry_run=dry_run, field_manager=field_manager)
    request = self._request_builder.replace(
        name,
        _namespace,
        options,
        data.model_dump_json(by_alias=True, exclude_unset=True, exclude_none=True),
        request_timeout=request_timeout,
    )
    response = await self._client.request(request)
    return self._resource.model_validate_json(response.content)

watch async

watch(
    *,
    namespace: ApiNamespaceTypes = Ellipsis,
    label_selector: str | None = None,
    field_selector: str | None = None,
    allow_bookmarks: bool | None = None,
    send_initial_events: bool | None = None,
    timeout_seconds: int | None = None,
    resource_version: ResourceVersionTypes = None,
    request_timeout: ApiRequestTimeoutTypes = Ellipsis,
) -> AsyncGenerator[WatchEvent[ResourceType], None]

Watch for changes to the specified resource.

Parameters:

Name Type Description Default
request_timeout ApiRequestTimeoutTypes

HTTP-level timeout override for this call. A number is interpreted as the total timeout in seconds. Pass None to disable timeouts entirely for this call. Omit to use the client default. For long-lived watches a short read/total timeout will terminate the stream; disable the read timeout or leave this unset.

Ellipsis
Source code in kubex/api/api.py
async def watch(
    self,
    *,
    namespace: ApiNamespaceTypes = Ellipsis,
    label_selector: str | None = None,
    field_selector: str | None = None,
    allow_bookmarks: bool | None = None,
    send_initial_events: bool | None = None,
    timeout_seconds: int | None = None,
    resource_version: ResourceVersionTypes = None,
    request_timeout: ApiRequestTimeoutTypes = Ellipsis,
) -> AsyncGenerator[WatchEvent[ResourceType], None]:
    """Watch for changes to the specified resource.

    Args:
        request_timeout: HTTP-level timeout override for this call. A number is
            interpreted as the total timeout in seconds. Pass ``None`` to disable
            timeouts entirely for this call. Omit to use the client default. For
            long-lived watches a short read/total timeout will terminate the
            stream; disable the read timeout or leave this unset.
    """
    _namespace = ensure_optional_namespace(
        namespace, self._namespace, self._resource.__RESOURCE_CONFIG__.scope
    )
    options = WatchOptions(
        label_selector=label_selector,
        field_selector=field_selector,
        allow_bookmarks=allow_bookmarks,
        send_initial_events=send_initial_events,
        timeout_seconds=timeout_seconds,
    )
    request = self._request_builder.watch(
        _namespace,
        options,
        resource_version=resource_version,
        request_timeout=request_timeout,
    )
    async for line in self._client.stream_lines(request):
        yield WatchEvent(self._resource, json.loads(line))

create_api async

create_api(
    resource_type: Type[ResourceType],
    *,
    client: BaseClient | None = None,
    namespace: NamespaceTypes = None,
) -> Api[ResourceType]

Create an API for the specified resource type.

Parameters:

Name Type Description Default
resource_type Type[ResourceType]

The resource type to create an API for.

required
client BaseClient | None

The client to use for the API. If not provided, a new client will be created.

None
namespace NamespaceTypes

The namespace to use for the API. If set all operations will be performed in this namespace. The Api namespace can be overridden by passing a namespace to the individual methods.

None

Returns: An Api instance for the specified resource type.

Source code in kubex/api/api.py
async def create_api(
    resource_type: Type[ResourceType],
    *,
    client: BaseClient | None = None,
    namespace: NamespaceTypes = None,
) -> Api[ResourceType]:
    """Create an API for the specified resource type.

    Args:
        resource_type: The resource type to create an API for.
        client: The client to use for the API. If not provided,
            a new client will be created.
        namespace: The namespace to use for the API. If set all
            operations will be performed in this namespace.
            The Api namespace can be overridden by passing a
            namespace to the individual methods.
    Returns:
        An Api instance for the specified resource type.
    """
    client = client or await create_client()
    return Api(resource_type, client=client, namespace=namespace)

Logs subresource

kubex.api._logs

LogsAccessor

LogsAccessor(
    client: BaseClient,
    request_builder: RequestBuilder,
    namespace: NamespaceTypes,
    scope: Scope,
    resource_type: Type[ResourceType],
)

Bases: Generic[ResourceType]

Accessor for logs subresource operations.

Source code in kubex/api/_logs.py
def __init__(
    self,
    client: BaseClient,
    request_builder: RequestBuilder,
    namespace: NamespaceTypes,
    scope: Scope,
    resource_type: Type[ResourceType],
) -> None:
    self._client = client
    self._request_builder = request_builder
    self._namespace = namespace
    self._scope = scope
    self._resource_type = resource_type

get async

get(
    name: str,
    *,
    namespace: ApiNamespaceTypes = Ellipsis,
    container: str | None = None,
    limit_bytes: int | None = None,
    pretty: bool | None = None,
    previous: bool | None = None,
    since_seconds: int | None = None,
    tail_lines: int | None = None,
    timestamps: bool | None = None,
    request_timeout: ApiRequestTimeoutTypes = Ellipsis,
) -> str

Read logs of the specified resource.

Source code in kubex/api/_logs.py
async def get(
    self,
    name: str,
    *,
    namespace: ApiNamespaceTypes = Ellipsis,
    container: str | None = None,
    limit_bytes: int | None = None,
    pretty: bool | None = None,
    previous: bool | None = None,
    since_seconds: int | None = None,
    tail_lines: int | None = None,
    timestamps: bool | None = None,
    request_timeout: ApiRequestTimeoutTypes = Ellipsis,
) -> str:
    """Read logs of the specified resource."""
    _namespace = ensure_required_namespace(namespace, self._namespace, self._scope)
    options = LogOptions(
        container=container,
        limit_bytes=limit_bytes,
        pretty=pretty,
        previous=previous,
        since_seconds=since_seconds,
        tail_lines=tail_lines,
        timestamps=timestamps,
    )
    request = self._request_builder.logs(
        name, _namespace, options=options, request_timeout=request_timeout
    )
    response = await self._client.request(request)
    return response.text

stream async

stream(
    name: str,
    *,
    namespace: ApiNamespaceTypes = Ellipsis,
    container: str | None = None,
    limit_bytes: int | None = None,
    pretty: bool | None = None,
    previous: bool | None = None,
    since_seconds: int | None = None,
    tail_lines: int | None = None,
    timestamps: bool | None = None,
    request_timeout: ApiRequestTimeoutTypes = Ellipsis,
) -> AsyncGenerator[str, None]

Stream logs of the specified resource.

Source code in kubex/api/_logs.py
async def stream(
    self,
    name: str,
    *,
    namespace: ApiNamespaceTypes = Ellipsis,
    container: str | None = None,
    limit_bytes: int | None = None,
    pretty: bool | None = None,
    previous: bool | None = None,
    since_seconds: int | None = None,
    tail_lines: int | None = None,
    timestamps: bool | None = None,
    request_timeout: ApiRequestTimeoutTypes = Ellipsis,
) -> AsyncGenerator[str, None]:
    """Stream logs of the specified resource."""
    _namespace = ensure_required_namespace(namespace, self._namespace, self._scope)
    options = LogOptions(
        container=container,
        limit_bytes=limit_bytes,
        pretty=pretty,
        previous=previous,
        since_seconds=since_seconds,
        tail_lines=tail_lines,
        timestamps=timestamps,
    )
    request = self._request_builder.stream_logs(
        name, _namespace, options=options, request_timeout=request_timeout
    )
    async for line in self._client.stream_lines(request):
        yield line

Scale subresource

kubex.api._scale

ScaleAccessor

ScaleAccessor(
    client: BaseClient,
    request_builder: RequestBuilder,
    namespace: NamespaceTypes,
    scope: Scope,
    resource_type: Type[ResourceType],
)

Bases: Generic[ResourceType]

Accessor for scale subresource operations.

Source code in kubex/api/_scale.py
def __init__(
    self,
    client: BaseClient,
    request_builder: RequestBuilder,
    namespace: NamespaceTypes,
    scope: Scope,
    resource_type: Type[ResourceType],
) -> None:
    self._client = client
    self._request_builder = request_builder
    self._namespace = namespace
    self._scope = scope
    self._resource_type = resource_type

get async

get(
    name: str,
    *,
    namespace: ApiNamespaceTypes = Ellipsis,
    request_timeout: ApiRequestTimeoutTypes = Ellipsis,
) -> Scale

Read the scale of the specified resource.

Source code in kubex/api/_scale.py
async def get(
    self,
    name: str,
    *,
    namespace: ApiNamespaceTypes = Ellipsis,
    request_timeout: ApiRequestTimeoutTypes = Ellipsis,
) -> Scale:
    """Read the scale of the specified resource."""
    _namespace = ensure_required_namespace(namespace, self._namespace, self._scope)
    request = self._request_builder.get_subresource(
        SCALE_SUBRESOURCE, name, _namespace, request_timeout=request_timeout
    )
    response = await self._client.request(request)
    return Scale.model_validate_json(response.content)

patch async

patch(
    name: str,
    patch: Patch,
    *,
    namespace: ApiNamespaceTypes = Ellipsis,
    dry_run: DryRunTypes = None,
    field_manager: str | None = None,
    force: bool | None = None,
    field_validation: FieldValidation | None = None,
    request_timeout: ApiRequestTimeoutTypes = Ellipsis,
) -> Scale

Patch the scale of the specified resource.

Source code in kubex/api/_scale.py
async def patch(
    self,
    name: str,
    patch: Patch,
    *,
    namespace: ApiNamespaceTypes = Ellipsis,
    dry_run: DryRunTypes = None,
    field_manager: str | None = None,
    force: bool | None = None,
    field_validation: FieldValidation | None = None,
    request_timeout: ApiRequestTimeoutTypes = Ellipsis,
) -> Scale:
    """Patch the scale of the specified resource."""
    _namespace = ensure_required_namespace(namespace, self._namespace, self._scope)
    options = PatchOptions(
        dry_run=dry_run,
        field_manager=field_manager,
        force=force,
        field_validation=field_validation,
    )
    request = self._request_builder.patch_subresource(
        SCALE_SUBRESOURCE,
        name,
        _namespace,
        options=options,
        patch=patch,
        request_timeout=request_timeout,
    )
    response = await self._client.request(request)
    return Scale.model_validate_json(response.content)

replace async

replace(
    name: str,
    scale: Scale,
    *,
    namespace: ApiNamespaceTypes = Ellipsis,
    dry_run: DryRunTypes = None,
    field_manager: str | None = None,
    request_timeout: ApiRequestTimeoutTypes = Ellipsis,
) -> Scale

Replace the scale of the specified resource.

Source code in kubex/api/_scale.py
async def replace(
    self,
    name: str,
    scale: Scale,
    *,
    namespace: ApiNamespaceTypes = Ellipsis,
    dry_run: DryRunTypes = None,
    field_manager: str | None = None,
    request_timeout: ApiRequestTimeoutTypes = Ellipsis,
) -> Scale:
    """Replace the scale of the specified resource."""
    _namespace = ensure_required_namespace(namespace, self._namespace, self._scope)
    options = PostOptions(dry_run=dry_run, field_manager=field_manager)
    request = self._request_builder.replace_subresource(
        SCALE_SUBRESOURCE,
        name,
        _namespace,
        data=scale.model_dump_json(
            by_alias=True, exclude_unset=True, exclude_none=True
        ),
        options=options,
        request_timeout=request_timeout,
    )
    response = await self._client.request(request)
    return Scale.model_validate_json(response.content)

Status subresource

kubex.api._status

StatusAccessor

StatusAccessor(
    client: BaseClient,
    request_builder: RequestBuilder,
    namespace: NamespaceTypes,
    scope: Scope,
    resource_type: Type[ResourceType],
)

Bases: Generic[ResourceType]

Accessor for status subresource operations.

Source code in kubex/api/_status.py
def __init__(
    self,
    client: BaseClient,
    request_builder: RequestBuilder,
    namespace: NamespaceTypes,
    scope: Scope,
    resource_type: Type[ResourceType],
) -> None:
    self._client = client
    self._request_builder = request_builder
    self._namespace = namespace
    self._scope = scope
    self._resource_type = resource_type

get async

get(
    name: str,
    *,
    namespace: ApiNamespaceTypes = Ellipsis,
    request_timeout: ApiRequestTimeoutTypes = Ellipsis,
) -> ResourceType

Read the status of the specified resource.

Source code in kubex/api/_status.py
async def get(
    self,
    name: str,
    *,
    namespace: ApiNamespaceTypes = Ellipsis,
    request_timeout: ApiRequestTimeoutTypes = Ellipsis,
) -> ResourceType:
    """Read the status of the specified resource."""
    _namespace = ensure_required_namespace(namespace, self._namespace, self._scope)
    request = self._request_builder.get_subresource(
        STATUS_SUBRESOURCE, name, _namespace, request_timeout=request_timeout
    )
    response = await self._client.request(request)
    return self._resource_type.model_validate_json(response.content)

patch async

patch(
    name: str,
    patch: Patch,
    *,
    namespace: ApiNamespaceTypes = Ellipsis,
    dry_run: DryRunTypes = None,
    field_manager: str | None = None,
    force: bool | None = None,
    field_validation: FieldValidation | None = None,
    request_timeout: ApiRequestTimeoutTypes = Ellipsis,
) -> ResourceType

Patch the status of the specified resource.

Source code in kubex/api/_status.py
async def patch(
    self,
    name: str,
    patch: Patch,
    *,
    namespace: ApiNamespaceTypes = Ellipsis,
    dry_run: DryRunTypes = None,
    field_manager: str | None = None,
    force: bool | None = None,
    field_validation: FieldValidation | None = None,
    request_timeout: ApiRequestTimeoutTypes = Ellipsis,
) -> ResourceType:
    """Patch the status of the specified resource."""
    _namespace = ensure_required_namespace(namespace, self._namespace, self._scope)
    options = PatchOptions(
        dry_run=dry_run,
        field_manager=field_manager,
        force=force,
        field_validation=field_validation,
    )
    request = self._request_builder.patch_subresource(
        STATUS_SUBRESOURCE,
        name,
        _namespace,
        options=options,
        patch=patch,
        request_timeout=request_timeout,
    )
    response = await self._client.request(request)
    return self._resource_type.model_validate_json(response.content)

replace async

replace(
    name: str,
    data: ResourceType,
    *,
    namespace: ApiNamespaceTypes = Ellipsis,
    dry_run: DryRunTypes = None,
    field_manager: str | None = None,
    request_timeout: ApiRequestTimeoutTypes = Ellipsis,
) -> ResourceType

Replace the status of the specified resource.

Source code in kubex/api/_status.py
async def replace(
    self,
    name: str,
    data: ResourceType,
    *,
    namespace: ApiNamespaceTypes = Ellipsis,
    dry_run: DryRunTypes = None,
    field_manager: str | None = None,
    request_timeout: ApiRequestTimeoutTypes = Ellipsis,
) -> ResourceType:
    """Replace the status of the specified resource."""
    _namespace = ensure_required_namespace(namespace, self._namespace, self._scope)
    options = PostOptions(dry_run=dry_run, field_manager=field_manager)
    request = self._request_builder.replace_subresource(
        STATUS_SUBRESOURCE,
        name,
        _namespace,
        data=data.model_dump_json(
            by_alias=True, exclude_unset=True, exclude_none=True
        ),
        options=options,
        request_timeout=request_timeout,
    )
    response = await self._client.request(request)
    return self._resource_type.model_validate_json(response.content)

Eviction subresource

kubex.api._eviction

EvictionAccessor

EvictionAccessor(
    client: BaseClient,
    request_builder: RequestBuilder,
    namespace: NamespaceTypes,
    scope: Scope,
    resource_type: Type[ResourceType],
)

Bases: Generic[ResourceType]

Accessor for eviction subresource operations.

Source code in kubex/api/_eviction.py
def __init__(
    self,
    client: BaseClient,
    request_builder: RequestBuilder,
    namespace: NamespaceTypes,
    scope: Scope,
    resource_type: Type[ResourceType],
) -> None:
    self._client = client
    self._request_builder = request_builder
    self._namespace = namespace
    self._scope = scope
    self._resource_type = resource_type

create async

create(
    name: str,
    eviction: Eviction,
    *,
    namespace: ApiNamespaceTypes = Ellipsis,
    dry_run: DryRunTypes = None,
    field_manager: str | None = None,
    request_timeout: ApiRequestTimeoutTypes = Ellipsis,
) -> Status

Create an eviction for the specified resource.

Source code in kubex/api/_eviction.py
async def create(
    self,
    name: str,
    eviction: Eviction,
    *,
    namespace: ApiNamespaceTypes = Ellipsis,
    dry_run: DryRunTypes = None,
    field_manager: str | None = None,
    request_timeout: ApiRequestTimeoutTypes = Ellipsis,
) -> Status:
    """Create an eviction for the specified resource."""
    _namespace = ensure_required_namespace(namespace, self._namespace, self._scope)
    options = PostOptions(dry_run=dry_run, field_manager=field_manager)
    request = self._request_builder.create_subresource(
        EVICTION_SUBRESOURCE,
        name,
        _namespace,
        data=eviction.model_dump_json(
            by_alias=True, exclude_unset=True, exclude_none=True
        ),
        options=options,
        request_timeout=request_timeout,
    )
    response = await self._client.request(request)
    return Status.model_validate_json(response.content)

Ephemeral containers subresource

kubex.api._ephemeral_containers

EphemeralContainersAccessor

EphemeralContainersAccessor(
    client: BaseClient,
    request_builder: RequestBuilder,
    namespace: NamespaceTypes,
    scope: Scope,
    resource_type: Type[ResourceType],
)

Bases: Generic[ResourceType]

Accessor for ephemeral containers subresource operations.

Source code in kubex/api/_ephemeral_containers.py
def __init__(
    self,
    client: BaseClient,
    request_builder: RequestBuilder,
    namespace: NamespaceTypes,
    scope: Scope,
    resource_type: Type[ResourceType],
) -> None:
    self._client = client
    self._request_builder = request_builder
    self._namespace = namespace
    self._scope = scope
    self._resource_type = resource_type

get async

get(
    name: str,
    *,
    namespace: ApiNamespaceTypes = Ellipsis,
    request_timeout: ApiRequestTimeoutTypes = Ellipsis,
) -> ResourceType

Read the ephemeral containers of the specified resource.

Source code in kubex/api/_ephemeral_containers.py
async def get(
    self,
    name: str,
    *,
    namespace: ApiNamespaceTypes = Ellipsis,
    request_timeout: ApiRequestTimeoutTypes = Ellipsis,
) -> ResourceType:
    """Read the ephemeral containers of the specified resource."""
    _namespace = ensure_required_namespace(namespace, self._namespace, self._scope)
    request = self._request_builder.get_subresource(
        EPHEMERAL_CONTAINERS_SUBRESOURCE,
        name,
        _namespace,
        request_timeout=request_timeout,
    )
    response = await self._client.request(request)
    return self._resource_type.model_validate_json(response.content)

patch async

patch(
    name: str,
    patch: Patch,
    *,
    namespace: ApiNamespaceTypes = Ellipsis,
    dry_run: DryRunTypes = None,
    field_manager: str | None = None,
    force: bool | None = None,
    field_validation: FieldValidation | None = None,
    request_timeout: ApiRequestTimeoutTypes = Ellipsis,
) -> ResourceType

Patch the ephemeral containers of the specified resource.

Source code in kubex/api/_ephemeral_containers.py
async def patch(
    self,
    name: str,
    patch: Patch,
    *,
    namespace: ApiNamespaceTypes = Ellipsis,
    dry_run: DryRunTypes = None,
    field_manager: str | None = None,
    force: bool | None = None,
    field_validation: FieldValidation | None = None,
    request_timeout: ApiRequestTimeoutTypes = Ellipsis,
) -> ResourceType:
    """Patch the ephemeral containers of the specified resource."""
    _namespace = ensure_required_namespace(namespace, self._namespace, self._scope)
    options = PatchOptions(
        dry_run=dry_run,
        field_manager=field_manager,
        force=force,
        field_validation=field_validation,
    )
    request = self._request_builder.patch_subresource(
        EPHEMERAL_CONTAINERS_SUBRESOURCE,
        name,
        _namespace,
        options=options,
        patch=patch,
        request_timeout=request_timeout,
    )
    response = await self._client.request(request)
    return self._resource_type.model_validate_json(response.content)

replace async

replace(
    name: str,
    data: ResourceType,
    *,
    namespace: ApiNamespaceTypes = Ellipsis,
    dry_run: DryRunTypes = None,
    field_manager: str | None = None,
    request_timeout: ApiRequestTimeoutTypes = Ellipsis,
) -> ResourceType

Replace the ephemeral containers of the specified resource.

Source code in kubex/api/_ephemeral_containers.py
async def replace(
    self,
    name: str,
    data: ResourceType,
    *,
    namespace: ApiNamespaceTypes = Ellipsis,
    dry_run: DryRunTypes = None,
    field_manager: str | None = None,
    request_timeout: ApiRequestTimeoutTypes = Ellipsis,
) -> ResourceType:
    """Replace the ephemeral containers of the specified resource."""
    _namespace = ensure_required_namespace(namespace, self._namespace, self._scope)
    options = PostOptions(dry_run=dry_run, field_manager=field_manager)
    request = self._request_builder.replace_subresource(
        EPHEMERAL_CONTAINERS_SUBRESOURCE,
        name,
        _namespace,
        data=data.model_dump_json(
            by_alias=True, exclude_unset=True, exclude_none=True
        ),
        options=options,
        request_timeout=request_timeout,
    )
    response = await self._client.request(request)
    return self._resource_type.model_validate_json(response.content)

Resize subresource

kubex.api._resize

ResizeAccessor

ResizeAccessor(
    client: BaseClient,
    request_builder: RequestBuilder,
    namespace: NamespaceTypes,
    scope: Scope,
    resource_type: Type[ResourceType],
)

Bases: Generic[ResourceType]

Accessor for resize subresource operations.

Source code in kubex/api/_resize.py
def __init__(
    self,
    client: BaseClient,
    request_builder: RequestBuilder,
    namespace: NamespaceTypes,
    scope: Scope,
    resource_type: Type[ResourceType],
) -> None:
    self._client = client
    self._request_builder = request_builder
    self._namespace = namespace
    self._scope = scope
    self._resource_type = resource_type

get async

get(
    name: str,
    *,
    namespace: ApiNamespaceTypes = Ellipsis,
    request_timeout: ApiRequestTimeoutTypes = Ellipsis,
) -> ResourceType

Read the resize of the specified resource.

Source code in kubex/api/_resize.py
async def get(
    self,
    name: str,
    *,
    namespace: ApiNamespaceTypes = Ellipsis,
    request_timeout: ApiRequestTimeoutTypes = Ellipsis,
) -> ResourceType:
    """Read the resize of the specified resource."""
    _namespace = ensure_required_namespace(namespace, self._namespace, self._scope)
    request = self._request_builder.get_subresource(
        RESIZE_SUBRESOURCE, name, _namespace, request_timeout=request_timeout
    )
    response = await self._client.request(request)
    return self._resource_type.model_validate_json(response.content)

patch async

patch(
    name: str,
    patch: Patch,
    *,
    namespace: ApiNamespaceTypes = Ellipsis,
    dry_run: DryRunTypes = None,
    field_manager: str | None = None,
    force: bool | None = None,
    field_validation: FieldValidation | None = None,
    request_timeout: ApiRequestTimeoutTypes = Ellipsis,
) -> ResourceType

Patch the resize of the specified resource.

Source code in kubex/api/_resize.py
async def patch(
    self,
    name: str,
    patch: Patch,
    *,
    namespace: ApiNamespaceTypes = Ellipsis,
    dry_run: DryRunTypes = None,
    field_manager: str | None = None,
    force: bool | None = None,
    field_validation: FieldValidation | None = None,
    request_timeout: ApiRequestTimeoutTypes = Ellipsis,
) -> ResourceType:
    """Patch the resize of the specified resource."""
    _namespace = ensure_required_namespace(namespace, self._namespace, self._scope)
    options = PatchOptions(
        dry_run=dry_run,
        field_manager=field_manager,
        force=force,
        field_validation=field_validation,
    )
    request = self._request_builder.patch_subresource(
        RESIZE_SUBRESOURCE,
        name,
        _namespace,
        options=options,
        patch=patch,
        request_timeout=request_timeout,
    )
    response = await self._client.request(request)
    return self._resource_type.model_validate_json(response.content)

replace async

replace(
    name: str,
    data: ResourceType,
    *,
    namespace: ApiNamespaceTypes = Ellipsis,
    dry_run: DryRunTypes = None,
    field_manager: str | None = None,
    request_timeout: ApiRequestTimeoutTypes = Ellipsis,
) -> ResourceType

Replace the resize of the specified resource.

Source code in kubex/api/_resize.py
async def replace(
    self,
    name: str,
    data: ResourceType,
    *,
    namespace: ApiNamespaceTypes = Ellipsis,
    dry_run: DryRunTypes = None,
    field_manager: str | None = None,
    request_timeout: ApiRequestTimeoutTypes = Ellipsis,
) -> ResourceType:
    """Replace the resize of the specified resource."""
    _namespace = ensure_required_namespace(namespace, self._namespace, self._scope)
    options = PostOptions(dry_run=dry_run, field_manager=field_manager)
    request = self._request_builder.replace_subresource(
        RESIZE_SUBRESOURCE,
        name,
        _namespace,
        data=data.model_dump_json(
            by_alias=True, exclude_unset=True, exclude_none=True
        ),
        options=options,
        request_timeout=request_timeout,
    )
    response = await self._client.request(request)
    return self._resource_type.model_validate_json(response.content)

Exec subresource

kubex.api._exec

ExecAccessor

ExecAccessor(
    client: BaseClient,
    request_builder: RequestBuilder,
    namespace: NamespaceTypes,
    scope: Scope,
    resource_type: Type[ResourceType],
    channel_protocols: tuple[
        ChannelProtocol, ...
    ] = DEFAULT_PROTOCOLS,
)

Bases: Generic[ResourceType]

Accessor for the Pod exec subresource.

.. warning::

Experimental. The WebSocket-based subresources (exec, attach, portforward) are still under active development and their API may change in future releases without notice.

Source code in kubex/api/_exec.py
def __init__(
    self,
    client: BaseClient,
    request_builder: RequestBuilder,
    namespace: NamespaceTypes,
    scope: Scope,
    resource_type: Type[ResourceType],
    channel_protocols: tuple[ChannelProtocol, ...] = DEFAULT_PROTOCOLS,
) -> None:
    self._client = client
    self._request_builder = request_builder
    self._namespace = namespace
    self._scope = scope
    self._resource_type = resource_type
    self._channel_protocols = channel_protocols

run async

run(
    name: str,
    *,
    command: Sequence[str],
    container: str | None = None,
    namespace: ApiNamespaceTypes = Ellipsis,
    stdin: bytes | None = None,
    stdout: bool = True,
    stderr: bool = True,
    request_timeout: ApiRequestTimeoutTypes = Ellipsis,
) -> ExecResult

Run a command and collect stdout/stderr until the channel closes.

.. warning::

Experimental. This WebSocket-based API is still under active development and may change in future releases without notice.

Unlike :meth:stream, the request_timeout bound (when provided) applies to both the handshake (via the per-call HTTP timeout propagated to the WebSocket upgrade) and the post-handshake command execution + status collection (via a separate wall-clock cancel scope that does not envelope teardown — see below). The wall-clock bound is derived from Timeout.total if set, otherwise Timeout.read (read's strict per-byte-inactivity semantics do not map cleanly onto a long-lived WebSocket session, so it is treated as a call-level upper bound here as a pragmatic approximation that prevents indefinite hangs). Ellipsis defers to the client default for the handshake without imposing an additional wall-clock bound; an explicit None disables timeouts entirely.

Source code in kubex/api/_exec.py
async def run(
    self,
    name: str,
    *,
    command: Sequence[str],
    container: str | None = None,
    namespace: ApiNamespaceTypes = Ellipsis,
    stdin: bytes | None = None,
    stdout: bool = True,
    stderr: bool = True,
    request_timeout: ApiRequestTimeoutTypes = Ellipsis,
) -> ExecResult:
    """Run a command and collect stdout/stderr until the channel closes.

    .. warning::

       **Experimental.** This WebSocket-based API is still under active
       development and may change in future releases without notice.

    Unlike :meth:`stream`, the ``request_timeout`` bound (when provided)
    applies to both the handshake (via the per-call HTTP timeout
    propagated to the WebSocket upgrade) and the post-handshake command
    execution + status collection (via a separate wall-clock cancel scope
    that does not envelope teardown — see below). The wall-clock bound is
    derived from ``Timeout.total`` if set, otherwise ``Timeout.read``
    (read's strict per-byte-inactivity semantics do not map cleanly onto
    a long-lived WebSocket session, so it is treated as a call-level
    upper bound here as a pragmatic approximation that prevents
    indefinite hangs). ``Ellipsis`` defers to the client default for the
    handshake without imposing an additional wall-clock bound; an
    explicit ``None`` disables timeouts entirely.
    """
    # Resolve the wall-clock bound for the post-handshake portion.
    # ``Ellipsis`` and a ``Timeout`` whose ``total`` and ``read`` are
    # both unset leave ``delay`` as ``None`` — ``anyio.fail_after(None)``
    # is a no-op cancel scope, so the only bound on the call in those
    # cases is whatever the handshake timeout enforces (set via
    # ``request_timeout`` propagation below).
    delay: float | None = None
    if request_timeout is not Ellipsis:
        coerced: Timeout | None = Timeout.coerce(request_timeout)
        if coerced is not None:
            delay = coerced.total if coerced.total is not None else coerced.read

    # Bypass ``stream()`` to use unbounded per-channel buffers. The
    # default bounded buffers exist to apply backpressure when the
    # consumer iterates lazily; ``run()`` instead spawns drainer tasks
    # that race with the read loop, and the read loop is started in
    # ``StreamSession.__aenter__`` *before* those drainers are scheduled.
    # That scheduling gap means a fast command emitting many frames
    # could fill the bounded buffer and trigger the local
    # close-on-overflow path before the drainers ran even once,
    # silently truncating ``ExecResult.stdout`` / ``stderr``. Unbounded
    # buffers are safe here because ``run`` collects all output in
    # memory anyway — OOM risk is identical.
    #
    # The handshake is bounded by ``request_timeout`` propagation into
    # the per-backend WebSocket upgrade (see ``HttpxClient`` /
    # ``AioHttpClient``). The wall-clock ``fail_after`` below covers
    # only the post-handshake portion — keeping it *outside* the
    # ``async with session`` block ensures ``StreamSession.__aexit__``
    # (and the underlying ``connection.close()``) runs in an
    # un-cancelled scope so the HTTP/WebSocket transport is always
    # released, even when the deadline fires mid-call.
    session = await self._open_session(
        name,
        command=command,
        container=container,
        namespace=namespace,
        stdin=stdin is not None,
        stdout=stdout,
        stderr=stderr,
        tty=False,
        request_timeout=request_timeout,
        buffer_size=math.inf,
    )
    async with session:
        stdout_buf = bytearray()
        stderr_buf = bytearray()
        status: Status | None = None

        async def _drain_stdout() -> None:
            async for chunk in session.stdout:
                stdout_buf.extend(chunk)

        async def _drain_stderr() -> None:
            async for chunk in session.stderr:
                stderr_buf.extend(chunk)

        async def _write_stdin() -> None:
            if stdin is not None:
                await session.stdin.write(stdin)
                await session.close_stdin()

        # ``anyio.fail_after`` raises a bare ``TimeoutError`` at scope
        # exit when the deadline fires. The exec layer normalises every
        # other WebSocket-level failure to ``KubexClientException``
        # (see ``_resolve_protocol`` and the per-backend handshake
        # handlers), so convert here too — callers that
        # ``except KubexClientException`` for exec errors must observe
        # a timed-out ``run()`` consistently.
        try:
            with anyio.fail_after(delay):
                async with anyio.create_task_group() as tg:
                    tg.start_soon(_drain_stdout)
                    tg.start_soon(_drain_stderr)
                    tg.start_soon(_write_stdin)

                status = await session.wait_for_status()
        except TimeoutError as exc:
            raise KubexClientException(
                f"exec call exceeded {delay}s wall-clock bound"
            ) from exc
        except BaseExceptionGroup as eg:
            # anyio 4 wraps exceptions raised by tasks in a task group
            # in a ``BaseExceptionGroup`` even when only a single task
            # failed. Unwrap a single-exception group so callers that
            # catch ``KubexClientException`` (the documented exec
            # exception type) observe transport-level failures from
            # ``_drain_*`` or ``_write_stdin`` directly instead of
            # having to reach into an exception group.
            if len(eg.exceptions) == 1:
                raise eg.exceptions[0] from None
            raise
    return ExecResult(
        stdout=bytes(stdout_buf), stderr=bytes(stderr_buf), status=status
    )

stream async

stream(
    name: str,
    *,
    command: Sequence[str],
    container: str | None = None,
    namespace: ApiNamespaceTypes = Ellipsis,
    stdin: bool = False,
    stdout: bool = True,
    stderr: bool = True,
    tty: bool = False,
    request_timeout: ApiRequestTimeoutTypes = Ellipsis,
) -> AsyncIterator[StreamSession]

Open a bidirectional exec session as an async context manager.

.. warning::

Experimental. This WebSocket-based API is still under active development and may change in future releases without notice.

Source code in kubex/api/_exec.py
@asynccontextmanager
async def stream(
    self,
    name: str,
    *,
    command: Sequence[str],
    container: str | None = None,
    namespace: ApiNamespaceTypes = Ellipsis,
    stdin: bool = False,
    stdout: bool = True,
    stderr: bool = True,
    tty: bool = False,
    request_timeout: ApiRequestTimeoutTypes = Ellipsis,
) -> AsyncIterator[StreamSession]:
    """Open a bidirectional exec session as an async context manager.

    .. warning::

       **Experimental.** This WebSocket-based API is still under active
       development and may change in future releases without notice.
    """
    session = await self._open_session(
        name,
        command=command,
        container=container,
        namespace=namespace,
        stdin=stdin,
        stdout=stdout,
        stderr=stderr,
        tty=tty,
        request_timeout=request_timeout,
        buffer_size=None,
    )
    async with session:
        yield session

ExecResult dataclass

ExecResult(
    stdout: bytes,
    stderr: bytes,
    status: Status | None = None,
)

Result of a one-shot ExecAccessor.run call.

exit_code property

exit_code: int | None

Parse the exit code from status.

Returns 0 for a Success status, the integer parsed from status.details.causes (where reason == "ExitCode") for a non-zero exit, or None when status is missing or carries no recognisable exit information. Note that None does not imply success — for failures without a recognisable exit code (e.g. the container could not start), inspect status directly.

Attach subresource

kubex.api._attach

AttachAccessor

AttachAccessor(
    client: BaseClient,
    request_builder: RequestBuilder,
    namespace: NamespaceTypes,
    scope: Scope,
    resource_type: Type[ResourceType],
    *,
    channel_protocols: tuple[
        ChannelProtocol, ...
    ] = DEFAULT_PROTOCOLS,
)

Bases: Generic[ResourceType]

Accessor for the Pod attach subresource.

.. warning::

Experimental. The WebSocket-based subresources (exec, attach, portforward) are still under active development and their API may change in future releases without notice.

Source code in kubex/api/_attach.py
def __init__(
    self,
    client: BaseClient,
    request_builder: RequestBuilder,
    namespace: NamespaceTypes,
    scope: Scope,
    resource_type: Type[ResourceType],
    *,
    channel_protocols: tuple[ChannelProtocol, ...] = DEFAULT_PROTOCOLS,
) -> None:
    self._client = client
    self._request_builder = request_builder
    self._namespace = namespace
    self._scope = scope
    self._resource_type = resource_type
    self._channel_protocols = channel_protocols

stream async

stream(
    name: str,
    *,
    container: str | None = None,
    namespace: ApiNamespaceTypes = Ellipsis,
    stdin: bool = False,
    stdout: bool = True,
    stderr: bool = True,
    tty: bool = False,
    request_timeout: ApiRequestTimeoutTypes = Ellipsis,
) -> AsyncIterator[StreamSession]

Open a bidirectional attach session as an async context manager.

.. warning::

Experimental. This WebSocket-based API is still under active development and may change in future releases without notice.

Source code in kubex/api/_attach.py
@asynccontextmanager
async def stream(
    self,
    name: str,
    *,
    container: str | None = None,
    namespace: ApiNamespaceTypes = Ellipsis,
    stdin: bool = False,
    stdout: bool = True,
    stderr: bool = True,
    tty: bool = False,
    request_timeout: ApiRequestTimeoutTypes = Ellipsis,
) -> AsyncIterator[StreamSession]:
    """Open a bidirectional attach session as an async context manager.

    .. warning::

       **Experimental.** This WebSocket-based API is still under active
       development and may change in future releases without notice.
    """
    session = await self._open_session(
        name,
        container=container,
        namespace=namespace,
        stdin=stdin,
        stdout=stdout,
        stderr=stderr,
        tty=tty,
        request_timeout=request_timeout,
    )
    async with session:
        yield session

Portforward subresource

kubex.api._portforward

PortForwardStream

PortForwardStream(
    session: PortForwardSession,
    port: int,
    data_channel_id: int,
    recv_stream: MemoryObjectReceiveStream[bytes],
)

Bases: ByteStream

Per-port bidirectional byte stream for the Kubernetes portforward subresource.

Wraps a PortForwardSession's data channel for a single port and implements anyio.abc.ByteStream so it integrates naturally with anyio byte-stream consumers.

Outbound send() encodes the payload as a channel frame addressed to the port's data channel. Inbound receive() reads from the session's per-port memory stream, buffering excess bytes to satisfy max_bytes slicing.

aclose() half-closes the data channel (idempotent via the underlying session helper) and prevents further send() calls. send_eof() performs the same half-close without locking out receives. On the v4 portforward subprotocol — which the kubelet requires — half-close is a local-only state change; the kubelet ignores wire CHANNEL_CLOSE frames on v4 and tears down the channel only when the session WebSocket closes.

Source code in kubex/api/_portforward.py
def __init__(
    self,
    session: PortForwardSession,
    port: int,
    data_channel_id: int,
    recv_stream: MemoryObjectReceiveStream[bytes],
) -> None:
    self._session = session
    self._port = port
    self._data_channel_id = data_channel_id
    self._recv_stream = recv_stream
    self._buffer = b""
    self._send_closed = False

PortForwarder

PortForwarder(session: PortForwardSession)

Holds an active portforward session and exposes per-port streams and errors.

streams[port] is a PortForwardStream (anyio.abc.ByteStream) for sending and receiving raw bytes to/from the remote port. errors[port] is an async iterator of error-text strings sent by the kubelet on the error channel for that port. port_data_truncated[port] reflects the live overflow flag — True when the per-port data buffer filled and the stream was closed locally.

Source code in kubex/api/_portforward.py
def __init__(self, session: PortForwardSession) -> None:
    self._session = session

PortforwardAccessor

PortforwardAccessor(
    client: BaseClient,
    request_builder: RequestBuilder,
    namespace: NamespaceTypes,
    scope: Scope,
    resource_type: Type[ResourceType],
    channel_protocols: tuple[
        ChannelProtocol, ...
    ] = PORTFORWARD_PROTOCOLS,
)

Bases: Generic[ResourceType]

Accessor for the Pod portforward subresource.

.. warning::

Experimental. The WebSocket-based subresources (exec, attach, portforward) are still under active development and their API may change in future releases without notice.

Source code in kubex/api/_portforward.py
def __init__(
    self,
    client: BaseClient,
    request_builder: RequestBuilder,
    namespace: NamespaceTypes,
    scope: Scope,
    resource_type: Type[ResourceType],
    channel_protocols: tuple[ChannelProtocol, ...] = PORTFORWARD_PROTOCOLS,
) -> None:
    self._client = client
    self._request_builder = request_builder
    self._namespace = namespace
    self._scope = scope
    self._resource_type = resource_type
    self._channel_protocols = channel_protocols

forward async

forward(
    name: str,
    *,
    ports: Sequence[int],
    namespace: ApiNamespaceTypes = Ellipsis,
    request_timeout: ApiRequestTimeoutTypes = Ellipsis,
) -> AsyncIterator[PortForwarder]

Open portforward streams to the given ports as an async context manager.

.. warning::

Experimental. This WebSocket-based API is still under active development and may change in future releases without notice.

This is the low-level entry point: a single WebSocket multiplexes all requested ports, and the caller drives I/O directly in Python via per-port anyio.abc.ByteStream objects. No sockets are bound on the host — bytes never leave the process. Use this when your own code speaks to the pod (custom protocols, embedded clients, tests).

For the kubectl-style mode where external processes connect through a real local TCP port, use :meth:listen instead (which is built on top of this method).

Yields a PortForwarder exposing per-port ByteStream objects (pf.streams[port]) and per-port error iterators (pf.errors[port]).

Source code in kubex/api/_portforward.py
@asynccontextmanager
async def forward(
    self,
    name: str,
    *,
    ports: Sequence[int],
    namespace: ApiNamespaceTypes = Ellipsis,
    request_timeout: ApiRequestTimeoutTypes = Ellipsis,
) -> AsyncIterator[PortForwarder]:
    """Open portforward streams to the given ports as an async context manager.

    .. warning::

       **Experimental.** This WebSocket-based API is still under active
       development and may change in future releases without notice.

    This is the **low-level** entry point: a single WebSocket multiplexes
    all requested ports, and the caller drives I/O directly in Python via
    per-port ``anyio.abc.ByteStream`` objects. No sockets are bound on the
    host — bytes never leave the process. Use this when your own code
    speaks to the pod (custom protocols, embedded clients, tests).

    For the kubectl-style mode where external processes connect through
    a real local TCP port, use :meth:`listen` instead (which is built on
    top of this method).

    Yields a ``PortForwarder`` exposing per-port ``ByteStream`` objects
    (``pf.streams[port]``) and per-port error iterators (``pf.errors[port]``).
    """
    session = await self._open_session(
        name,
        ports=ports,
        namespace=namespace,
        request_timeout=request_timeout,
    )
    async with session:
        yield PortForwarder(session)

listen async

listen(
    name: str,
    *,
    port_map: Mapping[int, int],
    local_host: str = "127.0.0.1",
    namespace: ApiNamespaceTypes = Ellipsis,
    request_timeout: ApiRequestTimeoutTypes = Ellipsis,
) -> AsyncIterator[None]

Open local TCP listeners and forward bytes bidirectionally to remote ports.

.. warning::

Experimental. This WebSocket-based API is still under active development and may change in future releases without notice.

This is the high-level, kubectl-style entry point: real OS sockets are bound on local_host:local_port so that any process on the host (curl, psql, a browser, …) can connect to the pod through a local port. Each accepted local connection opens its own portforward WebSocket session bound to that single remote port — one session per connection, matching kubectl port-forward semantics. The method itself yields None; you don't drive I/O through it.

For the low-level mode where your own Python code reads/writes bytes directly without binding any sockets, use :meth:forward instead (which this method is built on top of).

port_map maps remote port (kubelet-side) to local port. Example: {80: 18080} opens a local listener on port 18080 that forwards to the pod's port 80.

Source code in kubex/api/_portforward.py
@asynccontextmanager
async def listen(
    self,
    name: str,
    *,
    port_map: Mapping[int, int],
    local_host: str = "127.0.0.1",
    namespace: ApiNamespaceTypes = Ellipsis,
    request_timeout: ApiRequestTimeoutTypes = Ellipsis,
) -> AsyncIterator[None]:
    """Open local TCP listeners and forward bytes bidirectionally to remote ports.

    .. warning::

       **Experimental.** This WebSocket-based API is still under active
       development and may change in future releases without notice.

    This is the **high-level**, kubectl-style entry point: real OS sockets
    are bound on ``local_host:local_port`` so that any process on the host
    (``curl``, ``psql``, a browser, …) can connect to the pod through a
    local port. Each accepted local connection opens its own portforward
    WebSocket session bound to that single remote port — one session per
    connection, matching ``kubectl port-forward`` semantics. The method
    itself yields ``None``; you don't drive I/O through it.

    For the low-level mode where your own Python code reads/writes bytes
    directly without binding any sockets, use :meth:`forward` instead
    (which this method is built on top of).

    ``port_map`` maps **remote port** (kubelet-side) to **local port**.
    Example: ``{80: 18080}`` opens a local listener on port 18080 that
    forwards to the pod's port 80.
    """
    if not port_map:
        raise ValueError("port_map must contain at least one entry")
    # Validate ports up front so a config error fails at context entry
    # instead of being deferred to the first incoming connection where
    # it would surface as a swallowed log line. Booleans are rejected
    # explicitly because ``bool`` is an ``int`` subclass — without this,
    # ``port_map={80: False}`` would silently bind port 0 (ephemeral),
    # which the caller has no way to discover since ``listen()`` yields
    # ``None``.
    for remote_port, local_port in port_map.items():
        if isinstance(remote_port, bool) or not isinstance(remote_port, int):
            raise TypeError(
                f"remote port must be int, got {type(remote_port).__name__}"
            )
        if not 1 <= remote_port <= 65535:
            raise ValueError(f"remote port {remote_port} out of range 1..65535")
        if isinstance(local_port, bool) or not isinstance(local_port, int):
            raise TypeError(
                f"local port must be int, got {type(local_port).__name__}"
            )
        if not 1 <= local_port <= 65535:
            raise ValueError(f"local port {local_port} out of range 1..65535")
    local_ports = list(port_map.values())
    if len(set(local_ports)) != len(local_ports):
        raise ValueError(f"duplicate local ports in port_map: {local_ports}")
    async with anyio.create_task_group() as tg:
        for remote_port, local_port in port_map.items():
            await tg.start(
                self._serve_port,
                name,
                remote_port,
                local_port,
                local_host,
                namespace,
                request_timeout,
            )
        try:
            yield
        finally:
            tg.cancel_scope.cancel()

Metadata accessor

kubex.api._metadata

MetadataAccessor

MetadataAccessor(
    client: BaseClient,
    request_builder: RequestBuilder,
    namespace: NamespaceTypes,
    scope: Scope,
    resource_type: Type[ResourceType],
)

Bases: Generic[ResourceType]

Accessor for metadata subresource operations.

Source code in kubex/api/_metadata.py
def __init__(
    self,
    client: BaseClient,
    request_builder: RequestBuilder,
    namespace: NamespaceTypes,
    scope: Scope,
    resource_type: Type[ResourceType],
) -> None:
    self._client = client
    self._request_builder = request_builder
    self._namespace = namespace
    self._scope = scope
    self._resource_type = resource_type

get async

get(
    name: str,
    *,
    namespace: ApiNamespaceTypes = Ellipsis,
    resource_version: ResourceVersionTypes = None,
    request_timeout: ApiRequestTimeoutTypes = Ellipsis,
) -> PartialObjectMetadata

Read metadata of the specified resource.

Source code in kubex/api/_metadata.py
async def get(
    self,
    name: str,
    *,
    namespace: ApiNamespaceTypes = Ellipsis,
    resource_version: ResourceVersionTypes = None,
    request_timeout: ApiRequestTimeoutTypes = Ellipsis,
) -> PartialObjectMetadata:
    """Read metadata of the specified resource."""
    _namespace = ensure_required_namespace(namespace, self._namespace, self._scope)
    options = GetOptions(resource_version=resource_version)
    request = self._request_builder.get_metadata(
        name, _namespace, options=options, request_timeout=request_timeout
    )
    response = await self._client.request(request)
    return PartialObjectMetadata.model_validate_json(response.content)

list async

list(
    *,
    namespace: ApiNamespaceTypes = Ellipsis,
    label_selector: str | None = None,
    field_selector: str | None = None,
    timeout_seconds: int | None = None,
    limit: int | None = None,
    continue_token: str | None = None,
    version_match: VersionMatch | None = None,
    resource_version: ResourceVersionTypes = None,
    request_timeout: ApiRequestTimeoutTypes = Ellipsis,
) -> ListEntity[PartialObjectMetadata]

List metadata of resources.

Source code in kubex/api/_metadata.py
async def list(
    self,
    *,
    namespace: ApiNamespaceTypes = Ellipsis,
    label_selector: str | None = None,
    field_selector: str | None = None,
    timeout_seconds: int | None = None,
    limit: int | None = None,
    continue_token: str | None = None,
    version_match: VersionMatch | None = None,
    resource_version: ResourceVersionTypes = None,
    request_timeout: ApiRequestTimeoutTypes = Ellipsis,
) -> ListEntity[PartialObjectMetadata]:
    """List metadata of resources."""
    _namespace = ensure_optional_namespace(namespace, self._namespace, self._scope)
    options = ListOptions(
        label_selector=label_selector,
        field_selector=field_selector,
        timeout_seconds=timeout_seconds,
        limit=limit,
        continue_token=continue_token,
        version_match=version_match,
        resource_version=resource_version,
    )
    request = self._request_builder.list_metadata(
        _namespace, options, request_timeout=request_timeout
    )
    response = await self._client.request(request)
    model = PartialObjectMetadata.__RESOURCE_CONFIG__.list_model
    return model.model_validate_json(response.content)

patch async

patch(
    name: str,
    patch: Patch,
    *,
    namespace: ApiNamespaceTypes = Ellipsis,
    dry_run: DryRunTypes = None,
    field_manager: str | None = None,
    force: bool | None = None,
    field_validation: FieldValidation | None = None,
    request_timeout: ApiRequestTimeoutTypes = Ellipsis,
) -> PartialObjectMetadata

Patch metadata of the specified resource.

Source code in kubex/api/_metadata.py
async def patch(
    self,
    name: str,
    patch: Patch,
    *,
    namespace: ApiNamespaceTypes = Ellipsis,
    dry_run: DryRunTypes = None,
    field_manager: str | None = None,
    force: bool | None = None,
    field_validation: FieldValidation | None = None,
    request_timeout: ApiRequestTimeoutTypes = Ellipsis,
) -> PartialObjectMetadata:
    """Patch metadata of the specified resource."""
    _namespace = ensure_required_namespace(namespace, self._namespace, self._scope)
    options = PatchOptions(
        dry_run=dry_run,
        field_manager=field_manager,
        force=force,
        field_validation=field_validation,
    )
    request = self._request_builder.patch_metadata(
        name, _namespace, options, patch, request_timeout=request_timeout
    )
    response = await self._client.request(request)
    return PartialObjectMetadata.model_validate_json(response.content)

watch async

watch(
    *,
    namespace: ApiNamespaceTypes = Ellipsis,
    label_selector: str | None = None,
    field_selector: str | None = None,
    allow_bookmarks: bool | None = None,
    send_initial_events: bool | None = None,
    timeout_seconds: int | None = None,
    resource_version: ResourceVersionTypes = None,
    request_timeout: ApiRequestTimeoutTypes = Ellipsis,
) -> AsyncGenerator[
    WatchEvent[PartialObjectMetadata], None
]

Watch for metadata changes of resources.

Source code in kubex/api/_metadata.py
async def watch(
    self,
    *,
    namespace: ApiNamespaceTypes = Ellipsis,
    label_selector: str | None = None,
    field_selector: str | None = None,
    allow_bookmarks: bool | None = None,
    send_initial_events: bool | None = None,
    timeout_seconds: int | None = None,
    resource_version: ResourceVersionTypes = None,
    request_timeout: ApiRequestTimeoutTypes = Ellipsis,
) -> AsyncGenerator[
    WatchEvent[PartialObjectMetadata],
    None,
]:
    """Watch for metadata changes of resources."""
    _namespace = ensure_optional_namespace(namespace, self._namespace, self._scope)
    options = WatchOptions(
        label_selector=label_selector,
        field_selector=field_selector,
        allow_bookmarks=allow_bookmarks,
        send_initial_events=send_initial_events,
        timeout_seconds=timeout_seconds,
    )
    request = self._request_builder.watch_metadata(
        _namespace,
        options,
        resource_version=resource_version,
        request_timeout=request_timeout,
    )
    async for line in self._client.stream_lines(request):
        yield WatchEvent(PartialObjectMetadata, json.loads(line))

Stream session

kubex.api._stream_session

StreamSession

StreamSession(
    connection: WebSocketConnection,
    protocol: ChannelProtocol,
    *,
    stdin: bool = False,
    stdout: bool = True,
    stderr: bool = True,
    tty: bool = False,
    buffer_size: float = _DEFAULT_CHANNEL_BUFFER,
)

Bases: _BaseChannelSession

Multiplexes Kubernetes channel-protocol streams over a single :class:WebSocketConnection.

The session owns a background read loop that decodes incoming binary frames and dispatches their payloads to per-channel queues. Outgoing writes (stdin, resize, close_stdin) share a lock so concurrent callers do not interleave bytes on the underlying WebSocket.

Used by both the exec and attach subresource accessors.

Source code in kubex/api/_stream_session.py
def __init__(
    self,
    connection: WebSocketConnection,
    protocol: ChannelProtocol,
    *,
    stdin: bool = False,
    stdout: bool = True,
    stderr: bool = True,
    tty: bool = False,
    buffer_size: float = _DEFAULT_CHANNEL_BUFFER,
) -> None:
    super().__init__(connection, protocol)
    # Bounded per-channel buffers prevent unbounded memory growth when a
    # consumer enables a channel but drains it slower than the remote
    # produces. The read loop pushes via ``send_nowait`` and on
    # ``WouldBlock`` closes that channel locally — so the wire-reading
    # loop is never blocked (the error channel and EOF detection keep
    # flowing) and total client memory is strictly bounded. A consumer
    # that observes premature stream closure has fallen behind; re-running
    # with a slimmer command or tighter draining is the documented remedy.
    # ``run()`` opts out by passing ``buffer_size=math.inf`` because it
    # owns the lifecycle and collects all output in memory anyway — the
    # bounded-buffer drop semantics would silently truncate ``ExecResult``
    # if the read loop got a head start over the drainer tasks.
    send_out, recv_out = anyio.create_memory_object_stream[bytes](
        max_buffer_size=buffer_size
    )
    send_err, recv_err = anyio.create_memory_object_stream[bytes](
        max_buffer_size=buffer_size
    )
    self._stdout_send: MemoryObjectSendStream[bytes] = send_out
    self._stdout_recv: MemoryObjectReceiveStream[bytes] = recv_out
    self._stderr_send: MemoryObjectSendStream[bytes] = send_err
    self._stderr_recv: MemoryObjectReceiveStream[bytes] = recv_err
    # Channels the kubelet will not open are closed locally up-front so
    # consumers iterating over ``stdout`` / ``stderr`` see an immediate
    # end-of-stream instead of blocking until socket teardown. The
    # kubelet merges stderr into stdout when ``tty=True`` and never
    # opens the dedicated stderr channel.
    self._stdout_open = stdout
    self._stderr_open = stderr and not tty
    if not self._stdout_open:
        self._stdout_send.close()
    if not self._stderr_open:
        self._stderr_send.close()
    # Per-channel overflow flags: set when ``_dispatch_nowait`` had to
    # close the channel locally because the bounded buffer was full.
    # Consumers iterating ``stdout`` / ``stderr`` only see an EOF, so
    # without these flags they cannot distinguish "command finished"
    # from "we fell behind and lost output". Check after iteration ends.
    self._stdout_truncated = False
    self._stderr_truncated = False
    self._status: Status | None = None
    self._status_event = anyio.Event()
    self._stdin_enabled = stdin
    # When stdin is not enabled the channel was never opened, so writes
    # must fail locally and ``close_stdin`` must be a no-op (no CLOSE
    # frame on the wire for a channel that does not exist).
    self._stdin_closed = not stdin
    self._stdin = _StdinWriter(self)

stderr_truncated property

stderr_truncated: bool

True if the stderr buffer overflowed and frames were dropped.

Counterpart to :attr:stdout_truncated.

stdout_truncated property

stdout_truncated: bool

True if the stdout buffer overflowed and frames were dropped.

Consumers should check this after session.stdout ends to tell a normal EOF apart from a local close triggered by backpressure. See the buffer-management note on __init__.

wait_for_status async

wait_for_status() -> Status | None

Wait for a terminal Status from the error channel.

Resolves to None if the connection closes before any error frame arrives.

Source code in kubex/api/_stream_session.py
async def wait_for_status(self) -> Status | None:
    """Wait for a terminal Status from the error channel.

    Resolves to ``None`` if the connection closes before any error frame
    arrives.
    """
    await self._status_event.wait()
    return self._status

Portforward session

kubex.api._portforward_session

PortForwardSession

PortForwardSession(
    connection: WebSocketConnection,
    protocol: ChannelProtocol,
    ports: Sequence[int],
    *,
    buffer_size: float = _DEFAULT_BUFFER,
    block_on_full: bool = False,
)

Bases: _BaseChannelSession

Port-aware WebSocket channel multiplexer for the Kubernetes portforward subresource.

Inherits lifecycle management (write lock, exit stack, task group) from _BaseChannelSession. Each port gets a data MemoryObjectStream[bytes] and an error MemoryObjectStream[str]. The first frame on each channel carries a 2-byte little-endian port-number prefix which is stripped and validated; subsequent frames are routed as raw bytes (data) or UTF-8 text (error) without any prefix.

When block_on_full=True the read loop awaits space in each per-port buffer instead of closing the stream on overflow. This propagates backpressure all the way to the WebSocket — correct for single-port TCP proxy sessions created by listen() where data loss is unacceptable. When block_on_full=False (the default) the read loop uses a non-blocking send: if a port's buffer is full the port stream is closed locally and _truncated[port] is set, leaving all other ports unaffected. This prevents head-of-line blocking across ports in multi-port forward() sessions at the cost of surfacing overflow as EndOfStream rather than stalling traffic.

Source code in kubex/api/_portforward_session.py
def __init__(
    self,
    connection: WebSocketConnection,
    protocol: ChannelProtocol,
    ports: Sequence[int],
    *,
    buffer_size: float = _DEFAULT_BUFFER,
    block_on_full: bool = False,
) -> None:
    super().__init__(connection, protocol)
    self._ports: tuple[int, ...] = tuple(ports)
    self._block_on_full = block_on_full

    # Channel-id → port lookup tables (built once, queried in _read_loop)
    self._data_ch_to_port: dict[int, int] = {}
    self._error_ch_to_port: dict[int, int] = {}

    # Per-port memory streams (keyed by port number)
    self._streams_send: dict[int, MemoryObjectSendStream[bytes]] = {}
    self._streams_recv: dict[int, MemoryObjectReceiveStream[bytes]] = {}
    self._errors_send: dict[int, MemoryObjectSendStream[str]] = {}
    self._errors_recv: dict[int, MemoryObjectReceiveStream[str]] = {}

    # Per-port PortForwardStream objects exposing the anyio.abc.ByteStream API.
    self._streams: dict[int, PortForwardStream] = {}

    # Overflow flags: set when a per-port data buffer fills and is closed locally.
    self._truncated: dict[int, bool] = {}

    # Whether each port's data / error channel is still accepting inbound frames.
    self._data_open: dict[int, bool] = {}
    self._error_open: dict[int, bool] = {}

    # Channels whose first frame (port prefix) has already been consumed.
    self._data_first_seen: set[int] = set()
    self._error_first_seen: set[int] = set()

    from kubex.api._portforward import PortForwardStream as _PortForwardStream

    for i, port in enumerate(self._ports):
        dc = data_channel_for_port_index(i)
        ec = error_channel_for_port_index(i)
        self._data_ch_to_port[dc] = port
        self._error_ch_to_port[ec] = port

        send_d, recv_d = anyio.create_memory_object_stream[bytes](
            max_buffer_size=buffer_size
        )
        send_e, recv_e = anyio.create_memory_object_stream[str](
            max_buffer_size=buffer_size
        )
        self._streams_send[port] = send_d
        self._streams_recv[port] = recv_d
        self._errors_send[port] = send_e
        self._errors_recv[port] = recv_e
        self._streams[port] = _PortForwardStream(self, port, dc, recv_d)
        self._truncated[port] = False
        self._data_open[port] = True
        self._error_open[port] = True

close_port_data async

close_port_data(port: int) -> None

Send a half-close frame for the data channel of port.

Idempotent: a second call for the same port is a no-op. The emitted frame is protocol.encode(CHANNEL_CLOSE, bytes([data_channel_id])).

Source code in kubex/api/_portforward_session.py
async def close_port_data(self, port: int) -> None:
    """Send a half-close frame for the data channel of ``port``.

    Idempotent: a second call for the same port is a no-op.
    The emitted frame is ``protocol.encode(CHANNEL_CLOSE, bytes([data_channel_id]))``.
    """
    if port not in self._ports:
        raise ValueError(
            f"port {port} is not part of this portforward session (ports={list(self._ports)})"
        )
    port_index = self._ports.index(port)
    dc = data_channel_for_port_index(port_index)
    await self._send_close_for_channel(dc)

Protocol helpers

kubex.api._protocol

CachedSubresourceDescriptor

Base for non-data descriptors that guard on a marker interface and cache the accessor.