Back to home page

OSCL-LXR

 
 

    


0001 #
0002 # Licensed to the Apache Software Foundation (ASF) under one or more
0003 # contributor license agreements.  See the NOTICE file distributed with
0004 # this work for additional information regarding copyright ownership.
0005 # The ASF licenses this file to You under the Apache License, Version 2.0
0006 # (the "License"); you may not use this file except in compliance with
0007 # the License.  You may obtain a copy of the License at
0008 #
0009 #    http://www.apache.org/licenses/LICENSE-2.0
0010 #
0011 # Unless required by applicable law or agreed to in writing, software
0012 # distributed under the License is distributed on an "AS IS" BASIS,
0013 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014 # See the License for the specific language governing permissions and
0015 # limitations under the License.
0016 #
0017 
0018 # References to objects that exist on the JVM backend
0019 # are maintained using the jobj.
0020 
0021 #' @include generics.R
0022 NULL
0023 
0024 # Maintain a reference count of Java object references
0025 # This allows us to GC the java object when it is safe
0026 .validJobjs <- new.env(parent = emptyenv())
0027 
0028 # List of object ids to be removed
0029 .toRemoveJobjs <- new.env(parent = emptyenv())
0030 
0031 # Check if jobj was created with the current SparkContext
0032 isValidJobj <- function(jobj) {
0033   if (exists(".scStartTime", envir = .sparkREnv)) {
0034     jobj$appId == get(".scStartTime", envir = .sparkREnv)
0035   } else {
0036     FALSE
0037   }
0038 }
0039 
0040 getJobj <- function(objId) {
0041   newObj <- jobj(objId)
0042   if (exists(objId, .validJobjs)) {
0043     .validJobjs[[objId]] <- .validJobjs[[objId]] + 1
0044   } else {
0045     .validJobjs[[objId]] <- 1
0046   }
0047   newObj
0048 }
0049 
0050 # Handler for a java object that exists on the backend.
0051 jobj <- function(objId) {
0052   if (!is.character(objId)) {
0053     stop("object id must be a character")
0054   }
0055   # NOTE: We need a new env for a jobj as we can only register
0056   # finalizers for environments or external references pointers.
0057   obj <- structure(new.env(parent = emptyenv()), class = "jobj")
0058   obj$id <- objId
0059   obj$appId <- get(".scStartTime", envir = .sparkREnv)
0060 
0061   # Register a finalizer to remove the Java object when this reference
0062   # is garbage collected in R
0063   reg.finalizer(obj, cleanup.jobj)
0064   obj
0065 }
0066 
0067 #' Print a JVM object reference.
0068 #'
0069 #' This function prints the type and id for an object stored
0070 #' in the SparkR JVM backend.
0071 #'
0072 #' @param x The JVM object reference
0073 #' @param ... further arguments passed to or from other methods
0074 #' @note print.jobj since 1.4.0
0075 print.jobj <- function(x, ...) {
0076   name <- getClassName.jobj(x)
0077   cat("Java ref type", name, "id", x$id, "\n", sep = " ")
0078 }
0079 
0080 getClassName.jobj <- function(x) {
0081   cls <- callJMethod(x, "getClass")
0082   callJMethod(cls, "getName")
0083 }
0084 
0085 cleanup.jobj <- function(jobj) {
0086   if (isValidJobj(jobj)) {
0087     objId <- jobj$id
0088     # If we don't know anything about this jobj, ignore it
0089     if (exists(objId, envir = .validJobjs)) {
0090       .validJobjs[[objId]] <- .validJobjs[[objId]] - 1
0091 
0092       if (.validJobjs[[objId]] == 0) {
0093         rm(list = objId, envir = .validJobjs)
0094         # NOTE: We cannot call removeJObject here as the finalizer may be run
0095         # in the middle of another RPC. Thus we queue up this object Id to be removed
0096         # and then run all the removeJObject when the next RPC is called.
0097         .toRemoveJobjs[[objId]] <- 1
0098       }
0099     }
0100   }
0101 }
0102 
0103 clearJobjs <- function() {
0104   valid <- ls(.validJobjs)
0105   rm(list = valid, envir = .validJobjs)
0106 
0107   removeList <- ls(.toRemoveJobjs)
0108   rm(list = removeList, envir = .toRemoveJobjs)
0109 }