Source code for mwdblib.api.options

import configparser
import pathlib
import warnings
from typing import Any, Optional, Type

import keyring
import keyring.errors


class OptionsField:
    def __init__(
        self, default_value: Any = None, value_type: Optional[Type] = None
    ) -> None:
        self.default_value: Any = default_value
        self.nullable: bool = default_value is None
        self.value_type: Type = value_type or type(default_value)

    def load_from_config(
        self, instance: Any, config_parser: configparser.ConfigParser, section: str
    ) -> None:
        """
        Loads value from configuration and overrides field if value is set.
        """
        value: Optional[Any]
        if self.value_type is bool:
            value = config_parser.getboolean(section, self.name, fallback=None)
        elif self.value_type is int:
            value = config_parser.getint(section, self.name, fallback=None)
        else:
            value = config_parser.get(section, self.name, fallback=None)
        if value is not None:
            self.__set__(instance, value)

    def load_from_dict(self, instance: Any, dictionary: dict) -> None:
        """
        Loads value from dictionary if set. Accepts None.
        """
        if self.name in dictionary:
            self.__set__(instance, dictionary[self.name])

    def __set_name__(self, owner: Any, name: str) -> None:
        self.name = name
        self.instance_name = "_" + name

    def __get__(self, instance: Any, owner: Any) -> Any:
        if hasattr(instance, self.instance_name):
            return getattr(instance, self.instance_name)
        return self.default_value

    def __set__(self, instance: Any, value: Any) -> None:
        if not (self.nullable and value is None) and type(value) is not self.value_type:
            raise TypeError(
                f"Expected '{self.name}' to be {self.value_type} not {type(value)}"
            )
        setattr(instance, self.instance_name, value)


[docs]class APIClientOptions: """ Options bag that contains configuration for APIClient. Field values are loaded using the following precedence: - built-in defaults accessible via class properties e.g. ``APIClientOptions.api_url`` - values from ``~/.mwdb`` configuration file - values passed as an arguments to the ``APIClientOptions`` constructor Configuration may depend on ``api_url`` value, so remember to set it if you want to talk with specific MWDB Core instance. """ # Register fields and defaults api_url = OptionsField("https://mwdb.cert.pl/api/") api_key = OptionsField(value_type=str) username = OptionsField(value_type=str) password = OptionsField(value_type=str) verify_ssl = OptionsField(True) obey_ratelimiter = OptionsField(True) retry_on_downtime = OptionsField(False) max_downtime_retries = OptionsField(5) downtime_timeout = OptionsField(10) retry_idempotent = OptionsField(True) use_keyring = OptionsField(True) emit_warnings = OptionsField(True) # General options that can be set both globally or for specific instance GENERAL_OPTIONS = [ verify_ssl, obey_ratelimiter, retry_on_downtime, max_downtime_retries, downtime_timeout, retry_idempotent, use_keyring, emit_warnings, ] # Options that apply only to global mwdblib configuration GLOBAL_ONLY_OPTIONS = [api_url] # Options that apply only to specific MWDB instance INSTANCE_ONLY_OPTIONS = [api_key, username, password] # Configuration priority (from least important): # - global configuration section [mwdb] # - instance configuration section [mwdb:<api_url>] # - api_options keyword arguments def __init__( self, config_path: Optional[pathlib.Path] = (pathlib.Path.home() / ".mwdb"), **api_options: Any, ) -> None: self.config_parser: configparser.ConfigParser = configparser.ConfigParser() if config_path is not None: # Ensure that config_path is Path object self.config_path: Optional[pathlib.Path] = pathlib.Path(config_path) # Read configuration from provided path or do nothing if doesn't exist self.config_parser.read([self.config_path]) else: # If config_path is None, assume that user doesn't want to # fetch credentials from keyring as well self.config_path = None self.use_keyring = False # For each settings item: override defaults by global configuration for option in self.GENERAL_OPTIONS + self.GLOBAL_ONLY_OPTIONS: option.load_from_config(self, self.config_parser, "mwdb") option.load_from_dict(self, api_options) # Normalize api_url if not self.api_url.endswith("/"): self.api_url += "/" if not self.api_url.endswith("/api/") and self.emit_warnings: warnings.warn( "APIClient.api_url doesn't end with '/api/'. Make sure you have passed " "URL to the REST API instead of MWDB UI" ) # Load general settings from instance configuration for option in self.GENERAL_OPTIONS + self.INSTANCE_ONLY_OPTIONS: option.load_from_config(self, self.config_parser, f"mwdb:{self.api_url}") option.load_from_dict(self, api_options) # If keyring is used: fetch credentials from keyring # Otherwise: assume that they're stored plaintext in configuration if ( self.username is not None and self.use_keyring and "api_key" not in api_options and "password" not in api_options ): self.api_key = keyring.get_password( f"mwdb-apikey:{self.api_url}", self.username ) # If api_key not set: try to fetch password if self.api_key is None: self.password = keyring.get_password( f"mwdb:{self.api_url}", self.username )
[docs] def clear_stored_credentials(self, config_writeback: bool = True) -> bool: """ Clears stored credentials in configuration for current user. Used by ``mwdb logout`` CLI command. """ if not self.username: return False # Remove credentials from keyring if self.use_keyring: try: keyring.delete_password(f"mwdb-apikey:{self.api_url}", self.username) except keyring.errors.PasswordDeleteError: pass try: keyring.delete_password(f"mwdb:{self.api_url}", self.username) except keyring.errors.PasswordDeleteError: pass instance_section = f"mwdb:{self.api_url}" # Remove credentials from configuration if self.config_parser.has_section(instance_section): if self.config_parser.has_option(instance_section, "username"): self.config_parser.remove_option(instance_section, "username") if self.config_parser.has_option(instance_section, "password"): self.config_parser.remove_option(instance_section, "password") if self.config_parser.has_option(instance_section, "api_key"): self.config_parser.remove_option(instance_section, "api_key") if config_writeback and self.config_path: with self.config_path.open("w") as f: self.config_parser.write(f) return True
[docs] def store_credentials( self, username: Optional[str], password: Optional[str], api_key: Optional[str] ) -> bool: """ Stores current credentials in configuration for current user. Used by ``mwdb login`` CLI command. """ if not username or (not api_key and not password): return False self.username = username self.password = password self.api_key = api_key # Clear currently stored credentials self.clear_stored_credentials(config_writeback=False) # Ensure that 'mwdb' section exists in configuration if not self.config_parser.has_section("mwdb"): self.config_parser.add_section("mwdb") # Set api_url information self.config_parser.set("mwdb", "api_url", self.api_url) # Set credentials for instance instance_section = f"mwdb:{self.api_url}" if not self.config_parser.has_section(instance_section): self.config_parser.add_section(instance_section) self.config_parser.set(instance_section, "username", self.username) # Set credentials if self.use_keyring: if self.api_key: keyring.set_password( f"mwdb-apikey:{self.api_url}", self.username, self.api_key ) elif self.password: keyring.set_password( f"mwdb:{self.api_url}", self.username, self.password ) else: raise RuntimeError("Implementation error: no api_key nor password") self.config_parser.set(instance_section, "use_keyring", "1") else: if self.api_key: self.config_parser.set(instance_section, "api_key", self.api_key) elif self.password: self.config_parser.set(instance_section, "password", self.password) else: raise RuntimeError("Implementation error: no api_key nor password") self.config_parser.set(instance_section, "use_keyring", "0") # Perform configuration writeback if self.config_path: with self.config_path.open("w") as f: self.config_parser.write(f) return True