0001 #
0002 # Licensed to the Apache Software Foundation (ASF) under one or more
0003 # contributor license agreements. See the NOTICE file distributed with
0004 # this work for additional information regarding copyright ownership.
0005 # The ASF licenses this file to You under the Apache License, Version 2.0
0006 # (the "License"); you may not use this file except in compliance with
0007 # the License. You may obtain a copy of the License at
0008 #
0009 # http://www.apache.org/licenses/LICENSE-2.0
0010 #
0011 # Unless required by applicable law or agreed to in writing, software
0012 # distributed under the License is distributed on an "AS IS" BASIS,
0013 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014 # See the License for the specific language governing permissions and
0015 # limitations under the License.
0016 #
0017
0018 # streaming.R - Structured Streaming / StreamingQuery class and methods implemented in S4 OO classes
0019
0020 #' @include generics.R jobj.R
0021 NULL
0022
0023 #' S4 class that represents a StreamingQuery
0024 #'
0025 #' StreamingQuery can be created by using read.stream() and write.stream()
0026 #'
0027 #' @rdname StreamingQuery
0028 #' @seealso \link{read.stream}
0029 #'
0030 #' @param ssq A Java object reference to the backing Scala StreamingQuery
0031 #' @note StreamingQuery since 2.2.0
0032 #' @note experimental
0033 setClass("StreamingQuery",
0034 slots = list(ssq = "jobj"))
0035
0036 setMethod("initialize", "StreamingQuery", function(.Object, ssq) {
0037 .Object@ssq <- ssq
0038 .Object
0039 })
0040
0041 streamingQuery <- function(ssq) {
0042 stopifnot(class(ssq) == "jobj")
0043 new("StreamingQuery", ssq)
0044 }
0045
0046 #' @rdname show
0047 #' @note show(StreamingQuery) since 2.2.0
0048 setMethod("show", "StreamingQuery",
0049 function(object) {
0050 name <- callJMethod(object@ssq, "name")
0051 if (!is.null(name)) {
0052 cat(paste0("StreamingQuery '", name, "'\n"))
0053 } else {
0054 cat("StreamingQuery", "\n")
0055 }
0056 })
0057
0058 #' queryName
0059 #'
0060 #' Returns the user-specified name of the query. This is specified in
0061 #' \code{write.stream(df, queryName = "query")}. This name, if set, must be unique across all active
0062 #' queries.
0063 #'
0064 #' @param x a StreamingQuery.
0065 #' @return The name of the query, or NULL if not specified.
0066 #' @rdname queryName
0067 #' @name queryName
0068 #' @aliases queryName,StreamingQuery-method
0069 #' @family StreamingQuery methods
0070 #' @seealso \link{write.stream}
0071 #' @examples
0072 #' \dontrun{ queryName(sq) }
0073 #' @note queryName(StreamingQuery) since 2.2.0
0074 #' @note experimental
0075 setMethod("queryName",
0076 signature(x = "StreamingQuery"),
0077 function(x) {
0078 callJMethod(x@ssq, "name")
0079 })
0080
0081 #' @rdname explain
0082 #' @name explain
0083 #' @aliases explain,StreamingQuery-method
0084 #' @family StreamingQuery methods
0085 #' @examples
0086 #' \dontrun{ explain(sq) }
0087 #' @note explain(StreamingQuery) since 2.2.0
0088 setMethod("explain",
0089 signature(x = "StreamingQuery"),
0090 function(x, extended = FALSE) {
0091 cat(callJMethod(x@ssq, "explainInternal", extended), "\n")
0092 })
0093
0094 #' lastProgress
0095 #'
0096 #' Prints the most recent progess update of this streaming query in JSON format.
0097 #'
0098 #' @param x a StreamingQuery.
0099 #' @rdname lastProgress
0100 #' @name lastProgress
0101 #' @aliases lastProgress,StreamingQuery-method
0102 #' @family StreamingQuery methods
0103 #' @examples
0104 #' \dontrun{ lastProgress(sq) }
0105 #' @note lastProgress(StreamingQuery) since 2.2.0
0106 #' @note experimental
0107 setMethod("lastProgress",
0108 signature(x = "StreamingQuery"),
0109 function(x) {
0110 p <- callJMethod(x@ssq, "lastProgress")
0111 if (is.null(p)) {
0112 cat("Streaming query has no progress")
0113 } else {
0114 cat(callJMethod(p, "toString"), "\n")
0115 }
0116 })
0117
0118 #' status
0119 #'
0120 #' Prints the current status of the query in JSON format.
0121 #'
0122 #' @param x a StreamingQuery.
0123 #' @rdname status
0124 #' @name status
0125 #' @aliases status,StreamingQuery-method
0126 #' @family StreamingQuery methods
0127 #' @examples
0128 #' \dontrun{ status(sq) }
0129 #' @note status(StreamingQuery) since 2.2.0
0130 #' @note experimental
0131 setMethod("status",
0132 signature(x = "StreamingQuery"),
0133 function(x) {
0134 cat(callJMethod(callJMethod(x@ssq, "status"), "toString"), "\n")
0135 })
0136
0137 #' isActive
0138 #'
0139 #' Returns TRUE if this query is actively running.
0140 #'
0141 #' @param x a StreamingQuery.
0142 #' @return TRUE if query is actively running, FALSE if stopped.
0143 #' @rdname isActive
0144 #' @name isActive
0145 #' @aliases isActive,StreamingQuery-method
0146 #' @family StreamingQuery methods
0147 #' @examples
0148 #' \dontrun{ isActive(sq) }
0149 #' @note isActive(StreamingQuery) since 2.2.0
0150 #' @note experimental
0151 setMethod("isActive",
0152 signature(x = "StreamingQuery"),
0153 function(x) {
0154 callJMethod(x@ssq, "isActive")
0155 })
0156
0157 #' awaitTermination
0158 #'
0159 #' Waits for the termination of the query, either by \code{stopQuery} or by an error.
0160 #'
0161 #' If the query has terminated, then all subsequent calls to this method will return TRUE
0162 #' immediately.
0163 #'
0164 #' @param x a StreamingQuery.
0165 #' @param timeout time to wait in milliseconds, if omitted, wait indefinitely until \code{stopQuery}
0166 #' is called or an error has occurred.
0167 #' @return TRUE if query has terminated within the timeout period; nothing if timeout is not
0168 #' specified.
0169 #' @rdname awaitTermination
0170 #' @name awaitTermination
0171 #' @aliases awaitTermination,StreamingQuery-method
0172 #' @family StreamingQuery methods
0173 #' @examples
0174 #' \dontrun{ awaitTermination(sq, 10000) }
0175 #' @note awaitTermination(StreamingQuery) since 2.2.0
0176 #' @note experimental
0177 setMethod("awaitTermination",
0178 signature(x = "StreamingQuery"),
0179 function(x, timeout = NULL) {
0180 if (is.null(timeout)) {
0181 invisible(handledCallJMethod(x@ssq, "awaitTermination"))
0182 } else {
0183 handledCallJMethod(x@ssq, "awaitTermination", as.integer(timeout))
0184 }
0185 })
0186
0187 #' stopQuery
0188 #'
0189 #' Stops the execution of this query if it is running. This method blocks until the execution is
0190 #' stopped.
0191 #'
0192 #' @param x a StreamingQuery.
0193 #' @rdname stopQuery
0194 #' @name stopQuery
0195 #' @aliases stopQuery,StreamingQuery-method
0196 #' @family StreamingQuery methods
0197 #' @examples
0198 #' \dontrun{ stopQuery(sq) }
0199 #' @note stopQuery(StreamingQuery) since 2.2.0
0200 #' @note experimental
0201 setMethod("stopQuery",
0202 signature(x = "StreamingQuery"),
0203 function(x) {
0204 invisible(callJMethod(x@ssq, "stop"))
0205 })