Recipe 5.1 - Building a spectrum library using Hadoop


You want to create a spectrum library using Hadoop.


The Mapper class maps each peptide spectrum read from a sequence file to a peptide/spectrum key-value pair.

public static class PeptideMapper extends Mapper<SpectrumKey, PeptideSpectrum, Peptide, PeptideSpectrum> {

    @Override
    protected void map(SpectrumKey key, PeptideSpectrum value, Context context) throws IOException, InterruptedException {

        // Re-key each spectrum by its peptide so that all spectra of the
        // same peptide are grouped into one reduce call
        context.write(value.getPeptide(), value);
    }
}
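
The job expects its input as a sequence file of SpectrumKey/PeptideSpectrum pairs. The recipe does not show how that file is produced; the following is a minimal sketch of one way to write it with Hadoop's SequenceFile API, assuming the MzJava serializations have been registered in "io.serializations" as in the run() method further down. The helper name writeLibraryInput is illustrative, not part of the recipe.

public static void writeLibraryInput(Configuration conf, Path path,
                                     Iterable<PeptideSpectrum> spectra) throws IOException {

    try (SequenceFile.Writer writer = SequenceFile.createWriter(conf,
            SequenceFile.Writer.file(path),
            SequenceFile.Writer.keyClass(SpectrumKey.class),
            SequenceFile.Writer.valueClass(PeptideSpectrum.class))) {

        for (PeptideSpectrum spectrum : spectra) {

            Peak precursor = spectrum.getPrecursor();
            // Key each spectrum by precursor m/z and charge, mirroring the
            // SpectrumKey that the reducer emits
            writer.append(new SpectrumKey(precursor.getMz(), precursor.getCharge()), spectrum);
        }
    }
}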

The Reducer class creates consensus spectra for each peptide. If a peptide has spectra with more than one precursor charge, one consensus spectrum is emitted for each charge.

public static class ConsensusReducer extends Reducer<Peptide, PeptideSpectrum, SpectrumKey, PeptideConsensusSpectrum> {

    private Tolerance tolerance;
    private PeptideFragmenter fragmenter;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {

        tolerance = new AbsoluteTolerance(0.4);
        fragmenter = new PeptideFragmenter(EnumSet.of(IonType.b, IonType.y), FLOAT);
    }

    @Override
    protected void reduce(Peptide peptide, Iterable<PeptideSpectrum> values, Context context) throws IOException, InterruptedException {

        // Group the peptide's spectra by precursor charge
        final Multimap<Integer, PeakList> chargeMap = ArrayListMultimap.create();
        for (PeptideSpectrum spectrum : values) {

            chargeMap.put(spectrum.getPrecursor().getCharge(), spectrum);
        }

        final URI source = new URIBuilder("org.expasy.mzjava", "hadoop-example").build();
        // Build and emit one consensus spectrum per precursor charge
        for (Integer charge : chargeMap.keySet()) {

            final PeptideConsensusSpectrum consensus = PeptideConsensusSpectrum.builder(FLOAT, source)
                    .setConsensusParameters(0.2, 0.2, SUM_INTENSITY)
                    .setAnnotationParameters(tolerance, fragmenter)
                    .setFilterParameters(0.2, 2)
                    .buildConsensus(charge, peptide, chargeMap.get(charge), Collections.<String>emptySet());

            double mz = peptide.calculateMz(charge);
            context.write(new SpectrumKey(mz, charge), consensus);
        }
    }
}

Code to configure and run the Hadoop job.

public static void main(String[] args) throws Exception {

    int exitCode = ToolRunner.run(new HadoopPaperExampleRecipe(), args);
    System.exit(exitCode);
}

public int run(String[] args) throws Exception {

    Configuration configuration = getConf();
    //This line configures Hadoop to use the MzJava serializations as well as the default Hadoop serialization
    configuration.set("io.serializations", ",,,");
    //This line configures Hadoop to use the DefaultRawComparator to sort the keys
    configuration.setClass(JobContext.KEY_COMPARATOR, DefaultRawComparator.class, RawComparator.class);

    String queryFile = args[0];
    String outputPath = args[1];

    Job job = Job.getInstance(configuration, "Hadoop Example");
    job.setJarByClass(HadoopPaperExampleRecipe.class);
    job.setInputFormatClass(SequenceFileInputFormat.class);
    SequenceFileInputFormat.addInputPath(job, new Path(queryFile));

    // Wire up the mapper and reducer defined above together with their
    // key/value classes
    job.setMapperClass(PeptideMapper.class);
    job.setMapOutputKeyClass(Peptide.class);
    job.setMapOutputValueClass(PeptideSpectrum.class);
    job.setReducerClass(ConsensusReducer.class);
    job.setOutputKeyClass(SpectrumKey.class);
    job.setOutputValueClass(PeptideConsensusSpectrum.class);
    // Store the library as a sequence file so it can be read back with the
    // MzJava serializations
    job.setOutputFormatClass(SequenceFileOutputFormat.class);

    Path outputDir = new Path(outputPath);
    FileOutputFormat.setOutputPath(job, outputDir);

    // Delete any output from a previous run, Hadoop refuses to start a job
    // if the output directory already exists
    FileSystem fs = FileSystem.get(configuration);
    fs.delete(outputDir, true);

    if (job.waitForCompletion(true)) {

        return 0;
    } else {

        LOGGER.severe("wait for completion returned false");
        return 1;
    }
}

/**
 * Comparator that is used by Hadoop to compare the serialized peptide keys.
 * For sorting and grouping, only byte-level equality of the keys matters,
 * so the raw bytes are compared directly.
 */
public static final class DefaultRawComparator implements RawComparator<Object>, Serializable {

    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {

        return WritableComparator.compareBytes(b1, s1, l1, b2, s2, l2);
    }

    @Override
    public int compare(Object o1, Object o2) {

        throw new RuntimeException("Object comparison not supported");
    }
}
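
Once the job has completed, the consensus library can be read back from the part files in the output directory. The following is a minimal sketch, assuming the job writes sequence file output (as configured with SequenceFileOutputFormat above) and that the MzJava serializations are registered in the configuration; the helper name readLibrary is illustrative, not part of the recipe.

public static void readLibrary(Configuration conf, Path partFile) throws IOException {

    try (SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(partFile))) {

        SpectrumKey key = null;
        PeptideConsensusSpectrum consensus = null;
        // reader.next returns null once the end of the file is reached
        while ((key = (SpectrumKey) reader.next(key)) != null) {

            consensus = (PeptideConsensusSpectrum) reader.getCurrentValue(consensus);
            System.out.println(key + " -> " + consensus.size() + " peaks");
        }
    }
}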

See also

See Recipe 5.2 for an example that uses Apache Spark.