SIB

Recipe 5.2 - Spark Spectrum Library Search

Problem

You want to run a spectrum library search using Apache Spark.

Solution


/**
 * Runs a spectrum library search with Apache Spark.
 *
 * <p>The library spectra are read with an {@code SptxtReader}, collected into a list and
 * broadcast. Every query spectrum is then scored against the library spectra that
 * {@code SpectrumLibrary.forEach} visits for the query's precursor, and all matches whose
 * similarity score is above the cutoff are kept. The result, keyed by
 * {@code "<spectrum id>:<spectrum comment>"}, is written with {@code saveAsTextFile}.
 *
 * @param args {@code args[0]} path of the spectrum library file,
 *             {@code args[1]} path of the query spectra,
 *             {@code args[2]} output path for the search results
 * @throws IOException if reading the spectrum library fails
 * @throws IllegalArgumentException if fewer than three arguments are supplied
 */
public static void main(String[] args) throws IOException {

    // Fail fast with a usage message instead of an ArrayIndexOutOfBoundsException
    if (args == null || args.length < 3) {
        throw new IllegalArgumentException(
                "Expected 3 arguments: <libSpectraPath> <querySpectraPath> <outputPath>, but got "
                        + (args == null ? 0 : args.length));
    }

    final String libSpectraPath = args[0];
    final String querySpectraPath = args[1];
    final String outputPath = args[2];

    // Only matches scoring strictly above this value are reported
    final double minScore = 0.6;

    final SparkConf conf = new SparkConf()
            .setAppName("Spark lib search")
            .setMaster("local[3]")
            .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")              //Configure Spark to use Kryo serialization
            .set("spark.kryo.registrator", "org.expasy.mzjava.spark.MzJavaKryoRegistrator");    //Configure Spark to use the MzJavaKryoRegistrator to register classes
    final JavaSparkContext sc = new JavaSparkContext(conf);

    try {
        //Read the spectrum library and broadcast it to all nodes
        final Broadcast<List<PeptideConsensusSpectrum>> libSpectraBroadcast = sc.broadcast(
                IterativeReaders.toArrayList(new SptxtReader(new File(libSpectraPath), PeakList.Precision.DOUBLE))
        );

        //Set up the function that performs the library search
        final PairFunction<MsnSpectrum, String, List<Tuple2<Double, Peptide>>> libSearchFunction =
                new PairFunction<MsnSpectrum, String, List<Tuple2<Double, Peptide>>>() {

                    // transient: excluded from Java serialization of this function object,
                    // so both fields are created lazily on the first call() instead
                    private transient SimFunc<PepLibPeakAnnotation, PeakAnnotation> simFunc;
                    private transient SpectrumLibrary<PeptideConsensusSpectrum> library;

                    @Override
                    public Tuple2<String, List<Tuple2<Double, Peptide>>> call(final MsnSpectrum querySpectrum)
                            throws Exception {

                        // Check both fields so that neither can be used while still null
                        if (simFunc == null || library == null) {
                            simFunc = new NdpSimFunc<>(0, new AbsoluteTolerance(0.02));
                            library = new DefaultSpectrumLibrary<>(new PpmTolerance(20), libSpectraBroadcast.getValue());
                        }

                        final List<Tuple2<Double, Peptide>> peptideMatches = new ArrayList<>();
                        final Procedure<PeptideConsensusSpectrum> simFuncProcedure = new Procedure<PeptideConsensusSpectrum>() {

                            @Override
                            public void execute(PeptideConsensusSpectrum libSpectrum) {

                                final double score = simFunc.calcSimilarity(libSpectrum, querySpectrum);
                                if (score > minScore) {
                                    peptideMatches.add(new Tuple2<>(score, libSpectrum.getPeptide()));
                                }
                            }
                        };
                        // Score the query against the library spectra selected for its precursor
                        library.forEach(querySpectrum.getPrecursor(), simFuncProcedure);

                        return new Tuple2<>(querySpectrum.getId() + ":" + querySpectrum.getComment(), peptideMatches);
                    }
                };

        // Read the query spectra
        final JavaRDD<MsnSpectrum> querySpectra = MzJavaSparkUtils.msnSpectra(sc, querySpectraPath);
        // Perform the library search
        final JavaPairRDD<String, List<Tuple2<Double, Peptide>>> searchResults = querySpectra.mapToPair(libSearchFunction);
        searchResults.saveAsTextFile(outputPath);
    } finally {
        // Always shut the Spark context down, even when the job fails
        sc.stop();
    }
}

See also

See Recipe 5.1 for an example that uses Apache Spark.