Prometheus
- Canonical Observability
Channel | Revision | Published | Runs on |
---|---|---|---|
latest/stable | 210 | 19 Nov 2024 | |
latest/candidate | 210 | 10 Sep 2024 | |
latest/beta | 216 | 19 Nov 2024 | |
latest/edge | 216 | 13 Nov 2024 | |
1.0/stable | 159 | 16 Feb 2024 | |
1.0/candidate | 159 | 12 Dec 2023 | |
1.0/beta | 159 | 12 Dec 2023 | |
1.0/edge | 159 | 12 Dec 2023 |
juju deploy prometheus-k8s
Deploy Kubernetes operators easily with Juju, the Universal Operator Lifecycle Manager. Need a Kubernetes cluster? Install MicroK8s to create a full CNCF-certified Kubernetes system in under 60 seconds.
Platform:
charms.prometheus_k8s.v1.prometheus_remote_write
-
- Last updated 05 Dec 2023
- Revision Library version 1.4
# Copyright 2021 Canonical Ltd.
# See LICENSE file for licensing details.
"""# Prometheus remote-write library.
This library facilitates the integration of the prometheus_remote_write interface.
Source code can be found on GitHub at:
https://github.com/canonical/prometheus-k8s-operator/tree/main/lib/charms/prometheus_k8s
Charms that need to push data to a charm exposing the Prometheus remote_write API,
should use the `PrometheusRemoteWriteConsumer`. Charms that operate software that exposes
the Prometheus remote_write API, that is, they can receive metrics data over remote_write,
should use the `PrometheusRemoteWriteProvider`.
"""
import json
import logging
import os
import platform
import re
import socket
import subprocess
import tempfile
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
import yaml
from cosl import JujuTopology
from cosl.rules import AlertRules
from ops.charm import (
CharmBase,
HookEvent,
RelationBrokenEvent,
RelationEvent,
RelationMeta,
RelationRole,
)
from ops.framework import EventBase, EventSource, Object, ObjectEvents
from ops.model import Relation
# The unique Charmhub library identifier, never change it
LIBID = "f783823fa75f4b7880eb70f2077ec259"
# Increment this major API version when introducing breaking changes
LIBAPI = 1
# Increment this PATCH version before using `charmcraft publish-lib` or reset
# to 0 if you are raising the major API version
LIBPATCH = 4
# Python package dependencies of this library (`cosl` is imported at the top of
# this module) -- presumably consumed by charmcraft when packing; TODO confirm.
PYDEPS = ["cosl"]
# Module-level logger shared by every class and helper in this file.
logger = logging.getLogger(__name__)
# Default relation name for `PrometheusRemoteWriteProvider` (the receiving side).
DEFAULT_RELATION_NAME = "receive-remote-write"
# Default relation name for `PrometheusRemoteWriteConsumer` (the sending side).
DEFAULT_CONSUMER_NAME = "send-remote-write"
# Interface name both sides validate their relation against.
RELATION_INTERFACE_NAME = "prometheus_remote_write"
# Default alert rules directory, resolved against the charm directory by the consumer.
DEFAULT_ALERT_RULES_RELATIVE_PATH = "./src/prometheus_alert_rules"
class RelationNotFoundError(Exception):
    """Signal that the charm metadata declares no relation with the requested name.

    Attributes:
        relation_name: the relation name that was looked up.
        message: the human-readable error text, also passed to `Exception`.
    """

    def __init__(self, relation_name: str):
        message = f"No relation named '{relation_name}' found"
        super().__init__(message)
        self.relation_name = relation_name
        self.message = message
class RelationInterfaceMismatchError(Exception):
    """Signal that a relation exists but declares an unexpected interface.

    Attributes:
        relation_name: the relation that was inspected.
        expected_relation_interface: the interface the caller required.
        actual_relation_interface: the interface found in the charm metadata.
        message: the human-readable error text, also passed to `Exception`.
    """

    def __init__(
        self,
        relation_name: str,
        expected_relation_interface: str,
        actual_relation_interface: str,
    ):
        message = (
            f"The '{relation_name}' relation has '{actual_relation_interface}' "
            f"as its interface rather than the expected '{expected_relation_interface}'"
        )
        super().__init__(message)
        self.relation_name = relation_name
        self.expected_relation_interface = expected_relation_interface
        self.actual_relation_interface = actual_relation_interface
        self.message = message
class RelationRoleMismatchError(Exception):
    """Raised if the relation with the given name has a different direction.

    Attributes:
        relation_name: the relation that was inspected.
        expected_relation_role: the role (provides/requires) the caller required.
        expected_relation_interface: deprecated alias of `expected_relation_role`,
            kept because earlier versions stored the expected role under this
            (misleading) name.
        actual_relation_role: the role reported as actually declared.
        message: the human-readable error text, also passed to `Exception`.
    """

    def __init__(
        self,
        relation_name: str,
        expected_relation_role: RelationRole,
        actual_relation_role: RelationRole,
    ):
        self.relation_name = relation_name
        self.expected_relation_role = expected_relation_role
        # Backward-compatible alias: the expected role used to be stored only here.
        self.expected_relation_interface = expected_relation_role
        self.actual_relation_role = actual_relation_role
        self.message = "The '{}' relation has role '{}' rather than the expected '{}'".format(
            relation_name, repr(actual_relation_role), repr(expected_relation_role)
        )
        super().__init__(self.message)
class InvalidAlertRuleEvent(EventBase):
    """Event carrying the validation status of alert rules.

    It holds a `valid` flag and an `errors` string so that an observer can
    reflect the outcome in the charm status.
    """

    def __init__(self, handle, errors: str = "", valid: bool = False):
        super().__init__(handle)
        self.errors = errors
        self.valid = valid

    def snapshot(self) -> Dict:
        """Serialize the validation status so the event can be deferred."""
        state = {"valid": self.valid, "errors": self.errors}
        return state

    def restore(self, snapshot):
        """Rebuild the validation status from a previously taken snapshot."""
        self.valid = snapshot["valid"]
        self.errors = snapshot["errors"]
def _is_official_alert_rule_format(rules_dict: dict) -> bool:
"""Are alert rules in the upstream format as supported by Prometheus.
Alert rules in dictionary format are in "official" form if they
contain a "groups" key, since this implies they contain a list of
alert rule groups.
Args:
rules_dict: a set of alert rules in Python dictionary format
Returns:
True if alert rules are in official Prometheus file format.
"""
return "groups" in rules_dict
def _is_single_alert_rule_format(rules_dict: dict) -> bool:
"""Are alert rules in single rule format.
The Prometheus charm library supports reading of alert rules in a
custom format that consists of a single alert rule per file. This
does not conform to the official Prometheus alert rule file format
which requires that each alert rules file consists of a list of
alert rule groups and each group consists of a list of alert
rules.
Alert rules in dictionary form are considered to be in single rule
format if in the least it contains two keys corresponding to the
alert rule name and alert expression.
Returns:
True if alert rule is in single rule file format.
"""
# one alert rule per file
return set(rules_dict) >= {"alert", "expr"}
def _validate_relation_by_interface_and_direction(
    charm: CharmBase,
    relation_name: str,
    expected_relation_interface: str,
    expected_relation_role: RelationRole,
):
    """Check that a relation is declared with the expected interface and role.

    Three conditions are verified, in this order, against the charm metadata:
    (1) a relation called `relation_name` exists, (2) its interface equals
    `expected_relation_interface`, and (3) it is listed under the side
    (provides or requires) given by `expected_relation_role`.

    Args:
        charm: a `CharmBase` object whose metadata is inspected.
        relation_name: the name of the relation to be verified.
        expected_relation_interface: the interface name the relation must declare.
        expected_relation_role: whether the relation must be provided or
            required by `charm`.

    Raises:
        RelationNotFoundError: If the metadata has no relation named
            `relation_name`.
        RelationInterfaceMismatchError: If the relation declares an interface
            other than `expected_relation_interface`.
        RelationRoleMismatchError: If the relation is not declared under the
            side given by `expected_relation_role`.
        Exception: If `expected_relation_role` is neither provides nor requires.
    """
    metadata = charm.meta
    if relation_name not in metadata.relations:
        raise RelationNotFoundError(relation_name)

    relation_meta: RelationMeta = metadata.relations[relation_name]
    found_interface = relation_meta.interface_name
    if found_interface != expected_relation_interface:
        raise RelationInterfaceMismatchError(
            relation_name, expected_relation_interface, found_interface or "None"
        )

    if expected_relation_role == RelationRole.provides:
        if relation_name not in metadata.provides:
            raise RelationRoleMismatchError(
                relation_name, RelationRole.provides, RelationRole.requires
            )
        return

    if expected_relation_role == RelationRole.requires:
        if relation_name not in metadata.requires:
            raise RelationRoleMismatchError(
                relation_name, RelationRole.requires, RelationRole.provides
            )
        return

    raise Exception("Unexpected RelationDirection: {}".format(expected_relation_role))
class PrometheusRemoteWriteEndpointsChangedEvent(EventBase):
    """Event signalling that the remote_write endpoints of a relation changed.

    Attributes:
        relation_id: the id of the relation the change refers to.
    """

    def __init__(self, handle, relation_id):
        super().__init__(handle)
        self.relation_id = relation_id

    def snapshot(self):
        """Serialize the relation id so the event can be deferred."""
        state = {"relation_id": self.relation_id}
        return state

    def restore(self, snapshot):
        """Rebuild the relation id from a previously taken snapshot."""
        self.relation_id = snapshot["relation_id"]
class InvalidAlertRulePathError(Exception):
    """Signal that the alert rules folder is missing or otherwise unusable.

    Attributes:
        alert_rules_absolute_path: the path that was rejected.
        message: the reason for the rejection, also passed to `Exception`.
    """

    def __init__(
        self,
        alert_rules_absolute_path: str,
        message: str,
    ):
        super().__init__(message)
        self.alert_rules_absolute_path = alert_rules_absolute_path
        self.message = message
def _resolve_dir_against_charm_path(charm: CharmBase, *path_elements: str) -> str:
    """Resolve path elements against the charm directory and require a directory.

    The base directory is `charm.charm_dir`; when that path does not exist or
    is not a directory, the current working directory is used instead. The
    path elements are then joined onto the absolute base directory.

    Args:
        charm: the charm whose `charm_dir` serves as the base directory.
        path_elements: path components appended to the base directory.

    Returns:
        The absolute path of the resolved directory, as a string.

    Raises:
        InvalidAlertRulePathError: If the resolved path does not exist or is
            not a directory.
    """
    base_dir = Path(str(charm.charm_dir))
    if not (base_dir.exists() and base_dir.is_dir()):
        # Operator Framework does not currently expose a robust
        # way to determine the top level charm source directory
        # that is consistent across deployed charms and unit tests
        # Hence for unit tests the current working directory is used
        # TODO: updated this logic when the following ticket is resolved
        # https://github.com/canonical/operator/issues/643
        base_dir = Path(os.getcwd())

    resolved = base_dir.absolute().joinpath(*path_elements)
    resolved_str = str(resolved)

    if not resolved.exists():
        raise InvalidAlertRulePathError(resolved_str, "directory does not exist")
    if not resolved.is_dir():
        raise InvalidAlertRulePathError(resolved_str, "is not a directory")

    return resolved_str
class PrometheusRemoteWriteConsumerEvents(ObjectEvents):
    """Events that a `PrometheusRemoteWriteConsumer` can emit."""

    # Emitted with the id of the relation whose endpoints may have changed.
    endpoints_changed = EventSource(PrometheusRemoteWriteEndpointsChangedEvent)
    # Emitted with the alert rule validation status read from the remote app databag.
    alert_rule_status_changed = EventSource(InvalidAlertRuleEvent)
class PrometheusRemoteWriteConsumer(Object):
    """API that manages a required `prometheus_remote_write` relation.

    The `PrometheusRemoteWriteConsumer` is intended to be used by charms that need to push data to
    other charms over the Prometheus remote_write API.

    The `PrometheusRemoteWriteConsumer` object can be instantiated as follows in your charm:

    ```
    from charms.prometheus_k8s.v1.prometheus_remote_write import PrometheusRemoteWriteConsumer

    def __init__(self, *args):
        ...
        self.remote_write_consumer = PrometheusRemoteWriteConsumer(self)
        ...
    ```

    The `PrometheusRemoteWriteConsumer` assumes that, in the `metadata.yaml` of your charm,
    you declare a required relation as follows:

    ```
    requires:
        send-remote-write:  # Relation name
            interface: prometheus_remote_write  # Relation interface
    ```

    The charmed operator is expected to use the `PrometheusRemoteWriteConsumer` as follows:

    ```
    def __init__(self, *args):
        ...
        self.remote_write_consumer = PrometheusRemoteWriteConsumer(self)
        ...

        self.framework.observe(
            self.remote_write_consumer.on.endpoints_changed,
            self._handle_endpoints_changed,
        )
    ```

    The `endpoints_changed` event will fire in situations such as provider ip change (e.g.
    relation created, provider upgrade, provider pod churn) or provider config change (e.g.
    metadata settings).

    Then, inside the logic of `_handle_endpoints_changed`, the updated endpoint list is
    retrieved with:

    ```
    self.remote_write_consumer.endpoints
    ```

    which returns a list of dictionaries structured like the Prometheus configuration object (see
    https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write).

    Regarding the default relation name, `send-remote-write`: if you choose to change it,
    you would need to explicitly provide it to the `PrometheusRemoteWriteConsumer` via the
    `relation_name` constructor argument. (The relation interface, on the other hand, is
    fixed and, if you were to change it, your charm would not be able to relate with other
    charms using the correct relation interface. The library prevents you from doing that by
    raising an exception.)

    In any case, it is strongly discouraged to change the relation name: having consistent
    relation names across charms that do similar things is good practice and more
    straightforward for the users of your charm. The one exception to the rule above,
    is if your charm needs to both consume and provide a relation using the
    `prometheus_remote_write` interface, in which case changing the relation name to
    differentiate between "incoming" and "outgoing" remote write interactions is necessary.

    It is also possible to specify alert rules. By default, this library will search
    `<charm_parent_dir>/prometheus_alert_rules`, which in standard charm
    layouts resolves to `src/prometheus_alert_rules`. Each set of alert rules, grouped
    by the topology identifier, goes into a separate `*.rule` file.

    Loading of the rule files is delegated to `cosl.rules.AlertRules`; a rule with invalid
    syntax is expected to be logged and skipped rather than loaded (behaviour of the
    loader, not of this class).

    To avoid false positives and false negatives the library will inject label filters
    automatically in the PromQL expression. For example if the charm provides an
    alert rule with an `expr` like this one:

    ```yaml
    expr: up < 1
    ```

    it will be modified with label filters ensuring that
    the only timeseries evaluated are those scraped from this charm, and no other.

    ```yaml
    expr: up{juju_application="traefik",
             juju_charm="traefik-k8s",
             juju_model="cos",
             juju_model_uuid="b5ed878d-2671-42e8-873a-e8d58c0ec325"
          } < 1
    labels:
      juju_application: traefik
      juju_charm: traefik-k8s
      juju_model: cos
      juju_model_uuid: b5ed878d-2671-42e8-873a-e8d58c0ec325
    ```
    """

    on = PrometheusRemoteWriteConsumerEvents()  # pyright: ignore

    def __init__(
        self,
        charm: CharmBase,
        relation_name: str = DEFAULT_CONSUMER_NAME,
        alert_rules_path: str = DEFAULT_ALERT_RULES_RELATIVE_PATH,
    ):
        """API to manage a required relation with the `prometheus_remote_write` interface.

        Args:
            charm: The charm object that instantiated this class.
            relation_name: Name of the relation with the `prometheus_remote_write` interface as
                defined in metadata.yaml.
            alert_rules_path: Path of the directory containing the alert rules.

        Raises:
            RelationNotFoundError: If there is no relation in the charm's metadata.yaml
                with the same name as provided via `relation_name` argument.
            RelationInterfaceMismatchError: The relation with the same name as provided
                via `relation_name` argument does not have the `prometheus_remote_write`
                relation interface.
            RelationRoleMismatchError: If the relation with the same name as provided
                via `relation_name` argument does not have the `RelationRole.requires`
                role.
        """
        _validate_relation_by_interface_and_direction(
            charm, relation_name, RELATION_INTERFACE_NAME, RelationRole.requires
        )

        try:
            alert_rules_path = _resolve_dir_against_charm_path(charm, alert_rules_path)
        except InvalidAlertRulePathError as e:
            # A missing rules folder is not fatal: the unresolved path is kept as given.
            logger.debug(
                "Invalid Prometheus alert rules folder at %s: %s",
                e.alert_rules_absolute_path,
                e.message,
            )

        super().__init__(charm, relation_name)
        self._charm = charm
        self._relation_name = relation_name
        self._alert_rules_path = alert_rules_path

        self.topology = JujuTopology.from_charm(charm)

        on_relation = self._charm.on[self._relation_name]

        self.framework.observe(on_relation.relation_joined, self._handle_endpoints_changed)
        self.framework.observe(on_relation.relation_changed, self._handle_endpoints_changed)
        self.framework.observe(on_relation.relation_departed, self._handle_endpoints_changed)
        self.framework.observe(on_relation.relation_broken, self._on_relation_broken)
        self.framework.observe(on_relation.relation_joined, self._push_alerts_on_relation_joined)
        self.framework.observe(
            self._charm.on.leader_elected, self._push_alerts_to_all_relation_databags
        )
        self.framework.observe(
            self._charm.on.upgrade_charm, self._push_alerts_to_all_relation_databags
        )

    def _on_relation_broken(self, event: RelationBrokenEvent) -> None:
        """Notify observers that the endpoints of a broken relation are gone."""
        self.on.endpoints_changed.emit(relation_id=event.relation.id)

    def _handle_endpoints_changed(self, event: RelationEvent) -> None:
        """Relay alert rule status from the remote app, then emit `endpoints_changed`.

        Only the leader reads the remote application databag. The `event` key there,
        when present and non-empty, is decoded as JSON and re-emitted as
        `alert_rule_status_changed`.
        """
        if self._charm.unit.is_leader() and event.app is not None:
            ev = json.loads(event.relation.data[event.app].get("event", "{}"))

            if ev:
                valid = bool(ev.get("valid", True))
                errors = ev.get("errors", "")

                if valid and not errors:
                    self.on.alert_rule_status_changed.emit(valid=valid)
                else:
                    self.on.alert_rule_status_changed.emit(valid=valid, errors=errors)

        self.on.endpoints_changed.emit(relation_id=event.relation.id)

    def _push_alerts_on_relation_joined(self, event: RelationEvent) -> None:
        """Publish alert rules on the relation that was just joined."""
        self._push_alerts_to_relation_databag(event.relation)

    def _push_alerts_to_all_relation_databags(self, _: Optional[HookEvent]) -> None:
        """Publish alert rules on every relation managed by this object."""
        for relation in self.model.relations[self._relation_name]:
            self._push_alerts_to_relation_databag(relation)

    def _push_alerts_to_relation_databag(self, relation: Relation) -> None:
        """Load alert rules from disk and publish them in the app databag.

        Does nothing on non-leader units. When no rules are found, any rules
        published earlier are removed so the remote side does not keep
        evaluating rules that no longer exist.
        """
        if not self._charm.unit.is_leader():
            return

        alert_rules = AlertRules(query_type="promql", topology=self.topology)
        alert_rules.add_path(self._alert_rules_path)

        alert_rules_as_dict = alert_rules.as_dict()
        app_databag = relation.data[self._charm.app]

        if alert_rules_as_dict:
            app_databag["alert_rules"] = json.dumps(alert_rules_as_dict)
        else:
            # Drop stale rules left over from an earlier push.
            app_databag.pop("alert_rules", None)

    def reload_alerts(self) -> None:
        """Reload alert rules from disk and push to relation data."""
        self._push_alerts_to_all_relation_databags(None)

    @property
    def endpoints(self) -> List[Dict[str, str]]:
        """A config object ready to be dropped into a prometheus config file.

        The format of the dict is specified in the official prometheus docs:
        https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write

        Returns:
            A list of dictionaries where each dictionary provides information about
            a single remote_write endpoint.
        """
        endpoints = []
        for relation in self.model.relations[self._relation_name]:
            for unit in relation.units:
                if unit.app is self._charm.app:
                    # This is a peer unit
                    continue

                remote_write = relation.data[unit].get("remote_write")
                if remote_write:
                    deserialized_remote_write = json.loads(remote_write)
                    endpoints.append(
                        {
                            "url": deserialized_remote_write["url"],
                        }
                    )

        return endpoints
class PrometheusRemoteWriteAlertsChangedEvent(EventBase):
    """Event signalling that the alert rules published on a relation changed.

    Attributes:
        relation_id: the id of the relation the change refers to.
    """

    def __init__(self, handle, relation_id):
        super().__init__(handle)
        self.relation_id = relation_id

    def snapshot(self):
        """Serialize the relation id so the event can be deferred."""
        state = {"relation_id": self.relation_id}
        return state

    def restore(self, snapshot):
        """Rebuild the relation id from a previously taken snapshot."""
        self.relation_id = snapshot["relation_id"]
class PrometheusRemoteWriteProviderConsumersChangedEvent(EventBase):
    """Event signalling that the set of remote_write consumers changed.

    It carries no payload of its own.
    """
class PrometheusRemoteWriteProviderEvents(ObjectEvents):
    """Events that a `PrometheusRemoteWriteProvider` can emit."""

    # Emitted with the relation id whenever relation data changes.
    alert_rules_changed = EventSource(PrometheusRemoteWriteAlertsChangedEvent)
    # Emitted when a relation is created or a remote unit joins.
    consumers_changed = EventSource(PrometheusRemoteWriteProviderConsumersChangedEvent)
class PrometheusRemoteWriteProvider(Object):
    """API that manages a provided `prometheus_remote_write` relation.

    The `PrometheusRemoteWriteProvider` is intended to be used by charms whose workloads need
    to receive data from other charms' workloads over the Prometheus remote_write API.

    The `PrometheusRemoteWriteProvider` object can be instantiated as follows in your charm:

    ```
    from charms.prometheus_k8s.v1.prometheus_remote_write import PrometheusRemoteWriteProvider

    def __init__(self, *args):
        ...
        self.remote_write_provider = PrometheusRemoteWriteProvider(self)
        ...
    ```

    The `PrometheusRemoteWriteProvider` assumes that, in the `metadata.yaml` of your charm,
    you declare a provided relation as follows:

    ```
    provides:
        receive-remote-write:  # Relation name
            interface: prometheus_remote_write  # Relation interface
    ```

    About the name of the relation managed by this library: technically, you *could* change
    the relation name, `receive-remote-write`, but that requires you to provide the new
    relation name to the `PrometheusRemoteWriteProvider` via the `relation_name` constructor
    argument. (The relation interface, on the other hand, is immutable and, if you were to change
    it, your charm would not be able to relate with other charms using the right relation
    interface. The library prevents you from doing that by raising an exception.) In any case, it
    is strongly discouraged to change the relation name: having consistent relation names across
    charms that do similar things is a very good thing for the people that will use your charm.
    The one exception to the rule above, is if your charm needs to both consume and provide a
    relation using the `prometheus_remote_write` interface, in which case changing the relation
    name to differentiate between "incoming" and "outgoing" remote write interactions is necessary.
    """

    on = PrometheusRemoteWriteProviderEvents()  # pyright: ignore

    def __init__(
        self,
        charm: CharmBase,
        relation_name: str = DEFAULT_RELATION_NAME,
        *,
        server_url_func: Callable[[], str] = lambda: f"http://{socket.getfqdn()}:9090",
        endpoint_path: str = "/api/v1/write",
    ):
        """API to manage a provided relation with the `prometheus_remote_write` interface.

        Args:
            charm: The charm object that instantiated this class.
            relation_name: Name of the relation with the `prometheus_remote_write` interface as
                defined in metadata.yaml.
            server_url_func: A callable returning the URL for your prometheus server.
            endpoint_path: The path of the server's remote_write endpoint.

        Raises:
            RelationNotFoundError: If there is no relation in the charm's metadata.yaml
                with the same name as provided via `relation_name` argument.
            RelationInterfaceMismatchError: The relation with the same name as provided
                via `relation_name` argument does not have the `prometheus_remote_write`
                relation interface.
            RelationRoleMismatchError: If the relation with the same name as provided
                via `relation_name` argument does not have the `RelationRole.provides`
                role.
        """
        _validate_relation_by_interface_and_direction(
            charm, relation_name, RELATION_INTERFACE_NAME, RelationRole.provides
        )

        super().__init__(charm, relation_name)
        self._charm = charm
        self._tool = CosTool(self._charm)
        self._relation_name = relation_name
        self._get_server_url = server_url_func
        self._endpoint_path = endpoint_path

        on_relation = self._charm.on[self._relation_name]
        self.framework.observe(
            on_relation.relation_created,
            self._on_consumers_changed,
        )
        self.framework.observe(
            on_relation.relation_joined,
            self._on_consumers_changed,
        )
        self.framework.observe(
            on_relation.relation_changed,
            self._on_relation_changed,
        )

    def _on_consumers_changed(self, event: RelationEvent) -> None:
        """Publish the endpoint on the relation and emit `consumers_changed`."""
        if not isinstance(event, RelationBrokenEvent):
            self.update_endpoint(event.relation)

        self.on.consumers_changed.emit()

    def _on_relation_changed(self, event: RelationEvent) -> None:
        """Flag Providers that data has changed, so they can re-read alerts."""
        self.on.alert_rules_changed.emit(event.relation.id)

    def update_endpoint(self, relation: Optional[Relation] = None) -> None:
        """Triggers programmatically the update of the relation data.

        This method should be used when the charm relying on this library needs
        to update the relation data in response to something occurring outside
        the `prometheus_remote_write` relation lifecycle, e.g., in case of a
        host address change because the charmed operator becomes connected to an
        Ingress after the `prometheus_remote_write` relation is established.

        Args:
            relation: An optional instance of `class:ops.model.Relation` to update.
                If not provided, all instances of the `prometheus_remote_write`
                relation are updated.
        """
        relations = [relation] if relation else self.model.relations[self._relation_name]

        for target_relation in relations:
            self._set_endpoint_on_relation(target_relation)

    def _set_endpoint_on_relation(self, relation: Relation) -> None:
        """Set the remote_write endpoint on relations.

        The URL is the server URL and the endpoint path joined by exactly one slash,
        written as JSON under the `remote_write` key of this unit's databag.

        Args:
            relation: The relation whose data to update.
        """
        relation.data[self._charm.unit]["remote_write"] = json.dumps(
            {
                "url": self._get_server_url().rstrip("/") + "/" + self._endpoint_path.strip("/"),
            }
        )

    @property
    def alerts(self) -> dict:
        """Fetch alert rules from all relations.

        For every relation that has remote units and a remote application, the
        `alert_rules` entry of the remote application databag is decoded, label
        matchers are injected into the rule expressions, and the rules are
        validated with `cos-tool` (when available). Rules that fail validation
        are left out, and the leader records the error under the `event` key of
        its own application databag.

        Each value is a dictionary in the Prometheus rule file format, i.e. it
        carries a `groups` key and may be written to disk with `yaml.safe_dump`.
        Each key is an identifier derived from the Juju topology labels of the
        rules, from the relation's `scrape_metadata`, or, failing both, from the
        name of the first rule group; it is meant to be used as a file name.

        Returns:
            a dictionary mapping an identifier to the alert rules of one relation.
        """
        alerts: Dict[str, dict] = {}  # mapping b/w juju identifiers and alert rule files
        for relation in self._charm.model.relations[self._relation_name]:
            if not relation.units or not relation.app:
                continue

            alert_rules = json.loads(relation.data[relation.app].get("alert_rules", "{}"))
            if not alert_rules:
                continue

            alert_rules = self._inject_alert_expr_labels(alert_rules)

            identifier, topology = self._get_identifier_by_alert_rules(alert_rules)
            if not topology:
                try:
                    scrape_metadata = json.loads(relation.data[relation.app]["scrape_metadata"])
                    identifier = JujuTopology.from_dict(scrape_metadata).identifier
                    # Only transform here; the rules are stored below, after validation,
                    # so that rules failing validation never end up in the result.
                    alert_rules = self._tool.apply_label_matchers(alert_rules)
                except KeyError as e:
                    logger.debug(
                        "Relation %s has no 'scrape_metadata': %s",
                        relation.id,
                        e,
                    )

            if not identifier:
                logger.error(
                    "Alert rules were found but no usable group or identifier was present."
                )
                continue

            _, errmsg = self._tool.validate_alert_rules(alert_rules)
            if errmsg:
                logger.error(f"Invalid alert rule file: {errmsg}")
                if self._charm.unit.is_leader():
                    data = json.loads(relation.data[self._charm.app].get("event", "{}"))
                    data["errors"] = errmsg
                    relation.data[self._charm.app]["event"] = json.dumps(data)
                continue

            alerts[identifier] = alert_rules

        return alerts

    def _get_identifier_by_alert_rules(
        self, rules: Dict[str, Any]
    ) -> Tuple[Union[str, None], Union[JujuTopology, None]]:
        """Determine an appropriate dict key for alert rules.

        The key is used as the filename when writing alerts to disk, so the structure
        and uniqueness is important.

        Args:
            rules: a dict of alert rules

        Returns:
            A tuple containing an identifier, if found, and a JujuTopology, if it could
            be constructed.
        """
        if "groups" not in rules:
            logger.debug("No alert groups were found in relation data")
            return None, None

        # Construct an ID based on what's in the alert rules if they have labels
        for group in rules["groups"]:
            try:
                labels = group["rules"][0]["labels"]
                topology = JujuTopology(
                    # Don't try to safely get required constructor fields. There's already
                    # a handler for KeyErrors
                    model_uuid=labels["juju_model_uuid"],
                    model=labels["juju_model"],
                    application=labels["juju_application"],
                    unit=labels.get("juju_unit", ""),
                    charm_name=labels.get("juju_charm", ""),
                )
                return topology.identifier, topology
            except (KeyError, IndexError):
                # IndexError: the group has an empty `rules` list, hence no labels either.
                logger.debug("Alert rules were found but no usable labels were present")
                continue

        logger.warning(
            "No labeled alert rules were found, and no 'scrape_metadata' "
            "was available. Using the alert group name as filename."
        )
        try:
            # Only the first group is consulted; its name becomes the identifier.
            for group in rules["groups"]:
                return group["name"], None
        except KeyError:
            logger.debug("No group name was found to use as identifier")

        return None, None

    def _inject_alert_expr_labels(self, rules: Dict[str, Any]) -> Dict[str, Any]:
        """Iterate through alert rules and inject topology into expressions.

        Rules are modified in place. A rule is only touched when it has labels
        carrying the Juju model, model UUID and application; any
        `%%juju_topology%%` placeholder is stripped from its expression before
        the label matchers are injected.

        Args:
            rules: a dict of alert rules

        Returns:
            The same `rules` dictionary, with expressions updated where possible.
        """
        if "groups" not in rules:
            return rules

        for group in rules["groups"]:
            for rule in group["rules"]:
                labels = rule.get("labels")
                if not labels:
                    continue

                try:
                    topology = JujuTopology(
                        # Don't try to safely get required constructor fields. There's already
                        # a handler for KeyErrors
                        model_uuid=labels["juju_model_uuid"],
                        model=labels["juju_model"],
                        application=labels["juju_application"],
                        unit=labels.get("juju_unit", ""),
                        charm_name=labels.get("juju_charm", ""),
                    )

                    # Inject topology into the expression of this rule
                    rule["expr"] = self._tool.inject_label_matchers(
                        re.sub(r"%%juju_topology%%,?", "", rule["expr"]),
                        topology.alert_expression_dict,
                    )
                except KeyError:
                    # Some required JujuTopology key is missing. Just move on.
                    pass

        return rules
# Copy/pasted from prometheus_scrape.py
class CosTool:
    """Uses cos-tool to inject label matchers into alert rule expressions and validate rules.

    The binary is looked up lazily, on first use. When it cannot be found the
    instance disables itself and every operation degrades to a no-op that
    returns its input unchanged (or reports the rules as valid).
    """

    # Cached location of the cos-tool binary; None until the first lookup.
    _path = None
    # Set once a lookup failed, so the lookup is not repeated.
    _disabled = False

    def __init__(self, charm):
        self._charm = charm

    @property
    def path(self):
        """Lazy lookup of the path of cos-tool."""
        if self._disabled:
            return None
        if not self._path:
            self._path = self._get_tool_path()
            if not self._path:
                logger.debug("Skipping injection of juju topology as label matchers")
                self._disabled = True
        return self._path

    def apply_label_matchers(self, rules) -> dict:
        """Will apply label matchers to the expression of all alerts in all supplied groups.

        Rules are modified in place. A rule without labels, or without any Juju
        topology label, keeps its expression unchanged.

        Args:
            rules: a dict of alert rules with a `groups` key.

        Returns:
            The same `rules` dictionary.
        """
        if not self.path:
            return rules

        # if the user for some reason has provided juju_unit, we'll need to honor it
        # in most cases, however, this will be empty
        topology_labels = (
            "juju_model",
            "juju_model_uuid",
            "juju_application",
            "juju_charm",
            "juju_unit",
        )
        for group in rules["groups"]:
            for rule in group.get("rules", []):
                # Labels are optional in a Prometheus rule; treat a missing or
                # empty entry as "no topology" instead of raising KeyError.
                rule_labels = rule.get("labels") or {}
                topology = {
                    label: rule_labels[label] for label in topology_labels if label in rule_labels
                }
                rule["expr"] = self.inject_label_matchers(rule["expr"], topology)
        return rules

    def validate_alert_rules(self, rules: dict) -> Tuple[bool, str]:
        """Will validate correctness of alert rules, returning a boolean and any errors.

        Args:
            rules: a dict of alert rules, dumped as YAML to a temporary file for cos-tool.

        Returns:
            `(True, "")` when the rules are valid or cos-tool is unavailable;
            otherwise `(False, errors)` where `errors` joins the output lines
            that contain "error validating".
        """
        if not self.path:
            logger.debug("`cos-tool` unavailable. Not validating alert correctness.")
            return True, ""

        with tempfile.TemporaryDirectory() as tmpdir:
            rule_path = Path(tmpdir) / "validate_rule.yaml"
            rule_path.write_text(yaml.dump(rules))

            args = [str(self.path), "validate", str(rule_path)]
            try:
                self._exec(args)
                return True, ""
            except subprocess.CalledProcessError as e:
                logger.debug("Validating the rules failed: %s", e.output)
                return False, ", ".join(
                    line
                    for line in e.output.decode("utf8").splitlines()
                    if "error validating" in line
                )

    def inject_label_matchers(self, expression, topology) -> str:
        """Add label matchers to an expression.

        Args:
            expression: the PromQL expression to transform.
            topology: a mapping of label names to the values to match on.

        Returns:
            The transformed expression, or the original one when `topology` is
            empty, cos-tool is unavailable, or cos-tool exits with an error.
        """
        if not topology:
            return expression
        if not self.path:
            logger.debug("`cos-tool` unavailable. Leaving expression unchanged: %s", expression)
            return expression

        args = [str(self.path), "transform"]
        args.extend(
            ["--label-matcher={}={}".format(key, value) for key, value in topology.items()]
        )
        args.append("{}".format(expression))

        try:
            return self._exec(args)
        except subprocess.CalledProcessError as e:
            logger.debug('Applying the expression failed: "%s", falling back to the original', e)
            return expression

    def _get_tool_path(self) -> Optional[Path]:
        """Locate `cos-tool-<arch>` relative to the working directory.

        Returns:
            The resolved path after its mode was set, or None when the file is
            missing or the platform does not support chmod.
        """
        arch = platform.machine()
        arch = "amd64" if arch == "x86_64" else arch
        res = "cos-tool-{}".format(arch)
        try:
            path = Path(res).resolve()
            # chmod doubles as the existence check: it raises FileNotFoundError.
            path.chmod(0o777)
            return path
        except NotImplementedError:
            logger.debug("System lacks support for chmod")
        except FileNotFoundError:
            logger.debug('Could not locate cos-tool at: "%s"', res)
        return None

    def _exec(self, cmd) -> str:
        """Run `cmd`, returning its stripped combined stdout/stderr; raise on non-zero exit."""
        result = subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        return result.stdout.decode("utf-8").strip()