""" Test lldb Python event APIs. """ import os, time import re import unittest2 import lldb, lldbutil from lldbtest import * class EventAPITestCase(TestBase): mydir = os.path.join("python_api", "event") @unittest2.skipUnless(sys.platform.startswith("darwin"), "requires Darwin") @python_api_test @dsym_test def test_listen_for_and_print_event_with_dsym(self): """Exercise SBEvent API.""" self.buildDsym() self.do_listen_for_and_print_event() @python_api_test @dwarf_test def test_listen_for_and_print_event_with_dwarf(self): """Exercise SBEvent API.""" self.buildDwarf() self.do_listen_for_and_print_event() @unittest2.skipUnless(sys.platform.startswith("darwin"), "requires Darwin") @python_api_test @dsym_test def test_wait_for_event_with_dsym(self): """Exercise SBListener.WaitForEvent() API.""" self.buildDsym() self.do_wait_for_event() @python_api_test @dwarf_test def test_wait_for_event_with_dwarf(self): """Exercise SBListener.WaitForEvent() API.""" self.buildDwarf() self.do_wait_for_event() @unittest2.skipUnless(sys.platform.startswith("darwin"), "requires Darwin") @python_api_test @dsym_test def test_add_listener_to_broadcaster_with_dsym(self): """Exercise some SBBroadcaster APIs.""" self.buildDsym() self.do_add_listener_to_broadcaster() @python_api_test @dwarf_test def test_add_listener_to_broadcaster_with_dwarf(self): """Exercise some SBBroadcaster APIs.""" self.buildDwarf() self.do_add_listener_to_broadcaster() def setUp(self): # Call super's setUp(). TestBase.setUp(self) # Find the line number to of function 'c'. self.line = line_number('main.c', '// Find the line number of function "c" here.') def do_listen_for_and_print_event(self): """Create a listener and use SBEvent API to print the events received.""" exe = os.path.join(os.getcwd(), "a.out") # Create a target by the debugger. target = self.dbg.CreateTarget(exe) self.assertTrue(target, VALID_TARGET) # Now create a breakpoint on main.c by name 'c'. breakpoint = target.BreakpointCreateByName('c', 'a.out') # Now launch the process, and do not stop at the entry point. process = target.LaunchSimple(None, None, os.getcwd()) self.assertTrue(process.GetState() == lldb.eStateStopped, PROCESS_STOPPED) # Get a handle on the process's broadcaster. broadcaster = process.GetBroadcaster() # Create an empty event object. event = lldb.SBEvent() # Create a listener object and register with the broadcaster. listener = lldb.SBListener("my listener") rc = broadcaster.AddListener(listener, lldb.SBProcess.eBroadcastBitStateChanged) self.assertTrue(rc, "AddListener successfully retruns") traceOn = self.TraceOn() if traceOn: lldbutil.print_stacktraces(process) # Create MyListeningThread class to wait for any kind of event. import threading class MyListeningThread(threading.Thread): def run(self): count = 0 # Let's only try at most 4 times to retrieve any kind of event. # After that, the thread exits. while not count > 3: if traceOn: print "Try wait for event..." if listener.WaitForEventForBroadcasterWithType(5, broadcaster, lldb.SBProcess.eBroadcastBitStateChanged, event): if traceOn: desc = lldbutil.get_description(event) print "Event description:", desc print "Event data flavor:", event.GetDataFlavor() print "Process state:", lldbutil.state_type_to_str(process.GetState()) print else: if traceOn: print "timeout occurred waiting for event..." count = count + 1 return # Let's start the listening thread to retrieve the events. my_thread = MyListeningThread() my_thread.start() # Use Python API to continue the process. The listening thread should be # able to receive the state changed events. process.Continue() # Use Python API to kill the process. The listening thread should be # able to receive the state changed event, too. process.Kill() # Wait until the 'MyListeningThread' terminates. my_thread.join() def do_wait_for_event(self): """Get the listener associated with the debugger and exercise WaitForEvent API.""" exe = os.path.join(os.getcwd(), "a.out") # Create a target by the debugger. target = self.dbg.CreateTarget(exe) self.assertTrue(target, VALID_TARGET) # Now create a breakpoint on main.c by name 'c'. breakpoint = target.BreakpointCreateByName('c', 'a.out') #print "breakpoint:", breakpoint self.assertTrue(breakpoint and breakpoint.GetNumLocations() == 1, VALID_BREAKPOINT) # Get the debugger listener. listener = self.dbg.GetListener() # Now launch the process, and do not stop at entry point. error = lldb.SBError() process = target.Launch (listener, None, None, None, None, None, None, 0, False, error) self.assertTrue(error.Success() and process, PROCESS_IS_VALID) # Get a handle on the process's broadcaster. broadcaster = process.GetBroadcaster() self.assertTrue(broadcaster, "Process with valid broadcaster") # Create an empty event object. event = lldb.SBEvent() self.assertFalse(event, "Event should not be valid initially") # Create MyListeningThread to wait for any kind of event. import threading class MyListeningThread(threading.Thread): def run(self): count = 0 # Let's only try at most 3 times to retrieve any kind of event. while not count > 3: if listener.WaitForEvent(5, event): #print "Got a valid event:", event #print "Event data flavor:", event.GetDataFlavor() #print "Event type:", lldbutil.state_type_to_str(event.GetType()) return count = count + 1 print "Timeout: listener.WaitForEvent" return # Use Python API to kill the process. The listening thread should be # able to receive a state changed event. process.Kill() # Let's start the listening thread to retrieve the event. my_thread = MyListeningThread() my_thread.start() # Wait until the 'MyListeningThread' terminates. my_thread.join() self.assertTrue(event, "My listening thread successfully received an event") def do_add_listener_to_broadcaster(self): """Get the broadcaster associated with the process and wait for broadcaster events.""" exe = os.path.join(os.getcwd(), "a.out") # Create a target by the debugger. target = self.dbg.CreateTarget(exe) self.assertTrue(target, VALID_TARGET) # Now create a breakpoint on main.c by name 'c'. breakpoint = target.BreakpointCreateByName('c', 'a.out') #print "breakpoint:", breakpoint self.assertTrue(breakpoint and breakpoint.GetNumLocations() == 1, VALID_BREAKPOINT) # Now launch the process, and do not stop at the entry point. process = target.LaunchSimple(None, None, os.getcwd()) self.assertTrue(process.GetState() == lldb.eStateStopped, PROCESS_STOPPED) # Get a handle on the process's broadcaster. broadcaster = process.GetBroadcaster() self.assertTrue(broadcaster, "Process with valid broadcaster") # Create an empty event object. event = lldb.SBEvent() self.assertFalse(event, "Event should not be valid initially") # Create a listener object and register with the broadcaster. listener = lldb.SBListener("TestEvents.listener") rc = broadcaster.AddListener(listener, lldb.SBProcess.eBroadcastBitStateChanged) self.assertTrue(rc, "AddListener successfully retruns") # The finite state machine for our custom listening thread, with an # initail state of 0, which means a "running" event is expected. # It changes to 1 after "running" is received. # It cahnges to 2 after "stopped" is received. # 2 will be our final state and the test is complete. self.state = 0 # Create MyListeningThread to wait for state changed events. # By design, a "running" event is expected following by a "stopped" event. import threading class MyListeningThread(threading.Thread): def run(self): #print "Running MyListeningThread:", self # Regular expression pattern for the event description. pattern = re.compile("data = {.*, state = (.*)}$") # Let's only try at most 6 times to retrieve our events. count = 0 while True: if listener.WaitForEventForBroadcasterWithType(5, broadcaster, lldb.SBProcess.eBroadcastBitStateChanged, event): desc = lldbutil.get_description(event) #print "Event description:", desc match = pattern.search(desc) if not match: break; if self.context.state == 0 and match.group(1) == 'running': self.context.state = 1 continue elif self.context.state == 1 and match.group(1) == 'stopped': # Whoopee, both events have been received! self.context.state = 2 break else: break print "Timeout: listener.WaitForEvent" count = count + 1 if count > 6: break return # Use Python API to continue the process. The listening thread should be # able to receive the state changed events. process.Continue() # Start the listening thread to receive the "running" followed by the # "stopped" events. my_thread = MyListeningThread() # Supply the enclosing context so that our listening thread can access # the 'state' variable. my_thread.context = self my_thread.start() # Wait until the 'MyListeningThread' terminates. my_thread.join() # We are no longer interested in receiving state changed events. # Remove our custom listener before the inferior is killed. broadcaster.RemoveListener(listener, lldb.SBProcess.eBroadcastBitStateChanged) # The final judgement. :-) self.assertTrue(self.state == 2, "Both expected state changed events received") if __name__ == '__main__': import atexit lldb.SBDebugger.Initialize() atexit.register(lambda: lldb.SBDebugger.Terminate()) unittest2.main()