Observability Libs
- Jon Seager
Channel | Revision | Published | Runs on |
---|---|---|---|
latest/edge | 53 | 13 Dec 2024 |
juju deploy observability-libs --channel edge
Deploy Kubernetes operators easily with Juju, the Universal Operator Lifecycle Manager. Need a Kubernetes cluster? Install MicroK8s to create a full CNCF-certified Kubernetes system in under 60 seconds.
Platform:
charms.observability_libs.v0.metrics_endpoint_discovery
-
- Last updated 20 Feb 2024
- Revision Library version 0.7
# Copyright 2022 Canonical Ltd.
# See LICENSE file for licensing details.
"""# MetricsEndpointDiscovery Library.
This library provides functionality for discovering metrics endpoints exposed
by applications deployed to a Kubernetes cluster.
It comprises:
- A custom event and event source for handling metrics endpoint changes.
- Logic to observe cluster events and emit the events as appropriate.
## Using the Library
### Handling Events
To ensure that your charm can react to changing metrics endpoint events,
use the CharmEvents extension.
```python
import json
from charms.observability_libs.v0.metrics_endpoint_discovery import
MetricsEndpointCharmEvents,
MetricsEndpointObserver
)
class MyCharm(CharmBase):
on = MetricsEndpointChangeCharmEvents()
def __init__(self, *args):
super().__init__(*args)
self._observer = MetricsEndpointObserver(self, {"app.kubernetes.io/name": ["grafana-k8s"]})
self.framework.observe(self.on.metrics_endpoint_change, self._on_endpoints_change)
def _on_endpoints_change(self, event):
self.unit.status = ActiveStatus(json.dumps(event.discovered))
```
"""
import json
import logging
import os
import subprocess
import sys
from pathlib import Path
from typing import Dict, Iterable
from lightkube import Client # pyright: ignore
from lightkube.resources.core_v1 import Pod
from ops.charm import CharmBase, CharmEvents
from ops.framework import EventBase, EventSource, Object, StoredState
logger = logging.getLogger(__name__)
# The unique Charmhub library identifier, never change it
LIBID = "a141d5620152466781ed83aafb948d03"
# Increment this major API version when introducing breaking changes
LIBAPI = 0
# Increment this PATCH version before using `charmcraft publish-lib` or reset
# to 0 if you are raising the major API version
LIBPATCH = 7
# File path where metrics endpoint change data is written for exchange
# between the discovery process and the materialised event.
PAYLOAD_FILE_PATH = "/tmp/metrics-endpoint-payload.json"
# File path for the spawned discovery process to write logs.
LOG_FILE_PATH = "/var/log/discovery.log"
class MetricsEndpointChangeEvent(EventBase):
"""A custom event for metrics endpoint changes."""
def __init__(self, handle):
super().__init__(handle)
with open(PAYLOAD_FILE_PATH, "r") as f:
self._discovered = json.loads(f.read())
def snapshot(self):
"""Save the event payload data."""
return {"payload": self._discovered}
def restore(self, snapshot):
"""Restore the event payload data."""
self._discovered = {}
if snapshot:
self._discovered = snapshot["payload"]
@property
def discovered(self):
"""Return the payload of detected endpoint changes for this event."""
return self._discovered
class MetricsEndpointChangeCharmEvents(CharmEvents):
"""A CharmEvents extension for metrics endpoint changes.
Includes :class:`MetricsEndpointChangeEvent` in those that can be handled.
"""
metrics_endpoint_change = EventSource(MetricsEndpointChangeEvent)
class MetricsEndpointObserver(Object):
"""Observes changing metrics endpoints in the cluster.
Observed endpoint changes cause :class"`MetricsEndpointChangeEvent` to be emitted.
"""
# Yes, we need this so we can track it between events
_stored = StoredState()
def __init__(self, charm: CharmBase, labels: Dict[str, Iterable]):
"""Constructor for MetricsEndpointObserver.
Args:
charm: the charm that is instantiating the library.
labels: dictionary of label/value to be observed for changing metrics endpoints.
"""
super().__init__(charm, "metrics-endpoint-observer")
self._charm = charm
self._labels = labels
self.start_observer()
def start_observer(self):
"""Start the metrics endpoint observer running in a new process."""
self.stop_observers()
# We need to trick Juju into thinking that we are not running
# in a hook context, as Juju will disallow use of juju-run.
new_env = os.environ.copy()
if "JUJU_CONTEXT_ID" in new_env:
new_env.pop("JUJU_CONTEXT_ID")
tool_prefix = "/usr/bin"
if Path(tool_prefix, "juju-run").exists():
tool_path = Path(tool_prefix, "juju-run")
else:
tool_path = Path(tool_prefix, "juju-exec")
subprocess.Popen(
[
"/usr/bin/python3",
"lib/charms/observability_libs/v{}/metrics_endpoint_discovery.py".format(LIBAPI),
json.dumps(self._labels),
str(tool_path),
self._charm.unit.name,
self._charm.charm_dir,
],
stdout=open(LOG_FILE_PATH, "a"),
stderr=subprocess.STDOUT,
start_new_session=True,
env=new_env,
)
def stop_observers(self):
"""Stops all running instances of the observer."""
logging.info("Stopping all running metrics endpoint observer processes")
subprocess.run(
[
"pkill",
"--signal",
"INT",
"-f",
r".*lib/charms/observability_libs/v[0-9]/metrics_endpoint_discovery\.py.*",
]
)
@property
def unit_tag(self):
"""Juju-style tag identifying the unit being run by this charm."""
unit_num = self._charm.unit.name.split("/")[-1]
return "unit-{}-{}".format(self._charm.app.name, unit_num)
def write_payload(payload):
"""Write the input event data to event payload file."""
with open(PAYLOAD_FILE_PATH, "w") as f:
f.write(json.dumps(payload))
def dispatch(run_cmd, unit, charm_dir):
"""Use the input juju-run command to dispatch a :class:`MetricsEndpointChangeEvent`."""
dispatch_sub_cmd = "JUJU_DISPATCH_PATH=hooks/metrics_endpoint_change {}/dispatch"
subprocess.run([run_cmd, "-u", unit, dispatch_sub_cmd.format(charm_dir)])
def main():
"""Main watch and dispatch loop.
Watch the input k8s service names. When changes are detected, write the
observed data to the payload file, and dispatch the change event.
"""
labels, run_cmd, unit, charm_dir = sys.argv[1:]
client = Client() # pyright: ignore
labels = json.loads(labels)
for change, entity in client.watch(Pod, namespace="*", labels=labels):
if Path(PAYLOAD_FILE_PATH).exists():
dispatch(run_cmd, unit, charm_dir)
Path(PAYLOAD_FILE_PATH).unlink()
meta = entity.metadata
metrics_path = ""
if entity.metadata.annotations.get("prometheus.io/path", ""): # pyright: ignore
metrics_path = entity.metadata.annotations.get( # pyright: ignore
"prometheus.io/path", ""
)
target_ports = []
for c in filter(lambda c: c.ports is not None, entity.spec.containers): # pyright: ignore
for p in filter(lambda p: p.name == "metrics", c.ports): # pyright: ignore
target_ports.append("*:{}".format(p.containerPort))
payload = {
"change": change,
"namespace": meta.namespace, # pyright: ignore
"name": meta.name, # pyright: ignore
"path": metrics_path,
"targets": target_ports or ["*:80"],
}
write_payload(payload)
dispatch(run_cmd, unit, charm_dir)
if __name__ == "__main__":
main()