1 /* 2 * Copyright 2013 Google Inc. 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package com.google.common.jimfs; 18 19 import static com.google.common.jimfs.TestUtils.buffer; 20 import static com.google.common.jimfs.TestUtils.regularFile; 21 import static com.google.common.truth.Truth.assertThat; 22 import static java.nio.file.StandardOpenOption.READ; 23 import static java.nio.file.StandardOpenOption.WRITE; 24 import static java.util.concurrent.TimeUnit.MILLISECONDS; 25 import static java.util.concurrent.TimeUnit.SECONDS; 26 import static org.junit.Assert.assertEquals; 27 import static org.junit.Assert.assertFalse; 28 import static org.junit.Assert.assertNotNull; 29 import static org.junit.Assert.assertSame; 30 import static org.junit.Assert.fail; 31 32 import com.google.common.collect.ImmutableSet; 33 import com.google.common.util.concurrent.Runnables; 34 import com.google.common.util.concurrent.SettableFuture; 35 import com.google.common.util.concurrent.Uninterruptibles; 36 import java.io.IOException; 37 import java.nio.ByteBuffer; 38 import java.nio.channels.AsynchronousCloseException; 39 import java.nio.channels.AsynchronousFileChannel; 40 import java.nio.channels.ClosedChannelException; 41 import java.nio.channels.CompletionHandler; 42 import java.nio.channels.FileLock; 43 import java.nio.file.OpenOption; 44 import java.util.concurrent.ExecutionException; 45 import java.util.concurrent.ExecutorService; 46 import java.util.concurrent.Executors; 47 import java.util.concurrent.Future; 48 import org.junit.Test; 49 import org.junit.runner.RunWith; 50 import org.junit.runners.JUnit4; 51 52 /** 53 * Tests for {@link JimfsAsynchronousFileChannel}. 54 * 55 * @author Colin Decker 56 */ 57 @RunWith(JUnit4.class) 58 public class JimfsAsynchronousFileChannelTest { 59 channel( RegularFile file, ExecutorService executor, OpenOption... options)60 private static JimfsAsynchronousFileChannel channel( 61 RegularFile file, ExecutorService executor, OpenOption... options) throws IOException { 62 JimfsFileChannel channel = 63 new JimfsFileChannel( 64 file, 65 Options.getOptionsForChannel(ImmutableSet.copyOf(options)), 66 new FileSystemState(Runnables.doNothing())); 67 return new JimfsAsynchronousFileChannel(channel, executor); 68 } 69 70 /** 71 * Just tests the main read/write methods... the methods all delegate to the non-async channel 72 * anyway. 73 */ 74 @Test testAsyncChannel()75 public void testAsyncChannel() throws Throwable { 76 RegularFile file = regularFile(15); 77 ExecutorService executor = Executors.newSingleThreadExecutor(); 78 JimfsAsynchronousFileChannel channel = channel(file, executor, READ, WRITE); 79 80 try { 81 assertEquals(15, channel.size()); 82 83 assertSame(channel, channel.truncate(5)); 84 assertEquals(5, channel.size()); 85 86 file.write(5, new byte[5], 0, 5); 87 checkAsyncRead(channel); 88 checkAsyncWrite(channel); 89 checkAsyncLock(channel); 90 91 channel.close(); 92 assertFalse(channel.isOpen()); 93 } finally { 94 executor.shutdown(); 95 } 96 } 97 98 @Test testClosedChannel()99 public void testClosedChannel() throws Throwable { 100 RegularFile file = regularFile(15); 101 ExecutorService executor = Executors.newSingleThreadExecutor(); 102 103 try { 104 JimfsAsynchronousFileChannel channel = channel(file, executor, READ, WRITE); 105 channel.close(); 106 107 assertClosed(channel.read(ByteBuffer.allocate(10), 0)); 108 assertClosed(channel.write(ByteBuffer.allocate(10), 15)); 109 assertClosed(channel.lock()); 110 assertClosed(channel.lock(0, 10, true)); 111 } finally { 112 executor.shutdown(); 113 } 114 } 115 116 @Test testAsyncClose_write()117 public void testAsyncClose_write() throws Throwable { 118 RegularFile file = regularFile(15); 119 ExecutorService executor = Executors.newFixedThreadPool(4); 120 121 try { 122 JimfsAsynchronousFileChannel channel = channel(file, executor, READ, WRITE); 123 124 file.writeLock().lock(); // cause another thread trying to write to block 125 126 // future-returning write 127 Future<Integer> future = channel.write(ByteBuffer.allocate(10), 0); 128 129 // completion handler write 130 SettableFuture<Integer> completionHandlerFuture = SettableFuture.create(); 131 channel.write(ByteBuffer.allocate(10), 0, null, setFuture(completionHandlerFuture)); 132 133 // Despite this 10ms sleep to allow plenty of time, it's possible, though very rare, for a 134 // race to cause the channel to be closed before the asynchronous calls get to the initial 135 // check that the channel is open, causing ClosedChannelException to be thrown rather than 136 // AsynchronousCloseException. This is not a problem in practice, just a quirk of how these 137 // tests work and that we don't have a way of waiting for the operations to get past that 138 // check. 139 Uninterruptibles.sleepUninterruptibly(10, MILLISECONDS); 140 141 channel.close(); 142 143 assertAsynchronousClose(future); 144 assertAsynchronousClose(completionHandlerFuture); 145 } finally { 146 executor.shutdown(); 147 } 148 } 149 150 @Test testAsyncClose_read()151 public void testAsyncClose_read() throws Throwable { 152 RegularFile file = regularFile(15); 153 ExecutorService executor = Executors.newFixedThreadPool(2); 154 155 try { 156 JimfsAsynchronousFileChannel channel = channel(file, executor, READ, WRITE); 157 158 file.writeLock().lock(); // cause another thread trying to read to block 159 160 // future-returning read 161 Future<Integer> future = channel.read(ByteBuffer.allocate(10), 0); 162 163 // completion handler read 164 SettableFuture<Integer> completionHandlerFuture = SettableFuture.create(); 165 channel.read(ByteBuffer.allocate(10), 0, null, setFuture(completionHandlerFuture)); 166 167 // Despite this 10ms sleep to allow plenty of time, it's possible, though very rare, for a 168 // race to cause the channel to be closed before the asynchronous calls get to the initial 169 // check that the channel is open, causing ClosedChannelException to be thrown rather than 170 // AsynchronousCloseException. This is not a problem in practice, just a quirk of how these 171 // tests work and that we don't have a way of waiting for the operations to get past that 172 // check. 173 Uninterruptibles.sleepUninterruptibly(10, MILLISECONDS); 174 175 channel.close(); 176 177 assertAsynchronousClose(future); 178 assertAsynchronousClose(completionHandlerFuture); 179 } finally { 180 executor.shutdown(); 181 } 182 } 183 checkAsyncRead(AsynchronousFileChannel channel)184 private static void checkAsyncRead(AsynchronousFileChannel channel) throws Throwable { 185 ByteBuffer buf = buffer("1234567890"); 186 assertEquals(10, (int) channel.read(buf, 0).get()); 187 188 buf.flip(); 189 190 SettableFuture<Integer> future = SettableFuture.create(); 191 channel.read(buf, 0, null, setFuture(future)); 192 193 assertThat(future.get(10, SECONDS)).isEqualTo(10); 194 } 195 checkAsyncWrite(AsynchronousFileChannel asyncChannel)196 private static void checkAsyncWrite(AsynchronousFileChannel asyncChannel) throws Throwable { 197 ByteBuffer buf = buffer("1234567890"); 198 assertEquals(10, (int) asyncChannel.write(buf, 0).get()); 199 200 buf.flip(); 201 SettableFuture<Integer> future = SettableFuture.create(); 202 asyncChannel.write(buf, 0, null, setFuture(future)); 203 204 assertThat(future.get(10, SECONDS)).isEqualTo(10); 205 } 206 checkAsyncLock(AsynchronousFileChannel channel)207 private static void checkAsyncLock(AsynchronousFileChannel channel) throws Throwable { 208 assertNotNull(channel.lock().get()); 209 assertNotNull(channel.lock(0, 10, true).get()); 210 211 SettableFuture<FileLock> future = SettableFuture.create(); 212 channel.lock(0, 10, true, null, setFuture(future)); 213 214 assertNotNull(future.get(10, SECONDS)); 215 } 216 217 /** 218 * Returns a {@code CompletionHandler} that sets the appropriate result or exception on the given 219 * {@code future} on completion. 220 */ setFuture(final SettableFuture<T> future)221 private static <T> CompletionHandler<T, Object> setFuture(final SettableFuture<T> future) { 222 return new CompletionHandler<T, Object>() { 223 @Override 224 public void completed(T result, Object attachment) { 225 future.set(result); 226 } 227 228 @Override 229 public void failed(Throwable exc, Object attachment) { 230 future.setException(exc); 231 } 232 }; 233 } 234 235 /** Assert that the future fails, with the failure caused by {@code ClosedChannelException}. */ 236 private static void assertClosed(Future<?> future) throws Throwable { 237 try { 238 future.get(10, SECONDS); 239 fail("ChannelClosedException was not thrown"); 240 } catch (ExecutionException expected) { 241 assertThat(expected.getCause()).isInstanceOf(ClosedChannelException.class); 242 } 243 } 244 245 /** 246 * Assert that the future fails, with the failure caused by either {@code 247 * AsynchronousCloseException} or (rarely) {@code ClosedChannelException}. 248 */ 249 private static void assertAsynchronousClose(Future<?> future) throws Throwable { 250 try { 251 future.get(10, SECONDS); 252 fail("no exception was thrown"); 253 } catch (ExecutionException expected) { 254 Throwable t = expected.getCause(); 255 if (!(t instanceof AsynchronousCloseException || t instanceof ClosedChannelException)) { 256 fail( 257 "expected AsynchronousCloseException (or in rare cases ClosedChannelException); got " 258 + t); 259 } 260 } 261 } 262 } 263