Source code for e3fp.fingerprint.fprint

"""Classes and methods for chemical fingerprint storage and comparison.

Author: Seth Axen
E-mail: seth.axen@gmail.com
"""
from __future__ import division, print_function
from collections import defaultdict

import pickle as pkl

import numpy as np
from scipy.sparse import issparse, csr_matrix

try:
    from rdkit.DataStructs.cDataStructs import ExplicitBitVect, SparseBitVect

    WITH_RDKIT = True
except ImportError:
    WITH_RDKIT = False
import smart_open
from e3fp.fingerprint.util import (
    E3FPInvalidFingerprintError,
    E3FPMolError,
    E3FPBitsValueError,
    E3FPCountsError,
    E3FPOptionError,
)

# ----------------------------------------------------------------------------#
# Fingerprint Classes
# ----------------------------------------------------------------------------#

# Default number of bits (vector length) of a newly created fingerprint.
BITS_DEF = 2 ** 32
# Default number of bits a fingerprint is folded to by `fold`.
FOLD_BITS_DEF = 1024
# NumPy dtypes of the values stored by `Fingerprint`, `CountFingerprint`
# and `FloatFingerprint`, respectively (see their `vector_dtype`).
FP_DTYPE = np.bool_
COUNT_FP_DTYPE = np.uint16
FLOAT_FP_DTYPE = np.float64
# Keys in a fingerprint's `props` dict that back the `name` and `mol`
# properties.
NAME_PROP_KEY = "Name"
MOL_PROP_KEY = "Mol"


def fptype_from_dtype(dtype):
    """Map a NumPy data type onto the fingerprint class that stores it.

    Parameters
    ----------
    dtype : numpy.dtype or str
        NumPy data type.

    Returns
    -------
    class : {Fingerprint, CountFingerprint, FloatFingerprint}
        Fingerprint class whose values are of kind `dtype`.

    Raises
    ------
    TypeError
        If `dtype` is neither a boolean, an integer nor a floating type.
    """
    # Guard clauses: boolean first, then integer, then floating point.
    if np.issubdtype(dtype, np.bool_):
        return Fingerprint
    if np.issubdtype(dtype, np.integer):
        return CountFingerprint
    if np.issubdtype(dtype, np.floating):
        return FloatFingerprint
    message = "dtype {} is invalid for fingerprint".format(dtype)
    raise TypeError(message)
def dtype_from_fptype(fp_type):
    """Look up the NumPy data type that belongs to a fingerprint type.

    Parameters
    ----------
    fp_type : class or Fingerprint
        Fingerprint class, or a fingerprint instance (its class is used).

    Returns
    -------
    numpy.dtype
        NumPy data type of the values stored by `fp_type`.

    Raises
    ------
    E3FPInvalidFingerprintError
        If `fp_type` is not exactly one of the three fingerprint classes.
    """
    if isinstance(fp_type, Fingerprint):
        fp_type = fp_type.__class__

    # Exact identity match on purpose: subclasses are not accepted.
    known_types = (
        (Fingerprint, FP_DTYPE),
        (CountFingerprint, COUNT_FP_DTYPE),
        (FloatFingerprint, FLOAT_FP_DTYPE),
    )
    for candidate, candidate_dtype in known_types:
        if fp_type is candidate:
            return candidate_dtype

    raise E3FPInvalidFingerprintError(
        "fp_type {} is not a valid fp_type.".format(fp_type)
    )
def coerce_to_valid_dtype(dtype):
    """Snap a NumPy data type to the nearest fingerprint data type.

    The input is first mapped to a fingerprint class and then back to that
    class's data type. When `dtype` cannot be interpreted, the data type of
    the bit `Fingerprint` is returned instead.

    Parameters
    ----------
    dtype : numpy.dtype or str
        Input NumPy data type.

    Returns
    -------
    numpy.dtype
        Output NumPy data type.
    """
    try:
        matched_type = fptype_from_dtype(dtype)
        coerced = dtype_from_fptype(matched_type)
    except TypeError:
        coerced = FP_DTYPE
    return coerced
class Fingerprint(object):
    """A fingerprint that stores indices of "on" bits.

    Parameters
    ----------
    indices : array_like of int, optional
        log2(`bits`)-bit indices in a sparse bitvector of `bits` which
        correspond to 1.
    bits : int, optional
        Number of bits in bitvector.
    level : int, optional
        Level of fingerprint, corresponding to fingerprinting iterations.
    name : str, optional
        Name of fingerprint.
    props : dict, optional
        Custom properties of fingerprint, consisting of a string keyword and
        some value.

    Attributes
    ----------
    bits : int
        Number of bits in bitvector, length of fingerprint.
    counts : dict
        Dict matching each index in `indices` to number of counts (1 for
        bits).
    indices : numpy.ndarray of int
        Indices of "on" bits
    level : int
        Level of fingerprint, corresponding to fingerprinting iterations.
    mol : RDKit Mol
        Mol to which fingerprint corresponds (stored in `props`).
    name : str or None
        Name of fingerprint (stored in `props`).
    props : dict
        Custom properties of fingerprint, consisting of a string keyword and
        some value.
    vector_dtype : numpy.dtype
        NumPy data type associated with fingerprint values (e.g. bits)

    See Also
    --------
    CountFingerprint: A fingerprint that stores number of occurrences of each
                      index
    FloatFingerprint: A fingerprint that stores float counts
    e3fp.fingerprint.db.FingerprintDatabase: Efficiently store fingerprints

    Examples
    --------
    >>> import e3fp.fingerprint.fprint as fp
    >>> from e3fp.fingerprint.metrics import tanimoto
    >>> import numpy as np
    >>> np.random.seed(0)
    >>> bits = 1024
    >>> indices = np.random.randint(0, bits, 30)
    >>> print(indices)
    [684 559 629 192 835 763 707 359   9 723 277 754 804 599  70 472 600 396
     314 705 486 551  87 174 600 849 677 537 845  72]
    >>> f = fp.Fingerprint(indices, bits=bits, level=0)
    >>> f_folded = f.fold(bits=32)
    >>> print(f_folded.indices)
    [ 0  1  3  4  5  6  7  8  9 12 13 14 15 17 18 19 21 23 24 25 26 27]
    >>> print(f_folded.to_vector(sparse=False, dtype=int))
    [1 1 0 1 1 1 1 1 1 1 0 0 1 1 1 1 0 1 1 1 0 1 0 1 1 1 1 1 0 0 0 0]
    >>> print(f_folded.to_bitstring())
    11011111110011110111010111110000
    >>> print(f_folded.to_rdkit())
    <rdkit.DataStructs.cDataStructs.ExplicitBitVect object at 0x...>
    >>> f_folded2 = fp.Fingerprint.from_indices(np.random.randint(0, bits, 30),
    ...                                         bits=bits).fold(bits=32)
    >>> print(f_folded2.indices)
    [ 0  1  3  5  7  9 10 14 15 16 17 18 19 20 23 24 25 29 30 31]
    >>> print(tanimoto(f_folded, f_folded2))
    0.5
    """

    vector_dtype = FP_DTYPE

    def __init__(
        self, indices, bits=BITS_DEF, level=-1, name=None, props=None, **kwargs
    ):
        """Initialize Fingerprint object.

        Extra keyword arguments are accepted and ignored so that the
        alternate constructors can share one call signature with subclasses.

        Raises
        ------
        E3FPBitsValueError
            If any index is greater than or equal to `bits`.
        """
        self.reset()
        indices = np.asarray(indices, dtype=np.int64)
        if np.any(indices >= bits):
            raise E3FPBitsValueError(
                "number of bits is lower than provided indices"
            )
        # Stored indices are always sorted and free of duplicates.
        self.indices = np.unique(indices)
        self.bits = bits
        self.level = level
        # `props` defaults to None (not a shared dict literal).
        if props:
            self.update_props(props)
        if name:
            self.name = name

    def clear(self):
        """Clear temporary (and possibly large) values."""
        pass

    def reset(self):
        """Reset all values."""
        self.indices = np.asarray([], dtype=np.int64)
        self.bits = 0
        self.level = -1
        self.folded_fingerprint = {}
        self.index_to_folded_index_dict = None
        self.unfolded_fingerprint = None
        self.index_to_unfolded_index_dict = None
        self.props = {}
        self.clear()

    @classmethod
    def from_indices(cls, indices, bits=BITS_DEF, level=-1, **kwargs):
        """Initialize from an array of indices.

        Parameters
        ----------
        indices : array_like of int
            Indices in a sparse bitvector of length `bits` which correspond
            to 1.
        bits : int, optional
            Number of bits in array. Indices will be log2(`bits`)-bit
            integers.
        level : int, optional
            Level of fingerprint, corresponding to fingerprinting iterations.
        name : str, optional
            Name of fingerprint.
        props : dict, optional
            Custom properties of fingerprint, consisting of a string keyword
            and some value.

        Returns
        -------
        fingerprint : Fingerprint
        """
        return cls(indices, bits=bits, level=level, **kwargs)

    @classmethod
    def from_vector(cls, vector, level=-1, **kwargs):
        """Initialize from vector.

        Parameters
        ----------
        vector : numpy.ndarray or scipy.sparse.csr_matrix
            Array of bits/counts/floats
        level : int, optional
            Level of fingerprint, corresponding to fingerprinting iterations.
        name : str, optional
            Name of fingerprint.
        props : dict, optional
            Custom properties of fingerprint, consisting of a string keyword
            and some value.

        Returns
        -------
        fingerprint : Fingerprint
        """
        if not issparse(vector):
            vector = np.asarray(vector)
        if kwargs.get("bits", None) is None:
            # Row vectors report their length in shape[1], flat ones in
            # shape[0].
            try:
                kwargs["bits"] = vector.shape[1]
            except IndexError:
                kwargs["bits"] = vector.shape[0]
        if issparse(vector):
            indices = vector.indices.astype(np.int64)
            counts = vector.data
        else:
            # flatnonzero works on the flattened array, so a dense (1, n)
            # row vector yields column positions only.
            indices = np.flatnonzero(vector).astype(np.int64)
            counts = vector.ravel()[indices]
        counts = dict(zip(indices, counts))
        return cls.from_indices(indices, counts=counts, level=level, **kwargs)

    @classmethod
    def from_bitstring(cls, bitstring, level=-1, **kwargs):
        """Initialize from bitstring (e.g. '10010011').

        Parameters
        ----------
        bitstring : str
            String of 1s and 0s.
        level : int, optional
            Level of fingerprint, corresponding to fingerprinting iterations.
        name : str, optional
            Name of fingerprint.
        props : dict, optional
            Custom properties of fingerprint, consisting of a string keyword
            and some value.

        Returns
        -------
        fingerprint : Fingerprint
        """
        # Every character other than "0" counts as an "on" bit.
        indices = [i for i, char in enumerate(bitstring) if char != "0"]
        if kwargs.get("bits", None) is None:
            kwargs["bits"] = len(bitstring)
        return cls.from_indices(indices, level=level, **kwargs)

    @classmethod
    def from_fingerprint(cls, fp, **kwargs):
        """Initialize by copying existing fingerprint.

        Indices, bits, level, props and the cached folded fingerprints are
        copied. Additional keyword arguments are accepted but not used.

        Parameters
        ----------
        fp : Fingerprint
            Existing fingerprint.

        Returns
        -------
        fingerprint : Fingerprint

        Raises
        ------
        E3FPInvalidFingerprintError
            If `fp` is not a `Fingerprint`.
        """
        if not isinstance(fp, Fingerprint):
            raise E3FPInvalidFingerprintError(
                "variable is %s not Fingerprint" % (fp.__class__.__name__)
            )
        new_fp = cls.from_indices(fp.indices, bits=fp.bits, level=fp.level)
        new_fp.update_props(fp.props)
        new_fp.folded_fingerprint = {
            k: v.__class__.from_fingerprint(v)
            for k, v in fp.folded_fingerprint.items()
        }
        return new_fp

    @classmethod
    def from_rdkit(cls, rdkit_fprint, **kwargs):
        """Initialize from RDKit fingerprint.

        If provided fingerprint is of length 2^32 - 1, assumes real
        fingerprint is of length 2^32.

        Parameters
        ----------
        rdkit_fprint : RDKit ExplicitBitVect or SparseBitVect
            Existing RDKit fingerprint.
        level : int, optional
            Level of fingerprint, corresponding to fingerprinting iterations.
        name : str, optional
            Name of fingerprint.
        props : dict, optional
            Custom properties of fingerprint, consisting of a string keyword
            and some value.

        Returns
        -------
        fingerprint : Fingerprint

        Raises
        ------
        ImportError
            If RDKit could not be imported.
        TypeError
            If `rdkit_fprint` is not an RDKit bit vector.
        """
        if not WITH_RDKIT:
            raise ImportError("RDKit not available.")
        if not isinstance(rdkit_fprint, (ExplicitBitVect, SparseBitVect)):
            raise TypeError(
                "RDKit fingerprint must be a SparseBitVect or ExplicitBitVect"
            )
        bits = rdkit_fprint.GetNumBits()
        if bits == 2 ** 32 - 1:
            bits = 2 ** 32
        indices = np.asarray(rdkit_fprint.GetOnBits(), dtype=np.int64)
        return cls.from_indices(indices, bits=bits, **kwargs)

    @property
    def indices(self):
        return self._indices

    @indices.setter
    def indices(self, indices):
        self._indices = np.asarray(indices, dtype=np.int64)

    @property
    def level(self):
        return self._level

    @level.setter
    def level(self, level):
        self._level = level

    @property
    def bits(self):
        return self._bits

    @bits.setter
    def bits(self, bits):
        self._bits = bits

    @property
    def props(self):
        return self._props

    @props.setter
    def props(self, props):
        self._props = props

    def get_prop(self, key):
        """Get property. If not set, raise KeyError."""
        try:
            return self.props[key]
        except AttributeError:
            # No props container available: report as a missing key.
            raise KeyError(key)

    def set_prop(self, key, val):
        """Set property."""
        self.props[key] = val

    def update_props(self, props_dict):
        """Set multiple properties at once."""
        self.props.update(props_dict)

    @property
    def name(self):
        try:
            return self.props[NAME_PROP_KEY]
        except KeyError:
            return None

    @name.setter
    def name(self, name):
        self.props[NAME_PROP_KEY] = str(name)

    @property
    def mol(self):
        try:
            return self.props[MOL_PROP_KEY]
        except KeyError:
            return None

    @mol.setter
    def mol(self, mol):
        self.props[MOL_PROP_KEY] = mol

    @property
    def index_id_map(self):
        try:
            return self.props["index_id_map"]
        except (KeyError, AttributeError):
            return None

    @index_id_map.setter
    def index_id_map(self, index_id_map):
        self.props["index_id_map"] = index_id_map

    def to_vector(self, sparse=True, dtype=None):
        """Get vector of bits/counts/floats.

        Parameters
        ----------
        sparse : bool, optional
            Return a 1 x `bits` `csr_matrix` instead of a dense array.
        dtype : numpy.dtype, optional
            Data type of the vector. Defaults to `vector_dtype`.

        Returns
        -------
        numpy.ndarray or scipy.sparse.csr_matrix
            Vector of bits/counts/floats

        Raises
        ------
        E3FPBitsValueError
            If an index does not fit into a vector of length `bits`.
        """
        if dtype is None:
            dtype = self.vector_dtype
        counts = self.counts
        values = [counts[i] for i in self.indices]
        if sparse:
            try:
                return csr_matrix(
                    (values, ([0] * self.bit_count, self.indices)),
                    shape=(1, self.bits),
                    dtype=dtype,
                )
            except ValueError:
                raise E3FPBitsValueError(
                    "Number of bits is lower than size of indices"
                )
        bitvector = np.zeros(self.bits, dtype=dtype)
        try:
            bitvector[self.indices] = values
        except IndexError:
            raise E3FPBitsValueError(
                "Number of bits is lower than size of indices"
            )
        return bitvector

    def to_bitvector(self, sparse=True):
        """Get full bitvector.

        Returns
        -------
        numpy.ndarray or scipy.sparse.csr_matrix of bool : Bitvector
        """
        return self.to_vector(sparse=sparse, dtype=FP_DTYPE)

    def to_bitstring(self):
        """Get bitstring as string of 1s and 0s.

        Returns
        -------
        str : bitstring
        """
        bitvector = self.to_bitvector(sparse=False)
        return "".join(map(str, np.asarray(bitvector, dtype=int)))

    def to_rdkit(self):
        """Convert to RDKit fingerprint.

        If number of bits exceeds 2^31 - 1, fingerprint will be folded to
        length 2^31 - 1 before conversion.

        Returns
        -------
        rdkit_fprint : RDKit ExplicitBitVect or SparseBitVect
            Convert to bitvector used for RDKit fingerprints. If
            `self.bits` is less than 10^5, `ExplicitBitVect` is used.
            Otherwise, `SparseBitVect` is used.

        Raises
        ------
        ImportError
            If RDKit could not be imported.
        """
        if not WITH_RDKIT:
            raise ImportError("RDKit not available.")
        rdkit_fp_type = SparseBitVect
        if self.bits < 1e5:
            rdkit_fp_type = ExplicitBitVect
        # RDKit Bitvect types can't exceed 2**31 - 1 in length
        bits = min(self.bits, 2 ** 31 - 1)
        indices = self.indices % (2 ** 31 - 1)
        rdkit_fprint = rdkit_fp_type(bits)
        rdkit_fprint.SetBitsFromList(indices.tolist())
        return rdkit_fprint

    @property
    def bit_count(self):
        return self.indices.shape[0]

    @property
    def density(self):
        return self.bit_count / self.bits

    def get_count(self, index):
        """Return count index in fingerprint.

        Defaults to 1 if index in `self.indices`

        Returns
        -------
        int : Count of bit in fingerprint
        """
        return 1 if index in self.indices else 0

    @property
    def counts(self):
        return {k: 1 for k in self.indices}

    def mean(self):
        """Return mean, i.e. proportion of "on" bits in fingerprint.

        Returns
        -------
        float : Mean
        """
        return self.density

    def std(self):
        """Return standard deviation of fingerprint.

        Returns
        -------
        float : Standard deviation
        """
        mean = self.mean()
        return (mean * (1 - mean)) ** 0.5

    # Folding/unfolding to a new fingerprint
    def fold(self, bits=FOLD_BITS_DEF, method=0, linked=True):
        """Return fingerprint for bitvector folded to size `bits`.

        The result is cached per (`bits`, `method`) pair, so repeated calls
        return the same folded fingerprint object.

        Parameters
        ----------
        bits : int, optional
            Length of new bitvector, ideally multiple of 2.
        method : {0, 1}, optional
            Method to use for folding.

            0
                partitioning (array is divided into equal sized arrays of
                length `bits` which are bitwise combined with OR)
            1
                compression (adjacent bits pairs are combined with OR until
                length is `bits`)
        linked : bool, optional
            Link folded and unfolded fingerprints for easy referencing. Set
            to False if intending to save and want to reduce file size.

        Returns
        -------
        Fingerprint : Fingerprint of folded bitvector

        Raises
        ------
        E3FPBitsValueError
            If `bits` exceeds the current length, or the current length is
            not `bits` times a power of 2.
        E3FPOptionError
            If `method` is neither 0 nor 1.
        """
        if bits > self.bits:
            raise E3FPBitsValueError("folded bits greater than existing bits")
        if not np.log2(self.bits / bits).is_integer():
            raise E3FPBitsValueError(
                "existing bits divided by power of 2 does not give folded bits"
            )
        if method not in (0, 1):
            raise E3FPOptionError("method must be 0 or 1")

        if (bits, method) not in self.folded_fingerprint:
            if method == 0:
                folded_indices = self.indices % bits
            else:
                # Integer floor division keeps the folded indices integral;
                # true division would produce floats because of
                # `from __future__ import division`.
                folded_indices = self.indices // (self.bits // bits)

            self.index_to_folded_index_dict = dict(
                zip(self.indices, folded_indices)
            )
            # Invert the map: folded index -> set of colliding indices.
            folded_index_to_index_dict = {}
            for index, folded_index in self.index_to_folded_index_dict.items():
                folded_index_to_index_dict.setdefault(
                    folded_index, set()
                ).add(index)

            fp = self.__class__.from_indices(
                folded_indices, bits=bits, level=self.level
            )
            fp.update_props(self.props)
            fp.index_to_unfolded_index_dict = folded_index_to_index_dict

            if self.index_id_map is not None:
                # Merge the identifier sets of indices that collide.
                fp.index_id_map = {}
                for index, id_set in self.index_id_map.items():
                    fp.index_id_map.setdefault(
                        self.index_to_folded_index_dict[index], set()
                    ).update(id_set)

            if linked:
                fp.unfolded_fingerprint = self
                self.folded_fingerprint[(bits, method)] = fp
                assert isinstance(
                    self.folded_fingerprint[(bits, method)], self.__class__
                )

        return self.folded_fingerprint[(bits, method)]

    def get_folding_index_map(self):
        """Get map of sparse indices to folded indices.

        Returns
        -------
        dict : Map of sparse index (keys) to corresponding folded index.
        """
        return self.index_to_folded_index_dict

    def unfold(self):
        """Return unfolded parent fingerprint for bitvector.

        Returns
        -------
        Fingerprint : Fingerprint of unfolded bitvector. If None, return None.
        """
        return self.unfolded_fingerprint

    def get_unfolding_index_map(self):
        """Get map of sparse indices to unfolded indices.

        Returns
        -------
        dict : Map of sparse index (keys) to set of corresponding unfolded
               indices.
        """
        return self.index_to_unfolded_index_dict

    # summary magic methods
    def __repr__(self):
        return "%s(indices=%s, level=%r, bits=%r, name=%s)" % (
            self.__class__.__name__,
            repr(self.indices)
            .replace("\n", "")
            .replace(" ", "")
            .replace(",", ", "),
            self.level,
            self.bits,
            self.name,
        )

    def __str__(self):
        return self.__repr__()

    # logical/comparative magic methods
    def _check_operand(self, other, size_error=None):
        """Raise unless `other` is a fingerprint (of the same size).

        The size is only compared when a `size_error` message is given.
        """
        if not isinstance(other, Fingerprint):
            raise E3FPInvalidFingerprintError(
                "variable is %s not Fingerprint" % (other.__class__.__name__)
            )
        if size_error is not None and self.bits != other.bits:
            raise E3FPBitsValueError(size_error)

    def __eq__(self, other):
        self._check_operand(other)
        # Compare the index sets in both directions; a one-sided membership
        # test would call a subset equal to its superset.
        return bool(
            self.level == other.level
            and self.bits == other.bits
            and self.__class__ == other.__class__
            and np.setxor1d(self.indices, other.indices).size == 0
        )

    def __ne__(self, other):
        self._check_operand(other)
        return not self.__eq__(other)

    def __add__(self, other):
        self._check_operand(other, "cannot add fingerprints of different sizes")
        return Fingerprint(
            np.union1d(self.indices, other.indices), bits=self.bits
        )

    def __sub__(self, other):
        self._check_operand(
            other, "cannot subtract fingerprints of different sizes"
        )
        return Fingerprint(
            np.setdiff1d(self.indices, other.indices, assume_unique=True),
            bits=self.bits,
        )

    def __and__(self, other):
        self._check_operand(
            other, "cannot compare fingerprints of different sizes"
        )
        return Fingerprint(
            np.intersect1d(self.indices, other.indices, assume_unique=True),
            bits=self.bits,
        )

    def __or__(self, other):
        self._check_operand(
            other, "cannot compare fingerprints of different sizes"
        )
        return Fingerprint(
            np.union1d(self.indices, other.indices), bits=self.bits
        )

    def __xor__(self, other):
        self._check_operand(
            other, "cannot compare fingerprints of different sizes"
        )
        return Fingerprint(
            np.setxor1d(self.indices, other.indices, assume_unique=True),
            bits=self.bits,
        )

    def __radd__(self, other):
        return self.__add__(other)

    def __rsub__(self, other):
        # Reflected subtraction computes `other - self`, not `self - other`.
        self._check_operand(other)
        return other.__sub__(self)

    def __rand__(self, other):
        return self.__and__(other)

    def __ror__(self, other):
        # OR is symmetric; delegate to __or__ (never to __ror__ itself).
        return self.__or__(other)

    def __rxor__(self, other):
        # XOR is symmetric; delegate to __xor__ (never to __rxor__ itself).
        return self.__xor__(other)

    def __iadd__(self, other):
        return self.__add__(other)

    def __isub__(self, other):
        return self.__sub__(other)

    def __iand__(self, other):
        return self.__and__(other)

    def __ior__(self, other):
        return self.__or__(other)

    def __ixor__(self, other):
        return self.__xor__(other)

    # iterable magic methods
    def __len__(self):
        return self.bits

    def __getitem__(self, key):
        """Return whether the bit at position `key` is "on".

        Negative positions count from the end of the bitvector.

        Raises
        ------
        TypeError
            If `key` is not an integer.
        KeyError
            If `key` lies outside of ``[-bits, bits)``.
        """
        if not isinstance(key, (int, np.integer)):
            raise TypeError("fingerprint positions must be integers")
        if self.indices is None:
            raise KeyError(key)
        # Valid positions are -bits .. bits - 1.
        if key >= self.bits or key < -self.bits:
            raise KeyError(key)
        if key < 0:
            key += self.bits
        return key in self.indices

    # pickle magic methods, reduces size of fingerprint file
    def __getstate__(self):
        return dict(self.__dict__)

    def __setstate__(self, state):
        self.__dict__.update(state)
        self.clear()
class CountFingerprint(Fingerprint):
    """A fingerprint that stores number of occurrences of each index.

    Parameters
    ----------
    indices : array_like of int, optional
        log2(`bits`)-bit indices in a sparse vector, corresponding to
        positions with counts greater than 0. If not provided, `counts` must
        be provided.
    counts : dict, optional
        Dict matching each index in `indices` to number of counts. All counts
        default to 1 if not provided.
    bits : int, optional
        Number of bits in bitvector.
    level : int, optional
        Level of fingerprint, corresponding to fingerprinting iterations.
    name : str, optional
        Name of fingerprint.
    props : dict, optional
        Custom properties of fingerprint, consisting of a string keyword and
        some value.

    Attributes
    ----------
    bits : int
        Number of bits in bitvector, length of fingerprint.
    counts : dict
        Dict matching each index in `indices` to number of counts.
    indices : numpy.ndarray of int
        Indices of fingerprint with counts greater than 0.
    level : int
        Level of fingerprint, corresponding to fingerprinting iterations.
    mol : RDKit Mol
        Mol to which fingerprint corresponds (stored in `props`).
    name : str or None
        Name of fingerprint (stored in `props`).
    props : dict
        Custom properties of fingerprint, consisting of a string keyword and
        some value.
    vector_dtype : numpy.dtype
        NumPy data type associated with fingerprint values (e.g. bits)

    See Also
    --------
    Fingerprint: A fingerprint that stores indices of "on" bits
    FloatFingerprint: A fingerprint that stores float counts

    Examples
    --------
    >>> import e3fp.fingerprint.fprint as fp
    >>> from e3fp.fingerprint.metrics import soergel
    >>> import numpy as np
    >>> np.random.seed(1)
    >>> bits = 1024
    >>> indices = np.random.randint(0, bits, 30)
    >>> print(indices)
    [ 37 235 908  72 767 905 715 645 847 960 144 129 972 583 749 508 390 281
     178 276 254 357 914 468 907 252 490 668 925 398]
    >>> counts = dict(zip(indices,
    ...                   np.random.randint(1, 100, indices.shape[0])))
    >>> print(sorted(counts.items()))
    [(37, 51), (72, 88), (129, 62), ..., (925, 50), (960, 8), (972, 23)]
    >>> f = fp.CountFingerprint(indices, counts=counts, bits=bits, level=0)
    >>> f_folded = f.fold(bits=32)
    >>> print(sorted(f_folded.counts.items()))
    [(0, 8), (1, 62), (5, 113), ..., (29, 50), (30, 14), (31, 95)]
    >>> print(f_folded.to_vector(sparse=False, dtype=int))
    [  8  62   0   0   0 113  61  58  88  97  71 228 111   2  58  10  64   0
      82   0 120   0   0   0   0  82   0   0  27  50  14  95]
    >>> fp.Fingerprint.from_fingerprint(f_folded)
    Fingerprint(indices=array([0, 1, ...]), level=0, bits=32, name=None)
    >>> indices2 = np.random.randint(0, bits, 30)
    >>> counts2 = dict(zip(indices2,
    ...                    np.random.randint(1, 100, indices.shape[0])))
    >>> f_folded2 = fp.CountFingerprint.from_indices(indices2, counts=counts2,
    ...                                              bits=bits).fold(bits=32)
    >>> print(sorted(f_folded2.counts.items()))
    [(0, 93), (2, 33), (3, 106), ..., (25, 129), (26, 89), (30, 53)]
    >>> print(soergel(f_folded, f_folded2))
    0.17492946392...
    """

    vector_dtype = COUNT_FP_DTYPE

    def __init__(
        self,
        indices=None,
        counts=None,
        bits=BITS_DEF,
        level=-1,
        name=None,
        props=None,
        **kwargs
    ):
        """Initialize.

        Raises
        ------
        E3FPOptionError
            If neither `indices` nor `counts` is given.
        E3FPBitsValueError
            If any index is greater than or equal to `bits`.
        E3FPCountsError
            If `indices` and the keys of `counts` do not match.
        """
        if indices is None and counts is None:
            raise E3FPOptionError("indices or counts must be specified")

        self.reset()

        if indices is not None:
            indices = np.asarray(indices, dtype=np.int64)
            if np.any(indices >= bits):
                raise E3FPBitsValueError(
                    "number of bits is lower than provided indices"
                )
            if counts is None:
                # Repeated indices become the counts.
                indices, counts = np.unique(indices, return_counts=True)
                counts = dict(zip(indices, counts))
            else:
                indices = np.unique(indices)
                # Set arithmetic replaces a per-key scan of the index array.
                index_set = set(indices.tolist())
                count_keys = set(counts)
                if count_keys - index_set:
                    raise E3FPCountsError(
                        "At least one index in `counts` is not in `indices`."
                    )
                if index_set - count_keys:
                    raise E3FPCountsError(
                        "At least one index in `indices` is not in `counts`."
                    )
        else:
            indices = np.asarray(sorted(counts.keys()), dtype=np.int64)
            if np.any(indices >= bits):
                raise E3FPBitsValueError(
                    "number of bits is lower than provided indices"
                )

        self.indices = indices
        self.counts = counts
        self.bits = bits
        self.level = level
        if name:
            self.props[NAME_PROP_KEY] = name
        # `props` defaults to None (not a shared dict literal).
        if props:
            self.update_props(props)

    @classmethod
    def from_indices(
        cls, indices, counts=None, bits=BITS_DEF, level=-1, **kwargs
    ):
        """Initialize from an array of indices.

        Parameters
        ----------
        indices : array_like of int, optional
            Indices in a sparse bitvector of length `bits` which correspond
            to 1.
        counts : dict, optional
            Dictionary mapping sparse indices to counts.
        bits : int, optional
            Number of bits in array. Indices will be log2(`bits`)-bit
            integers.
        level : int, optional
            Level of fingerprint, corresponding to fingerprinting iterations.
        name : str, optional
            Name of fingerprint.
        props : dict, optional
            Custom properties of fingerprint, consisting of a string keyword
            and some value.

        Returns
        -------
        fingerprint : CountFingerprint
        """
        return cls(indices, counts=counts, bits=bits, level=level, **kwargs)

    @classmethod
    def from_counts(cls, counts, bits=BITS_DEF, level=-1, **kwargs):
        """Initialize from a dict of counts.

        Parameters
        ----------
        counts : dict
            Dictionary mapping sparse indices to counts.
        bits : int, optional
            Number of bits in array. Indices will be log2(`bits`)-bit
            integers.
        level : int, optional
            Level of fingerprint, corresponding to fingerprinting iterations.
        name : str, optional
            Name of fingerprint.
        props : dict, optional
            Custom properties of fingerprint, consisting of a string keyword
            and some value.

        Returns
        -------
        fingerprint : CountFingerprint
        """
        return cls(counts=counts, bits=bits, level=level, **kwargs)

    @classmethod
    def from_fingerprint(cls, fp, **kwargs):
        """Initialize by copying existing fingerprint.

        Only entries with a count greater than 0 are copied. Additional
        keyword arguments are accepted but not used.

        Parameters
        ----------
        fp : Fingerprint
            Existing fingerprint.

        Returns
        -------
        fingerprint : Fingerprint

        Raises
        ------
        E3FPInvalidFingerprintError
            If `fp` is not a `Fingerprint`.
        """
        if not isinstance(fp, Fingerprint):
            raise E3FPInvalidFingerprintError(
                "variable is %s not Fingerprint" % (fp.__class__.__name__)
            )
        counts = {i: c for i, c in fp.counts.items() if c > 0}
        new_fp = cls.from_counts(counts, bits=fp.bits, level=fp.level)
        new_fp.update_props(fp.props)
        new_fp.folded_fingerprint = {
            k: v.__class__.from_fingerprint(v)
            for k, v in fp.folded_fingerprint.items()
        }
        return new_fp

    def reset(self, *args, **kwargs):
        """Reset all values."""
        super(CountFingerprint, self).reset(*args, **kwargs)
        self.counts = {}

    def get_count(self, index):
        """Return count index in fingerprint.

        Returns
        -------
        int : Count of index in fingerprint
        """
        return self.counts.get(index, 0)

    @property
    def counts(self):
        return self._counts

    @counts.setter
    def counts(self, counts):
        self._counts = {k: int(v) for k, v in counts.items()}

    def mean(self):
        """Return mean of counts.

        Returns
        -------
        float : Mean
        """
        return sum(self._counts.values()) / self.bits

    def std(self):
        """Return standard deviation of fingerprint.

        Returns
        -------
        float : Standard deviation
        """
        mean = self.mean()
        return (
            sum(v ** 2 for v in self._counts.values()) / self.bits - mean ** 2
        ) ** 0.5

    def fold(self, *args, **kwargs):
        """Fold fingerprint while considering counts.

        Optionally, provide a function to reduce colliding counts.

        Parameters
        ----------
        bits : int, optional
            Length of new bitvector, ideally multiple of 2.
        method : {0, 1}, optional
            Method to use for folding.

            0
                partitioning (array is divided into equal sized arrays of
                length `bits` which are bitwise combined with
                `counts_method`)
            1
                compression (adjacent bits pairs are combined with
                `counts_method` until length is `bits`)
        linked : bool, optional
            Link folded and unfolded fingerprints for easy referencing. Set
            to False if intending to save and want to reduce file size.
        counts_method : function, optional
            Function for combining counts. Default is summation.

        Returns
        -------
        CountFingerprint : Fingerprint of folded vector
        """
        # Pop (not get): the parent `fold` does not accept `counts_method`.
        counts_method = kwargs.pop("counts_method", sum)
        fp = super(CountFingerprint, self).fold(*args, **kwargs)
        unfold_map = fp.index_to_unfolded_index_dict
        if unfold_map is None:
            # A cached folded copy without an index map already carries its
            # counts; there is nothing to recombine.
            return fp
        fp.counts = {
            int(fold_ind): counts_method(
                [self.get_count(x) for x in ind_set]
            )
            for fold_ind, ind_set in unfold_map.items()
        }
        return fp

    # summary magic methods
    def __repr__(self):
        return "%s(counts=%r, level=%r, bits=%r, name=%s)" % (
            self.__class__.__name__,
            self.counts,
            self.level,
            self.bits,
            self.name,
        )

    # logical/comparative magic methods
    def __eq__(self, other):
        if not isinstance(other, CountFingerprint):
            raise E3FPInvalidFingerprintError(
                "variable is %s not CountFingerprint"
                % (other.__class__.__name__)
            )
        return (
            self.level == other.level
            and self.bits == other.bits
            and self.counts == other.counts
            and self.__class__ == other.__class__
        )

    def __ne__(self, other):
        if not isinstance(other, Fingerprint):
            raise E3FPInvalidFingerprintError(
                "variable is %s not CountFingerprint"
                % (other.__class__.__name__)
            )
        return not self.__eq__(other)

    def __add__(self, other):
        if not isinstance(other, CountFingerprint):
            raise E3FPInvalidFingerprintError(
                "variable is not CountFingerprint."
            )
        if self.bits != other.bits:
            raise E3FPBitsValueError(
                "cannot add fingerprints of different sizes"
            )

        # The level is only kept when both operands agree on it.
        level = self.level if self.level == other.level else -1

        new_counts = self.counts.copy()
        for k, v in other.counts.items():
            new_counts[k] = new_counts.get(k, 0) + v

        new_indices = np.asarray(list(new_counts.keys()), dtype=np.int64)

        if other.__class__ is FloatFingerprint:
            new_class = FloatFingerprint
        else:
            new_class = self.__class__

        return new_class(
            new_indices, counts=new_counts, bits=self.bits, level=level
        )

    def __sub__(self, other):
        if not isinstance(other, CountFingerprint):
            raise E3FPInvalidFingerprintError(
                "variable is not CountFingerprint."
            )
        if self.bits != other.bits:
            raise E3FPBitsValueError(
                "cannot subtract fingerprints of different sizes"
            )

        level = self.level if self.level == other.level else -1

        new_counts = self.counts.copy()
        for k, v in other.counts.items():
            new_counts[k] = new_counts.get(k, 0) - v

        # Materialize the keys: NumPy cannot convert a dict view directly.
        new_indices = np.asarray(list(new_counts.keys()), dtype=np.int64)

        if other.__class__ is FloatFingerprint:
            new_class = FloatFingerprint
        else:
            new_class = self.__class__

        return new_class(
            new_indices, counts=new_counts, bits=self.bits, level=level
        )

    def __floordiv__(self, x):
        cf = CountFingerprint.from_fingerprint(self)
        cf.counts = {k: int(v / x) for k, v in self.counts.items() if v >= x}
        # Entries smaller than `x` were dropped; keep `indices` in step with
        # `counts` so that vector conversion does not hit missing keys.
        cf.indices = sorted(cf.counts.keys())
        return cf

    def __div__(self, x):
        x = float(x)
        cf = FloatFingerprint.from_fingerprint(self)
        cf.counts = {k: v / x for k, v in self.counts.items()}
        return cf

    def __truediv__(self, x):
        return self.__div__(x)

    def __mul__(self, x):
        cf = self.__class__.from_fingerprint(self)
        cf.counts = {k: v * float(x) for k, v in self.counts.items()}
        return cf

    def __rfloordiv__(self, x):
        return self.__floordiv__(x)

    def __rdiv__(self, x):
        return self.__div__(x)

    def __rtruediv__(self, x):
        return self.__truediv__(x)

    def __rmul__(self, x):
        return self.__mul__(x)

    def __ifloordiv__(self, x):
        return self.__floordiv__(x)

    def __idiv__(self, x):
        return self.__div__(x)

    def __itruediv__(self, x):
        return self.__truediv__(x)

    def __imul__(self, x):
        return self.__mul__(x)

    # iterable magic methods
    def __len__(self):
        return self.bits

    def __getitem__(self, key):
        """Return whether position `key` has an entry in the fingerprint.

        Negative positions count from the end of the vector.

        Raises
        ------
        TypeError
            If `key` is not an integer.
        KeyError
            If `key` lies outside of ``[-bits, bits)``.
        """
        if not isinstance(key, (int, np.integer)):
            raise TypeError("fingerprint positions must be integers")
        if self.indices is None:
            raise KeyError(key)
        # Valid positions are -bits .. bits - 1.
        if key >= self.bits or key < -self.bits:
            raise KeyError(key)
        if key < 0:
            key += self.bits
        return key in self.indices

    # pickle magic methods, reduces size of fingerprint
    def __getstate__(self):
        # The index array is stored under "_indices" by the property setter.
        # It is rebuilt from the counts in `__setstate__`, so leave it out.
        return {
            k: v
            for k, v in self.__dict__.items()
            if k not in ("indices", "_indices")
        }

    def __setstate__(self, state):
        self.__dict__.update(state)
        self.indices = sorted(self.counts.keys())
        self.clear()
class FloatFingerprint(CountFingerprint):
    """A fingerprint whose counts are stored as floats.

    Behaves like `CountFingerprint` in almost every respect; the name mainly
    signals that each count value is converted to `float` when assigned.

    See Also
    --------
    Fingerprint: A fingerprint that stores indices of "on" bits
    CountFingerprint: A fingerprint that stores number of occurrences of each
                      index
    """

    vector_dtype = FLOAT_FP_DTYPE

    @property
    def counts(self):
        return self._counts

    @counts.setter
    def counts(self, counts):
        # Cast every value to float while copying into a fresh dict.
        as_floats = {}
        for index, value in counts.items():
            as_floats[index] = float(value)
        self._counts = as_floats
# ----------------------------------------------------------------------------# # Serialization Methods # ----------------------------------------------------------------------------#
def load(f, update_structure=True):
    """Read the first `Fingerprint` object stored in a file.

    Parameters
    ----------
    f : str or File
        File name or file-like object to load file from.
    update_structure : bool, optional
        Attempt to update the class structure by initializing a new
        fingerprint from each fingerprint in the file, so that fingerprints
        saved by older versions can be upgraded.

    Returns
    -------
    Fingerprint : First pickled fingerprint, or None if the file holds none.

    See Also
    --------
    loadz, save
    """
    loaded = _load(f, update_structure)
    if len(loaded) > 0:
        return loaded[0]
    return None
def loadz(f, update_structure=True):
    """Read every `Fingerprint` object stored in a file.

    Parameters
    ----------
    f : str or File
        File name or file-like object to load file from.
    update_structure : bool, optional
        Attempt to update the class structure by initializing a new
        fingerprint from each fingerprint in the file, so that fingerprints
        saved by older versions can be upgraded. If this doesn't work, falls
        back to the original saved fingerprint.

    Returns
    -------
    list of Fingerprint : Fingerprints in pickle.

    See Also
    --------
    load, savez
    """
    loaded = _load(f, update_structure)
    return loaded
def _load(f, update_structure=True):
    """Read every pickled fingerprint from `f` until end of file.

    Parameters
    ----------
    f : str or File
        File name or file-like object to read from.
    update_structure : bool, optional
        If True, rebuild each loaded object through its own class's
        `from_fingerprint`; objects for which that raises `AttributeError`
        are kept exactly as unpickled.

    Returns
    -------
    list
        Loaded objects in file order; empty if the file holds no pickles.
    """
    fps = []
    # NOTE: unpickling can execute arbitrary code; only load trusted files.
    # Pickle streams are bytes, so the file must be opened in binary mode.
    with smart_open.open(f, "rb") as fh:
        while True:
            try:
                fp = pkl.load(fh)
            except EOFError:
                # End of the concatenated pickle stream.
                break
            if update_structure:
                try:
                    fp = fp.__class__.from_fingerprint(fp)
                except AttributeError:
                    # Best effort: fall back to the object as it was saved.
                    pass
            fps.append(fp)
    return fps
def save(f, fp, **kwargs):
    """Save a single `Fingerprint` object to file.

    Parameters
    ----------
    f : str or File
        filename `str` or file-like object to save file to
    fp : Fingerprint
        Fingerprint to save to file
    protocol : {0, 1, 2, None}, optional
        Pickle protocol to use. If None, highest available protocol is used.
        This will not affect fingerprint loading.

    Returns
    -------
    bool
        Success or fail

    See Also
    --------
    savez, load
    """
    succeeded = _save(f, fp, **kwargs)
    return succeeded
def savez(f, *fps, **kwargs):
    """Save multiple `Fingerprint` objects to a single file.

    Parameters
    ----------
    f : str or File
        filename `str` or file-like object to save file to
    *fps
        Fingerprints to save to file, written in the order given.
    protocol : {0, 1, 2, None}, optional
        Pickle protocol to use. If None, highest available protocol is used.
        This will not affect fingerprint loading.

    Returns
    -------
    bool
        Success or fail

    See Also
    --------
    save, loadz
    """
    succeeded = _save(f, *fps, **kwargs)
    return succeeded
def _save(f, *fps, **kwargs):
    """Pickle each object in `fps` to `f`, one after another.

    Parameters
    ----------
    f : str or File
        File name or file-like object to write to.
    *fps
        Objects to pickle, written in the order given.
    protocol : int or None, optional
        Pickle protocol. If missing or None, `pickle.HIGHEST_PROTOCOL` is
        used. Other keyword arguments are ignored.

    Returns
    -------
    bool
        Always True when no exception is raised.
    """
    protocol = kwargs.get("protocol")
    if protocol is None:
        protocol = pkl.HIGHEST_PROTOCOL
    # Pickle output is bytes, so the file must be opened in binary mode.
    with smart_open.open(f, "wb") as fh:
        for fp in fps:
            pkl.dump(fp, fh, protocol)
    return True
def add(fprints, weights=None):
    """Sum fingerprints by count into a new count-based fingerprint.

    The result is a `FloatFingerprint` when `weights` is given or when any
    input is a `FloatFingerprint`; otherwise it is a `CountFingerprint`.
    `bits` and `level` of the result are taken from the first fingerprint.

    Parameters
    ----------
    fprints : sequence of Fingerprint
        Fingerprints to be added by count.
    weights : sequence of float, optional
        Weights for weighted sum. Results in `FloatFingerprint` output.

    Returns
    -------
    CountFingerprint or FloatFingerprint or None
        Fingerprint with counts as sum of counts in `fprints`, or None if
        `fprints` is empty.

    Raises
    ------
    ValueError
        If `weights` is given and its length differs from `fprints`.

    See Also
    --------
    mean
    """
    if len(fprints) == 0:
        return None

    weighted = weights is not None
    if weighted and len(weights) != len(fprints):
        raise ValueError(
            "Number of fingerprints and weights must be the same."
        )

    if weighted:
        summed = sum_counts_dict(*fprints, weights=weights)
        result_class = FloatFingerprint
    else:
        summed = sum_counts_dict(*fprints)
        has_float = any(isinstance(fp, FloatFingerprint) for fp in fprints)
        result_class = FloatFingerprint if has_float else CountFingerprint

    indices = np.asarray(sorted(summed.keys()), dtype=np.int64)
    first = fprints[0]
    return result_class(
        indices, counts=summed, bits=first.bits, level=first.level
    )
def mean(fprints, weights=None):
    """Average fingerprints to generate `FloatFingerprint`.

    Parameters
    ----------
    fprints : sequence of Fingerprint
        Fingerprints to be averaged by count.
    weights : array_like of float, optional
        Weights for weighted mean. Weights are normalized to a sum of 1
        before being passed to `add`.

    Returns
    -------
    FloatFingerprint or None
        Fingerprint with float counts as average of counts in `fprints`, or
        None if `fprints` is empty.

    Raises
    ------
    ValueError
        If `weights` is given and sums to 0.

    See Also
    --------
    add
    """
    if weights is not None:
        weights = np.asarray(weights)
        weight_sum = np.sum(weights)
        if weight_sum == 0.0:
            raise ValueError("Sum of weights is 0.")
        return add(fprints, weights=weights / weight_sum)

    # `add` returns None for empty input; dividing that by a length of 0
    # would raise an opaque TypeError, so return None here as well.
    if len(fprints) == 0:
        return None
    return add(fprints) / len(fprints)
def sum_counts_dict(*fprints, **kwargs):
    """Given fingerprints, return sum of their counts dicts.

    If an optional `weights` iterable is provided, each fingerprint's counts
    are multiplied by its weight before summing. Weights are used as given;
    they are not normalized here.

    Parameters
    ----------
    *fprints
        One or more `Fingerprint` objects
    weights : iterable of float or None, optional
        Weights for weighted sum, paired with `fprints` in order. Missing or
        None means an unweighted sum.

    Returns
    -------
    dict
        Dict (a ``defaultdict(int)``) of count indices in any of the
        `fprints` with value as sum of counts.

    See Also
    --------
    diff_counts_dict
    """
    counts_sum = defaultdict(int)
    # An explicit ``weights=None`` means "no weights", same as omitting it.
    weights = kwargs.get("weights")
    if weights is None:
        for fprint in fprints:
            for k, v in fprint.counts.items():
                counts_sum[k] += v
    else:
        for fprint, weight in zip(fprints, weights):
            for k, v in fprint.counts.items():
                counts_sum[k] += v * weight
    return counts_sum
def diff_counts_dict(fp1, fp2, only_positive=False):
    """Given two fingerprints, returns difference of their counts dicts.

    Parameters
    ----------
    fp1, fp2 : Fingerprint
        `Fingerprint` objects, `fp2` subtracted from `fp1`.
    only_positive : bool, optional
        Return only strictly positive counts; indices whose difference is
        zero or negative are dropped from the result.

    Returns
    -------
    counts_diff : dict
        Count indices in either `fp1` or `fp2` with value as diff of counts.
        The counts of `fp1` and `fp2` are not modified.

    See Also
    --------
    sum_counts_dict
    """
    counts_diff = fp1.counts.copy()
    for k, v in fp2.counts.items():
        counts_diff[k] = counts_diff.get(k, 0) - v
    if only_positive:
        # Filter the whole result so no zero or negative entry survives,
        # including ones that came from `fp1` alone.
        counts_diff = {k: v for k, v in counts_diff.items() if v > 0}
    return counts_diff