0001 #
0002 # Licensed to the Apache Software Foundation (ASF) under one or more
0003 # contributor license agreements. See the NOTICE file distributed with
0004 # this work for additional information regarding copyright ownership.
0005 # The ASF licenses this file to You under the Apache License, Version 2.0
0006 # (the "License"); you may not use this file except in compliance with
0007 # the License. You may obtain a copy of the License at
0008 #
0009 # http://www.apache.org/licenses/LICENSE-2.0
0010 #
0011 # Unless required by applicable law or agreed to in writing, software
0012 # distributed under the License is distributed on an "AS IS" BASIS,
0013 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014 # See the License for the specific language governing permissions and
0015 # limitations under the License.
0016 #
0017
0018 # mllib_fpm.R: Provides methods for MLlib frequent pattern mining algorithms integration
0019
0020 #' S4 class that represents a FPGrowthModel
0021 #'
0022 #' @param jobj a Java object reference to the backing Scala FPGrowthModel
0023 #' @note FPGrowthModel since 2.2.0
0024 setClass("FPGrowthModel", slots = list(jobj = "jobj"))
0025
0026 #' S4 class that represents a PrefixSpan
0027 #'
0028 #' @param jobj a Java object reference to the backing Scala PrefixSpan
0029 #' @note PrefixSpan since 3.0.0
0030 setClass("PrefixSpan", slots = list(jobj = "jobj"))
0031
0032 #' FP-growth
0033 #'
0034 #' A parallel FP-growth algorithm to mine frequent itemsets.
0035 #' \code{spark.fpGrowth} fits a FP-growth model on a SparkDataFrame. Users can
0036 #' \code{spark.freqItemsets} to get frequent itemsets, \code{spark.associationRules} to get
0037 #' association rules, \code{predict} to make predictions on new data based on generated association
0038 #' rules, and \code{write.ml}/\code{read.ml} to save/load fitted models.
0039 #' For more details, see
0040 #' \href{https://spark.apache.org/docs/latest/mllib-frequent-pattern-mining.html#fp-growth}{
0041 #' FP-growth}.
0042 #'
0043 #' @param data A SparkDataFrame for training.
0044 #' @param minSupport Minimal support level.
0045 #' @param minConfidence Minimal confidence level.
0046 #' @param itemsCol Features column name.
0047 #' @param numPartitions Number of partitions used for fitting.
0048 #' @param ... additional argument(s) passed to the method.
0049 #' @return \code{spark.fpGrowth} returns a fitted FPGrowth model.
0050 #' @rdname spark.fpGrowth
0051 #' @name spark.fpGrowth
0052 #' @aliases spark.fpGrowth,SparkDataFrame-method
0053 #' @examples
0054 #' \dontrun{
0055 #' raw_data <- read.df(
0056 #' "data/mllib/sample_fpgrowth.txt",
0057 #' source = "csv",
0058 #' schema = structType(structField("raw_items", "string")))
0059 #'
0060 #' data <- selectExpr(raw_data, "split(raw_items, ' ') as items")
0061 #' model <- spark.fpGrowth(data)
0062 #'
0063 #' # Show frequent itemsets
0064 #' frequent_itemsets <- spark.freqItemsets(model)
0065 #' showDF(frequent_itemsets)
0066 #'
0067 #' # Show association rules
0068 #' association_rules <- spark.associationRules(model)
0069 #' showDF(association_rules)
0070 #'
0071 #' # Predict on new data
0072 #' new_itemsets <- data.frame(items = c("t", "t,s"))
0073 #' new_data <- selectExpr(createDataFrame(new_itemsets), "split(items, ',') as items")
0074 #' predict(model, new_data)
0075 #'
0076 #' # Save and load model
0077 #' path <- "/path/to/model"
0078 #' write.ml(model, path)
0079 #' read.ml(path)
0080 #'
0081 #' # Optional arguments
0082 #' baskets_data <- selectExpr(createDataFrame(itemsets), "split(items, ',') as baskets")
0083 #' another_model <- spark.fpGrowth(data, minSupport = 0.1, minConfidence = 0.5,
0084 #' itemsCol = "baskets", numPartitions = 10)
0085 #' }
0086 #' @note spark.fpGrowth since 2.2.0
0087 setMethod("spark.fpGrowth", signature(data = "SparkDataFrame"),
0088 function(data, minSupport = 0.3, minConfidence = 0.8,
0089 itemsCol = "items", numPartitions = NULL) {
0090 if (!is.numeric(minSupport) || minSupport < 0 || minSupport > 1) {
0091 stop("minSupport should be a number [0, 1].")
0092 }
0093 if (!is.numeric(minConfidence) || minConfidence < 0 || minConfidence > 1) {
0094 stop("minConfidence should be a number [0, 1].")
0095 }
0096 if (!is.null(numPartitions)) {
0097 numPartitions <- as.integer(numPartitions)
0098 stopifnot(numPartitions > 0)
0099 }
0100
0101 jobj <- callJStatic("org.apache.spark.ml.r.FPGrowthWrapper", "fit",
0102 data@sdf, as.numeric(minSupport), as.numeric(minConfidence),
0103 itemsCol, numPartitions)
0104 new("FPGrowthModel", jobj = jobj)
0105 })
0106
0107 # Get frequent itemsets.
0108
0109 #' @param object a fitted FPGrowth model.
0110 #' @return A \code{SparkDataFrame} with frequent itemsets.
0111 #' The \code{SparkDataFrame} contains two columns:
0112 #' \code{items} (an array of the same type as the input column)
0113 #' and \code{freq} (frequency of the itemset).
0114 #' @rdname spark.fpGrowth
0115 #' @aliases freqItemsets,FPGrowthModel-method
0116 #' @note spark.freqItemsets(FPGrowthModel) since 2.2.0
0117 setMethod("spark.freqItemsets", signature(object = "FPGrowthModel"),
0118 function(object) {
0119 dataFrame(callJMethod(object@jobj, "freqItemsets"))
0120 })
0121
0122 # Get association rules.
0123
0124 #' @return A \code{SparkDataFrame} with association rules.
0125 #' The \code{SparkDataFrame} contains four columns:
0126 #' \code{antecedent} (an array of the same type as the input column),
0127 #' \code{consequent} (an array of the same type as the input column),
0128 #' \code{condfidence} (confidence for the rule)
0129 #' and \code{lift} (lift for the rule)
0130 #' @rdname spark.fpGrowth
0131 #' @aliases associationRules,FPGrowthModel-method
0132 #' @note spark.associationRules(FPGrowthModel) since 2.2.0
0133 setMethod("spark.associationRules", signature(object = "FPGrowthModel"),
0134 function(object) {
0135 dataFrame(callJMethod(object@jobj, "associationRules"))
0136 })
0137
0138 # Makes predictions based on generated association rules
0139
0140 #' @param newData a SparkDataFrame for testing.
0141 #' @return \code{predict} returns a SparkDataFrame containing predicted values.
0142 #' @rdname spark.fpGrowth
0143 #' @aliases predict,FPGrowthModel-method
0144 #' @note predict(FPGrowthModel) since 2.2.0
0145 setMethod("predict", signature(object = "FPGrowthModel"),
0146 function(object, newData) {
0147 predict_internal(object, newData)
0148 })
0149
0150 # Saves the FPGrowth model to the output path.
0151
0152 #' @param path the directory where the model is saved.
0153 #' @param overwrite logical value indicating whether to overwrite if the output path
0154 #' already exists. Default is FALSE which means throw exception
0155 #' if the output path exists.
0156 #' @rdname spark.fpGrowth
0157 #' @aliases write.ml,FPGrowthModel,character-method
0158 #' @seealso \link{read.ml}
0159 #' @note write.ml(FPGrowthModel, character) since 2.2.0
0160 setMethod("write.ml", signature(object = "FPGrowthModel", path = "character"),
0161 function(object, path, overwrite = FALSE) {
0162 write_internal(object, path, overwrite)
0163 })
0164
0165 #' PrefixSpan
0166 #'
0167 #' A parallel PrefixSpan algorithm to mine frequent sequential patterns.
0168 #' \code{spark.findFrequentSequentialPatterns} returns a complete set of frequent sequential
0169 #' patterns.
0170 #' For more details, see
0171 #' \href{https://spark.apache.org/docs/latest/mllib-frequent-pattern-mining.html#prefixspan}{
0172 #' PrefixSpan}.
0173 #'
0174 # Find frequent sequential patterns.
0175 #' @param data A SparkDataFrame.
0176 #' @param minSupport Minimal support level.
0177 #' @param maxPatternLength Maximal pattern length.
0178 #' @param maxLocalProjDBSize Maximum number of items (including delimiters used in the internal
0179 #' storage format) allowed in a projected database before local
0180 #' processing.
0181 #' @param sequenceCol name of the sequence column in dataset.
0182 #' @param ... additional argument(s) passed to the method.
0183 #' @return A complete set of frequent sequential patterns in the input sequences of itemsets.
0184 #' The returned \code{SparkDataFrame} contains columns of sequence and corresponding
0185 #' frequency. The schema of it will be:
0186 #' \code{sequence: ArrayType(ArrayType(T))}, \code{freq: integer}
0187 #' where T is the item type
0188 #' @rdname spark.prefixSpan
0189 #' @aliases findFrequentSequentialPatterns,PrefixSpan,SparkDataFrame-method
0190 #' @examples
0191 #' \dontrun{
0192 #' df <- createDataFrame(list(list(list(list(1L, 2L), list(3L))),
0193 #' list(list(list(1L), list(3L, 2L), list(1L, 2L))),
0194 #' list(list(list(1L, 2L), list(5L))),
0195 #' list(list(list(6L)))),
0196 #' schema = c("sequence"))
0197 #' frequency <- spark.findFrequentSequentialPatterns(df, minSupport = 0.5, maxPatternLength = 5L,
0198 #' maxLocalProjDBSize = 32000000L)
0199 #' showDF(frequency)
0200 #' }
0201 #' @note spark.findFrequentSequentialPatterns(SparkDataFrame) since 3.0.0
0202 setMethod("spark.findFrequentSequentialPatterns",
0203 signature(data = "SparkDataFrame"),
0204 function(data, minSupport = 0.1, maxPatternLength = 10L,
0205 maxLocalProjDBSize = 32000000L, sequenceCol = "sequence") {
0206 if (!is.numeric(minSupport) || minSupport < 0) {
0207 stop("minSupport should be a number with value >= 0.")
0208 }
0209 if (!is.integer(maxPatternLength) || maxPatternLength <= 0) {
0210 stop("maxPatternLength should be a number with value > 0.")
0211 }
0212 if (!is.numeric(maxLocalProjDBSize) || maxLocalProjDBSize <= 0) {
0213 stop("maxLocalProjDBSize should be a number with value > 0.")
0214 }
0215
0216 jobj <- callJStatic("org.apache.spark.ml.r.PrefixSpanWrapper", "getPrefixSpan",
0217 as.numeric(minSupport), as.integer(maxPatternLength),
0218 as.numeric(maxLocalProjDBSize), as.character(sequenceCol))
0219 object <- new("PrefixSpan", jobj = jobj)
0220 dataFrame(callJMethod(object@jobj, "findFrequentSequentialPatterns", data@sdf))
0221 }
0222 )