0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 from pyspark import keyword_only, since
0019 from pyspark.rdd import ignore_unicode_prefix
0020 from pyspark.sql import DataFrame
0021 from pyspark.ml.util import *
0022 from pyspark.ml.wrapper import JavaEstimator, JavaModel, JavaParams
0023 from pyspark.ml.param.shared import *
0024
# Public names exported by ``from pyspark.ml.fpm import *``.
__all__ = [
    "FPGrowth",
    "FPGrowthModel",
    "PrefixSpan",
]
0026
0027
class _FPGrowthParams(HasPredictionCol):
    """
    Params shared by :py:class:`FPGrowth` and :py:class:`FPGrowthModel`.

    In addition to :py:attr:`predictionCol` (from :py:class:`HasPredictionCol`), this mixin
    declares the items column, the support and confidence thresholds, and the optional
    number of partitions used for mining.

    .. versionadded:: 3.0.0
    """

    # Input column holding the items of each transaction.
    itemsCol = Param(Params._dummy(), "itemsCol",
                     "items column name", typeConverter=TypeConverters.toString)

    # Support threshold in [0.0, 1.0], expressed as a fraction of the dataset size.
    minSupport = Param(Params._dummy(), "minSupport",
                       "Minimal support level of the frequent pattern. [0.0, 1.0]. "
                       "Any pattern that appears more than (minSupport * size-of-the-dataset) "
                       "times will be output in the frequent itemsets.",
                       typeConverter=TypeConverters.toFloat)

    # Optional; when left unset, the partition count of the input dataset is used.
    numPartitions = Param(Params._dummy(), "numPartitions",
                          "Number of partitions (at least 1) used by parallel FP-growth. "
                          "By default the param is not set, "
                          "and partition number of the input dataset is used.",
                          typeConverter=TypeConverters.toInt)

    # Confidence threshold in [0.0, 1.0]; it only affects association-rule generation,
    # not the mining of frequent itemsets.
    minConfidence = Param(Params._dummy(), "minConfidence",
                          "Minimal confidence for generating Association Rule. [0.0, 1.0]. "
                          "minConfidence will not affect the mining for frequent itemsets, "
                          "but will affect the association rules generation.",
                          typeConverter=TypeConverters.toFloat)

    def getItemsCol(self):
        """
        Returns the value of :py:attr:`itemsCol`, falling back to its default value.
        """
        return self.getOrDefault(self.itemsCol)

    def getMinSupport(self):
        """
        Returns the value of :py:attr:`minSupport`, falling back to its default value.
        """
        return self.getOrDefault(self.minSupport)

    def getNumPartitions(self):
        """
        Returns the value of :py:attr:`numPartitions`, falling back to its default value.
        """
        return self.getOrDefault(self.numPartitions)

    def getMinConfidence(self):
        """
        Returns the value of :py:attr:`minConfidence`, falling back to its default value.
        """
        return self.getOrDefault(self.minConfidence)
0082
0083
class FPGrowthModel(JavaModel, _FPGrowthParams, JavaMLWritable, JavaMLReadable):
    """
    Model produced by :py:class:`FPGrowth`.

    The fitted results (frequent itemsets and association rules) are read from the wrapped
    Java model: each property below delegates to it through ``_call_java``.

    .. versionadded:: 2.2.0
    """

    @since("3.0.0")
    def setItemsCol(self, value):
        """
        Sets the value of :py:attr:`itemsCol` and returns the model, so calls can be chained.
        """
        return self._set(itemsCol=value)

    @since("3.0.0")
    def setMinConfidence(self, value):
        """
        Sets the value of :py:attr:`minConfidence` and returns the model, so calls can be
        chained.
        """
        return self._set(minConfidence=value)

    @since("3.0.0")
    def setPredictionCol(self, value):
        """
        Sets the value of :py:attr:`predictionCol` and returns the model, so calls can be
        chained.
        """
        return self._set(predictionCol=value)

    @property
    @since("2.2.0")
    def freqItemsets(self):
        """
        Frequent itemsets found during fitting, as a DataFrame with two columns:

        * `items` - Itemset of the same type as the input column.
        * `freq` - Frequency of the itemset (`LongType`).
        """
        return self._call_java("freqItemsets")

    @property
    @since("2.2.0")
    def associationRules(self):
        """
        Association rules derived from the frequent itemsets, as a DataFrame with four columns:

        * `antecedent` - Array of the same type as the input column.
        * `consequent` - Array of the same type as the input column.
        * `confidence` - Confidence for the rule (`DoubleType`).
        * `lift` - Lift for the rule (`DoubleType`).
        """
        return self._call_java("associationRules")
0133
0134
@ignore_unicode_prefix
class FPGrowth(JavaEstimator, _FPGrowthParams, JavaMLWritable, JavaMLReadable):
    r"""
    A parallel FP-growth algorithm to mine frequent itemsets. The algorithm is described in
    Li et al., PFP: Parallel FP-Growth for Query Recommendation [LI2008]_.
    PFP distributes computation in such a way that each worker executes an
    independent group of mining tasks. The FP-Growth algorithm is described in
    Han et al., Mining frequent patterns without candidate generation [HAN2000]_.

    .. [LI2008] https://doi.org/10.1145/1454008.1454027
    .. [HAN2000] https://doi.org/10.1145/335191.335372

    .. note:: null values in the items column (:py:attr:`itemsCol`) are ignored during fit().
    .. note:: Internally `transform` `collects` and `broadcasts` association rules.

    >>> from pyspark.sql.functions import split
    >>> data = (spark.read
    ...     .text("data/mllib/sample_fpgrowth.txt")
    ...     .select(split("value", "\s+").alias("items")))
    >>> data.show(truncate=False)
    +------------------------+
    |items                   |
    +------------------------+
    |[r, z, h, k, p]         |
    |[z, y, x, w, v, u, t, s]|
    |[s, x, o, n, r]         |
    |[x, z, y, m, t, s, q, e]|
    |[z]                     |
    |[x, z, y, r, q, t, p]   |
    +------------------------+
    ...
    >>> fp = FPGrowth(minSupport=0.2, minConfidence=0.7)
    >>> fpm = fp.fit(data)
    >>> fpm.setPredictionCol("newPrediction")
    FPGrowthModel...
    >>> fpm.freqItemsets.show(5)
    +---------+----+
    |    items|freq|
    +---------+----+
    |      [s]|   3|
    |   [s, x]|   3|
    |[s, x, z]|   2|
    |   [s, z]|   2|
    |      [r]|   3|
    +---------+----+
    only showing top 5 rows
    ...
    >>> fpm.associationRules.show(5)
    +----------+----------+----------+----+
    |antecedent|consequent|confidence|lift|
    +----------+----------+----------+----+
    |    [t, s]|       [y]|       1.0| 2.0|
    |    [t, s]|       [x]|       1.0| 1.5|
    |    [t, s]|       [z]|       1.0| 1.2|
    |       [p]|       [r]|       1.0| 2.0|
    |       [p]|       [z]|       1.0| 1.2|
    +----------+----------+----------+----+
    only showing top 5 rows
    ...
    >>> new_data = spark.createDataFrame([(["t", "s"], )], ["items"])
    >>> sorted(fpm.transform(new_data).first().newPrediction)
    [u'x', u'y', u'z']

    .. versionadded:: 2.2.0
    """
    @keyword_only
    def __init__(self, minSupport=0.3, minConfidence=0.8, itemsCol="items",
                 predictionCol="prediction", numPartitions=None):
        """
        __init__(self, minSupport=0.3, minConfidence=0.8, itemsCol="items", \
                 predictionCol="prediction", numPartitions=None)
        """
        super(FPGrowth, self).__init__()
        self._java_obj = self._new_java_obj("org.apache.spark.ml.fpm.FPGrowth", self.uid)
        # numPartitions intentionally gets no default: per its param description, when it is
        # not set the partition number of the input dataset is used.
        self._setDefault(minSupport=0.3, minConfidence=0.8,
                         itemsCol="items", predictionCol="prediction")
        # Only the keyword arguments the caller actually passed (captured by @keyword_only).
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    @since("2.2.0")
    def setParams(self, minSupport=0.3, minConfidence=0.8, itemsCol="items",
                  predictionCol="prediction", numPartitions=None):
        """
        setParams(self, minSupport=0.3, minConfidence=0.8, itemsCol="items", \
                  predictionCol="prediction", numPartitions=None)
        """
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def setItemsCol(self, value):
        """
        Sets the value of :py:attr:`itemsCol`.
        """
        return self._set(itemsCol=value)

    def setMinSupport(self, value):
        """
        Sets the value of :py:attr:`minSupport`.
        """
        return self._set(minSupport=value)

    def setNumPartitions(self, value):
        """
        Sets the value of :py:attr:`numPartitions`.
        """
        return self._set(numPartitions=value)

    def setMinConfidence(self, value):
        """
        Sets the value of :py:attr:`minConfidence`.
        """
        return self._set(minConfidence=value)

    def setPredictionCol(self, value):
        """
        Sets the value of :py:attr:`predictionCol`.
        """
        return self._set(predictionCol=value)

    def _create_model(self, java_model):
        """
        Wraps the fitted Java model in a :py:class:`FPGrowthModel`.
        """
        return FPGrowthModel(java_model)
0257
0258
class PrefixSpan(JavaParams):
    """
    A parallel PrefixSpan algorithm to mine frequent sequential patterns.
    The PrefixSpan algorithm is described in J. Pei, et al., PrefixSpan: Mining Sequential Patterns
    Efficiently by Prefix-Projected Pattern Growth
    (see `here <https://doi.org/10.1109/ICDE.2001.914830>`_).
    This class is not yet an Estimator/Transformer, use :py:meth:`findFrequentSequentialPatterns`
    method to run the PrefixSpan algorithm.

    .. seealso:: `Sequential Pattern Mining (Wikipedia)
        <https://en.wikipedia.org/wiki/Sequential_Pattern_Mining>`_

    >>> from pyspark.ml.fpm import PrefixSpan
    >>> from pyspark.sql import Row
    >>> df = sc.parallelize([Row(sequence=[[1, 2], [3]]),
    ...                      Row(sequence=[[1], [3, 2], [1, 2]]),
    ...                      Row(sequence=[[1, 2], [5]]),
    ...                      Row(sequence=[[6]])]).toDF()
    >>> prefixSpan = PrefixSpan()
    >>> prefixSpan.getMaxLocalProjDBSize()
    32000000
    >>> prefixSpan.getSequenceCol()
    'sequence'
    >>> prefixSpan.setMinSupport(0.5)
    PrefixSpan...
    >>> prefixSpan.setMaxPatternLength(5)
    PrefixSpan...
    >>> prefixSpan.findFrequentSequentialPatterns(df).sort("sequence").show(truncate=False)
    +----------+----+
    |sequence  |freq|
    +----------+----+
    |[[1]]     |3   |
    |[[1], [3]]|2   |
    |[[2]]     |3   |
    |[[2, 1]]  |3   |
    |[[3]]     |2   |
    +----------+----+
    ...

    .. versionadded:: 2.4.0
    """

    # Fraction of the dataset size; patterns seen more than minSupport * size times are kept.
    minSupport = Param(Params._dummy(), "minSupport", "The minimal support level of the " +
                       "sequential pattern. Sequential pattern that appears more than " +
                       "(minSupport * size-of-the-dataset) times will be output. Must be >= 0.",
                       typeConverter=TypeConverters.toFloat)

    # Upper bound on the length of a returned pattern.
    maxPatternLength = Param(Params._dummy(), "maxPatternLength",
                             "The maximal length of the sequential pattern. Must be > 0.",
                             typeConverter=TypeConverters.toInt)

    # Projected databases larger than this trigger another distributed prefix-growth round
    # instead of being processed locally.
    maxLocalProjDBSize = Param(Params._dummy(), "maxLocalProjDBSize",
                               "The maximum number of items (including delimiters used in the " +
                               "internal storage format) allowed in a projected database before " +
                               "local processing. If a projected database exceeds this size, " +
                               "another iteration of distributed prefix growth is run. " +
                               "Must be > 0.",
                               typeConverter=TypeConverters.toInt)

    # Input column of sequences; rows with nulls in it are skipped.
    sequenceCol = Param(Params._dummy(), "sequenceCol", "The name of the sequence column in " +
                        "dataset, rows with nulls in this column are ignored.",
                        typeConverter=TypeConverters.toString)

    @keyword_only
    def __init__(self, minSupport=0.1, maxPatternLength=10, maxLocalProjDBSize=32000000,
                 sequenceCol="sequence"):
        """
        __init__(self, minSupport=0.1, maxPatternLength=10, maxLocalProjDBSize=32000000, \
                 sequenceCol="sequence")
        """
        super(PrefixSpan, self).__init__()
        self._java_obj = self._new_java_obj("org.apache.spark.ml.fpm.PrefixSpan", self.uid)
        # Defaults mirror the keyword defaults of the signature above.
        self._setDefault(minSupport=0.1, maxPatternLength=10, maxLocalProjDBSize=32000000,
                         sequenceCol="sequence")
        # Only the keyword arguments the caller actually passed (captured by @keyword_only).
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    @since("2.4.0")
    def setParams(self, minSupport=0.1, maxPatternLength=10, maxLocalProjDBSize=32000000,
                  sequenceCol="sequence"):
        """
        setParams(self, minSupport=0.1, maxPatternLength=10, maxLocalProjDBSize=32000000, \
                  sequenceCol="sequence")
        """
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    @since("3.0.0")
    def setMinSupport(self, value):
        """
        Sets the value of :py:attr:`minSupport`.
        """
        return self._set(minSupport=value)

    @since("3.0.0")
    def getMinSupport(self):
        """
        Gets the value of :py:attr:`minSupport` or its default value.
        """
        return self.getOrDefault(self.minSupport)

    @since("3.0.0")
    def setMaxPatternLength(self, value):
        """
        Sets the value of :py:attr:`maxPatternLength`.
        """
        return self._set(maxPatternLength=value)

    @since("3.0.0")
    def getMaxPatternLength(self):
        """
        Gets the value of :py:attr:`maxPatternLength` or its default value.
        """
        return self.getOrDefault(self.maxPatternLength)

    @since("3.0.0")
    def setMaxLocalProjDBSize(self, value):
        """
        Sets the value of :py:attr:`maxLocalProjDBSize`.
        """
        return self._set(maxLocalProjDBSize=value)

    @since("3.0.0")
    def getMaxLocalProjDBSize(self):
        """
        Gets the value of :py:attr:`maxLocalProjDBSize` or its default value.
        """
        return self.getOrDefault(self.maxLocalProjDBSize)

    @since("3.0.0")
    def setSequenceCol(self, value):
        """
        Sets the value of :py:attr:`sequenceCol`.
        """
        return self._set(sequenceCol=value)

    @since("3.0.0")
    def getSequenceCol(self):
        """
        Gets the value of :py:attr:`sequenceCol` or its default value.
        """
        return self.getOrDefault(self.sequenceCol)

    @since("2.4.0")
    def findFrequentSequentialPatterns(self, dataset):
        """
        Finds the complete set of frequent sequential patterns in the input sequences of itemsets.

        :param dataset: A dataframe containing a sequence column which is
                        `ArrayType(ArrayType(T))` type, T is the item type for the input dataset.
        :return: A `DataFrame` that contains columns of sequence and corresponding frequency.
                 The schema of it will be:

                 - `sequence: ArrayType(ArrayType(T))` (T is the item type)
                 - `freq: Long`
        """
        # The version note is appended by @since above; it is deliberately not repeated here.
        # Sync the params set on this Python wrapper to the Java object before delegating.
        self._transfer_params_to_java()
        jdf = self._java_obj.findFrequentSequentialPatterns(dataset._jdf)
        return DataFrame(jdf, dataset.sql_ctx)
0421
0422
if __name__ == "__main__":
    # Run this module's doctests against a local two-core SparkSession and exit non-zero
    # if any of them fail.
    import doctest
    import sys
    import tempfile
    from shutil import rmtree

    import pyspark.ml.fpm
    from pyspark.sql import SparkSession

    globs = pyspark.ml.fpm.__dict__.copy()
    spark = SparkSession.builder\
        .master("local[2]")\
        .appName("ml.fpm tests")\
        .getOrCreate()
    sc = spark.sparkContext
    # Names the doctests use without importing them.
    globs['sc'] = sc
    globs['spark'] = spark
    temp_path = tempfile.mkdtemp()
    globs['temp_path'] = temp_path
    try:
        try:
            failure_count, _ = doctest.testmod(
                globs=globs, optionflags=doctest.ELLIPSIS)
        finally:
            # Stop the session even if doctest itself raises.
            spark.stop()
    finally:
        # Best-effort cleanup of the scratch directory.
        try:
            rmtree(temp_path)
        except OSError:
            pass
    if failure_count:
        sys.exit(-1)