#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from pyspark import keyword_only, since
from pyspark.rdd import ignore_unicode_prefix
from pyspark.sql import DataFrame
from pyspark.ml.util import *
from pyspark.ml.wrapper import JavaEstimator, JavaModel, JavaParams
from pyspark.ml.param.shared import *

__all__ = ["FPGrowth", "FPGrowthModel", "PrefixSpan"]


class _FPGrowthParams(HasPredictionCol):
    """
    Params for :py:class:`FPGrowth` and :py:class:`FPGrowthModel`.

    .. versionadded:: 3.0.0
    """

    itemsCol = Param(Params._dummy(), "itemsCol",
                     "items column name", typeConverter=TypeConverters.toString)
    minSupport = Param(
        Params._dummy(),
        "minSupport",
        "Minimal support level of a frequent pattern, in [0.0, 1.0]. " +
        "Any pattern that appears more than (minSupport * size-of-the-dataset) " +
        "times will be output in the frequent itemsets.",
        typeConverter=TypeConverters.toFloat)
    numPartitions = Param(
        Params._dummy(),
        "numPartitions",
        "Number of partitions (at least 1) used by parallel FP-growth. " +
        "By default the param is not set, " +
        "and the number of partitions of the input dataset is used.",
        typeConverter=TypeConverters.toInt)
    minConfidence = Param(
        Params._dummy(),
        "minConfidence",
        "Minimal confidence for generating association rules, in [0.0, 1.0]. " +
        "minConfidence will not affect the mining for frequent itemsets, " +
        "but will affect the generation of association rules.",
        typeConverter=TypeConverters.toFloat)
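
    # For example, with minSupport=0.2 on a dataset of 6 transactions, an
    # itemset has to appear more than 0.2 * 6 = 1.2 times (i.e. in at least
    # 2 transactions) to be reported as frequent; minConfidence only filters
    # the association rules generated from those frequent itemsets.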

    def getItemsCol(self):
        """
        Gets the value of :py:attr:`itemsCol` or its default value.
        """
        return self.getOrDefault(self.itemsCol)

    def getMinSupport(self):
        """
        Gets the value of :py:attr:`minSupport` or its default value.
        """
        return self.getOrDefault(self.minSupport)

    def getNumPartitions(self):
        """
        Gets the value of :py:attr:`numPartitions` or its default value.
        """
        return self.getOrDefault(self.numPartitions)

    def getMinConfidence(self):
        """
        Gets the value of :py:attr:`minConfidence` or its default value.
        """
        return self.getOrDefault(self.minConfidence)


class FPGrowthModel(JavaModel, _FPGrowthParams, JavaMLWritable, JavaMLReadable):
    """
    Model fitted by :py:class:`FPGrowth`.

    .. versionadded:: 2.2.0
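
    A minimal usage sketch (assuming ``fpm`` is a model obtained from
    ``FPGrowth(...).fit(data)``, as in the :py:class:`FPGrowth` example);
    marked ``# doctest: +SKIP`` since it reuses objects defined there:

    >>> fpm.freqItemsets.show(3)  # doctest: +SKIP
    >>> fpm.associationRules.show(3)  # doctest: +SKIP
    >>> fpm.transform(data).show(3)  # doctest: +SKIP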
    """

    @since("3.0.0")
    def setItemsCol(self, value):
        """
        Sets the value of :py:attr:`itemsCol`.
        """
        return self._set(itemsCol=value)

    @since("3.0.0")
    def setMinConfidence(self, value):
        """
        Sets the value of :py:attr:`minConfidence`.
        """
        return self._set(minConfidence=value)

    @since("3.0.0")
    def setPredictionCol(self, value):
        """
        Sets the value of :py:attr:`predictionCol`.
        """
        return self._set(predictionCol=value)

    @property
    @since("2.2.0")
    def freqItemsets(self):
        """
        DataFrame with two columns:
        * `items` - Itemset of the same type as the input column.
        * `freq`  - Frequency of the itemset (`LongType`).
        """
        return self._call_java("freqItemsets")

    @property
    @since("2.2.0")
    def associationRules(self):
        """
        DataFrame with four columns:
        * `antecedent`  - Array of the same type as the input column.
        * `consequent`  - Array of the same type as the input column.
        * `confidence`  - Confidence for the rule (`DoubleType`).
        * `lift`        - Lift for the rule (`DoubleType`).
        """
        return self._call_java("associationRules")


@ignore_unicode_prefix
class FPGrowth(JavaEstimator, _FPGrowthParams, JavaMLWritable, JavaMLReadable):
    r"""
    A parallel FP-growth algorithm to mine frequent itemsets. The algorithm is described in
    Li et al., PFP: Parallel FP-Growth for Query Recommendation [LI2008]_.
    PFP distributes computation in such a way that each worker executes an
    independent group of mining tasks. The FP-Growth algorithm is described in
    Han et al., Mining frequent patterns without candidate generation [HAN2000]_.

    .. [LI2008] https://doi.org/10.1145/1454008.1454027
    .. [HAN2000] https://doi.org/10.1145/335191.335372

    .. note:: Null values in the items column are ignored during fit().
    .. note:: Internally, ``transform`` collects and broadcasts the association rules.

    >>> from pyspark.sql.functions import split
    >>> data = (spark.read
    ...     .text("data/mllib/sample_fpgrowth.txt")
    ...     .select(split("value", "\s+").alias("items")))
    >>> data.show(truncate=False)
    +------------------------+
    |items                   |
    +------------------------+
    |[r, z, h, k, p]         |
    |[z, y, x, w, v, u, t, s]|
    |[s, x, o, n, r]         |
    |[x, z, y, m, t, s, q, e]|
    |[z]                     |
    |[x, z, y, r, q, t, p]   |
    +------------------------+
    ...
    >>> fp = FPGrowth(minSupport=0.2, minConfidence=0.7)
    >>> fpm = fp.fit(data)
    >>> fpm.setPredictionCol("newPrediction")
    FPGrowthModel...
    >>> fpm.freqItemsets.show(5)
    +---------+----+
    |    items|freq|
    +---------+----+
    |      [s]|   3|
    |   [s, x]|   3|
    |[s, x, z]|   2|
    |   [s, z]|   2|
    |      [r]|   3|
    +---------+----+
    only showing top 5 rows
    ...
    >>> fpm.associationRules.show(5)
    +----------+----------+----------+----+
    |antecedent|consequent|confidence|lift|
    +----------+----------+----------+----+
    |    [t, s]|       [y]|       1.0| 2.0|
    |    [t, s]|       [x]|       1.0| 1.5|
    |    [t, s]|       [z]|       1.0| 1.2|
    |       [p]|       [r]|       1.0| 2.0|
    |       [p]|       [z]|       1.0| 1.2|
    +----------+----------+----------+----+
    only showing top 5 rows
    ...
    >>> new_data = spark.createDataFrame([(["t", "s"], )], ["items"])
    >>> sorted(fpm.transform(new_data).first().newPrediction)
    [u'x', u'y', u'z']

    .. versionadded:: 2.2.0
    """
    @keyword_only
    def __init__(self, minSupport=0.3, minConfidence=0.8, itemsCol="items",
                 predictionCol="prediction", numPartitions=None):
        """
        __init__(self, minSupport=0.3, minConfidence=0.8, itemsCol="items", \
                 predictionCol="prediction", numPartitions=None)
        """
        super(FPGrowth, self).__init__()
        self._java_obj = self._new_java_obj("org.apache.spark.ml.fpm.FPGrowth", self.uid)
        self._setDefault(minSupport=0.3, minConfidence=0.8,
                         itemsCol="items", predictionCol="prediction")
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    @since("2.2.0")
    def setParams(self, minSupport=0.3, minConfidence=0.8, itemsCol="items",
                  predictionCol="prediction", numPartitions=None):
        """
        setParams(self, minSupport=0.3, minConfidence=0.8, itemsCol="items", \
                  predictionCol="prediction", numPartitions=None)
        """
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def setItemsCol(self, value):
        """
        Sets the value of :py:attr:`itemsCol`.
        """
        return self._set(itemsCol=value)

    def setMinSupport(self, value):
        """
        Sets the value of :py:attr:`minSupport`.
        """
        return self._set(minSupport=value)

    def setNumPartitions(self, value):
        """
        Sets the value of :py:attr:`numPartitions`.
        """
        return self._set(numPartitions=value)

    def setMinConfidence(self, value):
        """
        Sets the value of :py:attr:`minConfidence`.
        """
        return self._set(minConfidence=value)

    def setPredictionCol(self, value):
        """
        Sets the value of :py:attr:`predictionCol`.
        """
        return self._set(predictionCol=value)

    def _create_model(self, java_model):
        return FPGrowthModel(java_model)


class PrefixSpan(JavaParams):
    """
    A parallel PrefixSpan algorithm to mine frequent sequential patterns.
    The PrefixSpan algorithm is described in J. Pei, et al., PrefixSpan: Mining Sequential Patterns
    Efficiently by Prefix-Projected Pattern Growth
    (see `here <https://doi.org/10.1109/ICDE.2001.914830>`_).
    This class is not yet an Estimator/Transformer; use the
    :py:func:`findFrequentSequentialPatterns` method to run the PrefixSpan algorithm.

    See also `Sequential Pattern Mining (Wikipedia)
    <https://en.wikipedia.org/wiki/Sequential_Pattern_Mining>`_.

    >>> from pyspark.ml.fpm import PrefixSpan
    >>> from pyspark.sql import Row
    >>> df = sc.parallelize([Row(sequence=[[1, 2], [3]]),
    ...                      Row(sequence=[[1], [3, 2], [1, 2]]),
    ...                      Row(sequence=[[1, 2], [5]]),
    ...                      Row(sequence=[[6]])]).toDF()
    >>> prefixSpan = PrefixSpan()
    >>> prefixSpan.getMaxLocalProjDBSize()
    32000000
    >>> prefixSpan.getSequenceCol()
    'sequence'
    >>> prefixSpan.setMinSupport(0.5)
    PrefixSpan...
    >>> prefixSpan.setMaxPatternLength(5)
    PrefixSpan...
    >>> prefixSpan.findFrequentSequentialPatterns(df).sort("sequence").show(truncate=False)
    +----------+----+
    |sequence  |freq|
    +----------+----+
    |[[1]]     |3   |
    |[[1], [3]]|2   |
    |[[2]]     |3   |
    |[[2, 1]]  |3   |
    |[[3]]     |2   |
    +----------+----+
    ...

    .. versionadded:: 2.4.0
    """

    minSupport = Param(Params._dummy(), "minSupport", "The minimal support level of the " +
                       "sequential pattern. Sequential patterns that appear more than " +
                       "(minSupport * size-of-the-dataset) times will be output. Must be >= 0.",
                       typeConverter=TypeConverters.toFloat)

    maxPatternLength = Param(Params._dummy(), "maxPatternLength",
                             "The maximal length of the sequential pattern. Must be > 0.",
                             typeConverter=TypeConverters.toInt)

    maxLocalProjDBSize = Param(Params._dummy(), "maxLocalProjDBSize",
                               "The maximum number of items (including delimiters used in the " +
                               "internal storage format) allowed in a projected database before " +
                               "local processing. If a projected database exceeds this size, " +
                               "another iteration of distributed prefix growth is run. " +
                               "Must be > 0.",
                               typeConverter=TypeConverters.toInt)

    sequenceCol = Param(Params._dummy(), "sequenceCol", "The name of the sequence column in " +
                        "the dataset; rows with nulls in this column are ignored.",
                        typeConverter=TypeConverters.toString)

    @keyword_only
    def __init__(self, minSupport=0.1, maxPatternLength=10, maxLocalProjDBSize=32000000,
                 sequenceCol="sequence"):
        """
        __init__(self, minSupport=0.1, maxPatternLength=10, maxLocalProjDBSize=32000000, \
                 sequenceCol="sequence")
        """
        super(PrefixSpan, self).__init__()
        self._java_obj = self._new_java_obj("org.apache.spark.ml.fpm.PrefixSpan", self.uid)
        self._setDefault(minSupport=0.1, maxPatternLength=10, maxLocalProjDBSize=32000000,
                         sequenceCol="sequence")
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    @since("2.4.0")
    def setParams(self, minSupport=0.1, maxPatternLength=10, maxLocalProjDBSize=32000000,
                  sequenceCol="sequence"):
        """
        setParams(self, minSupport=0.1, maxPatternLength=10, maxLocalProjDBSize=32000000, \
                  sequenceCol="sequence")
        """
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    @since("3.0.0")
    def setMinSupport(self, value):
        """
        Sets the value of :py:attr:`minSupport`.
        """
        return self._set(minSupport=value)

    @since("3.0.0")
    def getMinSupport(self):
        """
        Gets the value of :py:attr:`minSupport` or its default value.
        """
        return self.getOrDefault(self.minSupport)

    @since("3.0.0")
    def setMaxPatternLength(self, value):
        """
        Sets the value of :py:attr:`maxPatternLength`.
        """
        return self._set(maxPatternLength=value)

    @since("3.0.0")
    def getMaxPatternLength(self):
        """
        Gets the value of :py:attr:`maxPatternLength` or its default value.
        """
        return self.getOrDefault(self.maxPatternLength)

    @since("3.0.0")
    def setMaxLocalProjDBSize(self, value):
        """
        Sets the value of :py:attr:`maxLocalProjDBSize`.
        """
        return self._set(maxLocalProjDBSize=value)

    @since("3.0.0")
    def getMaxLocalProjDBSize(self):
        """
        Gets the value of :py:attr:`maxLocalProjDBSize` or its default value.
        """
        return self.getOrDefault(self.maxLocalProjDBSize)

    @since("3.0.0")
    def setSequenceCol(self, value):
        """
        Sets the value of :py:attr:`sequenceCol`.
        """
        return self._set(sequenceCol=value)

    @since("3.0.0")
    def getSequenceCol(self):
        """
        Gets the value of :py:attr:`sequenceCol` or its default value.
        """
        return self.getOrDefault(self.sequenceCol)

    @since("2.4.0")
    def findFrequentSequentialPatterns(self, dataset):
        """
        Finds the complete set of frequent sequential patterns in the input sequences of itemsets.

        :param dataset: A DataFrame containing a sequence column of type
                        `ArrayType(ArrayType(T))`, where T is the item type for the input dataset.
        :return: A `DataFrame` containing the frequent sequential patterns and their
                 corresponding frequencies. Its schema will be:
                 - `sequence: ArrayType(ArrayType(T))` (T is the item type)
                 - `freq: Long`

        .. versionadded:: 2.4.0
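
        A minimal sketch of inspecting the result (``df`` as in the class-level
        example above); marked ``# doctest: +SKIP`` since it only illustrates the call:

        >>> ps = PrefixSpan(minSupport=0.5)  # doctest: +SKIP
        >>> ps.findFrequentSequentialPatterns(df).printSchema()  # doctest: +SKIP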
        """

        self._transfer_params_to_java()
        jdf = self._java_obj.findFrequentSequentialPatterns(dataset._jdf)
        return DataFrame(jdf, dataset.sql_ctx)


if __name__ == "__main__":
    import sys
    import doctest
    import pyspark.ml.fpm
    from pyspark.sql import SparkSession
    globs = pyspark.ml.fpm.__dict__.copy()
    # Create a local SparkSession for running the doctests in this module:
    spark = SparkSession.builder\
        .master("local[2]")\
        .appName("ml.fpm tests")\
        .getOrCreate()
    sc = spark.sparkContext
    globs['sc'] = sc
    globs['spark'] = spark
    import tempfile
    temp_path = tempfile.mkdtemp()
    globs['temp_path'] = temp_path
    try:
        (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
        spark.stop()
    finally:
        from shutil import rmtree
        try:
            rmtree(temp_path)
        except OSError:
            pass
    if failure_count:
        sys.exit(-1)