Back to home page

OSCL-LXR

 
 

    


0001 #
0002 # Licensed to the Apache Software Foundation (ASF) under one or more
0003 # contributor license agreements.  See the NOTICE file distributed with
0004 # this work for additional information regarding copyright ownership.
0005 # The ASF licenses this file to You under the Apache License, Version 2.0
0006 # (the "License"); you may not use this file except in compliance with
0007 # the License.  You may obtain a copy of the License at
0008 #
0009 #    http://www.apache.org/licenses/LICENSE-2.0
0010 #
0011 # Unless required by applicable law or agreed to in writing, software
0012 # distributed under the License is distributed on an "AS IS" BASIS,
0013 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014 # See the License for the specific language governing permissions and
0015 # limitations under the License.
0016 #
0017 
0018 from pyspark.serializers import NoOpSerializer
0019 from pyspark.storagelevel import StorageLevel
0020 from pyspark.streaming import DStream
0021 from pyspark.util import _print_missing_jar
0022 
0023 
# Public names exported by ``from pyspark.streaming.kinesis import *``.
__all__ = [
    "KinesisUtils",
    "InitialPositionInStream",
    "utf8_decoder",
]
0025 
0026 
def utf8_decoder(s):
    """Decode a UTF-8 encoded byte string into text.

    :param s: the raw bytes to decode, or None
    :return: the decoded string, or None when ``s`` is None
    """
    return None if s is None else s.decode('utf-8')
0032 
0033 
class KinesisUtils(object):
    """Helpers for creating DStreams that read records from Amazon Kinesis."""

    @staticmethod
    def createStream(ssc, kinesisAppName, streamName, endpointUrl, regionName,
                     initialPositionInStream, checkpointInterval,
                     storageLevel=StorageLevel.MEMORY_AND_DISK_2,
                     awsAccessKeyId=None, awsSecretKey=None, decoder=utf8_decoder,
                     stsAssumeRoleArn=None, stsSessionName=None, stsExternalId=None):
        """
        Create an input stream that pulls messages from a Kinesis stream. This uses the
        Kinesis Client Library (KCL) to pull messages from Kinesis.

        .. note:: The given AWS credentials will get saved in DStream checkpoints if checkpointing
            is enabled. Make sure that your checkpoint directory is secure.

        :param ssc:  StreamingContext object
        :param kinesisAppName:  Kinesis application name used by the Kinesis Client Library (KCL) to
                                update DynamoDB
        :param streamName:  Kinesis stream name
        :param endpointUrl:  Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
        :param regionName:  Name of region used by the Kinesis Client Library (KCL) to update
                            DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics)
        :param initialPositionInStream:  In the absence of Kinesis checkpoint info, this is the
                                         worker's initial starting position in the stream. The
                                         values are either the beginning of the stream per Kinesis'
                                         limit of 24 hours (InitialPositionInStream.TRIM_HORIZON) or
                                         the tip of the stream (InitialPositionInStream.LATEST).
        :param checkpointInterval:  Checkpoint interval for Kinesis checkpointing. See the Kinesis
                                    Spark Streaming documentation for more details on the different
                                    types of checkpoints.
        :param storageLevel:  Storage level to use for storing the received objects (default is
                              StorageLevel.MEMORY_AND_DISK_2)
        :param awsAccessKeyId:  AWS AccessKeyId (default is None. If None, will use
                                DefaultAWSCredentialsProviderChain)
        :param awsSecretKey:  AWS SecretKey (default is None. If None, will use
                              DefaultAWSCredentialsProviderChain)
        :param decoder:  A function used to decode value (default is utf8_decoder). It is applied
                         to each raw record of the stream.
        :param stsAssumeRoleArn: ARN of IAM role to assume when using STS sessions to read from
                                 the Kinesis stream (default is None).
        :param stsSessionName: Name to uniquely identify STS sessions used to read from Kinesis
                               stream, if STS is being used (default is None).
        :param stsExternalId: External ID that can be used to validate against the assumed IAM
                              role's trust policy, if STS is being used (default is None).
        :raises TypeError: if the JVM-side ``KinesisUtilsPythonHelper`` cannot be instantiated.
                           When the failure looks like the Kinesis jar is missing, a hint about
                           the ``streaming-kinesis-asl`` artifact is printed before re-raising.
        :return: A DStream object whose elements are the results of ``decoder``
        """
        jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
        jduration = ssc._jduration(checkpointInterval)

        try:
            helper = ssc._jvm.org.apache.spark.streaming.kinesis.KinesisUtilsPythonHelper()
        except TypeError as e:
            # When the helper class is not on the JVM classpath, py4j hands back a
            # JavaPackage instead of a class, and calling it fails with this exact message.
            # Point the user at the missing jar, then propagate the original error.
            if str(e) == "'JavaPackage' object is not callable":
                _print_missing_jar(
                    "Streaming's Kinesis",
                    "streaming-kinesis-asl",
                    "streaming-kinesis-asl-assembly",
                    ssc.sparkContext.version)
            raise
        jstream = helper.createStream(ssc._jssc, kinesisAppName, streamName, endpointUrl,
                                      regionName, initialPositionInStream, jduration, jlevel,
                                      awsAccessKeyId, awsSecretKey, stsAssumeRoleArn,
                                      stsSessionName, stsExternalId)
        # NoOpSerializer leaves each record in its raw form, so the user decoder does the
        # conversion. Pass it straight to map; a lambda wrapper around it is redundant.
        stream = DStream(jstream, ssc, NoOpSerializer())
        return stream.map(decoder)
0098 
0099 
class InitialPositionInStream(object):
    """Starting positions for ``KinesisUtils.createStream``'s ``initialPositionInStream``.

    These values are used only when no Kinesis checkpoint information exists.
    """

    # Start from the tip of the stream.
    LATEST = 0
    # Start from the beginning of the stream, subject to Kinesis' 24-hour limit.
    TRIM_HORIZON = 1