Operator Libs Linux
- Jon Seager
Channel | Revision | Published | Runs on |
---|---|---|---|
latest/stable | 2 | 09 Mar 2023 | |
latest/stable | 1 | 28 Oct 2021 |
juju deploy operator-libs-linux
Deploy universal operators easily with Juju, the Universal Operator Lifecycle Manager.
Platform:
22.04
20.04
charms.operator_libs_linux.v0.apt
-
- Last updated 21 Aug 2024
- Revision Library version 0.14
# Copyright 2021 Canonical Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Abstractions for the system's Debian/Ubuntu package information and repositories.
This module contains abstractions and wrappers around Debian/Ubuntu-style repositories and
packages, in order to easily provide an idiomatic and Pythonic mechanism for adding packages and/or
repositories to systems for use in machine charms.
A sane default configuration is attainable through nothing more than instantiation of the
appropriate classes. `DebianPackage` objects provide information about the architecture, version,
name, and status of a package.
`DebianPackage` will try to look up a package either from `dpkg -L` or from `apt-cache` when
provided with a string indicating the package name. If it cannot be located, `PackageNotFoundError`
will be returned, as `apt` and `dpkg` otherwise return `100` for all errors, and a meaningful error
message if the package is not known is desirable.
To install packages with convenience methods:
```python
try:
# Run `apt-get update`
apt.update()
apt.add_package("zsh")
apt.add_package(["vim", "htop", "wget"])
except PackageNotFoundError:
logger.error("a specified package not found in package cache or on system")
except PackageError as e:
logger.error("could not install package. Reason: %s", e.message)
````
To find details of a specific package:
```python
try:
vim = apt.DebianPackage.from_system("vim")
# To find from the apt cache only
# apt.DebianPackage.from_apt_cache("vim")
# To find from installed packages only
# apt.DebianPackage.from_installed_package("vim")
vim.ensure(PackageState.Latest)
logger.info("updated vim to version: %s", vim.fullversion)
except PackageNotFoundError:
logger.error("a specified package not found in package cache or on system")
except PackageError as e:
logger.error("could not install package. Reason: %s", e.message)
```
`RepositoryMapping` will return a dict-like object containing enabled system repositories
and their properties (available groups, baseuri. gpg key). This class can add, disable, or
manipulate repositories. Items can be retrieved as `DebianRepository` objects.
In order add a new repository with explicit details for fields, a new `DebianRepository` can
be added to `RepositoryMapping`
`RepositoryMapping` provides an abstraction around the existing repositories on the system,
and can be accessed and iterated over like any `Mapping` object, to retrieve values by key,
iterate, or perform other operations.
Keys are constructed as `{repo_type}-{}-{release}` in order to uniquely identify a repository.
Repositories can be added with explicit values through a Python constructor.
Example:
```python
repositories = apt.RepositoryMapping()
if "deb-example.com-focal" not in repositories:
repositories.add(DebianRepository(enabled=True, repotype="deb",
uri="https://example.com", release="focal", groups=["universe"]))
```
Alternatively, any valid `sources.list` line may be used to construct a new
`DebianRepository`.
Example:
```python
repositories = apt.RepositoryMapping()
if "deb-us.archive.ubuntu.com-xenial" not in repositories:
line = "deb http://us.archive.ubuntu.com/ubuntu xenial main restricted"
repo = DebianRepository.from_repo_line(line)
repositories.add(repo)
```
"""
import fileinput
import glob
import logging
import os
import re
import subprocess
from collections.abc import Mapping
from enum import Enum
from subprocess import PIPE, CalledProcessError, check_output
from typing import Iterable, List, Optional, Tuple, Union
from urllib.parse import urlparse
logger = logging.getLogger(__name__)
# The unique Charmhub library identifier, never change it
LIBID = "7c3dbc9c2ad44a47bd6fcb25caa270e5"
# Increment this major API version when introducing breaking changes
LIBAPI = 0
# Increment this PATCH version before using `charmcraft publish-lib` or reset
# to 0 if you are raising the major API version
LIBPATCH = 14
VALID_SOURCE_TYPES = ("deb", "deb-src")
OPTIONS_MATCHER = re.compile(r"\[.*?\]")
class Error(Exception):
"""Base class of most errors raised by this library."""
def __repr__(self):
"""Represent the Error."""
return "<{}.{} {}>".format(type(self).__module__, type(self).__name__, self.args)
@property
def name(self):
"""Return a string representation of the model plus class."""
return "<{}.{}>".format(type(self).__module__, type(self).__name__)
@property
def message(self):
"""Return the message passed as an argument."""
return self.args[0]
class PackageError(Error):
"""Raised when there's an error installing or removing a package."""
class PackageNotFoundError(Error):
"""Raised when a requested package is not known to the system."""
class PackageState(Enum):
"""A class to represent possible package states."""
Present = "present"
Absent = "absent"
Latest = "latest"
Available = "available"
class DebianPackage:
"""Represents a traditional Debian package and its utility functions.
`DebianPackage` wraps information and functionality around a known package, whether installed
or available. The version, epoch, name, and architecture can be easily queried and compared
against other `DebianPackage` objects to determine the latest version or to install a specific
version.
The representation of this object as a string mimics the output from `dpkg` for familiarity.
Installation and removal of packages is handled through the `state` property or `ensure`
method, with the following options:
apt.PackageState.Absent
apt.PackageState.Available
apt.PackageState.Present
apt.PackageState.Latest
When `DebianPackage` is initialized, the state of a given `DebianPackage` object will be set to
`Available`, `Present`, or `Latest`, with `Absent` implemented as a convenience for removal
(though it operates essentially the same as `Available`).
"""
def __init__(
self, name: str, version: str, epoch: str, arch: str, state: PackageState
) -> None:
self._name = name
self._arch = arch
self._state = state
self._version = Version(version, epoch)
def __eq__(self, other) -> bool:
"""Equality for comparison.
Args:
other: a `DebianPackage` object for comparison
Returns:
A boolean reflecting equality
"""
return isinstance(other, self.__class__) and (
self._name,
self._version.number,
) == (other._name, other._version.number)
def __hash__(self):
"""Return a hash of this package."""
return hash((self._name, self._version.number))
def __repr__(self):
"""Represent the package."""
return "<{}.{}: {}>".format(self.__module__, self.__class__.__name__, self.__dict__)
def __str__(self):
"""Return a human-readable representation of the package."""
return "<{}: {}-{}.{} -- {}>".format(
self.__class__.__name__,
self._name,
self._version,
self._arch,
str(self._state),
)
@staticmethod
def _apt(
command: str,
package_names: Union[str, List],
optargs: Optional[List[str]] = None,
) -> None:
"""Wrap package management commands for Debian/Ubuntu systems.
Args:
command: the command given to `apt-get`
package_names: a package name or list of package names to operate on
optargs: an (Optional) list of additioanl arguments
Raises:
PackageError if an error is encountered
"""
optargs = optargs if optargs is not None else []
if isinstance(package_names, str):
package_names = [package_names]
_cmd = ["apt-get", "-y", *optargs, command, *package_names]
try:
env = os.environ.copy()
env["DEBIAN_FRONTEND"] = "noninteractive"
subprocess.run(_cmd, capture_output=True, check=True, text=True, env=env)
except CalledProcessError as e:
raise PackageError(
"Could not {} package(s) [{}]: {}".format(command, [*package_names], e.stderr)
) from None
def _add(self) -> None:
"""Add a package to the system."""
self._apt(
"install",
"{}={}".format(self.name, self.version),
optargs=["--option=Dpkg::Options::=--force-confold"],
)
def _remove(self) -> None:
"""Remove a package from the system. Implementation-specific."""
return self._apt("remove", "{}={}".format(self.name, self.version))
@property
def name(self) -> str:
"""Returns the name of the package."""
return self._name
def ensure(self, state: PackageState):
"""Ensure that a package is in a given state.
Args:
state: a `PackageState` to reconcile the package to
Raises:
PackageError from the underlying call to apt
"""
if self._state is not state:
if state not in (PackageState.Present, PackageState.Latest):
self._remove()
else:
self._add()
self._state = state
@property
def present(self) -> bool:
"""Returns whether or not a package is present."""
return self._state in (PackageState.Present, PackageState.Latest)
@property
def latest(self) -> bool:
"""Returns whether the package is the most recent version."""
return self._state is PackageState.Latest
@property
def state(self) -> PackageState:
"""Returns the current package state."""
return self._state
@state.setter
def state(self, state: PackageState) -> None:
"""Set the package state to a given value.
Args:
state: a `PackageState` to reconcile the package to
Raises:
PackageError from the underlying call to apt
"""
if state in (PackageState.Latest, PackageState.Present):
self._add()
else:
self._remove()
self._state = state
@property
def version(self) -> "Version":
"""Returns the version for a package."""
return self._version
@property
def epoch(self) -> str:
"""Returns the epoch for a package. May be unset."""
return self._version.epoch
@property
def arch(self) -> str:
"""Returns the architecture for a package."""
return self._arch
@property
def fullversion(self) -> str:
"""Returns the name+epoch for a package."""
return "{}.{}".format(self._version, self._arch)
@staticmethod
def _get_epoch_from_version(version: str) -> Tuple[str, str]:
"""Pull the epoch, if any, out of a version string."""
epoch_matcher = re.compile(r"^((?P<epoch>\d+):)?(?P<version>.*)")
matches = epoch_matcher.search(version).groupdict()
return matches.get("epoch", ""), matches.get("version")
@classmethod
def from_system(
cls, package: str, version: Optional[str] = "", arch: Optional[str] = ""
) -> "DebianPackage":
"""Locates a package, either on the system or known to apt, and serializes the information.
Args:
package: a string representing the package
version: an optional string if a specific version is requested
arch: an optional architecture, defaulting to `dpkg --print-architecture`. If an
architecture is not specified, this will be used for selection.
"""
try:
return DebianPackage.from_installed_package(package, version, arch)
except PackageNotFoundError:
logger.debug(
"package '%s' is not currently installed or has the wrong architecture.", package
)
# Ok, try `apt-cache ...`
try:
return DebianPackage.from_apt_cache(package, version, arch)
except (PackageNotFoundError, PackageError):
# If we get here, it's not known to the systems.
# This seems unnecessary, but virtually all `apt` commands have a return code of `100`,
# and providing meaningful error messages without this is ugly.
raise PackageNotFoundError(
"Package '{}{}' could not be found on the system or in the apt cache!".format(
package, ".{}".format(arch) if arch else ""
)
) from None
@classmethod
def from_installed_package(
cls, package: str, version: Optional[str] = "", arch: Optional[str] = ""
) -> "DebianPackage":
"""Check whether the package is already installed and return an instance.
Args:
package: a string representing the package
version: an optional string if a specific version is requested
arch: an optional architecture, defaulting to `dpkg --print-architecture`.
If an architecture is not specified, this will be used for selection.
"""
system_arch = check_output(
["dpkg", "--print-architecture"], universal_newlines=True
).strip()
arch = arch if arch else system_arch
# Regexps are a really terrible way to do this. Thanks dpkg
output = ""
try:
output = check_output(["dpkg", "-l", package], stderr=PIPE, universal_newlines=True)
except CalledProcessError:
raise PackageNotFoundError("Package is not installed: {}".format(package)) from None
# Pop off the output from `dpkg -l' because there's no flag to
# omit it`
lines = str(output).splitlines()[5:]
dpkg_matcher = re.compile(
r"""
^(?P<package_status>\w+?)\s+
(?P<package_name>.*?)(?P<throwaway_arch>:\w+?)?\s+
(?P<version>.*?)\s+
(?P<arch>\w+?)\s+
(?P<description>.*)
""",
re.VERBOSE,
)
for line in lines:
try:
matches = dpkg_matcher.search(line).groupdict()
package_status = matches["package_status"]
if not package_status.endswith("i"):
logger.debug(
"package '%s' in dpkg output but not installed, status: '%s'",
package,
package_status,
)
break
epoch, split_version = DebianPackage._get_epoch_from_version(matches["version"])
pkg = DebianPackage(
matches["package_name"],
split_version,
epoch,
matches["arch"],
PackageState.Present,
)
if (pkg.arch == "all" or pkg.arch == arch) and (
version == "" or str(pkg.version) == version
):
return pkg
except AttributeError:
logger.warning("dpkg matcher could not parse line: %s", line)
# If we didn't find it, fail through
raise PackageNotFoundError("Package {}.{} is not installed!".format(package, arch))
@classmethod
def from_apt_cache(
cls, package: str, version: Optional[str] = "", arch: Optional[str] = ""
) -> "DebianPackage":
"""Check whether the package is already installed and return an instance.
Args:
package: a string representing the package
version: an optional string if a specific version is requested
arch: an optional architecture, defaulting to `dpkg --print-architecture`.
If an architecture is not specified, this will be used for selection.
"""
system_arch = check_output(
["dpkg", "--print-architecture"], universal_newlines=True
).strip()
arch = arch if arch else system_arch
# Regexps are a really terrible way to do this. Thanks dpkg
keys = ("Package", "Architecture", "Version")
try:
output = check_output(
["apt-cache", "show", package], stderr=PIPE, universal_newlines=True
)
except CalledProcessError as e:
raise PackageError(
"Could not list packages in apt-cache: {}".format(e.stderr)
) from None
pkg_groups = output.strip().split("\n\n")
keys = ("Package", "Architecture", "Version")
for pkg_raw in pkg_groups:
lines = str(pkg_raw).splitlines()
vals = {}
for line in lines:
if line.startswith(keys):
items = line.split(":", 1)
vals[items[0]] = items[1].strip()
else:
continue
epoch, split_version = DebianPackage._get_epoch_from_version(vals["Version"])
pkg = DebianPackage(
vals["Package"],
split_version,
epoch,
vals["Architecture"],
PackageState.Available,
)
if (pkg.arch == "all" or pkg.arch == arch) and (
version == "" or str(pkg.version) == version
):
return pkg
# If we didn't find it, fail through
raise PackageNotFoundError("Package {}.{} is not in the apt cache!".format(package, arch))
class Version:
"""An abstraction around package versions.
This seems like it should be strictly unnecessary, except that `apt_pkg` is not usable inside a
venv, and wedging version comparisons into `DebianPackage` would overcomplicate it.
This class implements the algorithm found here:
https://www.debian.org/doc/debian-policy/ch-controlfields.html#version
"""
def __init__(self, version: str, epoch: str):
self._version = version
self._epoch = epoch or ""
def __repr__(self):
"""Represent the package."""
return "<{}.{}: {}>".format(self.__module__, self.__class__.__name__, self.__dict__)
def __str__(self):
"""Return human-readable representation of the package."""
return "{}{}".format("{}:".format(self._epoch) if self._epoch else "", self._version)
@property
def epoch(self):
"""Returns the epoch for a package. May be empty."""
return self._epoch
@property
def number(self) -> str:
"""Returns the version number for a package."""
return self._version
def _get_parts(self, version: str) -> Tuple[str, str]:
"""Separate the version into component upstream and Debian pieces."""
try:
version.rindex("-")
except ValueError:
# No hyphens means no Debian version
return version, "0"
upstream, debian = version.rsplit("-", 1)
return upstream, debian
def _listify(self, revision: str) -> List[str]:
"""Split a revision string into a listself.
This list is comprised of alternating between strings and numbers,
padded on either end to always be "str, int, str, int..." and
always be of even length. This allows us to trivially implement the
comparison algorithm described.
"""
result = []
while revision:
rev_1, remains = self._get_alphas(revision)
rev_2, remains = self._get_digits(remains)
result.extend([rev_1, rev_2])
revision = remains
return result
def _get_alphas(self, revision: str) -> Tuple[str, str]:
"""Return a tuple of the first non-digit characters of a revision."""
# get the index of the first digit
for i, char in enumerate(revision):
if char.isdigit():
if i == 0:
return "", revision
return revision[0:i], revision[i:]
# string is entirely alphas
return revision, ""
def _get_digits(self, revision: str) -> Tuple[int, str]:
"""Return a tuple of the first integer characters of a revision."""
# If the string is empty, return (0,'')
if not revision:
return 0, ""
# get the index of the first non-digit
for i, char in enumerate(revision):
if not char.isdigit():
if i == 0:
return 0, revision
return int(revision[0:i]), revision[i:]
# string is entirely digits
return int(revision), ""
def _dstringcmp(self, a, b): # noqa: C901
"""Debian package version string section lexical sort algorithm.
The lexical comparison is a comparison of ASCII values modified so
that all the letters sort earlier than all the non-letters and so that
a tilde sorts before anything, even the end of a part.
"""
if a == b:
return 0
try:
for i, char in enumerate(a):
if char == b[i]:
continue
# "a tilde sorts before anything, even the end of a part"
# (emptyness)
if char == "~":
return -1
if b[i] == "~":
return 1
# "all the letters sort earlier than all the non-letters"
if char.isalpha() and not b[i].isalpha():
return -1
if not char.isalpha() and b[i].isalpha():
return 1
# otherwise lexical sort
if ord(char) > ord(b[i]):
return 1
if ord(char) < ord(b[i]):
return -1
except IndexError:
# a is longer than b but otherwise equal, greater unless there are tildes
if char == "~":
return -1
return 1
# if we get here, a is shorter than b but otherwise equal, so check for tildes...
if b[len(a)] == "~":
return 1
return -1
def _compare_revision_strings(self, first: str, second: str): # noqa: C901
"""Compare two debian revision strings."""
if first == second:
return 0
# listify pads results so that we will always be comparing ints to ints
# and strings to strings (at least until we fall off the end of a list)
first_list = self._listify(first)
second_list = self._listify(second)
if first_list == second_list:
return 0
try:
for i, item in enumerate(first_list):
# explicitly raise IndexError if we've fallen off the edge of list2
if i >= len(second_list):
raise IndexError
# if the items are equal, next
if item == second_list[i]:
continue
# numeric comparison
if isinstance(item, int):
if item > second_list[i]:
return 1
if item < second_list[i]:
return -1
else:
# string comparison
return self._dstringcmp(item, second_list[i])
except IndexError:
# rev1 is longer than rev2 but otherwise equal, hence greater
# ...except for goddamn tildes
if first_list[len(second_list)][0][0] == "~":
return 1
return 1
# rev1 is shorter than rev2 but otherwise equal, hence lesser
# ...except for goddamn tildes
if second_list[len(first_list)][0][0] == "~":
return -1
return -1
def _compare_version(self, other) -> int:
if (self.number, self.epoch) == (other.number, other.epoch):
return 0
if self.epoch < other.epoch:
return -1
if self.epoch > other.epoch:
return 1
# If none of these are true, follow the algorithm
upstream_version, debian_version = self._get_parts(self.number)
other_upstream_version, other_debian_version = self._get_parts(other.number)
upstream_cmp = self._compare_revision_strings(upstream_version, other_upstream_version)
if upstream_cmp != 0:
return upstream_cmp
debian_cmp = self._compare_revision_strings(debian_version, other_debian_version)
if debian_cmp != 0:
return debian_cmp
return 0
def __lt__(self, other) -> bool:
"""Less than magic method impl."""
return self._compare_version(other) < 0
def __eq__(self, other) -> bool:
"""Equality magic method impl."""
return self._compare_version(other) == 0
def __gt__(self, other) -> bool:
"""Greater than magic method impl."""
return self._compare_version(other) > 0
def __le__(self, other) -> bool:
"""Less than or equal to magic method impl."""
return self.__eq__(other) or self.__lt__(other)
def __ge__(self, other) -> bool:
"""Greater than or equal to magic method impl."""
return self.__gt__(other) or self.__eq__(other)
def __ne__(self, other) -> bool:
"""Not equal to magic method impl."""
return not self.__eq__(other)
def add_package(
package_names: Union[str, List[str]],
version: Optional[str] = "",
arch: Optional[str] = "",
update_cache: Optional[bool] = False,
) -> Union[DebianPackage, List[DebianPackage]]:
"""Add a package or list of packages to the system.
Args:
package_names: single package name, or list of package names
name: the name(s) of the package(s)
version: an (Optional) version as a string. Defaults to the latest known
arch: an optional architecture for the package
update_cache: whether or not to run `apt-get update` prior to operating
Raises:
TypeError if no package name is given, or explicit version is set for multiple packages
PackageNotFoundError if the package is not in the cache.
PackageError if packages fail to install
"""
cache_refreshed = False
if update_cache:
update()
cache_refreshed = True
packages = {"success": [], "retry": [], "failed": []}
package_names = [package_names] if isinstance(package_names, str) else package_names
if not package_names:
raise TypeError("Expected at least one package name to add, received zero!")
if len(package_names) != 1 and version:
raise TypeError(
"Explicit version should not be set if more than one package is being added!"
)
for p in package_names:
pkg, success = _add(p, version, arch)
if success:
packages["success"].append(pkg)
else:
logger.warning("failed to locate and install/update '%s'", pkg)
packages["retry"].append(p)
if packages["retry"] and not cache_refreshed:
logger.info("updating the apt-cache and retrying installation of failed packages.")
update()
for p in packages["retry"]:
pkg, success = _add(p, version, arch)
if success:
packages["success"].append(pkg)
else:
packages["failed"].append(p)
if packages["failed"]:
raise PackageError("Failed to install packages: {}".format(", ".join(packages["failed"])))
return packages["success"] if len(packages["success"]) > 1 else packages["success"][0]
def _add(
name: str,
version: Optional[str] = "",
arch: Optional[str] = "",
) -> Tuple[Union[DebianPackage, str], bool]:
"""Add a package to the system.
Args:
name: the name(s) of the package(s)
version: an (Optional) version as a string. Defaults to the latest known
arch: an optional architecture for the package
Returns: a tuple of `DebianPackage` if found, or a :str: if it is not, and
a boolean indicating success
"""
try:
pkg = DebianPackage.from_system(name, version, arch)
pkg.ensure(state=PackageState.Present)
return pkg, True
except PackageNotFoundError:
return name, False
def remove_package(
package_names: Union[str, List[str]]
) -> Union[DebianPackage, List[DebianPackage]]:
"""Remove package(s) from the system.
Args:
package_names: the name of a package
Raises:
PackageNotFoundError if the package is not found.
"""
packages = []
package_names = [package_names] if isinstance(package_names, str) else package_names
if not package_names:
raise TypeError("Expected at least one package name to add, received zero!")
for p in package_names:
try:
pkg = DebianPackage.from_installed_package(p)
pkg.ensure(state=PackageState.Absent)
packages.append(pkg)
except PackageNotFoundError:
logger.info("package '%s' was requested for removal, but it was not installed.", p)
# the list of packages will be empty when no package is removed
logger.debug("packages: '%s'", packages)
return packages[0] if len(packages) == 1 else packages
def update() -> None:
"""Update the apt cache via `apt-get update`."""
subprocess.run(["apt-get", "update", "--error-on=any"], capture_output=True, check=True)
def import_key(key: str) -> str:
"""Import an ASCII Armor key.
A Radix64 format keyid is also supported for backwards
compatibility. In this case Ubuntu keyserver will be
queried for a key via HTTPS by its keyid. This method
is less preferable because https proxy servers may
require traffic decryption which is equivalent to a
man-in-the-middle attack (a proxy server impersonates
keyserver TLS certificates and has to be explicitly
trusted by the system).
Args:
key: A GPG key in ASCII armor format, including BEGIN
and END markers or a keyid.
Returns:
The GPG key filename written.
Raises:
GPGKeyError if the key could not be imported
"""
key = key.strip()
if "-" in key or "\n" in key:
# Send everything not obviously a keyid to GPG to import, as
# we trust its validation better than our own. eg. handling
# comments before the key.
logger.debug("PGP key found (looks like ASCII Armor format)")
if (
"-----BEGIN PGP PUBLIC KEY BLOCK-----" in key
and "-----END PGP PUBLIC KEY BLOCK-----" in key
):
logger.debug("Writing provided PGP key in the binary format")
key_bytes = key.encode("utf-8")
key_name = DebianRepository._get_keyid_by_gpg_key(key_bytes)
key_gpg = DebianRepository._dearmor_gpg_key(key_bytes)
gpg_key_filename = "/etc/apt/trusted.gpg.d/{}.gpg".format(key_name)
DebianRepository._write_apt_gpg_keyfile(
key_name=gpg_key_filename, key_material=key_gpg
)
return gpg_key_filename
else:
raise GPGKeyError("ASCII armor markers missing from GPG key")
else:
logger.warning(
"PGP key found (looks like Radix64 format). "
"SECURELY importing PGP key from keyserver; "
"full key not provided."
)
# as of bionic add-apt-repository uses curl with an HTTPS keyserver URL
# to retrieve GPG keys. `apt-key adv` command is deprecated as is
# apt-key in general as noted in its manpage. See lp:1433761 for more
# history. Instead, /etc/apt/trusted.gpg.d is used directly to drop
# gpg
key_asc = DebianRepository._get_key_by_keyid(key)
# write the key in GPG format so that apt-key list shows it
key_gpg = DebianRepository._dearmor_gpg_key(key_asc.encode("utf-8"))
gpg_key_filename = "/etc/apt/trusted.gpg.d/{}.gpg".format(key)
DebianRepository._write_apt_gpg_keyfile(key_name=gpg_key_filename, key_material=key_gpg)
return gpg_key_filename
class InvalidSourceError(Error):
"""Exceptions for invalid source entries."""
class GPGKeyError(Error):
"""Exceptions for GPG keys."""
class DebianRepository:
"""An abstraction to represent a repository."""
def __init__(
self,
enabled: bool,
repotype: str,
uri: str,
release: str,
groups: List[str],
filename: Optional[str] = "",
gpg_key_filename: Optional[str] = "",
options: Optional[dict] = None,
):
self._enabled = enabled
self._repotype = repotype
self._uri = uri
self._release = release
self._groups = groups
self._filename = filename
self._gpg_key_filename = gpg_key_filename
self._options = options
@property
def enabled(self):
"""Return whether or not the repository is enabled."""
return self._enabled
@property
def repotype(self):
"""Return whether it is binary or source."""
return self._repotype
@property
def uri(self):
"""Return the URI."""
return self._uri
@property
def release(self):
"""Return which Debian/Ubuntu releases it is valid for."""
return self._release
@property
def groups(self):
"""Return the enabled package groups."""
return self._groups
@property
def filename(self):
"""Returns the filename for a repository."""
return self._filename
@filename.setter
def filename(self, fname: str) -> None:
"""Set the filename used when a repo is written back to disk.
Args:
fname: a filename to write the repository information to.
"""
if not fname.endswith(".list"):
raise InvalidSourceError("apt source filenames should end in .list!")
self._filename = fname
@property
def gpg_key(self):
"""Returns the path to the GPG key for this repository."""
return self._gpg_key_filename
@property
def options(self):
"""Returns any additional repo options which are set."""
return self._options
def make_options_string(self) -> str:
"""Generate the complete options string for a a repository.
Combining `gpg_key`, if set, and the rest of the options to find
a complex repo string.
"""
options = self._options if self._options else {}
if self._gpg_key_filename:
options["signed-by"] = self._gpg_key_filename
return (
"[{}] ".format(" ".join(["{}={}".format(k, v) for k, v in options.items()]))
if options
else ""
)
@staticmethod
def prefix_from_uri(uri: str) -> str:
"""Get a repo list prefix from the uri, depending on whether a path is set."""
uridetails = urlparse(uri)
path = (
uridetails.path.lstrip("/").replace("/", "-") if uridetails.path else uridetails.netloc
)
return "/etc/apt/sources.list.d/{}".format(path)
@staticmethod
def from_repo_line(repo_line: str, write_file: Optional[bool] = True) -> "DebianRepository":
"""Instantiate a new `DebianRepository` a `sources.list` entry line.
Args:
repo_line: a string representing a repository entry
write_file: boolean to enable writing the new repo to disk
"""
repo = RepositoryMapping._parse(repo_line, "UserInput")
fname = "{}-{}.list".format(
DebianRepository.prefix_from_uri(repo.uri), repo.release.replace("/", "-")
)
repo.filename = fname
options = repo.options if repo.options else {}
if repo.gpg_key:
options["signed-by"] = repo.gpg_key
# For Python 3.5 it's required to use sorted in the options dict in order to not have
# different results in the order of the options between executions.
options_str = (
"[{}] ".format(" ".join(["{}={}".format(k, v) for k, v in sorted(options.items())]))
if options
else ""
)
if write_file:
with open(fname, "wb") as f:
f.write(
(
"{}".format("#" if not repo.enabled else "")
+ "{} {}{} ".format(repo.repotype, options_str, repo.uri)
+ "{} {}\n".format(repo.release, " ".join(repo.groups))
).encode("utf-8")
)
return repo
def disable(self) -> None:
"""Remove this repository from consideration.
Disable it instead of removing from the repository file.
"""
searcher = "{} {}{} {}".format(
self.repotype, self.make_options_string(), self.uri, self.release
)
for line in fileinput.input(self._filename, inplace=True):
if re.match(r"^{}\s".format(re.escape(searcher)), line):
print("# {}".format(line), end="")
else:
print(line, end="")
def import_key(self, key: str) -> None:
"""Import an ASCII Armor key.
A Radix64 format keyid is also supported for backwards
compatibility. In this case Ubuntu keyserver will be
queried for a key via HTTPS by its keyid. This method
is less preferable because https proxy servers may
require traffic decryption which is equivalent to a
man-in-the-middle attack (a proxy server impersonates
keyserver TLS certificates and has to be explicitly
trusted by the system).
Args:
key: A GPG key in ASCII armor format,
including BEGIN and END markers or a keyid.
Raises:
GPGKeyError if the key could not be imported
"""
self._gpg_key_filename = import_key(key)
@staticmethod
def _get_keyid_by_gpg_key(key_material: bytes) -> str:
"""Get a GPG key fingerprint by GPG key material.
Gets a GPG key fingerprint (40-digit, 160-bit) by the ASCII armor-encoded
or binary GPG key material. Can be used, for example, to generate file
names for keys passed via charm options.
"""
# Use the same gpg command for both Xenial and Bionic
cmd = ["gpg", "--with-colons", "--with-fingerprint"]
ps = subprocess.run(
cmd,
stdout=PIPE,
stderr=PIPE,
input=key_material,
)
out, err = ps.stdout.decode(), ps.stderr.decode()
if "gpg: no valid OpenPGP data found." in err:
raise GPGKeyError("Invalid GPG key material provided")
# from gnupg2 docs: fpr :: Fingerprint (fingerprint is in field 10)
return re.search(r"^fpr:{9}([0-9A-F]{40}):$", out, re.MULTILINE).group(1)
@staticmethod
def _get_key_by_keyid(keyid: str) -> str:
"""Get a key via HTTPS from the Ubuntu keyserver.
Different key ID formats are supported by SKS keyservers (the longer ones
are more secure, see "dead beef attack" and https://evil32.com/). Since
HTTPS is used, if SSLBump-like HTTPS proxies are in place, they will
impersonate keyserver.ubuntu.com and generate a certificate with
keyserver.ubuntu.com in the CN field or in SubjAltName fields of a
certificate. If such proxy behavior is expected it is necessary to add the
CA certificate chain containing the intermediate CA of the SSLBump proxy to
every machine that this code runs on via ca-certs cloud-init directive (via
cloudinit-userdata model-config) or via other means (such as through a
custom charm option). Also note that DNS resolution for the hostname in a
URL is done at a proxy server - not at the client side.
8-digit (32 bit) key ID
https://keyserver.ubuntu.com/pks/lookup?search=0x4652B4E6
16-digit (64 bit) key ID
https://keyserver.ubuntu.com/pks/lookup?search=0x6E85A86E4652B4E6
40-digit key ID:
https://keyserver.ubuntu.com/pks/lookup?search=0x35F77D63B5CEC106C577ED856E85A86E4652B4E6
Args:
keyid: An 8, 16 or 40 hex digit keyid to find a key for
Returns:
A string contining key material for the specified GPG key id
Raises:
subprocess.CalledProcessError
"""
# options=mr - machine-readable output (disables html wrappers)
keyserver_url = (
"https://keyserver.ubuntu.com" "/pks/lookup?op=get&options=mr&exact=on&search=0x{}"
)
curl_cmd = ["curl", keyserver_url.format(keyid)]
# use proxy server settings in order to retrieve the key
return check_output(curl_cmd).decode()
@staticmethod
def _dearmor_gpg_key(key_asc: bytes) -> bytes:
"""Convert a GPG key in the ASCII armor format to the binary format.
Args:
key_asc: A GPG key in ASCII armor format.
Returns:
A GPG key in binary format as a string
Raises:
GPGKeyError
"""
ps = subprocess.run(["gpg", "--dearmor"], stdout=PIPE, stderr=PIPE, input=key_asc)
out, err = ps.stdout, ps.stderr.decode()
if "gpg: no valid OpenPGP data found." in err:
raise GPGKeyError(
"Invalid GPG key material. Check your network setup"
" (MTU, routing, DNS) and/or proxy server settings"
" as well as destination keyserver status."
)
else:
return out
@staticmethod
def _write_apt_gpg_keyfile(key_name: str, key_material: bytes) -> None:
"""Write GPG key material into a file at a provided path.
Args:
key_name: A key name to use for a key file (could be a fingerprint)
key_material: A GPG key material (binary)
"""
with open(key_name, "wb") as keyf:
keyf.write(key_material)
class RepositoryMapping(Mapping):
"""An representation of known repositories.
Instantiation of `RepositoryMapping` will iterate through the
filesystem, parse out repository files in `/etc/apt/...`, and create
`DebianRepository` objects in this list.
Typical usage:
repositories = apt.RepositoryMapping()
repositories.add(DebianRepository(
enabled=True, repotype="deb", uri="https://example.com", release="focal",
groups=["universe"]
))
"""
def __init__(self):
self._repository_map = {}
# Repositories that we're adding -- used to implement mode param
self.default_file = "/etc/apt/sources.list"
# read sources.list if it exists
if os.path.isfile(self.default_file):
self.load(self.default_file)
# read sources.list.d
for file in glob.iglob("/etc/apt/sources.list.d/*.list"):
self.load(file)
def __contains__(self, key: str) -> bool:
"""Magic method for checking presence of repo in mapping."""
return key in self._repository_map
def __len__(self) -> int:
"""Return number of repositories in map."""
return len(self._repository_map)
def __iter__(self) -> Iterable[DebianRepository]:
"""Return iterator for RepositoryMapping."""
return iter(self._repository_map.values())
def __getitem__(self, repository_uri: str) -> DebianRepository:
"""Return a given `DebianRepository`."""
return self._repository_map[repository_uri]
def __setitem__(self, repository_uri: str, repository: DebianRepository) -> None:
"""Add a `DebianRepository` to the cache."""
self._repository_map[repository_uri] = repository
def load(self, filename: str):
"""Load a repository source file into the cache.
Args:
filename: the path to the repository file
"""
parsed = []
skipped = []
with open(filename, "r") as f:
for n, line in enumerate(f):
try:
repo = self._parse(line, filename)
except InvalidSourceError:
skipped.append(n)
else:
repo_identifier = "{}-{}-{}".format(repo.repotype, repo.uri, repo.release)
self._repository_map[repo_identifier] = repo
parsed.append(n)
logger.debug("parsed repo: '%s'", repo_identifier)
if skipped:
skip_list = ", ".join(str(s) for s in skipped)
logger.debug("skipped the following lines in file '%s': %s", filename, skip_list)
if parsed:
logger.info("parsed %d apt package repositories", len(parsed))
else:
raise InvalidSourceError("all repository lines in '{}' were invalid!".format(filename))
@staticmethod
def _parse(line: str, filename: str) -> DebianRepository:
"""Parse a line in a sources.list file.
Args:
line: a single line from `load` to parse
filename: the filename being read
Raises:
InvalidSourceError if the source type is unknown
"""
enabled = True
repotype = uri = release = gpg_key = ""
options = {}
groups = []
line = line.strip()
if line.startswith("#"):
enabled = False
line = line[1:]
# Check for "#" in the line and treat a part after it as a comment then strip it off.
i = line.find("#")
if i > 0:
line = line[:i]
# Split a source into substrings to initialize a new repo.
source = line.strip()
if source:
# Match any repo options, and get a dict representation.
for v in re.findall(OPTIONS_MATCHER, source):
opts = dict(o.split("=") for o in v.strip("[]").split())
# Extract the 'signed-by' option for the gpg_key
gpg_key = opts.pop("signed-by", "")
options = opts
# Remove any options from the source string and split the string into chunks
source = re.sub(OPTIONS_MATCHER, "", source)
chunks = source.split()
# Check we've got a valid list of chunks
if len(chunks) < 3 or chunks[0] not in VALID_SOURCE_TYPES:
raise InvalidSourceError("An invalid sources line was found in %s!", filename)
repotype = chunks[0]
uri = chunks[1]
release = chunks[2]
groups = chunks[3:]
return DebianRepository(
enabled, repotype, uri, release, groups, filename, gpg_key, options
)
else:
raise InvalidSourceError("An invalid sources line was found in %s!", filename)
def add(self, repo: DebianRepository, default_filename: Optional[bool] = False) -> None:
"""Add a new repository to the system.
Args:
repo: a `DebianRepository` object
default_filename: an (Optional) filename if the default is not desirable
"""
new_filename = "{}-{}.list".format(
DebianRepository.prefix_from_uri(repo.uri), repo.release.replace("/", "-")
)
fname = repo.filename or new_filename
options = repo.options if repo.options else {}
if repo.gpg_key:
options["signed-by"] = repo.gpg_key
with open(fname, "wb") as f:
f.write(
(
"{}".format("#" if not repo.enabled else "")
+ "{} {}{} ".format(repo.repotype, repo.make_options_string(), repo.uri)
+ "{} {}\n".format(repo.release, " ".join(repo.groups))
).encode("utf-8")
)
self._repository_map["{}-{}-{}".format(repo.repotype, repo.uri, repo.release)] = repo
def disable(self, repo: DebianRepository) -> None:
"""Remove a repository. Disable by default.
Args:
repo: a `DebianRepository` to disable
"""
searcher = "{} {}{} {}".format(
repo.repotype, repo.make_options_string(), repo.uri, repo.release
)
for line in fileinput.input(repo.filename, inplace=True):
if re.match(r"^{}\s".format(re.escape(searcher)), line):
print("# {}".format(line), end="")
else:
print(line, end="")
self._repository_map["{}-{}-{}".format(repo.repotype, repo.uri, repo.release)] = repo