
Kubernetes
Channel | Revision | Published | Runs on |
---|---|---|---|
latest/edge | 1422 | 15 Sep 2025 | |
latest/edge | 1421 | 15 Sep 2025 | |
latest/edge | 1420 | 15 Sep 2025 | |
latest/edge | 1419 | 15 Sep 2025 | |
latest/edge | 578 | 19 Mar 2025 | |
latest/edge | 576 | 19 Mar 2025 | |
1.32/stable | 1386 | 09 Sep 2025 | |
1.32/stable | 1383 | 09 Sep 2025 | |
1.32/stable | 1382 | 09 Sep 2025 | |
1.32/stable | 1385 | 09 Sep 2025 | |
1.32/stable | 1384 | 09 Sep 2025 | |
1.32/stable | 1381 | 09 Sep 2025 | |
1.32/candidate | 1385 | 05 Sep 2025 | |
1.32/candidate | 1381 | 05 Sep 2025 | |
1.32/candidate | 1383 | 05 Sep 2025 | |
1.32/candidate | 1382 | 05 Sep 2025 | |
1.32/candidate | 1386 | 05 Sep 2025 | |
1.32/candidate | 1384 | 05 Sep 2025 | |
1.32/beta | 1406 | 11 Sep 2025 | |
1.32/beta | 1405 | 11 Sep 2025 | |
1.32/beta | 1404 | 11 Sep 2025 | |
1.32/beta | 1403 | 11 Sep 2025 | |
1.32/beta | 1402 | 11 Sep 2025 | |
1.32/beta | 1401 | 11 Sep 2025 | |
1.33/stable | 1350 | 04 Sep 2025 | |
1.33/stable | 1348 | 04 Sep 2025 | |
1.33/stable | 1351 | 04 Sep 2025 | |
1.33/stable | 1347 | 04 Sep 2025 | |
1.33/candidate | 1390 | 05 Sep 2025 | |
1.33/candidate | 1387 | 05 Sep 2025 | |
1.33/candidate | 1388 | 05 Sep 2025 | |
1.33/candidate | 1389 | 05 Sep 2025 | |
1.33/beta | 1390 | 05 Sep 2025 | |
1.33/beta | 1389 | 05 Sep 2025 | |
1.33/beta | 1388 | 05 Sep 2025 | |
1.33/beta | 1387 | 05 Sep 2025 | |
1.31/candidate | 142 | 11 Dec 2024 | |
1.31/candidate | 141 | 27 Nov 2024 | |
1.30/beta | 65 | 23 May 2024 |
juju deploy k8s --channel 1.32/stable
Deploy universal operators easily with Juju, the Universal Operator Lifecycle Manager.
Platform:
24.04
22.04
20.04
-
- Last updated
- Revision Library version 0.8
# Copyright 2025 Canonical Ltd.
# See LICENSE file for licensing details.
"""Module for managing k8sd API interactions.
This module provides a high-level interface for interacting with K8sd. It
simplifies tasks such as token management and component updates.
The core of the module is the K8sdAPIManager, which handles the creation
and management of HTTP connections to interact with the k8sd API. This
class utilises different connection factories (UnixSocketConnectionFactory
and HTTPConnectionFactory) to establish connections through either Unix
sockets or HTTP protocols.
Example usage for creating a join token for K8sd:
```python
try:
factory = UnixSocketConnectionFactory('/path/to/socket')
api_manager = K8sdAPIManager(factory)
join_token = api_manager.create_join_token('node-name')
except K8sdAPIManagerError as e:
logger.error("An error occurred: %s", e.message)
```
Similarly, the module allows for requesting authentication tokens and
managing K8s components.
"""
import enum
import json
import logging
import socket
from contextlib import contextmanager
from datetime import datetime
from http.client import HTTPConnection, HTTPException
from typing import Any, Dict, Generator, List, Optional, Type, TypeVar
import yaml
from pydantic import (
BaseModel,
ConfigDict,
Field,
SecretStr,
ValidationInfo,
field_serializer,
field_validator,
)
# The unique Charmhub library identifier, never change it
LIBID = "6a5f235306864667a50437c08ba7e83f"
# Increment this major API version when introducing breaking changes
LIBAPI = 0
# Increment this PATCH version before using `charmcraft publish-lib` or reset
# to 0 if you are raising the major API version
LIBPATCH = 8
logger = logging.getLogger(__name__)
class ErrorCodes(enum.Enum):
"""Enumerate the response codes from the k8s api.
Attributes:
STATUS_NODE_UNAVAILABLE: returned when the node isn't in the cluster
STATUS_NODE_IN_USE: returned when the node is in the cluster already
"""
STATUS_NODE_UNAVAILABLE = 520
STATUS_NODE_IN_USE = 521
class K8sdAPIManagerError(Exception):
"""Base exception for K8sd API Manager errors."""
class K8sdConnectionError(K8sdAPIManagerError):
"""Raised when there is a connection error."""
class InvalidResponseError(K8sdAPIManagerError):
"""Raised when the response is invalid or unexpected.
Attributes:
code (int): HTTP Status code
"""
def __init__(
self,
code: int,
msg: str,
method: Optional[str] = None,
endpoint: Optional[str] = None,
reason: Optional[str] = None,
body: Optional[str] = None,
) -> None:
"""Initialise the InvalidResponseError.
Args:
code (int): http response code
msg (str): Message associated with the error
method (Optional[str]): The method used to make the request
endpoint (Optional[str]): The endpoint used to make the request
reason (Optional[str]): The reason for the error
body (Optional[str]): The body of the response
"""
super().__init__(f"Error status {code}\n" + msg)
self.code = code
self.method = method
self.endpoint = endpoint
self.reason = reason
self.body = body
class BaseRequestModel(BaseModel):
"""Base model for k8s request responses.
Attributes:
type (str): The type of the request.
status (str): The status of the response, defaults to an empty string.
status_code (int): The status code of the response.
operation (str): The operation of the request, defaults to an empty string.
error_code (int): The error code associated with the response.
error (str): The error message, defaults to an empty string.
"""
type: str
status: str = Field(default="")
status_code: int
operation: str = Field(default="")
error_code: int
error: str = Field(default="")
@field_validator("status_code", mode="after")
def check_status_code(cls, v):
"""Validate the status_code field.
Args:
v (int): The value of the status_code field to validate.
Returns:
int: The validated status code if it is 200.
Raises:
ValueError: If the status_code is not 200.
"""
if v != 200:
raise ValueError(f"Status code must be 200. Received {v}")
return v
@field_validator("error_code", mode="after")
def check_error_code(cls, v, info: ValidationInfo):
"""Validate the error_code field.
Args:
v (int): The value of the error_code field to validate.
info (ValidationInfo): The validation information.
Returns:
int: The validated error code if it is 0.
Raises:
ValueError: If the error_code is not 0.
"""
if v != 0:
error_message = info.data.get("error", "Unknown error")
raise ValueError(f"Error code must be 0, received {v}. Error message: {error_message}")
return v
class EmptyResponse(BaseRequestModel):
"""Response model for request that do not expect any return value."""
class TokenMetadata(BaseModel):
"""Model representing metadata for a token.
Attributes:
token (SecretStr): The token string. (accessible via .get_secret_value() )
"""
token: SecretStr
class AuthTokenResponse(BaseRequestModel):
"""Response model for Kubernetes authentication token requests.
Attributes:
metadata (TokenMetadata): Metadata containing the authentication token.
"""
metadata: TokenMetadata
class CreateJoinTokenResponse(BaseRequestModel):
"""Response model for join token creation requests.
Attributes:
metadata (TokenMetadata): Metadata containing the join token.
"""
metadata: TokenMetadata
class ClusterMember(BaseModel):
"""Represents a member in the k8sd cluster.
Attributes:
name (str): Name of the cluster member.
address (str): Address of the cluster member.
cluster_role (str): Cluster Role of the node in the cluster.
datastore_role (str): Role of the member in the cluster.
"""
name: str
address: str
cluster_role: Optional[str] = Field(default=None, alias="cluster-role")
datastore_role: Optional[str] = Field(default=None, alias="datastore-role")
class DNSConfig(BaseModel):
"""Configuration for the DNS settings of the cluster.
Attributes:
model_config: ConfigDict instance for the model.
enabled: Optional flag which.
cluster_domain: The domain name of the cluster.
service_ip: The IP address of the DNS service within the cluster.
upstream_nameservers: List of upstream nameservers for DNS resolution.
"""
model_config = ConfigDict(populate_by_name=True)
enabled: Optional[bool] = Field(default=None)
cluster_domain: Optional[str] = Field(default=None, alias="cluster-domain")
service_ip: Optional[str] = Field(default=None, alias="service-ip")
upstream_nameservers: Optional[List[str]] = Field(default=None, alias="upstream-nameservers")
class IngressConfig(BaseModel):
"""Configuration for the ingress settings of the cluster.
Attributes:
model_config: ConfigDict instance for the model.
enabled: Optional flag which represents the status of Ingress.
enable_proxy_protocol: Optional flag to enable or disable proxy protocol.
"""
model_config = ConfigDict(populate_by_name=True)
enabled: Optional[bool] = Field(default=None)
enable_proxy_protocol: Optional[bool] = Field(default=None, alias="enable-proxy-protocol")
class LoadBalancerConfig(BaseModel):
"""Configuration for the load balancer settings of the cluster.
Attributes:
model_config: ConfigDict instance for the model.
enabled: Optional flag which represents the status of LoadBalancer.
cidrs: List of CIDR blocks for the load balancer.
l2_mode: Optional flag to enable or disable layer 2 mode.
l2_interfaces: List of layer 2 interfaces for the load balancer.
bgp_mode: Optional flag to enable or disable BGP.
bgp_local_asn: The local ASN for BGP configuration.
bgp_peer_address: The peer address for BGP configuration.
bgp_peer_asn: The peer ASN for BGP configuration.
bgp_peer_port: The port for BGP peering.
"""
model_config = ConfigDict(populate_by_name=True)
enabled: Optional[bool] = Field(default=None)
cidrs: Optional[List[str]] = Field(default=None)
l2_mode: Optional[bool] = Field(default=None, alias="l2-mode")
l2_interfaces: Optional[List[str]] = Field(default=None, alias="l2-interfaces")
bgp_mode: Optional[bool] = Field(default=None, alias="bgp-mode")
bgp_local_asn: Optional[int] = Field(default=None, alias="bgp-local-asn")
bgp_peer_address: Optional[str] = Field(default=None, alias="bgp-peer-address")
bgp_peer_asn: Optional[int] = Field(default=None, alias="bgp-peer-asn")
bgp_peer_port: Optional[int] = Field(default=None, alias="bgp-peer-port")
class LocalStorageConfig(BaseModel):
"""Configuration for the local storage settings of the cluster.
Attributes:
model_config: ConfigDict instance for the model.
enabled: Optional flag which represents the status of Storage.
local_path: The local path for storage.
reclaim_policy: The policy for reclaiming local storage.
set_default: Optional flag to set this as the default storage option.
"""
model_config = ConfigDict(populate_by_name=True)
enabled: Optional[bool] = Field(default=None)
local_path: Optional[str] = Field(default=None, alias="local-path")
reclaim_policy: Optional[str] = Field(default=None, alias="reclaim-policy")
set_default: Optional[bool] = Field(default=None, alias="set-default")
class NetworkConfig(BaseModel):
"""Configuration for the network settings of the cluster.
Attributes:
model_config: ConfigDict instance for the model.
enabled: Optional flag which represents the status of Network.
"""
model_config = ConfigDict(populate_by_name=True)
enabled: Optional[bool] = Field(default=None)
class GatewayConfig(BaseModel):
"""Configuration for the gateway settings of the cluster.
Attributes:
model_config: ConfigDict instance for the model.
enabled: Optional flag which represents the status of Gateway.
"""
model_config = ConfigDict(populate_by_name=True)
enabled: Optional[bool] = Field(default=None)
class MetricsServerConfig(BaseModel):
"""Configuration for the metrics server settings of the cluster.
Attributes:
model_config: ConfigDict instance for the model.
enabled: Optional flag which represents the status of MetricsServer.
"""
model_config = ConfigDict(populate_by_name=True)
enabled: Optional[bool] = Field(default=None)
class UserFacingClusterConfig(BaseModel):
"""Aggregated configuration model for the user-facing aspects of a cluster.
Attributes:
model_config: ConfigDict instance for the model.
network: Network configuration for the cluster.
dns: DNS configuration for the cluster.
ingress: Ingress configuration for the cluster.
load_balancer: Load balancer configuration for the cluster.
local_storage: Local storage configuration for the cluster.
gateway: Gateway configuration for the cluster.
metrics_server: Metrics server configuration for the cluster.
cloud_provider: The cloud provider for the cluster.
annotations: Dictionary that can be used to store arbitrary metadata configuration.
"""
model_config = ConfigDict(populate_by_name=True)
network: Optional[NetworkConfig] = Field(default=None)
dns: Optional[DNSConfig] = Field(default=None)
ingress: Optional[IngressConfig] = Field(default=None)
load_balancer: Optional[LoadBalancerConfig] = Field(default=None, alias="load-balancer")
local_storage: Optional[LocalStorageConfig] = Field(default=None, alias="local-storage")
gateway: Optional[GatewayConfig] = Field(default=None)
metrics_server: Optional[MetricsServerConfig] = Field(default=None, alias="metrics-server")
cloud_provider: Optional[str] = Field(default=None, alias="cloud-provider")
annotations: Optional[Dict[str, str]] = Field(default=None)
class UserFacingDatastoreConfig(BaseModel):
"""Aggregated configuration model for the user-facing datastore aspects of a cluster.
Attributes:
model_config: ConfigDict instance for the model.
type: Type of the datastore. For runtime updates, this needs to be "external".
servers: Server addresses of the external datastore.
ca_crt: CA certificate of the external datastore cluster in PEM format.
client_crt: client certificate of the external datastore cluster in PEM format.
client_key: client key of the external datastore cluster in PEM format.
"""
model_config = ConfigDict(populate_by_name=True)
type: Optional[str] = Field(default=None)
servers: Optional[List[str]] = Field(default=None)
ca_crt: Optional[str] = Field(default=None, alias="ca-crt")
client_crt: Optional[str] = Field(default=None, alias="client-crt")
client_key: Optional[str] = Field(default=None, alias="client-key")
class BootstrapConfig(BaseModel):
"""Configuration model for bootstrapping a Canonical K8s cluster.
Attributes:
cluster_config (UserFacingClusterConfig): The cluster configuration settings.
control_plane_taints (List[str]): Register with the following control-plane taints
pod_cidr (str): The IP address range for the cluster's pods.
service_cidr (str): The IP address range for the cluster services.
disable_rbac (bool): Flag to disable role-based access control
secure_port (int): The secure port used for Kubernetes.
k8s_dqlite_port (int): The port used by Dqlite.
datastore_type (str): The type of datastore used by the cluster.
datastore_servers (List[str]): The servers used by the datastore.
datastore_ca_cert (str): The CA certificate for the datastore.
datastore_client_cert (str): The client certificate for accessing the datastore.
datastore_client_key (str): The client key for accessing the datastore.
extra_sans (List[str]): List of extra sans for the self-signed certificates
ca_cert (str): The CA certificate for Kubernetes services.
ca_key (str): The CA key for Kubernetes services.
client_ca_cert (str): The client CA certificate for Kubernetes services.
client_ca_key (str): The client CA key for Kubernetes services.
front_proxy_ca_cert (str): The front proxy CA certificate.
front_proxy_ca_key (str): The front proxy CA key.
front_proxy_client_cert (str): The front proxy client certificate.
front_proxy_client_key (str): The front proxy client key.
api_server_kubelet_client_cert (str): The kubelet client certificate for the API server.
api_server_kubelet_client_key (str): The kubelet client key for the API server.
admin_client_cert (str): The admin client certificate.
admin_client_key (str): The admin client key.
kube_proxy_client_cert (str): The kube-proxy client certificate.
kube_proxy_client_key (str): The kube-proxy client key.
kube_scheduler_client_cert (str): The kube-scheduler client certificate.
kube_scheduler_client_key (str): The kube-scheduler client key.
kube_controller_manager_client_cert (str): The controller manager client certificate.
kube_controller_manager_client_key (str): The controller manager client key.
service_account_key (str): The service account key.
api_server_cert (str): The API server certificate.
api_server_key (str): The API server key.
kubelet_cert (str): The kubelet certificate.
kubelet_key (str): The kubelet key.
kubelet_client_cert (str): The kubelet client certificate.
kubelet_client_key (str): The kubelet client key.
extra_node_config_files (Dict[str, str]): Additional configuration files for nodes.
extra_node_kube_apiserver_args (Dict[str, Optional[str]]): key-value
service args .
extra_node_kube_controller_manager_args (Dict[str, Optional[str]]):
key-value service args .
extra_node_kube_scheduler_args (Dict[str, Optional[str]]): key-value
service args .
extra_node_kube_proxy_args (Dict[str, Optional[str]]): key-value
service args .
extra_node_kubelet_args (Dict[str, Optional[str]]): key-value service
args .
extra_node_containerd_args (Dict[str, Optional[str]]): key-value
service args .
extra_node_k8s_dqlite_args (Dict[str, Optional[str]]): key-value
service args
extra_node_containerd_config (Dict[str, Any]): key-value config args
containerd_base_dir (str): The base directory for containerd.
"""
cluster_config: Optional[UserFacingClusterConfig] = Field(default=None, alias="cluster-config")
control_plane_taints: Optional[List[str]] = Field(default=None, alias="control-plane-taints")
pod_cidr: Optional[str] = Field(default=None, alias="pod-cidr")
service_cidr: Optional[str] = Field(default=None, alias="service-cidr")
disable_rbac: Optional[bool] = Field(default=None, alias="disable-rbac")
secure_port: Optional[int] = Field(default=None, alias="secure-port")
k8s_dqlite_port: Optional[int] = Field(default=None, alias="k8s-dqlite-port")
datastore_type: Optional[str] = Field(default=None, alias="datastore-type")
datastore_servers: Optional[List[str]] = Field(default=None, alias="datastore-servers")
datastore_ca_cert: Optional[str] = Field(default=None, alias="datastore-ca-crt")
datastore_client_cert: Optional[str] = Field(default=None, alias="datastore-client-crt")
datastore_client_key: Optional[str] = Field(default=None, alias="datastore-client-key")
extra_sans: Optional[List[str]] = Field(default=None, alias="extra-sans")
# Cluster-wide external certificates
ca_cert: Optional[str] = Field(default=None, alias="ca-crt")
ca_key: Optional[str] = Field(default=None, alias="ca-key")
client_ca_cert: Optional[str] = Field(default=None, alias="client-ca-crt")
client_ca_key: Optional[str] = Field(default=None, alias="client-ca-key")
front_proxy_ca_cert: Optional[str] = Field(default=None, alias="front-proxy-ca-crt")
front_proxy_ca_key: Optional[str] = Field(default=None, alias="front-proxy-ca-key")
front_proxy_client_cert: Optional[str] = Field(default=None, alias="front-proxy-client-crt")
front_proxy_client_key: Optional[str] = Field(default=None, alias="front-proxy-client-key")
api_server_kubelet_client_cert: Optional[str] = Field(
default=None, alias="apiserver-kubelet-client-crt"
)
api_server_kubelet_client_key: Optional[str] = Field(
default=None, alias="apiserver-kubelet-client-key"
)
admin_client_cert: Optional[str] = Field(default=None, alias="admin-client-crt")
admin_client_key: Optional[str] = Field(default=None, alias="admin-client-key")
kube_proxy_client_cert: Optional[str] = Field(default=None, alias="kube-proxy-client-crt")
kube_proxy_client_key: Optional[str] = Field(default=None, alias="kube-proxy-client-key")
kube_scheduler_client_cert: Optional[str] = Field(
default=None, alias="kube-scheduler-client-crt"
)
kube_scheduler_client_key: Optional[str] = Field(
default=None, alias="kube-scheduler-client-key"
)
kube_controller_manager_client_cert: Optional[str] = Field(
default=None, alias="kube-controller-manager-client-crt"
)
kube_controller_manager_client_key: Optional[str] = Field(
default=None, alias="kube-controller-manager-client-key"
)
service_account_key: Optional[str] = Field(default=None, alias="service-account-key")
# Node-specific external certificates
api_server_cert: Optional[str] = Field(default=None, alias="apiserver-crt")
api_server_key: Optional[str] = Field(default=None, alias="apiserver-key")
kubelet_cert: Optional[str] = Field(default=None, alias="kubelet-crt")
kubelet_key: Optional[str] = Field(default=None, alias="kubelet-key")
kubelet_client_cert: Optional[str] = Field(default=None, alias="kubelet-client-crt")
kubelet_client_key: Optional[str] = Field(default=None, alias="kubelet-client-key")
# Extra configuration files
extra_node_config_files: Optional[Dict[str, str]] = Field(
default=None, alias="extra-node-config-files"
)
# Extra service arguments (values can be None to delete)
extra_node_kube_apiserver_args: Optional[Dict[str, Optional[str]]] = Field(
default=None, alias="extra-node-kube-apiserver-args"
)
extra_node_kube_controller_manager_args: Optional[Dict[str, Optional[str]]] = Field(
default=None, alias="extra-node-kube-controller-manager-args"
)
extra_node_kube_scheduler_args: Optional[Dict[str, Optional[str]]] = Field(
default=None, alias="extra-node-kube-scheduler-args"
)
extra_node_kube_proxy_args: Optional[Dict[str, Optional[str]]] = Field(
default=None, alias="extra-node-kube-proxy-args"
)
extra_node_kubelet_args: Optional[Dict[str, Optional[str]]] = Field(
default=None, alias="extra-node-kubelet-args"
)
extra_node_containerd_args: Optional[Dict[str, Optional[str]]] = Field(
default=None, alias="extra-node-containerd-args"
)
extra_node_k8s_dqlite_args: Optional[Dict[str, Optional[str]]] = Field(
default=None, alias="extra-node-k8s-dqlite-args"
)
extra_node_containerd_config: Optional[Dict[str, Any]] = Field(
default=None, alias="extra-node-containerd-config"
)
containerd_base_dir: Optional[str] = Field(default=None, alias="containerd-base-dir")
class CreateClusterRequest(BaseModel):
"""Request model for creating a new Canonical Kubernetes cluster.
Attributes:
name (str): The name of the cluster to be created.
address (str): The address where the cluster is hosted.
config (BootstrapConfig): Additional configuration parameters for the cluster.
"""
name: str
address: str
config: BootstrapConfig
class UpdateClusterConfigRequest(BaseModel):
"""Request model for updating Cluster config.
Attributes:
config (Optional[UserFacingClusterConfig]): The cluster configuration.
datastore (Optional[UserFacingDatastoreConfig]): The clusters datastore configuration.
"""
config: Optional[UserFacingClusterConfig] = Field(default=None)
datastore: Optional[UserFacingDatastoreConfig] = Field(default=None)
class NodeJoinConfig(BaseModel):
"""Request model for the config on a node joining the cluster.
Attributes:
model_config: ConfigDict instance for the model.
kubelet_cert (str): node's certificate
kubelet_key (str): node's certificate key
kubelet_client_cert (str): Kubelet client certificate
kubelet_client_key (str): Kubelet client key
kube_proxy_client_cert (str): Kube-proxy client certificate
kube_proxy_client_key (str): Kube-proxy client key
extra_node_kube_proxy_args (Dict[str, str]): key-value service args
extra_node_kubelet_args (Dict[str, str]): key-value service args
extra_node_containerd_args ([Dict[str,str]]): key-value service args
extra_node_containerd_config ([Dict[str,Any]]): key-value config args
containerd_base_dir (str): The base directory for containerd.
"""
model_config = ConfigDict(populate_by_name=True)
kubelet_cert: Optional[str] = Field(default=None, alias="kubelet-crt")
kubelet_key: Optional[str] = Field(default=None, alias="kubelet-key")
kubelet_client_cert: Optional[str] = Field(default=None, alias="kubelet-client-crt")
kubelet_client_key: Optional[str] = Field(default=None, alias="kubelet-client-key")
kube_proxy_client_cert: Optional[str] = Field(default=None, alias="kube-proxy-client-crt")
kube_proxy_client_key: Optional[str] = Field(default=None, alias="kube-proxy-client-key")
extra_node_kube_proxy_args: Optional[Dict[str, str]] = Field(
default=None, alias="extra-node-kube-proxy-args"
)
extra_node_kubelet_args: Optional[Dict[str, str]] = Field(
default=None, alias="extra-node-kubelet-args"
)
extra_node_containerd_args: Optional[Dict[str, str]] = Field(
default=None, alias="extra-node-containerd-args"
)
extra_node_containerd_config: Optional[Dict[str, Any]] = Field(
default=None, alias="extra-node-containerd-config"
)
containerd_base_dir: Optional[str] = Field(default=None, alias="containerd-base-dir")
class ControlPlaneNodeJoinConfig(NodeJoinConfig):
"""Request model for the config on a control-plane node joining the cluster.
Attributes:
model_config: ConfigDict instance for the model.
extra_sans (List[str]): List of extra sans for the self-signed certificates
apiserver_crt (str): API server certificate
apiserver_key (str): API server certificate key
front_proxy_client_crt (str): Front proxy client certificate
front_proxy_client_key (str): Front proxy client key
admin_client_cert (str): Admin client certificate
admin_client_key (str): Admin client key
kube_scheduler_client_cert (str): Kube-scheduler client certificate
kube_scheduler_client_key (str): Kube-scheduler client key
kube_controller_manager_client_cert (str): Controller manager client certificate
kube_controller_manager_client_key (str): Controller manager client key
extra_node_config_files (Dict[str, str]): Additional node config files
extra_node_kube_apiserver_args (Dict[str, Optional[str]]): API server args .
extra_node_kube_controller_manager_args (Dict[str, Optional[str]]): Controller manager args
extra_node_kube_scheduler_args (Dict[str, Optional[str]]): Scheduler args
extra_node_k8s_dqlite_args (Dict[str, Optional[str]]): Dqlite args
extra_node_containerd_config (Dict[str, Any]): Containerd config
"""
model_config = ConfigDict(populate_by_name=True)
extra_sans: Optional[List[str]] = Field(default=None, alias="extra-sans")
apiserver_crt: Optional[str] = Field(default=None, alias="apiserver-crt")
apiserver_key: Optional[str] = Field(default=None, alias="apiserver-key")
front_proxy_client_crt: Optional[str] = Field(default=None, alias="front-proxy-client-crt")
front_proxy_client_key: Optional[str] = Field(default=None, alias="front-proxy-client-key")
admin_client_cert: Optional[str] = Field(default=None, alias="admin-client-crt")
admin_client_key: Optional[str] = Field(default=None, alias="admin-client-key")
kube_scheduler_client_cert: Optional[str] = Field(
default=None, alias="kube-scheduler-client-crt"
)
kube_scheduler_client_key: Optional[str] = Field(
default=None, alias="kube-scheduler-client-key"
)
kube_controller_manager_client_cert: Optional[str] = Field(
default=None, alias="kube-controller-manager-client-crt"
)
kube_controller_manager_client_key: Optional[str] = Field(
default=None, alias="kube-controller-manager-client-key"
)
extra_node_config_files: Optional[Dict[str, str]] = Field(
default=None, alias="extra-node-config-files"
)
extra_node_kube_apiserver_args: Optional[Dict[str, Optional[str]]] = Field(
default=None, alias="extra-node-kube-apiserver-args"
)
extra_node_kube_controller_manager_args: Optional[Dict[str, Optional[str]]] = Field(
default=None, alias="extra-node-kube-controller-manager-args"
)
extra_node_kube_scheduler_args: Optional[Dict[str, Optional[str]]] = Field(
default=None, alias="extra-node-kube-scheduler-args"
)
extra_node_k8s_dqlite_args: Optional[Dict[str, Optional[str]]] = Field(
default=None, alias="extra-node-k8s-dqlite-args"
)
extra_node_containerd_config: Optional[Dict[str, Any]] = Field(
default=None, alias="extra-node-containerd-config"
)
class JoinClusterRequest(BaseModel):
"""Request model for a node joining the cluster.
Attributes:
model_config: ConfigDict instance for the model.
name (str): node's certificate
address (str): node's certificate key
token (str): token
config (NodeJoinConfig): Node Config
"""
model_config = ConfigDict(populate_by_name=True)
name: str
address: str
token: SecretStr
config: Optional[NodeJoinConfig] = Field(default=None)
@field_serializer("token")
def serialize_token(self, token: SecretStr) -> str:
"""Serialize the token to a string.
Args:
token (SecretStr): The token to serialize.
Returns:
str: The serialized token.
"""
return token.get_secret_value()
@field_serializer("config")
def serialize_config(self, config: Optional[NodeJoinConfig]) -> Optional[str]:
"""Serialize the config to a YAML string.
Args:
config (Optional[NodeJoinConfig]): The config to serialize.
Returns:
Optional[str]: The serialized config.
"""
if config is not None:
return yaml.safe_dump(config.model_dump(exclude_none=True, mode="json", by_alias=True))
return None
class DatastoreStatus(BaseModel):
"""information regarding the active datastore.
Attributes:
datastore_type (str): external or k8s-dqlite datastore
servers: (List(str)): list of server addresses of the external datastore cluster.
"""
datastore_type: Optional[str] = Field(default=None, alias="type")
servers: Optional[List[str]] = Field(default=None, alias="servers")
class ClusterStatus(BaseModel):
"""Represents the overall status of the k8sd cluster.
Attributes:
ready (bool): Indicates if the cluster is ready.
members (List[ClusterMember]): List of members in the cluster.
config (UserFacingClusterConfig): information about the cluster configuration.
datastore (DatastoreStatus): information regarding the active datastore.
"""
ready: bool = Field(False)
members: Optional[List[ClusterMember]] = Field(default=None)
config: Optional[UserFacingClusterConfig] = Field(default=None)
datastore: Optional[DatastoreStatus] = Field(default=None)
class ClusterMetadata(BaseModel):
"""Metadata containing status information about the k8sd cluster.
Attributes:
status (ClusterStatus): The status of the k8sd cluster.
"""
status: ClusterStatus
class GetClusterStatusResponse(BaseRequestModel):
"""Response model for getting the status of the k8sd cluster.
Attributes:
metadata (ClusterMetadata): Metadata containing the cluster status.
Can be None if the status is not available.
"""
metadata: Optional[ClusterMetadata] = Field(default=None)
class KubeConfigMetadata(BaseModel):
"""Metadata containing kubeconfig.
Attributes:
kubeconfig (KubeConfigMetadata): The status of the k8sd cluster.
"""
kubeconfig: str
class GetKubeConfigResponse(BaseRequestModel):
"""Response model for getting the kubeconfig from the cluster.
Attributes:
metadata (KubeconfigMetadata): Metadata containing the kubeconfig.
"""
metadata: KubeConfigMetadata
class RefreshCertificatesPlanMetadata(BaseModel):
"""Metadata for the certificates plan response.
Attributes:
model_config: ConfigDict instance for the model.
seed (int): The seed for the new certificates.
certificate_signing_requests (Optional[list[str]]): List of names
of the CertificateSigningRequests that need to be signed externally (for worker nodes).
"""
model_config = ConfigDict(populate_by_name=True)
# NOTE(Hue): Alias is because of a naming mismatch:
# https://github.com/canonical/k8s-snap-api/blob/6d4139295b37800fb2b3fcce9fc260e6caf284b9/api/v1/rpc_refresh_certificates_plan.go#L12
seed: Optional[int] = Field(default=None, alias="seconds")
certificate_signing_requests: Optional[List[str]] = Field(
default=None, alias="certificate-signing-requests"
)
class RefreshCertificatesPlanResponse(BaseRequestModel):
"""Response model for the refresh certificates plan.
Attributes:
metadata (RefreshCertificatesPlanMetadata): Metadata for the certificates plan response.
"""
metadata: RefreshCertificatesPlanMetadata
class RefreshCertificatesRunRequest(BaseModel):
"""Request model for running the refresh certificates run.
Attributes:
model_config: ConfigDict instance for the model.
seed (int): The seed for the new certificates from plan response.
expiration_seconds (int): The duration of the new certificates.
extra_sans (list[str]): List of extra sans for the new certificates.
"""
model_config = ConfigDict(populate_by_name=True)
seed: int
expiration_seconds: int = Field(alias="expiration-seconds")
extra_sans: Optional[List[str]] = Field(alias="extra-sans")
class RefreshCertificatesRunMetadata(BaseModel):
"""Metadata for RefreshCertificatesRunResponse.
Attributes:
model_config: ConfigDict instance for the model.
expiration_seconds (int): The duration of the new certificates
(might not match the requested value).
"""
model_config = ConfigDict(populate_by_name=True)
expiration_seconds: int = Field(alias="expiration-seconds")
class RefreshCertificatesRunResponse(BaseRequestModel):
"""Response model for the refresh certificates run.
Attributes:
metadata (RefreshCertificatesRunMetadata): Metadata for the certificates run response.
"""
metadata: RefreshCertificatesRunMetadata
class GetClusterConfigMetadata(BaseModel):
"""Metadata containing the cluster config.
Attributes:
status (UserFacingClusterConfig): The configuration of the cluster.
datastore (UserFacingDatastoreConfig): The configuration of the datastore.
pod_cidr (str): The CIDR range for the pods in the cluster.
service_cidr (str): The CIDR range for the services in the cluster.
"""
model_config = ConfigDict(populate_by_name=True)
status: UserFacingClusterConfig
datastore: Optional[UserFacingDatastoreConfig] = Field(default=None)
pod_cidr: Optional[str] = Field(default=None, alias="pod-cidr")
service_cidr: Optional[str] = Field(default=None, alias="service-cidr")
class GetClusterConfigResponse(BaseRequestModel):
"""Response model for the get cluster config endpoint.
Attributes:
metadata (GetClusterConfigMetadata): Metadata containing the cluster config.
"""
metadata: GetClusterConfigMetadata
class NodeStatus(BaseModel):
"""Represents the status of a node in the cluster.
Attributes:
name (Optional[str]): The name of the node.
address (Optional[str]): The address of the node.
cluster_role (Optional[str]): The role of the node in the cluster.
datastore_role (Optional[str]): The role of the node in the datastore cluster.
"""
model_config = ConfigDict(populate_by_name=True)
name: Optional[str] = Field(default=None)
address: Optional[str] = Field(default=None)
cluster_role: Optional[str] = Field(default=None, alias="cluster-role")
datastore_role: Optional[str] = Field(default=None, alias="datastore-role")
class GetNodeStatusMetadata(BaseModel):
"""Metadata containing the node status.
Attributes:
status (NodeStatus): The status of the local node.
taints (Optional[List[str]]): List of taints applied to the node.
"""
status: NodeStatus
taints: Optional[List[str]] = Field(default=None)
class GetNodeStatusResponse(BaseRequestModel):
"""Response model for the get node status endpoint.
Attributes:
metadata (GetNodeStatusMetadata): Metadata containing the node status.
"""
metadata: GetNodeStatusMetadata
T = TypeVar("T", bound=BaseRequestModel)
class UnixSocketHTTPConnection(HTTPConnection):
"""HTTP connection over a Unix socket."""
def __init__(self, unix_socket: str, timeout: int = 30):
"""Initialise the UnixSocketHTTPConnection.
Args:
unix_socket (str): Path to the Unix socket.
timeout (int): Connection timeout in seconds.
"""
super().__init__("localhost", timeout=timeout)
self.unix_socket = unix_socket
def connect(self):
"""Establish a connection to the server using a Unix socket.
Raises:
K8sdConnectionError: If there is an error connecting to the Unix socket.
"""
try:
self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
self.sock.settimeout(self.timeout)
self.sock.connect(self.unix_socket)
except socket.error as e:
raise K8sdConnectionError(f"Error connecting to socket: {self.unix_socket}") from e
class ConnectionFactory:
"""Abstract factory for creating connection objects."""
@contextmanager
def create_connection(self) -> Generator[HTTPConnection, None, None]:
"""Create a new connection instance.
Raises:
NotImplementedError: If create_connection is not implemented by the subclass.
"""
raise NotImplementedError("create_connection must be implemented by subclasses")
class UnixSocketConnectionFactory(ConnectionFactory):
"""Concrete factory for creating Unix socket connections."""
def __init__(self, unix_socket: str, timeout: int = 30):
"""Initialize a new instance of UnixSocketConnectionFactory.
Args:
unix_socket (str): The file path to the Unix socket.
timeout (int, optional): The timeout for the connection in seconds.
Defaults to 30 seconds.
"""
self.unix_socket = unix_socket
self.timeout = timeout
@contextmanager
def create_connection(self):
"""Create and manage a Unix socket HTTP connection.
Yields:
UnixSocketHTTPConnection: The created Unix socket HTTP connection.
"""
conn = UnixSocketHTTPConnection(self.unix_socket, self.timeout)
try:
yield conn
finally:
conn.close()
class HTTPConnectionFactory(ConnectionFactory):
"""Concrete factory for creating HTTP connections."""
def __init__(self, host: str, port=None, timeout: int = 30):
"""Initialize a new instance of HTTPConnectionFactory.
Args:
host (str): Hostname for the HTTP connection.
port (int): Port for the HTTP connection.
timeout (int, optional): The timeout for the connection in seconds.
Defaults to 30 seconds.
"""
self.host = host
self.port = port
self.timeout = timeout
@contextmanager
def create_connection(self):
"""Create and manage an HTTP connection.
Yields:
HTTPConnection: The created HTTP connection.
"""
conn = HTTPConnection(self.host, self.port, self.timeout)
try:
yield conn
finally:
conn.close()
class K8sdAPIManager:
"""Manager for K8sd API interactions."""
def __init__(self, factory: ConnectionFactory):
"""Initialise the K8sdAPIManager.
Args:
factory (ConnectionFactory): An instance of a connection factory that will be used
to create connections. This factory determines the type
of connection (e.g., Unix socket or HTTP).
"""
self.factory = factory
def _send_request(
self, endpoint: str, method: str, response_cls: Type[T], body: Optional[dict] = None
) -> T:
"""Send a request to the k8sd API endpoint.
Args:
endpoint (str): The endpoint to send the request to.
method (str): HTTP method for the request.
body (dict): Body of the request.
response_cls (Type[T]): The response class to deserialize the response into.
Raises:
K8sdConnectionError: If there's an HTTP or socket error.
InvalidResponseError: If the response has invalid JSON or fails validation.
Returns:
T: An instance of the response class with the response data.
"""
try:
with self.factory.create_connection() as connection:
body_data = json.dumps(body) if body is not None else None
headers = {"Content-Type": "application/json"} if body_data is not None else {}
connection.request(
method,
endpoint,
body=body_data,
headers=headers,
)
response = connection.getresponse()
data = response.read().decode()
if not 200 <= response.status < 300:
raise InvalidResponseError(
response.status,
f"\tmethod={method}\n"
f"\tendpoint={endpoint}\n"
f"\treason={response.reason}\n"
f"\tbody={data}",
method=method,
endpoint=endpoint,
reason=response.reason,
body=data,
)
return response_cls.parse_raw(data)
except ValueError as e:
raise InvalidResponseError(
response.status,
f"Request failed:\n\tmethod={method}\n\tendpoint={endpoint}",
) from e
except (socket.error, HTTPException) as e:
raise K8sdConnectionError(
f"HTTP or Socket error\tmethod={method}\n\tendpoint={endpoint}"
) from e
def create_join_token(self, name: str, worker: bool = False) -> SecretStr:
"""Create a join token.
Args:
name (str): Name of the node.
worker (bool): Whether the node should join as control-plane or worker.
Returns:
SecretStr: The generated join token if successful.
"""
endpoint = "/1.0/k8sd/cluster/tokens"
body = {
"name": name,
"worker": worker,
}
join_response = self._send_request(endpoint, "POST", CreateJoinTokenResponse, body)
return join_response.metadata.token
def join_cluster(self, config: JoinClusterRequest):
"""Join a node to the k8s cluster.
Args:
config: JoinClusterRequest: config to join the cluster
"""
endpoint = "/1.0/k8sd/cluster/join"
request = config.model_dump(
exclude_none=True, by_alias=True, mode="json", exclude_unset=True
)
self._send_request(endpoint, "POST", EmptyResponse, request)
def remove_node(self, name: str, force: bool = True):
"""Remove a node from the cluster.
Args:
name (str): Name of the node that should be removed.
force (bool): Forcibly remove the node
"""
endpoint = "/1.0/k8sd/cluster/remove"
body = {"name": name, "force": force}
self._send_request(endpoint, "POST", EmptyResponse, body)
def update_cluster_config(self, config: UpdateClusterConfigRequest):
"""Enable or disable a k8s component.
Args:
config (UpdateClusterConfigRequest): The cluster configuration.
"""
endpoint = "/1.0/k8sd/cluster/config"
body = config.model_dump(exclude_none=True, by_alias=True)
self._send_request(endpoint, "PUT", EmptyResponse, body)
def get_cluster_config(self) -> GetClusterConfigResponse:
"""Retrieve the cluster configuration.
Worker nodes are not allowed to call this endpoint:
https://github.com/canonical/k8s-snap/blob/0a5edd2/src/k8s/pkg/k8sd/api/endpoints.go#L121-L126
Returns:
GetClusterConfigResponse: The cluster configuration.
"""
return self._send_request("/1.0/k8sd/cluster/config", "GET", GetClusterConfigResponse)
def get_node_status(self) -> GetNodeStatusResponse:
"""Retrieve the status of the local node.
Returns:
GetNodeStatusResponse: The status of the local node.
"""
return self._send_request("/1.0/k8sd/node", "GET", GetNodeStatusResponse)
def get_cluster_status(self) -> GetClusterStatusResponse:
"""Retrieve cluster status.
Returns:
cluster_status: status of the cluster.
"""
return self._send_request("/1.0/k8sd/cluster", "GET", GetClusterStatusResponse)
def is_cluster_bootstrapped(self) -> bool:
"""Check if K8sd has been bootstrapped.
Returns:
bool: True if the cluster has been bootstrapped, False otherwise.
"""
try:
status = self.get_cluster_status()
return status.error_code == 0
except (K8sdConnectionError, InvalidResponseError) as e:
logger.error("Invalid response while checking if cluster is bootstrapped: %s", e)
return False
def is_cluster_ready(self):
"""Check if the Kubernetes cluster is ready.
The cluster is ready if at least one k8s node is in READY state.
Returns:
bool: True if the cluster is ready, False otherwise.
"""
status = self.get_cluster_status()
return status.metadata and status.metadata.status.ready
def check_k8sd_ready(self):
"""Check if k8sd is ready using various microcluster endpoints.
Raises:
K8sdConnectionError: If the response is Not Found on all endpoints.
"""
ready_endpoints = ["/core/1.0/ready", "/cluster/1.0/ready"]
for i, endpoint in enumerate(ready_endpoints):
try:
self._send_request(endpoint, "GET", EmptyResponse)
break
except InvalidResponseError as ex:
if ex.code == 404:
logger.warning(
"micro-cluster unavailable @ %s (%s of %s): %s",
endpoint,
i + 1,
len(ready_endpoints),
ex,
)
# Try the next endpoint if the current one is not found
continue
raise
else:
raise K8sdConnectionError(
"Exhausted all endpoints while checking if micro-cluster is ready"
)
def bootstrap_k8s_snap(self, request: CreateClusterRequest) -> None:
"""Bootstrap the k8s cluster.
Args:
request (CreateClusterRequest): The request model to bootstrap the cluster.
"""
endpoint = "/1.0/k8sd/cluster"
body = request.dict(exclude_none=True, by_alias=True)
self._send_request(endpoint, "POST", EmptyResponse, body)
def request_auth_token(self, username: str, groups: List[str]) -> SecretStr:
"""Request a Kubernetes authentication token.
Args:
username (str): Username for which the token is requested.
groups (List[str]): Groups associated with the user.
Returns:
SecretStr: The authentication token.
"""
endpoint = "/1.0/kubernetes/auth/tokens"
body = {"username": username, "groups": groups}
auth_response = self._send_request(endpoint, "POST", AuthTokenResponse, body)
return auth_response.metadata.token
def revoke_auth_token(self, token: str) -> None:
"""Revoke a Kubernetes authentication token.
Args:
token (str): The authentication token.
"""
endpoint = "/1.0/kubernetes/auth/tokens"
body = {"token": token}
self._send_request(endpoint, "DELETE", EmptyResponse, body)
def get_kubeconfig(self, server: Optional[str]) -> str:
"""Request a Kubernetes admin config.
Args:
server (str): Optional server to replace in the kubeconfig endpoint
Returns:
str: The authentication token.
"""
endpoint = "/1.0/k8sd/kubeconfig"
body = {"server": server or ""}
response = self._send_request(endpoint, "GET", GetKubeConfigResponse, body)
return response.metadata.kubeconfig
def refresh_certs(
self, extra_sans: List[str], expiration_seconds: Optional[int] = None
) -> None:
"""Refresh the certificates for the cluster.
Args:
extra_sans (list[str]): List of extra SANs for the certificates.
expiration_seconds (Optional[int]): The duration of the new certificates.
"""
plan_endpoint = "/1.0/k8sd/refresh-certs/plan"
plan_resp = self._send_request(plan_endpoint, "POST", RefreshCertificatesPlanResponse, {})
# NOTE(Hue): Default certificate expiration is set to 20 years:
# https://github.com/canonical/k8s-snap/blob/32e35128394c0880bcc4ce87447f4247cc315ba5/src/k8s/pkg/k8sd/app/hooks_bootstrap.go#L331-L338
if expiration_seconds is None:
now = datetime.now()
twenty_years_later = datetime(
now.year + 20, now.month, now.day, now.hour, now.minute, now.second
)
expiration_seconds = int((twenty_years_later - now).total_seconds())
run_endpoint = "/1.0/k8sd/refresh-certs/run"
run_req = RefreshCertificatesRunRequest( # type: ignore
seed=plan_resp.metadata.seed,
expiration_seconds=expiration_seconds,
extra_sans=extra_sans,
)
body = run_req.dict(exclude_none=True, by_alias=True)
self._send_request(run_endpoint, "POST", RefreshCertificatesRunResponse, body)