0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.mllib.fpm;
0019
0020 import java.io.File;
0021 import java.util.Arrays;
0022 import java.util.List;
0023
0024 import org.junit.Assert;
0025 import org.junit.Test;
0026
0027 import org.apache.spark.SharedSparkSession;
0028 import org.apache.spark.api.java.JavaRDD;
0029 import org.apache.spark.mllib.fpm.PrefixSpan.FreqSequence;
0030 import org.apache.spark.util.Utils;
0031
0032 public class JavaPrefixSpanSuite extends SharedSparkSession {
0033
0034 @Test
0035 public void runPrefixSpan() {
0036 JavaRDD<List<List<Integer>>> sequences = jsc.parallelize(Arrays.asList(
0037 Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3)),
0038 Arrays.asList(Arrays.asList(1), Arrays.asList(3, 2), Arrays.asList(1, 2)),
0039 Arrays.asList(Arrays.asList(1, 2), Arrays.asList(5)),
0040 Arrays.asList(Arrays.asList(6))
0041 ), 2);
0042 PrefixSpan prefixSpan = new PrefixSpan()
0043 .setMinSupport(0.5)
0044 .setMaxPatternLength(5);
0045 PrefixSpanModel<Integer> model = prefixSpan.run(sequences);
0046 JavaRDD<FreqSequence<Integer>> freqSeqs = model.freqSequences().toJavaRDD();
0047 List<FreqSequence<Integer>> localFreqSeqs = freqSeqs.collect();
0048 Assert.assertEquals(5, localFreqSeqs.size());
0049
0050 for (PrefixSpan.FreqSequence<Integer> freqSeq : localFreqSeqs) {
0051 List<List<Integer>> seq = freqSeq.javaSequence();
0052 long freq = freqSeq.freq();
0053 }
0054 }
0055
0056 @Test
0057 public void runPrefixSpanSaveLoad() {
0058 JavaRDD<List<List<Integer>>> sequences = jsc.parallelize(Arrays.asList(
0059 Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3)),
0060 Arrays.asList(Arrays.asList(1), Arrays.asList(3, 2), Arrays.asList(1, 2)),
0061 Arrays.asList(Arrays.asList(1, 2), Arrays.asList(5)),
0062 Arrays.asList(Arrays.asList(6))
0063 ), 2);
0064 PrefixSpan prefixSpan = new PrefixSpan()
0065 .setMinSupport(0.5)
0066 .setMaxPatternLength(5);
0067 PrefixSpanModel<Integer> model = prefixSpan.run(sequences);
0068
0069 File tempDir = Utils.createTempDir(
0070 System.getProperty("java.io.tmpdir"), "JavaPrefixSpanSuite");
0071 String outputPath = tempDir.getPath();
0072
0073 try {
0074 model.save(spark.sparkContext(), outputPath);
0075 @SuppressWarnings("unchecked")
0076 PrefixSpanModel<Integer> newModel =
0077 (PrefixSpanModel<Integer>) PrefixSpanModel.load(spark.sparkContext(), outputPath);
0078 JavaRDD<FreqSequence<Integer>> freqSeqs = newModel.freqSequences().toJavaRDD();
0079 List<FreqSequence<Integer>> localFreqSeqs = freqSeqs.collect();
0080 Assert.assertEquals(5, localFreqSeqs.size());
0081
0082 for (PrefixSpan.FreqSequence<Integer> freqSeq : localFreqSeqs) {
0083 List<List<Integer>> seq = freqSeq.javaSequence();
0084 long freq = freqSeq.freq();
0085 }
0086 } finally {
0087 Utils.deleteRecursively(tempDir);
0088 }
0089
0090
0091 }
0092 }