Charmed MongoDB

Channel Revision Published Runs on
5/stable 117 20 Apr 2023
Ubuntu 22.04
5/candidate 117 20 Apr 2023
Ubuntu 22.04
5/edge 127 19 Sep 2023
Ubuntu 22.04
3.6/stable 100 28 Apr 2023
Ubuntu 20.04 Ubuntu 18.04
3.6/candidate 100 13 Apr 2023
Ubuntu 20.04 Ubuntu 18.04
3.6/edge 100 03 Feb 2023
Ubuntu 20.04 Ubuntu 18.04
juju deploy mongodb --channel 5/stable
Show information




"""Simple functions, which can be used in both K8s and VM charms."""
# Copyright 2023 Canonical Ltd.
# See LICENSE file for licensing details.
import json
import logging
import os
import secrets
import string
import subprocess
from typing import List, Optional, Union

from charms.mongodb.v0.mongodb import MongoDBConfiguration, MongoDBConnection
from ops.model import (
from pymongo.errors import AutoReconnect, ServerSelectionTimeoutError

# The unique Charmhub library identifier, never change it
LIBID = "b9a7fe0c38d8486a9d1ce94c27d4758e"

# Increment this major API version when introducing breaking changes

# Increment this PATCH version before using `charmcraft publish-lib` or reset
# to 0 if you are raising the major API version

# path to store mongodb ketFile
KEY_FILE = "keyFile"
TLS_EXT_PEM_FILE = "external-cert.pem"
TLS_EXT_CA_FILE = "external-ca.crt"
TLS_INT_PEM_FILE = "internal-cert.pem"
TLS_INT_CA_FILE = "internal-ca.crt"

MONGODB_COMMON_DIR = "/var/snap/charmed-mongodb/common"
MONGODB_SNAP_DATA_DIR = "/var/snap/charmed-mongodb/current"

DATA_DIR = "/var/lib/mongodb"
CONF_DIR = "/etc/mongod"
MONGODB_LOG_FILENAME = "mongodb.log"
logger = logging.getLogger(__name__)

# noinspection GrazieInspection
def get_create_user_cmd(
    config: MongoDBConfiguration, mongo_path="charmed-mongodb.mongo"
) -> List[str]:
    """Creates initial admin user for MongoDB.

    Initial admin user can be created only through localhost connection.
    unfortunately, pymongo not able to create connection which considered
    as local connection by MongoDB, even if socket connection used.
    As result where are only hackish ways to create initial user.
    It is needed to install mongodb-clients inside charm container to make
    this function work correctly
    return [
        f"  user: '{config.username}',"
        "  pwd: passwordPrompt(),"
        "  roles:["
        "    {'role': 'userAdminAnyDatabase', 'db': 'admin'}, "
        "    {'role': 'readWriteAnyDatabase', 'db': 'admin'}, "
        "    {'role': 'clusterAdmin', 'db': 'admin'}, "
        "  ],"
        "  mechanisms: ['SCRAM-SHA-256'],"
        "  passwordDigestor: 'server',"

def get_mongod_args(
    config: MongoDBConfiguration,
    auth: bool = True,
    snap_install: bool = False,
) -> str:
    """Construct the MongoDB startup command line.

        A string representing the command used to start MongoDB.
    full_data_dir = f"{MONGODB_COMMON_DIR}{DATA_DIR}" if snap_install else DATA_DIR
    full_conf_dir = f"{MONGODB_SNAP_DATA_DIR}{CONF_DIR}" if snap_install else CONF_DIR
    # in k8s the default logging options that are used for the vm charm are ignored and logs are
    # the output of the container. To enable logging to a file it must be set explicitly
    logging_options = "" if snap_install else f"--logpath={full_data_dir}/{MONGODB_LOG_FILENAME}"
    cmd = [
        # bind to localhost and external interfaces
        # part of replicaset
        # db must be located within the snap common directory since the snap is strictly confined
    if auth:

    if auth and not config.tls_internal:
        # keyFile cannot be used without auth and cannot be used in tandem with internal TLS

    if config.tls_external:
                # allow non-TLS connections

    # internal TLS can be enabled only in external is enabled
    if config.tls_internal and config.tls_external:

    return " ".join(cmd)

def generate_password() -> str:
    """Generate a random password string.

       A random password string.
    choices = string.ascii_letters + string.digits
    return "".join([secrets.choice(choices) for _ in range(32)])

def generate_keyfile() -> str:
    """Key file used for authentication between replica set peers.

       A maximum allowed random string.
    choices = string.ascii_letters + string.digits
    return "".join([secrets.choice(choices) for _ in range(1024)])

def build_unit_status(mongodb_config: MongoDBConfiguration, unit_ip: str) -> StatusBase:
    """Generates the status of a unit based on its status reported by mongod."""
        with MongoDBConnection(mongodb_config) as mongo:
            replset_status = mongo.get_replset_status()

            if unit_ip not in replset_status:
                return WaitingStatus("Member being added..")

            replica_status = replset_status[unit_ip]

            if replica_status == "PRIMARY":
                return ActiveStatus("Primary")
            elif replica_status == "SECONDARY":
                return ActiveStatus("")
            elif replica_status in ["STARTUP", "STARTUP2", "ROLLBACK", "RECOVERING"]:
                return WaitingStatus("Member is syncing...")
            elif replica_status == "REMOVED":
                return WaitingStatus("Member is removing...")
                return BlockedStatus(replica_status)
    except ServerSelectionTimeoutError as e:
        # ServerSelectionTimeoutError is commonly due to ReplicaSetNoPrimary
        logger.debug("Got error: %s, while checking replica set status", str(e))
        return WaitingStatus("Waiting for primary re-election..")
    except AutoReconnect as e:
        # AutoReconnect is raised when a connection to the database is lost and an attempt to
        # auto-reconnect will be made by pymongo.
        logger.debug("Got error: %s, while checking replica set status", str(e))
        return WaitingStatus("Waiting to reconnect to unit..")

def copy_licenses_to_unit():
    """Copies licenses packaged in the snap to the charm's licenses directory."""
    os.makedirs("src/licenses", exist_ok=True)
    subprocess.check_output("cp LICENSE src/licenses/LICENSE-charm", shell=True)
        "cp -r /snap/charmed-mongodb/current/licenses/* src/licenses", shell=True

_StrOrBytes = Union[str, bytes]

def process_pbm_error(error_string: Optional[_StrOrBytes]) -> str:
    """Parses pbm error string and returns a user friendly message."""
    message = "couldn't configure s3 backup option"
    if not error_string:
        return message
    if type(error_string) == bytes:
        error_string = error_string.decode("utf-8")
    if "status code: 403" in error_string:  # type: ignore
        message = "s3 credentials are incorrect."
    elif "status code: 404" in error_string:  # type: ignore
        message = "s3 configurations are incompatible."
    elif "status code: 301" in error_string:  # type: ignore
        message = "s3 configurations are incompatible."
    return message

def current_pbm_op(pbm_status: str) -> str:
    """Parses pbm status for the operation that pbm is running."""
    pbm_status = json.loads(pbm_status)
    return pbm_status["running"] if "running" in pbm_status else ""

def process_pbm_status(pbm_status: str) -> StatusBase:
    """Parses current pbm operation and returns unit status."""
    current_op = current_pbm_op(pbm_status)
    # no operations are currently running with pbm
    if current_op == {}:
        return ActiveStatus("")

    if current_op["type"] == "backup":
        backup_id = current_op["name"]
        return MaintenanceStatus(f"backup started/running, backup id:'{backup_id}'")

    if current_op["type"] == "restore":
        backup_id = current_op["name"]
        return MaintenanceStatus(f"restore started/running, backup id:'{backup_id}'")

    if current_op["type"] == "resync":
        return WaitingStatus("waiting to sync s3 configurations.")

    return ActiveStatus()