|
||||
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
Create a queue of RDDs that will be mapped/reduced one at a time in
1 second intervals.

To run this example use
`$ bin/spark-submit examples/src/main/python/streaming/queue_stream.py`
"""
import time

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == "__main__":

    sc = SparkContext(appName="PythonStreamingQueueStream")
    # Second argument is the batch interval: 1 second, per the module docstring.
    ssc = StreamingContext(sc, 1)

    # Create the queue through which RDDs can be pushed to
    # a QueueInputDStream: five identical RDDs, each holding the integers
    # 1..1000 split across 10 partitions. The range is handed to
    # parallelize() directly instead of being copied into a list first.
    rddQueue = [
        ssc.sparkContext.parallelize(range(1, 1001), 10)
        for _ in range(5)
    ]

    # Create the QueueInputDStream and use it to do some processing
    inputStream = ssc.queueStream(rddQueue)
    # Key every element by its last decimal digit with a count of 1 ...
    mappedStream = inputStream.map(lambda x: (x % 10, 1))
    # ... then sum the counts per key.
    reducedStream = mappedStream.reduceByKey(lambda a, b: a + b)
    reducedStream.pprint()

    ssc.start()
    # Let the stream run for 6 seconds before shutting down
    # (presumably enough for the five queued RDDs at one per batch).
    time.sleep(6)
    # Keyword spelling `stopGraceFully` is kept exactly as in the original call.
    ssc.stop(stopSparkContext=True, stopGraceFully=True)
[ Source navigation ] | [ Diff markup ] | [ Identifier search ] | [ general search ] |
This page was automatically generated by the 2.1.0 LXR engine. The LXR team |