Back to home page

OSCL-LXR

 
 

    


0001 #
0002 # Licensed to the Apache Software Foundation (ASF) under one or more
0003 # contributor license agreements.  See the NOTICE file distributed with
0004 # this work for additional information regarding copyright ownership.
0005 # The ASF licenses this file to You under the Apache License, Version 2.0
0006 # (the "License"); you may not use this file except in compliance with
0007 # the License.  You may obtain a copy of the License at
0008 #
0009 #    http://www.apache.org/licenses/LICENSE-2.0
0010 #
0011 # Unless required by applicable law or agreed to in writing, software
0012 # distributed under the License is distributed on an "AS IS" BASIS,
0013 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014 # See the License for the specific language governing permissions and
0015 # limitations under the License.
0016 #
0017 
0018 # mllib_clustering.R: Provides methods for MLlib clustering algorithms integration
0019 
0020 #' S4 class that represents a BisectingKMeansModel
0021 #'
0022 #' @param jobj a Java object reference to the backing Scala BisectingKMeansModel
0023 #' @note BisectingKMeansModel since 2.2.0
0024 setClass("BisectingKMeansModel", representation(jobj = "jobj"))
0025 
0026 #' S4 class that represents a GaussianMixtureModel
0027 #'
0028 #' @param jobj a Java object reference to the backing Scala GaussianMixtureModel
0029 #' @note GaussianMixtureModel since 2.1.0
0030 setClass("GaussianMixtureModel", representation(jobj = "jobj"))
0031 
0032 #' S4 class that represents a KMeansModel
0033 #'
0034 #' @param jobj a Java object reference to the backing Scala KMeansModel
0035 #' @note KMeansModel since 2.0.0
0036 setClass("KMeansModel", representation(jobj = "jobj"))
0037 
0038 #' S4 class that represents an LDAModel
0039 #'
0040 #' @param jobj a Java object reference to the backing Scala LDAWrapper
0041 #' @note LDAModel since 2.1.0
0042 setClass("LDAModel", representation(jobj = "jobj"))
0043 
0044 #' S4 class that represents a PowerIterationClustering
0045 #'
0046 #' @param jobj a Java object reference to the backing Scala PowerIterationClustering
0047 #' @note PowerIterationClustering since 3.0.0
0048 setClass("PowerIterationClustering", slots = list(jobj = "jobj"))
0049 
0050 #' Bisecting K-Means Clustering Model
0051 #'
0052 #' Fits a bisecting k-means clustering model against a SparkDataFrame.
0053 #' Users can call \code{summary} to print a summary of the fitted model, \code{predict} to make
0054 #' predictions on new data, and \code{write.ml}/\code{read.ml} to save/load fitted models.
0055 #'
0056 #' @param data a SparkDataFrame for training.
0057 #' @param formula a symbolic description of the model to be fitted. Currently only a few formula
0058 #'                operators are supported, including '~', '.', ':', '+', '-', '*', and '^'.
0059 #'                Note that the response variable of formula is empty in spark.bisectingKmeans.
0060 #' @param k the desired number of leaf clusters. Must be > 1.
0061 #'          The actual number could be smaller if there are no divisible leaf clusters.
0062 #' @param maxIter maximum iteration number.
0063 #' @param seed the random seed.
0064 #' @param minDivisibleClusterSize The minimum number of points (if greater than or equal to 1.0)
0065 #'                                or the minimum proportion of points (if less than 1.0) of a
0066 #'                                divisible cluster. Note that it is an expert parameter. The
0067 #'                                default value should be good enough for most cases.
0068 #' @param ... additional argument(s) passed to the method.
0069 #' @return \code{spark.bisectingKmeans} returns a fitted bisecting k-means model.
0070 #' @rdname spark.bisectingKmeans
0071 #' @aliases spark.bisectingKmeans,SparkDataFrame,formula-method
0072 #' @name spark.bisectingKmeans
0073 #' @examples
0074 #' \dontrun{
0075 #' sparkR.session()
0076 #' t <- as.data.frame(Titanic)
0077 #' df <- createDataFrame(t)
0078 #' model <- spark.bisectingKmeans(df, Class ~ Survived, k = 4)
0079 #' summary(model)
0080 #'
0081 #' # get fitted result from a bisecting k-means model
0082 #' fitted.model <- fitted(model, "centers")
0083 #' showDF(fitted.model)
0084 #'
0085 #' # fitted values on training data
0086 #' fitted <- predict(model, df)
0087 #' head(select(fitted, "Class", "prediction"))
0088 #'
0089 #' # save fitted model to input path
0090 #' path <- "path/to/model"
0091 #' write.ml(model, path)
0092 #'
0093 #' # can also read back the saved model and print
0094 #' savedModel <- read.ml(path)
0095 #' summary(savedModel)
0096 #' }
0097 #' @note spark.bisectingKmeans since 2.2.0
0098 #' @seealso \link{predict}, \link{read.ml}, \link{write.ml}
0099 setMethod("spark.bisectingKmeans", signature(data = "SparkDataFrame", formula = "formula"),
0100           function(data, formula, k = 4, maxIter = 20, seed = NULL, minDivisibleClusterSize = 1.0) {
0101             formula <- paste0(deparse(formula), collapse = "")
0102             if (!is.null(seed)) {
0103               seed <- as.character(as.integer(seed))
0104             }
0105             jobj <- callJStatic("org.apache.spark.ml.r.BisectingKMeansWrapper", "fit",
0106                                 data@sdf, formula, as.integer(k), as.integer(maxIter),
0107                                 seed, as.numeric(minDivisibleClusterSize))
0108             new("BisectingKMeansModel", jobj = jobj)
0109           })
0110 
0111 #  Get the summary of a bisecting k-means model
0112 
0113 #' @param object a fitted bisecting k-means model.
0114 #' @return \code{summary} returns summary information of the fitted model, which is a list.
0115 #'         The list includes the model's \code{k} (number of cluster centers),
0116 #'         \code{coefficients} (model cluster centers),
0117 #'         \code{size} (number of data points in each cluster), \code{cluster}
0118 #'         (cluster centers of the transformed data; cluster is NULL if is.loaded is TRUE),
0119 #'         and \code{is.loaded} (whether the model is loaded from a saved file).
0120 #' @rdname spark.bisectingKmeans
0121 #' @note summary(BisectingKMeansModel) since 2.2.0
0122 setMethod("summary", signature(object = "BisectingKMeansModel"),
0123           function(object) {
0124             jobj <- object@jobj
0125             is.loaded <- callJMethod(jobj, "isLoaded")
0126             features <- callJMethod(jobj, "features")
0127             coefficients <- callJMethod(jobj, "coefficients")
0128             k <- callJMethod(jobj, "k")
0129             size <- callJMethod(jobj, "size")
0130             coefficients <- t(matrix(coefficients, ncol = k))
0131             colnames(coefficients) <- unlist(features)
0132             rownames(coefficients) <- 1:k
0133             cluster <- if (is.loaded) {
0134               NULL
0135             } else {
0136               dataFrame(callJMethod(jobj, "cluster"))
0137             }
0138             list(k = k, coefficients = coefficients, size = size,
0139             cluster = cluster, is.loaded = is.loaded)
0140           })
0141 
0142 #  Predicted values based on a bisecting k-means model
0143 
0144 #' @param newData a SparkDataFrame for testing.
0145 #' @return \code{predict} returns the predicted values based on a bisecting k-means model.
0146 #' @rdname spark.bisectingKmeans
0147 #' @note predict(BisectingKMeansModel) since 2.2.0
0148 setMethod("predict", signature(object = "BisectingKMeansModel"),
0149           function(object, newData) {
0150             predict_internal(object, newData)
0151           })
0152 
0153 #' Get fitted result from a bisecting k-means model
0154 #'
0155 #' Get fitted result from a bisecting k-means model.
0156 #' Note: A saved-loaded model does not support this method.
0157 #'
0158 #' @param method type of fitted results, \code{"centers"} for cluster centers
0159 #'        or \code{"classes"} for assigned classes.
0160 #' @return \code{fitted} returns a SparkDataFrame containing fitted values.
0161 #' @rdname spark.bisectingKmeans
0162 #' @note fitted since 2.2.0
0163 setMethod("fitted", signature(object = "BisectingKMeansModel"),
0164           function(object, method = c("centers", "classes")) {
0165             method <- match.arg(method)
0166             jobj <- object@jobj
0167             is.loaded <- callJMethod(jobj, "isLoaded")
0168             if (is.loaded) {
0169               stop("Saved-loaded bisecting k-means model does not support 'fitted' method")
0170             } else {
0171               dataFrame(callJMethod(jobj, "fitted", method))
0172             }
0173           })
0174 
0175 #  Save fitted MLlib model to the input path
0176 
0177 #' @param path the directory where the model is saved.
0178 #' @param overwrite overwrites or not if the output path already exists. Default is FALSE
0179 #'                  which means throw exception if the output path exists.
0180 #'
0181 #' @rdname spark.bisectingKmeans
0182 #' @note write.ml(BisectingKMeansModel, character) since 2.2.0
0183 setMethod("write.ml", signature(object = "BisectingKMeansModel", path = "character"),
0184           function(object, path, overwrite = FALSE) {
0185             write_internal(object, path, overwrite)
0186           })
0187 
0188 #' Multivariate Gaussian Mixture Model (GMM)
0189 #'
0190 #' Fits multivariate gaussian mixture model against a SparkDataFrame, similarly to R's
0191 #' mvnormalmixEM(). Users can call \code{summary} to print a summary of the fitted model,
0192 #' \code{predict} to make predictions on new data, and \code{write.ml}/\code{read.ml}
0193 #' to save/load fitted models.
0194 #'
0195 #' @param data a SparkDataFrame for training.
0196 #' @param formula a symbolic description of the model to be fitted. Currently only a few formula
0197 #'                operators are supported, including '~', '.', ':', '+', and '-'.
0198 #'                Note that the response variable of formula is empty in spark.gaussianMixture.
0199 #' @param k number of independent Gaussians in the mixture model.
0200 #' @param maxIter maximum iteration number.
0201 #' @param tol the convergence tolerance.
0202 #' @param ... additional arguments passed to the method.
0203 #' @aliases spark.gaussianMixture,SparkDataFrame,formula-method
0204 #' @return \code{spark.gaussianMixture} returns a fitted multivariate gaussian mixture model.
0205 #' @rdname spark.gaussianMixture
0206 #' @name spark.gaussianMixture
0207 #' @seealso mixtools: \url{https://cran.r-project.org/package=mixtools}
0208 #' @examples
0209 #' \dontrun{
0210 #' sparkR.session()
0211 #' library(mvtnorm)
0212 #' set.seed(100)
0213 #' a <- rmvnorm(4, c(0, 0))
0214 #' b <- rmvnorm(6, c(3, 4))
0215 #' data <- rbind(a, b)
0216 #' df <- createDataFrame(as.data.frame(data))
0217 #' model <- spark.gaussianMixture(df, ~ V1 + V2, k = 2)
0218 #' summary(model)
0219 #'
0220 #' # fitted values on training data
0221 #' fitted <- predict(model, df)
0222 #' head(select(fitted, "V1", "prediction"))
0223 #'
0224 #' # save fitted model to input path
0225 #' path <- "path/to/model"
0226 #' write.ml(model, path)
0227 #'
0228 #' # can also read back the saved model and print
0229 #' savedModel <- read.ml(path)
0230 #' summary(savedModel)
0231 #' }
0232 #' @note spark.gaussianMixture since 2.1.0
0233 #' @seealso \link{predict}, \link{read.ml}, \link{write.ml}
0234 setMethod("spark.gaussianMixture", signature(data = "SparkDataFrame", formula = "formula"),
0235           function(data, formula, k = 2, maxIter = 100, tol = 0.01) {
0236             formula <- paste(deparse(formula), collapse = "")
0237             jobj <- callJStatic("org.apache.spark.ml.r.GaussianMixtureWrapper", "fit", data@sdf,
0238                                 formula, as.integer(k), as.integer(maxIter), as.numeric(tol))
0239             new("GaussianMixtureModel", jobj = jobj)
0240           })
0241 
0242 #  Get the summary of a multivariate gaussian mixture model
0243 
0244 #' @param object a fitted gaussian mixture model.
0245 #' @return \code{summary} returns summary of the fitted model, which is a list.
0246 #'         The list includes the model's \code{lambda} (lambda), \code{mu} (mu),
0247 #'         \code{sigma} (sigma), \code{loglik} (loglik), and \code{posterior} (posterior).
0248 #' @aliases spark.gaussianMixture,SparkDataFrame,formula-method
0249 #' @rdname spark.gaussianMixture
0250 #' @note summary(GaussianMixtureModel) since 2.1.0
0251 setMethod("summary", signature(object = "GaussianMixtureModel"),
0252           function(object) {
0253             jobj <- object@jobj
0254             is.loaded <- callJMethod(jobj, "isLoaded")
0255             lambda <- unlist(callJMethod(jobj, "lambda"))
0256             muList <- callJMethod(jobj, "mu")
0257             sigmaList <- callJMethod(jobj, "sigma")
0258             k <- callJMethod(jobj, "k")
0259             dim <- callJMethod(jobj, "dim")
0260             loglik <- callJMethod(jobj, "logLikelihood")
0261             mu <- c()
0262             for (i in 1 : k) {
0263               start <- (i - 1) * dim + 1
0264               end <- i * dim
0265               mu[[i]] <- unlist(muList[start : end])
0266             }
0267             sigma <- c()
0268             for (i in 1 : k) {
0269               start <- (i - 1) * dim * dim + 1
0270               end <- i * dim * dim
0271               sigma[[i]] <- t(matrix(sigmaList[start : end], ncol = dim))
0272             }
0273             posterior <- if (is.loaded) {
0274               NULL
0275             } else {
0276               dataFrame(callJMethod(jobj, "posterior"))
0277             }
0278             list(lambda = lambda, mu = mu, sigma = sigma, loglik = loglik,
0279                  posterior = posterior, is.loaded = is.loaded)
0280           })
0281 
0282 #  Predicted values based on a gaussian mixture model
0283 
0284 #' @param newData a SparkDataFrame for testing.
0285 #' @return \code{predict} returns a SparkDataFrame containing predicted labels in a column named
0286 #'         "prediction".
0287 #' @aliases predict,GaussianMixtureModel,SparkDataFrame-method
0288 #' @rdname spark.gaussianMixture
0289 #' @note predict(GaussianMixtureModel) since 2.1.0
0290 setMethod("predict", signature(object = "GaussianMixtureModel"),
0291           function(object, newData) {
0292             predict_internal(object, newData)
0293           })
0294 
0295 #  Save fitted MLlib model to the input path
0296 
0297 #' @param path the directory where the model is saved.
0298 #' @param overwrite overwrites or not if the output path already exists. Default is FALSE
0299 #'                  which means throw exception if the output path exists.
0300 #'
0301 #' @aliases write.ml,GaussianMixtureModel,character-method
0302 #' @rdname spark.gaussianMixture
0303 #' @note write.ml(GaussianMixtureModel, character) since 2.1.0
0304 setMethod("write.ml", signature(object = "GaussianMixtureModel", path = "character"),
0305           function(object, path, overwrite = FALSE) {
0306             write_internal(object, path, overwrite)
0307           })
0308 
0309 #' K-Means Clustering Model
0310 #'
0311 #' Fits a k-means clustering model against a SparkDataFrame, similarly to R's kmeans().
0312 #' Users can call \code{summary} to print a summary of the fitted model, \code{predict} to make
0313 #' predictions on new data, and \code{write.ml}/\code{read.ml} to save/load fitted models.
0314 #'
0315 #' @param data a SparkDataFrame for training.
0316 #' @param formula a symbolic description of the model to be fitted. Currently only a few formula
0317 #'                operators are supported, including '~', '.', ':', '+', and '-'.
0318 #'                Note that the response variable of formula is empty in spark.kmeans.
0319 #' @param k number of centers.
0320 #' @param maxIter maximum iteration number.
0321 #' @param initMode the initialization algorithm chosen to fit the model.
0322 #' @param seed the random seed for cluster initialization.
0323 #' @param initSteps the number of steps for the k-means|| initialization mode.
0324 #'                  This is an advanced setting, the default of 2 is almost always enough.
0325 #'                  Must be > 0.
0326 #' @param tol convergence tolerance of iterations.
0327 #' @param ... additional argument(s) passed to the method.
0328 #' @return \code{spark.kmeans} returns a fitted k-means model.
0329 #' @rdname spark.kmeans
0330 #' @aliases spark.kmeans,SparkDataFrame,formula-method
0331 #' @name spark.kmeans
0332 #' @examples
0333 #' \dontrun{
0334 #' sparkR.session()
0335 #' t <- as.data.frame(Titanic)
0336 #' df <- createDataFrame(t)
0337 #' model <- spark.kmeans(df, Class ~ Survived, k = 4, initMode = "random")
0338 #' summary(model)
0339 #'
0340 #' # fitted values on training data
0341 #' fitted <- predict(model, df)
0342 #' head(select(fitted, "Class", "prediction"))
0343 #'
0344 #' # save fitted model to input path
0345 #' path <- "path/to/model"
0346 #' write.ml(model, path)
0347 #'
0348 #' # can also read back the saved model and print
0349 #' savedModel <- read.ml(path)
0350 #' summary(savedModel)
0351 #' }
0352 #' @note spark.kmeans since 2.0.0
0353 #' @seealso \link{predict}, \link{read.ml}, \link{write.ml}
0354 setMethod("spark.kmeans", signature(data = "SparkDataFrame", formula = "formula"),
0355           function(data, formula, k = 2, maxIter = 20, initMode = c("k-means||", "random"),
0356                    seed = NULL, initSteps = 2, tol = 1E-4) {
0357             formula <- paste(deparse(formula), collapse = "")
0358             initMode <- match.arg(initMode)
0359             if (!is.null(seed)) {
0360               seed <- as.character(as.integer(seed))
0361             }
0362             jobj <- callJStatic("org.apache.spark.ml.r.KMeansWrapper", "fit", data@sdf, formula,
0363                                 as.integer(k), as.integer(maxIter), initMode, seed,
0364                                 as.integer(initSteps), as.numeric(tol))
0365             new("KMeansModel", jobj = jobj)
0366           })
0367 
0368 #  Get the summary of a k-means model
0369 
0370 #' @param object a fitted k-means model.
0371 #' @return \code{summary} returns summary information of the fitted model, which is a list.
0372 #'         The list includes the model's \code{k} (the configured number of cluster centers),
0373 #'         \code{coefficients} (model cluster centers),
0374 #'         \code{size} (number of data points in each cluster), \code{cluster}
0375 #'         (cluster centers of the transformed data), {is.loaded} (whether the model is loaded
0376 #'         from a saved file), and \code{clusterSize}
0377 #'         (the actual number of cluster centers. When using initMode = "random",
0378 #'         \code{clusterSize} may not equal to \code{k}).
0379 #' @rdname spark.kmeans
0380 #' @note summary(KMeansModel) since 2.0.0
0381 setMethod("summary", signature(object = "KMeansModel"),
0382           function(object) {
0383             jobj <- object@jobj
0384             is.loaded <- callJMethod(jobj, "isLoaded")
0385             features <- callJMethod(jobj, "features")
0386             coefficients <- callJMethod(jobj, "coefficients")
0387             k <- callJMethod(jobj, "k")
0388             size <- callJMethod(jobj, "size")
0389             clusterSize <- callJMethod(jobj, "clusterSize")
0390             coefficients <- t(matrix(unlist(coefficients), ncol = clusterSize))
0391             colnames(coefficients) <- unlist(features)
0392             rownames(coefficients) <- 1:clusterSize
0393             cluster <- if (is.loaded) {
0394               NULL
0395             } else {
0396               dataFrame(callJMethod(jobj, "cluster"))
0397             }
0398             list(k = k, coefficients = coefficients, size = size,
0399                  cluster = cluster, is.loaded = is.loaded, clusterSize = clusterSize)
0400           })
0401 
0402 #  Predicted values based on a k-means model
0403 
0404 #' @param newData a SparkDataFrame for testing.
0405 #' @return \code{predict} returns the predicted values based on a k-means model.
0406 #' @rdname spark.kmeans
0407 #' @note predict(KMeansModel) since 2.0.0
0408 setMethod("predict", signature(object = "KMeansModel"),
0409           function(object, newData) {
0410             predict_internal(object, newData)
0411           })
0412 
0413 #' Get fitted result from a k-means model
0414 #'
0415 #' Get fitted result from a k-means model, similarly to R's fitted().
0416 #' Note: A saved-loaded model does not support this method.
0417 #'
0418 #' @param object a fitted k-means model.
0419 #' @param method type of fitted results, \code{"centers"} for cluster centers
0420 #'        or \code{"classes"} for assigned classes.
0421 #' @param ... additional argument(s) passed to the method.
0422 #' @return \code{fitted} returns a SparkDataFrame containing fitted values.
0423 #' @rdname fitted
0424 #' @examples
0425 #' \dontrun{
0426 #' model <- spark.kmeans(trainingData, ~ ., 2)
0427 #' fitted.model <- fitted(model)
0428 #' showDF(fitted.model)
0429 #'}
0430 #' @note fitted since 2.0.0
0431 setMethod("fitted", signature(object = "KMeansModel"),
0432           function(object, method = c("centers", "classes")) {
0433             method <- match.arg(method)
0434             jobj <- object@jobj
0435             is.loaded <- callJMethod(jobj, "isLoaded")
0436             if (is.loaded) {
0437               stop("Saved-loaded k-means model does not support 'fitted' method")
0438             } else {
0439               dataFrame(callJMethod(jobj, "fitted", method))
0440             }
0441           })
0442 
0443 #  Save fitted MLlib model to the input path
0444 
0445 #' @param path the directory where the model is saved.
0446 #' @param overwrite overwrites or not if the output path already exists. Default is FALSE
0447 #'                  which means throw exception if the output path exists.
0448 #'
0449 #' @rdname spark.kmeans
0450 #' @note write.ml(KMeansModel, character) since 2.0.0
0451 setMethod("write.ml", signature(object = "KMeansModel", path = "character"),
0452           function(object, path, overwrite = FALSE) {
0453             write_internal(object, path, overwrite)
0454           })
0455 
0456 #' Latent Dirichlet Allocation
0457 #'
0458 #' \code{spark.lda} fits a Latent Dirichlet Allocation model on a SparkDataFrame. Users can call
0459 #' \code{summary} to get a summary of the fitted LDA model, \code{spark.posterior} to compute
0460 #' posterior probabilities on new data, \code{spark.perplexity} to compute log perplexity on new
0461 #' data and \code{write.ml}/\code{read.ml} to save/load fitted models.
0462 #'
0463 #' @param data A SparkDataFrame for training.
0464 #' @param features Features column name. Either libSVM-format column or character-format column is
0465 #'        valid.
0466 #' @param k Number of topics.
0467 #' @param maxIter Maximum iterations.
0468 #' @param optimizer Optimizer to train an LDA model, "online" or "em", default is "online".
0469 #' @param subsamplingRate (For online optimizer) Fraction of the corpus to be sampled and used in
0470 #'        each iteration of mini-batch gradient descent, in range (0, 1].
0471 #' @param topicConcentration concentration parameter (commonly named \code{beta} or \code{eta}) for
0472 #'        the prior placed on topic distributions over terms, default -1 to set automatically on the
0473 #'        Spark side. Use \code{summary} to retrieve the effective topicConcentration. Only 1-size
0474 #'        numeric is accepted.
0475 #' @param docConcentration concentration parameter (commonly named \code{alpha}) for the
0476 #'        prior placed on documents distributions over topics (\code{theta}), default -1 to set
0477 #'        automatically on the Spark side. Use \code{summary} to retrieve the effective
0478 #'        docConcentration. Only 1-size or \code{k}-size numeric is accepted.
0479 #' @param customizedStopWords stopwords that need to be removed from the given corpus. Ignore the
0480 #'        parameter if libSVM-format column is used as the features column.
0481 #' @param maxVocabSize maximum vocabulary size, default 1 << 18
0482 #' @param ... additional argument(s) passed to the method.
0483 #' @return \code{spark.lda} returns a fitted Latent Dirichlet Allocation model.
0484 #' @rdname spark.lda
0485 #' @aliases spark.lda,SparkDataFrame-method
0486 #' @seealso topicmodels: \url{https://cran.r-project.org/package=topicmodels}
0487 #' @examples
0488 #' \dontrun{
0489 #' text <- read.df("data/mllib/sample_lda_libsvm_data.txt", source = "libsvm")
0490 #' model <- spark.lda(data = text, optimizer = "em")
0491 #'
0492 #' # get a summary of the model
0493 #' summary(model)
0494 #'
0495 #' # compute posterior probabilities
0496 #' posterior <- spark.posterior(model, text)
0497 #' showDF(posterior)
0498 #'
0499 #' # compute perplexity
0500 #' perplexity <- spark.perplexity(model, text)
0501 #'
0502 #' # save and load the model
0503 #' path <- "path/to/model"
0504 #' write.ml(model, path)
0505 #' savedModel <- read.ml(path)
0506 #' summary(savedModel)
0507 #' }
0508 #' @note spark.lda since 2.1.0
0509 setMethod("spark.lda", signature(data = "SparkDataFrame"),
0510           function(data, features = "features", k = 10, maxIter = 20, optimizer = c("online", "em"),
0511                    subsamplingRate = 0.05, topicConcentration = -1, docConcentration = -1,
0512                    customizedStopWords = "", maxVocabSize = bitwShiftL(1, 18)) {
0513             optimizer <- match.arg(optimizer)
0514             jobj <- callJStatic("org.apache.spark.ml.r.LDAWrapper", "fit", data@sdf, features,
0515                                 as.integer(k), as.integer(maxIter), optimizer,
0516                                 as.numeric(subsamplingRate), topicConcentration,
0517                                 as.array(docConcentration), as.array(customizedStopWords),
0518                                 maxVocabSize)
0519             new("LDAModel", jobj = jobj)
0520           })
0521 
0522 #  Returns the summary of a Latent Dirichlet Allocation model produced by \code{spark.lda}
0523 
0524 #' @param object A Latent Dirichlet Allocation model fitted by \code{spark.lda}.
0525 #' @param maxTermsPerTopic Maximum number of terms to collect for each topic. Default value of 10.
0526 #' @return \code{summary} returns summary information of the fitted model, which is a list.
0527 #'         The list includes
0528 #'         \item{\code{docConcentration}}{concentration parameter commonly named \code{alpha} for
0529 #'               the prior placed on documents distributions over topics \code{theta}}
0530 #'         \item{\code{topicConcentration}}{concentration parameter commonly named \code{beta} or
0531 #'               \code{eta} for the prior placed on topic distributions over terms}
0532 #'         \item{\code{logLikelihood}}{log likelihood of the entire corpus}
0533 #'         \item{\code{logPerplexity}}{log perplexity}
0534 #'         \item{\code{isDistributed}}{TRUE for distributed model while FALSE for local model}
0535 #'         \item{\code{vocabSize}}{number of terms in the corpus}
0536 #'         \item{\code{topics}}{top 10 terms and their weights of all topics}
0537 #'         \item{\code{vocabulary}}{whole terms of the training corpus, NULL if libsvm format file
0538 #'               used as training set}
0539 #'         \item{\code{trainingLogLikelihood}}{Log likelihood of the observed tokens in the
0540 #'               training set, given the current parameter estimates:
0541 #'               log P(docs | topics, topic distributions for docs, Dirichlet hyperparameters)
0542 #'               It is only for distributed LDA model (i.e., optimizer = "em")}
0543 #'         \item{\code{logPrior}}{Log probability of the current parameter estimate:
0544 #'               log P(topics, topic distributions for docs | Dirichlet hyperparameters)
0545 #'               It is only for distributed LDA model (i.e., optimizer = "em")}
0546 #' @rdname spark.lda
0547 #' @aliases summary,LDAModel-method
0548 #' @note summary(LDAModel) since 2.1.0
0549 setMethod("summary", signature(object = "LDAModel"),
0550           function(object, maxTermsPerTopic) {
0551             maxTermsPerTopic <- as.integer(ifelse(missing(maxTermsPerTopic), 10, maxTermsPerTopic))
0552             jobj <- object@jobj
0553             docConcentration <- callJMethod(jobj, "docConcentration")
0554             topicConcentration <- callJMethod(jobj, "topicConcentration")
0555             logLikelihood <- callJMethod(jobj, "logLikelihood")
0556             logPerplexity <- callJMethod(jobj, "logPerplexity")
0557             isDistributed <- callJMethod(jobj, "isDistributed")
0558             vocabSize <- callJMethod(jobj, "vocabSize")
0559             topics <- dataFrame(callJMethod(jobj, "topics", maxTermsPerTopic))
0560             vocabulary <- callJMethod(jobj, "vocabulary")
0561             trainingLogLikelihood <- if (isDistributed) {
0562               callJMethod(jobj, "trainingLogLikelihood")
0563             } else {
0564               NA
0565             }
0566             logPrior <- if (isDistributed) {
0567               callJMethod(jobj, "logPrior")
0568             } else {
0569               NA
0570             }
0571             list(docConcentration = unlist(docConcentration),
0572                  topicConcentration = topicConcentration,
0573                  logLikelihood = logLikelihood, logPerplexity = logPerplexity,
0574                  isDistributed = isDistributed, vocabSize = vocabSize,
0575                  topics = topics, vocabulary = unlist(vocabulary),
0576                  trainingLogLikelihood = trainingLogLikelihood, logPrior = logPrior)
0577           })
0578 
0579 #  Returns the log perplexity of a Latent Dirichlet Allocation model produced by \code{spark.lda}
0580 
0581 #' @return \code{spark.perplexity} returns the log perplexity of given SparkDataFrame, or the log
0582 #'         perplexity of the training data if missing argument "data".
0583 #' @rdname spark.lda
0584 #' @aliases spark.perplexity,LDAModel-method
0585 #' @note spark.perplexity(LDAModel) since 2.1.0
0586 setMethod("spark.perplexity", signature(object = "LDAModel", data = "SparkDataFrame"),
0587           function(object, data) {
0588             ifelse(missing(data), callJMethod(object@jobj, "logPerplexity"),
0589                    callJMethod(object@jobj, "computeLogPerplexity", data@sdf))
0590          })
0591 
0592 #  Returns posterior probabilities from a Latent Dirichlet Allocation model produced by spark.lda()
0593 
0594 #' @param newData A SparkDataFrame for testing.
0595 #' @return \code{spark.posterior} returns a SparkDataFrame containing posterior probabilities
0596 #'         vectors named "topicDistribution".
0597 #' @rdname spark.lda
0598 #' @aliases spark.posterior,LDAModel,SparkDataFrame-method
0599 #' @note spark.posterior(LDAModel) since 2.1.0
0600 setMethod("spark.posterior", signature(object = "LDAModel", newData = "SparkDataFrame"),
0601           function(object, newData) {
0602             predict_internal(object, newData)
0603           })
0604 
0605 #  Saves the Latent Dirichlet Allocation model to the input path.
0606 
0607 #' @param path The directory where the model is saved.
0608 #' @param overwrite Overwrites or not if the output path already exists. Default is FALSE
0609 #'                  which means throw exception if the output path exists.
0610 #'
0611 #' @rdname spark.lda
0612 #' @aliases write.ml,LDAModel,character-method
0613 #' @seealso \link{read.ml}
0614 #' @note write.ml(LDAModel, character) since 2.1.0
0615 setMethod("write.ml", signature(object = "LDAModel", path = "character"),
0616           function(object, path, overwrite = FALSE) {
0617             write_internal(object, path, overwrite)
0618           })
0619 
0620 #' PowerIterationClustering
0621 #'
0622 #' A scalable graph clustering algorithm. Users can call \code{spark.assignClusters} to
0623 #' return a cluster assignment for each input vertex.
0624 #' Run the PIC algorithm and returns a cluster assignment for each input vertex.
0625 #' @param data a SparkDataFrame.
0626 #' @param k the number of clusters to create.
0627 #' @param initMode the initialization algorithm; "random" or "degree"
0628 #' @param maxIter the maximum number of iterations.
0629 #' @param sourceCol the name of the input column for source vertex IDs.
0630 #' @param destinationCol the name of the input column for destination vertex IDs
0631 #' @param weightCol weight column name. If this is not set or \code{NULL},
0632 #'                  we treat all instance weights as 1.0.
0633 #' @param ... additional argument(s) passed to the method.
0634 #' @return A dataset that contains columns of vertex id and the corresponding cluster for the id.
0635 #'         The schema of it will be: \code{id: integer}, \code{cluster: integer}
0636 #' @rdname spark.powerIterationClustering
0637 #' @aliases spark.assignClusters,SparkDataFrame-method
0638 #' @examples
0639 #' \dontrun{
0640 #' df <- createDataFrame(list(list(0L, 1L, 1.0), list(0L, 2L, 1.0),
0641 #'                            list(1L, 2L, 1.0), list(3L, 4L, 1.0),
0642 #'                            list(4L, 0L, 0.1)),
0643 #'                       schema = c("src", "dst", "weight"))
0644 #' clusters <- spark.assignClusters(df, initMode = "degree", weightCol = "weight")
0645 #' showDF(clusters)
0646 #' }
0647 #' @note spark.assignClusters(SparkDataFrame) since 3.0.0
0648 setMethod("spark.assignClusters",
0649           signature(data = "SparkDataFrame"),
0650           function(data, k = 2L, initMode = c("random", "degree"), maxIter = 20L,
0651             sourceCol = "src", destinationCol = "dst", weightCol = NULL) {
0652             if (!is.integer(k) || k < 1) {
0653               stop("k should be a number with value >= 1.")
0654             }
0655             if (!is.integer(maxIter) || maxIter <= 0) {
0656               stop("maxIter should be a number with value > 0.")
0657             }
0658             initMode <- match.arg(initMode)
0659             if (!is.null(weightCol) && weightCol == "") {
0660               weightCol <- NULL
0661             } else if (!is.null(weightCol)) {
0662               weightCol <- as.character(weightCol)
0663             }
0664             jobj <- callJStatic("org.apache.spark.ml.r.PowerIterationClusteringWrapper",
0665                                 "getPowerIterationClustering",
0666                                 as.integer(k), initMode,
0667                                 as.integer(maxIter), as.character(sourceCol),
0668                                 as.character(destinationCol), weightCol)
0669             object <- new("PowerIterationClustering", jobj = jobj)
0670             dataFrame(callJMethod(object@jobj, "assignClusters", data@sdf))
0671           })