PipelineTracks.py - Definition of tracks in pipelines

Author: Andreas Heger
Release: $Id$
Date: December 09, 2013
Tags: Python

Motivation

A pipeline typically processes the data streams from several experimental data sources. These data streams are usually processed both separately (initial processing, quality control) and as aggregates. For example, consider the following experimental layout:

Filename                 Content
liver-stimulated-R1      liver, stimulated, replicate 1
liver-stimulated-R2      liver, stimulated, replicate 2
liver-unstimulated-R1    liver, unstimulated, replicate 1
liver-unstimulated-R2    liver, unstimulated, replicate 2
heart-stimulated-R1      heart, stimulated, replicate 1
heart-stimulated-R2      heart, stimulated, replicate 2
heart-unstimulated-R1    heart, unstimulated, replicate 1
heart-unstimulated-R2    heart, unstimulated, replicate 2

The experiment measures two tissues under two conditions with two replicates each, giving eight data streams. During the analysis, the streams are merged in a variety of combinations:

  • unmerged for initial processing, QC, etc.
  • by replicates to assess reproducibility of measurements
  • by condition to assess the size of the response to the stimulus
  • by tissue to assess differences between tissues and address the biological question.

The crossing of data streams complicates the building of pipelines, especially as no two experiments are the same. The PipelineTracks module assists in controlling these data streams. This module provides some tools to map tracks to different representations and to group them in flexible ways in order to provide convenient short-cuts in pipelines.

There are three classes within PipelineTracks: Sample, Tracks and Aggregate.

A Track

The basic atomic data structure is a Sample or track. A track is a single measurement that can be combined with other tracks. A track identifier consists of a tuple of attributes. Each track in an experimental design has the same number of labels in the same order. In the example above, there are three attributes: tissue, condition and replicate. Identifiers are thus ('liver', 'stimulated', 'R1') or ('heart', 'unstimulated', 'R2').

The same track can be represented by different names depending on the context, for example when it is used as a filename or as a database table name. As a filename, the track ('heart', 'unstimulated', 'R2') is rendered as heart-unstimulated-R2 (avoiding spaces), while as a table name it reads heart_unstimulated_R2, avoiding the - character, which is not permitted in SQL table names. The Sample class provides convenience methods to convert names from one context to another.
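
A minimal sketch of these conversions, assuming the three-attribute Sample3 class documented in the API section below; the outputs follow the hyphen/underscore conventions shown in the Usage section:

>>> import PipelineTracks
>>> track = PipelineTracks.Sample3( filename = "heart-unstimulated-R2" )
>>> print track.asFile()
heart-unstimulated-R2
>>> print track.asTable()
heart_unstimulated_R2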

Track containers

A container of type Tracks stores one or more objects of type Sample.

Aggregates

Tracks can be combined into aggregates. Aggregation is indicated by the agg keyword.

For example, the liver-stimulated-agg aggregate combines the tracks liver-stimulated-R1 and liver-stimulated-R2. The aggregate agg-stimulated-agg combines all replicates and all tissues for the stimulated condition (liver-stimulated-R1, liver-stimulated-R2, heart-stimulated-R1, heart-stimulated-R2).

Usage

Defining tracks and aggregates

To use tracks, you first need to define a new Sample. For the example above with the attributes tissue, condition and replicate, the Sample could be:

import PipelineTracks

class MySample( PipelineTracks.Sample ):
    attributes = ( "tissue", "condition", "replicate" )

Once defined, you can add tracks to a tracks container. For example:

import glob

TRACKS = PipelineTracks.Tracks( MySample ).loadFromDirectory( glob.glob( "*.fastq.gz" ),
                                                              pattern = "(\S+).fastq.gz" )

will collect all files ending in .fastq.gz. The track identifiers are derived by removing the .fastq.gz suffix. The variable TRACKS now contains all the tracks derived from files ending in *.fastq.gz:

>>> print TRACKS
[liver-stimulated-R2, heart-stimulated-R2, liver-stimulated-R1, liver-unstimulated-R1, heart-unstimulated-R2, heart-stimulated-R1, heart-unstimulated-R1, liver-unstimulated-R2]

To build aggregates, use PipelineTracks.Aggregate. The following combines replicates for each experiment:

EXPERIMENTS = PipelineTracks.Aggregate( TRACKS, labels = ("condition", "tissue" ) )

Aggregates are simply containers of associated data sets. To get a list of experiments, type:

>>> EXPERIMENTS = PipelineTracks.Aggregate( TRACKS, labels = ("condition", "tissue" ) )
>>> print list(EXPERIMENTS)
[heart-stimulated-agg, heart-unstimulated-agg, liver-stimulated-agg, liver-unstimulated-agg]

or:

>>> print EXPERIMENTS.keys()
[heart-stimulated-agg, heart-unstimulated-agg, liver-stimulated-agg, liver-unstimulated-agg]

To obtain all replicates in the experiment heart-stimulated, use dictionary access:

>>> print EXPERIMENTS['heart-stimulated-agg']
[heart-stimulated-R2, heart-stimulated-R1]

The returned objects are tracks. To use a track as a table name or as a filename, use the accessor methods Sample.asTable() or Sample.asFile(), respectively:

>>> print [x.asFile() for x in EXPERIMENTS['heart-stimulated-agg'] ]
['heart-stimulated-R2', 'heart-stimulated-R1']

>>> print [str(x) for x in EXPERIMENTS['heart-stimulated-agg'] ]
['heart-stimulated-R2', 'heart-stimulated-R1']

>>> print [x.asTable() for x in EXPERIMENTS['heart-stimulated-agg'] ]
['heart_stimulated_R2', 'heart_stimulated_R1']

Note how the - is converted to _, as the former is not permitted in SQL table names.

The default representation is file-based. By using the class method:

MySample.setDefault( "asTable" )

the default representation can be changed for all tracks simultaneously.
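
A hedged illustration of the effect, continuing the EXPERIMENTS example above and assuming that str() renders a track using the current default representation (as the str() output shown earlier suggests):

>>> print [str(x) for x in EXPERIMENTS['heart-stimulated-agg'] ]
['heart-stimulated-R2', 'heart-stimulated-R1']
>>> MySample.setDefault( "asTable" )
>>> print [str(x) for x in EXPERIMENTS['heart-stimulated-agg'] ]
['heart_stimulated_R2', 'heart_stimulated_R1']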

You can have multiple aggregates. For example, some tasks might require all conditions or all tissues:

CONDITIONS = PipelineTracks.Aggregate( TRACKS, labels = ("condition", ) )
TISSUES = PipelineTracks.Aggregate( TRACKS, labels = ("tissue", ) )
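
As a hedged sketch of what these aggregates contain, assuming the agg naming convention described above is applied to the unspecified attributes (key order is not guaranteed):

>>> print CONDITIONS.keys()
[agg-stimulated-agg, agg-unstimulated-agg]
>>> print TISSUES.keys()
[liver-agg-agg, heart-agg-agg]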

You can have several Tracks within a directory. Tracks are simply containers and as such do not have any actions associated with them.

Using tracks in pipelines

Unfortunately, tracks and aggregates do not yet work directly as ruffus task lists. Instead, they need to be converted to filenames explicitly using list comprehensions.

If you want to process all tracks separately, use:

@files( [ ("%s.fastq.gz" % x.asFile(),
            "%s.qc" % x.asFile()) for x in TRACKS ] )
def performQC( infile, outfile ):
   ....

The above statement will create the following list of input/output files for the performQC task:

[ ( "liver-stimulated-R1.fastq.gz", "liver-stimulated-R1.qc" )
  ( "liver-stimulated-R2.fastq.gz" , "liver-stimulated-R2.qc" ),
  ...
]

Using aggregates works similarly, though you will need to create the file lists yourself using nested list comprehensions. The following creates an analysis per experiment:

@files( [ ( [ "%s.fastq.gz" % y.asFile() for y in EXPERIMENTS[x] ],
            "%s.out" % x.asFile() )
          for x in EXPERIMENTS ] )
def checkReproducibility( infiles, outfile ):
   ....

The above statement will create the following list of input/output files:

[ ( ( "liver-stimulated-R1.fastq.gz", "liver-stimulated-R2.fastq.gz" ), "liver-stimulated-agg.out" ),
  ( ( "liver-unstimulated-R1.fastq.gz", "liver-unstimulated-R2.fastq.gz" ), "liver-unstimulated-agg.out" ),
  ( ( "heart-stimulated-R1.fastq.gz", "heart-stimulated-R2.fastq.gz" ), "heart-stimulated-agg.out" ),
  ( ( "heart-unstimulated-R1.fastq.gz", "heart-unstimulated-R2.fastq.gz" ), "heart-unstimulated-agg.out" ),
]

The above code makes sure that the file dependencies are observed. Thus, if heart-stimulated-R1.fastq.gz changes, only heart-stimulated-agg.out will be re-computed.

Tracks and aggregates can also be used within a task. The following code collects all replicates for the experiment liver-stimulated-agg:

>>> track = TRACKS.factory( filename = "liver-stimulated-agg" )
>>> replicates = PipelineTracks.getSamplesInTrack( track, TRACKS )
>>> print replicates
[liver-stimulated-R2, liver-stimulated-R1]

API

class PipelineTracks.Sample(filename=None)

Bases: object

a sample/track with one attribute called experiment.

create a new Sample.

If filename is given, the sample name will be derived from filename.

clone()

return a copy of self.

asFile()

return sample as a filename

asTable()

return sample as a tablename

asR()

return sample as valid R label

fromFile(fn)

build sample from filename fn

fromTable(tn)

build sample from tablename tn

fromR(rn)

build sample from R name rn

asAggregate(*args)

return a new aggregate Sample.

toLabels()

return attributes that this track is an aggregate of.

classmethod setDefault(representation=None)

set default representation for tracks to representation. If representation is None, the representation will be set to the library default (asFile()).

class PipelineTracks.Sample3(filename=None)

Bases: PipelineTracks.Sample

a sample/track with three attributes: tissue, condition and replicate.

create a new Sample.

If filename is given, the sample name will be derived from filename.

asAggregate(*args)

return a new aggregate Sample.

asFile()

return sample as a filename

asR()

return sample as valid R label

asTable()

return sample as a tablename

clone()

return a copy of self.

fromFile(fn)

build sample from filename fn

fromR(rn)

build sample from R name rn

fromTable(tn)

build sample from tablename tn

classmethod setDefault(representation=None)

set default representation for tracks to representation. If representation is None, the representation will be set to the library default (asFile()).

toLabels()

return attributes that this track is an aggregate of.

class PipelineTracks.Tracks(factory=<class 'PipelineTracks.Sample'>)

a collection of tracks.

create a new container.

New tracks are derived using factory.

factory

alias of Sample

loadFromDirectory(files, pattern, exclude=None)

load tracks from a list of files, applying pattern.

Pattern is a regular expression with at least one group, for example (.*).gz.

If set, files matching a regular expression in exclude are skipped.
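
A hedged sketch of the exclude argument, assuming it accepts a sequence of regular expression patterns; the pattern ".*test.*" below is purely illustrative:

import glob

import PipelineTracks

# load all *.fastq.gz files, skipping any whose name matches ".*test.*"
TRACKS = PipelineTracks.Tracks( PipelineTracks.Sample3 ).loadFromDirectory(
    glob.glob( "*.fastq.gz" ),
    pattern = "(\S+).fastq.gz",
    exclude = [ ".*test.*" ] )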

getTracks(pattern=None)

return all tracks in container.

PipelineTracks.getSamplesInTrack(track, tracks)

return all tracks in tracks that constitute track.
