0001 #
0002 # Licensed to the Apache Software Foundation (ASF) under one or more
0003 # contributor license agreements. See the NOTICE file distributed with
0004 # this work for additional information regarding copyright ownership.
0005 # The ASF licenses this file to You under the Apache License, Version 2.0
0006 # (the "License"); you may not use this file except in compliance with
0007 # the License. You may obtain a copy of the License at
0008 #
0009 # http://www.apache.org/licenses/LICENSE-2.0
0010 #
0011 # Unless required by applicable law or agreed to in writing, software
0012 # distributed under the License is distributed on an "AS IS" BASIS,
0013 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014 # See the License for the specific language governing permissions and
0015 # limitations under the License.
0016 #
0017
# Objects that live on the JVM backend are referenced from R
# through `jobj` handles, which are defined and tracked in this file.
0020
0021 #' @include generics.R
0022 NULL
0023
# Per-object-id reference counts for Java object references held in R.
# Counting the references lets the Java object be released once it is safe.
.validJobjs <- new.env(hash = TRUE, parent = emptyenv())

# Object ids that are queued up to be removed.
.toRemoveJobjs <- new.env(hash = TRUE, parent = emptyenv())
0030
# Check if a jobj was created with the current SparkContext.
#
# The jobj's `appId` is compared with the `.scStartTime` value currently
# stored in `.sparkREnv`.
#
# @param jobj a Java object reference (anything that supports `$appId`).
# @return a single TRUE or FALSE. FALSE is returned when `.scStartTime` is
#   not set, when the ids differ, or when the comparison does not produce
#   exactly one TRUE (e.g. `appId` is missing or NA).
isValidJobj <- function(jobj) {
  if (!exists(".scStartTime", envir = .sparkREnv)) {
    return(FALSE)
  }
  # isTRUE() guarantees a scalar logical: a bare `==` would give logical(0)
  # for a NULL appId (or NA), which makes `if (isValidJobj(x))` raise an error.
  isTRUE(jobj$appId == get(".scStartTime", envir = .sparkREnv))
}
0039
# Create a jobj handle for `objId` and record one more live reference to it.
#
# @param objId the backend object id (validated by `jobj()`).
# @return the newly created jobj.
getJobj <- function(objId) {
  handle <- jobj(objId)
  # An id that is not tracked yet starts from zero references.
  priorCount <- if (exists(objId, .validJobjs)) .validJobjs[[objId]] else 0
  .validJobjs[[objId]] <- priorCount + 1
  handle
}
0049
# Build the R-side handle for a Java object that exists on the backend.
#
# @param objId the backend object id; must be of type character.
# @return an environment of class "jobj" holding `id` (the object id) and
#   `appId` (the `.scStartTime` value read from `.sparkREnv` at creation).
jobj <- function(objId) {
  if (!is.character(objId)) {
    stop("object id must be a character")
  }
  # NOTE: The handle has to be an environment because finalizers can only be
  # registered on environments or external pointers.
  ref <- new.env(parent = emptyenv())
  class(ref) <- "jobj"
  ref$id <- objId
  ref$appId <- get(".scStartTime", envir = .sparkREnv)

  # When R garbage collects this handle, cleanup.jobj gets a chance to
  # release the matching Java object.
  reg.finalizer(ref, cleanup.jobj)
  ref
}
0066
#' Print a JVM object reference.
#'
#' This function prints the type and id for an object stored
#' in the SparkR JVM backend.
#'
#' @param x The JVM object reference
#' @param ... further arguments passed to or from other methods
#' @return \code{x}, invisibly, following the convention for print methods.
#' @note print.jobj since 1.4.0
print.jobj <- function(x, ...) {
  name <- getClassName.jobj(x)
  cat("Java ref type", name, "id", x$id, "\n", sep = " ")
  # Print methods are expected to hand back their argument invisibly.
  invisible(x)
}
0079
# Look up the class name of the Java object behind a jobj by invoking
# `getClass` on the object and then `getName` on the result.
#
# @param x the JVM object reference.
# @return whatever the `getName` call returns.
getClassName.jobj <- function(x) {
  callJMethod(callJMethod(x, "getClass"), "getName")
}
0084
# Finalizer for a jobj: drop one reference to its object id and, once the
# last reference is gone, queue the id for removal.
#
# Nothing happens for a jobj that fails `isValidJobj()` or whose id is not
# tracked in `.validJobjs`.
#
# @param jobj the JVM object reference being finalized.
cleanup.jobj <- function(jobj) {
  if (!isValidJobj(jobj)) {
    return(invisible(NULL))
  }

  key <- jobj$id
  # If we don't know anything about this jobj, ignore it
  if (!exists(key, envir = .validJobjs)) {
    return(invisible(NULL))
  }

  remaining <- .validJobjs[[key]] - 1
  .validJobjs[[key]] <- remaining

  if (remaining == 0) {
    rm(list = key, envir = .validJobjs)
    # NOTE: removeJObject cannot be called from here because the finalizer
    # may run in the middle of another RPC. The object id is queued instead,
    # and all the removeJObject calls are run when the next RPC is called.
    .toRemoveJobjs[[key]] <- 1
  }
}
0102
# Forget every tracked object id: empties both the reference-count table
# (`.validJobjs`) and the pending-removal queue (`.toRemoveJobjs`).
#
# Takes no arguments and is called for its side effect only.
clearJobjs <- function() {
  for (tracker in list(.validJobjs, .toRemoveJobjs)) {
    # all.names = TRUE so that ids beginning with a dot are cleared as well;
    # a plain ls() would silently skip them.
    rm(list = ls(envir = tracker, all.names = TRUE), envir = tracker)
  }
}