Compound Status

  • Pietro Pasotti
Channel Revision Published Runs on
latest/stable 1 26 Jul 2022
Ubuntu 20.04
juju deploy compound-status
Show information

Platform:

Ubuntu
20.04

charms.compound_status.v0.compound_status

'''This charm lib provides a CompoundStatus utility.

Example usage:

>>> class StatusPool(CompoundStatus):
>>>     SKIP_UNKNOWN = True
>>>
>>>     workload = Status()
>>>     relation_1 = Status()
>>>     relation_2 = Status(tag='rel2')
>>>
>>> class TesterCharm(CharmBase):
>>>     def __init__(self, framework, key=None):
>>>         super().__init__(framework, key)
>>>         status_pool = StatusPool(self)
>>>
>>>         # pro tip: keep the messages short
>>>         status_pool.relation_1 = ActiveStatus('βœ…')
>>>         status_pool.commit()  # sync with juju
>>>         # equivalent to self.unit.status = status_pool.coalesce()
>>>
>>>         status_pool.relation_1.unset()  # send status_1 back to unknown, until you set it again.
>>>
>>>         status_pool.relation_2 = WaitingStatus('πŒ—: foo')
>>>         status_pool.workload.warning('some debug message about why the workload is blocked')
>>>         status_pool.workload.info('some info about the workload')
>>>         status_pool.workload.error('whoopsiedaisies')
>>>         status_pool.workload = BlockedStatus('blocked', 'see debug-log for the reason')
>>>         status_pool.commit()
'''

# The unique Charmhub library identifier, never change it
LIBID = "2dce4f51241e493dbbbfee1c9bdeb48b"

# Increment this major API version when introducing breaking changes
LIBAPI = 0

# Increment this PATCH version before using `charmcraft publish-lib` or reset
# to 0 if you are raising the major API version
LIBPATCH = 5

import inspect
import json
import logging
import typing
from collections import Counter
from itertools import chain
from logging import getLogger
from operator import itemgetter
from typing import (
    TYPE_CHECKING,
    Dict,
    Literal,
    Optional,
    Sequence,
    Tuple,
    Type,
    TypedDict,
    Union,
)

from ops.charm import CharmBase
from ops.framework import Handle, Object, StoredStateData
from ops.model import (
    ActiveStatus,
    BlockedStatus,
    MaintenanceStatus,
    StatusBase,
    WaitingStatus,
)
from ops.storage import NoSnapshotError

log = getLogger("compound-status")

StatusName = Literal["blocked", "waiting", "maintenance", "unknown", "active"]
# are sorted best-to-worst
STATUSES = ("unknown", "active", "maintenance", "waiting", "blocked")
STATUS_PRIORITIES: Dict[str, int] = {val: i for i, val in enumerate(STATUSES)}
STATUS_NAME_TO_CLASS: Dict[StatusName, Type[StatusBase]] = {
    "blocked": BlockedStatus,
    "waiting": WaitingStatus,
    "maintenance": MaintenanceStatus,
    "active": ActiveStatus
    # omit unknown as it should not be used directly.
}


class _StatusDict(TypedDict, total=False):
    type: Literal["subordinate", "master"]  # noqa
    status: StatusName
    message: str
    tag: str
    attr: str
    user_set: bool


PositiveNumber = Union[float, int]


class Status:
    """Represents a status."""

    _ID = 0

    def __repr__(self):
        return "<Status {} ({}): {}>".format(self._status, self.tag, self._message)

    def __init__(
        self, tag: Optional[str] = None, priority: Optional[PositiveNumber] = None
    ):
        # to keep track of instantiation order
        self._id = Status._ID
        Status._ID += 1

        # if tag is None, we'll guess it from the attr name
        # and late-bind it
        self.tag = tag  # type: Optional[str]
        self._status = "unknown"  # type: StatusName
        self._message = ""

        # externally managed (and henceforth immutable) state
        self._master = None  # type: Optional[MasterStatus]
        self._logger = None  # type: Optional[logging.Logger]
        self._attr = None  # type: Optional[str]

        if priority is not None:
            if not isinstance(priority, (float, int)):
                raise TypeError(f"priority needs to be float|int, not {type(priority)}")
            if priority <= 0:
                raise TypeError(f"priority needs to be > 0, not {priority}")

        self._priority = priority  # type: Optional[float]  # externally managed

    @property
    def priority(self):
        """Return the priority of this status."""
        return self._priority

    @staticmethod
    def priority_key(status: Union["Status", StatusName]):
        """Return the priority key."""
        if isinstance(status, str):
            return STATUS_PRIORITIES[status]
        return STATUS_PRIORITIES[status.status], -(status.priority or 0)

    @staticmethod
    def sort(statuses: Sequence["Status"]):
        """Return the statuses, sorted worst-to-best."""
        return sorted(statuses, key=Status.priority_key, reverse=True)

    def log(self, level: int, msg: str, *args, **kwargs):
        """Associate with this status a log entry with level `log`."""
        if not self._logger:
            raise RuntimeError(f"_logger not set on {self}.")
        self._logger.log(level, msg, *args, **kwargs)

    def critical(self, msg: str, *args, **kwargs):
        """Associate with this status a log entry with level `critical`."""
        self.log(50, msg, *args, **kwargs)

    def error(self, msg: str, *args, **kwargs):
        """Associate with this status a log entry with level `error`."""
        self.log(40, msg, *args, **kwargs)

    def warning(self, msg: str, *args, **kwargs):
        """Associate with this status a log entry with level `warning`."""
        self.log(30, msg, *args, **kwargs)

    def info(self, msg: str, *args, **kwargs):
        """Associate with this status a log entry with level `info`."""
        self.log(20, msg, *args, **kwargs)

    def debug(self, msg: str, *args, **kwargs):
        """Associate with this status a log entry with level `debug`."""
        self.log(10, msg, *args, **kwargs)

    def _set(self, status: StatusName, msg: str = ""):
        assert status in STATUS_NAME_TO_CLASS, "invalid status: {}".format(status)
        assert isinstance(msg, str), type(msg)

        self._status = status
        self._message = msg

        return self

    def unset(self):
        """Unsets status and message.

        This status will go back to its initial state and be removed from the
        Master clobber.
        """
        self.debug("unset")
        self._status = "unknown"
        self._message = ""

    def __get__(self, instance, owner):
        return self

    def __set__(self, instance, value: StatusBase):
        assert value.name in STATUSES, f"{value} has an invalid name: {value.name}"
        self._set(value.name, value.message)

    @property
    def status(self) -> StatusName:
        """Return the string representing this status."""
        return self._status

    @property
    def name(self) -> StatusName:
        """Alias for interface-compatibility with ops.model.StatusBase."""
        return self.status

    @property
    def message(self) -> str:
        """Return the message associated with this status."""
        return self._message

    def _snapshot(self) -> _StatusDict:
        """Serialize Status for storage."""
        # tag should not change, and is reloaded on each init.
        attr = self._attr
        assert attr, attr  # type guard
        tag = self.tag
        assert tag, tag  # type guard

        dct: _StatusDict = {
            "type": "subordinate",
            "status": self._status,
            "message": self._message,
            "tag": tag,
            "attr": attr,
        }
        return dct

    def _restore(self, dct: _StatusDict):
        """Restore Status from stored state."""
        type_ = dct.get("type")
        assert type_, type_
        assert type_ == "subordinate", type_

        status = dct.get("status")
        message = dct.get("message")
        tag = dct.get("tag")
        attr = dct.get("attr")

        assert status is not None, status
        assert message is not None, message
        assert tag is not None, tag
        assert attr is not None, attr

        self._status = status
        self._message = message
        self.tag = tag
        self._attr = attr

    def __hash__(self):
        return hash((self.tag, self.status, self.message))

    def __eq__(self, other: "Status") -> bool:
        return hash(self) == hash(other)


class Clobberer:
    """Clobberer. Repeat it many times fast."""

    def clobber(self, statuses: Sequence[Status], skip_unknown: bool = False) -> str:
        """Produce a clobbered representation of the statuses."""
        raise NotImplementedError


class WorstOnly(Clobberer):
    """This clobberer provides a worst-only view of the current statuses in the pool.

    e.g. if the status pool has three statuses:
        relation_1 = ActiveStatus('βœ…')
        relation_2 = WaitingStatus('πŒ—: foo')
        workload = BlockedStatus('πŸ’”')

    The Summary clobbered status will have as message::
        (workload) πŸ’”
    """

    def __init__(self, fmt: str = "({0}) {1}", sep: str = "; "):
        self._fmt = fmt

    def clobber(self, statuses: Sequence[Status], skip_unknown: bool = False) -> str:
        """Produce a clobbered representation of the statuses."""
        worst = Status.sort(statuses)[0]
        return self._fmt.format(worst.tag, worst.message)


class Summary(Clobberer):
    """This clobberer provides a worst-first, summarized view of all statuses.

    e.g. if the status pool has three statuses:
        relation_1 = ActiveStatus('βœ…')
        relation_2 = WaitingStatus('πŒ—: foo')
        workload = BlockedStatus('πŸ’”')

    The Summary clobbered status will have as message:
        (workload:blocked) πŸ’”; (relation_1:active) βœ…; (rel2:waiting) πŒ—: foo
    """

    def __init__(self, fmt: str = "({0}:{1}) {2}", sep: str = "; "):
        self._fmt = fmt
        self._sep = sep

    def clobber(self, statuses: Sequence[Status], skip_unknown: bool = False):
        """Produce a clobbered representation of the statuses."""
        msgs = []
        for status in Status.sort(statuses):
            if skip_unknown and status.status == "unknown":
                continue
            msgs.append(self._fmt.format(status.tag, status.status, status.message))
        return self._sep.join(msgs)


class Condensed(Clobberer):
    """This clobberer provides a very compact, summarized view of all statuses.

    e.g. if the status pool has three statuses:
        relation_1 = ActiveStatus('βœ…')
        relation_2 = WaitingStatus('βœ…')
        relation_3 = BlockedStatus('βœ…')
        relation_... = ???
        relation_N = ActiveStatus('βœ…')
        relation_2 = WaitingStatus('πŒ—: foo')
        workload = BlockedStatus('πŸ’”')

    The Condensed clobbered status will have as message:
        15 blocked; 43 waiting; 12 active

    If all are active the message will be empty.
    Priority will be ignored.
    """

    def __init__(self, fmt: str = "{0} {1}", sep: str = "; "):
        self._fmt = fmt
        self._sep = sep

    def clobber(self, statuses: Sequence[Status], skip_unknown: bool = False):
        """Produce a clobbered representation of the statuses."""
        ctr = Counter(s.status for s in statuses)

        if set(ctr) == {
            "active",
        }:  # only active statuses
            return ""

        msgs = []
        for status, count in sorted(
            ctr.items(), key=lambda v: Status.priority_key(v[0]), reverse=True
        ):
            if skip_unknown and status == "unknown":
                continue
            msgs.append(self._fmt.format(count, status))
        return self._sep.join(msgs)


class MasterStatus(Status):
    """The Master status of the pool.

    Parameters:
        - `tag`: the name to associate the master status with.

        - `fmt`: The format for each child status. Needs to contain three {}
            slots, will receive three arguments in this order:

            - the tag of the child status (a string)
            - the name of the child status (e.g. 'blocked', or 'active')
            - the message associated with the child status (another string)

        - `sep`: The separator used to join together the child statuses.
    """

    SKIP_UNKNOWN = False

    def __init__(
        self,
        tag: str = "master",
        clobberer: Clobberer = WorstOnly(),
        priority: Optional[PositiveNumber] = None,
    ):
        super().__init__(tag, priority=priority)
        self.children = ()  # type: Tuple[Status, ...]  # gets populated by CompoundStatus
        self._owner = None  # type: Optional[CharmBase]  # externally managed
        self._user_set = False
        self._clobberer = clobberer

        self._logger = log.getChild(tag)
        self._master = self  # lucky you
        self._attr = "*master*"

    def _add_child(self, status: Status):
        """Add a child status."""
        status._master = self
        logger = self._logger
        assert logger  # type guard
        tag = status.tag
        assert tag  # type guard

        status._logger = logger.getChild(tag)
        self.children = self.children + (status,)

    def _remove_child(self, status: Status):
        """Remove a child status."""
        if status not in self.children:
            raise ValueError(f"{status} not in {self}")

        status._master = None
        status._logger = None
        self.children = tuple(a for a in self.children if a is not status)

    @property
    def message(self) -> str:
        """Return the message associated with this status."""
        if self._user_set:
            return self._message
        return self._clobber_statuses(self.children, self.SKIP_UNKNOWN)

    def _clobber_statuses(
        self, statuses: Sequence[Status], skip_unknown: bool = False
    ) -> str:
        """Produce a message summarizing the child statuses."""
        return self._clobberer.clobber(statuses, skip_unknown)

    @property
    def status(self) -> StatusName:
        """Return the status."""
        if self._user_set:
            return self._status
        return Status.sort(self.children)[0].status

    def coalesce(self) -> StatusBase:
        """Cast to an ops.model.StatusBase instance by clobbering statuses and messages."""
        if self.status == "unknown":
            raise ValueError("cannot coalesce unknown status")
        status_type = STATUS_NAME_TO_CLASS[self.status]
        status_msg = self.message
        return status_type(status_msg)

    def _set(self, status: StatusName, msg: str = ""):
        """Force-set this status and message.

        Should not be called by user code.
        """
        self._user_set = True
        super()._set(status, msg)

    def unset(self):
        """Unset all child statuses, as well as any user_set Master status."""
        super().unset()

        self._user_set = False
        for child in self.children:
            child.unset()

    def _snapshot(self) -> _StatusDict:
        """Serialize Status for storage."""
        dct = super()._snapshot()
        dct["type"] = "master"
        dct["user_set"] = self._user_set
        return dct

    def _restore(self, dct: _StatusDict):
        """Restore Status from stored state."""
        type_ = dct.get("type", None)
        assert type_, type_  # type guard
        assert type_ == "master", type_

        status = dct.get("status", None)
        message = dct.get("message", None)
        user_set = dct.get("user_set", None)

        assert status is not None, status
        assert message is not None, message
        assert user_set is not None, user_set

        self._status = status
        self._message = message
        self._user_set = user_set

    def __repr__(self):
        if not self.children:
            return "<MasterStatus -- empty>"
        if self.status == "unknown":
            return "unknown"
        return str(self.coalesce())


class StatusPool(Object):
    """Represents the pool of statuses available to an Object."""

    # whether unknown statuses should be omitted from the master message
    SKIP_UNKNOWN = False
    # whether the status should be committed automatically when the hook exits
    AUTO_COMMIT = True
    # key used to register handle
    KEY = "status_pool"

    if TYPE_CHECKING:
        _statuses = {}  # type: Dict[str, Status]
        _charm: CharmBase
        master = MasterStatus()  # type: MasterStatus
        _manual_priorities = False  # type: bool
        _priority_counter = 0  # type: int

    def __init__(self, charm: CharmBase, key: Optional[str] = None):
        super().__init__(charm, key or self.KEY)
        # skip setattr
        self.__dict__["master"] = MasterStatus()
        self.__dict__["_statuses"] = {}
        self.__dict__["_manual_priorities"] = False
        self.__dict__["_priority_counter"] = 0

        stored_handle = Handle(self, StoredStateData.handle_kind, "_status_pool_state")
        charm.framework.register_type(
            StoredStateData, self, StoredStateData.handle_kind
        )
        try:
            self._state = charm.framework.load_snapshot(stored_handle)
        except NoSnapshotError:
            self._state = StoredStateData(self, "_status_pool_state")
            self._state["statuses"] = "{}"

        self._init_statuses(charm)
        self._load_from_stored_state()
        if self.AUTO_COMMIT:
            charm.framework.observe(
                charm.framework.on.commit, self._on_framework_commit  # type: ignore
            )

    def get_status(self, attr: str) -> Status:
        """Retrieve a status by name. Equivalent to getattr(self, attr)."""
        return getattr(self, attr)

    def set_status(self, attr: str, status: StatusBase):
        """Set a status by name. Equivalent to setattr(self, attr, status)."""
        return setattr(self, attr, status)

    def add_status(self, status: Status, attr: Optional[str] = None):
        """Add status to this pool; under attr: `attr`.

        If attr is not provided, status.tag will be used instead if set.

        NB `attr` needs to be a valid Python identifier.
        """
        tag = status.tag
        if not attr and not tag:
            raise ValueError(
                f"either give status {status} a tag, or pass `attr`" f"to add_status."
            )

        # pyright ain't to bright with inline conditionals
        attribute: str = typing.cast(str, attr or tag)

        if not attribute.isidentifier():
            raise ValueError(
                f"cannot set {attribute!r}={status} on {self}: "
                f"attribute needs to be a valid Python identifier."
            )

        # will check that attribute is not in use already
        self._add_status(status, attribute)

        setattr(self, attribute, status)

    def remove_status(self, status: Status):
        """Remove the status and forget about it."""
        # some safety-first cleanup
        status.unset()
        self.master._remove_child(status)  # noqa
        attr = status._attr  # noqa
        assert attr is not None, status
        delattr(self, attr)

    def _add_status(self, status: Status, attr: str):
        if getattr(self, attr, None) not in {status, None}:
            raise ValueError(
                f"cannot set {attr!r} = {status}." f"attribute already set on {self}"
            )

        if status.priority is None:
            if self._manual_priorities:
                raise ValueError(
                    "Either pass a priority to all Statuses, "
                    "or leave it blank for all."
                )
        else:
            self._manual_priorities = True

        if not self._manual_priorities:
            self._priority_counter += 1
            status._priority = self._priority_counter

        if status.priority and status.priority > 100:
            raise ValueError("Status priority cannot be > 100")

        status.tag = status.tag or attr
        self.master._add_child(status)  # noqa

        status._attr = attr
        self._statuses[attr] = status

    def _init_statuses(self, charm: CharmBase):
        """Extract the statuses from the class namespace.

        And associate them with the master status.
        """

        def _is_child_status(obj):
            return isinstance(obj, Status) and not isinstance(obj, MasterStatus)

        statuses_ = inspect.getmembers(self, predicate=_is_child_status)
        statuses = sorted(statuses_, key=lambda s: s[1]._id)

        master = self.master
        # bind children to master, set tag if unset, init logger
        for attr, obj in statuses:
            self._add_status(obj, attr)

        master.SKIP_UNKNOWN = self.SKIP_UNKNOWN
        master.children = tuple(a[1] for a in statuses)

        # skip setattr
        self.__dict__["_statuses"] = dict(statuses)
        self.__dict__["_charm"] = charm

    def _load_from_stored_state(self):
        """Retrieve stored state snapshot of current statuses."""
        statuses_raw = typing.cast(str, self._state["statuses"])
        stored_statuses = typing.cast(Dict[str, _StatusDict], json.loads(statuses_raw))
        for attr, status_dct in stored_statuses.items():
            if attr == "*master*":
                status = self.master
            else:
                if hasattr(self, attr):  # status was statically defined
                    status = getattr(self, attr)
                else:  # status was dynamically added
                    status = Status()
                    attr = status_dct.get("attr", None)
                    assert attr is not None, status_dct  # type guard
                    self.add_status(status, attr)

            status._restore(status_dct)  # noqa

    def _store(self):
        """Dump stored state."""
        all_statuses = chain(map(itemgetter(1), self._statuses.items()), (self.master,))
        statuses = {s._attr: s._snapshot() for s in all_statuses}
        self._state["statuses"] = json.dumps(statuses)

    def __setattr__(self, key: str, value: StatusBase):
        if isinstance(value, StatusBase):
            name = typing.cast(Optional[StatusName], getattr(value, "name", None))
            if name not in STATUSES:
                raise RuntimeError(
                    f"You cannot set {self} to {value}; its name is {name}, "
                    f"which is an invalid status name. `value` should "
                    f"be an instance of a StatusBase subclass."
                )

            if key == "master":
                return self.master._set(name, value.message)  # noqa
            elif key in self._statuses:
                return self._statuses[key]._set(name, value.message)  # noqa
            else:
                raise AttributeError(key)
        return super().__setattr__(key, value)

    def _on_framework_commit(self, _event):
        log.debug("master status auto-committed")
        self.commit()

    def commit(self):
        """Store the current state and sync with juju."""
        assert isinstance(self.master, MasterStatus), type(self.master)

        # cannot coalesce in unknown status
        if self.master.status != "unknown":
            self._charm.unit.status = self.master.coalesce()
            self._store()

        self._charm.framework.save_snapshot(self._state)  # type: ignore
        self._charm.framework._storage.commit()  # noqa

    def unset(self):
        """Unsets master status (and all children)."""
        self.master.unset()

    def __repr__(self):
        return repr(self.master)