#!/usr/bin/python # # Copyright (C) 2010 The Android Open Source Project # # Licensed under the Apache License, Version 2.0 (the "License") # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # """Test server A detailed description of tms. """ __author__ = 'wink@google.com (Wink Saville)' import socket import struct import sys import time import ctrl_pb2 import ril_pb2 import msgheader_pb2 def recvall(s, count): """Receive all of the data otherwise return none. Args: s: socket count: number of bytes Returns: data received None if no data is received """ all_data = [] while (count > 0): data = s.recv(count) if (len(data) == 0): return None count -= len(data) all_data.append(data) result = ''.join(all_data); return result def sendall(s, data): """Send all of the data Args: s: socket count: number of bytes Returns: Nothing """ s.sendall(data) class MsgHeader: """A fixed length message header; cmd, token, status and length of protobuf.""" def __init__(self): self.cmd = 0 self.token = 0 self.status = 0 self.length_protobuf = 0 def sendHeader(self, s): """Send the header to the socket Args: s: socket Returns nothing """ mh = msgheader_pb2.MsgHeader() mh.cmd = self.cmd mh.token = self.token mh.status = self.status mh.length_data = self.length_protobuf mhser = mh.SerializeToString() len_msg_header_raw = struct.pack('<i', len(mhser)) sendall(s, len_msg_header_raw) sendall(s, mhser) def recvHeader(self, s): """Receive the header from the socket""" len_msg_header_raw = recvall(s, 4) len_msg_hdr = struct.unpack('<i', len_msg_header_raw) mh = msgheader_pb2.MsgHeader() mh_raw = recvall(s, len_msg_hdr[0]) mh.ParseFromString(mh_raw) self.cmd = mh.cmd self.token = mh.token self.status = mh.status self.length_protobuf = mh.length_data; class Msg: """A message consists of a fixed length MsgHeader followed by a protobuf. This class sends and receives messages, when sending the status will be zero and when receiving the protobuf field will be an empty string if there was no protobuf. """ def __init__(self): """Initialize the protobuf to None and header to an empty""" self.protobuf = None self.header = MsgHeader() # Keep a local copy of header for convenience self.cmd = 0 self.token = 0 self.status = 0 def sendMsg(self, s, cmd, token, protobuf=''): """Send a message to a socket Args: s: socket cmd: command to send token: token to send, will be returned unmodified protobuf: optional protobuf to send Returns nothing """ self.cmd = cmd self.token = token self.status = 0 self.protobuf = protobuf self.header.cmd = self.cmd self.header.token = self.token self.header.status = self.status if (len(protobuf) > 0): self.header.length_protobuf = len(protobuf) else: self.header.length_protobuf = 0 self.protobuf = '' self.header.sendHeader(s) if (self.header.length_protobuf > 0): sendall(s, self.protobuf) def recvMsg(self, s): """Receive a message from a socket Args: s: socket Returns: nothing """ self.header.recvHeader(s) self.cmd = self.header.cmd self.token = self.header.token self.status = self.header.status if (self.header.length_protobuf > 0): self.protobuf = recvall(s, self.header.length_protobuf) else: self.protobuf = '' def main(argv): """Create a socket and connect. Before using you'll need to forward the port used by mock_ril, 54312 to a port on the PC. The following worked for me: adb forward tcp:11111 tcp:54312. Then you can execute this test using: tms.py 127.0.0.1 11111 """ s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) host = sys.argv[1] # server address print "host=%s" % host port = int(sys.argv[2]) # server port print "port=%d" % port s.connect((host, port)) # Create an object which is serializable to a protobuf rrs = ctrl_pb2.CtrlRspRadioState() rrs.state = ril_pb2.RADIOSTATE_UNAVAILABLE print "rrs.state=%d" % (rrs.state) # Serialize rrs_ser = rrs.SerializeToString() print "len(rrs_ser)=%d" % (len(rrs_ser)) # Deserialize rrs_new = ctrl_pb2.CtrlRspRadioState() rrs_new.ParseFromString(rrs_ser) print "rrs_new.state=%d" % (rrs_new.state) # Do an echo test req = Msg() req.sendMsg(s, 0, 1234567890123, rrs_ser) resp = Msg() resp.recvMsg(s) response = ctrl_pb2.CtrlRspRadioState() response.ParseFromString(resp.protobuf) print "cmd=%d" % (resp.cmd) print "token=%d" % (resp.token) print "status=%d" % (resp.status) print "len(protobuf)=%d" % (len(resp.protobuf)) print "response.state=%d" % (response.state) if ((resp.cmd == 0) & (resp.token == 1234567890123) & (resp.status == 0) & (response.state == 1)): print "SUCCESS: echo ok" else: print "ERROR: expecting cmd=0 token=1234567890123 status=0 state=1" # Test CTRL_GET_RADIO_STATE req.sendMsg(s, ctrl_pb2.CTRL_CMD_GET_RADIO_STATE, 4) resp = Msg() resp.recvMsg(s) print "cmd=%d" % (resp.cmd) print "token=%d" % (resp.token) print "status=%d" % (resp.status) print "length_protobuf=%d" % (len(resp.protobuf)) if (resp.cmd == ctrl_pb2.CTRL_CMD_GET_RADIO_STATE): response = ctrl_pb2.CtrlRspRadioState() response.ParseFromString(resp.protobuf) print "SUCCESS: response.state=%d" % (response.state) else: print "ERROR: expecting resp.cmd == ctrl_pb2.CTRL_CMD_GET_RADIO_STATE" # Close socket print "closing socket" s.close() if __name__ == '__main__': main(sys.argv)