0001 #
0002 # Licensed to the Apache Software Foundation (ASF) under one or more
0003 # contributor license agreements. See the NOTICE file distributed with
0004 # this work for additional information regarding copyright ownership.
0005 # The ASF licenses this file to You under the Apache License, Version 2.0
0006 # (the "License"); you may not use this file except in compliance with
0007 # the License. You may obtain a copy of the License at
0008 #
0009 # http://www.apache.org/licenses/LICENSE-2.0
0010 #
0011 # Unless required by applicable law or agreed to in writing, software
0012 # distributed under the License is distributed on an "AS IS" BASIS,
0013 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014 # See the License for the specific language governing permissions and
0015 # limitations under the License.
0016 #
0017
context("broadcast variables")

# Start the SparkR session used by every test in this file; Hive support is
# switched off explicitly.
sparkSession <- sparkR.session(
  master = sparkRTestMaster,
  enableHiveSupport = FALSE
)

# JavaSparkContext handle, obtained from the session through SQLUtils. The RDD
# helpers used below (parallelize, broadcastRDD) take it as their first argument.
sc <- callJStatic(
  "org.apache.spark.sql.api.r.SQLUtils",
  "getJavaSparkContext",
  sparkSession
)

# Partitioned data: the integers 1 and 2, parallelized with 2 slices.
nums <- seq_len(2L)
rrdd <- parallelize(sc, nums, 2L)
0027
test_that("using broadcast variable", {
  # Random 10 x 10 matrix, handed to broadcastRDD() to obtain a broadcast handle.
  randomMat <- matrix(rnorm(100), nrow = 10, ncol = 10)
  randomMatBr <- broadcastRDD(sc, randomMat)

  # Applied to each RDD element: read the matrix back through the broadcast
  # handle, scale it by the element and sum all entries.
  scaledBroadcastSum <- function(x) {
    sum(SparkR:::value(randomMatBr) * x)
  }
  actual <- collectRDD(lapply(rrdd, scaledBroadcastSum))

  # The RDD holds 1 and 2, so the results are the matrix total times 1 and 2.
  matTotal <- sum(randomMat)
  expected <- list(matTotal * 1, matTotal * 2)
  expect_equal(actual, expected)
})
0039
test_that("without using broadcast variable", {
  # Random 10 x 10 matrix referenced directly from the mapped function, i.e.
  # captured by the closure instead of going through a broadcast handle.
  randomMat <- matrix(rnorm(100), nrow = 10, ncol = 10)

  # Applied to each RDD element: scale the captured matrix and sum all entries.
  scaledClosureSum <- function(x) {
    sum(randomMat * x)
  }
  actual <- collectRDD(lapply(rrdd, scaledClosureSum))

  # The RDD holds 1 and 2, so the results are the matrix total times 1 and 2.
  matTotal <- sum(randomMat)
  expected <- list(matTotal * 1, matTotal * 2)
  expect_equal(actual, expected)
})
0050
# Tear down: stop the SparkR session that was started at the top of this file.
sparkR.session.stop()