Core interface (MWDB)

class mwdblib.MWDB(api: Optional[mwdblib.api.api.APIClient] = None, **api_options: Any)

Main object used for communication with MWDB REST API

Parameters
  • api_url – MWDB API URL that ends with ‘/api/’.

  • api_key – MWDB API key

  • username – MWDB account username

  • password – MWDB account password

  • verify_ssl – Verify SSL certificate correctness (default: True)

  • obey_ratelimiter – If false, HTTP 429 errors will cause an exception like all other error codes. If true (default), library will transparently handle them by sleeping for a specified duration.

  • retry_on_downtime – If true, requests will be automatically retried after 10 seconds on HTTP 502/504 and ConnectionError.

  • max_downtime_retries – Number of retries caused by temporary downtime

  • downtime_timeout – How long we need to wait between retries (in seconds)

  • retry_idempotent – Retry idempotent POST requests (default). The only thing that is really non-idempotent in current API is MWDBObject.add_comment(), so it’s not a big deal. You can turn it off if possible doubled comments are problematic in your MWDB instance.

  • use_keyring – If true (default), APIClient uses keyring to fetch stored credentials. If not, they’re fetched from plaintext configuration.

  • emit_warnings – If true (default), warnings are emitted by APIClient

  • config_path – Path to the configuration file (default is ~/.mwdb). If None, configuration file will not be used by APIClient

  • api (mwdblib.APIClient, optional) – Custom APIClient to be used for communication with MWDB

New in version 2.6.0: API requests will sleep for about a dozen seconds when the rate limit has been exceeded.

New in version 3.2.0: You can enable retry_on_downtime to automatically retry requests in case of HTTP 502/504 or ConnectionError.

Changed in version 4.0.0: MWDB by default uses credentials and api_url set by mwdb login. If you don’t want to automatically fetch them from configuration, pass config_path=None to the constructor

New in version 4.0.0: Added use_keyring, emit_warnings and config_path options. username and password can be passed directly to the constructor.

Usage example:

from mwdblib import MWDB

mwdb = MWDB()
mwdb.login("example", "<password>")

file = mwdb.query_file("3629344675705286607dd0f680c66c19f7e310a1")
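
The constructor options listed above can also be collected in a dictionary and passed as keyword arguments. The following is a minimal sketch, assuming an API-key setup. build_mwdb_options is a hypothetical helper (not part of mwdblib), the URL and key are placeholders, and raising ValueError for a malformed URL is a choice made for this example only:

```python
from typing import Any, Dict


def build_mwdb_options(api_url: str, api_key: str) -> Dict[str, Any]:
    """Hypothetical helper: collect keyword arguments for MWDB(**options)."""
    # The api_url parameter is documented as ending with '/api/'.
    if not api_url.endswith("/api/"):
        raise ValueError("api_url must end with '/api/'")
    return {
        "api_url": api_url,
        "api_key": api_key,
        # config_path=None: do not read the configuration file (~/.mwdb).
        "config_path": None,
        # Retry automatically on HTTP 502/504 and ConnectionError.
        "retry_on_downtime": True,
    }


options = build_mwdb_options("https://mwdb.example.com/api/", "<api-key>")
# With mwdblib installed: mwdb = MWDB(**options)
```
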

count(query: Optional[str] = None) → int

Returns the number of objects matching the provided query in Lucene syntax. If you already know the type of objects you want to count, use the specialized variants:
  • count_files()
  • count_configs()
  • count_blobs()

Usage example:

from mwdblib import MWDB

mwdb = MWDB()

# Count samples tagged as evil and with size less than 100kB
result = mwdb.count_files("tag:evil AND file.size:[0 TO 100000]")
Parameters

query (str, optional) – Query in Lucene syntax

Return type

int

Raises

requests.exceptions.HTTPError
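
The specialized variants can be combined to get a per-type breakdown for a single query. This is a sketch only: count_by_type is a hypothetical helper, and mwdb is assumed to be an already configured MWDB instance.

```python
from typing import Any, Dict, Optional


def count_by_type(mwdb: Any, query: Optional[str] = None) -> Dict[str, int]:
    """Hypothetical helper: per-type object counts for one Lucene query."""
    return {
        "files": mwdb.count_files(query),
        "configs": mwdb.count_configs(query),
        "blobs": mwdb.count_blobs(query),
    }
```

For example, count_by_type(mwdb, "tag:evil") issues three count requests, one per object type.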

count_blobs(query: Optional[str] = None) → int

Returns number of blobs matching provided query in Lucene syntax.

Parameters

query (str, optional) – Query in Lucene syntax

Return type

int

Raises

requests.exceptions.HTTPError

count_configs(query: Optional[str] = None) → int

Returns number of configs matching provided query in Lucene syntax.

Parameters

query (str, optional) – Query in Lucene syntax

Return type

int

Raises

requests.exceptions.HTTPError

count_files(query: Optional[str] = None) → int

Returns number of files matching provided query in Lucene syntax.

Parameters

query (str, optional) – Query in Lucene syntax

Return type

int

Raises

requests.exceptions.HTTPError

listen_for_blobs(last_object: Union[mwdblib.blob.MWDBBlob, str, None] = None, blocking: bool = True, interval: int = 15, query: Optional[str] = None) → Iterator[mwdblib.blob.MWDBBlob]

Listens for recent blobs and yields newly added.

See also

More details can be found here: listen_for_objects()

New in version 3.2.0: Added listen_for_* methods

New in version 3.4.0: Added query parameter

New in version 3.4.0: The listen_for_* methods will now try to prevent you from iterating over the whole database by throwing an exception if they detect that there is something wrong with the pivot object

Parameters
  • last_object (MWDBBlob or str) – MWDBBlob instance or object hash

  • blocking (bool, optional) – Enable blocking mode (default)

  • interval (int, optional) – Interval for periodic queries in blocking mode (default is 15 seconds)

  • query (str, optional) – Lucene query to be used for listening for only specific blobs

Return type

Iterator[MWDBBlob]

listen_for_configs(last_object: Union[mwdblib.config.MWDBConfig, str, None] = None, blocking: bool = True, interval: int = 15, query: Optional[str] = None) → Iterator[mwdblib.config.MWDBConfig]

Listens for recent configs and yields newly added.

See also

More details can be found here: listen_for_objects()

New in version 3.2.0: Added listen_for_* methods

New in version 3.4.0: Added query parameter

New in version 3.4.0: The listen_for_* methods will now try to prevent you from iterating over the whole database by throwing an exception if they detect that there is something wrong with the pivot object

Parameters
  • last_object (MWDBConfig or str) – MWDBConfig instance or object hash

  • blocking (bool, optional) – Enable blocking mode (default)

  • interval (int, optional) – Interval for periodic queries in blocking mode (default is 15 seconds)

  • query (str, optional) – Lucene query to be used for listening for only specific configs

Return type

Iterator[MWDBConfig]

listen_for_files(last_object: Union[mwdblib.file.MWDBFile, str, None] = None, blocking: bool = True, interval: int = 15, query: Optional[str] = None) → Iterator[mwdblib.file.MWDBFile]

Listens for recent files and yields newly added.

See also

More details can be found here: listen_for_objects()

New in version 3.2.0: Added listen_for_* methods

New in version 3.4.0: Added query parameter

New in version 3.4.0: The listen_for_* methods will now try to prevent you from iterating over the whole database by throwing an exception if they detect that there is something wrong with the pivot object

Parameters
  • last_object (MWDBFile or str) – MWDBFile instance or object hash

  • blocking (bool, optional) – Enable blocking mode (default)

  • interval (int, optional) – Interval for periodic queries in blocking mode (default is 15 seconds)

  • query (str, optional) – Lucene query to be used for listening for only specific files

Return type

Iterator[MWDBFile]

listen_for_objects(last_object: Union[mwdblib.object.MWDBObject, str, None] = None, blocking: bool = True, interval: int = 15, query: Optional[str] = None) → Iterator[mwdblib.object.MWDBObject]

Listens for recent objects and yields newly added.

In blocking mode (default) if last_object is provided: the method fetches the latest objects until the provided object is reached and yields new objects from the oldest one. Otherwise, the method periodically asks for recent objects until a new object appears. The default request interval is 15 seconds.

In a non-blocking mode: a generator stops if there are no more objects to fetch.

The last_object argument accepts either an object identifier or an MWDBObject instance. If an object identifier is provided, the method first checks whether the object exists in the repository and has the correct type.

If you already know the type of object you are looking for, use the specialized variants:
  • listen_for_files()
  • listen_for_configs()
  • listen_for_blobs()

Warning

Make sure that last_object is valid in the MWDB instance. If you provide an MWDBObject that doesn’t exist, mwdblib will iterate over all objects and you can quickly hit your rate limit. The library tries to protect you from that as much as possible by checking the object’s type and existence, but it’s still possible to do something unusual.

Additionally, if using the query parameter and passing the last_object pivot, make sure that the passed object actually matches the query criteria. Otherwise the mechanism that catches faulty pivots will signal that there’s something wrong and raise an exception.

New in version 3.2.0: Added listen_for_* methods

New in version 3.4.0: Added query parameter

New in version 3.4.0: The listen_for_* methods will now try to prevent you from iterating over the whole database by throwing an exception if they detect that there is something wrong with the pivot object

Parameters
  • last_object (MWDBObject or str) – MWDBObject instance or object hash

  • blocking (bool, optional) – Enable blocking mode (default)

  • interval (int, optional) – Interval for periodic queries in blocking mode (default is 15 seconds)

  • query (str, optional) – Lucene query to be used for listening for only specific objects

Return type

Iterator[MWDBObject]
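
The non-blocking mode described above lends itself to periodic polling from a scheduled job. The sketch below is an illustration under stated assumptions: drain_new_objects is a hypothetical helper, mwdb is an already configured MWDB instance, and the yielded objects are assumed to expose their identifier as an id attribute.

```python
from typing import Any, List, Optional, Tuple


def drain_new_objects(
    mwdb: Any, last_id: str, query: Optional[str] = None
) -> Tuple[List[str], str]:
    """Hypothetical helper: fetch objects added after last_id without blocking.

    Returns the new object identifiers (oldest first) and the pivot to use
    on the next call.
    """
    new_ids = [
        obj.id  # assumption: objects expose their identifier as .id
        for obj in mwdb.listen_for_objects(
            last_object=last_id, blocking=False, query=query
        )
    ]
    # Objects are yielded from the oldest one, so the last item is the newest.
    next_pivot = new_ids[-1] if new_ids else last_id
    return new_ids, next_pivot
```

Because every yielded object matches the query, reusing the newest identifier as the next pivot keeps the pivot consistent with the query, which is what the warning above asks for. The initial last_id still has to be a valid object that matches the query.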

login(username: Optional[str] = None, password: Optional[str] = None) → None

Performs user authentication using provided username and password.

Warning

Keep in mind that password-authenticated sessions are short-lived, so the password needs to be stored in the APIClient object. Consider generating a new API key in your MWDB profile instead.

New in version 2.4.0: MWDB tries to reauthenticate on first Unauthorized exception

New in version 2.5.0: username and password arguments are optional. If one of the credentials is not provided via arguments, user will be asked for it.

New in version 2.6.0: MWDB.login() will warn if login is called after setting up API key

Changed in version 4.0.0: MWDB.login() no longer warns about password-authenticated sessions or credentials that are already set up.

Parameters
  • username (str) – User name

  • password (str) – Password

Raises

requests.exceptions.HTTPError

logout() → None

Performs session logout and removes previously set API key.

property options

Returns object with current configuration of MWDB client

New in version 4.0.0: Added MWDB.options property.

query(hash: str, raise_not_found: bool = True) → Optional[mwdblib.object.MWDBObject]

Queries for an object using the provided hash. If you already know the type of object you are looking for, use the specialized variants:
  • query_file()
  • query_config()
  • query_blob()

New in version 2.4.0: Added raise_not_found optional argument

Changed in version 3.0.0: Fallback to query_file() if other hash than SHA256 was provided

Parameters
  • hash (str) – Object hash (identifier, MD5, SHA-1, SHA-2)

  • raise_not_found (bool, optional) – If True (default), method raises HTTPError when object is not found

Return type

MWDBObject or None (if raise_not_found=False)

Raises

requests.exceptions.HTTPError
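
With raise_not_found=False, a missing object is reported as None instead of an exception, which is convenient when checking several candidate hashes. A minimal sketch, where first_known is a hypothetical helper and mwdb is assumed to be an already configured MWDB instance:

```python
from typing import Any, Iterable, Optional


def first_known(mwdb: Any, hashes: Iterable[str]) -> Optional[Any]:
    """Hypothetical helper: return the first object found for any hash, else None."""
    for object_hash in hashes:
        # None means "not found"; other errors are not handled here.
        obj = mwdb.query(object_hash, raise_not_found=False)
        if obj is not None:
            return obj
    return None
```
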

query_blob(hash: str, raise_not_found: bool = True) → Optional[mwdblib.blob.MWDBBlob]

Queries for blob object using provided hash

Parameters
  • hash (str) – Object hash (SHA-256 identifier)

  • raise_not_found (bool) – If True (default), method raises HTTPError when object is not found

Return type

MWDBBlob or None (if raise_not_found=False)

Raises

requests.exceptions.HTTPError

query_config(hash: str, raise_not_found: bool = True) → Optional[mwdblib.config.MWDBConfig]

Queries for configuration object using provided hash

Parameters
  • hash (str) – Object hash (SHA-256 identifier)

  • raise_not_found (bool) – If True (default), method raises HTTPError when object is not found

Return type

MWDBConfig or None (if raise_not_found=False)

Raises

requests.exceptions.HTTPError

query_file(hash: str, raise_not_found: bool = True) → Optional[mwdblib.file.MWDBFile]

Queries for file using provided hash

Parameters
  • hash (str) – Object hash (identifier, MD5, SHA-1, SHA-2)

  • raise_not_found (bool) – If True (default), method raises HTTPError when object is not found

Return type

MWDBFile or None (if raise_not_found=False)

Raises

requests.exceptions.HTTPError

recent_blobs() → Iterator[mwdblib.blob.MWDBBlob]

Retrieves recently uploaded blob objects

Return type

Iterator[MWDBBlob]

Raises

requests.exceptions.HTTPError

recent_configs() → Iterator[mwdblib.config.MWDBConfig]

Retrieves recently uploaded configuration objects

Return type

Iterator[MWDBConfig]

Raises

requests.exceptions.HTTPError

recent_files() → Iterator[mwdblib.file.MWDBFile]

Retrieves recently uploaded files

Return type

Iterator[MWDBFile]

Raises

requests.exceptions.HTTPError

recent_objects() → Iterator[mwdblib.object.MWDBObject]

Retrieves recently uploaded objects. If you already know the type of object you are looking for, use the specialized variants:
  • recent_files()
  • recent_configs()
  • recent_blobs()

Usage example:

from mwdblib import MWDB
from itertools import islice

mwdb = MWDB()
mwdb.login("admin", "password123")

# recent_files is generator, do not execute list(recent_files)!
files = islice(mwdb.recent_files(), 25)
print([(f.name, f.tags) for f in files])
Return type

Iterator[MWDBObject]

Raises

requests.exceptions.HTTPError

search(query: str) → Iterator[mwdblib.object.MWDBObject]

Advanced search for objects using Lucene syntax. If you already know the type of objects you are looking for, use the specialized variants:
  • search_files()
  • search_configs()
  • search_blobs()

Usage example:

from mwdblib import MWDB

mwdb = MWDB()

# Search for samples tagged as evil and with size less than 100kB
results = mwdb.search_files("tag:evil AND file.size:[0 TO 100000]")
Parameters

query (str) – Search query

Return type

Iterator[MWDBObject]

Raises

requests.exceptions.HTTPError
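
Since search() returns an iterator, a broad query is best consumed lazily and capped. The following sketch shows one way to do that. first_matches is a hypothetical helper, mwdb is assumed to be an already configured MWDB instance, and the default limit of 25 is arbitrary.

```python
from itertools import islice
from typing import Any, List


def first_matches(mwdb: Any, query: str, limit: int = 25) -> List[Any]:
    """Hypothetical helper: return at most `limit` results of a Lucene search."""
    # islice stops pulling from the iterator once `limit` items were taken,
    # so the remaining results are never requested.
    return list(islice(mwdb.search(query), limit))
```
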

search_blobs(query: str) → Iterator[mwdblib.blob.MWDBBlob]

Advanced search for blob objects using Lucene syntax.

Parameters

query (str) – Search query

Return type

Iterator[MWDBBlob]

Raises

requests.exceptions.HTTPError

search_configs(query: str) → Iterator[mwdblib.config.MWDBConfig]

Advanced search for configuration objects using Lucene syntax.

Parameters

query (str) – Search query

Return type

Iterator[MWDBConfig]

Raises

requests.exceptions.HTTPError

search_files(query: str) → Iterator[mwdblib.file.MWDBFile]

Advanced search for files using Lucene syntax.

Parameters

query (str) – Search query

Return type

Iterator[MWDBFile]

Raises

requests.exceptions.HTTPError

upload_blob(name: str, type: str, content: str, parent: Union[mwdblib.object.MWDBObject, str, None] = None, metakeys: Optional[Dict[str, Union[str, List[str]]]] = None, share_with: Optional[str] = None, private: bool = False, public: bool = False) → mwdblib.blob.MWDBBlob

Upload blob object

Parameters
  • name (str) – Blob name (see also MWDBBlob.blob_name)

  • type (str) – Blob type (see also MWDBBlob.blob_type)

  • content (str) – Blob content (see also MWDBBlob.content)

  • parent (MWDBObject or str, optional) – Parent object or parent identifier

  • metakeys (dict, optional) – Dictionary with metakeys. If you want to set many values with the same key: use list as value

  • share_with (str, optional) – Group name you want to share object with

  • private (bool, optional) – True if sample should be uploaded as private

  • public (bool, optional) – True if sample should be visible for everyone

Return type

MWDBBlob

Raises

requests.exceptions.HTTPError, ValueError
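
A blob upload with a multi-valued metakey might look like the sketch below. upload_text_blob is a hypothetical helper, mwdb is assumed to be an already configured MWDB instance, and the blob type, metakey name and metakey values are placeholders that would have to exist in your MWDB instance.

```python
from typing import Any, Optional


def upload_text_blob(
    mwdb: Any, name: str, content: str, parent: Optional[str] = None
) -> Any:
    """Hypothetical helper: upload a private text blob with a multi-valued metakey."""
    return mwdb.upload_blob(
        name=name,
        type="raw_cfg",  # placeholder blob type
        content=content,
        parent=parent,
        # A list value sets several values under the same key.
        metakeys={"source": ["sandbox", "manual"]},  # placeholder key and values
        private=True,
    )
```
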

upload_config(family: str, cfg: Dict[str, Any], config_type: str = 'static', parent: Union[mwdblib.object.MWDBObject, str, None] = None, metakeys: Optional[Dict[str, Union[str, List[str]]]] = None, share_with: Optional[str] = None, private: bool = False, public: bool = False) → mwdblib.config.MWDBConfig

Upload configuration object

Parameters
  • family (str) – Malware family name (see also MWDBConfig.family)

  • cfg (dict) – Dict object with configuration (see also MWDBConfig.cfg)

  • config_type (str, optional) – Configuration type (default: static, see also MWDBConfig.type)

  • parent (MWDBObject or str, optional) – Parent object or parent identifier

  • metakeys (dict, optional) – Dictionary with metakeys. If you want to set many values with the same key: use list as value

  • share_with (str, optional) – Group name you want to share object with

  • private (bool, optional) – True if sample should be uploaded as private

  • public (bool, optional) – True if sample should be visible for everyone

Return type

MWDBConfig

Raises

requests.exceptions.HTTPError, ValueError

Usage example:

mwdb.upload_config(
    "evil",
    {
        "botnet": "mal0123",
        "version": 2019,
        "urls": [
            "http://example.com",
            "http://example.com/2"
        ]
    },
    parent="3629344675705286607dd0f680c66c19f7e310a1",
    public=True)

upload_file(name: str, content: bytes, parent: Union[mwdblib.object.MWDBObject, str, None] = None, metakeys: Optional[Dict[str, Union[str, List[str]]]] = None, share_with: Optional[str] = None, private: bool = False, public: bool = False) → mwdblib.file.MWDBFile

Upload file object

Parameters
  • name (str) – Original file name (see also MWDBFile.file_name)

  • content (bytes) – File contents

  • parent (MWDBObject or str, optional) – Parent object or parent identifier

  • metakeys (dict, optional) – Dictionary with metakeys. If you want to set many values with the same key: use list as value

  • share_with (str, optional) – Group name you want to share object with

  • private (bool, optional) – True if sample should be uploaded as private

  • public (bool, optional) – True if sample should be visible for everyone

Return type

MWDBFile

Usage example:

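# Read the sample in binary mode; the file is closed when the block exits
with open("malware.exe", "rb") as f:
    content = f.read()

mwdb.upload_file(
    "malware.exe",
    content,
    parent="3629344675705286607dd0f680c66c19f7e310a1",
    public=True)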

class mwdblib.APIClient(_auth_token: Optional[str] = None, **api_options: Any)

Client for MWDB REST API that performs authentication and low-level API request/response handling.

If you want to send an arbitrary request to the MWDB API, use the get(), post(), put() and delete() methods via the MWDB.api property.

mwdb = MWDB()
...
# Deletes object with given sha256
mwdb.api.delete(f'object/{sha256}')

request(method: str, url: str, noauth: bool = False, raw: bool = False, *args: Any, **kwargs: Any) → Any

Sends request to MWDB API.

Other keyword arguments are the same as in requests library.

Parameters
  • method – HTTP method

  • url – Relative url of API endpoint

  • noauth – Don’t check if user is authenticated before sending request (default: False)

  • raw – Return raw response bytes instead of parsed JSON (default: False)

static requires(required_version: str, always_check_version: bool = False) → Callable

Method decorator that provides server version requirement and fallback to older implementation if available.

To reduce the number of requests sent by the CLI, the first method is always called when the server version is not yet known. If it fails with EndpointNotFoundError, the server version is fetched and used to determine whether a fallback is available.

If your method can fail on something other than a missing endpoint, you can force the version check on every call by enabling the always_check_version flag.