• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines
6 
7 import kotlinx.atomicfu.*
8 import kotlinx.coroutines.channels.*
9 import org.junit.Test
10 import java.util.concurrent.locks.*
11 import kotlin.concurrent.*
12 import kotlin.test.*
13 
14 /**
15  * Tests event loops integration.
16  * See [https://github.com/Kotlin/kotlinx.coroutines/issues/860].
17  */
18 class EventLoopsTest : TestBase() {
19     @Test
testNestedRunBlockingnull20     fun testNestedRunBlocking() {
21         runBlocking { // outer event loop
22             // Produce string "OK"
23             val ch = produce { send("OK") }
24             // try receive this string in a blocking way:
25             assertEquals("OK", runBlocking { ch.receive() }) // it should not hang here
26         }
27     }
28 
29     @Test
testUnconfinedInRunBlockingnull30     fun testUnconfinedInRunBlocking() {
31         var completed = false
32         runBlocking {
33             launch(Dispatchers.Unconfined) {
34                 completed = true
35             }
36             // should not go into runBlocking loop, but complete here
37             assertTrue(completed)
38         }
39     }
40 
41     @Test
testNestedUnconfinednull42     fun testNestedUnconfined() {
43         expect(1)
44         GlobalScope.launch(Dispatchers.Unconfined) {
45             expect(2)
46             GlobalScope.launch(Dispatchers.Unconfined) {
47                 // this gets scheduled into outer unconfined loop
48                 expect(4)
49             }
50             expect(3) // ^^ executed before the above unconfined
51         }
52         finish(5)
53     }
54 
55     @Test
<lambda>null56     fun testEventLoopInDefaultExecutor() = runTest {
57         expect(1)
58         withContext(Dispatchers.Unconfined) {
59             delay(1)
60             assertTrue(Thread.currentThread().name.startsWith(DefaultExecutor.THREAD_NAME))
61             expect(2)
62             // now runBlocking inside default executor thread --> should use outer event loop
63             DefaultExecutor.enqueue(Runnable {
64                 expect(4) // will execute when runBlocking runs loop
65             })
66             expect(3)
67             runBlocking {
68                 expect(5)
69             }
70         }
71         finish(6)
72     }
73 
74     /**
75      * Simple test for [processNextEventInCurrentThread] API use-case.
76      */
77     @Test
<lambda>null78     fun testProcessNextEventInCurrentThreadSimple() = runTest {
79         expect(1)
80         val event = EventSync()
81         // this coroutine fires event
82         launch {
83             expect(3)
84             event.fireEvent()
85         }
86         // main coroutine waits for event (same thread!)
87         expect(2)
88         event.blockingAwait()
89         finish(4)
90     }
91 
92     @Test
<lambda>null93     fun testSecondThreadRunBlocking() = runTest {
94         val testThread = Thread.currentThread()
95         val testContext = coroutineContext
96         val event = EventSync() // will signal completion
97         var thread = thread {
98             runBlocking { // outer event loop
99                 // Produce string "OK"
100                 val ch = produce { send("OK") }
101                 // try receive this string in a blocking way using test context (another thread)
102                 assertEquals("OK", runBlocking(testContext) {
103                     assertEquals(testThread, Thread.currentThread())
104                     ch.receive() // it should not hang here
105                 })
106             }
107             event.fireEvent() // done thread
108         }
109         event.blockingAwait() // wait for thread to complete
110         thread.join() // it is safe to join thread now
111     }
112 
113     /**
114      * Test for [processNextEventInCurrentThread] API use-case with delay.
115      */
116     @Test
<lambda>null117     fun testProcessNextEventInCurrentThreadDelay() = runTest {
118         expect(1)
119         val event = EventSync()
120         // this coroutine fires event
121         launch {
122             expect(3)
123             delay(100)
124             event.fireEvent()
125         }
126         // main coroutine waits for event (same thread!)
127         expect(2)
128         event.blockingAwait()
129         finish(4)
130     }
131 
132     class EventSync {
133         private val waitingThread = atomic<Thread?>(null)
134         private val fired = atomic(false)
135 
fireEventnull136         fun fireEvent() {
137             fired.value = true
138             waitingThread.value?.let { LockSupport.unpark(it) }
139         }
140 
blockingAwaitnull141         fun blockingAwait() {
142             check(waitingThread.getAndSet(Thread.currentThread()) == null)
143             while (!fired.getAndSet(false)) {
144                 val time = processNextEventInCurrentThread()
145                 LockSupport.parkNanos(time)
146             }
147             waitingThread.value = null
148         }
149     }
150 }