001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.commons.compress.compressors.snappy; 020 021import java.io.IOException; 022import java.io.InputStream; 023import java.io.PushbackInputStream; 024import java.util.Arrays; 025 026import org.apache.commons.compress.compressors.CompressorInputStream; 027import org.apache.commons.compress.utils.BoundedInputStream; 028import org.apache.commons.compress.utils.ByteUtils; 029import org.apache.commons.compress.utils.CountingInputStream; 030import org.apache.commons.compress.utils.IOUtils; 031import org.apache.commons.compress.utils.InputStreamStatistics; 032 033/** 034 * CompressorInputStream for the framing Snappy format. 035 * 036 * <p>Based on the "spec" in the version "Last revised: 2013-10-25"</p> 037 * 038 * @see <a href="https://github.com/google/snappy/blob/master/framing_format.txt">Snappy framing format description</a> 039 * @since 1.7 040 */ 041public class FramedSnappyCompressorInputStream extends CompressorInputStream 042 implements InputStreamStatistics { 043 044 /** 045 * package private for tests only. 046 */ 047 static final long MASK_OFFSET = 0xa282ead8L; 048 049 private static final int STREAM_IDENTIFIER_TYPE = 0xff; 050 static final int COMPRESSED_CHUNK_TYPE = 0; 051 private static final int UNCOMPRESSED_CHUNK_TYPE = 1; 052 private static final int PADDING_CHUNK_TYPE = 0xfe; 053 private static final int MIN_UNSKIPPABLE_TYPE = 2; 054 private static final int MAX_UNSKIPPABLE_TYPE = 0x7f; 055 private static final int MAX_SKIPPABLE_TYPE = 0xfd; 056 057 // used by FramedSnappyCompressorOutputStream as well 058 static final byte[] SZ_SIGNATURE = new byte[] { //NOSONAR 059 (byte) STREAM_IDENTIFIER_TYPE, // tag 060 6, 0, 0, // length 061 's', 'N', 'a', 'P', 'p', 'Y' 062 }; 063 064 private long unreadBytes; 065 private final CountingInputStream countingStream; 066 067 /** The underlying stream to read compressed data from */ 068 private final PushbackInputStream in; 069 070 /** The dialect to expect */ 071 private final FramedSnappyDialect dialect; 072 073 private SnappyCompressorInputStream currentCompressedChunk; 074 075 // used in no-arg read method 076 private final byte[] oneByte = new byte[1]; 077 078 private boolean endReached, inUncompressedChunk; 079 080 private int uncompressedBytesRemaining; 081 private long expectedChecksum = -1; 082 private final int blockSize; 083 private final PureJavaCrc32C checksum = new PureJavaCrc32C(); 084 085 private final ByteUtils.ByteSupplier supplier = new ByteUtils.ByteSupplier() { 086 @Override 087 public int getAsByte() throws IOException { 088 return readOneByte(); 089 } 090 }; 091 092 /** 093 * Constructs a new input stream that decompresses 094 * snappy-framed-compressed data from the specified input stream 095 * using the {@link FramedSnappyDialect#STANDARD} dialect. 096 * @param in the InputStream from which to read the compressed data 097 * @throws IOException if reading fails 098 */ 099 public FramedSnappyCompressorInputStream(final InputStream in) throws IOException { 100 this(in, FramedSnappyDialect.STANDARD); 101 } 102 103 /** 104 * Constructs a new input stream that decompresses snappy-framed-compressed data 105 * from the specified input stream. 106 * @param in the InputStream from which to read the compressed data 107 * @param dialect the dialect used by the compressed stream 108 * @throws IOException if reading fails 109 */ 110 public FramedSnappyCompressorInputStream(final InputStream in, 111 final FramedSnappyDialect dialect) 112 throws IOException { 113 this(in, SnappyCompressorInputStream.DEFAULT_BLOCK_SIZE, dialect); 114 } 115 116 /** 117 * Constructs a new input stream that decompresses snappy-framed-compressed data 118 * from the specified input stream. 119 * @param in the InputStream from which to read the compressed data 120 * @param blockSize the block size to use for the compressed stream 121 * @param dialect the dialect used by the compressed stream 122 * @throws IOException if reading fails 123 * @throws IllegalArgumentException if blockSize is not bigger than 0 124 * @since 1.14 125 */ 126 public FramedSnappyCompressorInputStream(final InputStream in, 127 final int blockSize, 128 final FramedSnappyDialect dialect) 129 throws IOException { 130 if (blockSize <= 0) { 131 throw new IllegalArgumentException("blockSize must be bigger than 0"); 132 } 133 countingStream = new CountingInputStream(in); 134 this.in = new PushbackInputStream(countingStream, 1); 135 this.blockSize = blockSize; 136 this.dialect = dialect; 137 if (dialect.hasStreamIdentifier()) { 138 readStreamIdentifier(); 139 } 140 } 141 142 /** {@inheritDoc} */ 143 @Override 144 public int read() throws IOException { 145 return read(oneByte, 0, 1) == -1 ? -1 : oneByte[0] & 0xFF; 146 } 147 148 /** {@inheritDoc} */ 149 @Override 150 public void close() throws IOException { 151 try { 152 if (currentCompressedChunk != null) { 153 currentCompressedChunk.close(); 154 currentCompressedChunk = null; 155 } 156 } finally { 157 in.close(); 158 } 159 } 160 161 /** {@inheritDoc} */ 162 @Override 163 public int read(final byte[] b, final int off, final int len) throws IOException { 164 int read = readOnce(b, off, len); 165 if (read == -1) { 166 readNextBlock(); 167 if (endReached) { 168 return -1; 169 } 170 read = readOnce(b, off, len); 171 } 172 return read; 173 } 174 175 /** {@inheritDoc} */ 176 @Override 177 public int available() throws IOException { 178 if (inUncompressedChunk) { 179 return Math.min(uncompressedBytesRemaining, 180 in.available()); 181 } else if (currentCompressedChunk != null) { 182 return currentCompressedChunk.available(); 183 } 184 return 0; 185 } 186 187 /** 188 * @since 1.17 189 */ 190 @Override 191 public long getCompressedCount() { 192 return countingStream.getBytesRead() - unreadBytes; 193 } 194 195 /** 196 * Read from the current chunk into the given array. 197 * 198 * @return -1 if there is no current chunk or the number of bytes 199 * read from the current chunk (which may be -1 if the end of the 200 * chunk is reached). 201 */ 202 private int readOnce(final byte[] b, final int off, final int len) throws IOException { 203 int read = -1; 204 if (inUncompressedChunk) { 205 final int amount = Math.min(uncompressedBytesRemaining, len); 206 if (amount == 0) { 207 return -1; 208 } 209 read = in.read(b, off, amount); 210 if (read != -1) { 211 uncompressedBytesRemaining -= read; 212 count(read); 213 } 214 } else if (currentCompressedChunk != null) { 215 final long before = currentCompressedChunk.getBytesRead(); 216 read = currentCompressedChunk.read(b, off, len); 217 if (read == -1) { 218 currentCompressedChunk.close(); 219 currentCompressedChunk = null; 220 } else { 221 count(currentCompressedChunk.getBytesRead() - before); 222 } 223 } 224 if (read > 0) { 225 checksum.update(b, off, read); 226 } 227 return read; 228 } 229 230 private void readNextBlock() throws IOException { 231 verifyLastChecksumAndReset(); 232 inUncompressedChunk = false; 233 final int type = readOneByte(); 234 if (type == -1) { 235 endReached = true; 236 } else if (type == STREAM_IDENTIFIER_TYPE) { 237 in.unread(type); 238 unreadBytes++; 239 pushedBackBytes(1); 240 readStreamIdentifier(); 241 readNextBlock(); 242 } else if (type == PADDING_CHUNK_TYPE 243 || (type > MAX_UNSKIPPABLE_TYPE && type <= MAX_SKIPPABLE_TYPE)) { 244 skipBlock(); 245 readNextBlock(); 246 } else if (type >= MIN_UNSKIPPABLE_TYPE && type <= MAX_UNSKIPPABLE_TYPE) { 247 throw new IOException("Unskippable chunk with type " + type 248 + " (hex " + Integer.toHexString(type) + ")" 249 + " detected."); 250 } else if (type == UNCOMPRESSED_CHUNK_TYPE) { 251 inUncompressedChunk = true; 252 uncompressedBytesRemaining = readSize() - 4 /* CRC */; 253 if (uncompressedBytesRemaining < 0) { 254 throw new IOException("Found illegal chunk with negative size"); 255 } 256 expectedChecksum = unmask(readCrc()); 257 } else if (type == COMPRESSED_CHUNK_TYPE) { 258 final boolean expectChecksum = dialect.usesChecksumWithCompressedChunks(); 259 final long size = readSize() - (expectChecksum ? 4L : 0L); 260 if (size < 0) { 261 throw new IOException("Found illegal chunk with negative size"); 262 } 263 if (expectChecksum) { 264 expectedChecksum = unmask(readCrc()); 265 } else { 266 expectedChecksum = -1; 267 } 268 currentCompressedChunk = 269 new SnappyCompressorInputStream(new BoundedInputStream(in, size), blockSize); 270 // constructor reads uncompressed size 271 count(currentCompressedChunk.getBytesRead()); 272 } else { 273 // impossible as all potential byte values have been covered 274 throw new IOException("Unknown chunk type " + type 275 + " detected."); 276 } 277 } 278 279 private long readCrc() throws IOException { 280 final byte[] b = new byte[4]; 281 final int read = IOUtils.readFully(in, b); 282 count(read); 283 if (read != 4) { 284 throw new IOException("Premature end of stream"); 285 } 286 return ByteUtils.fromLittleEndian(b); 287 } 288 289 static long unmask(long x) { 290 // ugly, maybe we should just have used ints and deal with the 291 // overflow 292 x -= MASK_OFFSET; 293 x &= 0xffffFFFFL; 294 return ((x >> 17) | (x << 15)) & 0xffffFFFFL; 295 } 296 297 private int readSize() throws IOException { 298 return (int) ByteUtils.fromLittleEndian(supplier, 3); 299 } 300 301 private void skipBlock() throws IOException { 302 final int size = readSize(); 303 if (size < 0) { 304 throw new IOException("Found illegal chunk with negative size"); 305 } 306 final long read = IOUtils.skip(in, size); 307 count(read); 308 if (read != size) { 309 throw new IOException("Premature end of stream"); 310 } 311 } 312 313 private void readStreamIdentifier() throws IOException { 314 final byte[] b = new byte[10]; 315 final int read = IOUtils.readFully(in, b); 316 count(read); 317 if (10 != read || !matches(b, 10)) { 318 throw new IOException("Not a framed Snappy stream"); 319 } 320 } 321 322 private int readOneByte() throws IOException { 323 final int b = in.read(); 324 if (b != -1) { 325 count(1); 326 return b & 0xFF; 327 } 328 return -1; 329 } 330 331 private void verifyLastChecksumAndReset() throws IOException { 332 if (expectedChecksum >= 0 && expectedChecksum != checksum.getValue()) { 333 throw new IOException("Checksum verification failed"); 334 } 335 expectedChecksum = -1; 336 checksum.reset(); 337 } 338 339 /** 340 * Checks if the signature matches what is expected for a .sz file. 341 * 342 * <p>.sz files start with a chunk with tag 0xff and content sNaPpY.</p> 343 * 344 * @param signature the bytes to check 345 * @param length the number of bytes to check 346 * @return true if this is a .sz stream, false otherwise 347 */ 348 public static boolean matches(final byte[] signature, final int length) { 349 350 if (length < SZ_SIGNATURE.length) { 351 return false; 352 } 353 354 byte[] shortenedSig = signature; 355 if (signature.length > SZ_SIGNATURE.length) { 356 shortenedSig = new byte[SZ_SIGNATURE.length]; 357 System.arraycopy(signature, 0, shortenedSig, 0, SZ_SIGNATURE.length); 358 } 359 360 return Arrays.equals(shortenedSig, SZ_SIGNATURE); 361 } 362 363}