Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.streaming;
0019 
0020 import org.apache.spark.api.java.JavaRDD;
0021 import org.apache.spark.api.java.function.VoidFunction;
0022 import org.apache.spark.streaming.api.java.JavaDStream;
0023 import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
0024 import org.apache.spark.streaming.api.java.JavaStreamingContext;
0025 
0026 import com.google.common.io.Closeables;
0027 import org.junit.After;
0028 import org.junit.Assert;
0029 import org.junit.Before;
0030 import org.junit.Test;
0031 
0032 import org.apache.spark.storage.StorageLevel;
0033 import org.apache.spark.streaming.receiver.Receiver;
0034 import org.apache.spark.api.java.function.Function;
0035 
0036 import java.io.BufferedReader;
0037 import java.io.InputStreamReader;
0038 import java.io.Serializable;
0039 import java.net.ConnectException;
0040 import java.net.Socket;
0041 import java.nio.charset.StandardCharsets;
0042 import java.util.concurrent.TimeUnit;
0043 import java.util.concurrent.atomic.AtomicLong;
0044 
0045 public class JavaReceiverAPISuite implements Serializable {
0046 
0047   @Before
0048   public void setUp() {
0049     System.clearProperty("spark.streaming.clock");
0050   }
0051 
0052   @After
0053   public void tearDown() {
0054     System.clearProperty("spark.streaming.clock");
0055   }
0056 
0057   @Test
0058   public void testReceiver() throws InterruptedException {
0059     TestServer server = new TestServer(0);
0060     server.start();
0061 
0062     AtomicLong dataCounter = new AtomicLong(0);
0063 
0064     try {
0065       JavaStreamingContext ssc = new JavaStreamingContext("local[2]", "test", new Duration(200));
0066       JavaReceiverInputDStream<String> input =
0067         ssc.receiverStream(new JavaSocketReceiver("localhost", server.port()));
0068       JavaDStream<String> mapped = input.map((Function<String, String>) v1 -> v1 + ".");
0069       mapped.foreachRDD((VoidFunction<JavaRDD<String>>) rdd -> {
0070         long count = rdd.count();
0071         dataCounter.addAndGet(count);
0072       });
0073 
0074       ssc.start();
0075       long startTimeNs = System.nanoTime();
0076       long timeout = TimeUnit.SECONDS.toNanos(10);
0077 
0078       Thread.sleep(200);
0079       for (int i = 0; i < 6; i++) {
0080         server.send(i + "\n"); // \n to make sure these are separate lines
0081         Thread.sleep(100);
0082       }
0083       while (dataCounter.get() == 0 && System.nanoTime() - startTimeNs < timeout) {
0084         Thread.sleep(100);
0085       }
0086       ssc.stop();
0087       Assert.assertTrue(dataCounter.get() > 0);
0088     } finally {
0089       server.stop();
0090     }
0091   }
0092 
0093   private static class JavaSocketReceiver extends Receiver<String> {
0094 
0095     private String host = null;
0096     private int port = -1;
0097 
0098     JavaSocketReceiver(String host_ , int port_) {
0099       super(StorageLevel.MEMORY_AND_DISK());
0100       host = host_;
0101       port = port_;
0102     }
0103 
0104     @Override
0105     public void onStart() {
0106       new Thread(this::receive).start();
0107     }
0108 
0109     @Override
0110     public void onStop() {
0111     }
0112 
0113     private void receive() {
0114       try {
0115         Socket socket = null;
0116         BufferedReader in = null;
0117         try {
0118           socket = new Socket(host, port);
0119           in = new BufferedReader(
0120               new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));
0121           String userInput;
0122           while ((userInput = in.readLine()) != null) {
0123             store(userInput);
0124           }
0125         } finally {
0126           Closeables.close(in, /* swallowIOException = */ true);
0127           Closeables.close(socket,  /* swallowIOException = */ true);
0128         }
0129       } catch(ConnectException ce) {
0130         ce.printStackTrace();
0131         restart("Could not connect", ce);
0132       } catch(Throwable t) {
0133         t.printStackTrace();
0134         restart("Error receiving data", t);
0135       }
0136     }
0137   }
0138 
0139 }