0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package test.org.apache.spark.sql.streaming;
0019
0020 import java.io.File;
0021 import java.util.concurrent.TimeoutException;
0022
0023 import org.junit.After;
0024 import org.junit.Before;
0025 import org.junit.Test;
0026
0027 import org.apache.spark.api.java.function.VoidFunction2;
0028 import org.apache.spark.sql.Dataset;
0029 import org.apache.spark.sql.ForeachWriter;
0030 import org.apache.spark.sql.SparkSession;
0031 import org.apache.spark.sql.streaming.StreamingQuery;
0032 import org.apache.spark.sql.test.TestSparkSession;
0033 import org.apache.spark.util.Utils;
0034
0035 public class JavaDataStreamReaderWriterSuite {
0036 private SparkSession spark;
0037 private String input;
0038
0039 @Before
0040 public void setUp() {
0041 spark = new TestSparkSession();
0042 input = Utils.createTempDir(System.getProperty("java.io.tmpdir"), "input").toString();
0043 }
0044
0045 @After
0046 public void tearDown() {
0047 try {
0048 Utils.deleteRecursively(new File(input));
0049 } finally {
0050 spark.stop();
0051 spark = null;
0052 }
0053 }
0054
0055 @Test
0056 public void testForeachBatchAPI() throws TimeoutException {
0057 StreamingQuery query = spark
0058 .readStream()
0059 .textFile(input)
0060 .writeStream()
0061 .foreachBatch(new VoidFunction2<Dataset<String>, Long>() {
0062 @Override
0063 public void call(Dataset<String> v1, Long v2) throws Exception {}
0064 })
0065 .start();
0066 query.stop();
0067 }
0068
0069 @Test
0070 public void testForeachAPI() throws TimeoutException {
0071 StreamingQuery query = spark
0072 .readStream()
0073 .textFile(input)
0074 .writeStream()
0075 .foreach(new ForeachWriter<String>() {
0076 @Override
0077 public boolean open(long partitionId, long epochId) {
0078 return true;
0079 }
0080
0081 @Override
0082 public void process(String value) {}
0083
0084 @Override
0085 public void close(Throwable errorOrNull) {}
0086 })
0087 .start();
0088 query.stop();
0089 }
0090 }