• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2022 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 import java.lang.invoke.VarHandle;
18 import java.lang.invoke.MethodHandles;
19 import java.time.Duration;
20 import java.util.concurrent.atomic.AtomicInteger;
21 import java.util.function.Consumer;
22 
23 /**
24  * Runs tests to validate the concurrency guarantees of VarHandle.
25  *
26  * The tests involve having a lot of tasks and significantly fewer threads. The tasks are stored on
27  * a queue and each thread tries to grab a task from the queue using operations like
28  * VarHandle.compareAndSet(). If the operation works as specified, then each task would only be
29  * handled in a single thread, exactly once.
30  *
31  * The tasks just add atomically a specified integer to a total. If the total is different from the
32  * expected one, then either some tasks were run multiple times (on multiple threads), or some task
33  * were not run at all (skipped by all threads).
34  */
35 public class Main {
36   private static final VarHandle QA;
37   static {
38       QA = MethodHandles.arrayElementVarHandle(TestTask[].class);
39   }
40 
41   private static final int TASK_COUNT = 10000;
42   private static final int THREAD_COUNT = 100;
43   /* Each test may need several retries before a concurrent failure is seen. In the past, for a
44    * known bug, between 5 and 10 retries were sufficient. Use RETRIES to configure how many
45    * iterations to retry for each test scenario. However, to avoid the test running for too long,
46    * for example with gcstress, set a cap duration in MAX_RETRIES_DURATION. With this at least one
47    * iteration would run, but there could be fewer retries if each of them takes too long. */
48   private static final int RETRIES = 50;
49   private static final Duration MAX_RETRIES_DURATION = Duration.ofMinutes(1);
50 
main(String[] args)51   public static void main(String[] args) throws Throwable {
52     testConcurrentProcessing(new CompareAndExchangeRunnerFactory(), "compareAndExchange");
53     testConcurrentProcessing(new CompareAndSetRunnerFactory(), "compareAndSet");
54     testConcurrentProcessing(new WeakCompareAndSetRunnerFactory(), "weakCompareAndSet");
55   }
56 
testConcurrentProcessing(RunnerFactory factory, String testName)57   private static void testConcurrentProcessing(RunnerFactory factory,
58           String testName) throws Throwable {
59     final Duration startTs = Duration.ofNanos(System.nanoTime());
60     final Duration endTs = startTs.plus(MAX_RETRIES_DURATION);
61     for (int i = 0; i < RETRIES; ++i) {
62       concurrentProcessingTestIteration(factory, i, testName);
63       Duration now = Duration.ofNanos(System.nanoTime());
64       if (0 < now.compareTo(endTs)) {
65           break;
66       }
67     }
68   }
69 
concurrentProcessingTestIteration(RunnerFactory factory, int iteration, String testName)70   private static void concurrentProcessingTestIteration(RunnerFactory factory,
71           int iteration, String testName) throws Throwable {
72       final TestTask[] tasks = new TestTask[TASK_COUNT];
73       final AtomicInteger result = new AtomicInteger();
74 
75       for (int i = 0; i < TASK_COUNT; ++i) {
76           tasks[i] = new TestTask(Integer.valueOf(i+1), result::addAndGet);
77       }
78 
79       Thread[] threads = new Thread[THREAD_COUNT];
80       for (int i = 0; i < THREAD_COUNT; ++i) {
81           threads[i] = factory.createRunner(tasks);
82       }
83 
84       for (int i = 0; i < THREAD_COUNT; ++i) {
85           threads[i].start();
86       }
87 
88       for (int i = 0; i < THREAD_COUNT; ++i) {
89           threads[i].join();
90       }
91 
92       check(result.get(), TASK_COUNT * (TASK_COUNT + 1) / 2,
93               testName + " test result not as expected", iteration);
94   }
95 
96   /**
97    * Processes the task queue until there are no tasks left.
98    *
99    * The actual task-grabbing mechanism is implemented in subclasses through grabTask(). This allows
100    * testing various mechanisms, like compareAndSet() and compareAndExchange().
101    */
102   private static abstract class TaskRunner extends Thread {
103 
104       protected final TestTask[] tasks;
105 
TaskRunner(TestTask[] tasks)106       TaskRunner(TestTask[] tasks) {
107           this.tasks = tasks;
108       }
109 
110       @Override
run()111       public void run() {
112           int i = 0;
113           while (i < TASK_COUNT) {
114               TestTask t = (TestTask) QA.get(tasks, i);
115               if (t == null) {
116                   ++i;
117                   continue;
118               }
119               if (!grabTask(t, i)) {
120                   continue;
121               }
122               ++i;
123               VarHandle.releaseFence();
124               t.exec();
125           }
126       }
127 
128       /**
129        * Grabs the next task from the queue in an atomic way.
130        *
131        * Once a task is retrieved successfully, the queue should no longer hold a reference to it.
132        * This would be done, for example, by swapping the task with a null value.
133        *
134        * @param t The task to get from the queue
135        * @param i The index where the task is found
136        *
137        * @return {@code true} if the task has been retrieved and is not available to any other
138        * threads. Otherwise {@code false}. If {@code false} is returned, then either the task was no
139        * longer present on the queue due to another thread grabbing it, or, in case of spurious
140        * failure, the task is still available and no other thread managed to grab it.
141        */
grabTask(TestTask t, int i)142       protected abstract boolean grabTask(TestTask t, int i);
143   }
144 
145   private static class TaskRunnerWithCompareAndExchange extends TaskRunner {
146 
TaskRunnerWithCompareAndExchange(TestTask[] tasks)147       TaskRunnerWithCompareAndExchange(TestTask[] tasks) {
148           super(tasks);
149       }
150 
151       @Override
grabTask(TestTask t, int i)152       protected boolean grabTask(TestTask t, int i) {
153           return (t == QA.compareAndExchange(tasks, i, t, null));
154       }
155   }
156 
157   private static class TaskRunnerWithCompareAndSet extends TaskRunner {
158 
TaskRunnerWithCompareAndSet(TestTask[] tasks)159       TaskRunnerWithCompareAndSet(TestTask[] tasks) {
160           super(tasks);
161       }
162 
163       @Override
grabTask(TestTask t, int i)164       protected boolean grabTask(TestTask t, int i) {
165           return QA.compareAndSet(tasks, i, t, null);
166       }
167   }
168 
169   private static class TaskRunnerWithWeakCompareAndSet extends TaskRunner {
170 
TaskRunnerWithWeakCompareAndSet(TestTask[] tasks)171       TaskRunnerWithWeakCompareAndSet(TestTask[] tasks) {
172           super(tasks);
173       }
174 
175       @Override
grabTask(TestTask t, int i)176       protected boolean grabTask(TestTask t, int i) {
177           return QA.weakCompareAndSet(tasks, i, t, null);
178       }
179   }
180 
181 
182   private interface RunnerFactory {
createRunner(TestTask[] tasks)183       Thread createRunner(TestTask[] tasks);
184   }
185 
186   private static class CompareAndExchangeRunnerFactory implements RunnerFactory {
187       @Override
createRunner(TestTask[] tasks)188       public Thread createRunner(TestTask[] tasks) {
189           return new TaskRunnerWithCompareAndExchange(tasks);
190       }
191   }
192 
193   private static class CompareAndSetRunnerFactory implements RunnerFactory {
194       @Override
createRunner(TestTask[] tasks)195       public Thread createRunner(TestTask[] tasks) {
196           return new TaskRunnerWithCompareAndSet(tasks);
197       }
198   }
199 
200   private static class WeakCompareAndSetRunnerFactory implements RunnerFactory {
201       @Override
createRunner(TestTask[] tasks)202       public Thread createRunner(TestTask[] tasks) {
203           return new TaskRunnerWithWeakCompareAndSet(tasks);
204       }
205   }
206 
207   private static class TestTask {
208       private final Integer ord;
209       private final Consumer<Integer> action;
210 
TestTask(Integer ord, Consumer<Integer> action)211       TestTask(Integer ord, Consumer<Integer> action) {
212           this.ord = ord;
213           this.action = action;
214       }
215 
exec()216       public void exec() {
217           action.accept(ord);
218       }
219   }
220 
check(int actual, int expected, String msg, int iteration)221   private static void check(int actual, int expected, String msg, int iteration) {
222     if (actual != expected) {
223       System.err.println(String.format("[iteration %d] %s : %d != %d",
224                   iteration, msg, actual, expected));
225       System.exit(1);
226     }
227   }
228 }
229