Operator Libs Linux

  • Jon Seager
Channel Revision Published Runs on
latest/stable 2 09 Mar 2023
Ubuntu 22.04 Ubuntu 20.04
latest/stable 1 28 Oct 2021
Ubuntu 22.04 Ubuntu 20.04
juju deploy operator-libs-linux
Show information

Platform:

Ubuntu
22.04 20.04

charms.operator_libs_linux.v2.snap

# Copyright 2021 Canonical Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Representations of the system's Snaps, and abstractions around managing them.

The `snap` module provides convenience methods for listing, installing, refreshing, and removing
Snap packages, in addition to setting and getting configuration options for them.

In the `snap` module, `SnapCache` creates a dict-like mapping of `Snap` objects at when
instantiated. Installed snaps are fully populated, and available snaps are lazily-loaded upon
request. This module relies on an installed and running `snapd` daemon to perform operations over
the `snapd` HTTP API.

`SnapCache` objects can be used to install or modify Snap packages by name in a manner similar to
using the `snap` command from the commandline.

An example of adding Juju to the system with `SnapCache` and setting a config value:

```python
try:
    cache = snap.SnapCache()
    juju = cache["juju"]

    if not juju.present:
        juju.ensure(snap.SnapState.Latest, channel="beta")
        juju.set({"some.key": "value", "some.key2": "value2"})
except snap.SnapError as e:
    logger.error("An exception occurred when installing charmcraft. Reason: %s", e.message)
```

In addition, the `snap` module provides "bare" methods which can act on Snap packages as
simple function calls. :meth:`add`, :meth:`remove`, and :meth:`ensure` are provided, as
well as :meth:`add_local` for installing directly from a local `.snap` file. These return
`Snap` objects.

As an example of installing several Snaps and checking details:

```python
try:
    nextcloud, charmcraft = snap.add(["nextcloud", "charmcraft"])
    if nextcloud.get("mode") != "production":
        nextcloud.set({"mode": "production"})
except snap.SnapError as e:
    logger.error("An exception occurred when installing snaps. Reason: %s" % e.message)
```
"""

import http.client
import json
import logging
import os
import re
import socket
import subprocess
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
from collections.abc import Mapping
from datetime import datetime, timedelta, timezone
from enum import Enum
from subprocess import CalledProcessError, CompletedProcess
from typing import Any, Dict, Iterable, List, Optional, Union

logger = logging.getLogger(__name__)

# The unique Charmhub library identifier, never change it
LIBID = "05394e5893f94f2d90feb7cbe6b633cd"

# Increment this major API version when introducing breaking changes
LIBAPI = 2

# Increment this PATCH version before using `charmcraft publish-lib` or reset
# to 0 if you are raising the major API version
LIBPATCH = 9


# Regex to locate 7-bit C1 ANSI sequences
ansi_filter = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])")


def _cache_init(func):
    def inner(*args, **kwargs):
        if _Cache.cache is None:
            _Cache.cache = SnapCache()
        return func(*args, **kwargs)

    return inner


# recursive hints seems to error out pytest
JSONType = Union[Dict[str, Any], List[Any], str, int, float]


class SnapService:
    """Data wrapper for snap services."""

    def __init__(
        self,
        daemon: Optional[str] = None,
        daemon_scope: Optional[str] = None,
        enabled: bool = False,
        active: bool = False,
        activators: List[str] = [],
        **kwargs,
    ):
        self.daemon = daemon
        self.daemon_scope = kwargs.get("daemon-scope", None) or daemon_scope
        self.enabled = enabled
        self.active = active
        self.activators = activators

    def as_dict(self) -> Dict:
        """Return instance representation as dict."""
        return {
            "daemon": self.daemon,
            "daemon_scope": self.daemon_scope,
            "enabled": self.enabled,
            "active": self.active,
            "activators": self.activators,
        }


class MetaCache(type):
    """MetaCache class used for initialising the snap cache."""

    @property
    def cache(cls) -> "SnapCache":
        """Property for returning the snap cache."""
        return cls._cache

    @cache.setter
    def cache(cls, cache: "SnapCache") -> None:
        """Setter for the snap cache."""
        cls._cache = cache

    def __getitem__(cls, name) -> "Snap":
        """Snap cache getter."""
        return cls._cache[name]


class _Cache(object, metaclass=MetaCache):
    _cache = None


class Error(Exception):
    """Base class of most errors raised by this library."""

    def __repr__(self):
        """Represent the Error class."""
        return "<{}.{} {}>".format(type(self).__module__, type(self).__name__, self.args)

    @property
    def name(self):
        """Return a string representation of the model plus class."""
        return "<{}.{}>".format(type(self).__module__, type(self).__name__)

    @property
    def message(self):
        """Return the message passed as an argument."""
        return self.args[0]


class SnapAPIError(Error):
    """Raised when an HTTP API error occurs talking to the Snapd server."""

    def __init__(self, body: Dict, code: int, status: str, message: str):
        super().__init__(message)  # Makes str(e) return message
        self.body = body
        self.code = code
        self.status = status
        self._message = message

    def __repr__(self):
        """Represent the SnapAPIError class."""
        return "APIError({!r}, {!r}, {!r}, {!r})".format(
            self.body, self.code, self.status, self._message
        )


class SnapState(Enum):
    """The state of a snap on the system or in the cache."""

    Present = "present"
    Absent = "absent"
    Latest = "latest"
    Available = "available"


class SnapError(Error):
    """Raised when there's an error running snap control commands."""


class SnapNotFoundError(Error):
    """Raised when a requested snap is not known to the system."""


class Snap(object):
    """Represents a snap package and its properties.

    `Snap` exposes the following properties about a snap:
      - name: the name of the snap
      - state: a `SnapState` representation of its install status
      - channel: "stable", "candidate", "beta", and "edge" are common
      - revision: a string representing the snap's revision
      - confinement: "classic", "strict", or "devmode"
    """

    def __init__(
        self,
        name,
        state: SnapState,
        channel: str,
        revision: str,
        confinement: str,
        apps: Optional[List[Dict[str, str]]] = None,
        cohort: Optional[str] = "",
    ) -> None:
        self._name = name
        self._state = state
        self._channel = channel
        self._revision = revision
        self._confinement = confinement
        self._cohort = cohort
        self._apps = apps or []
        self._snap_client = SnapClient()

    def __eq__(self, other) -> bool:
        """Equality for comparison."""
        return isinstance(other, self.__class__) and (
            self._name,
            self._revision,
        ) == (other._name, other._revision)

    def __hash__(self):
        """Calculate a hash for this snap."""
        return hash((self._name, self._revision))

    def __repr__(self):
        """Represent the object such that it can be reconstructed."""
        return "<{}.{}: {}>".format(self.__module__, self.__class__.__name__, self.__dict__)

    def __str__(self):
        """Represent the snap object as a string."""
        return "<{}: {}-{}.{} -- {}>".format(
            self.__class__.__name__,
            self._name,
            self._revision,
            self._channel,
            str(self._state),
        )

    def _snap(self, command: str, optargs: Optional[Iterable[str]] = None) -> str:
        """Perform a snap operation.

        Args:
          command: the snap command to execute
          optargs: an (optional) list of additional arguments to pass,
            commonly confinement or channel

        Raises:
          SnapError if there is a problem encountered
        """
        optargs = optargs or []
        args = ["snap", command, self._name, *optargs]
        try:
            return subprocess.check_output(args, universal_newlines=True)
        except CalledProcessError as e:
            raise SnapError(
                "Snap: {!r}; command {!r} failed with output = {!r}".format(
                    self._name, args, e.output
                )
            )

    def _snap_daemons(
        self,
        command: List[str],
        services: Optional[List[str]] = None,
    ) -> CompletedProcess:
        """Perform snap app commands.

        Args:
          command: the snap command to execute
          services: the snap service to execute command on

        Raises:
          SnapError if there is a problem encountered
        """
        if services:
            # an attempt to keep the command constrained to the snap instance's services
            services = ["{}.{}".format(self._name, service) for service in services]
        else:
            services = [self._name]

        args = ["snap", *command, *services]

        try:
            return subprocess.run(args, universal_newlines=True, check=True, capture_output=True)
        except CalledProcessError as e:
            raise SnapError("Could not {} for snap [{}]: {}".format(args, self._name, e.stderr))

    def get(self, key: Optional[str], *, typed: bool = False) -> Any:
        """Fetch snap configuration values.

        Args:
            key: the key to retrieve. Default to retrieve all values for typed=True.
            typed: set to True to retrieve typed values (set with typed=True).
                Default is to return a string.
        """
        if typed:
            args = ["-d"]
            if key:
                args.append(key)
            config = json.loads(self._snap("get", args))
            if key:
                return config.get(key)
            return config

        if not key:
            raise TypeError("Key must be provided when typed=False")

        return self._snap("get", [key]).strip()

    def set(self, config: Dict[str, Any], *, typed: bool = False) -> None:
        """Set a snap configuration value.

        Args:
           config: a dictionary containing keys and values specifying the config to set.
           typed: set to True to convert all values in the config into typed values while
                configuring the snap (set with typed=True). Default is not to convert.
        """
        if not typed:
            config = {k: str(v) for k, v in config.items()}
        self._snap_client._put_snap_conf(self._name, config)

    def unset(self, key) -> str:
        """Unset a snap configuration value.

        Args:
            key: the key to unset
        """
        return self._snap("unset", [key])

    def start(self, services: Optional[List[str]] = None, enable: Optional[bool] = False) -> None:
        """Start a snap's services.

        Args:
            services (list): (optional) list of individual snap services to start (otherwise all)
            enable (bool): (optional) flag to enable snap services on start. Default `false`
        """
        args = ["start", "--enable"] if enable else ["start"]
        self._snap_daemons(args, services)

    def stop(self, services: Optional[List[str]] = None, disable: Optional[bool] = False) -> None:
        """Stop a snap's services.

        Args:
            services (list): (optional) list of individual snap services to stop (otherwise all)
            disable (bool): (optional) flag to disable snap services on stop. Default `False`
        """
        args = ["stop", "--disable"] if disable else ["stop"]
        self._snap_daemons(args, services)

    def logs(self, services: Optional[List[str]] = None, num_lines: Optional[int] = 10) -> str:
        """Fetch a snap services' logs.

        Args:
            services (list): (optional) list of individual snap services to show logs from
                (otherwise all)
            num_lines (int): (optional) integer number of log lines to return. Default `10`
        """
        args = ["logs", "-n={}".format(num_lines)] if num_lines else ["logs"]
        return self._snap_daemons(args, services).stdout

    def connect(
        self, plug: str, service: Optional[str] = None, slot: Optional[str] = None
    ) -> None:
        """Connect a plug to a slot.

        Args:
            plug (str): the plug to connect
            service (str): (optional) the snap service name to plug into
            slot (str): (optional) the snap service slot to plug in to

        Raises:
            SnapError if there is a problem encountered
        """
        command = ["connect", "{}:{}".format(self._name, plug)]

        if service and slot:
            command = command + ["{}:{}".format(service, slot)]
        elif slot:
            command = command + [slot]

        args = ["snap", *command]
        try:
            subprocess.run(args, universal_newlines=True, check=True, capture_output=True)
        except CalledProcessError as e:
            raise SnapError("Could not {} for snap [{}]: {}".format(args, self._name, e.stderr))

    def hold(self, duration: Optional[timedelta] = None) -> None:
        """Add a refresh hold to a snap.

        Args:
            duration: duration for the hold, or None (the default) to hold this snap indefinitely.
        """
        hold_str = "forever"
        if duration is not None:
            seconds = round(duration.total_seconds())
            hold_str = f"{seconds}s"
        self._snap("refresh", [f"--hold={hold_str}"])

    def unhold(self) -> None:
        """Remove the refresh hold of a snap."""
        self._snap("refresh", ["--unhold"])

    def alias(self, application: str, alias: Optional[str] = None) -> None:
        """Create an alias for a given application.

        Args:
            application: application to get an alias.
            alias: (optional) name of the alias; if not provided, the application name is used.
        """
        if alias is None:
            alias = application
        args = ["snap", "alias", f"{self.name}.{application}", alias]
        try:
            subprocess.check_output(args, universal_newlines=True)
        except CalledProcessError as e:
            raise SnapError(
                "Snap: {!r}; command {!r} failed with output = {!r}".format(
                    self._name, args, e.output
                )
            )

    def restart(
        self, services: Optional[List[str]] = None, reload: Optional[bool] = False
    ) -> None:
        """Restarts a snap's services.

        Args:
            services (list): (optional) list of individual snap services to restart.
                (otherwise all)
            reload (bool): (optional) flag to use the service reload command, if available.
                Default `False`
        """
        args = ["restart", "--reload"] if reload else ["restart"]
        self._snap_daemons(args, services)

    def _install(
        self,
        channel: Optional[str] = "",
        cohort: Optional[str] = "",
        revision: Optional[str] = None,
    ) -> None:
        """Add a snap to the system.

        Args:
          channel: the channel to install from
          cohort: optional, the key of a cohort that this snap belongs to
          revision: optional, the revision of the snap to install
        """
        cohort = cohort or self._cohort

        args = []
        if self.confinement == "classic":
            args.append("--classic")
        if self.confinement == "devmode":
            args.append("--devmode")
        if channel:
            args.append('--channel="{}"'.format(channel))
        if revision:
            args.append('--revision="{}"'.format(revision))
        if cohort:
            args.append('--cohort="{}"'.format(cohort))

        self._snap("install", args)

    def _refresh(
        self,
        channel: Optional[str] = "",
        cohort: Optional[str] = "",
        revision: Optional[str] = None,
        devmode: bool = False,
        leave_cohort: Optional[bool] = False,
    ) -> None:
        """Refresh a snap.

        Args:
          channel: the channel to install from
          cohort: optionally, specify a cohort.
          revision: optionally, specify the revision of the snap to refresh
          devmode: optionally, specify devmode confinement
          leave_cohort: leave the current cohort.
        """
        args = []
        if channel:
            args.append('--channel="{}"'.format(channel))

        if revision:
            args.append('--revision="{}"'.format(revision))

        if devmode:
            args.append("--devmode")

        if not cohort:
            cohort = self._cohort

        if leave_cohort:
            self._cohort = ""
            args.append("--leave-cohort")
        elif cohort:
            args.append('--cohort="{}"'.format(cohort))

        self._snap("refresh", args)

    def _remove(self) -> str:
        """Remove a snap from the system."""
        return self._snap("remove")

    @property
    def name(self) -> str:
        """Returns the name of the snap."""
        return self._name

    def ensure(
        self,
        state: SnapState,
        classic: Optional[bool] = False,
        devmode: bool = False,
        channel: Optional[str] = "",
        cohort: Optional[str] = "",
        revision: Optional[str] = None,
    ):
        """Ensure that a snap is in a given state.

        Args:
          state: a `SnapState` to reconcile to.
          classic: an (Optional) boolean indicating whether classic confinement should be used
          devmode: an (Optional) boolean indicating whether devmode confinement should be used
          channel: the channel to install from
          cohort: optional. Specify the key of a snap cohort.
          revision: optional. the revision of the snap to install/refresh

        While both channel and revision could be specified, the underlying snap install/refresh
        command will determine which one takes precedence (revision at this time)

        Raises:
          SnapError if an error is encountered
        """
        if classic and devmode:
            raise ValueError("Cannot set both classic and devmode confinement")

        if classic or self._confinement == "classic":
            self._confinement = "classic"
        elif devmode or self._confinement == "devmode":
            self._confinement = "devmode"
        else:
            self._confinement = ""

        if state not in (SnapState.Present, SnapState.Latest):
            # We are attempting to remove this snap.
            if self._state in (SnapState.Present, SnapState.Latest):
                # The snap is installed, so we run _remove.
                self._remove()
            else:
                # The snap is not installed -- no need to do anything.
                pass
        else:
            # We are installing or refreshing a snap.
            if self._state not in (SnapState.Present, SnapState.Latest):
                # The snap is not installed, so we install it.
                logger.info(
                    "Installing snap %s, revision %s, tracking %s", self._name, revision, channel
                )
                self._install(channel, cohort, revision)
                logger.info("The snap installation completed successfully")
            elif revision is None or revision != self._revision:
                # The snap is installed, but we are changing it (e.g., switching channels).
                logger.info(
                    "Refreshing snap %s, revision %s, tracking %s", self._name, revision, channel
                )
                self._refresh(channel=channel, cohort=cohort, revision=revision, devmode=devmode)
                logger.info("The snap refresh completed successfully")
            else:
                logger.info("Refresh of snap %s was unnecessary", self._name)

        self._update_snap_apps()
        self._state = state

    def _update_snap_apps(self) -> None:
        """Update a snap's apps after snap changes state."""
        try:
            self._apps = self._snap_client.get_installed_snap_apps(self._name)
        except SnapAPIError:
            logger.debug("Unable to retrieve snap apps for {}".format(self._name))
            self._apps = []

    @property
    def present(self) -> bool:
        """Report whether or not a snap is present."""
        return self._state in (SnapState.Present, SnapState.Latest)

    @property
    def latest(self) -> bool:
        """Report whether the snap is the most recent version."""
        return self._state is SnapState.Latest

    @property
    def state(self) -> SnapState:
        """Report the current snap state."""
        return self._state

    @state.setter
    def state(self, state: SnapState) -> None:
        """Set the snap state to a given value.

        Args:
          state: a `SnapState` to reconcile the snap to.

        Raises:
          SnapError if an error is encountered
        """
        if self._state is not state:
            self.ensure(state)
        self._state = state

    @property
    def revision(self) -> str:
        """Returns the revision for a snap."""
        return self._revision

    @property
    def channel(self) -> str:
        """Returns the channel for a snap."""
        return self._channel

    @property
    def confinement(self) -> str:
        """Returns the confinement for a snap."""
        return self._confinement

    @property
    def apps(self) -> List:
        """Returns (if any) the installed apps of the snap."""
        self._update_snap_apps()
        return self._apps

    @property
    def services(self) -> Dict:
        """Returns (if any) the installed services of the snap."""
        self._update_snap_apps()
        services = {}
        for app in self._apps:
            if "daemon" in app:
                services[app["name"]] = SnapService(**app).as_dict()

        return services

    @property
    def held(self) -> bool:
        """Report whether the snap has a hold."""
        info = self._snap("info")
        return "hold:" in info


class _UnixSocketConnection(http.client.HTTPConnection):
    """Implementation of HTTPConnection that connects to a named Unix socket."""

    def __init__(self, host, timeout=None, socket_path=None):
        if timeout is None:
            super().__init__(host)
        else:
            super().__init__(host, timeout=timeout)
        self.socket_path = socket_path

    def connect(self):
        """Override connect to use Unix socket (instead of TCP socket)."""
        if not hasattr(socket, "AF_UNIX"):
            raise NotImplementedError("Unix sockets not supported on {}".format(sys.platform))
        self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        self.sock.connect(self.socket_path)
        if self.timeout is not None:
            self.sock.settimeout(self.timeout)


class _UnixSocketHandler(urllib.request.AbstractHTTPHandler):
    """Implementation of HTTPHandler that uses a named Unix socket."""

    def __init__(self, socket_path: str):
        super().__init__()
        self.socket_path = socket_path

    def http_open(self, req) -> http.client.HTTPResponse:
        """Override http_open to use a Unix socket connection (instead of TCP)."""
        return self.do_open(_UnixSocketConnection, req, socket_path=self.socket_path)


class SnapClient:
    """Snapd API client to talk to HTTP over UNIX sockets.

    In order to avoid shelling out and/or involving sudo in calling the snapd API,
    use a wrapper based on the Pebble Client, trimmed down to only the utility methods
    needed for talking to snapd.
    """

    def __init__(
        self,
        socket_path: str = "/run/snapd.socket",
        opener: Optional[urllib.request.OpenerDirector] = None,
        base_url: str = "http://localhost/v2/",
        timeout: float = 30.0,
    ):
        """Initialize a client instance.

        Args:
            socket_path: a path to the socket on the filesystem. Defaults to /run/snap/snapd.socket
            opener: specifies an opener for unix socket, if unspecified a default is used
            base_url: base url for making requests to the snap client. Defaults to
                http://localhost/v2/
            timeout: timeout in seconds to use when making requests to the API. Default is 30.0s.
        """
        if opener is None:
            opener = self._get_default_opener(socket_path)
        self.opener = opener
        self.base_url = base_url
        self.timeout = timeout

    @classmethod
    def _get_default_opener(cls, socket_path):
        """Build the default opener to use for requests (HTTP over Unix socket)."""
        opener = urllib.request.OpenerDirector()
        opener.add_handler(_UnixSocketHandler(socket_path))
        opener.add_handler(urllib.request.HTTPDefaultErrorHandler())
        opener.add_handler(urllib.request.HTTPRedirectHandler())
        opener.add_handler(urllib.request.HTTPErrorProcessor())
        return opener

    def _request(
        self,
        method: str,
        path: str,
        query: Dict = None,
        body: Dict = None,
    ) -> JSONType:
        """Make a JSON request to the Snapd server with the given HTTP method and path.

        If query dict is provided, it is encoded and appended as a query string
        to the URL. If body dict is provided, it is serialied as JSON and used
        as the HTTP body (with Content-Type: "application/json"). The resulting
        body is decoded from JSON.
        """
        headers = {"Accept": "application/json"}
        data = None
        if body is not None:
            data = json.dumps(body).encode("utf-8")
            headers["Content-Type"] = "application/json"

        response = self._request_raw(method, path, query, headers, data)
        response = json.loads(response.read().decode())
        if response["type"] == "async":
            return self._wait(response["change"])
        return response["result"]

    def _wait(self, change_id: str, timeout=300) -> JSONType:
        """Wait for an async change to complete.

        The poll time is 100 milliseconds, the same as in snap clients.
        """
        deadline = time.time() + timeout
        while True:
            if time.time() > deadline:
                raise TimeoutError(f"timeout waiting for snap change {change_id}")
            response = self._request("GET", f"changes/{change_id}")
            status = response["status"]
            if status == "Done":
                return response.get("data")
            if status == "Doing" or status == "Do":
                time.sleep(0.1)
                continue
            if status == "Wait":
                logger.warning("snap change %s succeeded with status 'Wait'", change_id)
                return response.get("data")
            raise SnapError(
                f"snap change {response.get('kind')!r} id {change_id} failed with status {status}"
            )

    def _request_raw(
        self,
        method: str,
        path: str,
        query: Dict = None,
        headers: Dict = None,
        data: bytes = None,
    ) -> http.client.HTTPResponse:
        """Make a request to the Snapd server; return the raw HTTPResponse object."""
        url = self.base_url + path
        if query:
            url = url + "?" + urllib.parse.urlencode(query)

        if headers is None:
            headers = {}
        request = urllib.request.Request(url, method=method, data=data, headers=headers)

        try:
            response = self.opener.open(request, timeout=self.timeout)
        except urllib.error.HTTPError as e:
            code = e.code
            status = e.reason
            message = ""
            try:
                body = json.loads(e.read().decode())["result"]
            except (IOError, ValueError, KeyError) as e2:
                # Will only happen on read error or if Pebble sends invalid JSON.
                body = {}
                message = "{} - {}".format(type(e2).__name__, e2)
            raise SnapAPIError(body, code, status, message)
        except urllib.error.URLError as e:
            raise SnapAPIError({}, 500, "Not found", e.reason)
        return response

    def get_installed_snaps(self) -> Dict:
        """Get information about currently installed snaps."""
        return self._request("GET", "snaps")

    def get_snap_information(self, name: str) -> Dict:
        """Query the snap server for information about single snap."""
        return self._request("GET", "find", {"name": name})[0]

    def get_installed_snap_apps(self, name: str) -> List:
        """Query the snap server for apps belonging to a named, currently installed snap."""
        return self._request("GET", "apps", {"names": name, "select": "service"})

    def _put_snap_conf(self, name: str, conf: Dict[str, Any]):
        """Set the configuration details for an installed snap."""
        return self._request("PUT", f"snaps/{name}/conf", body=conf)


class SnapCache(Mapping):
    """An abstraction to represent installed/available packages.

    When instantiated, `SnapCache` iterates through the list of installed
    snaps using the `snapd` HTTP API, and a list of available snaps by reading
    the filesystem to populate the cache. Information about available snaps is lazily-loaded
    from the `snapd` API when requested.
    """

    def __init__(self):
        if not self.snapd_installed:
            raise SnapError("snapd is not installed or not in /usr/bin") from None
        self._snap_client = SnapClient()
        self._snap_map = {}
        if self.snapd_installed:
            self._load_available_snaps()
            self._load_installed_snaps()

    def __contains__(self, key: str) -> bool:
        """Check if a given snap is in the cache."""
        return key in self._snap_map

    def __len__(self) -> int:
        """Report number of items in the snap cache."""
        return len(self._snap_map)

    def __iter__(self) -> Iterable["Snap"]:
        """Provide iterator for the snap cache."""
        return iter(self._snap_map.values())

    def __getitem__(self, snap_name: str) -> Snap:
        """Return either the installed version or latest version for a given snap."""
        snap = self._snap_map.get(snap_name, None)
        if snap is None:
            # The snapd cache file may not have existed when _snap_map was
            # populated.  This is normal.
            try:
                self._snap_map[snap_name] = self._load_info(snap_name)
            except SnapAPIError:
                raise SnapNotFoundError("Snap '{}' not found!".format(snap_name))

        return self._snap_map[snap_name]

    @property
    def snapd_installed(self) -> bool:
        """Check whether snapd has been installled on the system."""
        return os.path.isfile("/usr/bin/snap")

    def _load_available_snaps(self) -> None:
        """Load the list of available snaps from disk.

        Leave them empty and lazily load later if asked for.
        """
        if not os.path.isfile("/var/cache/snapd/names"):
            # The snap catalog may not be populated yet; this is normal.
            # snapd updates the cache infrequently and the cache file may not
            # currently exist.
            return

        with open("/var/cache/snapd/names", "r") as f:
            for line in f:
                if line.strip():
                    self._snap_map[line.strip()] = None

    def _load_installed_snaps(self) -> None:
        """Load the installed snaps into the dict."""
        installed = self._snap_client.get_installed_snaps()

        for i in installed:
            snap = Snap(
                name=i["name"],
                state=SnapState.Latest,
                channel=i["channel"],
                revision=i["revision"],
                confinement=i["confinement"],
                apps=i.get("apps", None),
            )
            self._snap_map[snap.name] = snap

    def _load_info(self, name) -> Snap:
        """Load info for snaps which are not installed if requested.

        Args:
            name: a string representing the name of the snap
        """
        info = self._snap_client.get_snap_information(name)

        return Snap(
            name=info["name"],
            state=SnapState.Available,
            channel=info["channel"],
            revision=info["revision"],
            confinement=info["confinement"],
            apps=None,
        )


@_cache_init
def add(
    snap_names: Union[str, List[str]],
    state: Union[str, SnapState] = SnapState.Latest,
    channel: Optional[str] = "",
    classic: Optional[bool] = False,
    devmode: bool = False,
    cohort: Optional[str] = "",
    revision: Optional[str] = None,
) -> Union[Snap, List[Snap]]:
    """Add a snap to the system.

    Args:
        snap_names: the name or names of the snaps to install
        state: a string or `SnapState` representation of the desired state, one of
            [`Present` or `Latest`]
        channel: an (Optional) channel as a string. Defaults to 'latest'
        classic: an (Optional) boolean specifying whether it should be added with classic
            confinement. Default `False`
        devmode: an (Optional) boolean specifying whether it should be added with devmode
            confinement. Default `False`
        cohort: an (Optional) string specifying the snap cohort to use
        revision: an (Optional) string specifying the snap revision to use

    Raises:
        SnapError if some snaps failed to install or were not found.
    """
    if not channel and not revision:
        channel = "latest"

    snap_names = [snap_names] if isinstance(snap_names, str) else snap_names
    if not snap_names:
        raise TypeError("Expected at least one snap to add, received zero!")

    if isinstance(state, str):
        state = SnapState(state)

    return _wrap_snap_operations(snap_names, state, channel, classic, devmode, cohort, revision)


@_cache_init
def remove(snap_names: Union[str, List[str]]) -> Union[Snap, List[Snap]]:
    """Remove specified snap(s) from the system.

    Args:
        snap_names: the name or names of the snaps to install

    Raises:
        SnapError if some snaps failed to install.
    """
    snap_names = [snap_names] if isinstance(snap_names, str) else snap_names
    if not snap_names:
        raise TypeError("Expected at least one snap to add, received zero!")
    return _wrap_snap_operations(
        snap_names=snap_names,
        state=SnapState.Absent,
        channel="",
        classic=False,
        devmode=False,
    )


@_cache_init
def ensure(
    snap_names: Union[str, List[str]],
    state: str,
    channel: Optional[str] = "",
    classic: Optional[bool] = False,
    devmode: bool = False,
    cohort: Optional[str] = "",
    revision: Optional[int] = None,
) -> Union[Snap, List[Snap]]:
    """Ensure specified snaps are in a given state on the system.

    Args:
        snap_names: the name(s) of the snaps to operate on
        state: a string representation of the desired state, from `SnapState`
        channel: an (Optional) channel as a string. Defaults to 'latest'
        classic: an (Optional) boolean specifying whether it should be added with classic
            confinement. Default `False`
        devmode: an (Optional) boolean specifying whether it should be added with devmode
            confinement. Default `False`
        cohort: an (Optional) string specifying the snap cohort to use
        revision: an (Optional) integer specifying the snap revision to use

    When both channel and revision are specified, the underlying snap install/refresh
    command will determine the precedence (revision at the time of adding this)

    Raises:
        SnapError if the snap is not in the cache.
    """
    if not revision and not channel:
        channel = "latest"

    if state in ("present", "latest") or revision:
        return add(
            snap_names=snap_names,
            state=SnapState(state),
            channel=channel,
            classic=classic,
            devmode=devmode,
            cohort=cohort,
            revision=revision,
        )
    else:
        return remove(snap_names)


def _wrap_snap_operations(
    snap_names: List[str],
    state: SnapState,
    channel: str,
    classic: bool,
    devmode: bool,
    cohort: Optional[str] = "",
    revision: Optional[str] = None,
) -> Union[Snap, List[Snap]]:
    """Wrap common operations for bare commands."""
    snaps = {"success": [], "failed": []}

    op = "remove" if state is SnapState.Absent else "install or refresh"

    for s in snap_names:
        try:
            snap = _Cache[s]
            if state is SnapState.Absent:
                snap.ensure(state=SnapState.Absent)
            else:
                snap.ensure(
                    state=state,
                    classic=classic,
                    devmode=devmode,
                    channel=channel,
                    cohort=cohort,
                    revision=revision,
                )
            snaps["success"].append(snap)
        except SnapError as e:
            logger.warning("Failed to {} snap {}: {}!".format(op, s, e.message))
            snaps["failed"].append(s)
        except SnapNotFoundError:
            logger.warning("Snap '{}' not found in cache!".format(s))
            snaps["failed"].append(s)

    if len(snaps["failed"]):
        raise SnapError(
            "Failed to install or refresh snap(s): {}".format(", ".join(list(snaps["failed"])))
        )

    return snaps["success"] if len(snaps["success"]) > 1 else snaps["success"][0]


def install_local(
    filename: str,
    classic: Optional[bool] = False,
    devmode: Optional[bool] = False,
    dangerous: Optional[bool] = False,
) -> Snap:
    """Perform a snap operation.

    Args:
        filename: the path to a local .snap file to install
        classic: whether to use classic confinement
        devmode: whether to use devmode confinement
        dangerous: whether --dangerous should be passed to install snaps without a signature

    Raises:
        SnapError if there is a problem encountered
    """
    args = [
        "snap",
        "install",
        filename,
    ]
    if classic:
        args.append("--classic")
    if devmode:
        args.append("--devmode")
    if dangerous:
        args.append("--dangerous")
    try:
        result = subprocess.check_output(args, universal_newlines=True).splitlines()[-1]
        snap_name, _ = result.split(" ", 1)
        snap_name = ansi_filter.sub("", snap_name)

        c = SnapCache()

        try:
            return c[snap_name]
        except SnapAPIError as e:
            logger.error(
                "Could not find snap {} when querying Snapd socket: {}".format(snap_name, e.body)
            )
            raise SnapError("Failed to find snap {} in Snap cache".format(snap_name))
    except CalledProcessError as e:
        raise SnapError("Could not install snap {}: {}".format(filename, e.output))


def _system_set(config_item: str, value: str) -> None:
    """Set system snapd config values.

    Args:
        config_item: name of snap system setting. E.g. 'refresh.hold'
        value: value to assign
    """
    args = ["snap", "set", "system", "{}={}".format(config_item, value)]
    try:
        subprocess.check_call(args, universal_newlines=True)
    except CalledProcessError:
        raise SnapError("Failed setting system config '{}' to '{}'".format(config_item, value))


def hold_refresh(days: int = 90, forever: bool = False) -> bool:
    """Set the system-wide snap refresh hold.

    Args:
        days: number of days to hold system refreshes for. Maximum 90. Set to zero to remove hold.
        forever: if True, will set a hold forever.
    """
    if not isinstance(forever, bool):
        raise TypeError("forever must be a bool")
    if not isinstance(days, int):
        raise TypeError("days must be an int")
    if forever:
        _system_set("refresh.hold", "forever")
        logger.info("Set system-wide snap refresh hold to: forever")
    elif days == 0:
        _system_set("refresh.hold", "")
        logger.info("Removed system-wide snap refresh hold")
    else:
        # Currently the snap daemon can only hold for a maximum of 90 days
        if not 1 <= days <= 90:
            raise ValueError("days must be between 1 and 90")
        # Add the number of days to current time
        target_date = datetime.now(timezone.utc).astimezone() + timedelta(days=days)
        # Format for the correct datetime format
        hold_date = target_date.strftime("%Y-%m-%dT%H:%M:%S%z")
        # Python dumps the offset in format '+0100', we need '+01:00'
        hold_date = "{0}:{1}".format(hold_date[:-2], hold_date[-2:])
        # Actually set the hold date
        _system_set("refresh.hold", hold_date)
        logger.info("Set system-wide snap refresh hold to: %s", hold_date)