• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2022 Google LLC
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package com.google.android.libraries.mobiledatadownload.file.openers;
17 
18 import static com.google.android.libraries.mobiledatadownload.file.common.testing.StreamUtils.createFile;
19 import static com.google.android.libraries.mobiledatadownload.file.common.testing.StreamUtils.readFile;
20 import static com.google.common.truth.Truth.assertThat;
21 import static java.nio.charset.StandardCharsets.UTF_8;
22 import static org.junit.Assert.assertThrows;
23 
24 import android.app.Activity;
25 import android.app.Service;
26 import android.content.BroadcastReceiver;
27 import android.content.Context;
28 import android.content.Intent;
29 import android.content.IntentFilter;
30 import android.net.Uri;
31 import android.os.Binder;
32 import android.os.IBinder;
33 import android.util.Log;
34 import androidx.test.core.app.ApplicationProvider;
35 import androidx.test.rule.ServiceTestRule;
36 import com.google.android.libraries.mobiledatadownload.file.SynchronousFileStorage;
37 import com.google.android.libraries.mobiledatadownload.file.backends.JavaFileBackend;
38 import com.google.android.libraries.mobiledatadownload.file.common.testing.TemporaryUri;
39 import com.google.common.base.Ascii;
40 import com.google.common.collect.ImmutableList;
41 import com.google.common.io.ByteStreams;
42 import java.io.IOException;
43 import java.io.InputStream;
44 import java.io.OutputStream;
45 import org.junit.Rule;
46 import org.junit.Test;
47 import org.junit.rules.TestName;
48 import org.junit.runner.RunWith;
49 import org.junit.runners.JUnit4;
50 
51 @RunWith(JUnit4.class)
52 public final class StreamMutationOpenerAndroidTest {
53   private static final String TAG = "TestStreamMutationOpenerAndroid";
54 
55   private final Context context = ApplicationProvider.getApplicationContext();
56 
57   @Rule public TemporaryUri tmpUri = new TemporaryUri();
58   @Rule public final ServiceTestRule serviceRule = new ServiceTestRule();
59   @Rule public TestName testName = new TestName();
60 
storage()61   public static SynchronousFileStorage storage() {
62     return new SynchronousFileStorage(ImmutableList.of(new JavaFileBackend()));
63   }
64 
65   @Test
interleaveMutations_withoutLocking_lacksIsolation()66   public void interleaveMutations_withoutLocking_lacksIsolation() throws Exception {
67     serviceRule.startService(new Intent(context, TestHelper.class));
68 
69     SynchronousFileStorage storage = storage();
70     Uri dirUri = tmpUri.newDirectoryUri();
71     Uri uri = dirUri.buildUpon().appendPath("testfile").build();
72 
73     createFile(storage, uri, "content");
74     sendToHelper(uri);
75 
76     try (StreamMutationOpener.Mutator mutator = storage.open(uri, StreamMutationOpener.create())) {
77       mutator.mutate(
78           (InputStream in, OutputStream out) -> {
79             assertThat(readFile(storage, uri)).isEqualTo("content");
80 
81             String read = new String(ByteStreams.toByteArray(in), UTF_8);
82             String write = Ascii.toUpperCase(read);
83             assertThat(write).isEqualTo("CONTENT");
84 
85             sendToHelper(uri);
86 
87             assertThat(readFile(storage, uri)).isEqualTo("tnetnoc");
88 
89             out.write(write.getBytes(UTF_8));
90             // This write hasn't closed, so the destination file is unchanged. This mutation isn't
91             // applied to the result of the first one and it will overwrite it.
92             assertThat(readFile(storage, uri)).isEqualTo("tnetnoc");
93             return true;
94           });
95     }
96 
97     String actual = readFile(storage, uri);
98     assertThat(actual).isEqualTo("CONTENT"); // Only the second mutation is applied.
99     assertThat(storage.children(dirUri)).hasSize(1);
100   }
101 
102   /** Helper for interleaveMutations_withoutLocking_lacksIsolation. */
103   private static class InterleaveMutationsWithoutLockingLacksIsolationBroadcastReceiver
104       extends TestingBroadcastReceiver {
105     @Override
run()106     public void run() {
107       SynchronousFileStorage storage = storage();
108 
109       try (StreamMutationOpener.Mutator mutator =
110           storage.open(uri, StreamMutationOpener.create())) {
111         mutator.mutate(
112             (InputStream in, OutputStream out) -> {
113               String read = new String(ByteStreams.toByteArray(in), UTF_8);
114               done.signal();
115               resume.await();
116               out.write(new StringBuilder(read).reverse().toString().getBytes(UTF_8));
117               return true;
118             });
119       } catch (Exception ex) {
120         Log.e(TAG, "failed", ex);
121         setResultCode(Activity.RESULT_CANCELED);
122       }
123       done.signal();
124     }
125   }
126 
127   @Test
interleaveMutations_withLocking()128   public void interleaveMutations_withLocking() throws Exception {
129     serviceRule.startService(new Intent(context, TestHelper.class));
130 
131     SynchronousFileStorage storage = storage();
132     Uri dirUri = tmpUri.newDirectoryUri();
133     Uri uri = dirUri.buildUpon().appendPath("testfile").build();
134 
135     createFile(storage, uri, "content");
136     sendToHelper(uri);
137 
138     // At first we fail to acquire the lock.
139     LockFileOpener nonBlockingLocking = LockFileOpener.createExclusive().nonBlocking(true);
140     assertThrows(
141         IOException.class,
142         () -> storage.open(uri, StreamMutationOpener.create().withLocking(nonBlockingLocking)));
143 
144     // Peek at the file, ignoring advisory locks.
145     assertThat(readFile(storage, uri)).isEqualTo("content");
146 
147     sendToHelper(uri);
148 
149     // Now the lock should be free and we can proceed safely.
150     LockFileOpener blockingLocking = LockFileOpener.createExclusive().nonBlocking(true);
151     try (StreamMutationOpener.Mutator mutator =
152         storage.open(uri, StreamMutationOpener.create().withLocking(blockingLocking))) {
153       mutator.mutate(
154           (InputStream in, OutputStream out) -> {
155             String read = new String(ByteStreams.toByteArray(in), UTF_8);
156             assertThat(read).isEqualTo("tnetnoc");
157             String write = Ascii.toUpperCase(read);
158             out.write(write.getBytes(UTF_8));
159             return true;
160           });
161     }
162     String actual = readFile(storage, uri);
163     assertThat(actual).isEqualTo("TNETNOC");
164     assertThat(storage.children(dirUri)).hasSize(2); // data file + lock file
165   }
166 
167   /** Helper for interleaveMutations_withLocking. */
168   private static class InterleaveMutationsWithLockingBroadcastReceiver
169       extends TestingBroadcastReceiver {
170     @Override
run()171     public void run() {
172       SynchronousFileStorage storage = storage();
173       try {
174         LockFileOpener locking = LockFileOpener.createExclusive();
175         try (StreamMutationOpener.Mutator mutator =
176             storage.open(uri, StreamMutationOpener.create().withLocking(locking))) {
177           mutator.mutate(
178               (InputStream in, OutputStream out) -> {
179                 String read = new String(ByteStreams.toByteArray(in), UTF_8);
180                 done.signal();
181                 resume.await();
182                 out.write(new StringBuilder(read).reverse().toString().getBytes(UTF_8));
183                 return true;
184               });
185         }
186       } catch (Exception ex) {
187         Log.e(TAG, "failed", ex);
188         setResultCode(Activity.RESULT_CANCELED);
189       }
190       done.signal();
191     }
192   }
193 
194   // Testing infrastructure from here down.
195   // TODO(b/120859198): Refactor into shared code.
196   private abstract static class TestingBroadcastReceiver extends BroadcastReceiver
197       implements Runnable {
198     protected final Latch resume = new Latch();
199     protected final Latch done = new Latch();
200     protected Uri uri;
201     private Thread thread = null;
202 
203     @Override
onReceive(Context context, Intent intent)204     public void onReceive(Context context, Intent intent) {
205       setResultCode(Activity.RESULT_OK);
206       uri = Uri.parse(intent.getExtras().getString("uri"));
207       if (thread == null) {
208         thread = new Thread(this);
209         thread.start();
210       } else {
211         resume.signal();
212       }
213       done.await();
214     }
215 
216     @Override
run()217     public abstract void run();
218   }
219 
220   public static class TestHelper extends Service {
221     private final BroadcastReceiver withoutLocking =
222         new InterleaveMutationsWithoutLockingLacksIsolationBroadcastReceiver();
223     private final BroadcastReceiver withLocking =
224         new InterleaveMutationsWithLockingBroadcastReceiver();
225     private final IBinder dummyBinder = new Binder() {};
226 
227     @Override
onBind(Intent intent)228     public IBinder onBind(Intent intent) {
229       return dummyBinder;
230     }
231 
232     @Override
onCreate()233     public void onCreate() {
234       registerReceiver(
235           withoutLocking, new IntentFilter("interleaveMutations_withoutLocking_lacksIsolation"));
236       registerReceiver(withLocking, new IntentFilter("interleaveMutations_withLocking"));
237     }
238 
239     @Override
onDestroy()240     public void onDestroy() {
241       unregisterReceiver(withoutLocking);
242       unregisterReceiver(withLocking);
243     }
244   }
245 
246   /** A broadcast receiver that allows caller to wait until it receives an intent. */
247   private static class NotifyingBroadcastReceiver extends BroadcastReceiver {
248     private final Latch received = new Latch();
249     private boolean success = true;
250 
251     @Override
onReceive(Context context, Intent intent)252     public void onReceive(Context context, Intent intent) {
253       success = (getResultCode() == Activity.RESULT_OK);
254       received.signal();
255     }
256 
awaitDelivery()257     public void awaitDelivery() throws Exception {
258       received.await();
259       if (!success) {
260         throw new Exception("broadcast handler failed");
261       }
262     }
263   }
264 
265   /** A simple latch that resets itself after await. */
266   private static class Latch {
267     boolean signaled = false;
268 
signal()269     synchronized void signal() {
270       signaled = true;
271       notify();
272     }
273 
await()274     synchronized void await() {
275       while (!signaled) {
276         try {
277           wait();
278         } catch (InterruptedException ex) {
279           // Ignore.
280         }
281       }
282       signaled = false;
283     }
284   }
285 
286   /** Sends params to helper and wait for it to finish processing them. */
sendToHelper(Uri uri)287   private void sendToHelper(Uri uri) throws IOException {
288     Intent intent = new Intent(testName.getMethodName());
289     intent.putExtra("uri", uri.toString());
290     NotifyingBroadcastReceiver receiver = new NotifyingBroadcastReceiver();
291     context.sendOrderedBroadcast(
292         intent, null, receiver, null, Activity.RESULT_FIRST_USER, null, null);
293     try {
294       receiver.awaitDelivery();
295     } catch (IOException ex) {
296       throw ex;
297     } catch (Exception ex) {
298       throw new IOException(ex);
299     }
300   }
301 }
302