Recipe 5.2 - Spark Spectrum Library Search


You want to search a set of query spectra against a spectrum library using Apache Spark.


public static void main(String[] args) throws IOException {

    final String libSpectraPath = args[0];
    final String querySpectraPath = args[1];
    final String outputPath = args[2];

    final SparkConf conf = new SparkConf()
            .setAppName("Spark lib search")
            .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")              //Configure Spark to use Kryo serialization
            .set("spark.kryo.registrator", "org.expasy.mzjava.spark.MzJavaKryoRegistrator");    //Configure Spark to use the MzJavaKryoRegistrator to register classes
    final JavaSparkContext sc = new JavaSparkContext(conf);

    //Read the spectrum library and broadcast it to all nodes
    final Broadcast<List<PeptideConsensusSpectrum>> libSpectraBroadcast = sc.broadcast(
            IterativeReaders.toArrayList(new SptxtReader(new File(libSpectraPath), PeakList.Precision.DOUBLE))

    //Set up the function that performs the library search
    final PairFunction<MsnSpectrum, String, List<Tuple2<Double, Peptide>>> libSearchFunction =
            new PairFunction<MsnSpectrum, String, List<Tuple2<Double, Peptide>>>() {

                private transient SimFunc<PepLibPeakAnnotation, PeakAnnotation> simFunc;
                private transient SpectrumLibrary<PeptideConsensusSpectrum> library;

                public Tuple2<String, List<Tuple2<Double, Peptide>>> call(final MsnSpectrum querySpectrum)
                        throws Exception {

                    if (simFunc == null) {
                        simFunc = new NdpSimFunc<>(0, new AbsoluteTolerance(0.02));
                        library = new DefaultSpectrumLibrary<>(new PpmTolerance(20), libSpectraBroadcast.getValue());

                    final List<Tuple2<Double, Peptide>> peptideMatches = new ArrayList<>();
                    final Procedure<PeptideConsensusSpectrum> simFuncProcedure = new Procedure<PeptideConsensusSpectrum>() {

                        public void execute(PeptideConsensusSpectrum libSpectrum) {

                            double score = simFunc.calcSimilarity(libSpectrum, querySpectrum);
                            if (score > 0.6)
                                peptideMatches.add(new Tuple2<>(score, libSpectrum.getPeptide()));
                    library.forEach(querySpectrum.getPrecursor(), simFuncProcedure);

                    return new Tuple2<>(querySpectrum.getId() + ":" + querySpectrum.getComment(), peptideMatches);

    // Read the query spectrum from a Hadoop sequence file
    final JavaRDD<MsnSpectrum> querySpectra = MzJavaSparkUtils.msnSpectra(sc, querySpectraPath);
    // Perform the library search
    final JavaPairRDD<String, List<Tuple2<Double, Peptide>>> searchResults = querySpectra.mapToPair(libSearchFunction);

