Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 package org.apache.spark.streaming.kinesis;
0018 
0019 import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
0020 
0021 import java.io.Serializable;
0022 import java.util.Date;
0023 
0024 /**
0025  * A java wrapper for exposing [[InitialPositionInStream]]
0026  * to the corresponding Kinesis readers.
0027  */
0028 interface KinesisInitialPosition {
0029     InitialPositionInStream getPosition();
0030 }
0031 
0032 public class KinesisInitialPositions {
0033     public static class Latest implements KinesisInitialPosition, Serializable {
0034         public Latest() {}
0035 
0036         @Override
0037         public InitialPositionInStream getPosition() {
0038             return InitialPositionInStream.LATEST;
0039         }
0040     }
0041 
0042     public static class TrimHorizon implements KinesisInitialPosition, Serializable {
0043         public TrimHorizon() {}
0044 
0045         @Override
0046         public InitialPositionInStream getPosition() {
0047             return InitialPositionInStream.TRIM_HORIZON;
0048         }
0049     }
0050 
0051     public static class AtTimestamp implements KinesisInitialPosition, Serializable {
0052         private Date timestamp;
0053 
0054         public AtTimestamp(Date timestamp) {
0055             this.timestamp = timestamp;
0056         }
0057 
0058         @Override
0059         public InitialPositionInStream getPosition() {
0060             return InitialPositionInStream.AT_TIMESTAMP;
0061         }
0062 
0063         public Date getTimestamp() {
0064             return timestamp;
0065         }
0066     }
0067 
0068 
0069     /**
0070      * Returns instance of [[KinesisInitialPosition]] based on the passed
0071      * [[InitialPositionInStream]]. This method is used in KinesisUtils for translating the
0072      * InitialPositionInStream to InitialPosition. This function would be removed when we deprecate
0073      * the KinesisUtils.
0074      *
0075      * @return [[InitialPosition]]
0076      */
0077     public static KinesisInitialPosition fromKinesisInitialPosition(
0078             InitialPositionInStream initialPositionInStream) throws UnsupportedOperationException {
0079         if (initialPositionInStream == InitialPositionInStream.LATEST) {
0080             return new Latest();
0081         } else if (initialPositionInStream == InitialPositionInStream.TRIM_HORIZON) {
0082             return new TrimHorizon();
0083         } else {
0084             // InitialPositionInStream.AT_TIMESTAMP is not supported.
0085             // Use InitialPosition.atTimestamp(timestamp) instead.
0086             throw new UnsupportedOperationException(
0087                     "Only InitialPositionInStream.LATEST and InitialPositionInStream." +
0088                             "TRIM_HORIZON supported in initialPositionInStream(). Please use " +
0089                             "the initialPosition() from builder API in KinesisInputDStream for " +
0090                             "using InitialPositionInStream.AT_TIMESTAMP");
0091         }
0092     }
0093 }