Back to home page

OSCL-LXR

 
 

    


0001 #
0002 # Licensed to the Apache Software Foundation (ASF) under one or more
0003 # contributor license agreements.  See the NOTICE file distributed with
0004 # this work for additional information regarding copyright ownership.
0005 # The ASF licenses this file to You under the Apache License, Version 2.0
0006 # (the "License"); you may not use this file except in compliance with
0007 # the License.  You may obtain a copy of the License at
0008 #
0009 #    http://www.apache.org/licenses/LICENSE-2.0
0010 #
0011 # Unless required by applicable law or agreed to in writing, software
0012 # distributed under the License is distributed on an "AS IS" BASIS,
0013 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014 # See the License for the specific language governing permissions and
0015 # limitations under the License.
0016 #
0017 
0018 # Methods to call into SparkRBackend.
0019 
0020 
# Returns the result of java.lang.Class#isInstance for `jobj` against the Java
# class named by `className`.
#
# jobj - a "jobj" reference to an object living in the SparkR backend
# className - fully qualified name of the Java class to test against
#
# Stops if `jobj` does not inherit from "jobj".
isInstanceOf <- function(jobj, className) {
  # inherits() looks at the whole class vector, so a jobj that carries an
  # additional S3 class is still accepted; `class(x) == "jobj"` was not.
  stopifnot(inherits(jobj, "jobj"))
  cls <- callJStatic("java.lang.Class", "forName", className)
  callJMethod(cls, "isInstance", jobj)
}
0027 
# Call a Java method named methodName on the object
# specified by objId. objId should be a "jobj" returned
# from the SparkRBackend.
#
# objId - "jobj" reference to the target object
# methodName - name of the instance method to invoke
# ... - arguments forwarded to the Java method
#
# Returns whatever invokeJava returns for the call.
# Stops if objId does not inherit from "jobj", or if isValidJobj() rejects it.
callJMethod <- function(objId, methodName, ...) {
  # inherits() looks at the whole class vector, so a jobj that carries an
  # additional S3 class is still accepted; `class(x) == "jobj"` was not.
  stopifnot(inherits(objId, "jobj"))
  if (!isValidJobj(objId)) {
    stop("Invalid jobj ", objId$id,
         ". If SparkR was restarted, Spark operations need to be re-executed.")
  }
  # Only the id is sent to the backend, not the R-side wrapper object
  invokeJava(isStatic = FALSE, objId$id, methodName, ...)
}
0039 
# Invoke the static Java method `methodName` of class `className` through
# the backend.
#
# className - name of the Java class that owns the static method
# methodName - name of the static method
# ... - arguments forwarded to the Java method
#
# Returns whatever invokeJava returns for the call.
callJStatic <- function(className, methodName, ...) {
  invokeJava(
    isStatic = TRUE,
    className,
    methodName,
    ...
  )
}
0044 
# Construct a new Java object of class `className` in the backend.
#
# className - name of the Java class to instantiate
# ... - arguments forwarded to the constructor
#
# Returns whatever invokeJava returns for the call.
newJObject <- function(className, ...) {
  # Construction is requested as a static call to the method named "<init>"
  constructorName <- "<init>"
  invokeJava(isStatic = TRUE, className, methodName = constructorName, ...)
}
0049 
# Ask the SparkR backend to drop the object identified by `objId`, by
# calling the static method "rm" on "SparkRHandler". invokeJava issues
# this call for every id pending in .toRemoveJobjs before it runs any
# other request.
#
# objId - id of the backend object to remove
removeJObject <- function(objId) {
  handlerClass <- "SparkRHandler"
  removeMethod <- "rm"
  invokeJava(isStatic = TRUE, handlerClass, removeMethod, objId)
}
0055 
# Returns TRUE when the call described by the arguments is the backend's
# object-removal call, i.e. the static method "rm" on "SparkRHandler" (the
# exact call issued by removeJObject), and FALSE otherwise.
#
# isStatic - TRUE if the method to be called is static
# objId - object id, or class name for static calls
# methodName - name of the method to be invoked
#
# The result is always a single TRUE or FALSE: NA, zero-length and
# multi-element inputs yield FALSE instead of NA or an error, so the result
# is safe to use directly as an `if` condition.
isRemoveMethod <- function(isStatic, objId, methodName) {
  # `== TRUE` is kept (rather than isTRUE(isStatic)) so that values that
  # compare equal to TRUE, such as 1, keep matching as they did before.
  isTRUE(isStatic == TRUE) &&
    isTRUE(objId == "SparkRHandler") &&
    isTRUE(methodName == "rm")
}
0059 
# Invoke a Java method on the SparkR backend. Users
# should typically use one of the higher level methods like
# callJMethod, callJStatic etc. instead of using this.
#
# isStatic - TRUE if the method to be called is static
# objId - String that refers to the object on which method is invoked
#         Should be a jobj id for non-static methods and the classname
#         for static methods
# methodName - name of method to be invoked
# ... - arguments forwarded to the Java method
#
# Returns the value read back from the backend connection with readObject.
# Stops if no backend connection is registered in .sparkREnv, or if
# handleErrors rejects the status returned by the backend.
invokeJava <- function(isStatic, objId, methodName, ...) {
  if (!exists(".sparkRCon", .sparkREnv)) {
    stop("No connection to backend found. Please re-run sparkR.session()")
  }

  # Flush removals queued in .toRemoveJobjs before doing anything else. This
  # is skipped when the current call is itself a removal, since removeJObject
  # re-enters invokeJava.
  if (!isRemoveMethod(isStatic, objId, methodName)) {
    objsToRemove <- ls(.toRemoveJobjs)
    if (length(objsToRemove) > 0) {
      # Called purely for side effects, so a plain loop rather than sapply()
      for (pendingId in objsToRemove) {
        removeJObject(pendingId)
      }
      rm(list = objsToRemove, envir = .toRemoveJobjs)
    }
  }

  # Serialize the call: isStatic, objId, methodName, argument count, arguments.
  # `finally` closes the scratch connection even when a write fails (for
  # example on an argument that cannot be serialized), so it is not leaked.
  rc <- rawConnection(raw(0), "r+")
  bytesToSend <- tryCatch({
    writeBoolean(rc, isStatic)
    writeString(rc, objId)
    writeString(rc, methodName)

    args <- list(...)
    writeInt(rc, length(args))
    writeArgs(rc, args)

    rawConnectionValue(rc)
  }, finally = close(rc))

  # Construct the whole request message to send it once,
  # avoiding write-write-read pattern in case of Nagle's algorithm.
  # Refer to http://en.wikipedia.org/wiki/Nagle%27s_algorithm for the details.
  rc <- rawConnection(raw(0), "r+")
  requestMessage <- tryCatch({
    # Length prefix first, then the serialized call
    writeInt(rc, length(bytesToSend))
    writeBin(bytesToSend, rc)
    rawConnectionValue(rc)
  }, finally = close(rc))

  conn <- get(".sparkRCon", .sparkREnv)
  writeBin(requestMessage, conn)

  returnStatus <- readInt(conn)
  handleErrors(returnStatus, conn)

  # Backend will send +1 as keep alive value to prevent various connection timeouts
  # on very long running jobs. See spark.r.heartBeatInterval
  while (returnStatus == 1) {
    returnStatus <- readInt(conn)
    handleErrors(returnStatus, conn)
  }

  readObject(conn)
}
0123 
# Inspect a status value read from the backend and stop with an error
# message when it signals a failure.
#
# returnStatus - status value read from the backend connection
# conn - backend connection; the error message is read from it on failure
#
# Returns NULL invisibly when the status is not an error.
handleErrors <- function(returnStatus, conn) {
  # Nothing could be read from the connection at all
  statusMissing <- length(returnStatus) == 0
  if (statusMissing) {
    stop("No status is returned. Java SparkR backend might have failed.")
  }

  # 0 means success and +1 is a heartbeat; a negative status means the
  # backend reported an error, whose message is read from the connection.
  isFailure <- returnStatus < 0
  if (isFailure) {
    stop(readString(conn))
  }
}