Back to home page

OSCL-LXR

 
 

    


0001 #
0002 # Licensed to the Apache Software Foundation (ASF) under one or more
0003 # contributor license agreements.  See the NOTICE file distributed with
0004 # this work for additional information regarding copyright ownership.
0005 # The ASF licenses this file to You under the Apache License, Version 2.0
0006 # (the "License"); you may not use this file except in compliance with
0007 # the License.  You may obtain a copy of the License at
0008 #
0009 #    http://www.apache.org/licenses/LICENSE-2.0
0010 #
0011 # Unless required by applicable law or agreed to in writing, software
0012 # distributed under the License is distributed on an "AS IS" BASIS,
0013 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014 # See the License for the specific language governing permissions and
0015 # limitations under the License.
0016 #
0017 
0018 # WindowSpec.R - WindowSpec class and methods implemented in S4 OO classes
0019 
0020 #' @include generics.R jobj.R column.R
0021 NULL
0022 
#' S4 class that represents a WindowSpec
#'
#' A WindowSpec holds a reference to its backing Scala WindowSpec in the
#' \code{sws} slot. It can be created by using windowPartitionBy() or
#' windowOrderBy().
#'
#' @rdname WindowSpec
#' @seealso \link{windowPartitionBy}, \link{windowOrderBy}
#'
#' @param sws A Java object reference to the backing Scala WindowSpec
#' @note WindowSpec since 2.0.0
setClass("WindowSpec",
         slots = c(sws = "jobj"))
0034 
# Initializer used by new("WindowSpec", sws): stores the supplied reference
# in the "sws" slot and returns the populated object.
setMethod("initialize",
          signature(.Object = "WindowSpec"),
          function(.Object, sws) {
            .Object@sws <- sws
            .Object
          })
0039 
# Internal constructor: wraps a Java object reference in the R-side
# WindowSpec S4 class.
#
# sws must be a "jobj" reference; any other input is rejected before the
# object is built.
windowSpec <- function(sws) {
  # inherits() always yields a single TRUE/FALSE, whereas comparing class()
  # with "==" yields one result per entry of the class attribute.
  stopifnot(inherits(sws, "jobj"))
  new("WindowSpec", sws)
}
0044 
#' @rdname show
#' @note show(WindowSpec) since 2.0.0
setMethod("show", "WindowSpec",
          function(object) {
            # Fetch the string form of the backing object, then print it
            # after the class label.
            description <- callJMethod(object@sws, "toString")
            cat("WindowSpec", description, "\n")
          })
0051 
#' partitionBy
#'
#' Defines the partitioning columns in a WindowSpec.
#'
#' @param x a WindowSpec.
#' @param col a column to partition on (described by the name or Column).
#' @param ... additional column(s) to partition on.
#' @return A WindowSpec.
#' @rdname partitionBy
#' @name partitionBy
#' @aliases partitionBy,WindowSpec-method
#' @family windowspec_method
#' @examples
#' \dontrun{
#'   partitionBy(ws, "col1", "col2")
#'   partitionBy(ws, df$col1, df$col2)
#' }
#' @note partitionBy(WindowSpec) since 2.0.0
setMethod("partitionBy",
          signature(x = "WindowSpec"),
          function(x, col, ...) {
            # is.character()/inherits() each return a single logical, so the
            # checks below stay scalar even when class(col) has several
            # entries (comparing class() with "==" would not).
            by_name <- is.character(col)
            stopifnot(by_name || inherits(col, "Column"))

            if (by_name) {
              # Column names: the first name and the remaining ones are passed
              # as separate arguments.
              windowSpec(callJMethod(x@sws, "partitionBy", col, list(...)))
            } else {
              # Column objects: pass the list of their "jc" slot values.
              jcols <- lapply(list(col, ...), function(part_col) {
                part_col@jc
              })
              windowSpec(callJMethod(x@sws, "partitionBy", jcols))
            }
          })
0084 
#' Ordering Columns in a WindowSpec
#'
#' Defines the ordering columns in a WindowSpec.
#' @param x a WindowSpec
#' @param col a character or Column indicating an ordering column
#' @param ... additional sorting fields
#' @return A WindowSpec.
#' @name orderBy
#' @rdname orderBy
#' @aliases orderBy,WindowSpec,character-method
#' @family windowspec_method
#' @seealso See \link{arrange} for use in sorting a SparkDataFrame
#' @examples
#' \dontrun{
#'   orderBy(ws, "col1", "col2")
#'   orderBy(ws, df$col1, df$col2)
#' }
#' @note orderBy(WindowSpec, character) since 2.0.0
setMethod("orderBy",
          signature(x = "WindowSpec", col = "character"),
          function(x, col, ...) {
            # The first column name and the remaining ones are passed as
            # separate arguments; the result is wrapped as a WindowSpec.
            reordered <- callJMethod(x@sws, "orderBy", col, list(...))
            windowSpec(reordered)
          })
0108 
#' @rdname orderBy
#' @name orderBy
#' @aliases orderBy,WindowSpec,Column-method
#' @note orderBy(WindowSpec, Column) since 2.0.0
setMethod("orderBy",
          signature(x = "WindowSpec", col = "Column"),
          function(x, col, ...) {
            # Collect the "jc" slot of every Column supplied, then pass them
            # on as a single list.
            sort_cols <- list(col, ...)
            jcols <- lapply(sort_cols, function(sort_col) sort_col@jc)
            windowSpec(callJMethod(x@sws, "orderBy", jcols))
          })
0121 
#' rowsBetween
#'
#' Defines the frame boundaries, from \code{start} (inclusive) to \code{end} (inclusive).
#'
#' Both \code{start} and \code{end} are relative positions from the current row. For example,
#' "0" means "current row", while "-1" means the row before the current row, and "5" means the
#' fifth row after the current row.
#'
#' We recommend users use \code{Window.unboundedPreceding}, \code{Window.unboundedFollowing},
#' and \code{Window.currentRow} to specify special boundary values, rather than using long values
#' directly.
#'
#' A row based boundary is based on the position of the row within the partition.
#' An offset indicates the number of rows above or below the current row at which the frame
#' for the current row starts or ends. For instance, given a row based sliding frame with a
#' lower bound offset of -1 and an upper bound offset of +2, the frame for the row with index 5
#' would range from index 4 to index 7.
#'
#' @param x a WindowSpec
#' @param start boundary start, inclusive.
#'              The frame is unbounded if this is the minimum long value.
#' @param end boundary end, inclusive.
#'            The frame is unbounded if this is the maximum long value.
#' @return a WindowSpec
#' @rdname rowsBetween
#' @aliases rowsBetween,WindowSpec,numeric,numeric-method
#' @name rowsBetween
#' @family windowspec_method
#' @examples
#' \dontrun{
#'   id <- c(rep(1, 3), rep(2, 3), 3)
#'   desc <- c('New', 'New', 'Good', 'New', 'Good', 'Good', 'New')
#'   df <- data.frame(id, desc)
#'   df <- createDataFrame(df)
#'   w1 <- orderBy(windowPartitionBy('desc'), df$id)
#'   w2 <- rowsBetween(w1, 0, 3)
#'   df1 <- withColumn(df, "sum", over(sum(df$id), w2))
#'   head(df1)
#' }
#' @note rowsBetween since 2.0.0
setMethod("rowsBetween",
          signature(x = "WindowSpec", start = "numeric", end = "numeric"),
          function(x, start, end) {
            # The boundaries should be long values, but because of a serde
            # limitation they are sent as integers for now.
            first_offset <- as.integer(start)
            last_offset <- as.integer(end)
            windowSpec(callJMethod(x@sws, "rowsBetween", first_offset, last_offset))
          })
0169 
#' rangeBetween
#'
#' Defines the frame boundaries, from \code{start} (inclusive) to \code{end} (inclusive).
#'
#' Both \code{start} and \code{end} are relative from the current row. For example, "0" means
#' "current row", while "-1" means one off before the current row, and "5" means the five off
#' after the current row.
#'
#' We recommend users use \code{Window.unboundedPreceding}, \code{Window.unboundedFollowing},
#' and \code{Window.currentRow} to specify special boundary values, rather than using long values
#' directly.
#'
#' A range-based boundary is based on the actual value of the ORDER BY
#' expression(s). An offset is used to alter the value of the ORDER BY expression;
#' for instance, if the current ORDER BY expression has a value of 10 and the lower bound offset
#' is -3, the resulting lower bound for the current row will be 10 - 3 = 7. This however puts a
#' number of constraints on the ORDER BY expressions: there can be only one expression and this
#' expression must have a numerical data type. An exception can be made when the offset is
#' unbounded, because no value modification is needed; in this case multiple and non-numeric
#' ORDER BY expressions are allowed.
#'
#' @param x a WindowSpec
#' @param start boundary start, inclusive.
#'              The frame is unbounded if this is the minimum long value.
#' @param end boundary end, inclusive.
#'            The frame is unbounded if this is the maximum long value.
#' @return a WindowSpec
#' @rdname rangeBetween
#' @aliases rangeBetween,WindowSpec,numeric,numeric-method
#' @name rangeBetween
#' @family windowspec_method
#' @examples
#' \dontrun{
#'   id <- c(rep(1, 3), rep(2, 3), 3)
#'   desc <- c('New', 'New', 'Good', 'New', 'Good', 'Good', 'New')
#'   df <- data.frame(id, desc)
#'   df <- createDataFrame(df)
#'   w1 <- orderBy(windowPartitionBy('desc'), df$id)
#'   w2 <- rangeBetween(w1, 0, 3)
#'   df1 <- withColumn(df, "sum", over(sum(df$id), w2))
#'   head(df1)
#' }
#' @note rangeBetween since 2.0.0
setMethod("rangeBetween",
          signature(x = "WindowSpec", start = "numeric", end = "numeric"),
          function(x, start, end) {
            # The boundaries should be long values, but because of a serde
            # limitation they are sent as integers for now.
            lower_bound <- as.integer(start)
            upper_bound <- as.integer(end)
            windowSpec(callJMethod(x@sws, "rangeBetween", lower_bound, upper_bound))
          })
0220 
0221 # Note that over is a method of Column class, but it is placed here to
0222 # avoid Roxygen circular-dependency between class Column and WindowSpec.
0223 
#' over
#'
#' Define a windowing column.
#'
#' @param x a Column, usually one returned by window function(s).
#' @param window a WindowSpec object. Can be created by \code{windowPartitionBy} or
#'        \code{windowOrderBy} and configured by other WindowSpec methods.
#' @rdname over
#' @name over
#' @aliases over,Column,WindowSpec-method
#' @family colum_func
#' @examples
#' \dontrun{
#'   df <- createDataFrame(mtcars)
#'
#'   # Partition by am (transmission) and order by hp (horsepower)
#'   ws <- orderBy(windowPartitionBy("am"), "hp")
#'
#'   # Rank on hp within each partition
#'   out <- select(df, over(rank(), ws), df$hp, df$am)
#'
#'   # Lead mpg values by 1 row on the partition-and-ordered table
#'   out <- select(df, over(lead(df$mpg), ws), df$mpg, df$hp, df$am)
#' }
#' @note over since 2.0.0
setMethod("over",
          signature(x = "Column", window = "WindowSpec"),
          function(x, window) {
            # Invoke "over" on the column's "jc" slot with the window's "sws"
            # slot, then wrap the result with column().
            windowed <- callJMethod(x@jc, "over", window@sws)
            column(windowed)
          })