0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.streaming;
0019
0020 import org.apache.spark.api.java.JavaRDD;
0021 import org.apache.spark.api.java.function.VoidFunction;
0022 import org.apache.spark.streaming.api.java.JavaDStream;
0023 import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
0024 import org.apache.spark.streaming.api.java.JavaStreamingContext;
0025
0026 import com.google.common.io.Closeables;
0027 import org.junit.After;
0028 import org.junit.Assert;
0029 import org.junit.Before;
0030 import org.junit.Test;
0031
0032 import org.apache.spark.storage.StorageLevel;
0033 import org.apache.spark.streaming.receiver.Receiver;
0034 import org.apache.spark.api.java.function.Function;
0035
0036 import java.io.BufferedReader;
0037 import java.io.InputStreamReader;
0038 import java.io.Serializable;
0039 import java.net.ConnectException;
0040 import java.net.Socket;
0041 import java.nio.charset.StandardCharsets;
0042 import java.util.concurrent.TimeUnit;
0043 import java.util.concurrent.atomic.AtomicLong;
0044
0045 public class JavaReceiverAPISuite implements Serializable {
0046
0047 @Before
0048 public void setUp() {
0049 System.clearProperty("spark.streaming.clock");
0050 }
0051
0052 @After
0053 public void tearDown() {
0054 System.clearProperty("spark.streaming.clock");
0055 }
0056
0057 @Test
0058 public void testReceiver() throws InterruptedException {
0059 TestServer server = new TestServer(0);
0060 server.start();
0061
0062 AtomicLong dataCounter = new AtomicLong(0);
0063
0064 try {
0065 JavaStreamingContext ssc = new JavaStreamingContext("local[2]", "test", new Duration(200));
0066 JavaReceiverInputDStream<String> input =
0067 ssc.receiverStream(new JavaSocketReceiver("localhost", server.port()));
0068 JavaDStream<String> mapped = input.map((Function<String, String>) v1 -> v1 + ".");
0069 mapped.foreachRDD((VoidFunction<JavaRDD<String>>) rdd -> {
0070 long count = rdd.count();
0071 dataCounter.addAndGet(count);
0072 });
0073
0074 ssc.start();
0075 long startTimeNs = System.nanoTime();
0076 long timeout = TimeUnit.SECONDS.toNanos(10);
0077
0078 Thread.sleep(200);
0079 for (int i = 0; i < 6; i++) {
0080 server.send(i + "\n");
0081 Thread.sleep(100);
0082 }
0083 while (dataCounter.get() == 0 && System.nanoTime() - startTimeNs < timeout) {
0084 Thread.sleep(100);
0085 }
0086 ssc.stop();
0087 Assert.assertTrue(dataCounter.get() > 0);
0088 } finally {
0089 server.stop();
0090 }
0091 }
0092
0093 private static class JavaSocketReceiver extends Receiver<String> {
0094
0095 private String host = null;
0096 private int port = -1;
0097
0098 JavaSocketReceiver(String host_ , int port_) {
0099 super(StorageLevel.MEMORY_AND_DISK());
0100 host = host_;
0101 port = port_;
0102 }
0103
0104 @Override
0105 public void onStart() {
0106 new Thread(this::receive).start();
0107 }
0108
0109 @Override
0110 public void onStop() {
0111 }
0112
0113 private void receive() {
0114 try {
0115 Socket socket = null;
0116 BufferedReader in = null;
0117 try {
0118 socket = new Socket(host, port);
0119 in = new BufferedReader(
0120 new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));
0121 String userInput;
0122 while ((userInput = in.readLine()) != null) {
0123 store(userInput);
0124 }
0125 } finally {
0126 Closeables.close(in, true);
0127 Closeables.close(socket, true);
0128 }
0129 } catch(ConnectException ce) {
0130 ce.printStackTrace();
0131 restart("Could not connect", ce);
0132 } catch(Throwable t) {
0133 t.printStackTrace();
0134 restart("Error receiving data", t);
0135 }
0136 }
0137 }
0138
0139 }