1#!/usr/bin/env python3 2# -*- coding: utf-8 -*- 3""" 4Copyright (c) 2024 Huawei Device Co., Ltd. 5Licensed under the Apache License, Version 2.0 (the "License"); 6you may not use this file except in compliance with the License. 7You may obtain a copy of the License at 8 9 http://www.apache.org/licenses/LICENSE-2.0 10 11Unless required by applicable law or agreed to in writing, software 12distributed under the License is distributed on an "AS IS" BASIS, 13WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14See the License for the specific language governing permissions and 15limitations under the License. 16 17Description: Python HeapProfiler Domain Interfaces 18""" 19 20import json 21 22from aw import communicate_with_debugger_server 23from aw.cdp import heap_profiler 24from aw.types import ProtocolType 25from aw.api.protocol_api import ProtocolImpl 26 27 28class HeapProfilerImpl(ProtocolImpl): 29 30 def __init__(self, id_generator, websocket): 31 super().__init__(id_generator, websocket) 32 self.dispatch_table = {"startTrackingHeapObjects": (self.start_tracking_heap_objects, ProtocolType.send), 33 "stopTrackingHeapObjects": (self.stop_tracking_heap_objects, ProtocolType.send), 34 "takeHeapSnapshot": (self.take_heap_snapshot, ProtocolType.send), 35 "startSampling": (self.start_sampling, ProtocolType.send), 36 "stopSampling": (self.stop_sampling, ProtocolType.send)} 37 38 async def start_tracking_heap_objects(self, message_id, connection, params): 39 response = await communicate_with_debugger_server(connection.instance_id, 40 connection.send_msg_queue, 41 connection.received_msg_queue, 42 heap_profiler.start_tracking_heap_objects(params), message_id) 43 response = json.loads(response) 44 assert response == {"id": message_id, "result": {}} 45 return response 46 47 async def stop_tracking_heap_objects(self, message_id, connection, params): 48 response = await communicate_with_debugger_server(connection.instance_id, 49 connection.send_msg_queue, 50 connection.received_msg_queue, 51 heap_profiler.stop_tracking_heap_objects(), message_id) 52 while response.startswith('{"method":"HeapProfiler.lastSeenObjectId"'): 53 response = await self.websocket.recv_msg_of_debugger_server(connection.instance_id, 54 connection.received_msg_queue) 55 assert r'\"location_fields\":[\"object_index\",\"script_id\",\"line\",\"column\"]' in response 56 pre_response = response 57 while response.startswith('{"method":"HeapProfiler.addHeapSnapshotChunk"') or \ 58 response.startswith('{"method":"HeapProfiler.lastSeenObjectId"'): 59 if response.startswith('{"method":"HeapProfiler.addHeapSnapshotChunk"'): 60 pre_response = response 61 response = await self.websocket.recv_msg_of_debugger_server(connection.instance_id, 62 connection.received_msg_queue) 63 assert pre_response.endswith(r'\n]\n}\n"}}') 64 response = json.loads(response) 65 assert response == {"id": message_id, "result": {}} 66 return response 67 68 async def take_heap_snapshot(self, message_id, connection, params): 69 response = await communicate_with_debugger_server(connection.instance_id, 70 connection.send_msg_queue, 71 connection.received_msg_queue, 72 heap_profiler.take_heap_snapshot(), message_id) 73 assert r'\"location_fields\":[\"object_index\",\"script_id\",\"line\",\"column\"]' in response 74 pre_response = response 75 while response.startswith('{"method":"HeapProfiler.addHeapSnapshotChunk"'): 76 pre_response = response 77 response = await self.websocket.recv_msg_of_debugger_server(connection.instance_id, 78 connection.received_msg_queue) 79 assert pre_response.endswith(r'\n]\n}\n"}}') 80 response = json.loads(response) 81 assert response == {"id": message_id, "result": {}} 82 return response 83 84 async def start_sampling(self, message_id, connection, params): 85 response = await communicate_with_debugger_server(connection.instance_id, 86 connection.send_msg_queue, 87 connection.received_msg_queue, 88 heap_profiler.start_sampling(), message_id) 89 response = json.loads(response) 90 assert response == {"id": message_id, "result": {}} 91 return response 92 93 async def stop_sampling(self, message_id, connection, params): 94 response = await communicate_with_debugger_server(connection.instance_id, 95 connection.send_msg_queue, 96 connection.received_msg_queue, 97 heap_profiler.stop_sampling(), message_id) 98 response = json.loads(response) 99 assert response['id'] == message_id 100 return response