/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.commons.io.input;

import static org.apache.commons.io.IOUtils.EOF;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Objects;

import org.apache.commons.io.IOUtils;

/**
 * {@link InputStream} implementation which uses direct buffer to read a file to avoid extra copy of data between Java
 * and native memory which happens when using {@link java.io.BufferedInputStream}. Unfortunately, this is not something
 * already available in JDK, {@code sun.nio.ch.ChannelInputStream} supports reading a file using NIO, but does not
 * support buffering.
 * <p>
 * This class was ported and adapted from Apache Spark commit 933dc6cb7b3de1d8ccaf73d124d6eb95b947ed19 where it was
 * called {@code NioBufferedFileInputStream}.
 * </p>
 * <p>
 * The {@code read}, {@code skip}, {@code available} and {@code close} methods are synchronized on this instance.
 * Closing the stream eagerly releases the direct buffer, so every method that would touch the buffer contents first
 * verifies that the stream is still open.
 * </p>
 *
 * @since 2.9.0
 */
@SuppressWarnings("restriction")
public final class BufferedFileChannelInputStream extends InputStream {

    /** {@code sun.nio.ch.DirectBuffer}, or {@code null} when that class cannot be loaded on this runtime. */
    private static final Class<?> DIRECT_BUFFER_CLASS = getDirectBufferClass();

    /** Direct buffer holding bytes already read from the channel; always kept in "read mode" (flipped). */
    private final ByteBuffer byteBuffer;

    /** Channel the bytes are read from; its open state doubles as the open state of this stream. */
    private final FileChannel fileChannel;

    /**
     * Looks up {@code sun.nio.ch.DirectBuffer} reflectively.
     *
     * @return the class, or {@code null} if it is missing or inaccessible.
     */
    private static Class<?> getDirectBufferClass() {
        Class<?> res = null;
        try {
            res = Class.forName("sun.nio.ch.DirectBuffer");
        } catch (final IllegalAccessError | ClassNotFoundException ignored) {
            // Best effort: without the class, buffers are simply left to the garbage collector.
        }
        return res;
    }

    /**
     * Tests whether the given object is an instance of {@code sun.nio.ch.DirectBuffer}.
     *
     * @param object the object to test.
     * @return {@code true} if the class was found and {@code object} is an instance of it.
     */
    private static boolean isDirectBuffer(final Object object) {
        return DIRECT_BUFFER_CLASS != null && DIRECT_BUFFER_CLASS.isInstance(object);
    }

    /**
     * Constructs a new instance for the given File.
     *
     * @param file The file to stream.
     * @throws IOException If an I/O error occurs
     */
    public BufferedFileChannelInputStream(final File file) throws IOException {
        this(file, IOUtils.DEFAULT_BUFFER_SIZE);
    }

    /**
     * Constructs a new instance for the given File and buffer size.
     *
     * @param file The file to stream.
     * @param bufferSizeInBytes buffer size, must be positive.
     * @throws IOException If an I/O error occurs
     * @throws IllegalArgumentException If {@code bufferSizeInBytes} is not positive
     */
    public BufferedFileChannelInputStream(final File file, final int bufferSizeInBytes) throws IOException {
        this(file.toPath(), bufferSizeInBytes);
    }

    /**
     * Constructs a new instance for the given Path.
     *
     * @param path The path to stream.
     * @throws IOException If an I/O error occurs
     */
    public BufferedFileChannelInputStream(final Path path) throws IOException {
        this(path, IOUtils.DEFAULT_BUFFER_SIZE);
    }

    /**
     * Constructs a new instance for the given Path and buffer size.
     *
     * @param path The path to stream.
     * @param bufferSizeInBytes buffer size, must be positive.
     * @throws IOException If an I/O error occurs
     * @throws NullPointerException If {@code path} is {@code null}
     * @throws IllegalArgumentException If {@code bufferSizeInBytes} is not positive
     */
    public BufferedFileChannelInputStream(final Path path, final int bufferSizeInBytes) throws IOException {
        Objects.requireNonNull(path, "path");
        // A zero-capacity buffer would make refill() spin forever because every channel read returns 0.
        // Validate before opening the file so that a bad argument cannot leak a file descriptor.
        if (bufferSizeInBytes <= 0) {
            throw new IllegalArgumentException("bufferSizeInBytes must be positive: " + bufferSizeInBytes);
        }
        fileChannel = FileChannel.open(path, StandardOpenOption.READ);
        try {
            byteBuffer = ByteBuffer.allocateDirect(bufferSizeInBytes);
        } catch (final RuntimeException | Error e) {
            // Allocation failed (for example OutOfMemoryError): do not leave the channel open.
            try {
                fileChannel.close();
            } catch (final IOException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
        // Start with an empty buffer in read mode (position == limit == 0) so the first read triggers a refill.
        byteBuffer.flip();
    }

    /**
     * Returns the number of bytes that can be read from the internal buffer without touching the channel.
     *
     * @return the number of buffered bytes, or 0 if this stream has been closed.
     * @throws IOException declared by the overridden method.
     */
    @Override
    public synchronized int available() throws IOException {
        return fileChannel.isOpen() ? byteBuffer.remaining() : 0;
    }

    /**
     * Attempts to clean up a ByteBuffer if it is direct or memory-mapped. This uses an *unsafe* Sun API that will cause
     * errors if one attempts to read from the disposed buffer. However, neither the bytes allocated to direct buffers
     * nor file descriptors opened for memory-mapped buffers put pressure on the garbage collector. Waiting for garbage
     * collection may lead to the depletion of off-heap memory or huge numbers of open files. There's unfortunately no
     * standard API to manually dispose of these kinds of buffers.
     *
     * @param buffer the buffer to clean.
     */
    private static void clean(final ByteBuffer buffer) {
        if (isDirectBuffer(buffer)) {
            cleanDirectBuffer(buffer);
        }
    }

    /**
     * In Java 8, the type of DirectBuffer.cleaner() was sun.misc.Cleaner, and it was possible to access the method
     * sun.misc.Cleaner.clean() to invoke it. The type changed to jdk.internal.ref.Cleaner in later JDKs, and the
     * .clean() method is not accessible even with reflection. However sun.misc.Unsafe added an invokeCleaner() method
     * in JDK 9+ and this is still accessible with reflection.
     *
     * @param buffer the buffer to clean. must be a DirectBuffer.
     * @throws IllegalStateException if the reflective call fails.
     */
    private static void cleanDirectBuffer(final ByteBuffer buffer) {
        // Ported from the bufferCleaner logic in Apache Spark's StorageUtils.scala.
        final String specVer = System.getProperty("java.specification.version");
        if ("1.8".equals(specVer)) {
            // On Java 8, but also compiles on Java 11.
            try {
                final Class<?> clsCleaner = Class.forName("sun.misc.Cleaner");
                final Method cleanerMethod = DIRECT_BUFFER_CLASS.getMethod("cleaner");
                final Object cleaner = cleanerMethod.invoke(buffer);
                if (cleaner != null) {
                    final Method cleanMethod = clsCleaner.getMethod("clean");
                    cleanMethod.invoke(cleaner);
                }
            } catch (final ReflectiveOperationException e) {
                throw new IllegalStateException(e);
            }
        } else {
            // On Java 9 and up, but compiles on Java 8.
            try {
                final Class<?> clsUnsafe = Class.forName("sun.misc.Unsafe");
                final Method cleanerMethod = clsUnsafe.getMethod("invokeCleaner", ByteBuffer.class);
                final Field unsafeField = clsUnsafe.getDeclaredField("theUnsafe");
                unsafeField.setAccessible(true);
                cleanerMethod.invoke(unsafeField.get(null), buffer);
            } catch (final ReflectiveOperationException e) {
                throw new IllegalStateException(e);
            }
        }
    }

    /**
     * Closes the underlying channel and then releases the direct buffer, even if closing the channel fails.
     *
     * @throws IOException if closing the channel fails.
     */
    @Override
    public synchronized void close() throws IOException {
        try {
            fileChannel.close();
        } finally {
            clean(byteBuffer);
        }
    }

    /**
     * Verifies that this stream has not been closed. Must be called before any access to the buffer contents because
     * {@link #close()} releases the buffer's native memory.
     *
     * @throws IOException if the underlying channel is closed.
     */
    private void ensureOpen() throws IOException {
        if (!fileChannel.isOpen()) {
            throw new IOException("Stream closed");
        }
    }

    /**
     * Reads the next byte.
     *
     * @return the byte as an int in the range 0 to 255, or {@code EOF} at the end of the file.
     * @throws IOException if this stream is closed or an I/O error occurs.
     */
    @Override
    public synchronized int read() throws IOException {
        ensureOpen();
        if (!refill()) {
            return EOF;
        }
        return byteBuffer.get() & 0xFF;
    }

    /**
     * Reads up to {@code len} bytes into {@code b}, returning at most what a single buffer fill provides.
     *
     * @param b the destination array.
     * @param offset the index in {@code b} at which to start storing bytes.
     * @param len the maximum number of bytes to read.
     * @return the number of bytes read, 0 if {@code len} is 0, or {@code EOF} at the end of the file.
     * @throws IOException if this stream is closed or an I/O error occurs.
     * @throws IndexOutOfBoundsException if {@code offset} and {@code len} do not describe a range within {@code b}.
     */
    @Override
    public synchronized int read(final byte[] b, final int offset, int len) throws IOException {
        if (offset < 0 || len < 0 || offset + len < 0 || offset + len > b.length) {
            throw new IndexOutOfBoundsException();
        }
        ensureOpen();
        if (len == 0) {
            // InputStream contract: a zero-length request reads nothing and never reports EOF.
            return 0;
        }
        if (!refill()) {
            return EOF;
        }
        len = Math.min(len, byteBuffer.remaining());
        byteBuffer.get(b, offset, len);
        return len;
    }

    /**
     * Checks whether data is left to be read from the input stream, reading from the channel when the buffer is
     * exhausted.
     *
     * @return true if data is left, false otherwise
     * @throws IOException if reading from the channel fails.
     */
    private boolean refill() throws IOException {
        if (!byteBuffer.hasRemaining()) {
            byteBuffer.clear();
            int nRead = 0;
            // Retry reads that transfer nothing; the loop ends on the first read that returns data or EOF (-1).
            while (nRead == 0) {
                nRead = fileChannel.read(byteBuffer);
            }
            byteBuffer.flip();
            return nRead >= 0;
        }
        return true;
    }

    /**
     * Skips up to {@code n} bytes, consuming buffered bytes first and then repositioning the channel.
     *
     * @param n the number of bytes to skip.
     * @return the number of bytes actually skipped; 0 if {@code n} is not positive.
     * @throws IOException if this stream is closed or an I/O error occurs.
     */
    @Override
    public synchronized long skip(final long n) throws IOException {
        if (n <= 0L) {
            return 0L;
        }
        ensureOpen();
        final int buffered = byteBuffer.remaining();
        if (buffered >= n) {
            // The buffered content is enough to skip; n fits in an int because it is at most 'buffered'.
            byteBuffer.position(byteBuffer.position() + (int) n);
            return n;
        }
        // Discard everything we have read in the buffer (position == limit == 0 afterwards).
        byteBuffer.position(0);
        byteBuffer.flip();
        return buffered + skipFromFileChannel(n - buffered);
    }

    /**
     * Advances the channel position by up to {@code n} bytes without going past the end of the file.
     *
     * @param n the number of bytes to skip, expected to be positive.
     * @return the number of bytes skipped, never negative.
     * @throws IOException if querying or setting the channel position fails.
     */
    private long skipFromFileChannel(final long n) throws IOException {
        final long currentFilePosition = fileChannel.position();
        final long size = fileChannel.size();
        // Clamp at 0: the position can exceed the size if the file shrank after it was read.
        final long remainingInFile = Math.max(0L, size - currentFilePosition);
        if (n > remainingInFile) {
            fileChannel.position(size);
            return remainingInFile;
        }
        fileChannel.position(currentFilePosition + n);
        return n;
    }

}