Operator Libs Linux

  • Jon Seager
Channel Revision Published Runs on
latest/stable 2 09 Mar 2023
Ubuntu 22.04 Ubuntu 20.04
latest/stable 1 28 Oct 2021
Ubuntu 22.04 Ubuntu 20.04
juju deploy operator-libs-linux
Show information

Platform:

Ubuntu
22.04 20.04

charms.operator_libs_linux.v0.apt

# Copyright 2021 Canonical Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Abstractions for the system's Debian/Ubuntu package information and repositories.

This module contains abstractions and wrappers around Debian/Ubuntu-style repositories and
packages, in order to easily provide an idiomatic and Pythonic mechanism for adding packages and/or
repositories to systems for use in machine charms.

A sane default configuration is attainable through nothing more than instantiation of the
appropriate classes. `DebianPackage` objects provide information about the architecture, version,
name, and status of a package.

`DebianPackage` will try to look up a package either from `dpkg -L` or from `apt-cache` when
provided with a string indicating the package name. If it cannot be located, `PackageNotFoundError`
will be returned, as `apt` and `dpkg` otherwise return `100` for all errors, and a meaningful error
message if the package is not known is desirable.

To install packages with convenience methods:

```python
try:
    # Run `apt-get update`
    apt.update()
    apt.add_package("zsh")
    apt.add_package(["vim", "htop", "wget"])
except PackageNotFoundError:
    logger.error("a specified package not found in package cache or on system")
except PackageError as e:
    logger.error("could not install package. Reason: %s", e.message)
````

To find details of a specific package:

```python
try:
    vim = apt.DebianPackage.from_system("vim")

    # To find from the apt cache only
    # apt.DebianPackage.from_apt_cache("vim")

    # To find from installed packages only
    # apt.DebianPackage.from_installed_package("vim")

    vim.ensure(PackageState.Latest)
    logger.info("updated vim to version: %s", vim.fullversion)
except PackageNotFoundError:
    logger.error("a specified package not found in package cache or on system")
except PackageError as e:
    logger.error("could not install package. Reason: %s", e.message)
```


`RepositoryMapping` will return a dict-like object containing enabled system repositories
and their properties (available groups, baseuri. gpg key). This class can add, disable, or
manipulate repositories. Items can be retrieved as `DebianRepository` objects.

In order add a new repository with explicit details for fields, a new `DebianRepository` can
be added to `RepositoryMapping`

`RepositoryMapping` provides an abstraction around the existing repositories on the system,
and can be accessed and iterated over like any `Mapping` object, to retrieve values by key,
iterate, or perform other operations.

Keys are constructed as `{repo_type}-{}-{release}` in order to uniquely identify a repository.

Repositories can be added with explicit values through a Python constructor.

Example:
```python
repositories = apt.RepositoryMapping()

if "deb-example.com-focal" not in repositories:
    repositories.add(DebianRepository(enabled=True, repotype="deb",
                     uri="https://example.com", release="focal", groups=["universe"]))
```

Alternatively, any valid `sources.list` line may be used to construct a new
`DebianRepository`.

Example:
```python
repositories = apt.RepositoryMapping()

if "deb-us.archive.ubuntu.com-xenial" not in repositories:
    line = "deb http://us.archive.ubuntu.com/ubuntu xenial main restricted"
    repo = DebianRepository.from_repo_line(line)
    repositories.add(repo)
```
"""

import fileinput
import glob
import logging
import os
import re
import subprocess
from enum import Enum
from subprocess import PIPE, CalledProcessError, check_output
from typing import Any, Dict, Iterable, Iterator, List, Mapping, Optional, Tuple, Union
from urllib.parse import urlparse

logger = logging.getLogger(__name__)

# The unique Charmhub library identifier, never change it
LIBID = "7c3dbc9c2ad44a47bd6fcb25caa270e5"

# Increment this major API version when introducing breaking changes
LIBAPI = 0

# Increment this PATCH version before using `charmcraft publish-lib` or reset
# to 0 if you are raising the major API version
LIBPATCH = 15


VALID_SOURCE_TYPES = ("deb", "deb-src")
OPTIONS_MATCHER = re.compile(r"\[.*?\]")
_GPG_KEY_DIR = "/etc/apt/trusted.gpg.d/"


class Error(Exception):
    """Base class of most errors raised by this library."""

    def __repr__(self):
        """Represent the Error."""
        return "<{}.{} {}>".format(type(self).__module__, type(self).__name__, self.args)

    @property
    def name(self):
        """Return a string representation of the model plus class."""
        return "<{}.{}>".format(type(self).__module__, type(self).__name__)

    @property
    def message(self):
        """Return the message passed as an argument."""
        return self.args[0]


class PackageError(Error):
    """Raised when there's an error installing or removing a package."""


class PackageNotFoundError(Error):
    """Raised when a requested package is not known to the system."""


class PackageState(Enum):
    """A class to represent possible package states."""

    Present = "present"
    Absent = "absent"
    Latest = "latest"
    Available = "available"


class DebianPackage:
    """Represents a traditional Debian package and its utility functions.

    `DebianPackage` wraps information and functionality around a known package, whether installed
    or available. The version, epoch, name, and architecture can be easily queried and compared
    against other `DebianPackage` objects to determine the latest version or to install a specific
    version.

    The representation of this object as a string mimics the output from `dpkg` for familiarity.

    Installation and removal of packages is handled through the `state` property or `ensure`
    method, with the following options:

        apt.PackageState.Absent
        apt.PackageState.Available
        apt.PackageState.Present
        apt.PackageState.Latest

    When `DebianPackage` is initialized, the state of a given `DebianPackage` object will be set to
    `Available`, `Present`, or `Latest`, with `Absent` implemented as a convenience for removal
    (though it operates essentially the same as `Available`).
    """

    def __init__(
        self, name: str, version: str, epoch: str, arch: str, state: PackageState
    ) -> None:
        self._name = name
        self._arch = arch
        self._state = state
        self._version = Version(version, epoch)

    def __eq__(self, other) -> bool:
        """Equality for comparison.

        Args:
          other: a `DebianPackage` object for comparison

        Returns:
          A boolean reflecting equality
        """
        return isinstance(other, self.__class__) and (
            self._name,
            self._version.number,
        ) == (other._name, other._version.number)

    def __hash__(self):
        """Return a hash of this package."""
        return hash((self._name, self._version.number))

    def __repr__(self):
        """Represent the package."""
        return "<{}.{}: {}>".format(self.__module__, self.__class__.__name__, self.__dict__)

    def __str__(self):
        """Return a human-readable representation of the package."""
        return "<{}: {}-{}.{} -- {}>".format(
            self.__class__.__name__,
            self._name,
            self._version,
            self._arch,
            str(self._state),
        )

    @staticmethod
    def _apt(
        command: str,
        package_names: Union[str, List],
        optargs: Optional[List[str]] = None,
    ) -> None:
        """Wrap package management commands for Debian/Ubuntu systems.

        Args:
          command: the command given to `apt-get`
          package_names: a package name or list of package names to operate on
          optargs: an (Optional) list of additioanl arguments

        Raises:
          PackageError if an error is encountered
        """
        optargs = optargs if optargs is not None else []
        if isinstance(package_names, str):
            package_names = [package_names]
        _cmd = ["apt-get", "-y", *optargs, command, *package_names]
        try:
            env = os.environ.copy()
            env["DEBIAN_FRONTEND"] = "noninteractive"
            subprocess.run(_cmd, capture_output=True, check=True, text=True, env=env)
        except CalledProcessError as e:
            raise PackageError(
                "Could not {} package(s) [{}]: {}".format(command, [*package_names], e.stderr)
            ) from None

    def _add(self) -> None:
        """Add a package to the system."""
        self._apt(
            "install",
            "{}={}".format(self.name, self.version),
            optargs=["--option=Dpkg::Options::=--force-confold"],
        )

    def _remove(self) -> None:
        """Remove a package from the system. Implementation-specific."""
        return self._apt("remove", "{}={}".format(self.name, self.version))

    @property
    def name(self) -> str:
        """Returns the name of the package."""
        return self._name

    def ensure(self, state: PackageState):
        """Ensure that a package is in a given state.

        Args:
          state: a `PackageState` to reconcile the package to

        Raises:
          PackageError from the underlying call to apt
        """
        if self._state is not state:
            if state not in (PackageState.Present, PackageState.Latest):
                self._remove()
            else:
                self._add()
        self._state = state

    @property
    def present(self) -> bool:
        """Returns whether or not a package is present."""
        return self._state in (PackageState.Present, PackageState.Latest)

    @property
    def latest(self) -> bool:
        """Returns whether the package is the most recent version."""
        return self._state is PackageState.Latest

    @property
    def state(self) -> PackageState:
        """Returns the current package state."""
        return self._state

    @state.setter
    def state(self, state: PackageState) -> None:
        """Set the package state to a given value.

        Args:
          state: a `PackageState` to reconcile the package to

        Raises:
          PackageError from the underlying call to apt
        """
        if state in (PackageState.Latest, PackageState.Present):
            self._add()
        else:
            self._remove()
        self._state = state

    @property
    def version(self) -> "Version":
        """Returns the version for a package."""
        return self._version

    @property
    def epoch(self) -> str:
        """Returns the epoch for a package. May be unset."""
        return self._version.epoch

    @property
    def arch(self) -> str:
        """Returns the architecture for a package."""
        return self._arch

    @property
    def fullversion(self) -> str:
        """Returns the name+epoch for a package."""
        return "{}.{}".format(self._version, self._arch)

    @staticmethod
    def _get_epoch_from_version(version: str) -> Tuple[str, str]:
        """Pull the epoch, if any, out of a version string."""
        epoch_matcher = re.compile(r"^((?P<epoch>\d+):)?(?P<version>.*)")
        matches = epoch_matcher.search(version).groupdict()
        return matches.get("epoch", ""), matches.get("version")

    @classmethod
    def from_system(
        cls, package: str, version: Optional[str] = "", arch: Optional[str] = ""
    ) -> "DebianPackage":
        """Locates a package, either on the system or known to apt, and serializes the information.

        Args:
            package: a string representing the package
            version: an optional string if a specific version is requested
            arch: an optional architecture, defaulting to `dpkg --print-architecture`. If an
                architecture is not specified, this will be used for selection.

        """
        try:
            return DebianPackage.from_installed_package(package, version, arch)
        except PackageNotFoundError:
            logger.debug(
                "package '%s' is not currently installed or has the wrong architecture.", package
            )

        # Ok, try `apt-cache ...`
        try:
            return DebianPackage.from_apt_cache(package, version, arch)
        except (PackageNotFoundError, PackageError):
            # If we get here, it's not known to the systems.
            # This seems unnecessary, but virtually all `apt` commands have a return code of `100`,
            # and providing meaningful error messages without this is ugly.
            raise PackageNotFoundError(
                "Package '{}{}' could not be found on the system or in the apt cache!".format(
                    package, ".{}".format(arch) if arch else ""
                )
            ) from None

    @classmethod
    def from_installed_package(
        cls, package: str, version: Optional[str] = "", arch: Optional[str] = ""
    ) -> "DebianPackage":
        """Check whether the package is already installed and return an instance.

        Args:
            package: a string representing the package
            version: an optional string if a specific version is requested
            arch: an optional architecture, defaulting to `dpkg --print-architecture`.
                If an architecture is not specified, this will be used for selection.
        """
        system_arch = check_output(
            ["dpkg", "--print-architecture"], universal_newlines=True
        ).strip()
        arch = arch if arch else system_arch

        # Regexps are a really terrible way to do this. Thanks dpkg
        output = ""
        try:
            output = check_output(["dpkg", "-l", package], stderr=PIPE, universal_newlines=True)
        except CalledProcessError:
            raise PackageNotFoundError("Package is not installed: {}".format(package)) from None

        # Pop off the output from `dpkg -l' because there's no flag to
        # omit it`
        lines = str(output).splitlines()[5:]

        dpkg_matcher = re.compile(
            r"""
        ^(?P<package_status>\w+?)\s+
        (?P<package_name>.*?)(?P<throwaway_arch>:\w+?)?\s+
        (?P<version>.*?)\s+
        (?P<arch>\w+?)\s+
        (?P<description>.*)
        """,
            re.VERBOSE,
        )

        for line in lines:
            try:
                matches = dpkg_matcher.search(line).groupdict()
                package_status = matches["package_status"]

                if not package_status.endswith("i"):
                    logger.debug(
                        "package '%s' in dpkg output but not installed, status: '%s'",
                        package,
                        package_status,
                    )
                    break

                epoch, split_version = DebianPackage._get_epoch_from_version(matches["version"])
                pkg = DebianPackage(
                    matches["package_name"],
                    split_version,
                    epoch,
                    matches["arch"],
                    PackageState.Present,
                )
                if (pkg.arch == "all" or pkg.arch == arch) and (
                    version == "" or str(pkg.version) == version
                ):
                    return pkg
            except AttributeError:
                logger.warning("dpkg matcher could not parse line: %s", line)

        # If we didn't find it, fail through
        raise PackageNotFoundError("Package {}.{} is not installed!".format(package, arch))

    @classmethod
    def from_apt_cache(
        cls, package: str, version: Optional[str] = "", arch: Optional[str] = ""
    ) -> "DebianPackage":
        """Check whether the package is already installed and return an instance.

        Args:
            package: a string representing the package
            version: an optional string if a specific version is requested
            arch: an optional architecture, defaulting to `dpkg --print-architecture`.
                If an architecture is not specified, this will be used for selection.
        """
        system_arch = check_output(
            ["dpkg", "--print-architecture"], universal_newlines=True
        ).strip()
        arch = arch if arch else system_arch

        # Regexps are a really terrible way to do this. Thanks dpkg
        keys = ("Package", "Architecture", "Version")

        try:
            output = check_output(
                ["apt-cache", "show", package], stderr=PIPE, universal_newlines=True
            )
        except CalledProcessError as e:
            raise PackageError(
                "Could not list packages in apt-cache: {}".format(e.stderr)
            ) from None

        pkg_groups = output.strip().split("\n\n")
        keys = ("Package", "Architecture", "Version")

        for pkg_raw in pkg_groups:
            lines = str(pkg_raw).splitlines()
            vals = {}
            for line in lines:
                if line.startswith(keys):
                    items = line.split(":", 1)
                    vals[items[0]] = items[1].strip()
                else:
                    continue

            epoch, split_version = DebianPackage._get_epoch_from_version(vals["Version"])
            pkg = DebianPackage(
                vals["Package"],
                split_version,
                epoch,
                vals["Architecture"],
                PackageState.Available,
            )

            if (pkg.arch == "all" or pkg.arch == arch) and (
                version == "" or str(pkg.version) == version
            ):
                return pkg

        # If we didn't find it, fail through
        raise PackageNotFoundError("Package {}.{} is not in the apt cache!".format(package, arch))


class Version:
    """An abstraction around package versions.

    This seems like it should be strictly unnecessary, except that `apt_pkg` is not usable inside a
    venv, and wedging version comparisons into `DebianPackage` would overcomplicate it.

    This class implements the algorithm found here:
    https://www.debian.org/doc/debian-policy/ch-controlfields.html#version
    """

    def __init__(self, version: str, epoch: str):
        self._version = version
        self._epoch = epoch or ""

    def __repr__(self):
        """Represent the package."""
        return "<{}.{}: {}>".format(self.__module__, self.__class__.__name__, self.__dict__)

    def __str__(self):
        """Return human-readable representation of the package."""
        return "{}{}".format("{}:".format(self._epoch) if self._epoch else "", self._version)

    @property
    def epoch(self):
        """Returns the epoch for a package. May be empty."""
        return self._epoch

    @property
    def number(self) -> str:
        """Returns the version number for a package."""
        return self._version

    def _get_parts(self, version: str) -> Tuple[str, str]:
        """Separate the version into component upstream and Debian pieces."""
        try:
            version.rindex("-")
        except ValueError:
            # No hyphens means no Debian version
            return version, "0"

        upstream, debian = version.rsplit("-", 1)
        return upstream, debian

    def _listify(self, revision: str) -> List[str]:
        """Split a revision string into a listself.

        This list is comprised of  alternating between strings and numbers,
        padded on either end to always be "str, int, str, int..." and
        always be of even length.  This allows us to trivially implement the
        comparison algorithm described.
        """
        result = []
        while revision:
            rev_1, remains = self._get_alphas(revision)
            rev_2, remains = self._get_digits(remains)
            result.extend([rev_1, rev_2])
            revision = remains
        return result

    def _get_alphas(self, revision: str) -> Tuple[str, str]:
        """Return a tuple of the first non-digit characters of a revision."""
        # get the index of the first digit
        for i, char in enumerate(revision):
            if char.isdigit():
                if i == 0:
                    return "", revision
                return revision[0:i], revision[i:]
        # string is entirely alphas
        return revision, ""

    def _get_digits(self, revision: str) -> Tuple[int, str]:
        """Return a tuple of the first integer characters of a revision."""
        # If the string is empty, return (0,'')
        if not revision:
            return 0, ""
        # get the index of the first non-digit
        for i, char in enumerate(revision):
            if not char.isdigit():
                if i == 0:
                    return 0, revision
                return int(revision[0:i]), revision[i:]
        # string is entirely digits
        return int(revision), ""

    def _dstringcmp(self, a, b):  # noqa: C901
        """Debian package version string section lexical sort algorithm.

        The lexical comparison is a comparison of ASCII values modified so
        that all the letters sort earlier than all the non-letters and so that
        a tilde sorts before anything, even the end of a part.
        """
        if a == b:
            return 0
        try:
            for i, char in enumerate(a):
                if char == b[i]:
                    continue
                # "a tilde sorts before anything, even the end of a part"
                # (emptyness)
                if char == "~":
                    return -1
                if b[i] == "~":
                    return 1
                # "all the letters sort earlier than all the non-letters"
                if char.isalpha() and not b[i].isalpha():
                    return -1
                if not char.isalpha() and b[i].isalpha():
                    return 1
                # otherwise lexical sort
                if ord(char) > ord(b[i]):
                    return 1
                if ord(char) < ord(b[i]):
                    return -1
        except IndexError:
            # a is longer than b but otherwise equal, greater unless there are tildes
            if char == "~":
                return -1
            return 1
        # if we get here, a is shorter than b but otherwise equal, so check for tildes...
        if b[len(a)] == "~":
            return 1
        return -1

    def _compare_revision_strings(self, first: str, second: str):  # noqa: C901
        """Compare two debian revision strings."""
        if first == second:
            return 0

        # listify pads results so that we will always be comparing ints to ints
        # and strings to strings (at least until we fall off the end of a list)
        first_list = self._listify(first)
        second_list = self._listify(second)
        if first_list == second_list:
            return 0
        try:
            for i, item in enumerate(first_list):
                # explicitly raise IndexError if we've fallen off the edge of list2
                if i >= len(second_list):
                    raise IndexError
                # if the items are equal, next
                if item == second_list[i]:
                    continue
                # numeric comparison
                if isinstance(item, int):
                    if item > second_list[i]:
                        return 1
                    if item < second_list[i]:
                        return -1
                else:
                    # string comparison
                    return self._dstringcmp(item, second_list[i])
        except IndexError:
            # rev1 is longer than rev2 but otherwise equal, hence greater
            # ...except for goddamn tildes
            if first_list[len(second_list)][0][0] == "~":
                return 1
            return 1
        # rev1 is shorter than rev2 but otherwise equal, hence lesser
        # ...except for goddamn tildes
        if second_list[len(first_list)][0][0] == "~":
            return -1
        return -1

    def _compare_version(self, other) -> int:
        if (self.number, self.epoch) == (other.number, other.epoch):
            return 0

        if self.epoch < other.epoch:
            return -1
        if self.epoch > other.epoch:
            return 1

        # If none of these are true, follow the algorithm
        upstream_version, debian_version = self._get_parts(self.number)
        other_upstream_version, other_debian_version = self._get_parts(other.number)

        upstream_cmp = self._compare_revision_strings(upstream_version, other_upstream_version)
        if upstream_cmp != 0:
            return upstream_cmp

        debian_cmp = self._compare_revision_strings(debian_version, other_debian_version)
        if debian_cmp != 0:
            return debian_cmp

        return 0

    def __lt__(self, other) -> bool:
        """Less than magic method impl."""
        return self._compare_version(other) < 0

    def __eq__(self, other) -> bool:
        """Equality magic method impl."""
        return self._compare_version(other) == 0

    def __gt__(self, other) -> bool:
        """Greater than magic method impl."""
        return self._compare_version(other) > 0

    def __le__(self, other) -> bool:
        """Less than or equal to magic method impl."""
        return self.__eq__(other) or self.__lt__(other)

    def __ge__(self, other) -> bool:
        """Greater than or equal to magic method impl."""
        return self.__gt__(other) or self.__eq__(other)

    def __ne__(self, other) -> bool:
        """Not equal to magic method impl."""
        return not self.__eq__(other)


def add_package(
    package_names: Union[str, List[str]],
    version: Optional[str] = "",
    arch: Optional[str] = "",
    update_cache: Optional[bool] = False,
) -> Union[DebianPackage, List[DebianPackage]]:
    """Add a package or list of packages to the system.

    Args:
        package_names: single package name, or list of package names
        name: the name(s) of the package(s)
        version: an (Optional) version as a string. Defaults to the latest known
        arch: an optional architecture for the package
        update_cache: whether or not to run `apt-get update` prior to operating

    Raises:
        TypeError if no package name is given, or explicit version is set for multiple packages
        PackageNotFoundError if the package is not in the cache.
        PackageError if packages fail to install
    """
    cache_refreshed = False
    if update_cache:
        update()
        cache_refreshed = True

    packages = {"success": [], "retry": [], "failed": []}

    package_names = [package_names] if isinstance(package_names, str) else package_names
    if not package_names:
        raise TypeError("Expected at least one package name to add, received zero!")

    if len(package_names) != 1 and version:
        raise TypeError(
            "Explicit version should not be set if more than one package is being added!"
        )

    for p in package_names:
        pkg, success = _add(p, version, arch)
        if success:
            packages["success"].append(pkg)
        else:
            logger.warning("failed to locate and install/update '%s'", pkg)
            packages["retry"].append(p)

    if packages["retry"] and not cache_refreshed:
        logger.info("updating the apt-cache and retrying installation of failed packages.")
        update()

        for p in packages["retry"]:
            pkg, success = _add(p, version, arch)
            if success:
                packages["success"].append(pkg)
            else:
                packages["failed"].append(p)

    if packages["failed"]:
        raise PackageError("Failed to install packages: {}".format(", ".join(packages["failed"])))

    return packages["success"] if len(packages["success"]) > 1 else packages["success"][0]


def _add(
    name: str,
    version: Optional[str] = "",
    arch: Optional[str] = "",
) -> Tuple[Union[DebianPackage, str], bool]:
    """Add a package to the system.

    Args:
        name: the name(s) of the package(s)
        version: an (Optional) version as a string. Defaults to the latest known
        arch: an optional architecture for the package

    Returns: a tuple of `DebianPackage` if found, or a :str: if it is not, and
        a boolean indicating success
    """
    try:
        pkg = DebianPackage.from_system(name, version, arch)
        pkg.ensure(state=PackageState.Present)
        return pkg, True
    except PackageNotFoundError:
        return name, False


def remove_package(
    package_names: Union[str, List[str]]
) -> Union[DebianPackage, List[DebianPackage]]:
    """Remove package(s) from the system.

    Args:
        package_names: the name of a package

    Raises:
        TypeError: if no packages are provided
    """
    packages: List[DebianPackage] = []

    package_names = [package_names] if isinstance(package_names, str) else package_names
    if not package_names:
        raise TypeError("Expected at least one package name to add, received zero!")

    for p in package_names:
        try:
            pkg = DebianPackage.from_installed_package(p)
            pkg.ensure(state=PackageState.Absent)
            packages.append(pkg)
        except PackageNotFoundError:
            logger.info("package '%s' was requested for removal, but it was not installed.", p)

    # the list of packages will be empty when no package is removed
    logger.debug("packages: '%s'", packages)
    return packages[0] if len(packages) == 1 else packages


def update() -> None:
    """Update the apt cache via `apt-get update`."""
    cmd = ["apt-get", "update", "--error-on=any"]
    try:
        subprocess.run(cmd, capture_output=True, check=True)
    except CalledProcessError as e:
        logger.error(
            "%s:\nstdout:\n%s\nstderr:\n%s",
            " ".join(cmd),
            e.stdout.decode(),
            e.stderr.decode(),
        )
        raise


def import_key(key: str) -> str:
    """Import an ASCII Armor key.

    A Radix64 format keyid is also supported for backwards
    compatibility. In this case Ubuntu keyserver will be
    queried for a key via HTTPS by its keyid. This method
    is less preferable because https proxy servers may
    require traffic decryption which is equivalent to a
    man-in-the-middle attack (a proxy server impersonates
    keyserver TLS certificates and has to be explicitly
    trusted by the system).

    Args:
        key: A GPG key in ASCII armor format, including BEGIN
            and END markers or a keyid.

    Returns:
        The GPG key filename written.

    Raises:
        GPGKeyError if the key could not be imported
    """
    key = key.strip()
    if "-" in key or "\n" in key:
        # Send everything not obviously a keyid to GPG to import, as
        # we trust its validation better than our own. eg. handling
        # comments before the key.
        logger.debug("PGP key found (looks like ASCII Armor format)")
        if (
            "-----BEGIN PGP PUBLIC KEY BLOCK-----" in key
            and "-----END PGP PUBLIC KEY BLOCK-----" in key
        ):
            logger.debug("Writing provided PGP key in the binary format")
            key_bytes = key.encode("utf-8")
            key_name = DebianRepository._get_keyid_by_gpg_key(key_bytes)
            key_gpg = DebianRepository._dearmor_gpg_key(key_bytes)
            gpg_key_filename = os.path.join(_GPG_KEY_DIR, "{}.gpg".format(key_name))
            DebianRepository._write_apt_gpg_keyfile(
                key_name=gpg_key_filename, key_material=key_gpg
            )
            return gpg_key_filename
        else:
            raise GPGKeyError("ASCII armor markers missing from GPG key")
    else:
        logger.warning(
            "PGP key found (looks like Radix64 format). "
            "SECURELY importing PGP key from keyserver; "
            "full key not provided."
        )
        # as of bionic add-apt-repository uses curl with an HTTPS keyserver URL
        # to retrieve GPG keys. `apt-key adv` command is deprecated as is
        # apt-key in general as noted in its manpage. See lp:1433761 for more
        # history. Instead, /etc/apt/trusted.gpg.d is used directly to drop
        # gpg
        key_asc = DebianRepository._get_key_by_keyid(key)
        # write the key in GPG format so that apt-key list shows it
        key_gpg = DebianRepository._dearmor_gpg_key(key_asc.encode("utf-8"))
        gpg_key_filename = os.path.join(_GPG_KEY_DIR, "{}.gpg".format(key))
        DebianRepository._write_apt_gpg_keyfile(key_name=gpg_key_filename, key_material=key_gpg)
        return gpg_key_filename


class InvalidSourceError(Error):
    """Exceptions for invalid source entries."""


class GPGKeyError(Error):
    """Exceptions for GPG keys."""


class DebianRepository:
    """An abstraction to represent a repository."""

    _deb822_stanza: Optional["_Deb822Stanza"] = None
    """set by Deb822Stanza after creating a DebianRepository"""

    def __init__(
        self,
        enabled: bool,
        repotype: str,
        uri: str,
        release: str,
        groups: List[str],
        filename: str = "",
        gpg_key_filename: str = "",
        options: Optional[Dict[str, str]] = None,
    ):
        self._enabled = enabled
        self._repotype = repotype
        self._uri = uri
        self._release = release
        self._groups = groups
        self._filename = filename
        self._gpg_key_filename = gpg_key_filename
        self._options = options

    @property
    def enabled(self):
        """Return whether or not the repository is enabled."""
        return self._enabled

    @property
    def repotype(self):
        """Return whether it is binary or source."""
        return self._repotype

    @property
    def uri(self):
        """Return the URI."""
        return self._uri

    @property
    def release(self):
        """Return which Debian/Ubuntu releases it is valid for."""
        return self._release

    @property
    def groups(self):
        """Return the enabled package groups."""
        return self._groups

    @property
    def filename(self):
        """Returns the filename for a repository."""
        return self._filename

    @filename.setter
    def filename(self, fname: str) -> None:
        """Set the filename used when a repo is written back to disk.

        Args:
            fname: a filename to write the repository information to.
        """
        if not fname.endswith((".list", ".sources")):
            raise InvalidSourceError("apt source filenames should end in .list or .sources!")
        self._filename = fname

    @property
    def gpg_key(self):
        """Returns the path to the GPG key for this repository."""
        if not self._gpg_key_filename and self._deb822_stanza is not None:
            self._gpg_key_filename = self._deb822_stanza.get_gpg_key_filename()
        return self._gpg_key_filename

    @property
    def options(self):
        """Returns any additional repo options which are set."""
        return self._options

    def make_options_string(self, include_signed_by: bool = True) -> str:
        """Generate the complete one-line-style options string for a repository.

        Combining `gpg_key`, if set (and include_signed_by is True), with any other
        provided options to form the options section of a one-line-style definition.
        """
        options = self._options if self._options else {}
        if include_signed_by and self.gpg_key:
            options["signed-by"] = self.gpg_key
        if not options:
            return ""
        pairs = ("{}={}".format(k, v) for k, v in sorted(options.items()))
        return "[{}] ".format(" ".join(pairs))

    @staticmethod
    def prefix_from_uri(uri: str) -> str:
        """Get a repo list prefix from the uri, depending on whether a path is set."""
        uridetails = urlparse(uri)
        path = (
            uridetails.path.lstrip("/").replace("/", "-") if uridetails.path else uridetails.netloc
        )
        return "/etc/apt/sources.list.d/{}".format(path)

    @staticmethod
    def from_repo_line(repo_line: str, write_file: Optional[bool] = True) -> "DebianRepository":
        """Instantiate a new `DebianRepository` from a `sources.list` entry line.

        Args:
            repo_line: a string representing a repository entry
            write_file: boolean to enable writing the new repo to disk. True by default.
                Expect it to result in an add-apt-repository call under the hood, like:
                    add-apt-repository --no-update --sourceslist="$repo_line"
        """
        repo = RepositoryMapping._parse(  # pyright: ignore[reportPrivateUsage]
            repo_line, filename="UserInput"  # temp filename
        )
        repo.filename = repo._make_filename()
        if write_file:
            _add_repository(repo)
        return repo

    def _make_filename(self) -> str:
        """Construct a filename from uri and release.

        For internal use when a filename isn't set.
        Should match the filename written to by add-apt-repository.
        """
        return "{}-{}.list".format(
            DebianRepository.prefix_from_uri(self.uri),
            self.release.replace("/", "-"),
        )

    def disable(self) -> None:
        """Remove this repository by disabling it in the source file.

        WARNING: This method does NOT alter the `self.enabled` flag.

        WARNING: disable is currently not implemented for repositories defined
        by a deb822 stanza. Raises a NotImplementedError in this case.
        """
        if self._deb822_stanza is not None:
            raise NotImplementedError(
                "Disabling a repository defined by a deb822 format source is not implemented."
                " Please raise an issue if you require this feature."
            )
        searcher = "{} {}{} {}".format(
            self.repotype, self.make_options_string(), self.uri, self.release
        )
        for line in fileinput.input(self._filename, inplace=True):
            if re.match(r"^{}\s".format(re.escape(searcher)), line):
                print("# {}".format(line), end="")
            else:
                print(line, end="")

    def import_key(self, key: str) -> None:
        """Import an ASCII Armor key.

        A Radix64 format keyid is also supported for backwards
        compatibility. In this case Ubuntu keyserver will be
        queried for a key via HTTPS by its keyid. This method
        is less preferable because https proxy servers may
        require traffic decryption which is equivalent to a
        man-in-the-middle attack (a proxy server impersonates
        keyserver TLS certificates and has to be explicitly
        trusted by the system).

        Args:
          key: A GPG key in ASCII armor format,
                      including BEGIN and END markers or a keyid.

        Raises:
          GPGKeyError if the key could not be imported
        """
        self._gpg_key_filename = import_key(key)

    @staticmethod
    def _get_keyid_by_gpg_key(key_material: bytes) -> str:
        """Get a GPG key fingerprint by GPG key material.

        Gets a GPG key fingerprint (40-digit, 160-bit) by the ASCII armor-encoded
        or binary GPG key material. Can be used, for example, to generate file
        names for keys passed via charm options.
        """
        # Use the same gpg command for both Xenial and Bionic
        cmd = ["gpg", "--with-colons", "--with-fingerprint"]
        ps = subprocess.run(
            cmd,
            stdout=PIPE,
            stderr=PIPE,
            input=key_material,
        )
        out, err = ps.stdout.decode(), ps.stderr.decode()
        if "gpg: no valid OpenPGP data found." in err:
            raise GPGKeyError("Invalid GPG key material provided")
        # from gnupg2 docs: fpr :: Fingerprint (fingerprint is in field 10)
        return re.search(r"^fpr:{9}([0-9A-F]{40}):$", out, re.MULTILINE).group(1)

    @staticmethod
    def _get_key_by_keyid(keyid: str) -> str:
        """Get a key via HTTPS from the Ubuntu keyserver.

        Different key ID formats are supported by SKS keyservers (the longer ones
        are more secure, see "dead beef attack" and https://evil32.com/). Since
        HTTPS is used, if SSLBump-like HTTPS proxies are in place, they will
        impersonate keyserver.ubuntu.com and generate a certificate with
        keyserver.ubuntu.com in the CN field or in SubjAltName fields of a
        certificate. If such proxy behavior is expected it is necessary to add the
        CA certificate chain containing the intermediate CA of the SSLBump proxy to
        every machine that this code runs on via ca-certs cloud-init directive (via
        cloudinit-userdata model-config) or via other means (such as through a
        custom charm option). Also note that DNS resolution for the hostname in a
        URL is done at a proxy server - not at the client side.
        8-digit (32 bit) key ID
        https://keyserver.ubuntu.com/pks/lookup?search=0x4652B4E6
        16-digit (64 bit) key ID
        https://keyserver.ubuntu.com/pks/lookup?search=0x6E85A86E4652B4E6
        40-digit key ID:
        https://keyserver.ubuntu.com/pks/lookup?search=0x35F77D63B5CEC106C577ED856E85A86E4652B4E6

        Args:
          keyid: An 8, 16 or 40 hex digit keyid to find a key for

        Returns:
          A string contining key material for the specified GPG key id


        Raises:
          subprocess.CalledProcessError
        """
        # options=mr - machine-readable output (disables html wrappers)
        keyserver_url = (
            "https://keyserver.ubuntu.com" "/pks/lookup?op=get&options=mr&exact=on&search=0x{}"
        )
        curl_cmd = ["curl", keyserver_url.format(keyid)]
        # use proxy server settings in order to retrieve the key
        return check_output(curl_cmd).decode()

    @staticmethod
    def _dearmor_gpg_key(key_asc: bytes) -> bytes:
        """Convert a GPG key in the ASCII armor format to the binary format.

        Args:
          key_asc: A GPG key in ASCII armor format.

        Returns:
          A GPG key in binary format as a string

        Raises:
          GPGKeyError
        """
        ps = subprocess.run(["gpg", "--dearmor"], stdout=PIPE, stderr=PIPE, input=key_asc)
        out, err = ps.stdout, ps.stderr.decode()
        if "gpg: no valid OpenPGP data found." in err:
            raise GPGKeyError(
                "Invalid GPG key material. Check your network setup"
                " (MTU, routing, DNS) and/or proxy server settings"
                " as well as destination keyserver status."
            )
        else:
            return out

    @staticmethod
    def _write_apt_gpg_keyfile(key_name: str, key_material: bytes) -> None:
        """Write GPG key material into a file at a provided path.

        Args:
          key_name: A key name to use for a key file (could be a fingerprint)
          key_material: A GPG key material (binary)
        """
        with open(key_name, "wb") as keyf:
            keyf.write(key_material)


def _repo_to_identifier(repo: DebianRepository) -> str:
    """Return str identifier derived from repotype, uri, and release.

    Private method used to produce the identifiers used by RepositoryMapping.
    """
    return "{}-{}-{}".format(repo.repotype, repo.uri, repo.release)


def _repo_to_line(repo: DebianRepository, include_signed_by: bool = True) -> str:
    """Return the one-per-line format repository definition."""
    return "{prefix}{repotype} {options}{uri} {release} {groups}".format(
        prefix="" if repo.enabled else "#",
        repotype=repo.repotype,
        options=repo.make_options_string(include_signed_by=include_signed_by),
        uri=repo.uri,
        release=repo.release,
        groups=" ".join(repo.groups),
    )


class RepositoryMapping(Mapping[str, DebianRepository]):
    """An representation of known repositories.

    Instantiation of `RepositoryMapping` will iterate through the
    filesystem, parse out repository files in `/etc/apt/...`, and create
    `DebianRepository` objects in this list.

    Typical usage:

        repositories = apt.RepositoryMapping()
        repositories.add(DebianRepository(
            enabled=True, repotype="deb", uri="https://example.com", release="focal",
            groups=["universe"]
        ))
    """

    _apt_dir = "/etc/apt"
    _sources_subdir = "sources.list.d"
    _default_list_name = "sources.list"
    _default_sources_name = "ubuntu.sources"
    _last_errors: Tuple[Error, ...] = ()

    def __init__(self):
        self._repository_map: Dict[str, DebianRepository] = {}
        self.default_file = os.path.join(self._apt_dir, self._default_list_name)
        # ^ public attribute for backwards compatibility only
        sources_dir = os.path.join(self._apt_dir, self._sources_subdir)
        default_sources = os.path.join(sources_dir, self._default_sources_name)

        # read sources.list if it exists
        # ignore InvalidSourceError if ubuntu.sources also exists
        # -- in this case, sources.list just contains a comment
        if os.path.isfile(self.default_file):
            try:
                self.load(self.default_file)
            except InvalidSourceError:
                if not os.path.isfile(default_sources):
                    raise

        # read sources.list.d
        for file in glob.iglob(os.path.join(sources_dir, "*.list")):
            self.load(file)
        for file in glob.iglob(os.path.join(sources_dir, "*.sources")):
            self.load_deb822(file)

    def __contains__(self, key: Any) -> bool:
        """Magic method for checking presence of repo in mapping.

        Checks against the string names used to identify repositories.
        """
        return key in self._repository_map

    def __len__(self) -> int:
        """Return number of repositories in map."""
        return len(self._repository_map)

    def __iter__(self) -> Iterator[DebianRepository]:
        """Return iterator for RepositoryMapping.

        Iterates over the DebianRepository values rather than the string names.
        FIXME: this breaks the expectations of the Mapping abstract base class
            for example when it provides methods like keys and items
        """
        return iter(self._repository_map.values())

    def __getitem__(self, repository_uri: str) -> DebianRepository:
        """Return a given `DebianRepository`."""
        return self._repository_map[repository_uri]

    def __setitem__(self, repository_uri: str, repository: DebianRepository) -> None:
        """Add a `DebianRepository` to the cache."""
        self._repository_map[repository_uri] = repository

    def load_deb822(self, filename: str) -> None:
        """Load a deb822 format repository source file into the cache.

        In contrast to one-line-style, the deb822 format specifies a repository
        using a multi-line stanza. Stanzas are separated by whitespace,
        and each definition consists of lines that are either key: value pairs,
        or continuations of the previous value.

        Read more about the deb822 format here:
            https://manpages.ubuntu.com/manpages/noble/en/man5/sources.list.5.html
        For instance, ubuntu 24.04 (noble) lists its sources using deb822 style in:
            /etc/apt/sources.list.d/ubuntu.sources
        """
        with open(filename, "r") as f:
            repos, errors = self._parse_deb822_lines(f, filename=filename)
        for repo in repos:
            self._repository_map[_repo_to_identifier(repo)] = repo
        if errors:
            self._last_errors = tuple(errors)
            logger.debug(
                "the following %d error(s) were encountered when reading deb822 sources:\n%s",
                len(errors),
                "\n".join(str(e) for e in errors),
            )
        if repos:
            logger.info("parsed %d apt package repositories from %s", len(repos), filename)
        else:
            raise InvalidSourceError("all repository lines in '{}' were invalid!".format(filename))

    @classmethod
    def _parse_deb822_lines(
        cls,
        lines: Iterable[str],
        filename: str = "",
    ) -> Tuple[List[DebianRepository], List[InvalidSourceError]]:
        """Parse lines from a deb822 file into a list of repos and a list of errors.

        The semantics of `_parse_deb822_lines` slightly different to `_parse`:
            `_parse` reads a commented out line as an entry that is not enabled
            `_parse_deb822_lines` strips out comments entirely when parsing a file into stanzas,
                instead only reading the 'Enabled' key to determine if an entry is enabled
        """
        repos: List[DebianRepository] = []
        errors: List[InvalidSourceError] = []
        for numbered_lines in _iter_deb822_stanzas(lines):
            try:
                stanza = _Deb822Stanza(numbered_lines=numbered_lines, filename=filename)
            except InvalidSourceError as e:
                errors.append(e)
            else:
                repos.extend(stanza.repos)
        return repos, errors

    def load(self, filename: str):
        """Load a one-line-style format repository source file into the cache.

        Args:
          filename: the path to the repository file
        """
        parsed: List[int] = []
        skipped: List[int] = []
        with open(filename, "r") as f:
            for n, line in enumerate(f, start=1):  # 1 indexed line numbers
                try:
                    repo = self._parse(line, filename)
                except InvalidSourceError:
                    skipped.append(n)
                else:
                    repo_identifier = "{}-{}-{}".format(repo.repotype, repo.uri, repo.release)
                    self._repository_map[repo_identifier] = repo
                    parsed.append(n)
                    logger.debug("parsed repo: '%s'", repo_identifier)

        if skipped:
            skip_list = ", ".join(str(s) for s in skipped)
            logger.debug("skipped the following lines in file '%s': %s", filename, skip_list)

        if parsed:
            logger.info("parsed %d apt package repositories from %s", len(parsed), filename)
        else:
            raise InvalidSourceError("all repository lines in '{}' were invalid!".format(filename))

    @staticmethod
    def _parse(line: str, filename: str) -> DebianRepository:
        """Parse a line in a sources.list file.

        Args:
          line: a single line from `load` to parse
          filename: the filename being read

        Raises:
          InvalidSourceError if the source type is unknown
        """
        enabled = True
        repotype = uri = release = gpg_key = ""
        options = {}
        groups = []

        line = line.strip()
        if line.startswith("#"):
            enabled = False
            line = line[1:]

        # Check for "#" in the line and treat a part after it as a comment then strip it off.
        i = line.find("#")
        if i > 0:
            line = line[:i]

        # Split a source into substrings to initialize a new repo.
        source = line.strip()
        if source:
            # Match any repo options, and get a dict representation.
            for v in re.findall(OPTIONS_MATCHER, source):
                opts = dict(o.split("=") for o in v.strip("[]").split())
                # Extract the 'signed-by' option for the gpg_key
                gpg_key = opts.pop("signed-by", "")
                options = opts

            # Remove any options from the source string and split the string into chunks
            source = re.sub(OPTIONS_MATCHER, "", source)
            chunks = source.split()

            # Check we've got a valid list of chunks
            if len(chunks) < 3 or chunks[0] not in VALID_SOURCE_TYPES:
                raise InvalidSourceError("An invalid sources line was found in %s!", filename)

            repotype = chunks[0]
            uri = chunks[1]
            release = chunks[2]
            groups = chunks[3:]

            return DebianRepository(
                enabled, repotype, uri, release, groups, filename, gpg_key, options
            )
        else:
            raise InvalidSourceError("An invalid sources line was found in %s!", filename)

    def add(  # noqa: D417  # undocumented-param: default_filename intentionally undocumented
        self, repo: DebianRepository, default_filename: Optional[bool] = False
    ) -> None:
        """Add a new repository to the system using add-apt-repository.

        Args:
            repo: a DebianRepository object
                if repo.enabled is falsey, will return without adding the repository
        Raises:
            CalledProcessError: if there's an error running apt-add-repository

        WARNING: Does not associate the repository with a signing key.
        Use `import_key` to add a signing key globally.

        WARNING: if repo.enabled is falsey, will return without adding the repository

        WARNING: Don't forget to call `apt.update` before installing any packages!
        Or call `apt.add_package` with `update_cache=True`.

        WARNING: the default_filename keyword argument is provided for backwards compatibility
        only. It is not used, and was not used in the previous revision of this library.
        """
        if not repo.enabled:
            logger.warning(
                (
                    "Returning from RepositoryMapping.add(repo=%s) without adding the repo"
                    " because repo.enabled is %s"
                ),
                repo,
                repo.enabled,
            )
            return
        _add_repository(repo)
        self._repository_map[_repo_to_identifier(repo)] = repo

    def disable(self, repo: DebianRepository) -> None:
        """Remove a repository by disabling it in the source file.

        WARNING: disable is currently not implemented for repositories defined
        by a deb822 stanza, and will raise a NotImplementedError if called on one.

        WARNING: This method does NOT alter the `.enabled` flag on the DebianRepository.
        """
        repo.disable()
        self._repository_map[_repo_to_identifier(repo)] = repo
        # ^ adding to map on disable seems like a bug, but this is the previous behaviour


def _add_repository(
    repo: DebianRepository,
    remove: bool = False,
    update_cache: bool = False,
) -> None:
    line = _repo_to_line(repo, include_signed_by=False)
    key_file = repo.gpg_key
    if key_file and not remove and not os.path.exists(key_file):
        msg = (
            "Adding repository '{line}' with add-apt-repository."
            " Key file '{key_file}' does not exist."
            " Ensure it is imported correctly to use this repository."
        ).format(line=line, key_file=key_file)
        logger.warning(msg)
    cmd = [
        "add-apt-repository",
        "--yes",
        "--sourceslist=" + line,
    ]
    if remove:
        cmd.append("--remove")
    if not update_cache:
        cmd.append("--no-update")
    logger.info("%s", cmd)
    try:
        subprocess.run(cmd, check=True, capture_output=True)
    except CalledProcessError as e:
        logger.error(
            "subprocess.run(%s):\nstdout:\n%s\nstderr:\n%s",
            cmd,
            e.stdout.decode(),
            e.stderr.decode(),
        )
        raise


class _Deb822Stanza:
    """Representation of a stanza from a deb822 source file.

    May define multiple DebianRepository objects.
    """

    def __init__(self, numbered_lines: List[Tuple[int, str]], filename: str = ""):
        self._filename = filename
        self._numbered_lines = numbered_lines
        if not numbered_lines:
            self._repos = ()
            self._gpg_key_filename = ""
            self._gpg_key_from_stanza = None
            return
        options, line_numbers = _deb822_stanza_to_options(numbered_lines)
        repos, gpg_key_info = _deb822_options_to_repos(
            options, line_numbers=line_numbers, filename=filename
        )
        for repo in repos:
            repo._deb822_stanza = self  # pyright: ignore[reportPrivateUsage]
        self._repos = repos
        self._gpg_key_filename, self._gpg_key_from_stanza = gpg_key_info

    @property
    def repos(self) -> Tuple[DebianRepository, ...]:
        """The repositories defined by this deb822 stanza."""
        return self._repos

    def get_gpg_key_filename(self) -> str:
        """Return the path to the GPG key for this stanza.

        Import the key first, if the key itself was provided in the stanza.
        Return an empty string if no filename or key was provided.
        """
        if self._gpg_key_filename:
            return self._gpg_key_filename
        if self._gpg_key_from_stanza is None:
            return ""
        # a gpg key was provided in the stanza
        # and we haven't already imported it
        self._gpg_key_filename = import_key(self._gpg_key_from_stanza)
        return self._gpg_key_filename


class MissingRequiredKeyError(InvalidSourceError):
    """Missing a required value in a source file."""

    def __init__(self, message: str = "", *, file: str, line: Optional[int], key: str) -> None:
        super().__init__(message, file, line, key)
        self.file = file
        self.line = line
        self.key = key


class BadValueError(InvalidSourceError):
    """Bad value for an entry in a source file."""

    def __init__(
        self,
        message: str = "",
        *,
        file: str,
        line: Optional[int],
        key: str,
        value: str,
    ) -> None:
        super().__init__(message, file, line, key, value)
        self.file = file
        self.line = line
        self.key = key
        self.value = value


def _iter_deb822_stanzas(lines: Iterable[str]) -> Iterator[List[Tuple[int, str]]]:
    """Given lines from a deb822 format file, yield a stanza of lines.

    Args:
        lines: an iterable of lines from a deb822 sources file

    Yields:
        lists of numbered lines (a tuple of line number and line) that make up
        a deb822 stanza, with comments stripped out (but accounted for in line numbering)
    """
    current_stanza: List[Tuple[int, str]] = []
    for n, line in enumerate(lines, start=1):  # 1 indexed line numbers
        if not line.strip():  # blank lines separate stanzas
            if current_stanza:
                yield current_stanza
                current_stanza = []
            continue
        content, _delim, _comment = line.partition("#")
        if content.strip():  # skip (potentially indented) comment line
            current_stanza.append((n, content.rstrip()))  # preserve indent
    if current_stanza:
        yield current_stanza


def _deb822_stanza_to_options(
    lines: Iterable[Tuple[int, str]],
) -> Tuple[Dict[str, str], Dict[str, int]]:
    """Turn numbered lines into a dict of options and a dict of line numbers.

    Args:
        lines: an iterable of numbered lines (a tuple of line number and line)

    Returns:
        a dictionary of option names to (potentially multiline) values, and
        a dictionary of option names to starting line number
    """
    parts: Dict[str, List[str]] = {}
    line_numbers: Dict[str, int] = {}
    current = None
    for n, line in lines:
        assert "#" not in line  # comments should be stripped out
        if line.startswith(" "):  # continuation of previous key's value
            assert current is not None
            parts[current].append(line.rstrip())  # preserve indent
            continue
        raw_key, _, raw_value = line.partition(":")
        current = raw_key.strip()
        parts[current] = [raw_value.strip()]
        line_numbers[current] = n
    options = {k: "\n".join(v) for k, v in parts.items()}
    return options, line_numbers


def _deb822_options_to_repos(
    options: Dict[str, str], line_numbers: Mapping[str, int] = {}, filename: str = ""
) -> Tuple[Tuple[DebianRepository, ...], Tuple[str, Optional[str]]]:
    """Return a collections of DebianRepository objects defined by this deb822 stanza.

    Args:
        options: a dictionary of deb822 field names to string options
        line_numbers: a dictionary of field names to line numbers (for error messages)
        filename: the file the options were read from (for repository object and errors)

    Returns:
        a tuple of `DebianRepository`s, and
        a tuple of the gpg key filename and optional in-stanza provided key itself

    Raises:
      InvalidSourceError if any options are malformed or required options are missing
    """
    # Enabled
    enabled_field = options.pop("Enabled", "yes")
    if enabled_field == "yes":
        enabled = True
    elif enabled_field == "no":
        enabled = False
    else:
        raise BadValueError(
            "Must be one of yes or no (default: yes).",
            file=filename,
            line=line_numbers.get("Enabled"),
            key="Enabled",
            value=enabled_field,
        )
    # Signed-By
    gpg_key_file = options.pop("Signed-By", "")
    gpg_key_from_stanza: Optional[str] = None
    if "\n" in gpg_key_file:
        # actually a literal multi-line gpg-key rather than a filename
        gpg_key_from_stanza = gpg_key_file
        gpg_key_file = ""
    # Types
    try:
        repotypes = options.pop("Types").split()
        uris = options.pop("URIs").split()
        suites = options.pop("Suites").split()
    except KeyError as e:
        [key] = e.args
        raise MissingRequiredKeyError(
            key=key,
            line=min(line_numbers.values()) if line_numbers else None,
            file=filename,
        )
    # Components
    # suite can specify an exact path, in which case the components must be omitted and suite must end with a slash (/).
    # If suite does not specify an exact path, at least one component must be present.
    # https://manpages.ubuntu.com/manpages/noble/man5/sources.list.5.html
    components: List[str]
    if len(suites) == 1 and suites[0].endswith("/"):
        if "Components" in options:
            msg = (
                "Since 'Suites' (line {suites_line}) specifies"
                " a path relative to  'URIs' (line {uris_line}),"
                " 'Components' must be  ommitted."
            ).format(
                suites_line=line_numbers.get("Suites"),
                uris_line=line_numbers.get("URIs"),
            )
            raise BadValueError(
                msg,
                file=filename,
                line=line_numbers.get("Components"),
                key="Components",
                value=options["Components"],
            )
        components = []
    else:
        if "Components" not in options:
            msg = (
                "Since 'Suites' (line {suites_line}) does not specify"
                " a path relative to  'URIs' (line {uris_line}),"
                " 'Components' must be  present in this stanza."
            ).format(
                suites_line=line_numbers.get("Suites"),
                uris_line=line_numbers.get("URIs"),
            )
            raise MissingRequiredKeyError(
                msg,
                file=filename,
                line=min(line_numbers.values()) if line_numbers else None,
                key="Components",
            )
        components = options.pop("Components").split()
    repos = tuple(
        DebianRepository(
            enabled=enabled,
            repotype=repotype,
            uri=uri,
            release=suite,
            groups=components,
            filename=filename,
            gpg_key_filename=gpg_key_file,
            options=options,
        )
        for repotype in repotypes
        for uri in uris
        for suite in suites
    )
    return repos, (gpg_key_file, gpg_key_from_stanza)