#!/usr/bin/python # # Copyright (C) 2010 The Android Open Source Project # # Licensed under the Apache License, Version 2.0 (the "License") # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # """Test server A detailed description of tms. """ __author__ = 'wink@google.com (Wink Saville)' import socket import struct import sys import time import ctrl_pb2 import ril_pb2 import msgheader_pb2 def recvall(s, count): """Receive all of the data otherwise return none. Args: s: socket count: number of bytes Returns: data received None if no data is received """ all_data = [] while (count > 0): data = s.recv(count) if (len(data) == 0): return None count -= len(data) all_data.append(data) result = ''.join(all_data); return result def sendall(s, data): """Send all of the data Args: s: socket count: number of bytes Returns: Nothing """ s.sendall(data) class MsgHeader: """A fixed length message header; cmd, token, status and length of protobuf.""" def __init__(self): self.cmd = 0 self.token = 0 self.status = 0 self.length_protobuf = 0 def sendHeader(self, s): """Send the header to the socket Args: s: socket Returns nothing """ mh = msgheader_pb2.MsgHeader() mh.cmd = self.cmd mh.token = self.token mh.status = self.status mh.length_data = self.length_protobuf mhser = mh.SerializeToString() len_msg_header_raw = struct.pack(' 0): self.header.length_protobuf = len(protobuf) else: self.header.length_protobuf = 0 self.protobuf = '' self.header.sendHeader(s) if (self.header.length_protobuf > 0): sendall(s, self.protobuf) def recvMsg(self, s): """Receive a message from a socket Args: s: socket Returns: nothing """ self.header.recvHeader(s) self.cmd = self.header.cmd self.token = self.header.token self.status = self.header.status if (self.header.length_protobuf > 0): self.protobuf = recvall(s, self.header.length_protobuf) else: self.protobuf = '' def main(argv): """Create a socket and connect. Before using you'll need to forward the port used by mock_ril, 54312 to a port on the PC. The following worked for me: adb forward tcp:11111 tcp:54312. Then you can execute this test using: tms.py 127.0.0.1 11111 """ s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) host = sys.argv[1] # server address print "host=%s" % host port = int(sys.argv[2]) # server port print "port=%d" % port s.connect((host, port)) # Create an object which is serializable to a protobuf rrs = ctrl_pb2.CtrlRspRadioState() rrs.state = ril_pb2.RADIOSTATE_UNAVAILABLE print "rrs.state=%d" % (rrs.state) # Serialize rrs_ser = rrs.SerializeToString() print "len(rrs_ser)=%d" % (len(rrs_ser)) # Deserialize rrs_new = ctrl_pb2.CtrlRspRadioState() rrs_new.ParseFromString(rrs_ser) print "rrs_new.state=%d" % (rrs_new.state) # Do an echo test req = Msg() req.sendMsg(s, 0, 1234567890123, rrs_ser) resp = Msg() resp.recvMsg(s) response = ctrl_pb2.CtrlRspRadioState() response.ParseFromString(resp.protobuf) print "cmd=%d" % (resp.cmd) print "token=%d" % (resp.token) print "status=%d" % (resp.status) print "len(protobuf)=%d" % (len(resp.protobuf)) print "response.state=%d" % (response.state) if ((resp.cmd == 0) & (resp.token == 1234567890123) & (resp.status == 0) & (response.state == 1)): print "SUCCESS: echo ok" else: print "ERROR: expecting cmd=0 token=1234567890123 status=0 state=1" # Test CTRL_GET_RADIO_STATE req.sendMsg(s, ctrl_pb2.CTRL_CMD_GET_RADIO_STATE, 4) resp = Msg() resp.recvMsg(s) print "cmd=%d" % (resp.cmd) print "token=%d" % (resp.token) print "status=%d" % (resp.status) print "length_protobuf=%d" % (len(resp.protobuf)) if (resp.cmd == ctrl_pb2.CTRL_CMD_GET_RADIO_STATE): response = ctrl_pb2.CtrlRspRadioState() response.ParseFromString(resp.protobuf) print "SUCCESS: response.state=%d" % (response.state) else: print "ERROR: expecting resp.cmd == ctrl_pb2.CTRL_CMD_GET_RADIO_STATE" # Close socket print "closing socket" s.close() if __name__ == '__main__': main(sys.argv)