Back to home page

OSCL-LXR

 
 

    


0001 #
0002 # Licensed to the Apache Software Foundation (ASF) under one or more
0003 # contributor license agreements.  See the NOTICE file distributed with
0004 # this work for additional information regarding copyright ownership.
0005 # The ASF licenses this file to You under the Apache License, Version 2.0
0006 # (the "License"); you may not use this file except in compliance with
0007 # the License.  You may obtain a copy of the License at
0008 #
0009 #    http://www.apache.org/licenses/LICENSE-2.0
0010 #
0011 # Unless required by applicable law or agreed to in writing, software
0012 # distributed under the License is distributed on an "AS IS" BASIS,
0013 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014 # See the License for the specific language governing permissions and
0015 # limitations under the License.
0016 #
0017 
0018 """
0019 Randomly sampled RDDs.
0020 """
0021 from __future__ import print_function
0022 
0023 import sys
0024 
0025 from pyspark import SparkContext
0026 from pyspark.mllib.util import MLUtils
0027 
0028 
if __name__ == "__main__":
    # Command line: an optional single argument naming the LibSVM data file.
    # Anything beyond that is a usage error; with no argument a bundled
    # sample data path is used.
    if len(sys.argv) not in [1, 2]:
        print("Usage: sampled_rdds <libsvm data file>", file=sys.stderr)
        sys.exit(-1)
    if len(sys.argv) == 2:
        datapath = sys.argv[1]
    else:
        datapath = 'data/mllib/sample_binary_classification_data.txt'

    sc = SparkContext(appName="PythonSampledRDDs")

    # All work that uses the context lives inside try/finally so that the
    # context is stopped on every exit path: normal completion, the
    # sys.exit(1) taken for an empty data file (SystemExit still runs the
    # finally clause), and any exception raised by a Spark action.
    try:
        fraction = 0.1  # fraction of data to sample

        examples = MLUtils.loadLibSVMFile(sc, datapath)
        numExamples = examples.count()
        if numExamples == 0:
            print("Error: Data file had no samples to load.", file=sys.stderr)
            sys.exit(1)
        print('Loaded data with %d examples from file: %s' % (numExamples, datapath))

        # Example: RDD.sample() and RDD.takeSample()
        expectedSampleSize = int(numExamples * fraction)
        print('Sampling RDD using fraction %g.  Expected sample size = %d.'
              % (fraction, expectedSampleSize))
        sampledRDD = examples.sample(withReplacement=True, fraction=fraction)
        print('  RDD.sample(): sample has %d examples' % sampledRDD.count())
        sampledArray = examples.takeSample(withReplacement=True, num=expectedSampleSize)
        print('  RDD.takeSample(): sample has %d examples' % len(sampledArray))

        print()

        # Example: RDD.sampleByKey()
        # Key every example by its label (truncated to int) so that sampling
        # can be done per label.
        keyedRDD = examples.map(lambda lp: (int(lp.label), lp.features))
        print('  Keyed data using label (Int) as key ==> Orig')
        #  Count examples per label in original data.
        keyCountsA = keyedRDD.countByKey()

        #  Subsample, and count examples per label in sampled data.
        #  Every label seen in the original data gets the same fraction.
        fractions = {k: fraction for k in keyCountsA}
        sampledByKeyRDD = keyedRDD.sampleByKey(withReplacement=True, fractions=fractions)
        keyCountsB = sampledByKeyRDD.countByKey()
        sizeB = sum(keyCountsB.values())
        print('  Sampled %d examples using approximate stratified sampling (by label).'
              ' ==> Sample' % sizeB)

        #  Compare samples: per label, the share of the original data versus
        #  the share of the sampled data.
        print('   \tFractions of examples with key')
        print('Key\tOrig\tSample')
        for k in sorted(keyCountsA):
            fracA = keyCountsA[k] / float(numExamples)
            if sizeB != 0:
                # A label may be missing from the sample entirely; count it as 0.
                fracB = keyCountsB.get(k, 0) / float(sizeB)
            else:
                # Empty sample: avoid dividing by zero.
                fracB = 0
            print('%d\t%g\t%g' % (k, fracA, fracB))
    finally:
        sc.stop()