# # Copyright (C) 2016 The Android Open Source Project # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # import copy import logging import random import sys from vts.utils.python.fuzzer import FuzzerUtils from vts.utils.python.mirror import mirror_object_for_types from vts.proto import ComponentSpecificationMessage_pb2 as CompSpecMsg from google.protobuf import text_format # a dict containing the IDs of the registered function pointers. _function_pointer_id_dict = {} INTERFACE = "interface" API = "api" class MirrorObjectError(Exception): """Raised when there is a general error in manipulating a mirror object.""" pass class MirrorObject(object): """The class that mirrors objects on the native side. This class exists on the host and can be used to communicate to a particular HAL in the HAL agent on the target side. Attributes: _client: the TCP client instance. _if_spec_msg: the interface specification message of a host object to mirror. _callback_server: the instance of a callback server. _interface_id: integer, used when this mirror object is for a nested HIDL HAL interface. _parent_path: the name of a sub struct this object mirrors. _last_raw_code_coverage_data: NativeCodeCoverageRawDataMessage, last seen raw code coverage data. __caller_uid: string, the caller's UID if not None. """ def __init__(self, client, msg, callback_server, hal_driver_id=None, parent_path=None): self._client = client self._if_spec_msg = msg self._callback_server = callback_server self._hal_driver_id = hal_driver_id self._parent_path = parent_path self._last_raw_code_coverage_data = None self.__caller_uid = None def GetFunctionPointerID(self, function_pointer): """Returns the function pointer ID for the given one.""" max_num = 0 for key in _function_pointer_id_dict: if _function_pointer_id_dict[key] == function_pointer: return str(key) if not max_num or key > max_num: max_num = key _function_pointer_id_dict[max_num + 1] = function_pointer id = str(max_num + 1) if self._callback_server: self._callback_server.RegisterCallback(id, function_pointer) return id def OpenConventionalHal(self, module_name=None): """Opens the target conventional HAL component. This is only needed for conventional HAL. Args: module_name: string, the name of a module to load. """ call_msg = CompSpecMsg.FunctionCallMessage() if self._hal_driver_id is not None: call_msg.hal_driver_id = self._hal_driver_id call_msg.component_class = CompSpecMsg.HAL_CONVENTIONAL call_msg.api.name = "#Open" if module_name: arg = call_msg.api.arg.add() arg.type = CompSpecMsg.TYPE_STRING arg.string_value.message = module_name call_msg.api.return_type.type == CompSpecMsg.TYPE_SCALAR call_msg.api.return_type.scalar_type = "int32_t" logging.debug("final msg %s", call_msg) result = self._client.CallApi( text_format.MessageToString(call_msg), self.__caller_uid) logging.debug(result) return result def SetCallerUid(self, uid): """Sets target-side caller's UID. Args: uid: string, the caller's UID. """ self.__caller_uid = uid def GetAttributeValue(self, attribute_name): """Retrieves the value of an attribute from a target. Args: attribute_name: string, the name of an attribute. Returns: FunctionSpecificationMessage which contains the value. """ def RemoteCallToGetAttribute(*args, **kwargs): """Makes a remote call and retrieves an attribute.""" func_msg = self.GetAttribute(attribute_name) if not func_msg: raise MirrorObjectError("attribute %s unknown", func_msg) logging.debug("remote call %s.%s", self._parent_path, attribute_name) logging.info("remote call %s%s", attribute_name, args) if self._parent_path: func_msg.parent_path = self._parent_path try: if isinstance(self._if_spec_msg, CompSpecMsg.ComponentSpecificationMessage): logging.info("component_class %s", self._if_spec_msg.component_class) if (self._if_spec_msg.component_class == CompSpecMsg.HAL_CONVENTIONAL_SUBMODULE): submodule_name = self._if_spec_msg.original_data_structure_name if submodule_name.endswith("*"): submodule_name = submodule_name[:-1] func_msg.submodule_name = submodule_name elif isinstance(self._if_spec_msg, CompSpecMsg.StructSpecificationMessage): pass else: logging.error("unknown type %s", type(self._if_spec_msg)) sys.exit(1) except AttributeError as e: logging.exception("%s" % e) pass result = self._client.GetAttribute( text_format.MessageToString(func_msg)) logging.debug(result) return result var_msg = self.GetAttribute(attribute_name) if var_msg: logging.debug("attribute: %s", var_msg) return RemoteCallToGetAttribute() raise MirrorObjectError("unknown attribute name %s" % attribute_name) def GetHidlCallbackInterface(self, interface_name, **kwargs): var_msg = CompSpecMsg.VariableSpecificationMessage() var_msg.name = interface_name var_msg.type = CompSpecMsg.TYPE_FUNCTION_POINTER var_msg.is_callback = True msg = self._if_spec_msg specification = self._client.ReadSpecification( interface_name, msg.component_class, msg.component_type, msg.component_type_version, msg.package) logging.info("specification: %s", specification) interface = getattr(specification, INTERFACE, None) apis = getattr(interface, API, []) for api in apis: function_pointer = None if api.name in kwargs: function_pointer = kwargs[api.name] else: def dummy(*args): """Dummy implementation for any callback function.""" logging.info("Entering dummy implementation" " for callback function: %s", api.name) for arg_index in range(len(args)): logging.info("arg%s: %s", arg_index, args[arg_index]) function_pointer = dummy func_pt_msg = var_msg.function_pointer.add() func_pt_msg.function_name = api.name func_pt_msg.id = self.GetFunctionPointerID(function_pointer) return var_msg def GetHidlTypeInterface(self, interface_name, target_class=None, target_type=None, version=None, package=None): """Gets HIDL type interface's host-side mirror. Args: interface_name: string, the name of a target interface to read. target_class: integer, optional used to override the loaded HAL's component_class. target_type: integer, optional used to override the loaded HAL's component_type. version: integer, optional used to override the loaded HAL's component_type_version. package: integer, optional used to override the loaded HAL's package. Returns: a host-side mirror of a HIDL interface """ msg = self._if_spec_msg result = self._client.ReadSpecification( interface_name, msg.component_class if target_class is None else target_class, msg.component_type if target_type is None else target_type, msg.component_type_version if version is None else version, msg.package if package is None else package, recursive=True) logging.info("result %s", result) return mirror_object_for_types.MirrorObjectForTypes(result) def GetHidlNestedInterface(self, interface_name, hal_driver_id, target_class=None, target_type=None, version=None, package=None): """Gets HIDL type interface's host-side mirror. Args: interface_name: string, the name of a target interface to read. interface_id: integer, the ID of a target interface to control. target_class: integer, optional used to override the loaded HAL's component_class. target_type: integer, optional used to override the loaded HAL's component_type. version: integer, optional used to override the loaded HAL's component_type_version. package: integer, optional used to override the loaded HAL's package. Returns: a host-side mirror of a HIDL interface """ msg = self._if_spec_msg found_api_spec = self._client.ReadSpecification( interface_name, msg.component_class if target_class is None else target_class, msg.component_type if target_type is None else target_type, msg.component_type_version if version is None else version, msg.package if package is None else package, recursive=True) found_api_spec = str(found_api_spec) logging.debug("found_api_spec %s", found_api_spec) if_spec_msg = CompSpecMsg.ComponentSpecificationMessage() text_format.Merge(found_api_spec, if_spec_msg) # Instantiate a MirrorObject and return it. hal_mirror = MirrorObject( self._client, if_spec_msg, None, hal_driver_id=hal_driver_id) return hal_mirror def CleanUp(self): self._client.Disconnect() def GetApi(self, api_name): """Returns the Function Specification Message. Args: api_name: string, the name of the target function API. Returns: FunctionSpecificationMessage or StructSpecificationMessage if found, None otherwise """ logging.debug("GetAPI %s for %s", api_name, self._if_spec_msg) # handle reserved methods first. if api_name == "notifySyspropsChanged": func_msg = CompSpecMsg.FunctionSpecificationMessage() func_msg.name = api_name return func_msg if isinstance(self._if_spec_msg, CompSpecMsg.ComponentSpecificationMessage): if len(self._if_spec_msg.interface.api) > 0: for api in self._if_spec_msg.interface.api: if api.name == api_name: return copy.copy(api) elif isinstance(self._if_spec_msg, CompSpecMsg.StructSpecificationMessage): if len(self._if_spec_msg.api) > 0: for api in self._if_spec_msg.api: logging.info("api %s", api) if api.name == api_name: return copy.copy(api) if len(self._if_spec_msg.sub_struct) > 0: for sub_struct in self._if_spec_msg.sub_struct: if len(sub_struct.api) > 0: for api in sub_struct.api: logging.info("api %s", api) if api.name == api_name: return copy.copy(api) else: logging.error("unknown spec type %s", type(self._if_spec_msg)) sys.exit(1) return None def GetAttribute(self, attribute_name): """Returns the Message. """ logging.debug("GetAttribute %s for %s", attribute_name, self._if_spec_msg) if self._if_spec_msg.attribute: for attribute in self._if_spec_msg.attribute: if attribute.name == attribute_name: func_msg = CompSpecMsg.FunctionSpecificationMessage() func_msg.name = attribute.name func_msg.return_type.type = attribute.type if func_msg.return_type.type == CompSpecMsg.TYPE_SCALAR: func_msg.return_type.scalar_type = attribute.scalar_type else: func_msg.return_type.predefined_type = attribute.predefined_type logging.info("GetAttribute attribute: %s", attribute) logging.info("GetAttribute request: %s", func_msg) return copy.copy(func_msg) if (self._if_spec_msg.interface and self._if_spec_msg.interface.attribute): for attribute in self._if_spec_msg.interface.attribute: if attribute.name == attribute_name: func_msg = CompSpecMsg.FunctionSpecificationMessage() func_msg.name = attribute.name func_msg.return_type.type = attribute.type if func_msg.return_type.type == CompSpecMsg.TYPE_SCALAR: func_msg.return_type.scalar_type = attribute.scalar_type else: func_msg.return_type.predefined_type = attribute.predefined_type logging.info("GetAttribute attribute: %s", attribute) logging.info("GetAttribute request: %s", func_msg) return copy.copy(func_msg) return None def GetSubStruct(self, sub_struct_name): """Returns the Struct Specification Message. Args: sub_struct_name: string, the name of the target sub struct attribute. Returns: StructSpecificationMessage if found, None otherwise """ if isinstance(self._if_spec_msg, CompSpecMsg.ComponentSpecificationMessage): if (self._if_spec_msg.interface and self._if_spec_msg.interface.sub_struct): for sub_struct in self._if_spec_msg.interface.sub_struct: if sub_struct.name == sub_struct_name: return copy.copy(sub_struct) elif isinstance(self._if_spec_msg, CompSpecMsg.StructSpecificationMessage): if len(self._if_spec_msg.sub_struct) > 0: for sub_struct in self._if_spec_msg.sub_struct: if sub_struct.name == sub_struct_name: return copy.copy(sub_struct) return None def GetCustomAggregateType(self, type_name): """Returns the Argument Specification Message. Args: type_name: string, the name of the target data type. Returns: VariableSpecificationMessage if found, None otherwise """ try: if self._if_spec_msg.attribute: for attribute in self._if_spec_msg.attribute: if not attribute.is_const and attribute.name == type_name: return copy.copy(attribute) if (self._if_spec_msg.interface and self._if_spec_msg.interface.attribute): for attribute in self._if_spec_msg.interface.attribute: if not attribute.is_const and attribute.name == type_name: return copy.copy(attribute) return None except AttributeError as e: # TODO: check in advance whether self._if_spec_msg Interface # SpecificationMessage. return None def GetConstType(self, type_name): """Returns the Argument Specification Message. Args: type_name: string, the name of the target const data variable. Returns: VariableSpecificationMessage if found, None otherwise """ try: if self._if_spec_msg.attribute: for attribute in self._if_spec_msg.attribute: if attribute.is_const and attribute.name == type_name: return copy.copy(attribute) elif attribute.type == CompSpecMsg.TYPE_ENUM: for enumerator in attribute.enum_value.enumerator: if enumerator == type_name: return copy.copy(attribute) if self._if_spec_msg.interface and self._if_spec_msg.interface.attribute: for attribute in self._if_spec_msg.interface.attribute: if attribute.is_const and attribute.name == type_name: return copy.copy(attribute) elif attribute.type == CompSpecMsg.TYPE_ENUM: for enumerator in attribute.enum_value.enumerator: if enumerator == type_name: return copy.copy(attribute) return None except AttributeError as e: # TODO: check in advance whether self._if_spec_msg Interface # SpecificationMessage. return None def ArgToPb(self, arg_msg, value_msg): """Converts args to a ProtoBuf message. Args: arg_msg: the ProtoBuf message where the result will be stored in. value_msg: value given as an argument. it can be either Python data structure, a ProtoBuf message, or both. Raises: AttributeError: when unexpected type is seen. """ if isinstance(value_msg, CompSpecMsg.VariableSpecificationMessage): arg_msg.CopyFrom(value_msg) elif isinstance(value_msg, int): arg_msg.type = CompSpecMsg.TYPE_SCALAR if not arg_msg.scalar_type: arg_msg.scalar_type = "int32_t" setattr(arg_msg.scalar_value, arg_msg.scalar_type, value_msg) elif isinstance(value_msg, float): arg_msg.type = CompSpecMsg.TYPE_SCALAR arg_msg.scalar_value.float_t = value_msg arg_msg.scalar_type = "float_t" elif isinstance(value_msg, str): if ((arg_msg.type == CompSpecMsg.TYPE_SCALAR and (arg_msg.scalar_type == "char_pointer" or arg_msg.scalar_type == "uchar_pointer")) or arg_msg.type == CompSpecMsg.TYPE_STRING): arg_msg.string_value.message = value_msg arg_msg.string_value.length = len(value_msg) else: raise MirrorObjectError("unsupported type %s for str" % arg_msg) elif isinstance(value_msg, list): if (arg_msg.type == CompSpecMsg.TYPE_VECTOR or arg_msg.type == CompSpecMsg.TYPE_ARRAY): first = True for list_element in value_msg: if first: self.ArgToPb(arg_msg.vector_value[0], list_element) first = False else: self.ArgToPb(arg_msg.vector_value.add(), list_element) arg_msg.vector_size = len(value_msg) else: raise MirrorObjectError( "unsupported arg_msg type %s for list" % arg_msg.type) else: raise MirrorObjectError("unsupported value type %s" % type(value_msg)) # TODO: Guard against calls to this function after self.CleanUp is called. def __getattr__(self, api_name, *args, **kwargs): """Calls a target component's API. Args: api_name: string, the name of an API function to call. *args: a list of arguments **kwargs: a dict for the arg name and value pairs """ def RemoteCall(*args, **kwargs): """Dynamically calls a remote API and returns the result value.""" func_msg = self.GetApi(api_name) if not func_msg: raise MirrorObjectError("api %s unknown", func_msg) logging.debug("remote call %s.%s", self._parent_path, api_name) logging.info("remote call %s%s", api_name, args) if args: for arg_msg, value_msg in zip(func_msg.arg, args): logging.debug("arg msg %s", arg_msg) logging.debug("value %s", value_msg) if value_msg is not None: self.ArgToPb(arg_msg, value_msg) logging.debug("final msg %s", func_msg) else: # TODO: use kwargs for arg in func_msg.arg: # TODO: handle other if (arg.type == CompSpecMsg.TYPE_SCALAR and arg.scalar_type == "pointer"): arg.scalar_value.pointer = 0 logging.debug(func_msg) if self._parent_path: func_msg.parent_path = self._parent_path call_msg = CompSpecMsg.FunctionCallMessage() if isinstance(self._if_spec_msg, CompSpecMsg.ComponentSpecificationMessage): if self._if_spec_msg.component_class: logging.info("component_class %s", self._if_spec_msg.component_class) call_msg.component_class = self._if_spec_msg.component_class if self._if_spec_msg.component_class == CompSpecMsg.HAL_CONVENTIONAL_SUBMODULE: submodule_name = self._if_spec_msg.original_data_structure_name if submodule_name.endswith("*"): submodule_name = submodule_name[:-1] func_msg.submodule_name = submodule_name if self._hal_driver_id is not None: call_msg.hal_driver_id = self._hal_driver_id call_msg.api.CopyFrom(func_msg) result = self._client.CallApi( text_format.MessageToString(call_msg), self.__caller_uid) logging.debug(result) if (isinstance(result, tuple) and len(result) == 2 and isinstance(result[1], dict) and "coverage" in result[1]): self._last_raw_code_coverage_data = result[1]["coverage"] result = result[0] if (result and isinstance( result, CompSpecMsg.VariableSpecificationMessage) and result.type == CompSpecMsg.TYPE_HIDL_INTERFACE): if result.hidl_interface_id <= -1: return None hal_driver_id = result.hidl_interface_id nested_interface_name = result.predefined_type.split("::")[-1] logging.debug("Nested interface name: %s", nested_interface_name) nested_interface = self.GetHidlNestedInterface( nested_interface_name, hal_driver_id) return nested_interface return result def MessageGenerator(*args, **kwargs): """Dynamically generates a custom message instance.""" arg_msg = self.GetCustomAggregateType(api_name) if not arg_msg: raise MirrorObjectError("arg %s unknown" % arg_msg) logging.info("MessageGenerator %s %s", api_name, arg_msg) logging.debug("MESSAGE %s", api_name) if arg_msg.type == CompSpecMsg.TYPE_STRUCT: for struct_value in arg_msg.struct_value: logging.debug("matching struct %s %s", struct_value.name, struct_value.scalar_type) for given_name, given_value in kwargs.iteritems(): if given_name == struct_value.name: logging.debug("matched type=%s", struct_value.scalar_type) if struct_value.type == CompSpecMsg.TYPE_SCALAR: if struct_value.scalar_type == "uint32_t": struct_value.scalar_value.uint32_t = given_value elif struct_value.scalar_type == "int32_t": struct_value.scalar_value.int32_t = given_value else: raise MirrorObjectError( "support %s" % struct_value.scalar_type) break for struct_value, given_value in zip(arg_msg.struct_value, args): if struct_value.type == CompSpecMsg.TYPE_SCALAR: logging.debug("matched arg type=%s", struct_value.scalar_type) if struct_value.scalar_type == "uint32_t": struct_value.scalar_value.uint32_t = given_value elif struct_value.scalar_type == "int32_t": struct_value.scalar_value.int32_t = given_value else: raise MirrorObjectError("support %s" % p_type) elif arg_msg.type == CompSpecMsg.TYPE_FUNCTION_POINTER: for fp_value in arg_msg.function_pointer: logging.debug("matching function %s", fp_value.function_name) for given_name, given_value in kwargs.iteritems(): if given_name == fp_value.function_name: logging.debug("matched function %s %s", fp_value.function_name, given_name) fp_value.id = self.GetFunctionPointerID( given_value) break for fp_value, given_value in zip(arg_msg.function_pointer, args): logging.debug("for %s", fp_value.function_name) fp_value.id = self.GetFunctionPointerID(given_value) logging.debug("fp %s", fp_value) logging.debug("generated %s", arg_msg) return arg_msg def MessageFuzzer(arg_msg): """Fuzz a custom message instance.""" if not self.GetCustomAggregateType(api_name): raise MirrorObjectError("fuzz arg %s unknown" % arg_msg) if arg_msg.type == CompSpecMsg.TYPE_STRUCT: index = random.randint(0, len(arg_msg.struct_value)) count = 0 for struct_value in arg_msg.struct_value: if count == index: if struct_value.scalar_type == "uint32_t": struct_value.scalar_value.uint32_t ^= FuzzerUtils.mask_uint32_t( ) elif struct_value.scalar_type == "int32_t": mask = FuzzerUtils.mask_int32_t() if mask == (1 << 31): struct_value.scalar_value.int32_t *= -1 struct_value.scalar_value.int32_t += 1 else: struct_value.scalar_value.int32_t ^= mask else: raise MirrorObjectError("support %s" % struct_value.scalar_type) break count += 1 logging.debug("fuzzed %s", arg_msg) else: raise MirrorObjectError("unsupported fuzz message type %s." % arg_msg.type) return arg_msg def ConstGenerator(): """Dynamically generates a const variable's value.""" arg_msg = self.GetConstType(api_name) if not arg_msg: raise MirrorObjectError("const %s unknown" % arg_msg) logging.debug("check %s", api_name) if arg_msg.type == CompSpecMsg.TYPE_SCALAR: ret_v = getattr(arg_msg.scalar_value, arg_msg.scalar_type, None) if ret_v is None: raise MirrorObjectError("No value found for type %s in %s." % (arg_msg.scalar_type, api_name)) return ret_v elif arg_msg.type == CompSpecMsg.TYPE_STRING: return arg_msg.string_value.message elif arg_msg.type == CompSpecMsg.TYPE_ENUM: for enumerator, scalar_value in zip( arg_msg.enum_value.enumerator, arg_msg.enum_value.scalar_value): if enumerator == api_name: return getattr(scalar_value, arg_msg.enum_value.scalar_type) raise MirrorObjectError("const %s not found" % api_name) # handle APIs. func_msg = self.GetApi(api_name) if func_msg: logging.debug("api %s", func_msg) return RemoteCall struct_msg = self.GetSubStruct(api_name) if struct_msg: logging.debug("sub_struct %s", struct_msg) if self._parent_path: parent_name = "%s.%s" % (self._parent_path, api_name) else: parent_name = api_name return MirrorObject( self._client, struct_msg, self._callback_server, parent_path=parent_name) # handle attributes. fuzz = False if api_name.endswith("_fuzz"): fuzz = True api_name = api_name[:-5] arg_msg = self.GetCustomAggregateType(api_name) if arg_msg: logging.debug("arg %s", arg_msg) if not fuzz: return MessageGenerator else: return MessageFuzzer arg_msg = self.GetConstType(api_name) if arg_msg: logging.debug("const %s *\n%s", api_name, arg_msg) return ConstGenerator() raise MirrorObjectError("unknown api name %s" % api_name) def GetRawCodeCoverage(self): """Returns any measured raw code coverage data.""" return self._last_raw_code_coverage_data def __str__(self): """Prints all the attributes and methods.""" result = "" if self._if_spec_msg: if self._if_spec_msg.attribute: for attribute in self._if_spec_msg.attribute: result += "attribute %s\n" % attribute.name if self._if_spec_msg.api: for api in self._if_spec_msg.api: result += "api %s\n" % api.name return result