• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright 2013 Google Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16 
17 package com.google.common.jimfs;
18 
19 import static com.google.common.base.Preconditions.checkNotNull;
20 import static com.google.common.base.Preconditions.checkPositionIndexes;
21 import static java.nio.file.StandardOpenOption.APPEND;
22 import static java.nio.file.StandardOpenOption.READ;
23 import static java.nio.file.StandardOpenOption.WRITE;
24 
25 import java.io.IOException;
26 import java.nio.ByteBuffer;
27 import java.nio.MappedByteBuffer;
28 import java.nio.channels.AsynchronousCloseException;
29 import java.nio.channels.AsynchronousFileChannel;
30 import java.nio.channels.ClosedByInterruptException;
31 import java.nio.channels.ClosedChannelException;
32 import java.nio.channels.FileChannel;
33 import java.nio.channels.FileLock;
34 import java.nio.channels.FileLockInterruptionException;
35 import java.nio.channels.NonReadableChannelException;
36 import java.nio.channels.NonWritableChannelException;
37 import java.nio.channels.ReadableByteChannel;
38 import java.nio.channels.WritableByteChannel;
39 import java.nio.file.OpenOption;
40 import java.util.Arrays;
41 import java.util.HashSet;
42 import java.util.List;
43 import java.util.Set;
44 import java.util.concurrent.ExecutorService;
45 import java.util.concurrent.atomic.AtomicBoolean;
46 
47 /**
48  * A {@link FileChannel} implementation that reads and writes to a {@link RegularFile} object. The
49  * read and write methods and other methods that read or change the position of the channel are
50  * locked because the {@link ReadableByteChannel} and {@link WritableByteChannel} interfaces specify
51  * that the read and write methods block when another thread is currently doing a read or write
52  * operation.
53  *
54  * @author Colin Decker
55  */
56 final class JimfsFileChannel extends FileChannel {
57 
58   /**
59    * Set of threads that are currently doing an interruptible blocking operation; that is, doing
60    * something that requires acquiring the file's lock. These threads must be interrupted if the
61    * channel is closed by another thread.
62    */
63   @GuardedBy("blockingThreads")
64   private final Set<Thread> blockingThreads = new HashSet<Thread>();
65 
66   private final RegularFile file;
67   private final FileSystemState fileSystemState;
68 
69   private final boolean read;
70   private final boolean write;
71   private final boolean append;
72 
73   @GuardedBy("this")
74   private long position;
75 
JimfsFileChannel( RegularFile file, Set<OpenOption> options, FileSystemState fileSystemState)76   public JimfsFileChannel(
77       RegularFile file, Set<OpenOption> options, FileSystemState fileSystemState) {
78     this.file = file;
79     this.fileSystemState = fileSystemState;
80     this.read = options.contains(READ);
81     this.write = options.contains(WRITE);
82     this.append = options.contains(APPEND);
83 
84     fileSystemState.register(this);
85   }
86 
87   /**
88    * Returns an {@link AsynchronousFileChannel} view of this channel using the given executor for
89    * asynchronous operations.
90    */
asAsynchronousFileChannel(ExecutorService executor)91   public AsynchronousFileChannel asAsynchronousFileChannel(ExecutorService executor) {
92     return new JimfsAsynchronousFileChannel(this, executor);
93   }
94 
checkReadable()95   void checkReadable() {
96     if (!read) {
97       throw new NonReadableChannelException();
98     }
99   }
100 
checkWritable()101   void checkWritable() {
102     if (!write) {
103       throw new NonWritableChannelException();
104     }
105   }
106 
checkOpen()107   void checkOpen() throws ClosedChannelException {
108     if (!isOpen()) {
109       throw new ClosedChannelException();
110     }
111   }
112 
113   /**
114    * Begins a blocking operation, making the operation interruptible. Returns {@code true} if the
115    * channel was open and the thread was added as a blocking thread; returns {@code false} if the
116    * channel was closed.
117    */
beginBlocking()118   private boolean beginBlocking() {
119     begin();
120     synchronized (blockingThreads) {
121       if (isOpen()) {
122         blockingThreads.add(Thread.currentThread());
123         return true;
124       }
125 
126       return false;
127     }
128   }
129 
130   /**
131    * Ends a blocking operation, throwing an exception if the thread was interrupted while blocking
132    * or if the channel was closed from another thread.
133    */
endBlocking(boolean completed)134   private void endBlocking(boolean completed) throws AsynchronousCloseException {
135     synchronized (blockingThreads) {
136       blockingThreads.remove(Thread.currentThread());
137     }
138     end(completed);
139   }
140 
141   @Override
read(ByteBuffer dst)142   public int read(ByteBuffer dst) throws IOException {
143     checkNotNull(dst);
144     checkOpen();
145     checkReadable();
146 
147     int read = 0; // will definitely either be assigned or an exception will be thrown
148 
149     synchronized (this) {
150       boolean completed = false;
151       try {
152         if (!beginBlocking()) {
153           return 0; // AsynchronousCloseException will be thrown
154         }
155         file.readLock().lockInterruptibly();
156         try {
157           read = file.read(position, dst);
158           if (read != -1) {
159             position += read;
160           }
161           file.updateAccessTime();
162           completed = true;
163         } finally {
164           file.readLock().unlock();
165         }
166       } catch (InterruptedException e) {
167         Thread.currentThread().interrupt();
168       } finally {
169         endBlocking(completed);
170       }
171     }
172 
173     return read;
174   }
175 
176   @Override
read(ByteBuffer[] dsts, int offset, int length)177   public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
178     checkPositionIndexes(offset, offset + length, dsts.length);
179     List<ByteBuffer> buffers = Arrays.asList(dsts).subList(offset, offset + length);
180     Util.checkNoneNull(buffers);
181     checkOpen();
182     checkReadable();
183 
184     long read = 0; // will definitely either be assigned or an exception will be thrown
185 
186     synchronized (this) {
187       boolean completed = false;
188       try {
189         if (!beginBlocking()) {
190           return 0; // AsynchronousCloseException will be thrown
191         }
192         file.readLock().lockInterruptibly();
193         try {
194           read = file.read(position, buffers);
195           if (read != -1) {
196             position += read;
197           }
198           file.updateAccessTime();
199           completed = true;
200         } finally {
201           file.readLock().unlock();
202         }
203       } catch (InterruptedException e) {
204         Thread.currentThread().interrupt();
205       } finally {
206         endBlocking(completed);
207       }
208     }
209 
210     return read;
211   }
212 
213   @Override
read(ByteBuffer dst, long position)214   public int read(ByteBuffer dst, long position) throws IOException {
215     checkNotNull(dst);
216     Util.checkNotNegative(position, "position");
217     checkOpen();
218     checkReadable();
219 
220     int read = 0; // will definitely either be assigned or an exception will be thrown
221 
222     // no need to synchronize here; this method does not make use of the channel's position
223     boolean completed = false;
224     try {
225       if (!beginBlocking()) {
226         return 0; // AsynchronousCloseException will be thrown
227       }
228       file.readLock().lockInterruptibly();
229       try {
230         read = file.read(position, dst);
231         file.updateAccessTime();
232         completed = true;
233       } finally {
234         file.readLock().unlock();
235       }
236     } catch (InterruptedException e) {
237       Thread.currentThread().interrupt();
238     } finally {
239       endBlocking(completed);
240     }
241 
242     return read;
243   }
244 
245   @Override
write(ByteBuffer src)246   public int write(ByteBuffer src) throws IOException {
247     checkNotNull(src);
248     checkOpen();
249     checkWritable();
250 
251     int written = 0; // will definitely either be assigned or an exception will be thrown
252 
253     synchronized (this) {
254       boolean completed = false;
255       try {
256         if (!beginBlocking()) {
257           return 0; // AsynchronousCloseException will be thrown
258         }
259         file.writeLock().lockInterruptibly();
260         try {
261           if (append) {
262             position = file.size();
263           }
264           written = file.write(position, src);
265           position += written;
266           file.updateModifiedTime();
267           completed = true;
268         } finally {
269           file.writeLock().unlock();
270         }
271       } catch (InterruptedException e) {
272         Thread.currentThread().interrupt();
273       } finally {
274         endBlocking(completed);
275       }
276     }
277 
278     return written;
279   }
280 
281   @Override
write(ByteBuffer[] srcs, int offset, int length)282   public long write(ByteBuffer[] srcs, int offset, int length) throws IOException {
283     checkPositionIndexes(offset, offset + length, srcs.length);
284     List<ByteBuffer> buffers = Arrays.asList(srcs).subList(offset, offset + length);
285     Util.checkNoneNull(buffers);
286     checkOpen();
287     checkWritable();
288 
289     long written = 0; // will definitely either be assigned or an exception will be thrown
290 
291     synchronized (this) {
292       boolean completed = false;
293       try {
294         if (!beginBlocking()) {
295           return 0; // AsynchronousCloseException will be thrown
296         }
297         file.writeLock().lockInterruptibly();
298         try {
299           if (append) {
300             position = file.size();
301           }
302           written = file.write(position, buffers);
303           position += written;
304           file.updateModifiedTime();
305           completed = true;
306         } finally {
307           file.writeLock().unlock();
308         }
309       } catch (InterruptedException e) {
310         Thread.currentThread().interrupt();
311       } finally {
312         endBlocking(completed);
313       }
314     }
315 
316     return written;
317   }
318 
319   @Override
write(ByteBuffer src, long position)320   public int write(ByteBuffer src, long position) throws IOException {
321     checkNotNull(src);
322     Util.checkNotNegative(position, "position");
323     checkOpen();
324     checkWritable();
325 
326     int written = 0; // will definitely either be assigned or an exception will be thrown
327 
328     if (append) {
329       // synchronize because appending does update the channel's position
330       synchronized (this) {
331         boolean completed = false;
332         try {
333           if (!beginBlocking()) {
334             return 0; // AsynchronousCloseException will be thrown
335           }
336 
337           file.writeLock().lockInterruptibly();
338           try {
339             position = file.sizeWithoutLocking();
340             written = file.write(position, src);
341             this.position = position + written;
342             file.updateModifiedTime();
343             completed = true;
344           } finally {
345             file.writeLock().unlock();
346           }
347         } catch (InterruptedException e) {
348           Thread.currentThread().interrupt();
349         } finally {
350           endBlocking(completed);
351         }
352       }
353     } else {
354       // don't synchronize because the channel's position is not involved
355       boolean completed = false;
356       try {
357         if (!beginBlocking()) {
358           return 0; // AsynchronousCloseException will be thrown
359         }
360         file.writeLock().lockInterruptibly();
361         try {
362           written = file.write(position, src);
363           file.updateModifiedTime();
364           completed = true;
365         } finally {
366           file.writeLock().unlock();
367         }
368       } catch (InterruptedException e) {
369         Thread.currentThread().interrupt();
370       } finally {
371         endBlocking(completed);
372       }
373     }
374 
375     return written;
376   }
377 
378   @Override
position()379   public long position() throws IOException {
380     checkOpen();
381 
382     long pos;
383 
384     synchronized (this) {
385       boolean completed = false;
386       try {
387         begin(); // don't call beginBlocking() because this method doesn't block
388         if (!isOpen()) {
389           return 0; // AsynchronousCloseException will be thrown
390         }
391         pos = this.position;
392         completed = true;
393       } finally {
394         end(completed);
395       }
396     }
397 
398     return pos;
399   }
400 
401   @Override
position(long newPosition)402   public FileChannel position(long newPosition) throws IOException {
403     Util.checkNotNegative(newPosition, "newPosition");
404     checkOpen();
405 
406     synchronized (this) {
407       boolean completed = false;
408       try {
409         begin(); // don't call beginBlocking() because this method doesn't block
410         if (!isOpen()) {
411           return this; // AsynchronousCloseException will be thrown
412         }
413         this.position = newPosition;
414         completed = true;
415       } finally {
416         end(completed);
417       }
418     }
419 
420     return this;
421   }
422 
423   @Override
size()424   public long size() throws IOException {
425     checkOpen();
426 
427     long size = 0; // will definitely either be assigned or an exception will be thrown
428 
429     boolean completed = false;
430     try {
431       if (!beginBlocking()) {
432         return 0; // AsynchronousCloseException will be thrown
433       }
434       file.readLock().lockInterruptibly();
435       try {
436         size = file.sizeWithoutLocking();
437         completed = true;
438       } finally {
439         file.readLock().unlock();
440       }
441     } catch (InterruptedException e) {
442       Thread.currentThread().interrupt();
443     } finally {
444       endBlocking(completed);
445     }
446 
447     return size;
448   }
449 
450   @Override
truncate(long size)451   public FileChannel truncate(long size) throws IOException {
452     Util.checkNotNegative(size, "size");
453     checkOpen();
454     checkWritable();
455 
456     synchronized (this) {
457       boolean completed = false;
458       try {
459         if (!beginBlocking()) {
460           return this; // AsynchronousCloseException will be thrown
461         }
462         file.writeLock().lockInterruptibly();
463         try {
464           file.truncate(size);
465           if (position > size) {
466             position = size;
467           }
468           file.updateModifiedTime();
469           completed = true;
470         } finally {
471           file.writeLock().unlock();
472         }
473       } catch (InterruptedException e) {
474         Thread.currentThread().interrupt();
475       } finally {
476         endBlocking(completed);
477       }
478     }
479 
480     return this;
481   }
482 
483   @Override
force(boolean metaData)484   public void force(boolean metaData) throws IOException {
485     checkOpen();
486 
487     // nothing to do since writes are all direct to the storage
488     // however, we should handle the thread being interrupted anyway
489     boolean completed = false;
490     try {
491       begin();
492       completed = true;
493     } finally {
494       end(completed);
495     }
496   }
497 
498   @Override
transferTo(long position, long count, WritableByteChannel target)499   public long transferTo(long position, long count, WritableByteChannel target) throws IOException {
500     checkNotNull(target);
501     Util.checkNotNegative(position, "position");
502     Util.checkNotNegative(count, "count");
503     checkOpen();
504     checkReadable();
505 
506     long transferred = 0; // will definitely either be assigned or an exception will be thrown
507 
508     // no need to synchronize here; this method does not make use of the channel's position
509     boolean completed = false;
510     try {
511       if (!beginBlocking()) {
512         return 0; // AsynchronousCloseException will be thrown
513       }
514       file.readLock().lockInterruptibly();
515       try {
516         transferred = file.transferTo(position, count, target);
517         file.updateAccessTime();
518         completed = true;
519       } finally {
520         file.readLock().unlock();
521       }
522     } catch (InterruptedException e) {
523       Thread.currentThread().interrupt();
524     } finally {
525       endBlocking(completed);
526     }
527 
528     return transferred;
529   }
530 
531   @Override
transferFrom(ReadableByteChannel src, long position, long count)532   public long transferFrom(ReadableByteChannel src, long position, long count) throws IOException {
533     checkNotNull(src);
534     Util.checkNotNegative(position, "position");
535     Util.checkNotNegative(count, "count");
536     checkOpen();
537     checkWritable();
538 
539     long transferred = 0; // will definitely either be assigned or an exception will be thrown
540 
541     if (append) {
542       // synchronize because appending does update the channel's position
543       synchronized (this) {
544         boolean completed = false;
545         try {
546           if (!beginBlocking()) {
547             return 0; // AsynchronousCloseException will be thrown
548           }
549 
550           file.writeLock().lockInterruptibly();
551           try {
552             position = file.sizeWithoutLocking();
553             transferred = file.transferFrom(src, position, count);
554             this.position = position + transferred;
555             file.updateModifiedTime();
556             completed = true;
557           } finally {
558             file.writeLock().unlock();
559           }
560         } catch (InterruptedException e) {
561           Thread.currentThread().interrupt();
562         } finally {
563           endBlocking(completed);
564         }
565       }
566     } else {
567       // don't synchronize because the channel's position is not involved
568       boolean completed = false;
569       try {
570         if (!beginBlocking()) {
571           return 0; // AsynchronousCloseException will be thrown
572         }
573         file.writeLock().lockInterruptibly();
574         try {
575           transferred = file.transferFrom(src, position, count);
576           file.updateModifiedTime();
577           completed = true;
578         } finally {
579           file.writeLock().unlock();
580         }
581       } catch (InterruptedException e) {
582         Thread.currentThread().interrupt();
583       } finally {
584         endBlocking(completed);
585       }
586     }
587 
588     return transferred;
589   }
590 
591   @Override
map(MapMode mode, long position, long size)592   public MappedByteBuffer map(MapMode mode, long position, long size) throws IOException {
593     // would like this to pretend to work, but can't create an implementation of MappedByteBuffer
594     // well, a direct buffer could be cast to MappedByteBuffer, but it couldn't work in general
595     throw new UnsupportedOperationException();
596   }
597 
598   @Override
lock(long position, long size, boolean shared)599   public FileLock lock(long position, long size, boolean shared) throws IOException {
600     checkLockArguments(position, size, shared);
601 
602     // lock is interruptible
603     boolean completed = false;
604     try {
605       begin();
606       completed = true;
607       return new FakeFileLock(this, position, size, shared);
608     } finally {
609       try {
610         end(completed);
611       } catch (ClosedByInterruptException e) {
612         throw new FileLockInterruptionException();
613       }
614     }
615   }
616 
617   @Override
tryLock(long position, long size, boolean shared)618   public FileLock tryLock(long position, long size, boolean shared) throws IOException {
619     checkLockArguments(position, size, shared);
620 
621     // tryLock is not interruptible
622     return new FakeFileLock(this, position, size, shared);
623   }
624 
checkLockArguments(long position, long size, boolean shared)625   private void checkLockArguments(long position, long size, boolean shared) throws IOException {
626     Util.checkNotNegative(position, "position");
627     Util.checkNotNegative(size, "size");
628     checkOpen();
629     if (shared) {
630       checkReadable();
631     } else {
632       checkWritable();
633     }
634   }
635 
636   @Override
implCloseChannel()637   protected void implCloseChannel() {
638     // interrupt the current blocking threads, if any, causing them to throw
639     // ClosedByInterruptException
640     try {
641       synchronized (blockingThreads) {
642         for (Thread thread : blockingThreads) {
643           thread.interrupt();
644         }
645       }
646     } finally {
647       fileSystemState.unregister(this);
648       file.closed();
649     }
650   }
651 
652   /** A file lock that does nothing, since only one JVM process has access to this file system. */
653   static final class FakeFileLock extends FileLock {
654 
655     private final AtomicBoolean valid = new AtomicBoolean(true);
656 
FakeFileLock(FileChannel channel, long position, long size, boolean shared)657     public FakeFileLock(FileChannel channel, long position, long size, boolean shared) {
658       super(channel, position, size, shared);
659     }
660 
FakeFileLock(AsynchronousFileChannel channel, long position, long size, boolean shared)661     public FakeFileLock(AsynchronousFileChannel channel, long position, long size, boolean shared) {
662       super(channel, position, size, shared);
663     }
664 
665     @Override
isValid()666     public boolean isValid() {
667       return valid.get();
668     }
669 
670     @Override
release()671     public void release() throws IOException {
672       valid.set(false);
673     }
674   }
675 }
676