harness-extensions

Harness Extensions

Channel Revision Published Runs on
latest/beta 3 05 Jul 2022
Ubuntu 20.04
juju deploy harness-extensions --channel beta
Show information

Platform:

Ubuntu
20.04

'''This is a library providing a utility for integration testing relation databag contents.

Basic usage:

>>> from charms.harness_extensions.v0.relation_data_wrapper import get_relation_data
>>> def test_relation_data():
>>>     data = get_relation_data(requirer='app1:endpoint1', provider='app2:endpoint2')
>>>     assert data.provider.application_data == {'foo': 'bar'}
>>>     assert data.provider.units_data[0] == {'baz': 'qux'}
'''

# The unique Charmhub library identifier, never change it
LIBID = "0250fc57562c48d5aa15582995dbb936"

# Increment this major API version when introducing breaking changes
LIBAPI = 0

# Increment this PATCH version before using `charmcraft publish-lib` or reset
# to 0 if you are raising the major API version
LIBPATCH = 1

import json as jsn
import logging
import re
from dataclasses import dataclass
from subprocess import Popen, PIPE
from typing import Dict, Optional, Tuple, List

import yaml
from ops.testing import Harness

logger = logging.getLogger(__file__)

JUJU_COMMAND = "juju"
_JUJU_DATA_CACHE = {}
_JUJU_KEYS = ("egress-subnets", "ingress-address", "private-address")


def _purge(data: dict):
    for key in _JUJU_KEYS:
        if key in data:
            del data[key]


def _juju_status(app_name, model: str = None, json: bool = False):
    cmd = f'{JUJU_COMMAND} status{" " + app_name if app_name else ""} --relations'
    if model:
        cmd += f' -m {model}'
    if json:
        cmd += ' --format json'
    proc = Popen(cmd.split(), stdout=PIPE, stderr=PIPE)
    raw = proc.stdout.read().decode('utf-8')
    if json:
        return jsn.loads(raw)
    return raw


def _show_unit(unit_name, model: str = None):
    if model:
        proc = Popen(f"{JUJU_COMMAND} show-unit -m {model} {unit_name}".split(),
                     stdout=PIPE)
    else:
        proc = Popen(f"{JUJU_COMMAND} show-unit {unit_name}".split(),
                     stdout=PIPE)
    return proc.stdout.read().decode("utf-8").strip()


def _get_unit_info(unit_name: str, model: str = None) -> dict:
    """Returns unit-info data structure.

     for example:

    traefik-k8s/0:
      opened-ports: []
      charm: local:focal/traefik-k8s-1
      leader: true
      relation-info:
      - endpoint: ingress-per-unit
        related-endpoint: ingress
        application-data:
          _supported_versions: '- v1'
        related-units:
          prometheus-k8s/0:
            in-scope: true
            data:
              egress-subnets: 10.152.183.150/32
              ingress-address: 10.152.183.150
              private-address: 10.152.183.150
      provider-id: traefik-k8s-0
      address: 10.1.232.144
    """
    if cached_data := _JUJU_DATA_CACHE.get(unit_name):
        return cached_data

    raw_data = _show_unit(unit_name, model=model)
    if not raw_data:
        raise ValueError(
            f"no unit info could be grabbed for {unit_name}; "
            f"are you sure it's a valid unit name?"
        )

    data = yaml.safe_load(raw_data)
    if unit_name not in data:
        raise KeyError(f"{unit_name} not in {data!r}")

    unit_data = data[unit_name]
    _JUJU_DATA_CACHE[unit_name] = unit_data
    return unit_data


def _get_relation_by_endpoint(relations, local_endpoint, remote_endpoint,
                              remote_obj, peer: bool):
    matches = [
        r for r in relations if
        ((r["endpoint"] == local_endpoint and
          r["related-endpoint"] == remote_endpoint) or
         (r["endpoint"] == remote_endpoint and
          r["related-endpoint"] == local_endpoint))
    ]
    if not peer:
        matches = [r for r in matches if remote_obj in r["related-units"]]

    if not matches:
        raise ValueError(
            f"no relations found with remote endpoint={remote_endpoint!r} "
            f"and local endpoint={local_endpoint!r} "
            f"in {remote_obj!r}"
        )
    if len(matches) > 1:
        raise ValueError(
            f"multiple relations found with remote endpoint={remote_endpoint!r} "
            f"and local endpoint={local_endpoint!r} "
            f"in {remote_obj!r} (relations={matches})"
        )
    return matches[0]


@dataclass
class Metadata:
    scale: int
    units: Tuple[int, ...]
    leader_id: int
    interface: str


@dataclass
class AppRelationData:
    app_name: str
    relation_id: int
    meta: Metadata
    endpoint: str
    application_data: dict
    units_data: Dict[int, dict]


def _get_metadata_from_status(app_name, relation_name, other_app_name,
                              other_relation_name, model: str = None):
    # line example: traefik-k8s           active      3  traefik-k8s             0  10.152.183.73  no
    status = _juju_status(app_name, model=model, json=True)
    # machine status json output apparently has no 'scale'... -_-
    scale = len(status['applications'][app_name]['units'])

    leader_id: int = None
    unit_ids: List[int] = []

    for u, v in status['applications'][app_name]['units'].items():
        unit_id = int(u.split('/')[1])
        if v.get('leader', False):
            leader_id = unit_id
        unit_ids.append(unit_id)
    if leader_id is None:
        raise RuntimeError(f'could not identify leader among units {unit_ids}. '
                           f'You might need to wait for all units to be allocated.')

    # we gotta do this because json status --format json does not include the interface
    raw_text_status = _juju_status(app_name, model=model)

    re_safe_app_name = app_name.replace('-', r'\-')
    intf_re = fr"(({re_safe_app_name}:{relation_name}\s+{other_app_name}:{other_relation_name})|({other_app_name}:{other_relation_name}\s+{app_name}:{relation_name}))\s+([\w\-]+)"
    interface = re.compile(intf_re).findall(raw_text_status)[0][-1]
    return Metadata(scale, tuple(unit_ids), leader_id, interface)


def _get_app_name_and_units(url, relation_name,
                            other_app_name, other_relation_name,
                            model: str = None):
    """Get app name and unit count from url; url is either `app_name/0` or `app_name`."""
    app_name, unit_id = url.split('/') if '/' in url else (url, None)

    meta = _get_metadata_from_status(app_name, relation_name, other_app_name,
                                     other_relation_name, model=model)
    if unit_id:
        units = (int(unit_id),)
    else:
        units = meta.units
    return app_name, units, meta


def get_content(obj: str, other_obj,
                include_default_juju_keys: bool = False,
                model: str = None,
                peer: bool = False) -> AppRelationData:
    """Get the content of the databag of `obj`, as seen from `other_obj`."""
    url, endpoint = obj.split(":")
    other_url, other_endpoint = other_obj.split(":")

    other_app_name, _ = other_url.split('/') if '/' in other_url else (
        other_url, None)

    app_name, units, meta = _get_app_name_and_units(
        url, endpoint, other_app_name, other_endpoint,
        model)

    # in k8s there's always a 0 unit, in machine that's not the case.
    # so even though we need 'any' remote unit name, we still need to query the status
    # to find out what units there are.
    status = _juju_status(other_app_name, model=model, json=True)
    other_unit_name = next(
        iter(status['applications'][other_app_name]['units']))
    # we might have a different number of units and other units, and it doesn't
    # matter which 'other' we pass to get the databags for 'this one'.
    # in peer relations, show-unit luckily reports 'local-unit', so we're good.

    leader_unit_data = None
    app_data = None
    units_data = {}
    r_id = None
    for unit_id in units:
        unit_name = f"{app_name}/{unit_id}"
        unit_data, app_data, r_id_ = _get_databags(
            unit_name, other_unit_name,
            endpoint, other_endpoint,
            model=model, peer=peer)

        if r_id is not None:
            assert r_id == r_id_, f'mismatching relation IDs: {r_id, r_id_}'
        r_id = r_id_

        if not include_default_juju_keys:
            _purge(unit_data)
        units_data[unit_id] = unit_data

    return AppRelationData(
        app_name=app_name,
        meta=meta,
        endpoint=endpoint,
        application_data=app_data,
        units_data=units_data,
        relation_id=r_id)


def _get_databags(local_unit, remote_unit, local_endpoint, remote_endpoint,
                  model: str = None, peer: bool = False):
    """Gets the databags of local unit and its leadership status.

    Given a remote unit and the remote endpoint name.
    """
    local_data = _get_unit_info(local_unit, model=model)
    data = _get_unit_info(remote_unit, model=model)
    relation_info = data.get("relation-info")
    if not relation_info:
        raise RuntimeError(f"{remote_unit} has no relations")

    raw_data = _get_relation_by_endpoint(relation_info, local_endpoint,
                                         remote_endpoint, local_unit, peer=peer)
    if peer:
        unit_data = raw_data["local-unit"]["data"]
    else:
        unit_data = raw_data["related-units"][local_unit]["data"]
    app_data = raw_data["application-data"]
    return unit_data, app_data, raw_data['relation-id']


@dataclass
class RelationData:
    provider: AppRelationData
    requirer: AppRelationData


def get_peer_relation_data(
        *, endpoint: str,
        include_default_juju_keys: bool = False, model: str = None
):
    return get_content(endpoint, endpoint,
                       include_default_juju_keys, model=model,
                       peer=True)


def get_relation_data_from_juju(
        *, provider_endpoint: str, requirer_endpoint: str,
        include_default_juju_keys: bool = False, model: str = None
):
    """Get relation databags for a juju relation.

    >>> get_relation_data_from_juju('prometheus/0:ingress', 'traefik/1:ingress-per-unit')
    """
    provider_data = get_content(provider_endpoint, requirer_endpoint,
                                include_default_juju_keys, model=model)
    requirer_data = get_content(requirer_endpoint, provider_endpoint,
                                include_default_juju_keys, model=model)

    # sanity check: the two IDs should be identical
    if not provider_data.relation_id == requirer_data.relation_id:
        logger.warning(
            f"provider relation id {provider_data.relation_id} "
            f"not the same as requirer relation id: {requirer_data.relation_id}")

    return RelationData(provider=provider_data, requirer=requirer_data)


def _harness_data_to_relationdata(data: Dict[str, Dict[str, str]],
                                  provider: str,
                                  requirer: str,
                                  prov_endpoint: str,
                                  req_endpoint: str,
                                  r_id: int,
                                  include_default_juju_keys: bool = False):

    def _clean(data: Dict[str, str]):
        if include_default_juju_keys:
            return data
        cdata = data.copy()
        _purge(cdata)
        return cdata

    def _get_units_data(source):
        return {int(p_name.split('/')[1]): _clean(data[p_name]) for p_name in data if p_name.startswith(source) and '/' in p_name}

    provider_data = AppRelationData(
        app_name=provider,
        meta=None,
        endpoint=prov_endpoint,
        application_data=_clean(data[provider]),
        units_data=_get_units_data(provider),
        relation_id=r_id)

    requirer_data = AppRelationData(
        app_name=requirer,
        meta=None,
        endpoint=req_endpoint,
        application_data=_clean(data[requirer]),
        units_data=_get_units_data(requirer),
        relation_id=r_id)

    return RelationData(provider=provider_data,
                        requirer=requirer_data)


def get_relation_data_from_harness(
        harness: Harness, *, provider_endpoint: str, requirer_endpoint: str,
        include_default_juju_keys: bool = False):
    prov_url, prov_endpoint = provider_endpoint.split(":")
    req_url, req_endpoint = requirer_endpoint.split(":")

    ids_map = harness._backend._relation_ids_map
    if prov_endpoint in ids_map:
        ids = ids_map[prov_endpoint]
    elif req_endpoint in ids_map:
        ids = ids_map[req_endpoint]
    else:
        raise ValueError(f'Neither endpoint is in use: '
                         f'{prov_endpoint}, {req_endpoint}')

    for id in ids:
        data = harness._backend._relation_data[id]
        if prov_url in data and req_url in data:
            return _harness_data_to_relationdata(
                data, prov_url, req_url, prov_endpoint, req_endpoint,
                id, include_default_juju_keys
            )
    raise RuntimeError(f"could not find the relation ID "
                       f"matching a relation between {provider_endpoint} "
                       f"and {requirer_endpoint}")