1#!/usr/bin/python 2# 3# Copyright (C) 2010 The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License") 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16# 17 18"""Test server 19 20A detailed description of tms. 21""" 22 23__author__ = 'wink@google.com (Wink Saville)' 24 25import socket 26import struct 27import sys 28import time 29 30import ctrl_pb2 31import ril_pb2 32import msgheader_pb2 33 34def recvall(s, count): 35 """Receive all of the data otherwise return none. 36 37 Args: 38 s: socket 39 count: number of bytes 40 41 Returns: 42 data received 43 None if no data is received 44 """ 45 all_data = [] 46 while (count > 0): 47 data = s.recv(count) 48 if (len(data) == 0): 49 return None 50 count -= len(data) 51 all_data.append(data) 52 result = ''.join(all_data); 53 return result 54 55def sendall(s, data): 56 """Send all of the data 57 58 Args: 59 s: socket 60 count: number of bytes 61 62 Returns: 63 Nothing 64 """ 65 s.sendall(data) 66 67class MsgHeader: 68 """A fixed length message header; cmd, token, status and length of protobuf.""" 69 def __init__(self): 70 self.cmd = 0 71 self.token = 0 72 self.status = 0 73 self.length_protobuf = 0 74 75 def sendHeader(self, s): 76 """Send the header to the socket 77 78 Args: 79 s: socket 80 81 Returns 82 nothing 83 """ 84 mh = msgheader_pb2.MsgHeader() 85 mh.cmd = self.cmd 86 mh.token = self.token 87 mh.status = self.status 88 mh.length_data = self.length_protobuf 89 mhser = mh.SerializeToString() 90 len_msg_header_raw = struct.pack('<i', len(mhser)) 91 sendall(s, len_msg_header_raw) 92 sendall(s, mhser) 93 94 def recvHeader(self, s): 95 """Receive the header from the socket""" 96 len_msg_header_raw = recvall(s, 4) 97 len_msg_hdr = struct.unpack('<i', len_msg_header_raw) 98 mh = msgheader_pb2.MsgHeader() 99 mh_raw = recvall(s, len_msg_hdr[0]) 100 mh.ParseFromString(mh_raw) 101 self.cmd = mh.cmd 102 self.token = mh.token 103 self.status = mh.status 104 self.length_protobuf = mh.length_data; 105 106class Msg: 107 """A message consists of a fixed length MsgHeader followed by a protobuf. 108 109 This class sends and receives messages, when sending the status 110 will be zero and when receiving the protobuf field will be an 111 empty string if there was no protobuf. 112 """ 113 def __init__(self): 114 """Initialize the protobuf to None and header to an empty""" 115 self.protobuf = None 116 self.header = MsgHeader() 117 118 # Keep a local copy of header for convenience 119 self.cmd = 0 120 self.token = 0 121 self.status = 0 122 123 def sendMsg(self, s, cmd, token, protobuf=''): 124 """Send a message to a socket 125 126 Args: 127 s: socket 128 cmd: command to send 129 token: token to send, will be returned unmodified 130 protobuf: optional protobuf to send 131 132 Returns 133 nothing 134 """ 135 self.cmd = cmd 136 self.token = token 137 self.status = 0 138 self.protobuf = protobuf 139 140 self.header.cmd = self.cmd 141 self.header.token = self.token 142 self.header.status = self.status 143 if (len(protobuf) > 0): 144 self.header.length_protobuf = len(protobuf) 145 else: 146 self.header.length_protobuf = 0 147 self.protobuf = '' 148 self.header.sendHeader(s) 149 if (self.header.length_protobuf > 0): 150 sendall(s, self.protobuf) 151 152 def recvMsg(self, s): 153 """Receive a message from a socket 154 155 Args: 156 s: socket 157 158 Returns: 159 nothing 160 """ 161 self.header.recvHeader(s) 162 self.cmd = self.header.cmd 163 self.token = self.header.token 164 self.status = self.header.status 165 if (self.header.length_protobuf > 0): 166 self.protobuf = recvall(s, self.header.length_protobuf) 167 else: 168 self.protobuf = '' 169 170def main(argv): 171 """Create a socket and connect. 172 173 Before using you'll need to forward the port 174 used by mock_ril, 54312 to a port on the PC. 175 The following worked for me: 176 177 adb forward tcp:11111 tcp:54312. 178 179 Then you can execute this test using: 180 181 tms.py 127.0.0.1 11111 182 """ 183 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 184 host = sys.argv[1] # server address 185 print "host=%s" % host 186 port = int(sys.argv[2]) # server port 187 print "port=%d" % port 188 s.connect((host, port)) 189 190 # Create an object which is serializable to a protobuf 191 rrs = ctrl_pb2.CtrlRspRadioState() 192 rrs.state = ril_pb2.RADIOSTATE_UNAVAILABLE 193 print "rrs.state=%d" % (rrs.state) 194 195 # Serialize 196 rrs_ser = rrs.SerializeToString() 197 print "len(rrs_ser)=%d" % (len(rrs_ser)) 198 199 # Deserialize 200 rrs_new = ctrl_pb2.CtrlRspRadioState() 201 rrs_new.ParseFromString(rrs_ser) 202 print "rrs_new.state=%d" % (rrs_new.state) 203 204 205 # Do an echo test 206 req = Msg() 207 req.sendMsg(s, 0, 1234567890123, rrs_ser) 208 resp = Msg() 209 resp.recvMsg(s) 210 response = ctrl_pb2.CtrlRspRadioState() 211 response.ParseFromString(resp.protobuf) 212 213 print "cmd=%d" % (resp.cmd) 214 print "token=%d" % (resp.token) 215 print "status=%d" % (resp.status) 216 print "len(protobuf)=%d" % (len(resp.protobuf)) 217 print "response.state=%d" % (response.state) 218 if ((resp.cmd == 0) & (resp.token == 1234567890123) & 219 (resp.status == 0) & (response.state == 1)): 220 print "SUCCESS: echo ok" 221 else: 222 print "ERROR: expecting cmd=0 token=1234567890123 status=0 state=1" 223 224 # Test CTRL_GET_RADIO_STATE 225 req.sendMsg(s, ctrl_pb2.CTRL_CMD_GET_RADIO_STATE, 4) 226 resp = Msg() 227 resp.recvMsg(s) 228 229 print "cmd=%d" % (resp.cmd) 230 print "token=%d" % (resp.token) 231 print "status=%d" % (resp.status) 232 print "length_protobuf=%d" % (len(resp.protobuf)) 233 234 if (resp.cmd == ctrl_pb2.CTRL_CMD_GET_RADIO_STATE): 235 response = ctrl_pb2.CtrlRspRadioState() 236 response.ParseFromString(resp.protobuf) 237 print "SUCCESS: response.state=%d" % (response.state) 238 else: 239 print "ERROR: expecting resp.cmd == ctrl_pb2.CTRL_CMD_GET_RADIO_STATE" 240 241 # Close socket 242 print "closing socket" 243 s.close() 244 245 246if __name__ == '__main__': 247 main(sys.argv) 248