0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 """
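Randomly sampled RDDs: demonstrates RDD.sample(), RDD.takeSample(), and
stratified sampling by label with sampleByKey() on a LIBSVM-format data file.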
0020 """
0021 from __future__ import print_function
0022
0023 import sys
0024
0025 from pyspark import SparkContext
0026 from pyspark.mllib.util import MLUtils
0027
0028
if __name__ == "__main__":
    # Accept at most one argument: the path to a LIBSVM-format data file.
    # Without it, fall back to the bundled sample file (a relative path).
    if len(sys.argv) not in [1, 2]:
        print("Usage: sampled_rdds <libsvm data file>", file=sys.stderr)
        sys.exit(-1)
    if len(sys.argv) == 2:
        datapath = sys.argv[1]
    else:
        datapath = 'data/mllib/sample_binary_classification_data.txt'

    sc = SparkContext(appName="PythonSampledRDDs")
    # All Spark work runs inside try/finally so the SparkContext is always
    # stopped -- including on the sys.exit(1) path below (SystemExit still
    # triggers the finally clause) and on any unexpected exception.
    try:
        fraction = 0.1  # fraction of the data to sample

        examples = MLUtils.loadLibSVMFile(sc, datapath)
        numExamples = examples.count()
        if numExamples == 0:
            print("Error: Data file had no samples to load.", file=sys.stderr)
            sys.exit(1)
        print('Loaded data with %d examples from file: %s' % (numExamples, datapath))

        # Example: RDD.sample() and RDD.takeSample().
        # No seed is passed, so the reported sample sizes may differ per run.
        expectedSampleSize = int(numExamples * fraction)
        print('Sampling RDD using fraction %g. Expected sample size = %d.'
              % (fraction, expectedSampleSize))
        sampledRDD = examples.sample(withReplacement=True, fraction=fraction)
        print(' RDD.sample(): sample has %d examples' % sampledRDD.count())
        sampledArray = examples.takeSample(withReplacement=True, num=expectedSampleSize)
        print(' RDD.takeSample(): sample has %d examples' % len(sampledArray))

        print()

        # Example: RDD.sampleByKey(), stratified by the integer-cast label.
        keyedRDD = examples.map(lambda lp: (int(lp.label), lp.features))
        print(' Keyed data using label (Int) as key ==> Orig')

        # Count examples per label in the original data.
        keyCountsA = keyedRDD.countByKey()

        # Request the same sampling fraction for every label seen, then
        # count examples per label in the sampled data.
        fractions = {k: fraction for k in keyCountsA}
        sampledByKeyRDD = keyedRDD.sampleByKey(withReplacement=True, fractions=fractions)
        keyCountsB = sampledByKeyRDD.countByKey()
        sizeB = sum(keyCountsB.values())
        print(' Sampled %d examples using approximate stratified sampling (by label). ==> Sample'
              % sizeB)

        # Compare the per-label share of examples: original vs. sample.
        print(' \tFractions of examples with key')
        print('Key\tOrig\tSample')
        for k in sorted(keyCountsA):
            fracA = keyCountsA[k] / float(numExamples)
            # An empty sample would otherwise divide by zero; a label absent
            # from the sample contributes 0.
            fracB = keyCountsB.get(k, 0) / float(sizeB) if sizeB != 0 else 0
            print('%d\t%g\t%g' % (k, fracA, fracB))
    finally:
        sc.stop()