# Source code for lib.patterns_targets

"""
This module handles the reading and filling-in of patterns. It can be used from
within Snakefiles or in downstream (figure-making) scripts.
"""

import os
import collections
import yaml
from . import common
from . import chipseq
from . import helpers

HERE = os.path.abspath(os.path.dirname(__file__))

# Note: when adding support for new peak callers, add them here.
PEAK_CALLERS = ['macs2', 'spp', 'sicer', 'epic2']


def update_recursive(d, u):
    """
    Update dictionary `d` with items in dictionary `u`, recursively
    """
    for k, v in u.items():
        if isinstance(v, collections.abc.Mapping):
            d[k] = update_recursive(d.get(k, {}), v)
        else:
            d[k] = v
    return d


class SeqConfig(object):
    def __init__(self, config, patterns, workdir=None):
        """
        This class takes care of common tasks related to config and patterns
        files (reading the sampletable, etc) but is intended to be subclassed.

        Parameters
        ----------
        config : str or dict
            Path to a config YAML file, or an already-loaded config dict.

        patterns : str
            Path to patterns YAML file

        workdir : str
            Config, patterns, and all paths in `config` should be interpreted
            as relative to `workdir`
        """
        self.path = None
        self.workdir = '.'
        if workdir is not None:
            config = os.path.join(workdir, config)
            patterns = os.path.join(workdir, patterns)
            self.workdir = workdir

        if isinstance(config, str):
            self.path = config

        self.config = common.load_config(
            common.resolve_config(config, workdir))

        # Normalize the various strandedness spellings into the three
        # canonical TopHat/bowtie-style labels.
        stranded = self.config.get('stranded', None)
        self.stranded = None
        if stranded:
            # BUG FIX: the original used `stranded in ('unstranded')`.
            # `('unstranded')` is a *string*, not a tuple, so `in` performed
            # substring matching and values like 'stranded' also matched.
            if stranded == 'unstranded':
                self.stranded = 'unstranded'
            elif stranded in ('fr-firststrand', 'ISR', 'SR', 'reverse'):
                self.stranded = 'fr-firststrand'
            elif stranded in ('fr-secondstrand', 'ISF', 'SF', 'forward'):
                self.stranded = 'fr-secondstrand'

        # Read the config file and extract all sorts of useful bits. This
        # mostly uses the `common` module to handle the details.
        self.config['references_dir'] = common.get_references_dir(self.config)
        self.samples, self.sampletable = common.get_sampletable(self.config)
        self.refdict, self.conversion_kwargs = common.references_dict(self.config)
        self.organism = self.config['organism']

        # BUG FIX: close the patterns file deterministically; the original
        # `yaml.load(open(patterns), ...)` leaked the file handle.
        with open(patterns) as handle:
            self.patterns = yaml.load(handle, Loader=yaml.FullLoader)

        self.is_paired = helpers.detect_layout(self.sampletable) == 'PE'
        if self.is_paired:
            self.n = [1, 2]
        else:
            self.n = [1]

        helpers.preflight(self.config)
class RNASeqConfig(SeqConfig):
    def __init__(self, config, patterns, workdir=None):
        """
        Config object specific to RNA-seq workflows.

        Fills in patterns to create targets by handling the by-sample and
        by-aggregate sections separately.

        Parameters
        ----------
        config : dict

        patterns : str
            Path to patterns YAML file

        workdir : str
            Config, patterns, and all paths in `config` should be interpreted
            as relative to `workdir`
        """
        SeqConfig.__init__(self, config, patterns, workdir)

        self.fill = dict(sample=self.samples, n=self.n)

        # Aggregate patterns are filled differently from per-sample ones, so
        # pull that section out before filling the rest.
        self.patterns_by_aggregation = self.patterns.pop(
            'patterns_by_aggregate', None)
        self.targets = helpers.fill_patterns(self.patterns, self.fill, zip)

        # Then the aggregation: only fill it when the patterns define an
        # aggregate section AND the config requests merged bigwigs.
        aggregating = (
            self.patterns_by_aggregation is not None
            and 'merged_bigwigs' in self.config
        )
        if aggregating:
            self.fill_by_aggregation = dict(
                merged_bigwig_label=self.config['merged_bigwigs'].keys(),
            )
            self.targets_by_aggregation = helpers.fill_patterns(
                self.patterns_by_aggregation,
                self.fill_by_aggregation,
            )
            # Fold the aggregate targets/patterns back in alongside the
            # per-sample ones.
            self.targets.update(self.targets_by_aggregation)
            self.patterns.update(self.patterns_by_aggregation)

        helpers.rnaseq_preflight(self)
class ChIPSeqConfig(SeqConfig):
    def __init__(self, config, patterns, workdir=None):
        """
        Config object specific to ChIP-seq workflows.

        Fills in patterns to create targets by handling the by-sample,
        by-peak, and by-aggregate sections separately.

        Parameters
        ----------
        config : dict

        patterns : str
            Path to patterns YAML file

        workdir : str
            Config, patterns, and all paths in `config` should be interpreted
            as relative to `workdir`
        """
        SeqConfig.__init__(self, config, patterns, workdir)

        self.targets = {}

        # For ChIP-seq, the structure of the patterns is quite different for
        # samples than it is for peaks. For example, the peaks do not have
        # any sample info in the filenames but aggregate possibly many
        # different samples.
        #
        # So construct them separately, and then later update self.patterns
        # and self.targets.
        #
        # The averaged bigwigs are also aggregated, but in a different way.
        # They will be handled separately.
        #
        # First, the samples...
        self.patterns_by_sample = self.patterns['patterns_by_sample']
        self.fill_by_sample = dict(
            n=self.n,
            sample=self.samples.values,
            label=self.sampletable.label.values,
            ip_label=self.sampletable.label[
                self.sampletable.antibody != 'input'].values
        )
        self.targets_by_sample = helpers.fill_patterns(
            self.patterns_by_sample, self.fill_by_sample)
        self.targets.update(self.targets_by_sample)
        self.patterns.update(self.patterns_by_sample)

        # Then the aggregation...
        self.patterns_by_aggregation = self.patterns.pop(
            'patterns_by_aggregate', None)
        if self.patterns_by_aggregation is not None and 'merged_bigwigs' in self.config:
            self.fill_by_aggregation = dict(
                merged_bigwig_label=self.config['merged_bigwigs'].keys(),
            )
            self.targets_by_aggregation = helpers.fill_patterns(
                self.patterns_by_aggregation,
                self.fill_by_aggregation
            )
            self.targets.update(self.targets_by_aggregation)
            self.patterns.update(self.patterns_by_aggregation)

        # Then the peaks...
        #
        # BUG FIX: this assignment was commented out, but
        # `self.patterns_by_peaks` is used below (the per-caller subsetting
        # and the final `self.patterns.update(...)`), which raised
        # AttributeError.
        self.patterns_by_peaks = self.patterns['patterns_by_peaks']
        self.targets_for_peaks = {}

        # We need to fill in just those peak-calling runs that are specified
        # for each peak-caller. For reference, here's an example
        # `patterns_by_peaks` from the YAML:
        #
        #     peaks:
        #         macs2: '{peak_calling}/macs2/{macs2_run}/peaks.bed'
        #         spp: '{peak_calling}/spp/{spp_run}/peaks.bed'
        #     bigbed:
        #         macs2: '{peak_calling}/macs2/{macs2_run}/peaks.bigbed'
        #         spp: '{peak_calling}/spp/{spp_run}/peaks.bigbed'
        #
        # Also note that the snakefile's all rule uses
        # utils.flatten(c.targets['peaks']), but in the case where no
        # peak-calling runs are specified these should be initialized,
        # otherwise we'll get a KeyError.
        self.targets['peaks'] = []
        self.targets['bigbed'] = []

        for pc in PEAK_CALLERS:
            # Extract out just the subset of `patterns_by_peaks` for this
            # peak-caller, e.g., from the example above, if pc='macs2' this
            # would only be:
            #
            #     peaks:
            #         macs2: '{peak_calling}/macs2/{macs2_run}/peaks.bed'
            #     bigbed:
            #         macs2: '{peak_calling}/macs2/{macs2_run}/peaks.bigbed'
            _peak_patterns = {
                k: {pc: v[pc]} for k, v in self.patterns_by_peaks.items()
            }

            # Fix for issue #166, which was caused by commit 8a211122:
            #
            # If no runs for the peak-caller are configured, this will be
            # empty and we should continue on.
            peaks_to_fill = list(
                chipseq.peak_calling_dict(self.config, algorithm=pc).keys())
            if not peaks_to_fill:
                continue

            _fill = {pc + '_run': peaks_to_fill}

            # The trick here is the recursive updating of targets_for_peaks.
            # We're adding the filled-in runs of each peak caller to the
            # targets as they're built.
            update_recursive(
                self.targets_for_peaks,
                helpers.fill_patterns(_peak_patterns, _fill)
            )

        self.targets.update(self.targets_for_peaks)
        self.patterns.update(self.patterns_by_peaks)

        helpers.chipseq_preflight(self)