• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3"""
4Copyright (c) 2024 Huawei Device Co., Ltd.
5Licensed under the Apache License, Version 2.0 (the "License");
6you may not use this file except in compliance with the License.
7You may obtain a copy of the License at
8
9    http://www.apache.org/licenses/LICENSE-2.0
10
11Unless required by applicable law or agreed to in writing, software
12distributed under the License is distributed on an "AS IS" BASIS,
13WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14See the License for the specific language governing permissions and
15limitations under the License.
16
17Description: Python Debugger Domain Interfaces
18"""
19
20import sys
21import json
22from pathlib import Path
23
24sys.path.append(str(Path(__file__).parent.parent)) # add aw path to sys.path
25
26from all_utils import CommonUtils
27from implement_api.protocol_api import ProtocolImpl
28from cdp import debugger
29from customized_types import ThreadConnectionInfo, ProtocolType
30
# Module-level alias for CommonUtils.communicate_with_debugger_server so the
# request handlers in this module can call it by a shorter name.
comm_with_debugger_server = CommonUtils.communicate_with_debugger_server
32
33
class DebuggerImpl(ProtocolImpl):
    """Handlers for the ``Debugger`` protocol domain.

    Each handler either sends one request to the debugger server and checks
    the reply (``ProtocolType.send`` entries of ``dispatch_table``) or waits
    for one message pushed by the server (``ProtocolType.recv`` entries).

    NOTE(review): ``self.websocket`` is presumably stored by
    ``ProtocolImpl.__init__`` and ``dispatch_table`` is presumably consumed by
    the base class -- confirm against ``implement_api.protocol_api``.
    """

    def __init__(self, id_generator, websocket):
        """Initialise the base class and register every Debugger-domain handler.

        Args:
            id_generator: Forwarded unchanged to ``ProtocolImpl.__init__``.
            websocket: Forwarded unchanged to ``ProtocolImpl.__init__``.
        """
        super().__init__(id_generator, websocket)
        # Protocol method name -> (bound handler, direction of the exchange).
        self.dispatch_table = {
            "enable": (self.enable, ProtocolType.send),
            "disable": (self.disable, ProtocolType.send),
            "resume": (self.resume, ProtocolType.send),
            "pause": (self.pause, ProtocolType.send),
            "removeBreakpointsByUrl": (self.remove_breakpoints_by_url, ProtocolType.send),
            "getPossibleAndSetBreakpointsByUrl": (self.get_possible_and_set_breakpoints_by_url,
                                                  ProtocolType.send),
            "setSymbolicBreakpoints": (self.set_symbolic_breakpoints, ProtocolType.send),
            "removeSymbolicBreakpoints": (self.remove_symbolic_breakpoints, ProtocolType.send),
            "stepOver": (self.step_over, ProtocolType.send),
            "stepInto": (self.step_into, ProtocolType.send),
            "stepOut": (self.step_out, ProtocolType.send),
            "setPauseOnExceptions": (self.set_pause_on_exceptions, ProtocolType.send),
            "evaluateOnCallFrame": (self.evaluate_on_call_frame, ProtocolType.send),
            "smartStepInto": (self.smart_step_into, ProtocolType.send),
            "setMixedDebugEnabled": (self.set_mixed_debug_enabled, ProtocolType.send),
            "replyNativeCalling": (self.reply_native_calling, ProtocolType.send),
            "dropFrame": (self.drop_frame, ProtocolType.send),
            "saveAllPossibleBreakpoints": (self.save_all_possible_breakpoints, ProtocolType.send),
            "scriptParsed": (self.script_parsed, ProtocolType.recv),
            "paused": (self.paused, ProtocolType.recv),
            "nativeCalling": (self.native_calling, ProtocolType.recv),
        }

    # ------------------------------------------------------------------
    # Private helpers shared by the handlers below.
    # ------------------------------------------------------------------

    async def _recv_from_debugger(self, connection):
        """Return the next raw message received for ``connection``."""
        return await self.websocket.recv_msg_of_debugger_server(connection.instance_id,
                                                                connection.received_msg_queue)

    @staticmethod
    def _check_empty_reply(response, message_id):
        """Check that raw ``response`` decodes to ``{"id": message_id, "result": {}}``."""
        CommonUtils.assert_equal(json.loads(response), {"id": message_id, "result": {}})

    @staticmethod
    def _load_reply_with_id(response, message_id):
        """Decode raw ``response``, check its ``id`` equals ``message_id``, return the dict."""
        reply = json.loads(response)
        CommonUtils.assert_equal(reply['id'], message_id)
        return reply

    async def _check_resumed_then_reply(self, response, message_id, connection):
        """Check a ``Debugger.resumed`` event followed by the empty reply.

        ``response`` is the first raw message already received; it must be the
        ``Debugger.resumed`` event. The next message read from ``connection``
        must be the empty result carrying ``message_id``.
        """
        CommonUtils.assert_equal(json.loads(response), {"method": "Debugger.resumed", "params": {}})
        response = await self._recv_from_debugger(connection)
        self._check_empty_reply(response, message_id)

    # ------------------------------------------------------------------
    # Connect-server exchanges.
    # ------------------------------------------------------------------

    async def connect_to_debugger_server(self, pid, is_main=True):
        """Wait for an ``addInstance`` message and build the connection info for it.

        For the main thread a ``{"type": "connected"}`` message is sent first
        and the announced instance must have ``instanceId`` 0 and ``tid`` equal
        to ``pid``. For a worker the instance must have a non-zero
        ``instanceId``, a ``tid`` different from ``pid`` and a ``name``
        containing ``'workerThread_'``.

        Args:
            pid: Value the ``tid`` field is compared against.
            is_main: True for the main thread, False for a worker thread.

        Returns:
            ThreadConnectionInfo built from the instance id returned by
            ``self.websocket.get_instance()`` and that instance's send and
            receive queues.

        Raises:
            AssertionError: If a worker-thread check fails.
                NOTE(review): ``CommonUtils.assert_equal`` presumably raises
                on mismatch as well -- confirm in ``all_utils``.
        """
        if is_main:
            await self.websocket.send_msg_to_connect_server({"type": "connected"})
        response = json.loads(await self.websocket.recv_msg_of_connect_server())
        CommonUtils.assert_equal(response['type'], 'addInstance')
        if is_main:
            CommonUtils.assert_equal(response['instanceId'], 0)
            CommonUtils.assert_equal(response['tid'], pid)
        else:
            # Explicit raises instead of bare ``assert`` so the checks are not
            # stripped when the interpreter runs with -O.
            if response['instanceId'] == 0:
                raise AssertionError("Worker instanceId can not be 0")
            if response['tid'] == pid:
                raise AssertionError(f"Worker tid can not be {pid}")
            if 'workerThread_' not in response['name']:
                raise AssertionError(f"'workerThread_' not in {response['name']}")
        instance_id = await self.websocket.get_instance()
        return ThreadConnectionInfo(instance_id,
                                    self.websocket.to_send_msg_queues[instance_id],
                                    self.websocket.received_msg_queues[instance_id])

    async def destroy_instance(self):
        """Receive one connect-server message, check it is ``destroyInstance``, return it decoded."""
        response = json.loads(await self.websocket.recv_msg_of_connect_server())
        CommonUtils.assert_equal(response['type'], 'destroyInstance')
        return response

    # ------------------------------------------------------------------
    # Requests sent to the debugger server.
    # ------------------------------------------------------------------

    async def enable(self, message_id, connection, params):
        """Send ``debugger.enable(params)`` and skip leading ``Debugger.scriptParsed`` messages.

        The first message that is not a ``Debugger.scriptParsed`` event is read
        but not validated.
        """
        response = await comm_with_debugger_server(self.websocket, connection,
                                                   debugger.enable(params), message_id)
        while response.startswith('{"method":"Debugger.scriptParsed"'):
            response = await self._recv_from_debugger(connection)
        # Validation of the response is temporarily removed to stay compatible
        # with different versions.

    async def disable(self, message_id, connection, params):
        """Send ``debugger.disable()`` and check the empty reply.

        Messages starting with a ``Debugger.resumed``,
        ``HeapProfiler.lastSeenObjectId`` or ``HeapProfiler.heapStatsUpdate``
        method prefix are skipped before the reply is checked. ``params`` is
        unused here.
        """
        skipped_prefixes = ('{"method":"Debugger.resumed"',
                            '{"method":"HeapProfiler.lastSeenObjectId"',
                            '{"method":"HeapProfiler.heapStatsUpdate"')
        response = await comm_with_debugger_server(self.websocket, connection,
                                                   debugger.disable(), message_id)
        while response.startswith(skipped_prefixes):
            response = await self._recv_from_debugger(connection)
        self._check_empty_reply(response, message_id)

    async def resume(self, message_id, connection, params):
        """Send ``debugger.resume()``; expect ``Debugger.resumed`` then the empty reply.

        ``params`` is unused here.
        """
        response = await comm_with_debugger_server(self.websocket, connection,
                                                   debugger.resume(), message_id)
        await self._check_resumed_then_reply(response, message_id, connection)

    async def pause(self, message_id, connection, params):
        """Send ``debugger.pause()`` and check the empty reply. ``params`` is unused here."""
        response = await comm_with_debugger_server(self.websocket, connection,
                                                   debugger.pause(), message_id)
        self._check_empty_reply(response, message_id)

    async def remove_breakpoints_by_url(self, message_id, connection, params):
        """Send ``debugger.remove_breakpoints_by_url(params)`` and check the empty reply."""
        response = await comm_with_debugger_server(self.websocket, connection,
                                                   debugger.remove_breakpoints_by_url(params), message_id)
        self._check_empty_reply(response, message_id)

    async def get_possible_and_set_breakpoints_by_url(self, message_id, connection, params):
        """Send ``debugger.get_possible_and_set_breakpoint_by_url(params)``.

        Returns:
            The decoded reply, after checking that its ``id`` equals ``message_id``.
        """
        response = await comm_with_debugger_server(self.websocket, connection,
                                                   debugger.get_possible_and_set_breakpoint_by_url(params),
                                                   message_id)
        return self._load_reply_with_id(response, message_id)

    async def set_symbolic_breakpoints(self, message_id, connection, params):
        """Send ``debugger.set_symbolic_breakpoints(params)``.

        Returns:
            The decoded reply, after checking that its ``id`` equals ``message_id``.
        """
        response = await comm_with_debugger_server(self.websocket, connection,
                                                   debugger.set_symbolic_breakpoints(params),
                                                   message_id)
        return self._load_reply_with_id(response, message_id)

    async def remove_symbolic_breakpoints(self, message_id, connection, params):
        """Send ``debugger.remove_symbolic_breakpoints(params)``.

        Returns:
            The decoded reply, after checking that its ``id`` equals ``message_id``.
        """
        response = await comm_with_debugger_server(self.websocket, connection,
                                                   debugger.remove_symbolic_breakpoints(params),
                                                   message_id)
        return self._load_reply_with_id(response, message_id)

    async def step_over(self, message_id, connection, params):
        """Send ``debugger.step_over()``; expect ``Debugger.resumed`` then the empty reply.

        ``params`` is unused here.
        """
        response = await comm_with_debugger_server(self.websocket, connection,
                                                   debugger.step_over(), message_id)
        await self._check_resumed_then_reply(response, message_id, connection)

    async def step_out(self, message_id, connection, params):
        """Send ``debugger.step_out()``; expect ``Debugger.resumed`` then the empty reply.

        ``params`` is unused here.
        """
        response = await comm_with_debugger_server(self.websocket, connection,
                                                   debugger.step_out(), message_id)
        await self._check_resumed_then_reply(response, message_id, connection)

    async def step_into(self, message_id, connection, params):
        """Send ``debugger.step_into()``; expect ``Debugger.resumed`` then the empty reply.

        ``params`` is unused here.
        """
        response = await comm_with_debugger_server(self.websocket, connection,
                                                   debugger.step_into(), message_id)
        await self._check_resumed_then_reply(response, message_id, connection)

    async def set_pause_on_exceptions(self, message_id, connection, params):
        """Send ``debugger.set_pause_on_exceptions(params)`` and check the empty reply."""
        response = await comm_with_debugger_server(self.websocket, connection,
                                                   debugger.set_pause_on_exceptions(params), message_id)
        self._check_empty_reply(response, message_id)

    async def evaluate_on_call_frame(self, message_id, connection, params):
        """Send ``debugger.evaluate_on_call_frame(params)``.

        Returns:
            The decoded reply, after checking that its ``id`` equals ``message_id``.
        """
        response = await comm_with_debugger_server(self.websocket, connection,
                                                   debugger.evaluate_on_call_frame(params), message_id)
        return self._load_reply_with_id(response, message_id)

    async def smart_step_into(self, message_id, connection, params):
        """Send ``debugger.smart_step_into(params)`` and check the empty reply."""
        response = await comm_with_debugger_server(self.websocket, connection,
                                                   debugger.smart_step_into(params), message_id)
        self._check_empty_reply(response, message_id)

    async def set_mixed_debug_enabled(self, message_id, connection, params):
        """Send ``debugger.set_mixed_debug_enabled(params)`` and check the empty reply."""
        response = await comm_with_debugger_server(self.websocket, connection,
                                                   debugger.set_mixed_debug_enabled(params), message_id)
        self._check_empty_reply(response, message_id)

    async def reply_native_calling(self, message_id, connection, params):
        """Send ``debugger.reply_native_calling(params)``; expect ``Debugger.resumed`` then the empty reply."""
        response = await comm_with_debugger_server(self.websocket, connection,
                                                   debugger.reply_native_calling(params), message_id)
        await self._check_resumed_then_reply(response, message_id, connection)

    async def drop_frame(self, message_id, connection, params):
        """Send ``debugger.drop_frame(params)``.

        Returns:
            The decoded reply, after checking that its ``id`` equals ``message_id``.
        """
        response = await comm_with_debugger_server(self.websocket, connection,
                                                   debugger.drop_frame(params), message_id)
        return self._load_reply_with_id(response, message_id)

    async def save_all_possible_breakpoints(self, message_id, connection, params):
        """Send ``debugger.save_all_possible_breakpoints(params)``.

        Returns:
            The decoded reply, after checking that its ``id`` equals ``message_id``.
        """
        response = await comm_with_debugger_server(self.websocket, connection,
                                                   debugger.save_all_possible_breakpoints(params), message_id)
        return self._load_reply_with_id(response, message_id)

    # ------------------------------------------------------------------
    # Messages pushed by the debugger server.
    # ------------------------------------------------------------------

    async def script_parsed(self, connection, params):
        """Receive the next debugger-server message and return it decoded.

        The message is not validated. ``params`` is unused here.
        """
        return json.loads(await self._recv_from_debugger(connection))

    async def paused(self, connection, params):
        """Receive the next message, check it is ``Debugger.paused``, return it decoded.

        ``params`` is unused here.
        """
        response = json.loads(await self._recv_from_debugger(connection))
        CommonUtils.assert_equal(response['method'], 'Debugger.paused')
        return response

    async def native_calling(self, connection, params):
        """Receive the next message, check it is ``Debugger.nativeCalling``, return it decoded.

        ``params`` is unused here.
        """
        response = json.loads(await self._recv_from_debugger(connection))
        CommonUtils.assert_equal(response['method'], 'Debugger.nativeCalling')
        return response
239