Vault
- Canonical Telco
Channel | Revision | Published | Runs on |
---|---|---|---|
latest/edge | 89 | 31 Jan 2024 | |
latest/edge | 9 | 27 Jan 2023 | |
1.16/stable | 280 | 04 Oct 2024 | |
1.16/candidate | 280 | 04 Oct 2024 | |
1.16/beta | 280 | 04 Oct 2024 | |
1.16/edge | 291 | 19 Nov 2024 | |
1.15/stable | 248 | 24 Jul 2024 | |
1.15/candidate | 248 | 24 Jul 2024 | |
1.15/beta | 248 | 24 Jul 2024 | |
1.15/edge | 248 | 10 Jul 2024 |
juju deploy vault-k8s --channel 1.16/stable
Deploy Kubernetes operators easily with Juju, the Universal Operator Lifecycle Manager. Need a Kubernetes cluster? Install MicroK8s to create a full CNCF-certified Kubernetes system in under 60 seconds.
Platform:
charms.vault_k8s.v0.vault_autounseal
-
- Last updated 27 Sep 2024
- Revision Library version 0.4
#!/usr/bin/env python3
# Copyright 2024 Canonical Ltd.
# Licensed under the Apache2.0. See LICENSE file in charm source for details.
"""Library for the vault-autounseal relation.
This library contains the Requires and Provides classes for handling the
vault-autounseal interface.
The provider side of the interface is responsible for enabling the vault
transit engine and creating the necessary keys and policies for an external
vault to be able to autounseal itself.
The requirer side of the interface is responsible for retrieving the necessary
details to autounseal the vault instance, and configuring the vault instance to
use them.
## Getting Started
From a charm directory, fetch the library using `charmcraft`:
```shell
charmcraft fetch-lib charms.vault_k8s.v0.vault_autounseal
```
### Provider charm
The provider charm is the charm that provides a Vault instance that can be
used to autounseal other Vault instances via the Vault transit backend.
Add the following to `metadata.yaml`:
```yaml
provides:
vault-autounseal-provides:
interface: vault-autounseal
```
### Requirer charm
The requirer charm is the charm that wishes to autounseal a Vault instance via
the Vault transit backend.
Add the following to `metadata.yaml`:
```yaml
requires:
vault-autounseal-requires:
interface: vault-autounseal
limit: 1
```
### Integration
You can integrate both charms by running:
```bash
juju integrate <vault a>:vault-autounseal-provides <vault b>:vault-autounseal-requires
```
where `vault a` is the Vault app which will provide the autounseal service, and
`vault b` is the Vault app which will be configured for autounseal via `vault a`.
"""
import logging
from dataclasses import dataclass
from typing import Any, Dict, List
import ops
from interface_tester import DataBagSchema
from ops import Relation, RelationDataContent, SecretNotFoundError, model
from pydantic import BaseModel, Field, ValidationError
# The unique Charmhub library identifier, never change it
LIBID = "c33e0a12506444e2b644ac2893ac9394"
# Increment this major API version when introducing breaking changes
LIBAPI = 0
# Increment this PATCH version before using `charmcraft publish-lib` or reset
# to 0 if you are raising the major API version
LIBPATCH = 4
class LogAdapter(logging.LoggerAdapter):
"""Adapter for the logger to prepend a prefix to all log lines."""
prefix = "vault_autounseal"
def process(self, msg, kwargs):
"""Prepend the prefix to the log message."""
return f"[{self.prefix}] {msg}", kwargs
logger = LogAdapter(logging.getLogger(__name__), {})
class VaultAutounsealProviderSchema(BaseModel):
"""Provider side of the vault-autounseal relation interface."""
address: str = Field(description="The address of the Vault server to connect to.")
mount_path: str = Field(
description="The path to the transit engine mount point where the key is stored."
)
key_name: str = Field(description="The name of the transit key to use for autounseal.")
credentials_secret_id: str = Field(
description=(
"The secret id of the Juju secret which stores the credentials for authenticating with the Vault server."
)
)
ca_certificate: str = Field(
description="The CA certificate to use when validating the Vault server's certificate."
)
class ProviderSchema(DataBagSchema):
"""The schema for the provider side of this interface."""
app: VaultAutounsealProviderSchema # pyright: ignore[reportIncompatibleVariableOverride, reportGeneralTypeIssues]
class VaultAutounsealDetailsReadyEvent(ops.EventBase):
"""Event emitted on the requirer when Vault autounseal details are ready in the databag."""
def __init__(
self, handle: ops.Handle, address, mount_path, key_name, role_id, secret_id, ca_certificate
):
"""VaultAutounsealDetailsReadyEvent.
Args:
handle: ops.Handle
address: The address of the Vault server to connect to.
mount_path: The path to the transit engine mount point where the key is stored.
key_name: The name of the transit key to use for autounseal.
role_id: Approle role ID.
secret_id: Approle secret ID.
ca_certificate: The CA certificate to use when validating the Vault server's certificate.
"""
super().__init__(handle)
self.address = address
self.mount_path = mount_path
self.key_name = key_name
self.role_id = role_id
self.secret_id = secret_id
self.ca_certificate = ca_certificate
def snapshot(self) -> Dict[str, Any]:
"""Return snapshot data that should be persisted."""
return dict(
super().snapshot(),
address=self.address,
mount_path=self.mount_path,
key_name=self.key_name,
role_id=self.role_id,
secret_id=self.secret_id,
ca_certificate=self.ca_certificate,
)
def restore(self, snapshot: Dict[str, Any]) -> None:
"""Restore the event from a snapshot."""
super().restore(snapshot)
self.address = snapshot["address"]
self.mount_path = snapshot["mount_path"]
self.key_name = snapshot["key_name"]
self.role_id = snapshot["role_id"]
self.secret_id = snapshot["secret_id"]
self.ca_certificate = snapshot["ca_certificate"]
class VaultAutounsealProviderRemoved(ops.EventBase):
"""Event emitted when the vault that provided autounseal capabilities is removed."""
class VaultAutounsealRequirerRelationCreated(ops.EventBase):
"""Event emitted when Vault autounseal should be initialized for a new application."""
def __init__(self, handle: ops.Handle, relation: model.Relation):
super().__init__(handle)
self.relation = relation
def snapshot(self) -> Dict[str, Any]:
"""Return snapshot data that should be persisted."""
return dict(
super().snapshot(),
relation_id=self.relation.id,
relation_name=self.relation.name,
)
def restore(self, snapshot: Dict[str, Any]) -> None:
"""Restore the event from a snapshot."""
super().restore(snapshot)
relation = self.framework.model.get_relation(
snapshot["relation_name"], snapshot["relation_id"]
)
if relation is None:
raise ValueError(
f"Unable to restore {self}: relation {snapshot['relation_name']} (id={snapshot['relation_id']}) not found."
)
self.relation = relation
class VaultAutounsealRequirerRelationBroken(ops.EventBase):
"""Event emitted on the Provider when a relation to a Requirer is broken."""
def __init__(self, handle: ops.Handle, relation: model.Relation):
super().__init__(handle)
self.relation = relation
def snapshot(self) -> Dict[str, Any]:
"""Return snapshot data that should be persisted."""
return dict(
super().snapshot(),
relation_id=self.relation.id,
relation_name=self.relation.name,
)
def restore(self, snapshot: Dict[str, Any]) -> None:
"""Restore the event from a snapshot."""
super().restore(snapshot)
relation = self.framework.model.get_relation(
snapshot["relation_name"], snapshot["relation_id"]
)
if relation is None:
raise ValueError(
f"Unable to restore {self}: relation {snapshot['relation_name']} (id={snapshot['relation_id']}) not found."
)
self.relation = relation
class VaultAutounsealProvidesEvents(ops.ObjectEvents):
"""Events raised by the vault-autounseal relation on the provider side."""
vault_autounseal_requirer_relation_created = ops.EventSource(
VaultAutounsealRequirerRelationCreated
)
vault_autounseal_requirer_relation_broken = ops.EventSource(
VaultAutounsealRequirerRelationBroken
)
class VaultAutounsealRequireEvents(ops.ObjectEvents):
"""Events raised by the vault-autounseal relation on the requirer side."""
vault_autounseal_details_ready = ops.EventSource(VaultAutounsealDetailsReadyEvent)
vault_autounseal_provider_relation_broken = ops.EventSource(VaultAutounsealProviderRemoved)
@dataclass
class AutounsealDetails:
"""The details required to autounseal a vault instance."""
address: str
mount_path: str
key_name: str
role_id: str
secret_id: str
ca_certificate: str
@dataclass
class ApproleDetails:
"""The details required to authenticate with Vault using the approle auth method."""
role_id: str
secret_id: str
class VaultAutounsealProvides(ops.Object):
"""Manages the vault-autounseal relation from the provider side."""
on: VaultAutounsealProvidesEvents = VaultAutounsealProvidesEvents() # type: ignore
def __init__(self, charm: ops.CharmBase, relation_name: str):
super().__init__(charm, relation_name)
self.charm = charm
self.relation_name = relation_name
self.framework.observe(
self.charm.on[relation_name].relation_created, self._on_relation_created
)
self.framework.observe(
self.charm.on[relation_name].relation_broken, self._on_relation_broken
)
def _on_relation_created(self, event: ops.RelationCreatedEvent) -> None:
"""Handle the relation created event and emit a custom event."""
self.on.vault_autounseal_requirer_relation_created.emit(relation=event.relation)
def _on_relation_broken(self, event: ops.RelationBrokenEvent) -> None:
"""Handle the relation broken event and emit a custom event."""
self.on.vault_autounseal_requirer_relation_broken.emit(relation=event.relation)
def _create_autounseal_credentials_secret(
self, relation: ops.Relation, role_id: str, secret_id: str
) -> str:
"""Create a Juju secret with the autounseal credentials.
Args:
relation: The relation to grant access to the secret.
role_id: The AppRole Role ID to store in the secret.
secret_id: The AppRole Secret ID to store in the secret.
Returns:
The secret id of the created secret.
"""
secret = self.charm.app.add_secret(
{
"role-id": role_id,
"secret-id": secret_id,
},
)
secret.grant(relation)
if secret.id is None:
raise ValueError("Secret id is None")
return secret.id
def set_autounseal_data(
self,
relation: ops.Relation,
vault_address: str,
mount_path: str,
key_name: str,
approle_role_id: str,
approle_secret_id: str,
ca_certificate: str,
) -> None:
"""Set the autounseal data in the relation databag.
Args:
relation: The Juju relation to set the autounseal data in.
vault_address: The address of the Vault server which will be used for autounseal
mount_path: The path to the transit engine mount point where the key is stored.
key_name: The name of the transit key to use for autounseal.
approle_role_id: The AppRole Role ID to use when authenticating with the external Vault server.
approle_secret_id: The AppRole Secret ID to use when authenticating with the external Vault server.
ca_certificate: The CA certificate to use when validating the external Vault server's certificate.
"""
if not self.charm.unit.is_leader():
logger.warning(
"Attempting to set the auto-unseal data without being the leader. Ignoring the request."
)
return
if relation is None:
logger.warning("No relation found")
return
if not relation.active:
logger.warning("Relation is not active")
return
credentials_secret_id = self._create_autounseal_credentials_secret(
relation, approle_role_id, approle_secret_id
)
relation.data[self.charm.app].update(
{
"address": vault_address,
"mount_path": mount_path,
"key_name": key_name,
"credentials_secret_id": credentials_secret_id,
"ca_certificate": ca_certificate,
}
)
def get_outstanding_requests(self, relation_id: int | None = None) -> List[Relation]:
"""Get the outstanding requests for the relation.
This will retrieve any vault-autounseal relations that have not yet had
credentials issued for them.
"""
outstanding_requests: List[Relation] = []
requirer_requests = self.get_active_relations(relation_id=relation_id)
outstanding_requests = [
relation
for relation in requirer_requests
if not self._credentials_issued_for_request(relation_id=relation.id)
]
return outstanding_requests
def get_active_relations(self, relation_id: int | None = None) -> List[Relation]:
"""Get all active relations on the relation name this class was initialized with.
Args:
relation_id: The relation ID to filter by. If None, all active relations are returned.
Returns:
A list of active relations.
"""
relations = (
[
relation
for relation in self.model.relations.get(self.relation_name, [])
if relation.id == relation_id
]
if relation_id is not None
else self.model.relations.get(self.relation_name, [])
)
return [relation for relation in relations if relation.active]
def _credentials_issued_for_request(self, relation_id: int) -> bool:
# If Id is none and more than on relation is present we get an error
relation = self.model.get_relation(self.relation_name, relation_id)
if not relation:
return False
credentials = self._get_credentials(relation)
return credentials is not None
def _get_credentials(self, relation: ops.Relation) -> ApproleDetails | None:
"""Retrieve the credentials from the Juju secret.
Args:
relation: The relation to get the credentials for.
Returns:
An ApproleDetails object if the credentials are found, None otherwise.
"""
if not relation.active:
logger.warning("Relation is not active")
return None
if relation.app is None:
logger.warning("No remote application yet")
return None
credentials_secret_id = relation.data[self.charm.app].get("credentials_secret_id")
if credentials_secret_id is None:
return None
secret = self.model.get_secret(id=credentials_secret_id)
return _get_credentials_from_secret(secret)
def _is_provider_data_valid(data: RelationDataContent) -> bool:
"""Use the pydantic schema to validate the data."""
try:
ProviderSchema(app=VaultAutounsealProviderSchema(**data))
return True
except ValidationError as e:
logger.warning("Invalid data: %s", e)
return False
class VaultAutounsealRequires(ops.Object):
"""Manages the vault-autounseal relation from the requirer side."""
on: VaultAutounsealRequireEvents = VaultAutounsealRequireEvents() # type: ignore
def __init__(self, charm: ops.CharmBase, relation_name: str):
super().__init__(charm, relation_name)
self.relation_name = relation_name
self.framework.observe(charm.on[relation_name].relation_changed, self._on_relation_changed)
self.framework.observe(charm.on[relation_name].relation_broken, self._on_relation_broken)
def _on_relation_changed(self, event: ops.RelationChangedEvent) -> None:
data = event.relation.data[event.app]
if _is_provider_data_valid(data):
details = self.get_details()
if not details:
logger.warning("Missing details, but somehow we passed validation")
return
self.on.vault_autounseal_details_ready.emit(
details.address,
details.mount_path,
details.key_name,
details.role_id,
details.secret_id,
details.ca_certificate,
)
def _on_relation_broken(self, event: ops.RelationBrokenEvent) -> None:
self.on.vault_autounseal_provider_relation_broken.emit()
def get_details(self) -> AutounsealDetails | None:
"""Return the vault address, role id, secret id and ca certificate from the relation databag.
Returns:
An AutounsealDetails object if the data is valid, None otherwise.
"""
relation = self.framework.model.get_relation(self.relation_name)
if not relation:
return None
if not relation.active:
return None
if relation.app is None:
logger.warning("No remote application yet")
return None
data = relation.data[relation.app]
address = data.get("address")
mount_path = data.get("mount_path")
key_name = data.get("key_name")
ca_certificate = data.get("ca_certificate")
credentials = self._get_credentials(relation)
if not (address and mount_path and key_name and ca_certificate and credentials):
return None
return AutounsealDetails(
address,
mount_path,
key_name,
credentials.role_id,
credentials.secret_id,
ca_certificate,
)
def _get_credentials(self, relation: ops.Relation) -> ApproleDetails | None:
"""Return the token from the Juju secret.
Returns:
A tuple containing the role id and secret id
"""
if not relation.active:
logger.warning("Relation is not active")
return None
if relation.app is None:
logger.warning("No remote application yet")
return None
credentials_secret_id = relation.data[relation.app].get("credentials_secret_id")
if not credentials_secret_id:
return None
secret = self.model.get_secret(id=credentials_secret_id)
return _get_credentials_from_secret(secret)
def _get_credentials_from_secret(secret: ops.Secret) -> ApproleDetails | None:
"""Retrieve the Approle credentials from the Juju secret.
Args:
secret: The secret to get the credentials for.
Returns:
An ApproleDetails object if the credentials are found, None otherwise.
"""
try:
secret_content = secret.get_content(refresh=True)
except SecretNotFoundError:
logger.warning("Secret not found")
return None
role_id = secret_content.get("role-id")
secret_id = secret_content.get("secret-id")
return ApproleDetails(role_id, secret_id) if role_id and secret_id else None