0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017 package org.apache.spark.streaming.kinesis;
0018
0019 import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
0020
0021 import java.io.Serializable;
0022 import java.util.Date;
0023
0024
0025
0026
0027
0028 interface KinesisInitialPosition {
0029 InitialPositionInStream getPosition();
0030 }
0031
0032 public class KinesisInitialPositions {
0033 public static class Latest implements KinesisInitialPosition, Serializable {
0034 public Latest() {}
0035
0036 @Override
0037 public InitialPositionInStream getPosition() {
0038 return InitialPositionInStream.LATEST;
0039 }
0040 }
0041
0042 public static class TrimHorizon implements KinesisInitialPosition, Serializable {
0043 public TrimHorizon() {}
0044
0045 @Override
0046 public InitialPositionInStream getPosition() {
0047 return InitialPositionInStream.TRIM_HORIZON;
0048 }
0049 }
0050
0051 public static class AtTimestamp implements KinesisInitialPosition, Serializable {
0052 private Date timestamp;
0053
0054 public AtTimestamp(Date timestamp) {
0055 this.timestamp = timestamp;
0056 }
0057
0058 @Override
0059 public InitialPositionInStream getPosition() {
0060 return InitialPositionInStream.AT_TIMESTAMP;
0061 }
0062
0063 public Date getTimestamp() {
0064 return timestamp;
0065 }
0066 }
0067
0068
0069
0070
0071
0072
0073
0074
0075
0076
0077 public static KinesisInitialPosition fromKinesisInitialPosition(
0078 InitialPositionInStream initialPositionInStream) throws UnsupportedOperationException {
0079 if (initialPositionInStream == InitialPositionInStream.LATEST) {
0080 return new Latest();
0081 } else if (initialPositionInStream == InitialPositionInStream.TRIM_HORIZON) {
0082 return new TrimHorizon();
0083 } else {
0084
0085
0086 throw new UnsupportedOperationException(
0087 "Only InitialPositionInStream.LATEST and InitialPositionInStream." +
0088 "TRIM_HORIZON supported in initialPositionInStream(). Please use " +
0089 "the initialPosition() from builder API in KinesisInputDStream for " +
0090 "using InitialPositionInStream.AT_TIMESTAMP");
0091 }
0092 }
0093 }