Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.streaming.kinesis;
0019 
0020 import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
0021 import org.junit.Assert;
0022 import org.junit.Test;
0023 
0024 import org.apache.spark.streaming.kinesis.KinesisInitialPositions.TrimHorizon;
0025 import org.apache.spark.storage.StorageLevel;
0026 import org.apache.spark.streaming.Duration;
0027 import org.apache.spark.streaming.LocalJavaStreamingContext;
0028 import org.apache.spark.streaming.Seconds;
0029 
0030 public class JavaKinesisInputDStreamBuilderSuite extends LocalJavaStreamingContext {
0031   /**
0032    * Basic test to ensure that the KinesisDStream.Builder interface is accessible from Java.
0033    */
0034   @Test
0035   public void testJavaKinesisDStreamBuilder() {
0036     String streamName = "a-very-nice-stream-name";
0037     String endpointUrl = "https://kinesis.us-west-2.amazonaws.com";
0038     String region = "us-west-2";
0039     KinesisInitialPosition initialPosition = new TrimHorizon();
0040     String appName = "a-very-nice-kinesis-app";
0041     Duration checkpointInterval = Seconds.apply(30);
0042     StorageLevel storageLevel = StorageLevel.MEMORY_ONLY();
0043 
0044     KinesisInputDStream<byte[]> kinesisDStream = KinesisInputDStream.builder()
0045       .streamingContext(ssc)
0046       .streamName(streamName)
0047       .endpointUrl(endpointUrl)
0048       .regionName(region)
0049       .initialPosition(initialPosition)
0050       .checkpointAppName(appName)
0051       .checkpointInterval(checkpointInterval)
0052       .storageLevel(storageLevel)
0053       .build();
0054     Assert.assertEquals(streamName, kinesisDStream.streamName());
0055     Assert.assertEquals(endpointUrl, kinesisDStream.endpointUrl());
0056     Assert.assertEquals(region, kinesisDStream.regionName());
0057     Assert.assertEquals(initialPosition.getPosition(),
0058         kinesisDStream.initialPosition().getPosition());
0059     Assert.assertEquals(appName, kinesisDStream.checkpointAppName());
0060     Assert.assertEquals(checkpointInterval, kinesisDStream.checkpointInterval());
0061     Assert.assertEquals(storageLevel, kinesisDStream._storageLevel());
0062     ssc.stop();
0063   }
0064 
0065   /**
0066    * Test to ensure that the old API for InitialPositionInStream
0067    * is supported in KinesisDStream.Builder.
0068    * This test would be removed when we deprecate the KinesisUtils.
0069    */
0070   @Test
0071   public void testJavaKinesisDStreamBuilderOldApi() {
0072     String streamName = "a-very-nice-stream-name";
0073     String endpointUrl = "https://kinesis.us-west-2.amazonaws.com";
0074     String region = "us-west-2";
0075     String appName = "a-very-nice-kinesis-app";
0076     Duration checkpointInterval = Seconds.apply(30);
0077     StorageLevel storageLevel = StorageLevel.MEMORY_ONLY();
0078 
0079     KinesisInputDStream<byte[]> kinesisDStream = KinesisInputDStream.builder()
0080       .streamingContext(ssc)
0081       .streamName(streamName)
0082       .endpointUrl(endpointUrl)
0083       .regionName(region)
0084       .initialPositionInStream(InitialPositionInStream.LATEST)
0085       .checkpointAppName(appName)
0086       .checkpointInterval(checkpointInterval)
0087       .storageLevel(storageLevel)
0088       .build();
0089     Assert.assertEquals(streamName, kinesisDStream.streamName());
0090     Assert.assertEquals(endpointUrl, kinesisDStream.endpointUrl());
0091     Assert.assertEquals(region, kinesisDStream.regionName());
0092     Assert.assertEquals(InitialPositionInStream.LATEST,
0093         kinesisDStream.initialPosition().getPosition());
0094     Assert.assertEquals(appName, kinesisDStream.checkpointAppName());
0095     Assert.assertEquals(checkpointInterval, kinesisDStream.checkpointInterval());
0096     Assert.assertEquals(storageLevel, kinesisDStream._storageLevel());
0097     ssc.stop();
0098   }
0099 }