/*
 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines

import kotlinx.atomicfu.*
import kotlinx.coroutines.channels.*
import org.junit.Test
import java.util.concurrent.locks.*
import kotlin.concurrent.*
import kotlin.test.*

/**
 * Tests event loops integration.
 * See [https://github.com/Kotlin/kotlinx.coroutines/issues/860].
 */
class EventLoopsTest : TestBase() {
    /**
     * A nested `runBlocking` must be able to receive a value produced by a coroutine
     * that was started in the enclosing `runBlocking`.
     */
    @Test
    fun testNestedRunBlocking() {
        runBlocking { // outer event loop
            // Produce string "OK"
            val ch = produce { send("OK") }
            // try receive this string in a blocking way:
            assertEquals("OK", runBlocking { ch.receive() }) // it should not hang here
        }
    }

    /**
     * A coroutine launched with [Dispatchers.Unconfined] inside `runBlocking` must have
     * set the flag by the time `launch` returns.
     */
    @Test
    fun testUnconfinedInRunBlocking() {
        var completed = false
        runBlocking {
            launch(Dispatchers.Unconfined) {
                completed = true
            }
            // should not go into runBlocking loop, but complete here
            assertTrue(completed)
        }
    }

    /**
     * Pins the execution order of an unconfined coroutine launched from within another
     * unconfined coroutine: the outer body finishes (3) before the inner body runs (4).
     */
    @Test
    fun testNestedUnconfined() {
        expect(1)
        GlobalScope.launch(Dispatchers.Unconfined) {
            expect(2)
            GlobalScope.launch(Dispatchers.Unconfined) {
                // this gets scheduled into outer unconfined loop
                expect(4)
            }
            expect(3) // ^^ executed before the above unconfined
        }
        finish(5)
    }

    /**
     * After `delay` in an unconfined context the code asserts it is running on a thread
     * whose name starts with [DefaultExecutor.THREAD_NAME]; a task enqueued there (4) is
     * expected to run before the body of a subsequent `runBlocking` (5).
     */
    @Test
    fun testEventLoopInDefaultExecutor() = runTest {
        expect(1)
        withContext(Dispatchers.Unconfined) {
            delay(1)
            assertTrue(Thread.currentThread().name.startsWith(DefaultExecutor.THREAD_NAME))
            expect(2)
            // now runBlocking inside default executor thread --> should use outer event loop
            DefaultExecutor.enqueue(Runnable {
                expect(4) // will execute when runBlocking runs loop
            })
            expect(3)
            runBlocking {
                expect(5)
            }
        }
        finish(6)
    }

    /**
     * Simple test for [processNextEventInCurrentThread] API use-case.
     */
    @Test
    fun testProcessNextEventInCurrentThreadSimple() = runTest {
        expect(1)
        val event = EventSync()
        // this coroutine fires event
        launch {
            expect(3)
            event.fireEvent()
        }
        // main coroutine waits for event (same thread!)
        expect(2)
        event.blockingAwait()
        finish(4)
    }

    /**
     * A second thread runs `runBlocking` with the test's coroutine context; the inner block
     * asserts that it executes on the test thread and must still receive the produced value.
     */
    @Test
    fun testSecondThreadRunBlocking() = runTest {
        val testThread = Thread.currentThread()
        val testContext = coroutineContext
        val event = EventSync() // will signal completion
        // assigned exactly once, hence `val`
        val thread = thread {
            runBlocking { // outer event loop
                // Produce string "OK"
                val ch = produce { send("OK") }
                // try receive this string in a blocking way using test context (another thread)
                assertEquals("OK", runBlocking(testContext) {
                    assertEquals(testThread, Thread.currentThread())
                    ch.receive() // it should not hang here
                })
            }
            event.fireEvent() // done thread
        }
        event.blockingAwait() // wait for thread to complete
        thread.join() // it is safe to join thread now
    }

    /**
     * Test for [processNextEventInCurrentThread] API use-case with delay.
     */
    @Test
    fun testProcessNextEventInCurrentThreadDelay() = runTest {
        expect(1)
        val event = EventSync()
        // this coroutine fires event
        launch {
            expect(3)
            delay(100)
            event.fireEvent()
        }
        // main coroutine waits for event (same thread!)
        expect(2)
        event.blockingAwait()
        finish(4)
    }

    /**
     * Blocking signal used by the tests above: one side calls [fireEvent], the other side
     * blocks in [blockingAwait] while repeatedly calling [processNextEventInCurrentThread].
     */
    class EventSync {
        // Thread currently inside blockingAwait, or null when nobody is waiting.
        private val waitingThread = atomic<Thread?>(null)
        // Set by fireEvent, consumed (reset to false) by blockingAwait.
        private val fired = atomic(false)

        /**
         * Marks the event as fired and unparks the registered waiting thread, if there is one.
         */
        fun fireEvent() {
            fired.value = true
            waitingThread.value?.let { LockSupport.unpark(it) }
        }

        /**
         * Blocks the calling thread until the event has been fired, then resets the fired flag.
         *
         * On every iteration it calls [processNextEventInCurrentThread] and parks for the
         * number of nanoseconds that call returned.
         *
         * @throws IllegalStateException if a waiting thread is already registered.
         */
        fun blockingAwait() {
            check(waitingThread.getAndSet(Thread.currentThread()) == null) {
                "blockingAwait is already in progress on this EventSync"
            }
            while (!fired.getAndSet(false)) {
                val time = processNextEventInCurrentThread()
                LockSupport.parkNanos(time)
            }
            waitingThread.value = null
        }
    }
}