Back to home page

OSCL-LXR

 
 

    


0001 #
0002 # Licensed to the Apache Software Foundation (ASF) under one or more
0003 # contributor license agreements.  See the NOTICE file distributed with
0004 # this work for additional information regarding copyright ownership.
0005 # The ASF licenses this file to You under the Apache License, Version 2.0
0006 # (the "License"); you may not use this file except in compliance with
0007 # the License.  You may obtain a copy of the License at
0008 #
0009 #    http://www.apache.org/licenses/LICENSE-2.0
0010 #
0011 # Unless required by applicable law or agreed to in writing, software
0012 # distributed under the License is distributed on an "AS IS" BASIS,
0013 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014 # See the License for the specific language governing permissions and
0015 # limitations under the License.
0016 #
0017 
context("functions on binary files")

# Start (or reuse) a SparkR session without Hive support, then fetch the
# underlying JavaSparkContext handle required by the low-level RDD helpers
# exercised in the tests below.
sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE)
sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sparkSession)

# Sample lines the tests write to temporary text files.
mockFile <- c("Spark is pretty.", "Spark is awesome.")
0025 
test_that("saveAsObjectFile()/objectFile() following textFile() works", {
  # Round-trip: write a text file, load it as an RDD, persist it as an
  # object file, then reload it and compare against the original lines.
  textPath <- tempfile(pattern = "spark-test", fileext = ".tmp")
  objectPath <- tempfile(pattern = "spark-test", fileext = ".tmp")
  writeLines(mockFile, textPath)

  lines <- textFile(sc, textPath, 1)
  saveAsObjectFile(lines, objectPath)
  restored <- objectFile(sc, objectPath)
  expect_equal(collectRDD(restored), as.list(mockFile))

  unlink(textPath)
  unlink(objectPath, recursive = TRUE)
})
0039 
test_that("saveAsObjectFile()/objectFile() works on a parallelized list", {
  # Persist an in-memory list through the object-file format and verify
  # it deserializes back to an identical list.
  objectPath <- tempfile(pattern = "spark-test", fileext = ".tmp")

  values <- list(1, 2, 3)
  distributed <- parallelize(sc, values, 1)
  saveAsObjectFile(distributed, objectPath)
  restored <- objectFile(sc, objectPath)
  expect_equal(collectRDD(restored), values)

  unlink(objectPath, recursive = TRUE)
})
0051 
test_that("saveAsObjectFile()/objectFile() following RDD transformations works", {
  # Run a small word-count pipeline, persist its output as an object file,
  # and check that the reloaded counts match the expected tallies.
  textPath <- tempfile(pattern = "spark-test", fileext = ".tmp")
  objectPath <- tempfile(pattern = "spark-test", fileext = ".tmp")
  writeLines(mockFile, textPath)

  lines <- textFile(sc, textPath)

  # Split each line into words, pair every word with a count of 1,
  # then sum the counts per word across 2 partitions.
  words <- flatMap(lines, function(line) { strsplit(line, " ")[[1]] })
  pairs <- lapply(words, function(word) { list(word, 1L) })
  counts <- reduceByKey(pairs, "+", 2L)

  saveAsObjectFile(counts, objectPath)
  reloaded <- objectFile(sc, objectPath)

  actual <- collectRDD(reloaded)
  expected <- list(list("awesome.", 1), list("Spark", 2), list("pretty.", 1),
                   list("is", 2))
  expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))

  unlink(textPath)
  unlink(objectPath, recursive = TRUE)
})
0075 
test_that("saveAsObjectFile()/objectFile() works with multiple paths", {
  # objectFile() accepts a vector of paths; records from every path
  # should appear in the resulting RDD.
  path1 <- tempfile(pattern = "spark-test", fileext = ".tmp")
  path2 <- tempfile(pattern = "spark-test", fileext = ".tmp")

  first <- parallelize(sc, "Spark is pretty.")
  saveAsObjectFile(first, path1)
  second <- parallelize(sc, "Spark is awesome.")
  saveAsObjectFile(second, path2)

  combined <- objectFile(sc, c(path1, path2))
  expect_equal(countRDD(combined), 2)

  unlink(path1, recursive = TRUE)
  unlink(path2, recursive = TRUE)
})
0091 
# Tear down the Spark session started at the top of this file.
sparkR.session.stop()