Source code for mqrun.mqparams

"""
==========================================
Convert parameters (:mod:`mqrun.mqparams`)
==========================================

.. currentmodule:: mqrun.mqparams

Convert between json or yaml and MaxQuant configuration files.

Contents
========

.. autosummary::
    :toctree: generated/

    xml_to_data
    data_to_xml
    mqrun


Parameters files
================

Example data::

    >>> import json
    >>> from pathlib import Path
    >>> from pprint import pprint

    >>> # TODO how about some sensible parameters ;-)
    >>> with open('paramfile.json') as f:
    ...     param_file = f.read()

    >>> print(param_file)
    {
        "rawFiles": [
            {
                "name": "input1",
                "params": {
                    "defaults": "default",
                    "variableModifications": [
                        "Oxidation (M)"
                    ]
                }
            },
            {
                "name": "input2",
                "params": {
                    "defaults": "default"
                }
            }
        ],
        "fastaFiles": {
            "fileNames": ["fasta1"],
            "firstSearch": ["fasta1"]
        },
        "globalParams": {
            "defaults": "default",
            "matchBetweenRuns": true
        }
    }

    >>> # load json data
    >>> params = json.loads(param_file)

Each parameter file must contain the sections "rawFiles" and "fastaFiles".
"globalParams" and "MSMSParams" are optional.

The input files (raw and fasta) are only identified by a unique name. You must
specify the paths for each of the input files in a dictionary and pass that to
the relevant functions.

The "params" sections in "rawFiles" and the sections "globalParams" and
"MSMSParams" have the optional argument "defaults", that specifies default
values for the other parameters in that section.

# TODO: write some of those and document how to add more

``mqschema.json`` contains a json-schema for this file format along with
descriptions for a few of the parameters.

Example
-------

To convert the above parameters to xml we need a mapping from the names of
the input files to their paths. Let's say they are stored in the following
locations::


    >>> paths = {
    ...     "input1": Path("C://data/input1.raw"),
    ...     "input2": Path("C://data/input2.raw"),
    ...     "fasta1": Path("C://data/fasta1.fasta"),
    ... }

Convert to xml::

    >>> outdir = Path('C://output/')
    >>> tmpdir = Path('C://tmp')
    >>> xmltree = data_to_xml(params, paths, outdir, tmpdir)

We can now write that xml file to disk and run MaxQuantCmd on it::

    >>> xmltree.write(str(outdir / "params.xml"))

To convert it back to our json format we do::

    >>> params_, path_data = xml_to_data(xmltree)
    >>> path_data.paths == paths
    True
    >>> path_data.outdir == outdir
    True
    >>> path_data.tmpdir == tmpdir
    True

Since the xml file does not contain information about which default values
have been used, it specifies all values and differs from the original.
But converting it back to xml should yield the same result as before::

    >>> from sort_xml_tags import equal_sorted
    >>> xmltree_ = data_to_xml(params_, *path_data)
    >>> equal_sorted(xmltree_, xmltree)
    True

"""

import collections
import collections.abc
import json
import logging
import math
import numbers
import subprocess
from copy import deepcopy
from pathlib import PureWindowsPath, Path
from xml.etree import ElementTree

__all__ = ['xml_to_data', 'data_to_xml', 'mqrun']


# Directory next to this module that holds the bundled json resources.
datadir = Path(__file__).parent / "data"

# json-schema of the parameter file format.
with (datadir / 'mqschema.json').open() as f:
    _schema = json.load(f)


# Named default parameter sets, keyed first by section and then by the name
# a user gives in a "defaults" entry. Only the set named "default" is
# defined here; every section takes it from the matching part of
# ``defaults.json`` (raw file parameters come from the first raw file entry).
with (datadir / 'defaults.json').open() as f:
    _vals = json.load(f)
    _defaults = {
        # Top-level entry is created empty; no code in this module reads it.
        'default': {},
        'globalParams': {'default': _vals['globalParams']},
        'MSMSParams': {'default': _vals['MSMSParams']},
        'rawFileParams': {'default': _vals['rawFiles'][0]['params']},
        'topLevelParams': {'default': _vals['topLevelParams']},
    }


def encode(value):
    """Encode a value as text for use in xml.

    Parameters
    ----------
    value : object
        Booleans become ``"true"``/``"false"``. Real numbers are written
        with a capital ``E`` in scientific notation; integral floats below
        1e15 in magnitude are written without a fractional part. Anything
        else is passed through ``str``.

    Returns
    -------
    str
        Text representation of ``value``.
    """
    # bool is a subclass of int, so it has to be handled before numbers.
    if isinstance(value, bool):
        return str(value).lower()
    if not isinstance(value, numbers.Real):
        return str(value)
    # Integers never need float handling, and very large ones would
    # overflow inside math.isnan / math.log's float conversion.
    if isinstance(value, numbers.Integral):
        return str(value)
    if math.isnan(value):
        return "NaN"
    if math.isinf(value):
        # int(inf) raises OverflowError, so infinities need their own branch.
        # NOTE(review): "INF"/"-INF" are the XML Schema lexical forms and are
        # parsed back by decode(); confirm that MaxQuant accepts them.
        return "INF" if value > 0 else "-INF"
    if value == 0:
        return "0"
    if int(value) == value and (-4 < math.log(abs(value), 10) < 15):
        value = int(value)
    # Use capital E in scientific notation
    return str(value).replace('e', 'E')


def decode(string, dtype):
    """Parse the text of an xml element as the json-schema type ``dtype``.

    Parameters
    ----------
    string : str or None
        Element text. ``None`` is returned unchanged as ``None``.
    dtype : str
        One of ``"number"``, ``"string"``, ``"integer"`` or ``"boolean"``.

    Returns
    -------
    float, str, int, bool or None
        The parsed value; surrounding whitespace is ignored.

    Raises
    ------
    ValueError
        If ``dtype`` is not one of the known types, or the text is not
        ``"true"``/``"false"`` for a boolean, or a number fails to parse.
    """
    if string is None:
        return None

    if dtype in ("number", "string", "integer", "boolean"):
        text = string.strip()
        if dtype == "number":
            return float(text)
        if dtype == "string":
            return text
        if dtype == "integer":
            return int(text)
        # Only the boolean case is left.
        if text == "true":
            return True
        if text == "false":
            return False
        raise ValueError("not a bool: " + text)

    message = "Could not parse '{}' as type {}, type not known"
    raise ValueError(message.format(string, dtype))


def rec_update(d, u):
    """Recursively update the nested dictionary ``d`` in place with ``u``.

    Values of ``u`` that are mappings are merged into the corresponding
    mapping in ``d``; all other values replace the entry in ``d``.

    See https://stackoverflow.com/questions/3232943

    Parameters
    ----------
    d : MutableMapping
        Dictionary that is updated in place.
    u : Mapping
        Updates to apply.

    Returns
    -------
    MutableMapping
        ``d`` itself, so that the recursive step can store the merged
        sub-dictionary.

    Raises
    ------
    TypeError
        If ``d`` is not a mapping.
    """
    if not isinstance(d, collections.abc.Mapping):
        raise TypeError("can only update a mapping, got {}".format(type(d)))
    for key, value in u.items():
        if isinstance(value, collections.abc.Mapping):
            target = d.get(key)
            # A missing or non-mapping entry is replaced by a fresh dict
            # that receives the nested update.
            if not isinstance(target, collections.abc.Mapping):
                target = {}
            d[key] = rec_update(target, value)
        else:
            d[key] = value
    return d


def data_to_xml(user_data, file_paths, fasta_paths, output_dir,
                tmp_dir=None, logger=None):
    """ Convert parameter set to MaxQuant xml.

    Parameters
    ----------
    user_data : dict
        The parameters for MaxQuant as described in the module doc of
        mqparams.
    file_paths : dict
        Mapping from names in parameter set to actual file paths.
        ``None`` is treated as an empty mapping.
    fasta_paths : dict
        # TODO merge with file_paths
        Mapping from names to file paths. ``None`` is treated as an empty
        mapping.
    output_dir : pathlib.Path
        Write the output files to this directory.
    tmp_dir : pathlib.Path, optional
        Base dir for temporary data, needs lots of space. Use system
        default if not specified.
    logger : logging.Logger, optional
        Logger for the conversion process. Use
        ``logging.getLogger('mqparams')`` if not specified

    Returns
    -------
    params_xml : xml.etree.ElementTree.ElementTree
        Configuration file for use with ``MaxQuantCmd.exe``
    """
    if logger is None:
        logger = logging.getLogger('mqparams')
    if file_paths is None:
        file_paths = {}
    if fasta_paths is None:
        fasta_paths = {}

    extra_data = ExtraMQData(file_paths, fasta_paths, output_dir, tmp_dir)

    root = ElementTree.Element('MaxQuantParams')
    tree = ElementTree.ElementTree(root)

    # Each writer handles one section of the parameter set; a key of None
    # marks a writer that only uses the path information in extra_data.
    sections = [
        (MSMSParams, 'MSMSParams'),
        (GlobalParams, 'globalParams'),
        (RawFileParams, 'rawFiles'),
        (OutputParams, None),
        (FastaParams, 'fastaFiles'),
        (TopLevelParams, 'topLevelParams'),
    ]
    for klass, key in sections:
        writer = klass(logger)
        writer.update_data(extra_data=extra_data)
        if key is not None:
            writer.update_data(user_data.get(key, None))
        writer.write_into_xml(tree)

    return tree
def xml_to_data(xml_tree, logger=None):
    """ Extract parameters and file-paths from MaxQuant configuration file

    Parameters
    ----------
    xml_tree : xml.etree.ElementTree.ElementTree
        MaxQuant configuration file.
    logger : logging.Logger, optional
        Logger for the conversion process. Use
        ``logging.getLogger('mqparams')`` if not specified.

    Returns
    -------
    params : dict
        Parameters as described in module doc
    path_data : mqparams.ExtraMQData
        Path information from xml file

    path_data has the following attributes:

    file_paths : dict
        Mapping from names for input files to paths
    fasta_paths : dict
        # TODO merge with file_paths
        Same for fasta files
    output_dir : pathlib.Path
        Path to the output directory
    tmp_dir : pathlib.Path
        Path to temporary directory
    """
    # NOTE(review): none of the readers used below appears to fill in
    # ``file_paths`` and the directories come back as strings -- confirm
    # against the attribute description above.
    if logger is None:
        logger = logging.getLogger('mqparams')

    data = {}
    extra = ExtraMQData(None, None, None, None)

    # A key of None marks a reader that contributes path information only.
    sections = [
        (MSMSParams, 'MSMSParams'),
        (GlobalParams, 'globalParams'),
        (RawFileParams, 'rawFiles'),
        (OutputParams, None),
        (FastaParams, 'fastaFiles'),
        (TopLevelParams, 'topLevelParams'),
    ]
    for klass, key in sections:
        reader = klass(logger)
        reader.from_xml(xml_tree)
        if key is not None:
            data[key] = reader.data
        # Merge the path information collected so far into this reader and
        # carry the combined result on to the next one.
        reader.update_data(extra_data=extra)
        extra = reader.extra_data

    return data, extra
def mqrun(binpath, params, datapaths, outdir, tmpdir, logger=None):
    """ Run MaxQuant with specified parameters and paths.

    Parameters
    ----------
    binpath : str or pathlib.Path
        Path to the MaxQuant command line executable.
    params : dict
        Parameters as described in the module doc.
    datapaths : dict
        Mapping from names to paths of the input files. Files with suffix
        ``.fasta`` are used as fasta files, files with suffix ``.raw`` as
        raw files (case insensitive); other entries are ignored.
    outdir : str or pathlib.Path
        Output directory; ``params.xml`` is written into it.
    tmpdir : str or pathlib.Path
        Directory for temporary data, passed on to ``data_to_xml``.
    logger : logging.Logger, optional
        Logger to use. The ``logging`` module itself is used if not
        specified.

    Returns
    -------
    subprocess.Popen
        The started MaxQuant process. ``stdout`` and ``stderr`` are pipes;
        the caller has to read them (e.g. with ``communicate()``) and wait
        for the process.
    """
    if logger is None:
        logger = logging

    datapaths = {k: Path(v) for k, v in datapaths.items()}
    fasta_files = {k: str(v) for k, v in datapaths.items()
                   if v.suffix.lower() == '.fasta'}
    raw_files = {k: str(v) for k, v in datapaths.items()
                 if v.suffix.lower() == '.raw'}
    outdir = Path(outdir)

    logger.info("Writing parameter file")
    xml_path = outdir / "params.xml"
    # Build the tree first so a failing conversion does not leave an empty
    # parameter file behind.
    xml = data_to_xml(params, raw_files, fasta_files, outdir, tmpdir, logger)
    with xml_path.open('wb') as f:
        xml.write(f)

    logger.info("Run MaxQuant")
    mqcall = subprocess.Popen(
        [str(binpath), '-mqpar', str(xml_path)],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    return mqcall
# Path information that is kept separate from the parameters themselves.
ExtraMQData = collections.namedtuple(
    'ExtraMQData',
    ['file_paths', 'fasta_paths', 'output_dir', 'tmp_dir']
)


class MQParamSet(object):
    """Base class for one section of a MaxQuant parameter set.

    A parameter set holds the user data of its section (``data``) and the
    path information (``extra_data``) and can read itself from, or write
    itself into, the root element of an xml tree, driven by a json-schema
    of type ``object``.
    """

    @property
    def data(self):
        """Parameters of this section, or None if none have been set."""
        return self._data

    @property
    def extra_data(self):
        """Path information as an ``ExtraMQData`` tuple."""
        return self._extra_data

    def __init__(self, schema, defaults=None, logger=None):
        """
        Parameters
        ----------
        schema : dict
            json-schema of this section.
        defaults : dict, optional
            Mapping from the name of a default set to its values.
        logger : logging.Logger, optional
            The ``logging`` module is used if not specified.
        """
        if logger is None:
            logger = logging
        self._logger = logger
        self._schema = schema
        self._data = None
        self._extra_data = ExtraMQData({}, {}, None, None)
        # A fresh dict per instance instead of a shared mutable default.
        self._defaults = {} if defaults is None else defaults

    def update_data(self, user_data=None, extra_data=None):
        """Set the parameters and/or merge new path information.

        Fields of ``extra_data`` that are None or ``{}`` keep their current
        value. Non-empty ``user_data`` replaces the current data: it is
        applied on top of the default set named by its ``'defaults'`` entry
        or, without such an entry, on top of an empty dict.
        """
        if extra_data is not None:
            merged = list(self._extra_data)
            for i, item in enumerate(extra_data):
                if item is not None and item != {}:
                    merged[i] = item
            self._extra_data = ExtraMQData(*merged)

        if user_data is not None and user_data != {}:
            if 'defaults' in user_data:
                self._data = deepcopy(self._defaults[user_data['defaults']])
            else:
                assert self._schema["type"] == 'object'
                self._data = {}
            rec_update(self._data, user_data)

    def from_xml(self, xml_tree, ignore=()):
        """Read this section from the root of ``xml_tree``.

        ``ignore`` lists schema ids of properties that are skipped.
        """
        if not self._schema["type"] == "object":
            raise ValueError("type {} not supported"
                             .format(self._schema["type"]))
        base = xml_tree.getroot()
        data = self._simple_read_from_xml(base, self._schema, ignore=ignore)
        self.update_data(data)

    def write_into_xml(self, xml_tree, ignore=()):
        """Append this section to the root of ``xml_tree``.

        ``ignore`` lists schema ids of properties that are skipped.
        """
        if not self._schema["type"] == "object":
            raise ValueError("type {} not supported"
                             .format(self._schema["type"]))
        base = xml_tree.getroot()
        self._simple_write_into_xml(base, self.data, self._schema,
                                    ignore=ignore)

    def _simple_read_from_xml(self, base_element, schema, ignore=()):
        """Read all properties of an object schema from ``base_element``.

        Supports scalar properties, lists of strings and lists of lists of
        strings (stored as ``;``-separated text). Returns a dict mapping
        property names to decoded values.

        Raises ValueError for unsupported schema types and for properties
        that have no element in ``base_element``.
        """
        ignore = set(ignore)
        ignore.add('#defaults')

        if schema['type'] != 'object':
            raise ValueError("expected schema to contain an object")
        schema = schema['properties']

        data = {}
        for key in schema:
            if key == 'defaults':
                continue
            if schema[key]["id"] in ignore:
                continue

            el = base_element.find(key)
            if el is None:
                raise ValueError("missing element: " + key)

            type_ = schema[key]["type"]
            if type_ != "array":
                data[key] = decode(el.text, type_)
                continue

            item_type = schema[key]["items"]["type"]
            if item_type == "string":
                # Iterate over the children instead of looking at el.text:
                # the text of the container is None for unindented xml
                # (e.g. trees built by this module) even if it has children.
                data[key] = [(s.text or '').strip() for s in el]
            elif item_type == "array":
                if not schema[key]["items"]["items"]["type"] == "string":
                    raise ValueError("can not decode element " + key)
                data[key] = [
                    s.text.split(';') if s.text is not None else []
                    for s in el
                ]
            else:
                raise ValueError("only list of list of string and " +
                                 "list of string are supported")

        return data

    def _simple_write_into_xml(self, base_element, data, schema, ignore=()):
        """Append one child element per entry of ``data`` to ``base_element``.

        Lists of strings become ``<string>`` children, lists of lists are
        joined with ``;`` into one ``<string>`` child each, and scalar
        values become the element text (None leaves the element empty).

        Raises ValueError for keys that are not in the schema and for
        unsupported list item types.
        """
        ignore = set(ignore)

        if schema['type'] != 'object':
            raise ValueError("expected schema to contain an object")
        schema = schema['properties']

        for key, value in data.items():
            if key == 'defaults':
                continue
            if key not in schema:
                raise ValueError("Unknown key: {}".format(key))
            if schema[key]['id'] in ignore:
                continue

            data_el = ElementTree.Element(key)
            base_element.append(data_el)

            if schema[key]["type"] != "array":
                if value is not None:
                    data_el.text = encode(value)
                continue

            assert isinstance(value, collections.abc.Sequence)
            item_type = schema[key]["items"]["type"]
            if item_type == "array":
                for value_list in value:
                    assert isinstance(value_list, collections.abc.Sequence)
                    str_el = ElementTree.Element("string")
                    if len(value_list) > 0:
                        str_el.text = ';'.join(
                            encode(v) for v in value_list
                        )
                    data_el.append(str_el)
            elif item_type == "string":
                for val in value:
                    str_el = ElementTree.Element("string")
                    str_el.text = encode(val)
                    data_el.append(str_el)
            else:
                raise ValueError("list of {} not supported"
                                 .format(item_type))


class RawFileParams(MQParamSet):
    """Raw files together with their parameter groups."""

    def __init__(self, logger=None):
        super().__init__(
            _schema['properties']['rawFiles'],
            _defaults['rawFileParams'],
            logger,
        )

    def update_data(self, user_data=None, extra_data=None):
        """Set the list of raw file items and/or merge path information.

        For every item whose ``'params'`` name a ``'defaults'`` set, the
        parameters are applied on top of a copy of that set. The items
        passed in are not modified.
        """
        if user_data is not None:
            data = []
            for user_item in user_data:
                # Shallow copy so the caller's item keeps its own 'params'.
                item = dict(user_item)
                params = item['params']
                if 'defaults' in params:
                    merged = deepcopy(self._defaults[params['defaults']])
                    rec_update(merged, params)
                    item['params'] = merged
                data.append(item)
            self._data = data

        super().update_data(extra_data=extra_data)

    def from_xml(self, xml_tree):
        """Read the raw files and their parameter groups from ``xml_tree``.

        Produces one item per raw file with the optional entries
        ``'experiment'``, ``'path'``, ``'name'`` (stem of the path) and
        ``'fraction'`` and the ``'params'`` of its parameter group.
        """
        root = xml_tree.getroot()
        files = []

        experiments = root.find('experiments')
        file_paths = root.find('filePaths')
        fractions = root.find('fractions')
        param_group_inds = root.find('paramGroupIndices')
        param_groups = root.find('parameterGroups')

        for exp, path, frac in zip(experiments, file_paths, fractions):
            file = {}
            if exp.text and exp.text.strip():
                file['experiment'] = exp.text.strip()
            if path.text and path.text.strip():
                file['path'] = path.text.strip()
                file['name'] = PureWindowsPath(file['path']).stem
            if frac.text and frac.text.strip():
                file['fraction'] = int(frac.text.strip())
            files.append(file)

        params_schema = self._schema['items']['properties']['params']
        for i, param_group in enumerate(param_group_inds):
            index = int(param_group.text.strip())
            params_xml = param_groups[index]
            files[i]['params'] = self._simple_read_from_xml(
                params_xml, params_schema
            )

        self.update_data(files)

    def write_into_xml(self, xml_tree):
        """Append raw files and parameter groups to the root of ``xml_tree``.

        Every item of the data becomes one parameter group. The path of a
        file is looked up by name in ``extra_data.file_paths`` and falls
        back to the ``'path'`` entry of the file.
        """
        assert isinstance(self._data, list)
        xml_root = xml_tree.getroot()

        experiments = ElementTree.Element('experiments')
        file_paths = ElementTree.Element('filePaths')
        fractions = ElementTree.Element('fractions')
        param_group_inds = ElementTree.Element('paramGroupIndices')
        param_groups = ElementTree.Element('parameterGroups')

        xml_root.extend([experiments, file_paths, fractions,
                         param_group_inds, param_groups])

        params_schema = self._schema['items']['properties']['params']
        known_paths = self.extra_data.file_paths

        for i, file_data in enumerate(self._data):
            # NOTE(review): from_xml produces flat items (one per file)
            # without a 'files' list; such an item is treated as a group
            # with a single file. Confirm against mqschema.json.
            files = file_data.get('files', [file_data])
            params = file_data['params']

            for file in files:
                experiment = ElementTree.Element('string')
                if 'experiment' in file:
                    experiment.text = encode(file['experiment'])
                experiments.append(experiment)

                file_path = ElementTree.Element('string')
                name = file.get('name')
                if name in known_paths:
                    file_path.text = encode(known_paths[name])
                else:
                    file_path.text = encode(file['path'])
                file_paths.append(file_path)

                fraction = ElementTree.Element('short')
                if 'fraction' in file:
                    fraction.text = encode(file['fraction'])
                fractions.append(fraction)

                param_group_ind = ElementTree.Element('int')
                param_group_ind.text = encode(i)
                param_group_inds.append(param_group_ind)

            param_group = ElementTree.Element('parameterGroup')
            self._simple_write_into_xml(
                param_group, params, params_schema
            )
            param_groups.append(param_group)


class MSMSParams(MQParamSet):
    """MS/MS parameters, including the ``msmsParamsArray`` list."""

    # Entries of an msmsParams set stored as child elements; all other
    # entries are stored as xml attributes.
    _TOLERANCE_ELEMENTS = ('DeNovoTolerance', 'DeisotopeTolerance',
                           'MatchTolerance')

    def __init__(self, logger=None):
        super().__init__(
            _schema['properties']['MSMSParams'],
            _defaults['MSMSParams'],
            logger,
        )

    def from_xml(self, xml_tree):
        """Read the MS/MS parameters from ``xml_tree``."""
        ignore = {'#msmsParamsArray'}
        super().from_xml(xml_tree, ignore)

        key = 'msmsParamsArray'
        array_schema = self._schema['properties']['msmsParamsArray']
        schema = array_schema['items']['properties']

        msms_data = []
        for param_set in xml_tree.find(key):
            data = {}
            for name in schema:
                if name in self._TOLERANCE_ELEMENTS:
                    elem = param_set.find(name)
                    data[name] = decode(elem.text, "number")
                else:
                    # Attribute values are kept as strings.
                    data[name] = param_set.attrib[name]
            msms_data.append(data)

        self._data[key] = msms_data

    def write_into_xml(self, xml_tree):
        """Append the MS/MS parameters to the root of ``xml_tree``."""
        ignore = {'#msmsParamsArray'}
        super().write_into_xml(xml_tree, ignore)

        key = 'msmsParamsArray'
        val = self._data[key]
        base = ElementTree.Element(key)
        xml_tree.getroot().append(base)

        assert isinstance(val, collections.abc.Sequence)
        for param_set in val:
            param_set_el = ElementTree.Element('msmsParams')
            base.append(param_set_el)
            assert isinstance(param_set, collections.abc.Mapping)
            for name, value in param_set.items():
                if name in self._TOLERANCE_ELEMENTS:
                    tol = ElementTree.Element(name)
                    param_set_el.append(tol)
                    tol.text = encode(value)
                else:
                    param_set_el.attrib[name] = encode(value)


class GlobalParams(MQParamSet):
    """Global parameters plus a few fixed session entries."""

    def __init__(self, logger=None):
        super().__init__(
            _schema['properties']['globalParams'],
            _defaults['globalParams'],
            logger,
        )

    def write_into_xml(self, xml_tree, ignore=()):
        """Append the global parameters and the fixed entries
        ``maxQuantVersion``, ``name``, ``numThreads`` and ``sendEmail``."""
        super().write_into_xml(xml_tree, ignore)
        params = xml_tree.getroot()

        version = ElementTree.SubElement(params, 'maxQuantVersion')
        version.text = "1.5.0.0"

        name = ElementTree.SubElement(params, 'name')
        name.text = "Session1"

        numThreads = ElementTree.SubElement(params, 'numThreads')
        numThreads.text = "1"

        mail = ElementTree.SubElement(params, 'sendEmail')
        mail.text = "false"


class OutputParams(MQParamSet):
    """Output and temporary directory; carries no user data."""

    def __init__(self, logger=None):
        super().__init__(
            _schema['properties']['outputOptions'],
            None,
            logger,
        )

    def from_xml(self, xml_tree, ignore=()):
        """Read ``tempFolder`` and ``fixedCombinedFolder`` into extra_data."""
        assert not ignore
        tmp_folder = decode(xml_tree.find('tempFolder').text, 'string')
        outdir = decode(xml_tree.find('fixedCombinedFolder').text, 'string')
        self.update_data(
            extra_data=ExtraMQData(None, None, outdir, tmp_folder)
        )

    def write_into_xml(self, xml_tree, ignore=()):
        """Append ``tempFolder`` and ``fixedCombinedFolder`` elements; an
        unset directory yields an empty element."""
        assert not ignore
        data = self.extra_data
        root = xml_tree.getroot()

        tempFolder = ElementTree.Element('tempFolder')
        if data.tmp_dir is not None:
            tempFolder.text = encode(data.tmp_dir)
        root.append(tempFolder)

        outdir = ElementTree.Element('fixedCombinedFolder')
        if data.output_dir is not None:
            outdir.text = encode(data.output_dir)
        root.append(outdir)


class TopLevelParams(MQParamSet):
    """Parameters stored as attributes of the root element."""

    def __init__(self, logger=None):
        super().__init__(
            _schema['properties']['topLevelParams'],
            _defaults['topLevelParams'],
            logger,
        )

    def from_xml(self, xml_tree, ignore=()):
        """Read every schema property from the attributes of the root."""
        assert not ignore
        root = xml_tree.getroot()
        properties = self._schema['properties']
        data = {}
        for key in properties:
            if key == 'defaults':
                continue
            data[key] = decode(root.attrib[key], properties[key]['type'])
        self.update_data(user_data=data)

    def write_into_xml(self, xml_tree, ignore=()):
        """Write every schema property as an attribute of the root."""
        assert not ignore
        root = xml_tree.getroot()
        for key in self._schema['properties']:
            if key == 'defaults':
                continue
            root.attrib[key] = encode(self.data[key])


class FastaParams(MQParamSet):
    """Fasta files for the main search and for the first search."""

    def __init__(self, logger=None):
        super().__init__(
            _schema['properties']['fastaFiles'],
            None,
            logger,
        )

    def from_xml(self, xml_tree, ignore=()):
        """Read the fasta file lists from ``xml_tree``.

        Files are named by the stem of their path. Raises ValueError if two
        different paths share the same stem.
        """
        assert not ignore
        root = xml_tree.getroot()
        fasta_files = {}

        def register(element):
            # Record the path under its stem and return the stem.
            path = PureWindowsPath(element.text)
            if fasta_files.get(path.stem, str(path)) != str(path):
                raise ValueError("File name for fasta file not unique")
            fasta_files[path.stem] = str(path)
            return path.stem

        for file in root.find('fastaFiles'):
            register(file)
        data = {'fileNames': list(fasta_files.keys())}

        data['firstSearch'] = [
            register(file) for file in root.find('fastaFilesFirstSearch')
        ]

        self.update_data(extra_data=ExtraMQData(None, fasta_files, None, None))
        self.update_data(user_data=data)

    def write_into_xml(self, xml_tree, ignore=()):
        """Append ``fastaFiles`` and ``fastaFilesFirstSearch`` to the root,
        resolving file names through ``extra_data.fasta_paths``."""
        assert not ignore
        root = xml_tree.getroot()
        file_paths = self.extra_data.fasta_paths

        for tag, key in [('fastaFiles', 'fileNames'),
                         ('fastaFilesFirstSearch', 'firstSearch')]:
            base = ElementTree.Element(tag)
            root.append(base)
            for name in self.data[key]:
                item = ElementTree.Element('string')
                base.append(item)
                # encode() turns pathlib paths into text; element text has
                # to be a string to be serializable.
                item.text = encode(file_paths[name])