Glauth K8S
- Identity Charmers
Channel | Revision | Published | Runs on |
---|---|---|---|
latest/stable | 16 | 20 May 2024 | |
latest/edge | 32 | 20 Nov 2024 | |
juju deploy glauth-k8s
Deploy Kubernetes operators easily with Juju, the Universal Operator Lifecycle Manager. Need a Kubernetes cluster? Install MicroK8s to create a full CNCF-certified Kubernetes system in under 60 seconds.
Platform:
charms.glauth_k8s.v0.ldap
-
- Last updated 02 Sep 2024
- Revision Library version 0.6
# Copyright 2023 Canonical Ltd.
# See LICENSE file for licensing details.
"""# Juju Charm Library for the `ldap` Juju Interface.
This juju charm library contains the Provider and Requirer classes for handling
the `ldap` interface.
## Requirer Charm
The requirer charm is expected to:
- Provide information for the provider charm to deliver LDAP related
information in the juju integration, in order to communicate with the LDAP
server and authenticate LDAP operations
- Listen to the custom juju event `LdapReadyEvent` to obtain the LDAP
related information from the integration
- Listen to the custom juju event `LdapUnavailableEvent` to handle the
situation when the LDAP integration is broken
```python
from charms.glauth_k8s.v0.ldap import (
LdapRequirer,
LdapReadyEvent,
LdapUnavailableEvent,
)
class RequirerCharm(CharmBase):
# LDAP requirer charm that integrates with an LDAP provider charm.
def __init__(self, *args):
super().__init__(*args)
self.ldap_requirer = LdapRequirer(self)
self.framework.observe(
self.ldap_requirer.on.ldap_ready,
self._on_ldap_ready,
)
self.framework.observe(
self.ldap_requirer.on.ldap_unavailable,
self._on_ldap_unavailable,
)
def _on_ldap_ready(self, event: LdapReadyEvent) -> None:
# Consume the LDAP related information
ldap_data = self.ldap_requirer.consume_ldap_relation_data(
relation=event.relation,
)
# Configure the LDAP requirer charm
...
def _on_ldap_unavailable(self, event: LdapUnavailableEvent) -> None:
# Handle the situation where the LDAP integration is broken
...
```
As shown above, the library offers custom juju events to handle specific
situations, which are listed below:
- ldap_ready: event emitted when the LDAP related information is ready for
requirer charm to use.
- ldap_unavailable: event emitted when the LDAP integration is broken.
Additionally, the requirer charmed operator needs to declare the `ldap`
interface in the `metadata.yaml`:
```yaml
requires:
ldap:
interface: ldap
```
## Provider Charm
The provider charm is expected to:
- Use the information provided by the requirer charm to provide LDAP related
information for the requirer charm to connect and authenticate to the LDAP
server
- Listen to the custom juju event `LdapRequestedEvent` to offer LDAP related
information in the integration
```python
from charms.glauth_k8s.v0.ldap import (
LdapProvider,
LdapRequestedEvent,
)
class ProviderCharm(CharmBase):
# LDAP provider charm.
def __init__(self, *args):
super().__init__(*args)
self.ldap_provider = LdapProvider(self)
self.framework.observe(
self.ldap_provider.on.ldap_requested,
self._on_ldap_requested,
)
def _on_ldap_requested(self, event: LdapRequestedEvent) -> None:
# Consume the information provided by the requirer charm
requirer_data = event.data
# Prepare the LDAP related information using the requirer's data
ldap_data = ...
# Update the integration data
self.ldap_provider.update_relations_app_data(
ldap_data,
relation_id=event.relation.id,
)
```
As shown above, the library offers custom juju events to handle specific
situations, which are listed below:
- ldap_requested: event emitted when the requirer charm is requesting the
LDAP related information in order to connect and authenticate to the LDAP server
"""
import json
from functools import wraps
from string import Template
from typing import Any, Callable, List, Literal, Optional, Union
import ops
from ops.charm import (
CharmBase,
RelationBrokenEvent,
RelationChangedEvent,
RelationCreatedEvent,
RelationEvent,
)
from ops.framework import EventSource, Object, ObjectEvents
from ops.model import Relation, SecretNotFoundError
from pydantic import (
BaseModel,
ConfigDict,
Field,
StrictBool,
ValidationError,
field_serializer,
field_validator,
)
# The unique CharmHub library identifier, never change it
LIBID = "5a535b3c4d0b40da98e29867128e57b9"
# Increment this major API version when introducing breaking changes
LIBAPI = 0
# Increment this PATCH version before using `charmcraft publish-lib` or reset
# to 0 if you are raising the major API version
LIBPATCH = 6
# Python package requirements declared by this library
PYDEPS = ["pydantic~=2.5.3"]
# Relation name used by LdapProvider and LdapRequirer when none is passed
DEFAULT_RELATION_NAME = "ldap"
# Label of the per-relation bind account secret; `$relation_id` is substituted
# with the id of the relation the secret belongs to
BIND_ACCOUNT_SECRET_LABEL_TEMPLATE = Template("relation-$relation_id-bind-account-secret")
def leader_unit(func: Callable) -> Callable:
    """Restrict ``func`` so that it only executes on the leader unit.

    The first positional argument of the decorated callable must expose
    ``unit.is_leader()``. When that returns a falsy value the call is skipped
    and ``None`` is returned; otherwise the result of ``func`` is passed
    through unchanged.
    """

    @wraps(func)
    def leader_only(
        obj: Union["LdapProvider", "LdapRequirer"], *args: Any, **kwargs: Any
    ) -> Optional[Any]:
        if obj.unit.is_leader():
            return func(obj, *args, **kwargs)
        return None

    return leader_only
@leader_unit
def _update_relation_app_databag(
    ldap: Union["LdapProvider", "LdapRequirer"], relation: Relation, data: dict
) -> None:
    """Merge ``data`` into the application databag of ``relation``.

    Runs only on the leader unit (see the ``leader_unit`` decorator). Every
    value is stored as a string; falsy values are stored as an empty string.
    Nothing happens when ``relation`` is ``None``.
    """
    if relation is None:
        return

    serialized = {}
    for key, value in data.items():
        serialized[key] = str(value) if value else ""

    relation.data[ldap.app].update(serialized)
class Secret:
    """Small wrapper around a Juju secret that tolerates the secret being absent.

    An instance created without an underlying secret is an "empty" wrapper:
    its ``uri`` is an empty string and ``remove()`` is a no-op.
    """

    def __init__(self, secret: Optional["ops.Secret"] = None) -> None:
        self._secret: Optional["ops.Secret"] = secret

    @property
    def uri(self) -> str:
        """The ID of the wrapped secret, or ``""`` when no secret is wrapped."""
        return "" if self._secret is None else self._secret.id

    @classmethod
    def load(
        cls,
        charm: "CharmBase",
        label: str,
        *,
        content: Optional[dict[str, str]] = None,
    ) -> "Secret":
        """Look up the secret labelled ``label``, creating it when possible.

        Args:
            charm: the charm whose model and application are used.
            label: the label of the secret to look up.
            content: content used to create the secret when it does not exist.

        Returns:
            A wrapper around the found or newly created secret. When the
            secret does not exist and no ``content`` is given, an empty
            wrapper is returned, because a secret cannot be created without
            content.
        """
        try:
            secret = charm.model.get_secret(label=label)
        except SecretNotFoundError:
            if not content:
                return cls()
            secret = charm.app.add_secret(label=label, content=content)
        return cls(secret)

    @classmethod
    def create_or_update(
        cls, charm: "CharmBase", label: str, content: dict[str, str]
    ) -> "Secret":
        """Set ``content`` on the secret labelled ``label``, creating it if missing."""
        try:
            secret = charm.model.get_secret(label=label)
            secret.set_content(content=content)
        except SecretNotFoundError:
            secret = charm.app.add_secret(label=label, content=content)
        return cls(secret)

    def grant(self, relation: "Relation") -> None:
        """Grant access to the wrapped secret over ``relation``."""
        self._secret.grant(relation)

    def remove(self) -> None:
        """Remove all revisions of the wrapped secret; no-op for an empty wrapper."""
        if self._secret is None:
            return
        self._secret.remove_all_revisions()
class LdapProviderBaseData(BaseModel):
    """LDAP server connection details shared by the provider.

    Values travel through relation databags as strings, so ``urls`` is
    serialized as a JSON array and ``starttls`` as ``"True"``/``"False"``;
    the "before" validators accept both the native and the string forms.
    """

    urls: List[str] = Field(frozen=True)
    base_dn: str = Field(frozen=True)
    starttls: StrictBool = Field(frozen=True)

    @field_validator("urls", mode="before")
    @classmethod
    def validate_ldap_urls(cls, vs: List[str] | str) -> List[str]:
        """Decode ``urls`` from its JSON string form and check each URL scheme.

        Raises:
            ValueError: if an entry is not a string starting with ``ldap://``
                (pydantic reports it as a validation error).
        """
        if isinstance(vs, str):
            vs = json.loads(vs)
            # A JSON-encoded single string is treated as a one-element list.
            if isinstance(vs, str):
                vs = [vs]

        for v in vs:
            if not isinstance(v, str) or not v.startswith("ldap://"):
                # Validators must raise ValueError; pydantic turns it into a
                # ValidationError carrying the field location.
                raise ValueError("Invalid LDAP URL scheme.")

        return vs

    @field_serializer("urls")
    def serialize_list(self, urls: List[str]) -> str:
        """Serialize the URL list as a JSON array string."""
        return json.dumps(urls)

    @field_validator("starttls", mode="before")
    @classmethod
    def deserialize_bool(cls, v: str | bool) -> bool:
        """Convert the string form of ``starttls`` (case-insensitive "true") to a bool."""
        if isinstance(v, str):
            return v.casefold() == "true"
        return v

    @field_serializer("starttls")
    def serialize_bool(self, starttls: bool) -> str:
        """Serialize ``starttls`` as ``"True"`` or ``"False"``."""
        return str(starttls)
class LdapProviderData(LdapProviderBaseData):
    """LDAP connection details extended with the bind account information."""

    bind_dn: str = Field(frozen=True)
    # Excluded from serialization (`exclude=True`), so it is not part of the
    # dumped model data
    bind_password: str = Field(exclude=True)
    # Filled in by LdapProvider.update_relations_app_data with the URI of the
    # secret holding the bind password
    bind_password_secret: Optional[str] = None
    # "simple" is the only accepted value
    auth_method: Literal["simple"] = Field(frozen=True)
class LdapRequirerData(BaseModel):
    """Data the requirer charm writes to its application databag (`user` and `group`)."""

    # Instances are immutable once created
    model_config = ConfigDict(frozen=True)
    user: str
    group: str
class LdapRequestedEvent(RelationEvent):
    """An event emitted when the LDAP integration is built."""

    @property
    def data(self) -> Optional[LdapRequirerData]:
        """The requirer's application databag parsed as ``LdapRequirerData``.

        Returns ``None`` when the remote application's databag is missing or
        empty.
        """
        databag = self.relation.data.get(self.relation.app)
        if not databag:
            return None
        return LdapRequirerData(**databag)
class LdapProviderEvents(ObjectEvents):
    """Custom events exposed by `LdapProvider` through its `on` attribute."""

    # Emitted by LdapProvider on the leader unit when the relation data changes
    ldap_requested = EventSource(LdapRequestedEvent)
class LdapReadyEvent(RelationEvent):
    """An event emitted when the LDAP related information is ready.

    `LdapRequirer` emits it on relation-changed once the provider's
    application databag contains data.
    """
class LdapUnavailableEvent(RelationEvent):
    """An event emitted when the LDAP integration is unavailable.

    `LdapRequirer` emits it when the relation is broken.
    """
class LdapRequirerEvents(ObjectEvents):
    """Custom events exposed by `LdapRequirer` through its `on` attribute."""

    # Emitted on relation-changed when the provider's application databag is not empty
    ldap_ready = EventSource(LdapReadyEvent)
    # Emitted on relation-broken
    ldap_unavailable = EventSource(LdapUnavailableEvent)
class LdapProvider(Object):
    """An LDAP provider to deliver LDAP related information to requirer charms."""

    on = LdapProviderEvents()

    def __init__(
        self,
        charm: CharmBase,
        relation_name: str = DEFAULT_RELATION_NAME,
    ) -> None:
        """Observe the relation-changed and relation-broken events of ``relation_name``.

        Args:
            charm: the provider charm using this library.
            relation_name: name of the `ldap` relation declared by the charm.
        """
        super().__init__(charm, relation_name)

        self.charm = charm
        self.app = charm.app
        self.unit = charm.unit
        self._relation_name = relation_name

        self.framework.observe(
            self.charm.on[self._relation_name].relation_changed,
            self._on_relation_changed,
        )
        self.framework.observe(
            self.charm.on[self._relation_name].relation_broken,
            self._on_relation_broken,
        )

    @leader_unit
    def _on_relation_changed(self, event: RelationChangedEvent) -> None:
        """Handle the event emitted when the requirer charm provides the necessary data."""
        self.on.ldap_requested.emit(event.relation)

    @leader_unit
    def _on_relation_broken(self, event: RelationBrokenEvent) -> None:
        """Handle the event emitted when the LDAP integration is broken.

        Removes the bind account secret of the broken relation, if one exists.
        """
        label = BIND_ACCOUNT_SECRET_LABEL_TEMPLATE.substitute(relation_id=event.relation.id)
        try:
            secret = self.charm.model.get_secret(label=label)
        except SecretNotFoundError:
            # No bind account secret was ever created for this relation
            # (e.g. only base data was shared), so there is nothing to clean up.
            return
        secret.remove_all_revisions()

    def get_bind_password(self, relation_id: int) -> Optional[str]:
        """Retrieve the bind account password for a given integration.

        Returns:
            The password stored in the relation's bind account secret, or
            ``None`` when the secret does not exist or has no password.
        """
        label = BIND_ACCOUNT_SECRET_LABEL_TEMPLATE.substitute(relation_id=relation_id)
        try:
            secret = self.charm.model.get_secret(label=label)
        except SecretNotFoundError:
            return None
        return secret.get_content().get("password")

    @leader_unit
    def update_relations_app_data(
        self,
        data: Union[LdapProviderBaseData, LdapProviderData],
        /,
        relation_id: Optional[int] = None,
    ) -> None:
        """An API for the provider charm to provide the LDAP related information.

        Only the leader unit performs the update, since it manages the
        application secret and the application databag.

        Args:
            data: the LDAP information to publish.
            relation_id: when given together with ``LdapProviderData``, only
                that relation is updated and the bind password is stored in a
                secret granted to it. Otherwise every relation is updated.
        """
        relations = self.charm.model.relations.get(self._relation_name)
        if not relations:
            return

        # NOTE(review): relation_id is only honoured for LdapProviderData; base
        # data is written to every relation even when relation_id is given —
        # confirm this is intended.
        if relation_id is not None and isinstance(data, LdapProviderData):
            relations = [relation for relation in relations if relation.id == relation_id]
            if not relations:
                # Unknown relation id: nothing to grant the secret to.
                return

            secret = Secret.create_or_update(
                self.charm,
                BIND_ACCOUNT_SECRET_LABEL_TEMPLATE.substitute(relation_id=relation_id),
                {"password": data.bind_password},
            )
            secret.grant(relations[0])
            data.bind_password_secret = secret.uri

        # Serialize once; the helper copies the values into each databag.
        payload = data.model_dump()
        for relation in relations:
            _update_relation_app_databag(self, relation, payload)
class LdapRequirer(Object):
    """An LDAP requirer to consume data delivered by an LDAP provider charm."""

    on = LdapRequirerEvents()

    def __init__(
        self,
        charm: CharmBase,
        relation_name: str = DEFAULT_RELATION_NAME,
        *,
        data: Optional[LdapRequirerData] = None,
    ) -> None:
        """Observe the created, changed and broken events of ``relation_name``.

        Args:
            charm: the requirer charm using this library.
            relation_name: name of the `ldap` relation declared by the charm.
            data: the user/group to request; when omitted the application
                name and the model name are used.
        """
        super().__init__(charm, relation_name)

        self.charm = charm
        self.app = charm.app
        self.unit = charm.unit
        self._relation_name = relation_name
        self._data = data

        self.framework.observe(
            self.charm.on[self._relation_name].relation_created,
            self._on_ldap_relation_created,
        )
        self.framework.observe(
            self.charm.on[self._relation_name].relation_changed,
            self._on_ldap_relation_changed,
        )
        self.framework.observe(
            self.charm.on[self._relation_name].relation_broken,
            self._on_ldap_relation_broken,
        )

    def _on_ldap_relation_created(self, event: RelationCreatedEvent) -> None:
        """Handle the event emitted when an LDAP integration is created."""
        user = self._data.user if self._data else self.app.name
        group = self._data.group if self._data else self.model.name
        _update_relation_app_databag(self, event.relation, {"user": user, "group": group})

    def _on_ldap_relation_changed(self, event: RelationChangedEvent) -> None:
        """Handle the event emitted when the LDAP related information is ready."""
        provider_app = event.relation.app
        if not event.relation.data.get(provider_app):
            # The provider has not written anything yet.
            return
        self.on.ldap_ready.emit(event.relation)

    def _on_ldap_relation_broken(self, event: RelationBrokenEvent) -> None:
        """Handle the event emitted when the LDAP integration is broken."""
        self.on.ldap_unavailable.emit(event.relation)

    def consume_ldap_relation_data(
        self,
        /,
        relation: Optional[Relation] = None,
        relation_id: Optional[int] = None,
    ) -> Optional[LdapProviderData]:
        """An API for the requirer charm to consume the LDAP related information in the application databag.

        Args:
            relation: the relation to read from; looked up by name (and
                ``relation_id``) when omitted.
            relation_id: id used for the lookup when ``relation`` is omitted.

        Returns:
            The provider's data, or ``None`` when the relation does not exist
            or the provider's application databag is missing or empty.
        """
        if not relation:
            relation = self.charm.model.get_relation(self._relation_name, relation_id)

        if not relation:
            return None

        # The databag lookup may yield None (e.g. no remote application yet);
        # treat that the same as an empty databag instead of failing in dict().
        provider_data = dict(relation.data.get(relation.app) or {})
        if not provider_data:
            return None

        if secret_id := provider_data.get("bind_password_secret"):
            secret = self.charm.model.get_secret(id=secret_id)
            provider_data["bind_password"] = secret.get_content().get("password")

        return LdapProviderData(**provider_data)

    def _is_relation_active(self, relation: Relation) -> bool:
        """Whether the relation is active based on contained data."""
        try:
            _ = repr(relation.data)
            return True
        except (RuntimeError, ops.ModelError):
            return False

    @property
    def relations(self) -> List[Relation]:
        """The list of Relation instances associated with this relation_name."""
        return [
            relation
            for relation in self.charm.model.relations[self._relation_name]
            if self._is_relation_active(relation)
        ]

    def _ready_for_relation(self, relation: Relation) -> bool:
        """Whether the provider has published both `urls` and `bind_dn` on ``relation``."""
        if not relation.app:
            return False

        databag = relation.data[relation.app]
        return "urls" in databag and "bind_dn" in databag

    def ready(self, relation_id: Optional[int] = None) -> bool:
        """Check if the resource has been created.

        This function can be used to check if the Provider answered with data in the charm code
        when outside an event callback.

        Args:
            relation_id (int, optional): When provided the check is done only for the relation id
                provided, otherwise the check is done for all relations

        Returns:
            True or False

        Raises:
            IndexError: If relation_id is provided but that relation does not exist
        """
        # Evaluate the property once: each access re-filters the model relations.
        relations = self.relations

        if relation_id is None:
            return bool(relations) and all(
                self._ready_for_relation(relation) for relation in relations
            )

        for relation in relations:
            if relation.id == relation_id:
                return self._ready_for_relation(relation)

        raise IndexError(f"relation id {relation_id} cannot be accessed")