1 /* 2 * Copyright 2013 Google Inc. 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package com.google.common.jimfs; 18 19 import static com.google.common.base.Preconditions.checkNotNull; 20 import static com.google.common.base.Preconditions.checkPositionIndexes; 21 import static java.nio.file.StandardOpenOption.APPEND; 22 import static java.nio.file.StandardOpenOption.READ; 23 import static java.nio.file.StandardOpenOption.WRITE; 24 25 import java.io.IOException; 26 import java.nio.ByteBuffer; 27 import java.nio.MappedByteBuffer; 28 import java.nio.channels.AsynchronousCloseException; 29 import java.nio.channels.AsynchronousFileChannel; 30 import java.nio.channels.ClosedByInterruptException; 31 import java.nio.channels.ClosedChannelException; 32 import java.nio.channels.FileChannel; 33 import java.nio.channels.FileLock; 34 import java.nio.channels.FileLockInterruptionException; 35 import java.nio.channels.NonReadableChannelException; 36 import java.nio.channels.NonWritableChannelException; 37 import java.nio.channels.ReadableByteChannel; 38 import java.nio.channels.WritableByteChannel; 39 import java.nio.file.OpenOption; 40 import java.util.Arrays; 41 import java.util.HashSet; 42 import java.util.List; 43 import java.util.Set; 44 import java.util.concurrent.ExecutorService; 45 import java.util.concurrent.atomic.AtomicBoolean; 46 47 /** 48 * A {@link FileChannel} implementation that reads and writes to a {@link RegularFile} object. The 49 * read and write methods and other methods that read or change the position of the channel are 50 * locked because the {@link ReadableByteChannel} and {@link WritableByteChannel} interfaces specify 51 * that the read and write methods block when another thread is currently doing a read or write 52 * operation. 53 * 54 * @author Colin Decker 55 */ 56 final class JimfsFileChannel extends FileChannel { 57 58 /** 59 * Set of threads that are currently doing an interruptible blocking operation; that is, doing 60 * something that requires acquiring the file's lock. These threads must be interrupted if the 61 * channel is closed by another thread. 62 */ 63 @GuardedBy("blockingThreads") 64 private final Set<Thread> blockingThreads = new HashSet<Thread>(); 65 66 private final RegularFile file; 67 private final FileSystemState fileSystemState; 68 69 private final boolean read; 70 private final boolean write; 71 private final boolean append; 72 73 @GuardedBy("this") 74 private long position; 75 JimfsFileChannel( RegularFile file, Set<OpenOption> options, FileSystemState fileSystemState)76 public JimfsFileChannel( 77 RegularFile file, Set<OpenOption> options, FileSystemState fileSystemState) { 78 this.file = file; 79 this.fileSystemState = fileSystemState; 80 this.read = options.contains(READ); 81 this.write = options.contains(WRITE); 82 this.append = options.contains(APPEND); 83 84 fileSystemState.register(this); 85 } 86 87 /** 88 * Returns an {@link AsynchronousFileChannel} view of this channel using the given executor for 89 * asynchronous operations. 90 */ asAsynchronousFileChannel(ExecutorService executor)91 public AsynchronousFileChannel asAsynchronousFileChannel(ExecutorService executor) { 92 return new JimfsAsynchronousFileChannel(this, executor); 93 } 94 checkReadable()95 void checkReadable() { 96 if (!read) { 97 throw new NonReadableChannelException(); 98 } 99 } 100 checkWritable()101 void checkWritable() { 102 if (!write) { 103 throw new NonWritableChannelException(); 104 } 105 } 106 checkOpen()107 void checkOpen() throws ClosedChannelException { 108 if (!isOpen()) { 109 throw new ClosedChannelException(); 110 } 111 } 112 113 /** 114 * Begins a blocking operation, making the operation interruptible. Returns {@code true} if the 115 * channel was open and the thread was added as a blocking thread; returns {@code false} if the 116 * channel was closed. 117 */ beginBlocking()118 private boolean beginBlocking() { 119 begin(); 120 synchronized (blockingThreads) { 121 if (isOpen()) { 122 blockingThreads.add(Thread.currentThread()); 123 return true; 124 } 125 126 return false; 127 } 128 } 129 130 /** 131 * Ends a blocking operation, throwing an exception if the thread was interrupted while blocking 132 * or if the channel was closed from another thread. 133 */ endBlocking(boolean completed)134 private void endBlocking(boolean completed) throws AsynchronousCloseException { 135 synchronized (blockingThreads) { 136 blockingThreads.remove(Thread.currentThread()); 137 } 138 end(completed); 139 } 140 141 @Override read(ByteBuffer dst)142 public int read(ByteBuffer dst) throws IOException { 143 checkNotNull(dst); 144 checkOpen(); 145 checkReadable(); 146 147 int read = 0; // will definitely either be assigned or an exception will be thrown 148 149 synchronized (this) { 150 boolean completed = false; 151 try { 152 if (!beginBlocking()) { 153 return 0; // AsynchronousCloseException will be thrown 154 } 155 file.readLock().lockInterruptibly(); 156 try { 157 read = file.read(position, dst); 158 if (read != -1) { 159 position += read; 160 } 161 file.updateAccessTime(); 162 completed = true; 163 } finally { 164 file.readLock().unlock(); 165 } 166 } catch (InterruptedException e) { 167 Thread.currentThread().interrupt(); 168 } finally { 169 endBlocking(completed); 170 } 171 } 172 173 return read; 174 } 175 176 @Override read(ByteBuffer[] dsts, int offset, int length)177 public long read(ByteBuffer[] dsts, int offset, int length) throws IOException { 178 checkPositionIndexes(offset, offset + length, dsts.length); 179 List<ByteBuffer> buffers = Arrays.asList(dsts).subList(offset, offset + length); 180 Util.checkNoneNull(buffers); 181 checkOpen(); 182 checkReadable(); 183 184 long read = 0; // will definitely either be assigned or an exception will be thrown 185 186 synchronized (this) { 187 boolean completed = false; 188 try { 189 if (!beginBlocking()) { 190 return 0; // AsynchronousCloseException will be thrown 191 } 192 file.readLock().lockInterruptibly(); 193 try { 194 read = file.read(position, buffers); 195 if (read != -1) { 196 position += read; 197 } 198 file.updateAccessTime(); 199 completed = true; 200 } finally { 201 file.readLock().unlock(); 202 } 203 } catch (InterruptedException e) { 204 Thread.currentThread().interrupt(); 205 } finally { 206 endBlocking(completed); 207 } 208 } 209 210 return read; 211 } 212 213 @Override read(ByteBuffer dst, long position)214 public int read(ByteBuffer dst, long position) throws IOException { 215 checkNotNull(dst); 216 Util.checkNotNegative(position, "position"); 217 checkOpen(); 218 checkReadable(); 219 220 int read = 0; // will definitely either be assigned or an exception will be thrown 221 222 // no need to synchronize here; this method does not make use of the channel's position 223 boolean completed = false; 224 try { 225 if (!beginBlocking()) { 226 return 0; // AsynchronousCloseException will be thrown 227 } 228 file.readLock().lockInterruptibly(); 229 try { 230 read = file.read(position, dst); 231 file.updateAccessTime(); 232 completed = true; 233 } finally { 234 file.readLock().unlock(); 235 } 236 } catch (InterruptedException e) { 237 Thread.currentThread().interrupt(); 238 } finally { 239 endBlocking(completed); 240 } 241 242 return read; 243 } 244 245 @Override write(ByteBuffer src)246 public int write(ByteBuffer src) throws IOException { 247 checkNotNull(src); 248 checkOpen(); 249 checkWritable(); 250 251 int written = 0; // will definitely either be assigned or an exception will be thrown 252 253 synchronized (this) { 254 boolean completed = false; 255 try { 256 if (!beginBlocking()) { 257 return 0; // AsynchronousCloseException will be thrown 258 } 259 file.writeLock().lockInterruptibly(); 260 try { 261 if (append) { 262 position = file.size(); 263 } 264 written = file.write(position, src); 265 position += written; 266 file.updateModifiedTime(); 267 completed = true; 268 } finally { 269 file.writeLock().unlock(); 270 } 271 } catch (InterruptedException e) { 272 Thread.currentThread().interrupt(); 273 } finally { 274 endBlocking(completed); 275 } 276 } 277 278 return written; 279 } 280 281 @Override write(ByteBuffer[] srcs, int offset, int length)282 public long write(ByteBuffer[] srcs, int offset, int length) throws IOException { 283 checkPositionIndexes(offset, offset + length, srcs.length); 284 List<ByteBuffer> buffers = Arrays.asList(srcs).subList(offset, offset + length); 285 Util.checkNoneNull(buffers); 286 checkOpen(); 287 checkWritable(); 288 289 long written = 0; // will definitely either be assigned or an exception will be thrown 290 291 synchronized (this) { 292 boolean completed = false; 293 try { 294 if (!beginBlocking()) { 295 return 0; // AsynchronousCloseException will be thrown 296 } 297 file.writeLock().lockInterruptibly(); 298 try { 299 if (append) { 300 position = file.size(); 301 } 302 written = file.write(position, buffers); 303 position += written; 304 file.updateModifiedTime(); 305 completed = true; 306 } finally { 307 file.writeLock().unlock(); 308 } 309 } catch (InterruptedException e) { 310 Thread.currentThread().interrupt(); 311 } finally { 312 endBlocking(completed); 313 } 314 } 315 316 return written; 317 } 318 319 @Override write(ByteBuffer src, long position)320 public int write(ByteBuffer src, long position) throws IOException { 321 checkNotNull(src); 322 Util.checkNotNegative(position, "position"); 323 checkOpen(); 324 checkWritable(); 325 326 int written = 0; // will definitely either be assigned or an exception will be thrown 327 328 if (append) { 329 // synchronize because appending does update the channel's position 330 synchronized (this) { 331 boolean completed = false; 332 try { 333 if (!beginBlocking()) { 334 return 0; // AsynchronousCloseException will be thrown 335 } 336 337 file.writeLock().lockInterruptibly(); 338 try { 339 position = file.sizeWithoutLocking(); 340 written = file.write(position, src); 341 this.position = position + written; 342 file.updateModifiedTime(); 343 completed = true; 344 } finally { 345 file.writeLock().unlock(); 346 } 347 } catch (InterruptedException e) { 348 Thread.currentThread().interrupt(); 349 } finally { 350 endBlocking(completed); 351 } 352 } 353 } else { 354 // don't synchronize because the channel's position is not involved 355 boolean completed = false; 356 try { 357 if (!beginBlocking()) { 358 return 0; // AsynchronousCloseException will be thrown 359 } 360 file.writeLock().lockInterruptibly(); 361 try { 362 written = file.write(position, src); 363 file.updateModifiedTime(); 364 completed = true; 365 } finally { 366 file.writeLock().unlock(); 367 } 368 } catch (InterruptedException e) { 369 Thread.currentThread().interrupt(); 370 } finally { 371 endBlocking(completed); 372 } 373 } 374 375 return written; 376 } 377 378 @Override position()379 public long position() throws IOException { 380 checkOpen(); 381 382 long pos; 383 384 synchronized (this) { 385 boolean completed = false; 386 try { 387 begin(); // don't call beginBlocking() because this method doesn't block 388 if (!isOpen()) { 389 return 0; // AsynchronousCloseException will be thrown 390 } 391 pos = this.position; 392 completed = true; 393 } finally { 394 end(completed); 395 } 396 } 397 398 return pos; 399 } 400 401 @Override position(long newPosition)402 public FileChannel position(long newPosition) throws IOException { 403 Util.checkNotNegative(newPosition, "newPosition"); 404 checkOpen(); 405 406 synchronized (this) { 407 boolean completed = false; 408 try { 409 begin(); // don't call beginBlocking() because this method doesn't block 410 if (!isOpen()) { 411 return this; // AsynchronousCloseException will be thrown 412 } 413 this.position = newPosition; 414 completed = true; 415 } finally { 416 end(completed); 417 } 418 } 419 420 return this; 421 } 422 423 @Override size()424 public long size() throws IOException { 425 checkOpen(); 426 427 long size = 0; // will definitely either be assigned or an exception will be thrown 428 429 boolean completed = false; 430 try { 431 if (!beginBlocking()) { 432 return 0; // AsynchronousCloseException will be thrown 433 } 434 file.readLock().lockInterruptibly(); 435 try { 436 size = file.sizeWithoutLocking(); 437 completed = true; 438 } finally { 439 file.readLock().unlock(); 440 } 441 } catch (InterruptedException e) { 442 Thread.currentThread().interrupt(); 443 } finally { 444 endBlocking(completed); 445 } 446 447 return size; 448 } 449 450 @Override truncate(long size)451 public FileChannel truncate(long size) throws IOException { 452 Util.checkNotNegative(size, "size"); 453 checkOpen(); 454 checkWritable(); 455 456 synchronized (this) { 457 boolean completed = false; 458 try { 459 if (!beginBlocking()) { 460 return this; // AsynchronousCloseException will be thrown 461 } 462 file.writeLock().lockInterruptibly(); 463 try { 464 file.truncate(size); 465 if (position > size) { 466 position = size; 467 } 468 file.updateModifiedTime(); 469 completed = true; 470 } finally { 471 file.writeLock().unlock(); 472 } 473 } catch (InterruptedException e) { 474 Thread.currentThread().interrupt(); 475 } finally { 476 endBlocking(completed); 477 } 478 } 479 480 return this; 481 } 482 483 @Override force(boolean metaData)484 public void force(boolean metaData) throws IOException { 485 checkOpen(); 486 487 // nothing to do since writes are all direct to the storage 488 // however, we should handle the thread being interrupted anyway 489 boolean completed = false; 490 try { 491 begin(); 492 completed = true; 493 } finally { 494 end(completed); 495 } 496 } 497 498 @Override transferTo(long position, long count, WritableByteChannel target)499 public long transferTo(long position, long count, WritableByteChannel target) throws IOException { 500 checkNotNull(target); 501 Util.checkNotNegative(position, "position"); 502 Util.checkNotNegative(count, "count"); 503 checkOpen(); 504 checkReadable(); 505 506 long transferred = 0; // will definitely either be assigned or an exception will be thrown 507 508 // no need to synchronize here; this method does not make use of the channel's position 509 boolean completed = false; 510 try { 511 if (!beginBlocking()) { 512 return 0; // AsynchronousCloseException will be thrown 513 } 514 file.readLock().lockInterruptibly(); 515 try { 516 transferred = file.transferTo(position, count, target); 517 file.updateAccessTime(); 518 completed = true; 519 } finally { 520 file.readLock().unlock(); 521 } 522 } catch (InterruptedException e) { 523 Thread.currentThread().interrupt(); 524 } finally { 525 endBlocking(completed); 526 } 527 528 return transferred; 529 } 530 531 @Override transferFrom(ReadableByteChannel src, long position, long count)532 public long transferFrom(ReadableByteChannel src, long position, long count) throws IOException { 533 checkNotNull(src); 534 Util.checkNotNegative(position, "position"); 535 Util.checkNotNegative(count, "count"); 536 checkOpen(); 537 checkWritable(); 538 539 long transferred = 0; // will definitely either be assigned or an exception will be thrown 540 541 if (append) { 542 // synchronize because appending does update the channel's position 543 synchronized (this) { 544 boolean completed = false; 545 try { 546 if (!beginBlocking()) { 547 return 0; // AsynchronousCloseException will be thrown 548 } 549 550 file.writeLock().lockInterruptibly(); 551 try { 552 position = file.sizeWithoutLocking(); 553 transferred = file.transferFrom(src, position, count); 554 this.position = position + transferred; 555 file.updateModifiedTime(); 556 completed = true; 557 } finally { 558 file.writeLock().unlock(); 559 } 560 } catch (InterruptedException e) { 561 Thread.currentThread().interrupt(); 562 } finally { 563 endBlocking(completed); 564 } 565 } 566 } else { 567 // don't synchronize because the channel's position is not involved 568 boolean completed = false; 569 try { 570 if (!beginBlocking()) { 571 return 0; // AsynchronousCloseException will be thrown 572 } 573 file.writeLock().lockInterruptibly(); 574 try { 575 transferred = file.transferFrom(src, position, count); 576 file.updateModifiedTime(); 577 completed = true; 578 } finally { 579 file.writeLock().unlock(); 580 } 581 } catch (InterruptedException e) { 582 Thread.currentThread().interrupt(); 583 } finally { 584 endBlocking(completed); 585 } 586 } 587 588 return transferred; 589 } 590 591 @Override map(MapMode mode, long position, long size)592 public MappedByteBuffer map(MapMode mode, long position, long size) throws IOException { 593 // would like this to pretend to work, but can't create an implementation of MappedByteBuffer 594 // well, a direct buffer could be cast to MappedByteBuffer, but it couldn't work in general 595 throw new UnsupportedOperationException(); 596 } 597 598 @Override lock(long position, long size, boolean shared)599 public FileLock lock(long position, long size, boolean shared) throws IOException { 600 checkLockArguments(position, size, shared); 601 602 // lock is interruptible 603 boolean completed = false; 604 try { 605 begin(); 606 completed = true; 607 return new FakeFileLock(this, position, size, shared); 608 } finally { 609 try { 610 end(completed); 611 } catch (ClosedByInterruptException e) { 612 throw new FileLockInterruptionException(); 613 } 614 } 615 } 616 617 @Override tryLock(long position, long size, boolean shared)618 public FileLock tryLock(long position, long size, boolean shared) throws IOException { 619 checkLockArguments(position, size, shared); 620 621 // tryLock is not interruptible 622 return new FakeFileLock(this, position, size, shared); 623 } 624 checkLockArguments(long position, long size, boolean shared)625 private void checkLockArguments(long position, long size, boolean shared) throws IOException { 626 Util.checkNotNegative(position, "position"); 627 Util.checkNotNegative(size, "size"); 628 checkOpen(); 629 if (shared) { 630 checkReadable(); 631 } else { 632 checkWritable(); 633 } 634 } 635 636 @Override implCloseChannel()637 protected void implCloseChannel() { 638 // interrupt the current blocking threads, if any, causing them to throw 639 // ClosedByInterruptException 640 try { 641 synchronized (blockingThreads) { 642 for (Thread thread : blockingThreads) { 643 thread.interrupt(); 644 } 645 } 646 } finally { 647 fileSystemState.unregister(this); 648 file.closed(); 649 } 650 } 651 652 /** A file lock that does nothing, since only one JVM process has access to this file system. */ 653 static final class FakeFileLock extends FileLock { 654 655 private final AtomicBoolean valid = new AtomicBoolean(true); 656 FakeFileLock(FileChannel channel, long position, long size, boolean shared)657 public FakeFileLock(FileChannel channel, long position, long size, boolean shared) { 658 super(channel, position, size, shared); 659 } 660 FakeFileLock(AsynchronousFileChannel channel, long position, long size, boolean shared)661 public FakeFileLock(AsynchronousFileChannel channel, long position, long size, boolean shared) { 662 super(channel, position, size, shared); 663 } 664 665 @Override isValid()666 public boolean isValid() { 667 return valid.get(); 668 } 669 670 @Override release()671 public void release() throws IOException { 672 valid.set(false); 673 } 674 } 675 } 676