Recipe 5.1 - Building a spectrum library using Hadoop

Problem

You want to create a spectrum library using Hadoop.

Solution

The Mapper class maps each peptide spectrum read from a sequence file to a peptide/spectrum key-value pair, so that all spectra identifying the same peptide are grouped for the reducer. A sketch of how the input sequence file can be written follows the mapper code.


public static class PeptideMapper extends Mapper<SpectrumKey, PeptideSpectrum, Peptide, PeptideSpectrum> {

    @Override
    protected void map(SpectrumKey key, PeptideSpectrum value, Context context) throws IOException, InterruptedException {

        context.write(value.getPeptide(), value);
    }
}
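
The mapper expects its input as a Hadoop sequence file of SpectrumKey/PeptideSpectrum pairs. The following is a minimal sketch of how such a file could be written; the spectra collection is a hypothetical, already populated collection of PeptideSpectrum (for example obtained from one of the MzJava spectrum readers), and the MzJava serialization must be registered, exactly as in the run method below.


Configuration conf = new Configuration();
//at minimum the Writable and MzJava serializations must be registered (see run() below)
conf.set("io.serializations",
        "org.apache.hadoop.io.serializer.WritableSerialization," +
        "org.expasy.mzjava.hadoop.io.MzJavaSerialization");

//spectra is a hypothetical, already populated Collection<PeptideSpectrum>
try (SequenceFile.Writer writer = SequenceFile.createWriter(conf,
        SequenceFile.Writer.file(new Path("spectra.seq")),
        SequenceFile.Writer.keyClass(SpectrumKey.class),
        SequenceFile.Writer.valueClass(PeptideSpectrum.class))) {

    for (PeptideSpectrum spectrum : spectra) {

        Peak precursor = spectrum.getPrecursor();
        writer.append(new SpectrumKey(precursor.getMz(), precursor.getCharge()), spectrum);
    }
}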

The Reducer class creates consensus spectra for each peptide. If a peptide has spectra with more than one precursor charge, one consensus spectrum is emitted for each charge.


public static class ConsensusReducer extends Reducer<Peptide, PeptideSpectrum, SpectrumKey, PeptideConsensusSpectrum> {

    private Tolerance tolerance;
    private PeptideFragmenter fragmenter;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {

        super.setup(context);

        //0.4 m/z absolute tolerance used when annotating the consensus peaks with fragment ions
        tolerance = new AbsoluteTolerance(0.4);
        //generates the b and y ions used for the annotation; FLOAT is a static import of PeakList.Precision.FLOAT
        fragmenter = new PeptideFragmenter(EnumSet.of(IonType.b, IonType.y), FLOAT);
    }

    @Override
    protected void reduce(Peptide peptide, Iterable<PeptideSpectrum> values, Context context) throws IOException, InterruptedException {

        //group the spectra by precursor charge so that one consensus spectrum can be built per charge state
        final Multimap<Integer, PeakList> chargeMap = ArrayListMultimap.create();

        for (PeptideSpectrum spectrum : values) {

            //storing the reference assumes the configured deserializer supplies a
            //fresh object per value (Hadoop's Writable serialization reuses instances)
            chargeMap.put(spectrum.getPrecursor().getCharge(), spectrum);
        }

        final URI source = new URIBuilder("org.expasy.mzjava", "hadoop-example").build();
        for (Integer charge : chargeMap.keySet()) {

            final PeptideConsensusSpectrum consensus = PeptideConsensusSpectrum.builder(FLOAT, source)
                    .setConsensusParameters(0.2, 0.2, SUM_INTENSITY)
                    .setAnnotationParameters(tolerance, fragmenter)
                    .setFilterParameters(0.2, 2)
                    .buildConsensus(charge, peptide, chargeMap.get(charge), Collections.<String>emptySet());

            double mz = peptide.calculateMz(charge);
            context.write(new SpectrumKey(mz, charge), consensus);
        }
    }
}
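
Because the reducer groups spectra by precursor charge, a peptide observed at, for example, charges 2 and 3 produces two library entries, each keyed by its charge-specific precursor m/z. A small illustration of the key calculation, assuming MzJava's Peptide.parse factory method and a made-up sequence:


Peptide peptide = Peptide.parse("QVHPDTGISSK"); //hypothetical peptide sequence
double mz2 = peptide.calculateMz(2);            //key m/z of the 2+ consensus spectrum
double mz3 = peptide.calculateMz(3);            //key m/z of the 3+ consensus spectrum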

The code to configure and run the Hadoop job.


public static void main(String[] args) throws Exception {

    int exitCode = ToolRunner.run(new HadoopPaperExampleRecipe(), args);
    System.exit(exitCode);
}

@Override
public int run(String[] args) throws Exception {

    Configuration configuration = getConf();
    //Configure Hadoop to use the MzJava serialization in addition to the default Hadoop serializations
    configuration.set("io.serializations",
            "org.apache.hadoop.io.serializer.WritableSerialization," +
            "org.apache.hadoop.io.serializer.avro.AvroSpecificSerialization," +
            "org.apache.hadoop.io.serializer.avro.AvroReflectSerialization," +
            "org.expasy.mzjava.hadoop.io.MzJavaSerialization");
    //Configure Hadoop to use the DefaultRawComparator to sort the map output keys
    configuration.setClass(JobContext.KEY_COMPARATOR, DefaultRawComparator.class, RawComparator.class);

    String queryFile = args[0];
    String outputPath = args[1];

    Job job = Job.getInstance(configuration, "Hadoop Example");
    job.setJarByClass(getClass());
    job.setInputFormatClass(SequenceFileInputFormat.class);
    SequenceFileInputFormat.addInputPath(job, new Path(queryFile));

    job.setOutputFormatClass(SequenceFileOutputFormat.class);
    Path outputDir = new Path(outputPath);
    FileOutputFormat.setOutputPath(job, outputDir);

    job.setMapperClass(PeptideMapper.class);
    job.setMapOutputKeyClass(Peptide.class);
    job.setMapOutputValueClass(PeptideSpectrum.class);

    job.setReducerClass(ConsensusReducer.class);
    job.setOutputKeyClass(SpectrumKey.class);
    job.setOutputValueClass(PeptideConsensusSpectrum.class);

    //delete any output from a previous run, because Hadoop refuses to start a job if the output directory already exists
    FileSystem fs = FileSystem.get(configuration);
    fs.delete(outputDir, true);

    if (job.waitForCompletion(true)) {

        return 0;
    } else {

        LOGGER.severe("wait for completion returned false");
        return 1;
    }
}

/**
 * Comparator that Hadoop uses to compare the serialized peptide keys. Comparing
 * the raw bytes avoids deserializing each key; the resulting order is not a
 * meaningful peptide order, but it is consistent, which is all that is needed
 * to group identical peptides in the shuffle.
 */
public static final class DefaultRawComparator implements RawComparator<Object>, Serializable {
    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {

        return WritableComparator.compareBytes(b1, s1, l1, b2, s2, l2);
    }

    @Override
    public int compare(Object o1, Object o2) {

        throw new RuntimeException("Object comparison not supported");
    }
}
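
Once the job has completed, the resulting library can be read back from the output sequence file. The following is a sketch under two assumptions: the outputPath variable and the part file name (here part-r-00000) are placeholders that depend on the actual job, and the serializations registered in the run method must also be registered here.


Configuration conf = new Configuration();
//at minimum the Writable and MzJava serializations must be registered (see run() above)
conf.set("io.serializations",
        "org.apache.hadoop.io.serializer.WritableSerialization," +
        "org.expasy.mzjava.hadoop.io.MzJavaSerialization");

Path library = new Path(outputPath, "part-r-00000"); //hypothetical part file name
try (SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(library))) {

    Object key = null;
    PeptideConsensusSpectrum consensus = null;
    //for non-Writable serializations the reader deserializes keys and values itself
    while ((key = reader.next(key)) != null) {

        consensus = (PeptideConsensusSpectrum) reader.getCurrentValue(consensus);
        //... use the consensus spectrum, e.g. add it to an in-memory library
    }
}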

See also

See Recipe 5.2 for an example that uses Apache Spark.