0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 from pyspark.serializers import NoOpSerializer
0019 from pyspark.storagelevel import StorageLevel
0020 from pyspark.streaming import DStream
0021 from pyspark.util import _print_missing_jar
0022
0023
0024 __all__ = ['KinesisUtils', 'InitialPositionInStream', 'utf8_decoder']
0025
0026
def utf8_decoder(s):
    """Decode a UTF-8 encoded byte string; a ``None`` input passes through unchanged."""
    return None if s is None else s.decode('utf-8')
0032
0033
class KinesisUtils(object):

    @staticmethod
    def createStream(ssc, kinesisAppName, streamName, endpointUrl, regionName,
                     initialPositionInStream, checkpointInterval,
                     storageLevel=StorageLevel.MEMORY_AND_DISK_2,
                     awsAccessKeyId=None, awsSecretKey=None, decoder=utf8_decoder,
                     stsAssumeRoleArn=None, stsSessionName=None, stsExternalId=None):
        """
        Create an input stream that pulls messages from a Kinesis stream. This uses the
        Kinesis Client Library (KCL) to pull messages from Kinesis.

        .. note:: The given AWS credentials will get saved in DStream checkpoints if checkpointing
            is enabled. Make sure that your checkpoint directory is secure.

        :param ssc: StreamingContext object
        :param kinesisAppName: Kinesis application name used by the Kinesis Client Library (KCL)
                               to update DynamoDB
        :param streamName: Kinesis stream name
        :param endpointUrl: Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
        :param regionName: Name of region used by the Kinesis Client Library (KCL) to update
                           DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics)
        :param initialPositionInStream: In the absence of Kinesis checkpoint info, this is the
                                        worker's initial starting position in the stream. The
                                        values are either the beginning of the stream per Kinesis'
                                        limit of 24 hours (InitialPositionInStream.TRIM_HORIZON) or
                                        the tip of the stream (InitialPositionInStream.LATEST).
        :param checkpointInterval: Checkpoint interval for Kinesis checkpointing. See the Kinesis
                                   Spark Streaming documentation for more details on the different
                                   types of checkpoints.
        :param storageLevel: Storage level to use for storing the received objects (default is
                             StorageLevel.MEMORY_AND_DISK_2)
        :param awsAccessKeyId: AWS AccessKeyId (default is None. If None, will use
                               DefaultAWSCredentialsProviderChain)
        :param awsSecretKey: AWS SecretKey (default is None. If None, will use
                             DefaultAWSCredentialsProviderChain)
        :param decoder: A function used to decode value (default is utf8_decoder)
        :param stsAssumeRoleArn: ARN of IAM role to assume when using STS sessions to read from
                                 the Kinesis stream (default is None).
        :param stsSessionName: Name to uniquely identify STS sessions used to read from Kinesis
                               stream, if STS is being used (default is None).
        :param stsExternalId: External ID that can be used to validate against the assumed IAM
                              role's trust policy, if STS is being used (default is None).
        :return: A DStream object
        """
        # Translate the Python-side storage level and interval into their JVM counterparts.
        java_level = ssc._sc._getJavaStorageLevel(storageLevel)
        java_interval = ssc._jduration(checkpointInterval)

        try:
            helper = ssc._jvm.org.apache.spark.streaming.kinesis.KinesisUtilsPythonHelper()
        except TypeError as err:
            # A 'JavaPackage' TypeError here means the kinesis-asl assembly jar is not on
            # the classpath; print guidance before re-raising.
            if str(err) == "'JavaPackage' object is not callable":
                _print_missing_jar(
                    "Streaming's Kinesis",
                    "streaming-kinesis-asl",
                    "streaming-kinesis-asl-assembly",
                    ssc.sparkContext.version)
            raise
        java_stream = helper.createStream(
            ssc._jssc, kinesisAppName, streamName, endpointUrl, regionName,
            initialPositionInStream, java_interval, java_level,
            awsAccessKeyId, awsSecretKey, stsAssumeRoleArn,
            stsSessionName, stsExternalId)
        # Records arrive as raw bytes (NoOpSerializer); apply the caller's decoder per value.
        return DStream(java_stream, ssc, NoOpSerializer()).map(decoder)
0098
0099
class InitialPositionInStream(object):
    """Starting positions in a Kinesis stream when no checkpoint info exists."""
    # Begin at the tip of the stream (only records arriving after start-up).
    LATEST = 0
    # Begin at the oldest record still retained by Kinesis.
    TRIM_HORIZON = 1