Core interface (MWDB)
- class mwdblib.MWDB(api: Optional[mwdblib.api.api.APIClient] = None, **api_options: Any)
Main object used for communication with MWDB REST API
- Parameters
api_url – MWDB API URL (that ends with ‘/api/’).
api_key – MWDB API key
username – MWDB account username
password – MWDB account password
autologin – Login automatically using credentials stored in configuration or provided in arguments (default: True)
verify_ssl – Verify SSL certificate correctness (default: True)
obey_ratelimiter – If False, HTTP 429 errors will cause an exception like all other error codes. If True, library will transparently handle them by sleeping for a specified duration. Default is True.
retry_on_downtime – If True, requests will be automatically retried after downtime_timeout seconds on HTTP 502/504 and ConnectionError. Default is False.
max_downtime_retries – Number of retries caused by temporary downtime
downtime_timeout – How long we need to wait between retries (in seconds). Default is 10.
retry_idempotent – Retry idempotent POST requests (default). The only thing that is really non-idempotent in current API is MWDBObject.add_comment(), so it’s not a big deal. You can turn it off if possible doubled comments are problematic in your MWDB instance. Default is True.
use_keyring – If True, APIClient uses keyring to fetch stored credentials. If not, they’re fetched from plaintext configuration. Default is True.
emit_warnings – If True, warnings are emitted by APIClient. Default is True.
config_path – Path to the configuration file (default is ~/.mwdb). If None, configuration file will not be used by APIClient
api (mwdblib.APIClient, optional) – Custom APIClient to be used for communication with MWDB
New in version 2.6.0: API request will sleep for a dozen seconds when rate limit has been exceeded.
New in version 3.2.0: You can enable retry_on_downtime to automatically retry requests in case of HTTP 502/504 or ConnectionError.
Changed in version 4.0.0: MWDB by default uses credentials and api_url set by mwdb login. If you don’t want to automatically fetch them from configuration, pass config_path=None to the constructor.
New in version 4.0.0: Added use_keyring, emit_warnings and config_path options. username and password can be passed directly to the constructor.
New in version 4.4.0: Added autologin option.
Usage example:
from mwdblib import MWDB

mwdb = MWDB()
mwdb.login("example", "<password>")
file = mwdb.query_file("3629344675705286607dd0f680c66c19f7e310a1")
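The constructor options listed above can be collected into a dictionary before being passed as keyword arguments. The following is a minimal sketch under stated assumptions: `build_mwdb_options` is a hypothetical helper that is not part of mwdblib, the URL is a placeholder, and the retry count of 5 is an arbitrary illustration rather than the library default. Only option names documented above are used.

```python
from typing import Any, Dict, Optional


def build_mwdb_options(
    api_url: str,
    api_key: Optional[str] = None,
    retry_on_downtime: bool = False,
    max_downtime_retries: int = 5,  # placeholder value, not the library default
    downtime_timeout: int = 10,
) -> Dict[str, Any]:
    """Hypothetical helper: collect keyword arguments for MWDB(**options)."""
    # The parameter list states that api_url must end with '/api/'.
    url = api_url.rstrip("/")
    if not url.endswith("/api"):
        url += "/api"
    url += "/"

    options: Dict[str, Any] = {
        "api_url": url,
        "retry_on_downtime": retry_on_downtime,
        "downtime_timeout": downtime_timeout,
        # config_path=None means the configuration file is not used.
        "config_path": None,
    }
    if api_key is not None:
        options["api_key"] = api_key
    if retry_on_downtime:
        options["max_downtime_retries"] = max_downtime_retries
    return options


options = build_mwdb_options("https://mwdb.example.org", retry_on_downtime=True)
print(options["api_url"])
```

With mwdblib installed, the resulting dictionary could be passed as `MWDB(**options)`.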
- count(query: Optional[str] = None) -> int
Returns number of objects matching provided query in Lucene syntax. If you already know type of objects you want to count, use specialized variants: count_files(), count_configs(), count_blobs().
Usage example:
from mwdblib import MWDB

mwdb = MWDB()
# Count samples tagged as evil and with size less than 100kB
result = mwdb.count_files("tag:evil AND file.size:[0 TO 100000]")
- Parameters
query (str, optional) – Query in Lucene syntax
- Return type
int
- Raises
requests.exceptions.HTTPError
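Queries like the one in the usage example can be assembled from variables. The sketch below is only an illustration: `tagged_size_query` is a hypothetical helper, it reuses the `tag` and `file.size` fields from the example above, and it does not escape special Lucene characters in the tag.

```python
def tagged_size_query(tag: str, max_size: int) -> str:
    """Hypothetical helper: build a 'tag AND size range' Lucene query string."""
    if max_size < 0:
        raise ValueError("max_size must be non-negative")
    # Field names come from the documented usage example.
    return f"tag:{tag} AND file.size:[0 TO {max_size}]"


query = tagged_size_query("evil", 100000)
print(query)  # tag:evil AND file.size:[0 TO 100000]
```

The resulting string could then be passed to `mwdb.count_files(query)`.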
- count_blobs(query: Optional[str] = None) -> int
Returns number of blobs matching provided query in Lucene syntax.
- Parameters
query (str, optional) – Query in Lucene syntax
- Return type
int
- Raises
requests.exceptions.HTTPError
- count_configs(query: Optional[str] = None) -> int
Returns number of configs matching provided query in Lucene syntax.
- Parameters
query (str, optional) – Query in Lucene syntax
- Return type
int
- Raises
requests.exceptions.HTTPError
- count_files(query: Optional[str] = None) -> int
Returns number of files matching provided query in Lucene syntax.
- Parameters
query (str, optional) – Query in Lucene syntax
- Return type
int
- Raises
requests.exceptions.HTTPError
- listen_for_blobs(last_object: Optional[Union[mwdblib.blob.MWDBBlob, str]] = None, blocking: bool = True, interval: int = 15, query: Optional[str] = None) -> Iterator[mwdblib.blob.MWDBBlob]
Listens for recent blobs and yields newly added ones.
See also
More details can be found here:
listen_for_objects()
New in version 3.2.0: Added listen_for_* methods
New in version 3.4.0: Added query parameter
New in version 3.4.0: The listen_for_* methods will now try to prevent you from iterating over the whole database by throwing an exception if they detect that there is something wrong with the pivot object
- Parameters
last_object (MWDBBlob or str) – MWDBBlob instance or object hash
blocking (bool, optional) – Enable blocking mode (default)
interval (int, optional) – Interval for periodic queries in blocking mode (default is 15 seconds)
query (str, optional) – Lucene query to be used for listening for only specific blobs
- Return type
Iterator[MWDBBlob]
- listen_for_configs(last_object: Optional[Union[mwdblib.config.MWDBConfig, str]] = None, blocking: bool = True, interval: int = 15, query: Optional[str] = None) -> Iterator[mwdblib.config.MWDBConfig]
Listens for recent configs and yields newly added ones.
See also
More details can be found here:
listen_for_objects()
New in version 3.2.0: Added listen_for_* methods
New in version 3.4.0: Added query parameter
New in version 3.4.0: The listen_for_* methods will now try to prevent you from iterating over the whole database by throwing an exception if they detect that there is something wrong with the pivot object
- Parameters
last_object (MWDBConfig or str) – MWDBConfig instance or object hash
blocking (bool, optional) – Enable blocking mode (default)
interval (int, optional) – Interval for periodic queries in blocking mode (default is 15 seconds)
query (str, optional) – Lucene query to be used for listening for only specific configs
- Return type
Iterator[MWDBConfig]
- listen_for_files(last_object: Optional[Union[mwdblib.file.MWDBFile, str]] = None, blocking: bool = True, interval: int = 15, query: Optional[str] = None) -> Iterator[mwdblib.file.MWDBFile]
Listens for recent files and yields newly added ones.
See also
More details can be found here:
listen_for_objects()
New in version 3.2.0: Added listen_for_* methods
New in version 3.4.0: Added query parameter
New in version 3.4.0: The listen_for_* methods will now try to prevent you from iterating over the whole database by throwing an exception if they detect that there is something wrong with the pivot object
- Parameters
last_object (MWDBFile or str) – MWDBFile instance or object hash
blocking (bool, optional) – Enable blocking mode (default)
interval (int, optional) – Interval for periodic queries in blocking mode (default is 15 seconds)
query (str, optional) – Lucene query to be used for listening for only specific files
- Return type
Iterator[MWDBFile]
- listen_for_objects(last_object: Optional[Union[mwdblib.object.MWDBObject, str]] = None, blocking: bool = True, interval: int = 15, query: Optional[str] = None) -> Iterator[mwdblib.object.MWDBObject]
Listens for recent objects and yields newly added ones.
In blocking mode (default) if last_object is provided: the method fetches the latest objects until the provided object is reached and yields new objects from the oldest one. Otherwise, the method periodically asks for recent objects until a new object appears. The default request interval is 15 seconds.
In a non-blocking mode: a generator stops if there are no more objects to fetch.
The last_object argument accepts both an identifier and an MWDBObject instance. If an object identifier is provided, the method first checks whether the object exists in the repository and has the correct type.
If you already know type of object you are looking for, use specialized variants: listen_for_files(), listen_for_configs(), listen_for_blobs().
Warning
Make sure that last_object is valid in MWDB instance. If you provide MWDBObject that doesn’t exist, mwdblib will iterate over all objects and you can quickly hit your rate limit. Library is trying to protect you from that as much as possible by checking type and object existence, but it’s still possible to do something unusual.
Additionally, if using the query parameter and passing the last_object pivot, make sure that the passed object actually matches the query criteria. Otherwise the mechanism that catches faulty pivots will signal that there’s something wrong and raise an exception.
New in version 3.2.0: Added listen_for_* methods
New in version 3.4.0: Added query parameter
New in version 3.4.0: The listen_for_* methods will now try to prevent you from iterating over the whole database by throwing an exception if they detect that there is something wrong with the pivot object
- Parameters
last_object (MWDBObject or str) – MWDBObject instance or object hash
blocking (bool, optional) – Enable blocking mode (default)
interval (int, optional) – Interval for periodic queries in blocking mode (default is 15 seconds)
query (str, optional) – Lucene query to be used for listening for only specific objects
- Return type
Iterator[MWDBObject]
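The catch-up behaviour described above (fetch recent objects until the pivot is reached, then yield the newer ones starting from the oldest) can be sketched in plain Python. This is an illustration of the idea only, not mwdblib's implementation: `newer_than_pivot` is a hypothetical function, it works on a list of identifiers assumed to be ordered newest first, and the `LookupError` stands in for whatever exception the library raises for a faulty pivot.

```python
from typing import Iterable, Iterator, List, Optional


def newer_than_pivot(recent_ids: Iterable[str], last_id: Optional[str]) -> Iterator[str]:
    """Yield identifiers added after last_id, oldest first.

    recent_ids is assumed to be ordered from newest to oldest.
    """
    if last_id is None:
        # No pivot: nothing to catch up on in this simplified sketch.
        return
    pending: List[str] = []
    for object_id in recent_ids:
        if object_id == last_id:
            # Pivot reached: replay what was collected, oldest first.
            yield from reversed(pending)
            return
        pending.append(object_id)
    # The whole feed was scanned without meeting the pivot.
    raise LookupError(f"pivot {last_id!r} not found among recent objects")


print(list(newer_than_pivot(["d", "c", "b", "a"], "b")))  # ['c', 'd']
```

Raising when the pivot is never met mirrors the warning above: without such a guard, an invalid pivot would cause a scan over every object in the repository.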
- login(username: Optional[str] = None, password: Optional[str] = None) -> None
Performs user authentication using provided username and password.
If credentials are not set, asks interactively for credentials.
Warning
Keep in mind that password-authenticated sessions are short-lived, so password needs to be stored in APIClient object. Consider generating a new API key in your MWDB profile.
New in version 2.4.0: MWDB tries to reauthenticate on first Unauthorized exception
New in version 2.5.0: username and password arguments are optional. If one of the credentials is not provided via arguments, user will be asked for it.
New in version 2.6.0: MWDB.login() will warn if login is called after setting up API key
Changed in version 4.0.0: MWDB.login() no longer warns about password-authenticated sessions or credentials that are already set up.
- Parameters
username (str) – Username
password (str) – Password
- Raises
requests.exceptions.HTTPError
- property options: APIClientOptions
Returns object with current configuration of MWDB client
New in version 4.0.0: Added MWDB.options property.
- query(hash: str, raise_not_found: bool = True) -> Optional[mwdblib.object.MWDBObject]
Queries for object using provided hash. If you already know type of object you are looking for, use specialized variants: query_file(), query_config(), query_blob().
New in version 2.4.0: Added raise_not_found optional argument
Changed in version 3.0.0: Fallback to query_file() if other hash than SHA256 was provided
- Parameters
hash (str) – Object hash (identifier, MD5, SHA-1, SHA-2)
raise_not_found (bool, optional) – If True (default), method raises HTTPError when object is not found
- Return type
MWDBObject or None (if raise_not_found=False)
- Raises
requests.exceptions.HTTPError
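Passing raise_not_found=False makes it possible to probe several hashes without exception handling. The sketch below assumes only the documented `query(hash, raise_not_found=...)` call; `first_known` and `FakeClient` are hypothetical names, and the fake client exists solely so the example runs without an MWDB server.

```python
from typing import Any, Dict, Iterable, Optional


def first_known(client: Any, hashes: Iterable[str]) -> Optional[Any]:
    """Return the first object the client knows about, or None."""
    for candidate in hashes:
        # With raise_not_found=False a missing object is reported as None.
        found = client.query(candidate, raise_not_found=False)
        if found is not None:
            return found
    return None


class FakeClient:
    """Offline stand-in for an MWDB instance (illustration only)."""

    def __init__(self, known: Dict[str, Any]) -> None:
        self.known = known

    def query(self, hash: str, raise_not_found: bool = True) -> Optional[Any]:
        return self.known.get(hash)


client = FakeClient({"bbb": "object-bbb"})
print(first_known(client, ["aaa", "bbb"]))  # object-bbb
```

With a real connection, an `MWDB()` instance would take the place of `FakeClient`.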
- query_blob(hash: str, raise_not_found: bool = True) -> Optional[mwdblib.blob.MWDBBlob]
Queries for blob object using provided hash
- Parameters
hash (str) – Object hash (SHA-256 identifier)
raise_not_found (bool) – If True (default), method raises HTTPError when object is not found
- Return type
MWDBBlob or None (if raise_not_found=False)
- Raises
requests.exceptions.HTTPError
- query_config(hash: str, raise_not_found: bool = True) -> Optional[mwdblib.config.MWDBConfig]
Queries for configuration object using provided hash
- Parameters
hash (str) – Object hash (SHA-256 identifier)
raise_not_found (bool) – If True (default), method raises HTTPError when object is not found
- Return type
MWDBConfig or None (if raise_not_found=False)
- Raises
requests.exceptions.HTTPError
- query_file(hash: str, raise_not_found: bool = True) -> Optional[mwdblib.file.MWDBFile]
Queries for file using provided hash
- Parameters
hash (str) – Object hash (identifier, MD5, SHA-1, SHA-2)
raise_not_found (bool) – If True (default), method raises HTTPError when object is not found
- Return type
MWDBFile or None (if raise_not_found=False)
- Raises
requests.exceptions.HTTPError
- recent_blobs(chunk_size: Optional[int] = None) -> Iterator[mwdblib.blob.MWDBBlob]
Retrieves recently uploaded blob objects
- Parameters
chunk_size (int) – Number of blobs returned per API request
- Return type
Iterator[MWDBBlob]
- Raises
requests.exceptions.HTTPError
- recent_configs(chunk_size: Optional[int] = None) -> Iterator[mwdblib.config.MWDBConfig]
Retrieves recently uploaded configuration objects
- Parameters
chunk_size (int) – Number of configs returned per API request
- Return type
Iterator[MWDBConfig]
- Raises
requests.exceptions.HTTPError
- recent_files(chunk_size: Optional[int] = None) -> Iterator[mwdblib.file.MWDBFile]
Retrieves recently uploaded files
- Parameters
chunk_size (int) – Number of files returned per API request
- Return type
Iterator[MWDBFile]
- Raises
requests.exceptions.HTTPError
- recent_objects(chunk_size: Optional[int] = None) -> Iterator[mwdblib.object.MWDBObject]
Retrieves recently uploaded objects. If you already know type of object you are looking for, use specialized variants: recent_files(), recent_configs(), recent_blobs().
Usage example:
from itertools import islice

from mwdblib import MWDB

mwdb = MWDB()
mwdb.login("admin", "password123")
# recent_files() returns a generator, do not execute list(mwdb.recent_files())!
files = islice(mwdb.recent_files(), 25)
print([(f.name, f.tags) for f in files])
- Parameters
chunk_size (int) – Number of objects returned per API request
- Return type
Iterator[MWDBObject]
- Raises
requests.exceptions.HTTPError
- search(query: str, chunk_size: Optional[int] = None) -> Iterator[mwdblib.object.MWDBObject]
Advanced search for objects using Lucene syntax. If you already know type of objects you are looking for, use specialized variants: search_files(), search_configs(), search_blobs().
Usage example:
from mwdblib import MWDB

mwdb = MWDB()
# Search for samples tagged as evil and with size less than 100kB
results = mwdb.search_files("tag:evil AND file.size:[0 TO 100000]")
- Parameters
query (str) – Search query
chunk_size (int) – Number of objects returned per API request
- Return type
Iterator[MWDBObject]
- Raises
requests.exceptions.HTTPError
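Because search results are returned as an iterator, a broad query can be capped without consuming every match. The sketch below relies only on the documented `search(query)` call returning an iterator; `search_limited` and `FakeClient` are hypothetical names, and the fake client generates an endless stream so the example runs offline.

```python
from itertools import islice
from typing import Any, Iterator, List, Optional


def search_limited(client: Any, query: str, limit: int) -> List[Any]:
    """Collect at most `limit` results from a lazily evaluated search."""
    return list(islice(client.search(query), limit))


class FakeClient:
    """Offline stand-in whose search() never runs out of results."""

    def __init__(self) -> None:
        self.fetched = 0

    def search(self, query: str, chunk_size: Optional[int] = None) -> Iterator[str]:
        index = 0
        while True:
            self.fetched += 1  # count how many results were actually produced
            yield f"result-{index}"
            index += 1


client = FakeClient()
print(search_limited(client, "tag:evil", 3))  # ['result-0', 'result-1', 'result-2']
```

Only the first three results are produced by the generator; the rest of the stream is never requested.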
- search_blobs(query: str, chunk_size: Optional[int] = None) -> Iterator[mwdblib.blob.MWDBBlob]
Advanced search for blob objects using Lucene syntax.
- Parameters
query (str) – Search query
chunk_size (int) – Number of blobs returned per API request
- Return type
Iterator[MWDBBlob]
- Raises
requests.exceptions.HTTPError
- search_configs(query: str, chunk_size: Optional[int] = None) -> Iterator[mwdblib.config.MWDBConfig]
Advanced search for configuration objects using Lucene syntax.
- Parameters
query (str) – Search query
chunk_size (int) – Number of configs returned per API request
- Return type
Iterator[MWDBConfig]
- Raises
requests.exceptions.HTTPError
- search_files(query: str, chunk_size: Optional[int] = None) -> Iterator[mwdblib.file.MWDBFile]
Advanced search for files using Lucene syntax.
- Parameters
query (str) – Search query
chunk_size (int) – Number of files returned per API request
- Return type
Iterator[MWDBFile]
- Raises
requests.exceptions.HTTPError
- upload_blob(name: str, type: str, content: str, parent: Optional[Union[mwdblib.object.MWDBObject, str]] = None, metakeys: Optional[Dict[str, Union[str, List[str]]]] = None, attributes: Optional[Dict[str, Union[Any, List[Any]]]] = None, karton_id: Optional[str] = None, karton_arguments: Optional[Dict[str, str]] = None, tags: Optional[List[str]] = None, share_with: Optional[str] = None, private: bool = False, public: bool = False) -> mwdblib.blob.MWDBBlob
Upload blob object
- Parameters
name (str) – Blob name (see also MWDBBlob.blob_name)
type (str) – Blob type (see also MWDBBlob.blob_type)
content (str) – Blob content (see also MWDBBlob.content)
parent (MWDBObject or str, optional) – Parent object or parent identifier
metakeys (dict, optional) – Dictionary with string attributes (to be used for MWDB Core older than 2.6.0)
attributes (dict, optional) – Dictionary with attributes to be set after upload. To set multiple values under the same key, use a list as the value. Attribute values can be any JSON-serializable object.
karton_id (str, optional) – Karton analysis identifier to be attached to the uploaded object
karton_arguments (dict, optional) – Karton analysis arguments. Reserved for future use.
tags (list, optional) – List of tags to be set after upload.
share_with (str, optional) – Group name you want to share object with
private (bool, optional) – True if the object should be uploaded as private
public (bool, optional) – True if the object should be visible for everyone
- Return type
MWDBBlob
New in version 4.0.0: Added attributes and tags arguments. They are supported by MWDB Core >= 2.6.0, use metakeys if your MWDB Core version is older.
New in version 4.1.0: Added karton_id and karton_arguments parameters. Use karton_id instead of metakeys={"karton": "<id>"} if you use MWDB Core >= 2.3.0
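The attributes argument expects a list as the value when one key carries several values. The sketch below shows one way such a dictionary might be assembled from key/value pairs; `group_attributes` is a hypothetical helper, and the `url` and `campaign` keys are made up for illustration.

```python
from typing import Any, Dict, Iterable, List, Tuple


def group_attributes(pairs: Iterable[Tuple[str, Any]]) -> Dict[str, Any]:
    """Group (key, value) pairs into a dict, using lists for repeated keys."""
    grouped: Dict[str, List[Any]] = {}
    for key, value in pairs:
        grouped.setdefault(key, []).append(value)
    # Keys seen once keep their single value; repeated keys keep the list.
    return {key: values[0] if len(values) == 1 else values
            for key, values in grouped.items()}


attributes = group_attributes([
    ("url", "http://example.com"),
    ("url", "http://example.com/2"),
    ("campaign", {"id": 7}),  # JSON-serializable object value
])
print(attributes)
```

The result could be passed as `attributes=attributes` to any of the upload methods. Note that this sketch cannot distinguish a single value that is itself a list from several separate values.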
- upload_config(family: str, cfg: Dict[str, Any], config_type: str = 'static', parent: Optional[Union[mwdblib.object.MWDBObject, str]] = None, metakeys: Optional[Dict[str, Union[str, List[str]]]] = None, attributes: Optional[Dict[str, Union[Any, List[Any]]]] = None, karton_id: Optional[str] = None, karton_arguments: Optional[Dict[str, str]] = None, tags: Optional[List[str]] = None, share_with: Optional[str] = None, private: bool = False, public: bool = False) -> mwdblib.config.MWDBConfig
Upload configuration object
- Parameters
family (str) – Malware family name (see also MWDBConfig.family)
cfg (dict) – Dict object with configuration (see also MWDBConfig.cfg)
config_type (str, optional) – Configuration type (default: static, see also MWDBConfig.type)
parent (MWDBObject or str, optional) – Parent object or parent identifier
metakeys (dict, optional) – Dictionary with string attributes (to be used for MWDB Core older than 2.6.0)
attributes (dict, optional) – Dictionary with attributes to be set after upload. To set multiple values under the same key, use a list as the value. Attribute values can be any JSON-serializable object.
karton_id (str, optional) – Karton analysis identifier to be attached to the uploaded object
karton_arguments (dict, optional) – Karton analysis arguments. Reserved for future use.
tags (list, optional) – List of tags to be set after upload.
share_with (str, optional) – Group name you want to share object with
private (bool, optional) – True if the object should be uploaded as private
public (bool, optional) – True if the object should be visible for everyone
- Return type
MWDBConfig
New in version 4.0.0: Added attributes and tags arguments. They are supported by MWDB Core >= 2.6.0, use metakeys if your MWDB Core version is older.
New in version 4.1.0: Added karton_id and karton_arguments parameters. Use karton_id instead of metakeys={"karton": "<id>"} if you use MWDB Core >= 2.3.0
Usage example:
mwdb.upload_config(
    "evil",
    {
        "botnet": "mal0123",
        "version": 2019,
        "urls": [
            "http://example.com",
            "http://example.com/2"
        ]
    },
    parent="3629344675705286607dd0f680c66c19f7e310a1",
    public=True)
- upload_file(name: str, content: Union[bytes, BinaryIO], parent: Optional[Union[mwdblib.object.MWDBObject, str]] = None, metakeys: Optional[Dict[str, Union[str, List[str]]]] = None, attributes: Optional[Dict[str, Union[Any, List[Any]]]] = None, karton_id: Optional[str] = None, karton_arguments: Optional[Dict[str, str]] = None, tags: Optional[List[str]] = None, share_with: Optional[str] = None, private: bool = False, public: bool = False) -> mwdblib.file.MWDBFile
Upload file object
- Parameters
name (str) – Original file name (see also MWDBFile.file_name)
content (bytes or BinaryIO) – File contents
parent (MWDBObject or str, optional) – Parent object or parent identifier
metakeys (dict, optional) – Dictionary with string attributes (to be used for MWDB Core older than 2.6.0)
attributes (dict, optional) – Dictionary with attributes to be set after upload. To set multiple values under the same key, use a list as the value. Attribute values can be any JSON-serializable object.
karton_id (str, optional) – Karton analysis identifier to be attached to the uploaded file
karton_arguments (dict, optional) – Karton analysis arguments. Reserved for future use.
tags (list, optional) – List of tags to be set after upload.
share_with (str, optional) – Group name you want to share object with
private (bool, optional) – True if sample should be uploaded as private
public (bool, optional) – True if sample should be visible for everyone
- Return type
MWDBFile
New in version 4.0.0: Added attributes and tags arguments. They are supported by MWDB Core >= 2.6.0, use metakeys if your MWDB Core version is older.
New in version 4.1.0: Added karton_id and karton_arguments parameters. Use karton_id instead of metakeys={"karton": "<id>"} if you use MWDB Core >= 2.3.0
Usage example:
mwdb.upload_file(
    "malware.exe",
    open("malware.exe", "rb").read(),
    parent="3629344675705286607dd0f680c66c19f7e310a1",
    public=True)
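The name and content arguments can be prepared ahead of the upload call. The sketch below is illustrative: `read_sample` is a hypothetical helper that reads a file with pathlib (so the file handle is closed promptly) and also computes a SHA-256 digest locally; the temporary file merely stands in for a real sample.

```python
import hashlib
import tempfile
from pathlib import Path
from typing import Tuple


def read_sample(path: str) -> Tuple[str, bytes, str]:
    """Return (file name, raw bytes, SHA-256 hex digest) for a local file."""
    sample = Path(path)
    content = sample.read_bytes()  # opens, reads and closes the file
    return sample.name, content, hashlib.sha256(content).hexdigest()


# Demonstration with a throwaway file instead of a real sample.
with tempfile.TemporaryDirectory() as workdir:
    sample_path = Path(workdir) / "malware.exe"
    sample_path.write_bytes(b"MZ")
    name, content, digest = read_sample(str(sample_path))

print(name, len(content))  # malware.exe 2
```

The first two values match the name and content parameters, as in `mwdb.upload_file(name, content)`; the digest could later be compared with the identifier of the uploaded object.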
- class mwdblib.APIClient(_auth_token: Optional[str] = None, autologin: bool = True, **api_options: Any)
Client for MWDB REST API that performs authentication and low-level API request/response handling.
If you want to send an arbitrary request to MWDB API, use the get(), post(), put() and delete() methods via the MWDB.api property.
mwdb = MWDB()
...
# Deletes object with given sha256
mwdb.api.delete(f'object/{sha256}')
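A raw request such as the delete call above can be wrapped with a basic sanity check on the identifier. This is a sketch under assumptions: `delete_object` and `RecordingAPI` are hypothetical names, the `object/{sha256}` path is taken from the example above, and the recording class replaces `mwdb.api` so that nothing is sent over the network.

```python
import re
from typing import Any, List, Tuple

SHA256_RE = re.compile(r"[0-9a-f]{64}")


def delete_object(api: Any, sha256: str) -> Any:
    """Validate the digest, then issue DELETE on the object endpoint."""
    digest = sha256.lower()
    if not SHA256_RE.fullmatch(digest):
        raise ValueError("expected a 64-character hexadecimal SHA-256 digest")
    return api.delete(f"object/{digest}")


class RecordingAPI:
    """Offline stand-in for mwdb.api that records requested URLs."""

    def __init__(self) -> None:
        self.calls: List[Tuple[str, str]] = []

    def delete(self, url: str, **kwargs: Any) -> dict:
        self.calls.append(("DELETE", url))
        return {}


api = RecordingAPI()
delete_object(api, "AB" * 32)
print(api.calls[0])
```

With a real client, `mwdb.api` would be passed in place of `RecordingAPI()`.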
- property logged_user: Optional[str]
Username of logged-in user or the owner of used API key. Returns None if no credentials are provided
- login(username: str, password: str) -> None
Performs authentication using provided credentials
- Parameters
username – Account username
password – Account password
- request(method: str, url: str, noauth: bool = False, raw: bool = False, *args: Any, **kwargs: Any) -> Any
Sends request to MWDB API. This method can be used for accessing features that are not directly supported by mwdblib library.
Other keyword arguments are the same as in requests library.
See also
Use functions specific for HTTP methods instead of passing method argument on your own: APIClient.get(), APIClient.post(), APIClient.put(), APIClient.delete()
- Parameters
method – HTTP method
url – Relative url of API endpoint
noauth – Don’t check if user is authenticated before sending request (default: False)
raw – Return raw response bytes instead of parsed JSON (default: False)
- static requires(required_version: str, always_check_version: bool = False) -> Callable
Method decorator that provides server version requirement and fallback to older implementation if available.
To optimize requests sent by CLI, the first method is always called if the server version is not already available. If it fails with EndpointNotFoundError, server version is fetched and used to determine if fallback is available.
If your method fails on something different than missing endpoint, you can always check the version by enabling the always_check_version flag.
- property server_metadata: dict
Information about MWDB Core server from /api/server endpoint.
- property server_version: str
MWDB Core server version
- class mwdblib.APIClientOptions(config_path: Optional[pathlib.Path] = PosixPath('/home/docs/.mwdb'), **api_options: Any)
Options bag that contains configuration for APIClient.
Field values are loaded using the following precedence:
built-in defaults accessible via class properties e.g. APIClientOptions.api_url
values from ~/.mwdb configuration file
values passed as arguments to the APIClientOptions constructor
Configuration may depend on api_url value, so remember to set it if you want to talk with specific MWDB Core instance.
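The three-level precedence can be pictured as successive dictionary updates. The sketch below is not how APIClientOptions is implemented: `resolve_options` is a hypothetical function, the default values are placeholders, and it assumes that each later source in the list overrides the earlier ones.

```python
from typing import Any, Dict, Mapping

# Placeholder defaults for illustration; not the library's real values.
DEFAULTS: Dict[str, Any] = {
    "api_url": "https://mwdb.example.org/api/",
    "verify_ssl": True,
    "obey_ratelimiter": True,
}


def resolve_options(
    defaults: Mapping[str, Any],
    file_values: Mapping[str, Any],
    constructor_values: Mapping[str, Any],
) -> Dict[str, Any]:
    """Merge option sources; later sources are assumed to win."""
    resolved = dict(defaults)
    resolved.update(file_values)         # values from the configuration file
    resolved.update(constructor_values)  # explicit constructor arguments
    return resolved


resolved = resolve_options(
    DEFAULTS,
    {"api_url": "https://mwdb.internal.example/api/", "verify_ssl": False},
    {"verify_ssl": True},
)
print(resolved["api_url"], resolved["verify_ssl"])
```

Here api_url comes from the simulated configuration file, verify_ssl from the constructor argument, and obey_ratelimiter from the defaults.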