#!/usr/bin/env python # -*- coding: utf-8 -*- # # Copyright 2017 Google Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # """ This module tests the Vehicle HAL using adb socket. Protocol Buffer: This module relies on VehicleHalProto_pb2.py being in sync with the protobuf in the VHAL. If the VehicleHalProto.proto file has changed, re-generate the python version using a command of the form: protoc -I= --python_out= /VehicleHalProto.proto For example: protoDir=~/android/master/hardware/interfaces/automotive/vehicle/2.0/default/impl/vhal_v2_0/proto outDir=~/android/master/packages/services/Car/tools/emulator protoc -I=$protoDir --python_out=$outDir $protoDir/VehicleHalProto.proto """ from __future__ import print_function # Suppress .pyc files import sys sys.dont_write_bytecode = True import VehicleHalProto_pb2 import vhal_consts_2_0 import vhal_emulator import logging class VhalTest: # Global vars _badProps = [0, 0x3FFFFFFF] # List of bad properties to try for negative tests _configs = 0 # List of configs from DUT _log = 0 # Logger module _vhal = 0 # Handle to VHAL object that communicates over socket to DUT # TODO: b/38203109 - Fix OBD2 values, implement handling for complex properties _skipProps = [ vhal_consts_2_0.VEHICLEPROPERTY_OBD2_LIVE_FRAME, vhal_consts_2_0.VEHICLEPROPERTY_OBD2_FREEZE_FRAME, vhal_consts_2_0.VEHICLEPROPERTY_OBD2_FREEZE_FRAME_INFO, vhal_consts_2_0.VEHICLEPROPERTY_OBD2_FREEZE_FRAME_CLEAR, vhal_consts_2_0.VEHICLEPROPERTY_VEHICLE_MAP_SERVICE, vhal_consts_2_0.VEHICLEPROPERTY_WHEEL_TICK, # Need to support complex properties 0x21E00666 # FakeDataControllingProperty - an internal test property ] def _getMidpoint(self, minVal, maxVal): retVal = minVal + (maxVal - minVal)/2 return retVal # Generates a test value based on the config def _generateTestValue(self, cfg, idx, origValue): valType = cfg.value_type if valType in self._types.TYPE_STRING: testValue = "test string" elif valType in self._types.TYPE_BYTES: # Generate array of integers counting from 0 testValue = list(range(len(origValue))) elif valType == vhal_consts_2_0.VEHICLEPROPERTYTYPE_BOOLEAN: testValue = origValue ^ 1 elif valType in self._types.TYPE_INT32: try: testValue = self._getMidpoint(cfg.area_configs[idx].min_int32_value, cfg.area_configs[idx].max_int32_value) except: # min/max values aren't set. Set a hard-coded value testValue = 123 elif valType in self._types.TYPE_INT64: try: testValue = self._getMidpoint(cfg.area_configs[idx].min_int64_value, cfg.area_configs[idx].max_int64_value) except: # min/max values aren't set. Set a large hard-coded value testValue = 1 << 50 elif valType in self._types.TYPE_FLOAT: try: testValue = self._getMidpoint(cfg.area_configs[idx].min_float_value, cfg.area_configs[idx].max_float_value) except: # min/max values aren't set. Set a hard-coded value testValue = 123.456 # Truncate float to 5 decimal places testValue = "%.5f" % testValue testValue = float(testValue) else: self._log.error("generateTestValue: valType=0x%X is not handled", valType) testValue = None return testValue # Helper function to extract values array from rxMsg def _getValueFromMsg(self, rxMsg): # Check to see only one property value is returned if len(rxMsg.value) != 1: self._log.error("getValueFromMsg: Received invalid value") value = None else: valType = rxMsg.value[0].value_type try: if valType in self._types.TYPE_STRING: value = rxMsg.value[0].string_value elif valType in self._types.TYPE_BYTES: value = rxMsg.value[0].bytes_value elif valType == vhal_consts_2_0.VEHICLEPROPERTYTYPE_BOOLEAN: value = rxMsg.value[0].int32_values[0] elif valType in self._types.TYPE_INT32: value = rxMsg.value[0].int32_values[0] elif valType in self._types.TYPE_INT64: value = rxMsg.value[0].int64_values[0] elif valType in self._types.TYPE_FLOAT: value = rxMsg.value[0].float_values[0] # Truncate float to 5 decimal places value = "%.5f" % value value = float(value) else: self._log.error("getValueFromMsg: valType=0x%X is not handled", valType) value = None except IndexError: self._log.error("getValueFromMsg: Received malformed message: %s", str(rxMsg)) value = None return value def _validateVmsMessage(self, rxMsg): return (len(rxMsg.value) == 1 and rxMsg.value[0].value_type in self._types.TYPE_MIXED and len(rxMsg.value[0].int32_values) > 0 and vhal_consts_2_0.VMSMESSAGETYPE_SUBSCRIBE <= rxMsg.value[0].int32_values[0] <= vhal_consts_2_0.VMSMESSAGETYPE_LAST_VMS_MESSAGE_TYPE) def _getVmsMessageTypeFromMsg(self, rxMsg): if self._validateVmsMessage(rxMsg): value = rxMsg.value[0].int32_values[ vhal_consts_2_0.VMSBASEMESSAGEINTEGERVALUESINDEX_MESSAGE_TYPE] else: self._log.error("getVmsMessageTypeFromMsg: Received invalid message") value = None return value # Helper function to receive a message and validate the type and status # retVal = 1 if no errors # retVal = 0 if errors detected def _rxMsgAndValidate(self, expectedType, expectedStatus): retVal = 1 rxMsg = self._vhal.rxMsg() if rxMsg.msg_type != expectedType: self._log.error("rxMsg Type expected: 0x%X, received: 0x%X", expectedType, rxMsg.msg_type) retVal = 0 if rxMsg.status != expectedStatus: self._log.error("rxMsg Status expected: 0x%X, received: 0x%X", expectedStatus, rxMsg.status) retVal = 0 return rxMsg, retVal # Calls getConfig() on each individual property ID and verifies it matches with the config # received in getConfigAll() def testGetConfig(self): self._log.info("Starting testGetConfig...") for cfg in self._configs: self._log.debug(" Getting config for propId=0x%X", cfg.prop) self._vhal.getConfig(cfg.prop) rxMsg, retVal = self._rxMsgAndValidate(VehicleHalProto_pb2.GET_CONFIG_RESP, VehicleHalProto_pb2.RESULT_OK) if retVal: if rxMsg.config[0] != cfg: self._log.error("testGetConfig failed. prop=0x%X, expected:\n%s\nreceived:\n%s", cfg.prop, str(cfg), str(rxMsg.config)) self._log.info(" Finished testGetConfig!") # Calls getConfig() on invalid property ID and verifies it generates an error def testGetBadConfig(self): self._log.info("Starting testGetBadConfig...") for prop in self._badProps: self._log.debug(" Testing bad propId=0x%X", prop) self._vhal.getConfig(prop) rxMsg, retVal = self._rxMsgAndValidate(VehicleHalProto_pb2.GET_CONFIG_RESP, VehicleHalProto_pb2.ERROR_INVALID_PROPERTY) if retVal: for cfg in rxMsg.config: self._log.error("testGetBadConfig prop=0x%X, expected:None, received:\n%s", cfg.prop, str(rxMsg.config)) self._log.info(" Finished testGetBadConfig!") def testGetPropertyAll(self): self._log.info("Starting testGetPropertyAll...") self._vhal.getPropertyAll() rxMsg, retVal = self._rxMsgAndValidate(VehicleHalProto_pb2.GET_PROPERTY_ALL_RESP, VehicleHalProto_pb2.RESULT_OK) if retVal == 0: self._log.error("testGetPropertyAll: Failed to receive proper rxMsg") # TODO: Finish writing this test. What should we be testing, anyway? self._log.info(" Finished testGetPropertyAll!") def testGetSet(self): self._log.info("Starting testGetSet()...") for cfg in self._configs: if cfg.prop in self._skipProps: # Skip properties that cannot be handled properly by this test. self._log.warning(" Skipping propId=0x%X", cfg.prop) continue areas = cfg.supported_areas idx = -1 while (idx == -1) | (areas != 0): idx += 1 # Get the area to test area = areas & (areas -1) area ^= areas # Remove the area from areas areas ^= area self._log.debug(" Testing propId=0x%X, area=0x%X", cfg.prop, area) # Get the current value self._vhal.getProperty(cfg.prop, area) rxMsg, retVal = self._rxMsgAndValidate(VehicleHalProto_pb2.GET_PROPERTY_RESP, VehicleHalProto_pb2.RESULT_OK) # Save the original value origValue = self._getValueFromMsg(rxMsg) if origValue == None: self._log.error("testGetSet: Could not get value for prop=0x%X, area=0x%X", cfg.prop, area) continue # Generate the test value testValue = self._generateTestValue(cfg, idx, origValue) if testValue == None: self._log.error("testGetSet: Cannot generate test value for prop=0x%X, area=0x%X", cfg.prop, area) continue # Send the new value self._vhal.setProperty(cfg.prop, area, testValue) rxMsg, retVal = self._rxMsgAndValidate(VehicleHalProto_pb2.SET_PROPERTY_RESP, VehicleHalProto_pb2.RESULT_OK) # Get the new value and verify it self._vhal.getProperty(cfg.prop, area) rxMsg, retVal = self._rxMsgAndValidate(VehicleHalProto_pb2.GET_PROPERTY_RESP, VehicleHalProto_pb2.RESULT_OK) newValue = self._getValueFromMsg(rxMsg) if newValue != testValue: self._log.error("testGetSet: set failed for propId=0x%X, area=0x%X", cfg.prop, area) print("testValue= ", testValue, "newValue= ", newValue) continue # Reset the value to what it was before self._vhal.setProperty(cfg.prop, area, origValue) rxMsg, retVal = self._rxMsgAndValidate(VehicleHalProto_pb2.SET_PROPERTY_RESP, VehicleHalProto_pb2.RESULT_OK) self._log.info(" Finished testGetSet()!") def testGetBadProperty(self): self._log.info("Starting testGetBadProperty()...") for prop in self._badProps: self._log.debug(" Testing bad propId=0x%X", prop) self._vhal.getProperty(prop, 0) rxMsg, retVal = self._rxMsgAndValidate(VehicleHalProto_pb2.GET_PROPERTY_RESP, VehicleHalProto_pb2.ERROR_INVALID_PROPERTY) if retVal: for value in rxMsg.value: self._log.error("testGetBadProperty prop=0x%X, expected:None, received:\n%s", prop, str(rxMsg)) self._log.info(" Finished testGetBadProperty()!") def testSetBadProperty(self): self._log.info("Starting testSetBadProperty()...") area = 1 value = 100 for prop in self._badProps: self._log.debug(" Testing bad propId=0x%X", prop) area = area + 1 value = value + 1 try: self._vhal.setProperty(prop, area, value) self._log.error("testGetBadProperty failed. prop=0x%X, area=0x%X, value=%d", prop, area, value) except ValueError as e: # Received expected error pass self._log.info(" Finished testSetBadProperty()!") def testGetVmsAvailability(self): self._log.info("Starting testVms()...") # Request the availability from the VmsCore. value = {'int32_values' : [vhal_consts_2_0.VMSMESSAGETYPE_AVAILABILITY_REQUEST] } self._vhal.setProperty( vhal_consts_2_0.VEHICLEPROPERTY_VEHICLE_MAP_SERVICE, 0, value) rxMsg, retVal = self._rxMsgAndValidate(VehicleHalProto_pb2.SET_PROPERTY_RESP, VehicleHalProto_pb2.RESULT_OK) # The Vms Core should immediately respond rxMsg, retVal = self._rxMsgAndValidate(VehicleHalProto_pb2.SET_PROPERTY_ASYNC, VehicleHalProto_pb2.RESULT_OK) if self._getVmsMessageTypeFromMsg(rxMsg) != vhal_consts_2_0.VMSMESSAGETYPE_AVAILABILITY_RESPONSE: self._log.error("testVms: VmsCore did not respond with AvailabilityResponse: %s", str(rxMsg)) # Test that we can get the property on command self._vhal.getProperty( vhal_consts_2_0.VEHICLEPROPERTY_VEHICLE_MAP_SERVICE, 0) rxMsg, retVal = self._rxMsgAndValidate(VehicleHalProto_pb2.GET_PROPERTY_RESP, VehicleHalProto_pb2.RESULT_OK) if self._getVmsMessageTypeFromMsg(rxMsg) != vhal_consts_2_0.VMSMESSAGETYPE_AVAILABILITY_RESPONSE: self._log.error("testVms: VmsCore did not respond with AvailabilityResponse: %s", str(rxMsg)) else: # Parse Availability Response layers = rxMsg.value[0].int32_values[ vhal_consts_2_0.VMSAVAILABILITYSTATEINTEGERVALUESINDEX_NUMBER_OF_ASSOCIATED_LAYERS] index = vhal_consts_2_0.VMSAVAILABILITYSTATEINTEGERVALUESINDEX_LAYERS_START numPublishersIndex = vhal_consts_2_0.VMSMESSAGEWITHLAYERINTEGERVALUESINDEX_LAYER_VERSION self._log.info("testVms: %d available layers", layers) for layer in xrange(layers): self._log.info("testVms: Available layer: %s", rxMsg.value[0].int32_values[index:index+numPublishersIndex]) index += numPublishersIndex + 1 + rxMsg.value[0].int32_values[index+numPublishersIndex] if len(rxMsg.value[0].int32_values) != index: self._log.error("testVms: Malformed AvailabilityResponse: index: %d %s", index, str(rxMsg)) def runTests(self): self.testGetConfig() self.testGetBadConfig() self.testGetPropertyAll() self.testGetSet() self.testGetBadProperty() self.testSetBadProperty() self.testGetVmsAvailability() # Add new tests here to be run # Valid logLevels: # CRITICAL 50 # ERRROR 40 # WARNING 30 # INFO 20 # DEBUG 10 # NOTSET 0 def __init__(self, types, logLevel=20): self._types = types # Configure the logger logging.basicConfig() self._log = logging.getLogger('vhal_emulator_test') self._log.setLevel(logLevel) # Start the VHAL Emulator self._vhal = vhal_emulator.Vhal(types) # Get the list of configs self._vhal.getConfigAll() self._configs = self._vhal.rxMsg().config if __name__ == '__main__': v = VhalTest(vhal_consts_2_0.vhal_types_2_0) v.runTests()