#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# mllib_fpm.R: Provides methods for integrating MLlib frequent pattern mining algorithms

#' S4 class that represents an FPGrowthModel
#'
#' @param jobj a Java object reference to the backing Scala FPGrowthModel
#' @note FPGrowthModel since 2.2.0
setClass("FPGrowthModel", slots = list(jobj = "jobj"))

#' S4 class that represents a PrefixSpan
#'
#' @param jobj a Java object reference to the backing Scala PrefixSpan
#' @note PrefixSpan since 3.0.0
setClass("PrefixSpan", slots = list(jobj = "jobj"))

#' FP-growth
#'
#' A parallel FP-growth algorithm to mine frequent itemsets.
#' \code{spark.fpGrowth} fits an FP-growth model on a SparkDataFrame. Users can call
#' \code{spark.freqItemsets} to get frequent itemsets, \code{spark.associationRules} to get
#' association rules, \code{predict} to make predictions on new data based on generated association
#' rules, and \code{write.ml}/\code{read.ml} to save/load fitted models.
#' For more details, see
#' \href{https://spark.apache.org/docs/latest/mllib-frequent-pattern-mining.html#fp-growth}{
#' FP-growth}.
#'
#' @param data A SparkDataFrame for training.
#' @param minSupport Minimal support level.
#' @param minConfidence Minimal confidence level.
#' @param itemsCol Features column name.
#' @param numPartitions Number of partitions used for fitting.
#' @param ... additional argument(s) passed to the method.
#' @return \code{spark.fpGrowth} returns a fitted FPGrowth model.
#' @rdname spark.fpGrowth
#' @name spark.fpGrowth
#' @aliases spark.fpGrowth,SparkDataFrame-method
#' @examples
#' \dontrun{
#' raw_data <- read.df(
#'   "data/mllib/sample_fpgrowth.txt",
#'   source = "csv",
#'   schema = structType(structField("raw_items", "string")))
#'
#' data <- selectExpr(raw_data, "split(raw_items, ' ') as items")
#' model <- spark.fpGrowth(data)
#'
#' # Show frequent itemsets
#' frequent_itemsets <- spark.freqItemsets(model)
#' showDF(frequent_itemsets)
#'
#' # Show association rules
#' association_rules <- spark.associationRules(model)
#' showDF(association_rules)
#'
#' # Predict on new data
#' new_itemsets <- data.frame(items = c("t", "t,s"))
#' new_data <- selectExpr(createDataFrame(new_itemsets), "split(items, ',') as items")
#' predict(model, new_data)
#'
#' # Save and load model
#' path <- "/path/to/model"
#' write.ml(model, path)
#' read.ml(path)
#'
#' # Optional arguments
#' baskets_data <- selectExpr(createDataFrame(new_itemsets), "split(items, ',') as baskets")
#' another_model <- spark.fpGrowth(baskets_data, minSupport = 0.1, minConfidence = 0.5,
#'                                 itemsCol = "baskets", numPartitions = 10)
#' }
#' @note spark.fpGrowth since 2.2.0
setMethod("spark.fpGrowth", signature(data = "SparkDataFrame"),
          function(data, minSupport = 0.3, minConfidence = 0.8,
                   itemsCol = "items", numPartitions = NULL) {
            if (!is.numeric(minSupport) || minSupport < 0 || minSupport > 1) {
              stop("minSupport should be a number in [0, 1].")
            }
            if (!is.numeric(minConfidence) || minConfidence < 0 || minConfidence > 1) {
              stop("minConfidence should be a number in [0, 1].")
            }
            if (!is.null(numPartitions)) {
              numPartitions <- as.integer(numPartitions)
              stopifnot(numPartitions > 0)
            }

            jobj <- callJStatic("org.apache.spark.ml.r.FPGrowthWrapper", "fit",
                                data@sdf, as.numeric(minSupport), as.numeric(minConfidence),
                                itemsCol, numPartitions)
            new("FPGrowthModel", jobj = jobj)
          })

# Get frequent itemsets.

#' @param object a fitted FPGrowth model.
#' @return A \code{SparkDataFrame} with frequent itemsets.
#'         The \code{SparkDataFrame} contains two columns:
#'         \code{items} (an array of the same type as the input column)
#'         and \code{freq} (frequency of the itemset).
#' @rdname spark.fpGrowth
#' @aliases spark.freqItemsets,FPGrowthModel-method
#' @note spark.freqItemsets(FPGrowthModel) since 2.2.0
setMethod("spark.freqItemsets", signature(object = "FPGrowthModel"),
          function(object) {
            dataFrame(callJMethod(object@jobj, "freqItemsets"))
          })
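
# An illustrative sketch (not run as part of this file): with a model fitted as
# in the examples above, the frequent itemsets come back as a SparkDataFrame
# whose rows pair an itemset with its count, e.g.
#   itemsets <- spark.freqItemsets(model)
#   showDF(itemsets, numRows = 5)  # two columns: items, freq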

# Get association rules.

#' @return A \code{SparkDataFrame} with association rules.
#'         The \code{SparkDataFrame} contains four columns:
#'         \code{antecedent} (an array of the same type as the input column),
#'         \code{consequent} (an array of the same type as the input column),
#'         \code{confidence} (confidence for the rule)
#'         and \code{lift} (lift for the rule).
#' @rdname spark.fpGrowth
#' @aliases spark.associationRules,FPGrowthModel-method
#' @note spark.associationRules(FPGrowthModel) since 2.2.0
setMethod("spark.associationRules", signature(object = "FPGrowthModel"),
          function(object) {
            dataFrame(callJMethod(object@jobj, "associationRules"))
          })
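
# An illustrative sketch (not run as part of this file): each rule row maps an
# antecedent itemset to a consequent itemset with its confidence and lift, e.g.
#   rules <- spark.associationRules(model)
#   showDF(rules, numRows = 5)  # columns: antecedent, consequent, confidence, lift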

# Makes predictions based on generated association rules.

#' @param newData a SparkDataFrame for testing.
#' @return \code{predict} returns a SparkDataFrame containing predicted values.
#' @rdname spark.fpGrowth
#' @aliases predict,FPGrowthModel-method
#' @note predict(FPGrowthModel) since 2.2.0
setMethod("predict", signature(object = "FPGrowthModel"),
          function(object, newData) {
            predict_internal(object, newData)
          })
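
# An illustrative sketch (not run as part of this file): prediction adds a
# "prediction" column holding the consequents of the rules whose antecedents
# are contained in each input row, e.g. with new_data built as in the examples
# above:
#   predictions <- predict(model, new_data)
#   showDF(predictions)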

# Saves the FPGrowth model to the output path.

#' @param path the directory where the model is saved.
#' @param overwrite logical value indicating whether to overwrite if the output path
#'                  already exists. Default is FALSE, which means an exception is
#'                  thrown if the output path exists.
#' @rdname spark.fpGrowth
#' @aliases write.ml,FPGrowthModel,character-method
#' @seealso \link{read.ml}
#' @note write.ml(FPGrowthModel, character) since 2.2.0
setMethod("write.ml", signature(object = "FPGrowthModel", path = "character"),
          function(object, path, overwrite = FALSE) {
            write_internal(object, path, overwrite)
          })
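
# An illustrative sketch (not run as part of this file) of a save/load round
# trip using a temporary directory:
#   modelPath <- tempfile(pattern = "fpGrowthModel")
#   write.ml(model, modelPath)
#   restored <- read.ml(modelPath)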

#' PrefixSpan
#'
#' A parallel PrefixSpan algorithm to mine frequent sequential patterns.
#' \code{spark.findFrequentSequentialPatterns} returns a complete set of frequent sequential
#' patterns.
#' For more details, see
#' \href{https://spark.apache.org/docs/latest/mllib-frequent-pattern-mining.html#prefixspan}{
#' PrefixSpan}.
#'
#' @param data A SparkDataFrame.
#' @param minSupport Minimal support level.
#' @param maxPatternLength Maximal pattern length.
#' @param maxLocalProjDBSize Maximum number of items (including delimiters used in the internal
#'                           storage format) allowed in a projected database before local
#'                           processing.
#' @param sequenceCol name of the sequence column in the dataset.
#' @param ... additional argument(s) passed to the method.
#' @return A complete set of frequent sequential patterns in the input sequences of itemsets.
#'         The returned \code{SparkDataFrame} contains columns of sequence and corresponding
#'         frequency. Its schema is:
#'         \code{sequence: ArrayType(ArrayType(T))}, \code{freq: integer},
#'         where T is the item type.
#' @rdname spark.prefixSpan
#' @aliases spark.findFrequentSequentialPatterns,SparkDataFrame-method
#' @examples
#' \dontrun{
#' df <- createDataFrame(list(list(list(list(1L, 2L), list(3L))),
#'                            list(list(list(1L), list(3L, 2L), list(1L, 2L))),
#'                            list(list(list(1L, 2L), list(5L))),
#'                            list(list(list(6L)))),
#'                       schema = c("sequence"))
#' frequency <- spark.findFrequentSequentialPatterns(df, minSupport = 0.5, maxPatternLength = 5L,
#'                                                   maxLocalProjDBSize = 32000000L)
#' showDF(frequency)
#' }
#' @note spark.findFrequentSequentialPatterns(SparkDataFrame) since 3.0.0
setMethod("spark.findFrequentSequentialPatterns",
          signature(data = "SparkDataFrame"),
          function(data, minSupport = 0.1, maxPatternLength = 10L,
                   maxLocalProjDBSize = 32000000L, sequenceCol = "sequence") {
            if (!is.numeric(minSupport) || minSupport < 0) {
              stop("minSupport should be a number with value >= 0.")
            }
            if (!is.integer(maxPatternLength) || maxPatternLength <= 0) {
              stop("maxPatternLength should be an integer with value > 0.")
            }
            if (!is.numeric(maxLocalProjDBSize) || maxLocalProjDBSize <= 0) {
              stop("maxLocalProjDBSize should be a number with value > 0.")
            }

            jobj <- callJStatic("org.apache.spark.ml.r.PrefixSpanWrapper", "getPrefixSpan",
                                as.numeric(minSupport), as.integer(maxPatternLength),
                                as.numeric(maxLocalProjDBSize), as.character(sequenceCol))
            object <- new("PrefixSpan", jobj = jobj)
            dataFrame(callJMethod(object@jobj, "findFrequentSequentialPatterns", data@sdf))
          })
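
# An illustrative sketch (not run as part of this file): for the example data
# frame above, each result row pairs a frequent sequence (an array of itemsets)
# with the number of input sequences that contain it, e.g.
#   patterns <- spark.findFrequentSequentialPatterns(df, minSupport = 0.5)
#   showDF(patterns)  # columns: sequence, freq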