Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.network.util;
0019 
0020 import java.io.*;
0021 import java.nio.ByteBuffer;
0022 import java.nio.channels.ReadableByteChannel;
0023 import java.nio.charset.StandardCharsets;
0024 import java.util.Locale;
0025 import java.util.concurrent.TimeUnit;
0026 import java.util.regex.Matcher;
0027 import java.util.regex.Pattern;
0028 
0029 import com.google.common.base.Preconditions;
0030 import com.google.common.collect.ImmutableMap;
0031 import io.netty.buffer.Unpooled;
0032 import org.apache.commons.lang3.SystemUtils;
0033 import org.slf4j.Logger;
0034 import org.slf4j.LoggerFactory;
0035 
0036 /**
0037  * General utilities available in the network package. Many of these are sourced from Spark's
0038  * own Utils, just accessible within this package.
0039  */
0040 public class JavaUtils {
0041   private static final Logger logger = LoggerFactory.getLogger(JavaUtils.class);
0042 
0043   /**
0044    * Define a default value for driver memory here since this value is referenced across the code
0045    * base and nearly all files already use Utils.scala
0046    */
0047   public static final long DEFAULT_DRIVER_MEM_MB = 1024;
0048 
0049   /** Closes the given object, ignoring IOExceptions. */
0050   public static void closeQuietly(Closeable closeable) {
0051     try {
0052       if (closeable != null) {
0053         closeable.close();
0054       }
0055     } catch (IOException e) {
0056       logger.error("IOException should not have been thrown.", e);
0057     }
0058   }
0059 
0060   /** Returns a hash consistent with Spark's Utils.nonNegativeHash(). */
0061   public static int nonNegativeHash(Object obj) {
0062     if (obj == null) { return 0; }
0063     int hash = obj.hashCode();
0064     return hash != Integer.MIN_VALUE ? Math.abs(hash) : 0;
0065   }
0066 
0067   /**
0068    * Convert the given string to a byte buffer. The resulting buffer can be
0069    * converted back to the same string through {@link #bytesToString(ByteBuffer)}.
0070    */
0071   public static ByteBuffer stringToBytes(String s) {
0072     return Unpooled.wrappedBuffer(s.getBytes(StandardCharsets.UTF_8)).nioBuffer();
0073   }
0074 
0075   /**
0076    * Convert the given byte buffer to a string. The resulting string can be
0077    * converted back to the same byte buffer through {@link #stringToBytes(String)}.
0078    */
0079   public static String bytesToString(ByteBuffer b) {
0080     return Unpooled.wrappedBuffer(b).toString(StandardCharsets.UTF_8);
0081   }
0082 
0083   /**
0084    * Delete a file or directory and its contents recursively.
0085    * Don't follow directories if they are symlinks.
0086    *
0087    * @param file Input file / dir to be deleted
0088    * @throws IOException if deletion is unsuccessful
0089    */
0090   public static void deleteRecursively(File file) throws IOException {
0091     deleteRecursively(file, null);
0092   }
0093 
0094   /**
0095    * Delete a file or directory and its contents recursively.
0096    * Don't follow directories if they are symlinks.
0097    *
0098    * @param file Input file / dir to be deleted
0099    * @param filter A filename filter that make sure only files / dirs with the satisfied filenames
0100    *               are deleted.
0101    * @throws IOException if deletion is unsuccessful
0102    */
0103   public static void deleteRecursively(File file, FilenameFilter filter) throws IOException {
0104     if (file == null) { return; }
0105 
0106     // On Unix systems, use operating system command to run faster
0107     // If that does not work out, fallback to the Java IO way
0108     if (SystemUtils.IS_OS_UNIX && filter == null) {
0109       try {
0110         deleteRecursivelyUsingUnixNative(file);
0111         return;
0112       } catch (IOException e) {
0113         logger.warn("Attempt to delete using native Unix OS command failed for path = {}. " +
0114                         "Falling back to Java IO way", file.getAbsolutePath(), e);
0115       }
0116     }
0117 
0118     deleteRecursivelyUsingJavaIO(file, filter);
0119   }
0120 
0121   private static void deleteRecursivelyUsingJavaIO(
0122       File file,
0123       FilenameFilter filter) throws IOException {
0124     if (file.isDirectory() && !isSymlink(file)) {
0125       IOException savedIOException = null;
0126       for (File child : listFilesSafely(file, filter)) {
0127         try {
0128           deleteRecursively(child, filter);
0129         } catch (IOException e) {
0130           // In case of multiple exceptions, only last one will be thrown
0131           savedIOException = e;
0132         }
0133       }
0134       if (savedIOException != null) {
0135         throw savedIOException;
0136       }
0137     }
0138 
0139     // Delete file only when it's a normal file or an empty directory.
0140     if (file.isFile() || (file.isDirectory() && listFilesSafely(file, null).length == 0)) {
0141       boolean deleted = file.delete();
0142       // Delete can also fail if the file simply did not exist.
0143       if (!deleted && file.exists()) {
0144         throw new IOException("Failed to delete: " + file.getAbsolutePath());
0145       }
0146     }
0147   }
0148 
0149   private static void deleteRecursivelyUsingUnixNative(File file) throws IOException {
0150     ProcessBuilder builder = new ProcessBuilder("rm", "-rf", file.getAbsolutePath());
0151     Process process = null;
0152     int exitCode = -1;
0153 
0154     try {
0155       // In order to avoid deadlocks, consume the stdout (and stderr) of the process
0156       builder.redirectErrorStream(true);
0157       builder.redirectOutput(new File("/dev/null"));
0158 
0159       process = builder.start();
0160 
0161       exitCode = process.waitFor();
0162     } catch (Exception e) {
0163       throw new IOException("Failed to delete: " + file.getAbsolutePath(), e);
0164     } finally {
0165       if (process != null) {
0166         process.destroy();
0167       }
0168     }
0169 
0170     if (exitCode != 0 || file.exists()) {
0171       throw new IOException("Failed to delete: " + file.getAbsolutePath());
0172     }
0173   }
0174 
0175   private static File[] listFilesSafely(File file, FilenameFilter filter) throws IOException {
0176     if (file.exists()) {
0177       File[] files = file.listFiles(filter);
0178       if (files == null) {
0179         throw new IOException("Failed to list files for dir: " + file);
0180       }
0181       return files;
0182     } else {
0183       return new File[0];
0184     }
0185   }
0186 
0187   private static boolean isSymlink(File file) throws IOException {
0188     Preconditions.checkNotNull(file);
0189     File fileInCanonicalDir = null;
0190     if (file.getParent() == null) {
0191       fileInCanonicalDir = file;
0192     } else {
0193       fileInCanonicalDir = new File(file.getParentFile().getCanonicalFile(), file.getName());
0194     }
0195     return !fileInCanonicalDir.getCanonicalFile().equals(fileInCanonicalDir.getAbsoluteFile());
0196   }
0197 
0198   private static final ImmutableMap<String, TimeUnit> timeSuffixes =
0199     ImmutableMap.<String, TimeUnit>builder()
0200       .put("us", TimeUnit.MICROSECONDS)
0201       .put("ms", TimeUnit.MILLISECONDS)
0202       .put("s", TimeUnit.SECONDS)
0203       .put("m", TimeUnit.MINUTES)
0204       .put("min", TimeUnit.MINUTES)
0205       .put("h", TimeUnit.HOURS)
0206       .put("d", TimeUnit.DAYS)
0207       .build();
0208 
0209   private static final ImmutableMap<String, ByteUnit> byteSuffixes =
0210     ImmutableMap.<String, ByteUnit>builder()
0211       .put("b", ByteUnit.BYTE)
0212       .put("k", ByteUnit.KiB)
0213       .put("kb", ByteUnit.KiB)
0214       .put("m", ByteUnit.MiB)
0215       .put("mb", ByteUnit.MiB)
0216       .put("g", ByteUnit.GiB)
0217       .put("gb", ByteUnit.GiB)
0218       .put("t", ByteUnit.TiB)
0219       .put("tb", ByteUnit.TiB)
0220       .put("p", ByteUnit.PiB)
0221       .put("pb", ByteUnit.PiB)
0222       .build();
0223 
0224   /**
0225    * Convert a passed time string (e.g. 50s, 100ms, or 250us) to a time count in the given unit.
0226    * The unit is also considered the default if the given string does not specify a unit.
0227    */
0228   public static long timeStringAs(String str, TimeUnit unit) {
0229     String lower = str.toLowerCase(Locale.ROOT).trim();
0230 
0231     try {
0232       Matcher m = Pattern.compile("(-?[0-9]+)([a-z]+)?").matcher(lower);
0233       if (!m.matches()) {
0234         throw new NumberFormatException("Failed to parse time string: " + str);
0235       }
0236 
0237       long val = Long.parseLong(m.group(1));
0238       String suffix = m.group(2);
0239 
0240       // Check for invalid suffixes
0241       if (suffix != null && !timeSuffixes.containsKey(suffix)) {
0242         throw new NumberFormatException("Invalid suffix: \"" + suffix + "\"");
0243       }
0244 
0245       // If suffix is valid use that, otherwise none was provided and use the default passed
0246       return unit.convert(val, suffix != null ? timeSuffixes.get(suffix) : unit);
0247     } catch (NumberFormatException e) {
0248       String timeError = "Time must be specified as seconds (s), " +
0249               "milliseconds (ms), microseconds (us), minutes (m or min), hour (h), or day (d). " +
0250               "E.g. 50s, 100ms, or 250us.";
0251 
0252       throw new NumberFormatException(timeError + "\n" + e.getMessage());
0253     }
0254   }
0255 
0256   /**
0257    * Convert a time parameter such as (50s, 100ms, or 250us) to milliseconds for internal use. If
0258    * no suffix is provided, the passed number is assumed to be in ms.
0259    */
0260   public static long timeStringAsMs(String str) {
0261     return timeStringAs(str, TimeUnit.MILLISECONDS);
0262   }
0263 
0264   /**
0265    * Convert a time parameter such as (50s, 100ms, or 250us) to seconds for internal use. If
0266    * no suffix is provided, the passed number is assumed to be in seconds.
0267    */
0268   public static long timeStringAsSec(String str) {
0269     return timeStringAs(str, TimeUnit.SECONDS);
0270   }
0271 
0272   /**
0273    * Convert a passed byte string (e.g. 50b, 100kb, or 250mb) to the given. If no suffix is
0274    * provided, a direct conversion to the provided unit is attempted.
0275    */
0276   public static long byteStringAs(String str, ByteUnit unit) {
0277     String lower = str.toLowerCase(Locale.ROOT).trim();
0278 
0279     try {
0280       Matcher m = Pattern.compile("([0-9]+)([a-z]+)?").matcher(lower);
0281       Matcher fractionMatcher = Pattern.compile("([0-9]+\\.[0-9]+)([a-z]+)?").matcher(lower);
0282 
0283       if (m.matches()) {
0284         long val = Long.parseLong(m.group(1));
0285         String suffix = m.group(2);
0286 
0287         // Check for invalid suffixes
0288         if (suffix != null && !byteSuffixes.containsKey(suffix)) {
0289           throw new NumberFormatException("Invalid suffix: \"" + suffix + "\"");
0290         }
0291 
0292         // If suffix is valid use that, otherwise none was provided and use the default passed
0293         return unit.convertFrom(val, suffix != null ? byteSuffixes.get(suffix) : unit);
0294       } else if (fractionMatcher.matches()) {
0295         throw new NumberFormatException("Fractional values are not supported. Input was: "
0296           + fractionMatcher.group(1));
0297       } else {
0298         throw new NumberFormatException("Failed to parse byte string: " + str);
0299       }
0300 
0301     } catch (NumberFormatException e) {
0302       String byteError = "Size must be specified as bytes (b), " +
0303         "kibibytes (k), mebibytes (m), gibibytes (g), tebibytes (t), or pebibytes(p). " +
0304         "E.g. 50b, 100k, or 250m.";
0305 
0306       throw new NumberFormatException(byteError + "\n" + e.getMessage());
0307     }
0308   }
0309 
0310   /**
0311    * Convert a passed byte string (e.g. 50b, 100k, or 250m) to bytes for
0312    * internal use.
0313    *
0314    * If no suffix is provided, the passed number is assumed to be in bytes.
0315    */
0316   public static long byteStringAsBytes(String str) {
0317     return byteStringAs(str, ByteUnit.BYTE);
0318   }
0319 
0320   /**
0321    * Convert a passed byte string (e.g. 50b, 100k, or 250m) to kibibytes for
0322    * internal use.
0323    *
0324    * If no suffix is provided, the passed number is assumed to be in kibibytes.
0325    */
0326   public static long byteStringAsKb(String str) {
0327     return byteStringAs(str, ByteUnit.KiB);
0328   }
0329 
0330   /**
0331    * Convert a passed byte string (e.g. 50b, 100k, or 250m) to mebibytes for
0332    * internal use.
0333    *
0334    * If no suffix is provided, the passed number is assumed to be in mebibytes.
0335    */
0336   public static long byteStringAsMb(String str) {
0337     return byteStringAs(str, ByteUnit.MiB);
0338   }
0339 
0340   /**
0341    * Convert a passed byte string (e.g. 50b, 100k, or 250m) to gibibytes for
0342    * internal use.
0343    *
0344    * If no suffix is provided, the passed number is assumed to be in gibibytes.
0345    */
0346   public static long byteStringAsGb(String str) {
0347     return byteStringAs(str, ByteUnit.GiB);
0348   }
0349 
0350   /**
0351    * Returns a byte array with the buffer's contents, trying to avoid copying the data if
0352    * possible.
0353    */
0354   public static byte[] bufferToArray(ByteBuffer buffer) {
0355     if (buffer.hasArray() && buffer.arrayOffset() == 0 &&
0356         buffer.array().length == buffer.remaining()) {
0357       return buffer.array();
0358     } else {
0359       byte[] bytes = new byte[buffer.remaining()];
0360       buffer.get(bytes);
0361       return bytes;
0362     }
0363   }
0364 
0365   /**
0366    * Fills a buffer with data read from the channel.
0367    */
0368   public static void readFully(ReadableByteChannel channel, ByteBuffer dst) throws IOException {
0369     int expected = dst.remaining();
0370     while (dst.hasRemaining()) {
0371       if (channel.read(dst) < 0) {
0372         throw new EOFException(String.format("Not enough bytes in channel (expected %d).",
0373           expected));
0374       }
0375     }
0376   }
0377 
0378 }