Recipe 5.1 - Building a spectrum library using Hadoop
Problem
You want to create a spectrum library using Hadoop.
Solution
The Mapper class maps each peptide spectrum read from the input sequence file to a (peptide, spectrum) key-value pair. Using the peptide as the map output key makes Hadoop group all spectra that share the same peptide sequence into a single reduce call.
public static class PeptideMapper extends Mapper<SpectrumKey, PeptideSpectrum, Peptide, PeptideSpectrum> {

    @Override
    protected void map(SpectrumKey key, PeptideSpectrum value, Context context) throws IOException, InterruptedException {

        context.write(value.getPeptide(), value);
    }
}
The Reducer class creates a consensus spectrum for each peptide. If a peptide has spectra with more than one precursor charge, one consensus spectrum is emitted for each charge.
public static class ConsensusReducer extends Reducer<Peptide, PeptideSpectrum, SpectrumKey, PeptideConsensusSpectrum> {

    private Tolerance tolerance;
    private PeptideFragmenter fragmenter;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {

        super.setup(context);
        tolerance = new AbsoluteTolerance(0.4);
        fragmenter = new PeptideFragmenter(EnumSet.of(IonType.b, IonType.y), FLOAT);
    }

    @Override
    protected void reduce(Peptide peptide, Iterable<PeptideSpectrum> values, Context context) throws IOException, InterruptedException {

        //Group the spectra by precursor charge
        final Multimap<Integer, PeakList> chargeMap = ArrayListMultimap.create();
        for (PeptideSpectrum spectrum : values) {

            chargeMap.put(spectrum.getPrecursor().getCharge(), spectrum);
        }

        final URI source = new URIBuilder("org.expasy.mzjava", "hadoop-example").build();

        //Build and emit one consensus spectrum for each precursor charge
        for (Integer charge : chargeMap.keySet()) {

            final PeptideConsensusSpectrum consensus = PeptideConsensusSpectrum.builder(FLOAT, source)
                    .setConsensusParameters(0.2, 0.2, SUM_INTENSITY)
                    .setAnnotationParameters(tolerance, fragmenter)
                    .setFilterParameters(0.2, 2)
                    .buildConsensus(charge, peptide, chargeMap.get(charge), Collections.<String>emptySet());

            double mz = peptide.calculateMz(charge);
            context.write(new SpectrumKey(mz, charge), consensus);
        }
    }
}
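The recipe assumes that an input sequence file of (SpectrumKey, PeptideSpectrum) pairs already exists. As a rough sketch of how such a file could be produced, the following hypothetical helper uses Hadoop's SequenceFile.Writer; the spectra argument, the key built from the precursor m/z and charge, and the trimmed serialization list are illustrative assumptions, not part of the recipe.
//Sketch only: writing (SpectrumKey, PeptideSpectrum) pairs to a sequence file
public static void writeInputFile(Iterable<PeptideSpectrum> spectra, Path file) throws IOException {

    Configuration conf = new Configuration();
    //The MzJava serialization has to be registered, as in the job configuration below
    conf.set("io.serializations",
            "org.apache.hadoop.io.serializer.WritableSerialization," +
            "org.expasy.mzjava.hadoop.io.MzJavaSerialization");

    try (SequenceFile.Writer writer = SequenceFile.createWriter(conf,
            SequenceFile.Writer.file(file),
            SequenceFile.Writer.keyClass(SpectrumKey.class),
            SequenceFile.Writer.valueClass(PeptideSpectrum.class))) {

        for (PeptideSpectrum spectrum : spectra) {

            //Assumption: the key mirrors the SpectrumKey(mz, charge) constructor used in the reducer
            writer.append(new SpectrumKey(spectrum.getPrecursor().getMz(),
                            spectrum.getPrecursor().getCharge()),
                    spectrum);
        }
    }
}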
Code to configure and run the Hadoop job. The main and run methods belong to the HadoopPaperExampleRecipe driver class, which implements Hadoop's Tool interface (presumably by extending Configured, since run obtains its Configuration through getConf()).
public static void main(String[] args) throws Exception {

    int exitCode = ToolRunner.run(new HadoopPaperExampleRecipe(), args);
    System.exit(exitCode);
}

@Override
public int run(String[] args) throws Exception {

    Configuration configuration = getConf();
    //This line configures Hadoop to use the MzJava serialization as well as the default Hadoop serialization
    configuration.set("io.serializations", "org.apache.hadoop.io.serializer.WritableSerialization,org.apache.hadoop.io.serializer.avro.AvroSpecificSerialization,org.apache.hadoop.io.serializer.avro.AvroReflectSerialization,org.expasy.mzjava.hadoop.io.MzJavaSerialization");
    //This line configures Hadoop to use the DefaultRawComparator to sort the map output keys
    configuration.setClass(JobContext.KEY_COMPARATOR, DefaultRawComparator.class, RawComparator.class);

    String queryFile = args[0];
    String outputPath = args[1];

    Job job = Job.getInstance(configuration, "Hadoop Example");
    job.setJarByClass(getClass());

    job.setInputFormatClass(SequenceFileInputFormat.class);
    SequenceFileInputFormat.addInputPath(job, new Path(queryFile));

    job.setOutputFormatClass(SequenceFileOutputFormat.class);
    Path outputDir = new Path(outputPath);
    FileOutputFormat.setOutputPath(job, outputDir);

    job.setMapperClass(PeptideMapper.class);
    job.setMapOutputKeyClass(Peptide.class);
    job.setMapOutputValueClass(PeptideSpectrum.class);

    job.setReducerClass(ConsensusReducer.class);
    job.setOutputKeyClass(SpectrumKey.class);
    job.setOutputValueClass(PeptideConsensusSpectrum.class);

    //Delete the output directory, if it already exists, so that the job can be rerun
    FileSystem fs = FileSystem.get(configuration);
    fs.delete(outputDir, true);

    if (job.waitForCompletion(true)) {
        return 0;
    } else {
        LOGGER.severe("wait for completion returned false");
        return 1;
    }
}
/**
 * Comparator that is used by Hadoop to compare the serialized peptide keys
 */
public static final class DefaultRawComparator implements RawComparator<Object>, Serializable {

    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {

        return WritableComparator.compareBytes(b1, s1, l1, b2, s2, l2);
    }

    @Override
    public int compare(Object o1, Object o2) {

        throw new RuntimeException("Object comparison not supported");
    }
}
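To use the resulting library, the consensus spectra can be read back from the job output with Hadoop's SequenceFile.Reader. The following sketch reads one output part file; the part-r-00000 file name is a typical Hadoop default and the loop body is a placeholder, both assumptions for illustration.
//Sketch only: reading the consensus library back from one job output file
public static void readLibrary(Path outputDir) throws IOException {

    Configuration conf = new Configuration();
    conf.set("io.serializations",
            "org.apache.hadoop.io.serializer.WritableSerialization," +
            "org.expasy.mzjava.hadoop.io.MzJavaSerialization");

    //Assumption: a single reducer wrote part-r-00000; list the directory in the general case
    try (SequenceFile.Reader reader = new SequenceFile.Reader(conf,
            SequenceFile.Reader.file(new Path(outputDir, "part-r-00000")))) {

        Object key = null;
        Object value = null;
        while ((key = reader.next(key)) != null) {

            value = reader.getCurrentValue(value);
            SpectrumKey spectrumKey = (SpectrumKey) key;
            PeptideConsensusSpectrum consensus = (PeptideConsensusSpectrum) value;
            //Placeholder: add the consensus spectrum to the spectrum library
        }
    }
}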
See also
See Recipe 5.2 for an example that uses Apache Spark.