0001 #
0002 # Licensed to the Apache Software Foundation (ASF) under one or more
0003 # contributor license agreements. See the NOTICE file distributed with
0004 # this work for additional information regarding copyright ownership.
0005 # The ASF licenses this file to You under the Apache License, Version 2.0
0006 # (the "License"); you may not use this file except in compliance with
0007 # the License. You may obtain a copy of the License at
0008 #
0009 # http://www.apache.org/licenses/LICENSE-2.0
0010 #
0011 # Unless required by applicable law or agreed to in writing, software
0012 # distributed under the License is distributed on an "AS IS" BASIS,
0013 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014 # See the License for the specific language governing permissions and
0015 # limitations under the License.
0016 #
0017
0018 # Methods to call into SparkRBackend.
0019
0020
# Returns TRUE if object is an instance of given class
#
# jobj      - "jobj" reference to the object to test
# className - class name handed to java.lang.Class forName
#
# Stops with an error if jobj is not a "jobj".
isInstanceOf <- function(jobj, className) {
  # inherits() gives a single TRUE/FALSE no matter how many classes the
  # object carries; class(x) == "jobj" gives one value per class and so
  # rejects a jobj that has any additional class.
  stopifnot(inherits(jobj, "jobj"))
  # Look the class up through the backend, then ask it whether jobj is
  # one of its instances.
  cls <- callJStatic("java.lang.Class", "forName", className)
  callJMethod(cls, "isInstance", jobj)
}
0027
# Call a Java method named methodName on the object
# specified by objId. objId should be a "jobj" returned
# from the SparkRBackend.
#
# objId      - "jobj" reference to the object the method is invoked on
# methodName - name of the method to invoke
# ...        - arguments passed on to invokeJava for the method call
#
# Stops with an error if objId is not a "jobj" or if isValidJobj
# reports it as invalid.
callJMethod <- function(objId, methodName, ...) {
  # inherits() yields exactly one TRUE/FALSE even when the object has
  # more than one class, unlike class(objId) == "jobj".
  stopifnot(inherits(objId, "jobj"))
  if (!isValidJobj(objId)) {
    stop("Invalid jobj ", objId$id,
         ". If SparkR was restarted, Spark operations need to be re-executed.")
  }
  # Only the id is sent to the backend, not the R-side jobj itself.
  invokeJava(isStatic = FALSE, objId$id, methodName, ...)
}
0039
# Call a static method on a specified className
#
# className  - name of the class that owns the static method
# methodName - name of the static method to invoke
# ...        - arguments passed on to invokeJava for the method call
callJStatic <- function(className, methodName, ...) {
  # For a static call the class name fills the slot that invokeJava
  # otherwise uses for the object id.
  invokeJava(
    isStatic = TRUE,
    className,
    methodName,
    ...
  )
}
0044
# Create a new object of the specified class name
#
# className - name of the class to instantiate
# ...       - arguments passed on to invokeJava for the constructor call
newJObject <- function(className, ...) {
  # Construction is sent as a static call whose method name is "<init>".
  constructorName <- "<init>"
  invokeJava(isStatic = TRUE, className, methodName = constructorName, ...)
}
0049
# Remove an object from the SparkR backend. This is done
# automatically when a jobj is garbage collected.
#
# objId - id of the backend object to remove
removeJObject <- function(objId) {
  # Removal is a static call to "rm" on "SparkRHandler"; isRemoveMethod
  # recognises exactly this pair.
  handlerName <- "SparkRHandler"
  removeMethod <- "rm"
  invokeJava(isStatic = TRUE, handlerName, removeMethod, objId)
}
0055
# Returns TRUE when the described call is the static "rm" method on
# "SparkRHandler", i.e. the call issued by removeJObject.
#
# isStatic   - TRUE if the call is static
# objId      - object id or class name the call targets
# methodName - name of the method being called
isRemoveMethod <- function(isStatic, objId, methodName) {
  # Any single mismatch rules the call out; checks run left to right and
  # stop at the first mismatch.
  !(isStatic != TRUE || objId != "SparkRHandler" || methodName != "rm")
}
0059
# Invoke a Java method on the SparkR backend. Users
# should typically use one of the higher level methods like
# callJMethod, callJStatic etc. instead of using this.
#
# isStatic   - TRUE if the method to be called is static
# objId      - String that refers to the object on which method is invoked
#              Should be a jobj id for non-static methods and the classname
#              for static methods
# methodName - name of method to be invoked
# ...        - arguments for the method; serialized with writeArgs
#
# Returns the value that readObject reads from the backend connection once
# a non-heartbeat status has been received. Stops if no backend connection
# is registered in .sparkREnv, or if handleErrors rejects a status.
invokeJava <- function(isStatic, objId, methodName, ...) {
  if (!exists(".sparkRCon", .sparkREnv)) {
    stop("No connection to backend found. Please re-run sparkR.session()")
  }

  # Flush the objects queued in .toRemoveJobjs first. This is skipped when
  # the current call is itself a removal, because removeJObject re-enters
  # invokeJava.
  if (!isRemoveMethod(isStatic, objId, methodName)) {
    objsToRemove <- ls(.toRemoveJobjs)
    if (length(objsToRemove) > 0) {
      for (pendingId in objsToRemove) {
        removeJObject(pendingId)
      }
      rm(list = objsToRemove, envir = .toRemoveJobjs)
    }
  }

  # Serialize the call into an in-memory buffer. The finally clause closes
  # the buffer even when serializing fails (e.g. writeArgs stops on an
  # argument), so a failed call does not leave a connection open.
  rc <- rawConnection(raw(0), "r+")
  bytesToSend <- tryCatch({
    writeBoolean(rc, isStatic)
    writeString(rc, objId)
    writeString(rc, methodName)

    args <- list(...)
    writeInt(rc, length(args))
    writeArgs(rc, args)

    rawConnectionValue(rc)
  }, finally = close(rc))

  # Construct the whole request message to send it once,
  # avoiding write-write-read pattern in case of Nagle's algorithm.
  # Refer to http://en.wikipedia.org/wiki/Nagle%27s_algorithm for the details.
  # The message is the payload length followed by the payload itself.
  rc <- rawConnection(raw(0), "r+")
  requestMessage <- tryCatch({
    writeInt(rc, length(bytesToSend))
    writeBin(bytesToSend, rc)
    rawConnectionValue(rc)
  }, finally = close(rc))

  conn <- get(".sparkRCon", .sparkREnv)
  writeBin(requestMessage, conn)

  returnStatus <- readInt(conn)
  handleErrors(returnStatus, conn)

  # Backend will send +1 as keep alive value to prevent various connection timeouts
  # on very long running jobs. See spark.r.heartBeatInterval
  while (returnStatus == 1) {
    returnStatus <- readInt(conn)
    handleErrors(returnStatus, conn)
  }

  readObject(conn)
}
0123
# Helper that inspects a status read from the backend and raises an R
# error when it signals a failure.
#
# returnStatus - status value read from the backend connection
# conn         - backend connection; the error text is read from it
#
# A zero-length status stops with a fixed message. A negative status stops
# with the string read from conn. Any other status returns invisibly.
handleErrors <- function(returnStatus, conn) {
  gotStatus <- length(returnStatus) > 0
  if (!gotStatus) {
    stop("No status is returned. Java SparkR backend might have failed.")
  }

  # 0 is success and +1 is reserved for heartbeats; only negative values
  # indicate errors.
  if (returnStatus >= 0) {
    return(invisible(NULL))
  }
  stop(readString(conn))
}