0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.streaming.kinesis;
0019
0020 import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
0021 import org.junit.Assert;
0022 import org.junit.Test;
0023
0024 import org.apache.spark.streaming.kinesis.KinesisInitialPositions.TrimHorizon;
0025 import org.apache.spark.storage.StorageLevel;
0026 import org.apache.spark.streaming.Duration;
0027 import org.apache.spark.streaming.LocalJavaStreamingContext;
0028 import org.apache.spark.streaming.Seconds;
0029
0030 public class JavaKinesisInputDStreamBuilderSuite extends LocalJavaStreamingContext {
0031
0032
0033
0034 @Test
0035 public void testJavaKinesisDStreamBuilder() {
0036 String streamName = "a-very-nice-stream-name";
0037 String endpointUrl = "https://kinesis.us-west-2.amazonaws.com";
0038 String region = "us-west-2";
0039 KinesisInitialPosition initialPosition = new TrimHorizon();
0040 String appName = "a-very-nice-kinesis-app";
0041 Duration checkpointInterval = Seconds.apply(30);
0042 StorageLevel storageLevel = StorageLevel.MEMORY_ONLY();
0043
0044 KinesisInputDStream<byte[]> kinesisDStream = KinesisInputDStream.builder()
0045 .streamingContext(ssc)
0046 .streamName(streamName)
0047 .endpointUrl(endpointUrl)
0048 .regionName(region)
0049 .initialPosition(initialPosition)
0050 .checkpointAppName(appName)
0051 .checkpointInterval(checkpointInterval)
0052 .storageLevel(storageLevel)
0053 .build();
0054 Assert.assertEquals(streamName, kinesisDStream.streamName());
0055 Assert.assertEquals(endpointUrl, kinesisDStream.endpointUrl());
0056 Assert.assertEquals(region, kinesisDStream.regionName());
0057 Assert.assertEquals(initialPosition.getPosition(),
0058 kinesisDStream.initialPosition().getPosition());
0059 Assert.assertEquals(appName, kinesisDStream.checkpointAppName());
0060 Assert.assertEquals(checkpointInterval, kinesisDStream.checkpointInterval());
0061 Assert.assertEquals(storageLevel, kinesisDStream._storageLevel());
0062 ssc.stop();
0063 }
0064
0065
0066
0067
0068
0069
0070 @Test
0071 public void testJavaKinesisDStreamBuilderOldApi() {
0072 String streamName = "a-very-nice-stream-name";
0073 String endpointUrl = "https://kinesis.us-west-2.amazonaws.com";
0074 String region = "us-west-2";
0075 String appName = "a-very-nice-kinesis-app";
0076 Duration checkpointInterval = Seconds.apply(30);
0077 StorageLevel storageLevel = StorageLevel.MEMORY_ONLY();
0078
0079 KinesisInputDStream<byte[]> kinesisDStream = KinesisInputDStream.builder()
0080 .streamingContext(ssc)
0081 .streamName(streamName)
0082 .endpointUrl(endpointUrl)
0083 .regionName(region)
0084 .initialPositionInStream(InitialPositionInStream.LATEST)
0085 .checkpointAppName(appName)
0086 .checkpointInterval(checkpointInterval)
0087 .storageLevel(storageLevel)
0088 .build();
0089 Assert.assertEquals(streamName, kinesisDStream.streamName());
0090 Assert.assertEquals(endpointUrl, kinesisDStream.endpointUrl());
0091 Assert.assertEquals(region, kinesisDStream.regionName());
0092 Assert.assertEquals(InitialPositionInStream.LATEST,
0093 kinesisDStream.initialPosition().getPosition());
0094 Assert.assertEquals(appName, kinesisDStream.checkpointAppName());
0095 Assert.assertEquals(checkpointInterval, kinesisDStream.checkpointInterval());
0096 Assert.assertEquals(storageLevel, kinesisDStream._storageLevel());
0097 ssc.stop();
0098 }
0099 }