Source code for e3fp.conformer.protonation

"""Functions for generating protonation states of molecules.

Author: Seth Axen
E-mail: seth.axen@gmail.com
"""
import os
import tempfile
import subprocess
import itertools
import logging

from .util import iter_to_smiles, MolItemName


[docs]def smiles_dict_to_proto_smiles_dict( in_smiles_dict, max_states=3, pka=7.4, dist_cutoff=20.0, add_missing=False, parallelizer=None, chunk_size=100, ): """Generate dict of SMILES for protonated states from SMILES dict.""" kwargs = {"max_states": max_states, "pka": pka, "dist_cutoff": dist_cutoff} in_smiles_iter = ( (smiles, mol_name) for mol_name, smiles in in_smiles_dict.items() ) if parallelizer is None: proto_smiles_iter = iter( smiles_list_to_proto_smiles_list(in_smiles_iter, **kwargs) ) else: smiles_chunks_iter = ( (chunk,) for chunk in _chunk_iter_to_lists( in_smiles_iter, chunk_size=chunk_size ) ) results_iter = ( result for result, data in parallelizer.run_gen( smiles_list_to_proto_smiles_list, smiles_chunks_iter, kwargs=kwargs, ) if result is not False ) proto_smiles_iter = itertools.chain.from_iterable(results_iter) proto_smiles_dict = { mol_name: smiles for smiles, mol_name in proto_smiles_iter } if add_missing: for mol_name, smiles in in_smiles_dict.items(): proto_name = MolItemName(mol_name, proto_state_num=0).proto_name if proto_name not in proto_smiles_dict: logging.debug( ( "Protonated SMILES for {} could not be generated. " "Returning input SMILES." ).format(mol_name) ) proto_smiles_dict[mol_name] = smiles return proto_smiles_dict
[docs]def smiles_list_to_proto_smiles_list( in_smiles_list, max_states=3, pka=7.4, dist_cutoff=20.0 ): """Generate list of SMILES for protonated states from single SMILES.""" in_smiles_file = tempfile.mkstemp(suffix=".smi")[1] iter_to_smiles( in_smiles_file, ((mol_name, smiles) for smiles, mol_name in in_smiles_list), ) logging.debug("Protonating SMILES in %s" % (in_smiles_file)) proc = subprocess.Popen( ( "cxcalc %s --ignore-error dominanttautomerdistribution -H %g -C " 'false -t dist -f "smiles:n,T:dist"' ).format(in_smiles_file, pka), shell=True, stdout=subprocess.PIPE, ) proto_smiles_list = [] try: stdout_iter = iter(proc.stdout.readline, b"") next(stdout_iter) curr_mol_name = None curr_states_count = 0 for line in stdout_iter: try: smiles, mol_name, dist = line.rstrip("\r\n").split() except ValueError: logging.warning("Error parsing line:\n%s" % line) continue if mol_name != curr_mol_name: curr_states_count = 0 curr_mol_name = mol_name if curr_states_count >= max_states: continue if float(dist) > dist_cutoff: proto_name = MolItemName( mol_name, proto_state_num=curr_states_count ).proto_name curr_states_count += 1 proto_smiles_list.append((smiles, proto_name)) logging.debug("Finished protonating SMILES in %s" % (in_smiles_file)) except Exception: logging.exception("Error running cxcalc", exc_info=True) proc.kill() os.remove(in_smiles_file) return proto_smiles_list
[docs]def smiles_to_proto_smiles( smiles, mol_name, max_states=3, pka=7.4, dist_cutoff=20.0 ): """Generate list of SMILES for protonated states from single SMILES. This is very inefficient in batch. """ logging.debug("Protonating SMILES in %s" % (mol_name)) proc = subprocess.Popen( ( 'cxcalc "%s %s" --ignore-error dominanttautomerdistribution -H %g ' '-C false -t dist -f "smiles:n,T:dist"' ).format(smiles, mol_name, pka), shell=True, stdout=subprocess.PIPE, ) states_count = 0 proto_smiles_list = [] try: stdout_iter = iter(proc.stdout.readline, b"") next(stdout_iter) for line in stdout_iter: try: this_smiles, this_name, dist = line.rstrip("\r\n").split() except ValueError: logging.warning("Error parsing line:\n%s" % line) continue if states_count >= max_states: break if float(dist) > dist_cutoff: proto_name = MolItemName( mol_name, proto_state_num=states_count ).proto_name states_count += 1 proto_smiles_list.append((smiles, proto_name)) logging.debug("Finished protonating SMILES in %s" % (mol_name)) except OSError: logging.exception( "Error running cxcalc on %s" % (mol_name), exc_info=True ) proc.kill() return proto_smiles_list
def _chunk_iter_to_lists(iterable, chunk_size=100): """Yield chunks of size `chunk_size` from iterator.""" i = 0 chunk = [] for item in iterable: if i >= chunk_size: yield chunk chunk = [] i = 0 chunk.append(item) i += 1 if len(chunk) != 0: yield chunk