Back to home page

OSCL-LXR

 
 

    


0001 #
0002 # Licensed to the Apache Software Foundation (ASF) under one or more
0003 # contributor license agreements.  See the NOTICE file distributed with
0004 # this work for additional information regarding copyright ownership.
0005 # The ASF licenses this file to You under the Apache License, Version 2.0
0006 # (the "License"); you may not use this file except in compliance with
0007 # the License.  You may obtain a copy of the License at
0008 #
0009 #    http://www.apache.org/licenses/LICENSE-2.0
0010 #
0011 # Unless required by applicable law or agreed to in writing, software
0012 # distributed under the License is distributed on an "AS IS" BASIS,
0013 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014 # See the License for the specific language governing permissions and
0015 # limitations under the License.
0016 #
0017 
0018 # streaming.R - Structured Streaming / StreamingQuery class and methods implemented in S4 OO classes
0019 
0020 #' @include generics.R jobj.R
0021 NULL
0022 
#' An S4 class that represents a StreamingQuery
#'
#' A StreamingQuery can be obtained by using read.stream() and write.stream()
#'
#' @rdname StreamingQuery
#' @seealso \link{read.stream}
#'
#' @param ssq A Java object reference to the backing Scala StreamingQuery
#' @note StreamingQuery since 2.2.0
#' @note experimental
setClass(
  "StreamingQuery",
  # Single slot: the reference to the backing object, which must be a "jobj".
  slots = c(ssq = "jobj")
)
0035 
# Initializer for StreamingQuery: stores the supplied reference in the `ssq`
# slot and hands back the populated object.
setMethod(
  "initialize",
  signature(.Object = "StreamingQuery"),
  function(.Object, ssq) {
    .Object@ssq <- ssq
    .Object
  }
)
0040 
# Internal constructor: wraps a backing Java object reference in a
# StreamingQuery. `ssq` must be a "jobj"; anything else is rejected before the
# S4 object is built.
streamingQuery <- function(ssq) {
  # inherits() always yields a single TRUE/FALSE, even when `ssq` carries more
  # than one class; an elementwise `class(ssq) == "jobj"` comparison would
  # reject such an object although it is a jobj.
  stopifnot(inherits(ssq, "jobj"))
  new("StreamingQuery", ssq)
}
0045 
#' @rdname show
#' @note show(StreamingQuery) since 2.2.0
setMethod("show", "StreamingQuery",
          function(object) {
            # Ask the backing object for the query name; it may come back as NULL.
            queryLabel <- callJMethod(object@ssq, "name")
            if (is.null(queryLabel)) {
              # No name available: print the bare class label.
              cat("StreamingQuery", "\n")
            } else {
              # Name available: print it in single quotes after the class label.
              cat(paste0("StreamingQuery '", queryLabel, "'\n"))
            }
          })
0057 
#' queryName
#'
#' Returns the user-specified name of the query, i.e. the one given in
#' \code{write.stream(df, queryName = "query")}. If a name is set, it must be unique across all
#' active queries.
#'
#' @param x a StreamingQuery.
#' @return The name of the query, or NULL if not specified.
#' @rdname queryName
#' @name queryName
#' @aliases queryName,StreamingQuery-method
#' @family StreamingQuery methods
#' @seealso \link{write.stream}
#' @examples
#' \dontrun{ queryName(sq) }
#' @note queryName(StreamingQuery) since 2.2.0
#' @note experimental
setMethod("queryName",
          signature(x = "StreamingQuery"),
          function(x) {
            # Forward the request to the "name" method of the backing object.
            backingQuery <- x@ssq
            callJMethod(backingQuery, "name")
          })
0080 
#' @rdname explain
#' @name explain
#' @aliases explain,StreamingQuery-method
#' @family StreamingQuery methods
#' @examples
#' \dontrun{ explain(sq) }
#' @note explain(StreamingQuery) since 2.2.0
setMethod("explain",
          signature(x = "StreamingQuery"),
          function(x, extended = FALSE) {
            # Fetch the explanation from the backing object, passing `extended` through,
            # then print it followed by a newline.
            planText <- callJMethod(x@ssq, "explainInternal", extended)
            cat(planText, "\n")
          })
0093 
#' lastProgress
#'
#' Prints the most recent progress update of this streaming query in JSON format.
#' If no progress update is available, a short notice is printed instead.
#'
#' @param x a StreamingQuery.
#' @rdname lastProgress
#' @name lastProgress
#' @aliases lastProgress,StreamingQuery-method
#' @family StreamingQuery methods
#' @examples
#' \dontrun{ lastProgress(sq) }
#' @note lastProgress(StreamingQuery) since 2.2.0
#' @note experimental
setMethod("lastProgress",
          signature(x = "StreamingQuery"),
          function(x) {
            p <- callJMethod(x@ssq, "lastProgress")
            if (is.null(p)) {
              # cat() does not append a newline on its own; terminate the line so the
              # notice does not run into whatever is printed next.
              cat("Streaming query has no progress\n")
            } else {
              cat(callJMethod(p, "toString"), "\n")
            }
          })
0117 
#' status
#'
#' Prints the current status of the query in JSON format.
#'
#' @param x a StreamingQuery.
#' @rdname status
#' @name status
#' @aliases status,StreamingQuery-method
#' @family StreamingQuery methods
#' @examples
#' \dontrun{ status(sq) }
#' @note status(StreamingQuery) since 2.2.0
#' @note experimental
setMethod("status",
          signature(x = "StreamingQuery"),
          function(x) {
            # Two hops: obtain the status object, then its string form.
            statusRef <- callJMethod(x@ssq, "status")
            statusText <- callJMethod(statusRef, "toString")
            cat(statusText, "\n")
          })
0136 
#' isActive
#'
#' Reports whether this query is actively running.
#'
#' @param x a StreamingQuery.
#' @return TRUE if query is actively running, FALSE if stopped.
#' @rdname isActive
#' @name isActive
#' @aliases isActive,StreamingQuery-method
#' @family StreamingQuery methods
#' @examples
#' \dontrun{ isActive(sq) }
#' @note isActive(StreamingQuery) since 2.2.0
#' @note experimental
setMethod("isActive",
          signature(x = "StreamingQuery"),
          function(x) {
            # Forward the request to the "isActive" method of the backing object.
            backingQuery <- x@ssq
            callJMethod(backingQuery, "isActive")
          })
0156 
#' awaitTermination
#'
#' Waits until the query terminates, either through \code{stopQuery} or because of an error.
#'
#' Once the query has terminated, every subsequent call to this method returns TRUE
#' immediately.
#'
#' @param x a StreamingQuery.
#' @param timeout time to wait in milliseconds, if omitted, wait indefinitely until \code{stopQuery}
#'                is called or an error has occurred.
#' @return TRUE if query has terminated within the timeout period; nothing if timeout is not
#'         specified.
#' @rdname awaitTermination
#' @name awaitTermination
#' @aliases awaitTermination,StreamingQuery-method
#' @family StreamingQuery methods
#' @examples
#' \dontrun{ awaitTermination(sq, 10000) }
#' @note awaitTermination(StreamingQuery) since 2.2.0
#' @note experimental
setMethod("awaitTermination",
          signature(x = "StreamingQuery"),
          function(x, timeout = NULL) {
            if (!is.null(timeout)) {
              # Timed wait: the timeout is coerced to integer before being passed on,
              # and the result of the call is returned visibly.
              timeoutMs <- as.integer(timeout)
              return(handledCallJMethod(x@ssq, "awaitTermination", timeoutMs))
            }
            # Untimed wait: the result of the call is returned invisibly.
            invisible(handledCallJMethod(x@ssq, "awaitTermination"))
          })
0186 
#' stopQuery
#'
#' Stops the execution of this query if it is running. The call blocks until the execution
#' has stopped.
#'
#' @param x a StreamingQuery.
#' @rdname stopQuery
#' @name stopQuery
#' @aliases stopQuery,StreamingQuery-method
#' @family StreamingQuery methods
#' @examples
#' \dontrun{ stopQuery(sq) }
#' @note stopQuery(StreamingQuery) since 2.2.0
#' @note experimental
setMethod("stopQuery",
          signature(x = "StreamingQuery"),
          function(x) {
            # Invoke "stop" on the backing object; the result is returned invisibly
            # so nothing is auto-printed at the console.
            stopResult <- callJMethod(x@ssq, "stop")
            invisible(stopResult)
          })