1""" 2Test lldb Python event APIs. 3""" 4 5import os, time 6import re 7import unittest2 8import lldb, lldbutil 9from lldbtest import * 10 11class EventAPITestCase(TestBase): 12 13 mydir = os.path.join("python_api", "event") 14 15 @unittest2.skipUnless(sys.platform.startswith("darwin"), "requires Darwin") 16 @python_api_test 17 @dsym_test 18 def test_listen_for_and_print_event_with_dsym(self): 19 """Exercise SBEvent API.""" 20 self.buildDsym() 21 self.do_listen_for_and_print_event() 22 23 @python_api_test 24 @dwarf_test 25 def test_listen_for_and_print_event_with_dwarf(self): 26 """Exercise SBEvent API.""" 27 self.buildDwarf() 28 self.do_listen_for_and_print_event() 29 30 @unittest2.skipUnless(sys.platform.startswith("darwin"), "requires Darwin") 31 @python_api_test 32 @dsym_test 33 def test_wait_for_event_with_dsym(self): 34 """Exercise SBListener.WaitForEvent() API.""" 35 self.buildDsym() 36 self.do_wait_for_event() 37 38 @python_api_test 39 @dwarf_test 40 def test_wait_for_event_with_dwarf(self): 41 """Exercise SBListener.WaitForEvent() API.""" 42 self.buildDwarf() 43 self.do_wait_for_event() 44 45 @unittest2.skipUnless(sys.platform.startswith("darwin"), "requires Darwin") 46 @python_api_test 47 @dsym_test 48 def test_add_listener_to_broadcaster_with_dsym(self): 49 """Exercise some SBBroadcaster APIs.""" 50 self.buildDsym() 51 self.do_add_listener_to_broadcaster() 52 53 @python_api_test 54 @dwarf_test 55 def test_add_listener_to_broadcaster_with_dwarf(self): 56 """Exercise some SBBroadcaster APIs.""" 57 self.buildDwarf() 58 self.do_add_listener_to_broadcaster() 59 60 def setUp(self): 61 # Call super's setUp(). 62 TestBase.setUp(self) 63 # Find the line number to of function 'c'. 64 self.line = line_number('main.c', '// Find the line number of function "c" here.') 65 66 def do_listen_for_and_print_event(self): 67 """Create a listener and use SBEvent API to print the events received.""" 68 exe = os.path.join(os.getcwd(), "a.out") 69 70 # Create a target by the debugger. 71 target = self.dbg.CreateTarget(exe) 72 self.assertTrue(target, VALID_TARGET) 73 74 # Now create a breakpoint on main.c by name 'c'. 75 breakpoint = target.BreakpointCreateByName('c', 'a.out') 76 77 # Now launch the process, and do not stop at the entry point. 78 process = target.LaunchSimple(None, None, os.getcwd()) 79 self.assertTrue(process.GetState() == lldb.eStateStopped, 80 PROCESS_STOPPED) 81 82 # Get a handle on the process's broadcaster. 83 broadcaster = process.GetBroadcaster() 84 85 # Create an empty event object. 86 event = lldb.SBEvent() 87 88 # Create a listener object and register with the broadcaster. 89 listener = lldb.SBListener("my listener") 90 rc = broadcaster.AddListener(listener, lldb.SBProcess.eBroadcastBitStateChanged) 91 self.assertTrue(rc, "AddListener successfully retruns") 92 93 traceOn = self.TraceOn() 94 if traceOn: 95 lldbutil.print_stacktraces(process) 96 97 # Create MyListeningThread class to wait for any kind of event. 98 import threading 99 class MyListeningThread(threading.Thread): 100 def run(self): 101 count = 0 102 # Let's only try at most 4 times to retrieve any kind of event. 103 # After that, the thread exits. 104 while not count > 3: 105 if traceOn: 106 print "Try wait for event..." 107 if listener.WaitForEventForBroadcasterWithType(5, 108 broadcaster, 109 lldb.SBProcess.eBroadcastBitStateChanged, 110 event): 111 if traceOn: 112 desc = lldbutil.get_description(event) 113 print "Event description:", desc 114 print "Event data flavor:", event.GetDataFlavor() 115 print "Process state:", lldbutil.state_type_to_str(process.GetState()) 116 print 117 else: 118 if traceOn: 119 print "timeout occurred waiting for event..." 120 count = count + 1 121 return 122 123 # Let's start the listening thread to retrieve the events. 124 my_thread = MyListeningThread() 125 my_thread.start() 126 127 # Use Python API to continue the process. The listening thread should be 128 # able to receive the state changed events. 129 process.Continue() 130 131 # Use Python API to kill the process. The listening thread should be 132 # able to receive the state changed event, too. 133 process.Kill() 134 135 # Wait until the 'MyListeningThread' terminates. 136 my_thread.join() 137 138 def do_wait_for_event(self): 139 """Get the listener associated with the debugger and exercise WaitForEvent API.""" 140 exe = os.path.join(os.getcwd(), "a.out") 141 142 # Create a target by the debugger. 143 target = self.dbg.CreateTarget(exe) 144 self.assertTrue(target, VALID_TARGET) 145 146 # Now create a breakpoint on main.c by name 'c'. 147 breakpoint = target.BreakpointCreateByName('c', 'a.out') 148 #print "breakpoint:", breakpoint 149 self.assertTrue(breakpoint and 150 breakpoint.GetNumLocations() == 1, 151 VALID_BREAKPOINT) 152 153 # Get the debugger listener. 154 listener = self.dbg.GetListener() 155 156 # Now launch the process, and do not stop at entry point. 157 error = lldb.SBError() 158 process = target.Launch (listener, None, None, None, None, None, None, 0, False, error) 159 self.assertTrue(error.Success() and process, PROCESS_IS_VALID) 160 161 # Get a handle on the process's broadcaster. 162 broadcaster = process.GetBroadcaster() 163 self.assertTrue(broadcaster, "Process with valid broadcaster") 164 165 # Create an empty event object. 166 event = lldb.SBEvent() 167 self.assertFalse(event, "Event should not be valid initially") 168 169 # Create MyListeningThread to wait for any kind of event. 170 import threading 171 class MyListeningThread(threading.Thread): 172 def run(self): 173 count = 0 174 # Let's only try at most 3 times to retrieve any kind of event. 175 while not count > 3: 176 if listener.WaitForEvent(5, event): 177 #print "Got a valid event:", event 178 #print "Event data flavor:", event.GetDataFlavor() 179 #print "Event type:", lldbutil.state_type_to_str(event.GetType()) 180 return 181 count = count + 1 182 print "Timeout: listener.WaitForEvent" 183 184 return 185 186 # Use Python API to kill the process. The listening thread should be 187 # able to receive a state changed event. 188 process.Kill() 189 190 # Let's start the listening thread to retrieve the event. 191 my_thread = MyListeningThread() 192 my_thread.start() 193 194 # Wait until the 'MyListeningThread' terminates. 195 my_thread.join() 196 197 self.assertTrue(event, 198 "My listening thread successfully received an event") 199 200 def do_add_listener_to_broadcaster(self): 201 """Get the broadcaster associated with the process and wait for broadcaster events.""" 202 exe = os.path.join(os.getcwd(), "a.out") 203 204 # Create a target by the debugger. 205 target = self.dbg.CreateTarget(exe) 206 self.assertTrue(target, VALID_TARGET) 207 208 # Now create a breakpoint on main.c by name 'c'. 209 breakpoint = target.BreakpointCreateByName('c', 'a.out') 210 #print "breakpoint:", breakpoint 211 self.assertTrue(breakpoint and 212 breakpoint.GetNumLocations() == 1, 213 VALID_BREAKPOINT) 214 215 # Now launch the process, and do not stop at the entry point. 216 process = target.LaunchSimple(None, None, os.getcwd()) 217 self.assertTrue(process.GetState() == lldb.eStateStopped, 218 PROCESS_STOPPED) 219 220 # Get a handle on the process's broadcaster. 221 broadcaster = process.GetBroadcaster() 222 self.assertTrue(broadcaster, "Process with valid broadcaster") 223 224 # Create an empty event object. 225 event = lldb.SBEvent() 226 self.assertFalse(event, "Event should not be valid initially") 227 228 # Create a listener object and register with the broadcaster. 229 listener = lldb.SBListener("TestEvents.listener") 230 rc = broadcaster.AddListener(listener, lldb.SBProcess.eBroadcastBitStateChanged) 231 self.assertTrue(rc, "AddListener successfully retruns") 232 233 # The finite state machine for our custom listening thread, with an 234 # initail state of 0, which means a "running" event is expected. 235 # It changes to 1 after "running" is received. 236 # It cahnges to 2 after "stopped" is received. 237 # 2 will be our final state and the test is complete. 238 self.state = 0 239 240 # Create MyListeningThread to wait for state changed events. 241 # By design, a "running" event is expected following by a "stopped" event. 242 import threading 243 class MyListeningThread(threading.Thread): 244 def run(self): 245 #print "Running MyListeningThread:", self 246 247 # Regular expression pattern for the event description. 248 pattern = re.compile("data = {.*, state = (.*)}$") 249 250 # Let's only try at most 6 times to retrieve our events. 251 count = 0 252 while True: 253 if listener.WaitForEventForBroadcasterWithType(5, 254 broadcaster, 255 lldb.SBProcess.eBroadcastBitStateChanged, 256 event): 257 desc = lldbutil.get_description(event) 258 #print "Event description:", desc 259 match = pattern.search(desc) 260 if not match: 261 break; 262 if self.context.state == 0 and match.group(1) == 'running': 263 self.context.state = 1 264 continue 265 elif self.context.state == 1 and match.group(1) == 'stopped': 266 # Whoopee, both events have been received! 267 self.context.state = 2 268 break 269 else: 270 break 271 print "Timeout: listener.WaitForEvent" 272 count = count + 1 273 if count > 6: 274 break 275 276 return 277 278 # Use Python API to continue the process. The listening thread should be 279 # able to receive the state changed events. 280 process.Continue() 281 282 # Start the listening thread to receive the "running" followed by the 283 # "stopped" events. 284 my_thread = MyListeningThread() 285 # Supply the enclosing context so that our listening thread can access 286 # the 'state' variable. 287 my_thread.context = self 288 my_thread.start() 289 290 # Wait until the 'MyListeningThread' terminates. 291 my_thread.join() 292 293 # We are no longer interested in receiving state changed events. 294 # Remove our custom listener before the inferior is killed. 295 broadcaster.RemoveListener(listener, lldb.SBProcess.eBroadcastBitStateChanged) 296 297 # The final judgement. :-) 298 self.assertTrue(self.state == 2, 299 "Both expected state changed events received") 300 301 302if __name__ == '__main__': 303 import atexit 304 lldb.SBDebugger.Initialize() 305 atexit.register(lambda: lldb.SBDebugger.Terminate()) 306 unittest2.main() 307