Recipe 5.2 - Spark Spectrum Library Search
Problem
You want to run a spectrum library search using Apache Spark.
Solution
public static void main(String[] args) throws IOException {
final String libSpectraPath = args[0];
final String querySpectraPath = args[1];
final String outputPath = args[2];
final SparkConf conf = new SparkConf()
.setAppName("Spark lib search")
.setMaster("local[3]")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") //Configure Spark to use Kryo serialization
.set("spark.kryo.registrator", "org.expasy.mzjava.spark.MzJavaKryoRegistrator"); //Configure Spark to use the MzJavaKryoRegistrator to register classes
final JavaSparkContext sc = new JavaSparkContext(conf);
//Read the spectrum library and broadcast it to all nodes
final Broadcast<List<PeptideConsensusSpectrum>> libSpectraBroadcast = sc.broadcast(
IterativeReaders.toArrayList(new SptxtReader(new File(libSpectraPath), PeakList.Precision.DOUBLE))
);
//Set up the function that performs the library search
final PairFunction<MsnSpectrum, String, List<Tuple2<Double, Peptide>>> libSearchFunction =
new PairFunction<MsnSpectrum, String, List<Tuple2<Double, Peptide>>>() {
private transient SimFunc<PepLibPeakAnnotation, PeakAnnotation> simFunc;
private transient SpectrumLibrary<PeptideConsensusSpectrum> library;
@Override
public Tuple2<String, List<Tuple2<Double, Peptide>>> call(final MsnSpectrum querySpectrum)
throws Exception {
if (simFunc == null) {
simFunc = new NdpSimFunc<>(0, new AbsoluteTolerance(0.02));
library = new DefaultSpectrumLibrary<>(new PpmTolerance(20), libSpectraBroadcast.getValue());
}
final List<Tuple2<Double, Peptide>> peptideMatches = new ArrayList<>();
final Procedure<PeptideConsensusSpectrum> simFuncProcedure = new Procedure<PeptideConsensusSpectrum>() {
@Override
public void execute(PeptideConsensusSpectrum libSpectrum) {
double score = simFunc.calcSimilarity(libSpectrum, querySpectrum);
if (score > 0.6)
peptideMatches.add(new Tuple2<>(score, libSpectrum.getPeptide()));
}
};
library.forEach(querySpectrum.getPrecursor(), simFuncProcedure);
return new Tuple2<>(querySpectrum.getId() + ":" + querySpectrum.getComment(), peptideMatches);
}
};
// Read the query spectrum from a Hadoop sequence file
final JavaRDD<MsnSpectrum> querySpectra = MzJavaSparkUtils.msnSpectra(sc, querySpectraPath);
// Perform the library search
final JavaPairRDD<String, List<Tuple2<Double, Peptide>>> searchResults = querySpectra.mapToPair(libSearchFunction);
searchResults.saveAsTextFile(outputPath);
}
See also
See Recipe 5.1 for an example that uses Apache Spark.