"""Database for accessing and serializing fingerprints.
Author: Seth Axen
E-mail: seth.axen@gmail.com
"""
from __future__ import division
from collections import defaultdict
import pickle as pkl
import logging
import warnings
import numpy as np
import scipy
from scipy.sparse import vstack, csr_matrix
import smart_open
from ..util import deprecated, E3FPEfficiencyWarning
from .fprint import (
Fingerprint,
CountFingerprint,
FloatFingerprint,
fptype_from_dtype,
dtype_from_fptype,
NAME_PROP_KEY,
)
from .util import E3FPBitsValueError, E3FPInvalidFingerprintError
[docs]class FingerprintDatabase(object):
"""Efficiently build, access, compare, and save fingerprints.
Fingerprints must have the same values of `bits` and `level`.
Additionally, all fingerprints will be cast to the type of fingerprint
passed to the database upon instantiation.
Parameters
----------
fp_type : type, optional
Type of fingerprint (`Fingerprint`, `CountFingerprint`,
`FloatFingerprint`).
level : int, optional
Level, or number of iterations used during fingerprinting.
name : str, optional
Name of database.
Attributes
----------
array : scipy.sparse.csr_matrix
Sparse matrix with dimensions N x M, where N is `fp_num` and M is
`bits`.
bits : int
Number of bits (length) of fingerprints.
fp_names : list of str
Names of fingerprints.
fp_names_to_indices : dict
Map from fingerprint name to row indices of `array`.
fp_num : int
Number of fingerprints in database.
fp_type : type
Type of fingerprint (`Fingerprint`, `CountFingerprint`,
`FloatFingerprint`)
level : int
Level, or number of iterations used during fingerprinting.
name : str
Name of database
props : dict
Dict with keys specifying names of fingerprint properties and values
corresponding to array of values.
Notes
-----
Since most fingerprints are very sparse length-wise, `FingerprintDatabase`
is implemented as a wrapper around a `scipy.sparse.csr_matrix` for
efficient memory usage. This provides easy access to underlying data for
tight integration with NumPy/SciPy and machine learning packages while
simultaneously providing several fingerprint-specific features.
See Also
--------
e3fp.fingerprint.fprint.Fingerprint: A fingerprint that stores indices of
"on" bits
Examples
--------
>>> from e3fp.fingerprint.db import FingerprintDatabase
>>> from e3fp.fingerprint.fprint import Fingerprint
>>> import numpy as np
>>> np.random.seed(2)
>>> db = FingerprintDatabase(fp_type=Fingerprint, name="TestDB")
>>> print(db)
FingerprintDatabase[name: TestDB, fp_type: Fingerprint, level: -1, bits: None, fp_num: 0]
>>> bvs = (np.random.uniform(size=(3, 1024)) > .9).astype(bool)
>>> fps = [Fingerprint.from_vector(bvs[i, :], name="fp" + str(i))
... for i in range(bvs.shape[0])]
>>> db.add_fingerprints(fps)
>>> print(db)
FingerprintDatabase[name: TestDB, fp_type: Fingerprint, level: -1, bits: 1024, fp_num: 3]
The contained fingerprints may be accessed by index or name.
>>> db[0]
Fingerprint(indices=array([40, ..., 1012]), level=-1, bits=1024, name=fp0)
>>> db['fp2']
[Fingerprint(indices=array([0, ..., 1013]), level=-1, bits=1024, name=fp2)]
Alternatively, the underlying `scipy.sparse.csr_matrix` may be accessed.
>>> db.array
<3x1024 sparse matrix of type '<... 'numpy.bool_'>'
...with 327 stored elements in Compressed Sparse Row format>
>>> db.array.toarray()
array([[False, False, False, ..., False, False, False],
[False, False, False, ..., False, False, False],
[ True, False, False, ..., False, False, False]])
Fingerprint properties may be stored in the database.
>>> db.set_prop("prop", np.arange(3))
The database can be efficiently stored and loaded.
>>> db.savez("/tmp/test_db.fpz")
>>> db = FingerprintDatabase.load("/tmp/test_db.fpz")
>>> print(db)
FingerprintDatabase[name: TestDB, fp_type: Fingerprint, level: -1, bits: 1024, fp_num: 3]
Various comparison metrics in `e3fp.fingerprint.metrics` can operate
efficiently directly on databases
>>> from e3fp.fingerprint.metrics import tanimoto, dice, cosine
>>> tanimoto(db, db)
array([[1. , 0.0591133 , 0.04245283],
[0.0591133 , 1. , 0.0531401 ],
[0.04245283, 0.0531401 , 1. ]])
>>> dice(db, db)
array([[1. , 0.11162791, 0.08144796],
[0.11162791, 1. , 0.10091743],
[0.08144796, 0.10091743, 1. ]])
>>> cosine(db, db)
array([[1. , 0.11163878, 0.08145547],
[0.11163878, 1. , 0.10095568],
[0.08145547, 0.10095568, 1. ]])
"""
def __init__(self, fp_type=Fingerprint, level=-1, name=None):
    """Create an empty database for fingerprints of a single type.

    Parameters
    ----------
    fp_type : type, optional
        Fingerprint class stored in the database. Must be `Fingerprint`,
        `CountFingerprint`, or `FloatFingerprint`.
    level : int, optional
        Level stored on the database.
    name : str, optional
        Name of database.

    Raises
    ------
    TypeError
        If `fp_type` is not one of the three supported classes.
    """
    supported_types = (Fingerprint, CountFingerprint, FloatFingerprint)
    if fp_type not in supported_types:
        message = "{} is not a valid fingerprint type".format(fp_type)
        raise TypeError(message)
    # The sparse array stays unset until fingerprints are added.
    self.array = None
    self.fp_names = []
    self.fp_names_to_indices = defaultdict(list)
    self.props = {}
    self.name = name
    self.fp_type = fp_type
    self.level = level
def add_fingerprints(self, fprints):
    """Add fingerprints to database.

    Each fingerprint is converted to a sparse row with the database's
    vector dtype and stacked below the rows already stored. The name list,
    the name-to-index map and the properties are extended to match.

    Parameters
    ----------
    fprints : iterable of Fingerprint
        Fingerprints to add to database. Any iterable is accepted; an
        empty iterable leaves the database unchanged.

    Raises
    ------
    ValueError
        If the fingerprints fail the database validity check or if the new
        rows cannot be stacked with the stored array (e.g. different
        numbers of bits). In that case the database is left unmodified.
    """
    # Materialize once: generators are allowed, and the first item is
    # inspected before the full pass.
    fprints = list(fprints)
    if not fprints:
        return
    self._check_fingerprints_are_valid(fprints)
    dtype = self.fp_type.vector_dtype
    old_fp_num = self.fp_num
    if old_fp_num > 0:
        prop_names = list(self.props)
    else:
        # First insertion: the first fingerprint defines the property names.
        prop_names = [
            k for k in fprints[0].props.keys() if k != NAME_PROP_KEY
        ]
    new_rows = []
    new_names = []
    new_props = {x: [] for x in prop_names}
    for fprint in fprints:
        new_rows.append(fprint.to_vector(sparse=True, dtype=dtype))
        new_names.append(fprint.name)
        for prop_name in prop_names:
            new_props[prop_name].append(fprint.get_prop(prop_name))
    # Test for a missing array explicitly. Catching ValueError around
    # `vstack` would also swallow shape-mismatch errors and replace the
    # stored rows with the new ones.
    if self.array is None:
        stacked = vstack(new_rows)
    else:
        stacked = vstack([self.array] + new_rows)
    self.array = stacked.tocsr()
    del new_rows
    self.fp_names += new_names
    self.update_names_map(new_names=new_names, offset=old_fp_num)
    self.update_props(new_props, append=True)
def update_names_map(self, new_names=None, offset=0):
    """Update map of fingerprint names to row indices of `self.array`.

    Parameters
    ----------
    new_names : iterable of str, optional
        Names to add to map. If None, the map is cleared and completely
        rebuilt from `self.fp_names`.
    offset : int, optional
        Number of rows before new rows. Ignored when the map is rebuilt,
        because a rebuild always starts at row 0.
    """
    if new_names is None:
        # Drop stale entries first; otherwise each rebuild would append the
        # same row indices again and leave duplicates in the map.
        self.fp_names_to_indices.clear()
        new_names = self.fp_names
        offset = 0
    for row_index, name in enumerate(new_names, start=offset):
        self.fp_names_to_indices[name].append(row_index)
def update_props(self, props_dict, append=False, check_length=True):
    """Set several properties in one call.

    Parameters
    ----------
    props_dict : dict
        Maps property name to array-like values.
    append : bool, optional
        If True and the property already exists, the new values are
        appended (with `numpy.append`) to the stored ones. Otherwise the
        stored values are overwritten.
    check_length : bool, optional
        Forwarded to `set_prop`, which checks the number of values against
        the number of fingerprint names. Only disable this for temporary
        iterative updating.
    """
    for key, values in props_dict.items():
        extend_existing = append and key in self.props
        if extend_existing:
            stored = self.get_prop(key)
            values = np.append(stored, values)
        self.set_prop(key, values, check_length=check_length)
def get_subset(self, fp_names, name=None):
    """Get database with subset of fingerprints.

    Parameters
    ----------
    fp_names : list of str
        List of fingerprint names to include in new db. A name that maps
        to several rows contributes all of those rows.
    name : str, optional
        Name of database

    Returns
    -------
    FingerprintDatabase
        New database built from the selected rows and their properties.

    Raises
    ------
    ValueError
        If no names are provided or if any name is not in the database.
    """
    indices = []
    subset_names = []
    for fp_name in fp_names:
        # The map is a defaultdict: plain indexing would never raise for a
        # missing name and would insert an empty entry as a side effect.
        rows = self.fp_names_to_indices.get(fp_name)
        if not rows:
            raise ValueError(
                "Not all provided fingerprint names are in database."
            )
        indices.extend(rows)
        subset_names.extend([fp_name] * len(rows))
    if not indices:
        raise ValueError("No fingerprint names were provided.")
    row_index = np.asarray(indices, dtype=np.intp)
    array = self.array[row_index, :]
    props = {k: v[row_index] for k, v in self.props.items()}
    return FingerprintDatabase.from_array(
        array,
        fp_names=subset_names,
        fp_type=self.fp_type,
        level=self.level,
        name=name,
        props=props,
    )
def get_density(self, index=None):
    """Get fraction of 'on' bits, overall or at one bit position.

    Parameters
    ----------
    index : int or None, optional
        Index to bit for which to return positional density. Python and
        NumPy integers are accepted. If None, density for whole database
        is returned.

    Returns
    -------
    float
        With `index`, the number of stored entries at that bit position
        divided by `fp_num`. Without it, the number of stored entries
        divided by `bits * fp_num`.

    Raises
    ------
    TypeError
        If `index` is neither None nor an integer.
    """
    if index is None:
        return self.array.nnz / (self.bits * self.fp_num)
    # NumPy integer scalars (e.g. from `np.argmax`) are valid bit positions
    # but are not instances of `int` on Python 3.
    if not isinstance(index, (int, np.integer)):
        raise TypeError("Index must be an integer")
    return (self.array.indices == index).sum() / self.fp_num
def as_type(self, fp_type, copy=False):
    """Return this database expressed with fingerprint type `fp_type`.

    Parameters
    ----------
    fp_type : type
        Type of fingerprint (`Fingerprint`, `CountFingerprint`,
        `FloatFingerprint`)
    copy : bool, optional
        If False and the database already has the requested type, the
        database itself is returned. Otherwise a new database is built
        with `from_array`.

    Returns
    -------
    FingerprintDatabase
        Database with fingerprint type `fp_type`.
    """
    already_requested_type = fp_type is self.fp_type
    if copy or not already_requested_type:
        return FingerprintDatabase.from_array(
            self.array,
            fp_names=self.fp_names,
            fp_type=fp_type,
            level=self.level,
            name=self.name,
            props=self.props,
        )
    return self
def fold(self, bits, fp_type=None, name=None):
    """Get copy of database folded to specified bit length.

    Column indices are reduced modulo `bits`; entries that land on the same
    column of a row are summed. The database being folded is not modified.

    Parameters
    ----------
    bits : int
        Number of bits to which to fold database.
    fp_type : type or None, optional
        Type of fingerprint (Fingerprint, CountFingerprint,
        FloatFingerprint). Defaults to same type.
    name : str, optional
        Name of database. Defaults to the name of this database.

    Returns
    -------
    FingerprintDatabase
        Database folded to specified length.

    Raises
    ------
    E3FPBitsValueError
        If `bits` is greater than the length of the database or database
        cannot be evenly folded to length `bits`.
    """
    if bits > self.bits:
        raise E3FPBitsValueError("Folded bits greater than existing bits")
    if not np.log2(self.bits / bits).is_integer():
        raise E3FPBitsValueError(
            "Existing bits divided by power of 2 does not give folded bits"
        )
    if fp_type is None:
        fp_type = self.fp_type
    dtype = dtype_from_fptype(fp_type)
    if name is None:
        name = self.name
    # The csr_matrix constructor does not copy its input arrays by default,
    # and `sum_duplicates` sorts and compacts the CSR buffers in place.
    # Copy `data` and `indptr` so folding cannot rewrite this database's
    # own arrays (`indices % bits` already yields a new array).
    fold_arr = csr_matrix(
        (
            self.array.data.copy(),
            self.array.indices % bits,
            self.array.indptr.copy(),
        ),
        shape=self.array.shape,
    )
    fold_arr.sum_duplicates()
    # All remaining column indices are below `bits`; trim the matrix width.
    fold_arr = fold_arr[:, :bits].tocsr()
    fold_arr.data = fold_arr.data.astype(dtype, copy=False)
    return self.from_array(
        fold_arr,
        fp_names=self.fp_names,
        fp_type=fp_type,
        level=self.level,
        name=name,
        props=self.props,
    )
@classmethod
def from_array(
    cls, array, fp_names, fp_type=None, level=-1, name=None, props=None
):
    """Instantiate from array.

    Parameters
    ----------
    array : numpy.ndarray or scipy.sparse.csr_matrix
        Matrix with dimensions `N` x `M`, where `M` is the number
        of bits in the fingerprints.
    fp_names : list of str
        `N` names of fingerprints in `array`.
    fp_type : type, optional
        Type of fingerprint (Fingerprint, CountFingerprint,
        FloatFingerprint). If None, it is looked up from the dtype of
        `array`; if that lookup raises TypeError, a warning is logged and
        binary `Fingerprint` is used.
    level : int, optional
        Level, or number of iterations used during fingerprinting.
    name : str or None, optional
        Name of database.
    props : dict or None, optional
        Dict with keys specifying names of fingerprint properties and
        values corresponding to length `N` array of values. None means no
        properties.

    Returns
    -------
    FingerprintDatabase
        Database containing fingerprints in `array`.
    """
    # Avoid a mutable default argument shared between calls.
    if props is None:
        props = {}
    dtype = array.dtype
    if fp_type is None:
        try:
            fp_type = fptype_from_dtype(dtype)
        except TypeError:
            logging.warning(
                (
                    "`fp_type` not provided and array dtype {} does not "
                    "match fingerprint-associated dtype. Defaulting to "
                    "binary `Fingerprint.`"
                ).format(dtype)
            )
            fp_type = Fingerprint
            dtype = dtype_from_fptype(fp_type)
    else:
        dtype = dtype_from_fptype(fp_type)
    db = cls(fp_type=fp_type, level=level, name=name)
    db.array = csr_matrix(array, dtype=dtype)
    db.fp_names = list(fp_names)
    db.update_names_map()
    db.update_props(props)
    return db
@deprecated("1.2", msg="Use `savez` instead.")
def save(self, fn="fingerprints.fps.bz2"):
    """Pickle the database to a file.

    Parameters
    ----------
    fn : str, optional
        Output filename. If it does not contain '.fps', the suffix
        '.fps.bz2' is appended.
    """
    has_fps_extension = ".fps" in fn
    out_fn = fn if has_fps_extension else fn + ".fps.bz2"
    with smart_open.open(out_fn, "wb") as handle:
        pkl.dump(self, handle)
def savez(self, fn="fingerprints.fpz"):
    """Write the database to a compressed NumPy archive.

    The sparse array components, fingerprint names, level, name and
    fingerprint type are stored with `numpy.savez_compressed`. Each
    property is stored under its name prefixed with an underscore.

    Parameters
    ----------
    fn : str, optional
        Output filename. '.fpz' is appended if it is not already the
        extension.
    """
    out_fn = fn if fn.endswith(".fpz") else fn + ".fpz"
    sparse = self.array
    array_dict = dict(
        data=sparse.data,
        shape=sparse.shape,
        indices=sparse.indices,
        indptr=sparse.indptr,
        fp_names=np.array(self.fp_names),
        level=self.level,
        name=self.name,
        fp_type=self.fp_type,
    )
    # The leading underscore marks an entry as a fingerprint property.
    array_dict.update(
        ("_" + str(key), values) for key, values in self.props.items()
    )
    with open(out_fn, "wb") as handle:
        np.savez_compressed(handle, **array_dict)
def savetxt(self, fn, with_names=True):
    """Write one bitstring per fingerprint to a text file.

    Only databases whose `fp_type` is `Fingerprint` are supported. This
    should not be attempted for large numbers of bits.

    Parameters
    ----------
    fn : str or filehandle
        Out file, opened with `smart_open.open`.
    with_names : bool, optional
        Write the fingerprint name after the bitstring on each row.

    Raises
    ------
    E3FPInvalidFingerprintError
        If `fp_type` is not `Fingerprint`.

    Warns
    -----
    E3FPEfficiencyWarning
        If `bits` is over 2^14 = 16384.
    """
    if self.fp_type is not Fingerprint:
        raise E3FPInvalidFingerprintError(
            "Only binary `Fingerprint` databases may be saved to "
            "bitstrings."
        )
    if self.bits > 2 ** 14:
        warnings.warn(
            (
                "Saving sparse bitstrings to text file is highly "
                "inefficient for large bit lengths"
            ),
            category=E3FPEfficiencyWarning,
            stacklevel=2,
        )
    row_fmt = "{0:s} {1:s}" if with_names else "{0:s}"
    with smart_open.open(fn, "w") as out:
        for row in range(self.fp_num):
            # Read the CSR buffers directly rather than slicing the matrix.
            start = self.array.indptr[row]
            stop = self.array.indptr[row + 1]
            on_bits = self.array.indices[start:stop]
            # Lengths of the zero runs before, between and after 'on' bits.
            boundaries = np.r_[-1, on_bits, self.bits]
            zero_runs = np.diff(boundaries) - 1
            bitstring = "1".join(["0" * run for run in zero_runs])
            out.write(row_fmt.format(bitstring, self.fp_names[row]) + "\n")
@classmethod
def load(cls, fn):
    """Load database from file.

    The extension is used to determine how database was serialized
    (`save` vs `savez`): files ending in '.fpz' are read as NumPy archives,
    anything else is unpickled.

    Both paths can unpickle objects, so only load files from trusted
    sources.

    Parameters
    ----------
    fn : str
        Filename

    Returns
    -------
    FingerprintDatabase
        Database
    """
    if fn.endswith(".fpz"):
        # Compare the numeric major version; comparing version strings
        # lexicographically only works by accident.
        if int(scipy.__version__.split(".")[0]) < 1:
            warnings.warn(
                (
                    "Use SciPy 1.0 or newer to efficiently load large "
                    "FingerprintDatabases."
                ),
                category=E3FPEfficiencyWarning,
                stacklevel=2,
            )
        # NOTE(security): `allow_pickle=True` lets the archive unpickle
        # object arrays such as the stored `fp_type`.
        # The context manager closes the archive once all entries are read.
        with np.load(fn, allow_pickle=True) as archive:
            array_dict = dict(archive.items())
        # Entries prefixed with an underscore are fingerprint properties.
        props_dict = {}
        for k in list(array_dict.keys()):
            if k.startswith("_"):
                props_dict[k[1:]] = array_dict.pop(k)
        array = csr_matrix(
            (
                array_dict["data"],
                array_dict["indices"],
                array_dict["indptr"],
            ),
            shape=array_dict["shape"],
        )
        return FingerprintDatabase.from_array(
            array,
            array_dict["fp_names"],
            fp_type=array_dict["fp_type"].item(),
            level=array_dict["level"].item(),
            name=array_dict["name"].item(),
            props=props_dict,
        )
    # NOTE(security): unpickling can execute arbitrary code.
    with smart_open.open(fn, "rb") as f:
        return pkl.load(f)
@property
def fp_num(self):
    """Number of rows in `array`, or 0 if `array` has no shape."""
    try:
        shape = self.array.shape
    except AttributeError:
        # No array has been set yet.
        return 0
    return shape[0]
@property
def bits(self):
    """Number of columns in `array`, or None if `array` has no shape."""
    try:
        shape = self.array.shape
    except AttributeError:
        # No array has been set yet.
        return None
    return shape[1]
def get_prop(self, key):
    """Return the values stored for property `key`.

    Raises
    ------
    KeyError
        If `key` not in `props`.
    """
    stored = self.props
    try:
        values = stored[key]
    except KeyError:
        raise KeyError("Database does not have property.")
    return values
def set_prop(self, key, vals, check_length=True):
    """Store values of a property for the fingerprints.

    Parameters
    ----------
    key : str
        Name of property
    vals : array_like
        Values of property; converted with `numpy.asanyarray`.
    check_length : bool, optional
        If True, the first dimension of `vals` must equal the number of
        fingerprint names. Only disable this for temporary iterative
        updating.

    Raises
    ------
    ValueError
        If the length check is enabled and fails.
    """
    vals = np.asanyarray(vals)
    if check_length:
        expected_count = len(self.fp_names)
        if vals.shape[0] != expected_count:
            raise ValueError(
                "props must have the same count as fingerprints."
            )
    self.props[key] = vals
def _get_fprint_at_index(self, i):
    """Build the fingerprint stored in row `i`, including its properties."""
    row_vector = self.array[i, :]
    row_name = self.fp_names[i]
    row_props = self._get_fprint_props(i)
    return self.fp_type.from_vector(
        row_vector, level=self.level, name=row_name, props=row_props
    )
def _get_fprint_props(self, i):
    """Collect the value at position `i` of every stored property."""
    row_props = {}
    for key, values in self.props.items():
        row_props[key] = values[i]
    return row_props
def _check_fingerprints_are_valid(self, fprints):
    """Check the first passed fingerprint against the database.

    Raises ValueError if its level differs from the database level. If the
    database has no fingerprint type, the class of the first fingerprint is
    adopted; if the classes differ, a warning is logged.
    """
    first = fprints[0]
    if first.level != self.level:
        raise ValueError(
            "Provided fingerprints must have database level {}".format(
                self.level
            )
        )
    first_type = first.__class__
    if self.fp_type is None:
        self.fp_type = first_type
        return
    if self.fp_type is not first_type:
        logging.warning(
            "Database is of type {}. Fingerprints will be cast to this "
            "type.".format(self.fp_type.__name__)
        )
def __eq__(self, other):
    """Compare two databases.

    Databases are equal when fingerprint type, level, bits, number of
    fingerprints and the name-to-index map match and the arrays have no
    differing entries. Properties and the database name are not compared.

    Returns `NotImplemented` when `other` lacks the database attributes, so
    that comparison with unrelated objects evaluates to False instead of
    raising AttributeError.
    """
    try:
        same_metadata = (
            self.fp_type == other.fp_type
            and self.level == other.level
            and self.bits == other.bits
            and self.fp_num == other.fp_num
            and self.fp_names_to_indices == other.fp_names_to_indices
        )
        other_array = other.array
    except AttributeError:
        return NotImplemented
    if not same_metadata:
        return False
    if self.array is None or other_array is None:
        # Without both arrays, equal only if both are unset.
        return self.array is other_array
    return (self.array - other_array).nnz == 0
def __ne__(self, other):
    """Return the negation of `self == other`.

    Python dispatches the `!=` operator to `__ne__`; a method named
    `__neq__` is never called by the interpreter.
    """
    return not self == other


# Keep the historical misspelled name available for code that calls it
# explicitly.
__neq__ = __ne__
def __iter__(self):
    """Yield one fingerprint per row, built from vector, level and name."""
    for row_index in range(self.fp_num):
        row_vector = self.array[row_index, :]
        row_name = self.fp_names[row_index]
        yield self.fp_type.from_vector(
            row_vector, level=self.level, name=row_name
        )
def __add__(self, other):
    """Concatenate this database with `other` via `concat`."""
    operands = [self, other]
    return concat(operands)
def __repr__(self):
    """Return a constructor-like description of the database."""
    template = "FingerprintDatabase(fp_type={}, level={}, name='{}')"
    type_name = self.fp_type.__name__
    return template.format(type_name, self.level, self.name)
def __str__(self):
    """Return a summary with name, type, level, bits and fingerprint count."""
    template = (
        "FingerprintDatabase[name: {}, fp_type: {}, level: {}, "
        "bits: {}, fp_num: {}]"
    )
    fields = (
        self.name,
        self.fp_type.__name__,
        self.level,
        self.bits,
        self.fp_num,
    )
    return template.format(*fields)
def __len__(self):
    """Return the number of fingerprints in the database."""
    count = self.fp_num
    return count
def __getitem__(self, key):
    """Get fingerprint(s) by name or by row index.

    Parameters
    ----------
    key : str or int
        A fingerprint name returns the list of all fingerprints with that
        name. An integer (Python or NumPy) returns the single fingerprint
        at that row.

    Raises
    ------
    KeyError
        If no fingerprint has the given name.
    IndexError
        If the row index cannot be resolved.
    TypeError
        If `key` is neither a string nor an integer.
    """
    if isinstance(key, str):
        # The map is a defaultdict: plain indexing would never fail and
        # would insert an empty entry for every unknown name.
        indices = self.fp_names_to_indices.get(key)
        if not indices:
            raise KeyError(
                "fingerprint named {} is not in the database".format(key)
            )
        return [self._get_fprint_at_index(i) for i in indices]
    if isinstance(key, (int, np.integer)):
        try:
            return self._get_fprint_at_index(key)
        except (IndexError, AttributeError):
            raise IndexError("index out of range")
    raise TypeError("Key or index must be str or int.")
def __copy__(self):
    """Build a new database from this one's array, names and metadata."""
    metadata = dict(
        fp_type=self.fp_type,
        level=self.level,
        name=self.name,
        props=self.props,
    )
    return FingerprintDatabase.from_array(
        self.array, self.fp_names, **metadata
    )
def __getstate__(self):
    """Return the attributes to pickle.

    The name-to-index map is left out; `__setstate__` rebuilds it.
    """
    return {
        "name": self.name,
        "fp_type": self.fp_type,
        "level": self.level,
        "array": self.array,
        "fp_names": self.fp_names,
        "props": self.props,
    }
def __setstate__(self, state):
    """Restore pickled attributes and rebuild the name-to-index map.

    If the state carries no 'props' entry, `props` is set to an empty dict.
    """
    attributes = self.__dict__
    attributes.update(state)
    # The map is not pickled; start from an empty one and repopulate it.
    attributes["fp_names_to_indices"] = defaultdict(list)
    self.update_names_map()
    has_props = "props" in state
    if not has_props:
        self.props = {}
@deprecated("1.2", msg="Use `concat` instead.")
def append(dbs):
    """Concatenate `FingerprintDatabase` objects (deprecated alias).

    This simply forwards to `concat`. The databases must be of the same
    type with the same number of bits, level, and property names.

    Parameters
    ----------
    dbs : iterable of FingerprintDatabase
        Fingerprint databases

    Returns
    -------
    FingerprintDatabase
        Database with all fingerprints from provided databases.
    """
    merged = concat(dbs)
    return merged
def concat(dbs):
    """Concatenate `FingerprintDatabase` objects into one database.

    The level, bit length and fingerprint type of the first database are
    the reference; every database must match them. Arrays are stacked in
    the given order, and names and properties are appended in the same
    order.

    Parameters
    ----------
    dbs : iterable of FingerprintDatabase
        Fingerprint databases

    Returns
    -------
    FingerprintDatabase
        Database with all fingerprints from provided databases.

    Raises
    ------
    TypeError
        If the databases differ in level, bit length or fingerprint type.
    ValueError
        If a merged property does not have one value per fingerprint.

    See Also
    --------
    FingerprintDatabase

    Examples
    --------
    >>> from e3fp.fingerprint.db import FingerprintDatabase, concat
    >>> from e3fp.fingerprint.fprint import Fingerprint
    >>> import numpy as np
    >>> np.random.seed(2)
    >>> db1 = FingerprintDatabase(fp_type=Fingerprint, name="TestDB1", level=5)
    >>> db2 = FingerprintDatabase(fp_type=Fingerprint, name="TestDB2", level=5)
    >>> bvs = (np.random.uniform(size=(6, 1024)) > .9).astype(bool)
    >>> fps = [Fingerprint.from_vector(bvs[i, :], name="fp" + str(i), level=5)
    ...        for i in range(bvs.shape[0])]
    >>> db1.add_fingerprints(fps[:3])
    >>> db2.add_fingerprints(fps[3:])
    >>> print(concat([db1, db2]))
    FingerprintDatabase[name: None, fp_type: Fingerprint, level: 5, bits: 1024, fp_num: 6]
    """
    dbs = list(dbs)
    reference = dbs[0]
    level = reference.level
    bits = reference.bits
    fp_type = reference.fp_type
    full_db = FingerprintDatabase(fp_type=fp_type, level=level)
    arrays = []
    fp_names = []
    for db in dbs:
        if db.level != level:
            raise TypeError(
                "Cannot concatenate databases with different levels"
            )
        if db.bits != bits:
            raise TypeError(
                "Cannot concatenate databases with different bit lengths"
            )
        if db.fp_type != fp_type:
            raise TypeError(
                "Cannot concatenate databases with different fingerprint types"
            )
        arrays.append(db.array)
        fp_names.extend(db.fp_names)
        # Lengths are validated once at the end, after all databases are
        # merged.
        full_db.update_props(db.props, append=True, check_length=False)
    full_db.array = vstack(arrays).tocsr()
    full_db.fp_names = fp_names
    total = full_db.fp_num
    if any(len(values) != total for values in full_db.props.values()):
        raise ValueError("props must have the same count as fingerprints.")
    full_db.update_names_map()
    return full_db