• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/python
2#
3# Copyright (C) 2010 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License")
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#      http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17
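"""Test client for mock_ril.

Connects to the port mock_ril is reachable on, runs an echo test and then
sends CTRL_CMD_GET_RADIO_STATE, printing the header fields and radio state
of each response.
"""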
22
23__author__ = 'wink@google.com (Wink Saville)'
24
25import socket
26import struct
27import sys
28import time
29
30import ctrl_pb2
31import ril_pb2
32import msgheader_pb2
33
def recvall(s, count):
  """Receive exactly count bytes from a socket.

  A single recv() may return fewer bytes than requested, so recv() is
  called repeatedly until count bytes have been collected.

  Args:
    s: socket to read from
    count: number of bytes to read

  Returns:
    The bytes received, joined into one byte string (an empty byte string
    when count is not positive).
    None if recv() returns no data before count bytes have arrived; any
    bytes collected up to that point are discarded.
  """
  chunks = []
  remaining = count
  while remaining > 0:
    chunk = s.recv(remaining)
    if not chunk:
      # recv() came back empty before everything arrived; give up.
      return None
    remaining -= len(chunk)
    chunks.append(chunk)
  # recv() yields byte strings. b'' is the same object type as '' on
  # Python 2.6+, and keeps the join valid where bytes and text differ.
  return b''.join(chunks)
54
def sendall(s, data):
  """Write the whole of data to a socket.

  Thin wrapper that delegates to the socket's own sendall() method.

  Args:
    s: socket to write to
    data: the bytes to send

  Returns:
    Nothing
  """
  s.sendall(data)
66
class MsgHeader:
  """Message header: cmd, token, status and length of the protobuf payload.

  On the wire the header is a serialized msgheader_pb2.MsgHeader preceded
  by its own length, packed as a 4-byte little-endian signed integer.
  """
  def __init__(self):
    """Start with every header field set to zero."""
    self.cmd = 0
    self.token = 0
    self.status = 0
    self.length_protobuf = 0

  def sendHeader(self, s):
    """Send the header to the socket

    Copies the fields into a msgheader_pb2.MsgHeader, serializes it and
    writes the 4-byte length prefix followed by the serialized header.

    Args:
      s: socket

    Returns
      nothing
    """
    mh = msgheader_pb2.MsgHeader()
    mh.cmd = self.cmd
    mh.token = self.token
    mh.status = self.status
    # The protobuf message calls this field length_data.
    mh.length_data = self.length_protobuf
    mhser = mh.SerializeToString()
    len_msg_header_raw = struct.pack('<i', len(mhser))
    sendall(s, len_msg_header_raw)
    sendall(s, mhser)

  def recvHeader(self, s):
    """Receive the header from the socket

    Reads the 4-byte length prefix, then that many bytes of serialized
    msgheader_pb2.MsgHeader, and copies its fields onto this object.

    Args:
      s: socket

    Returns
      nothing

    Raises:
      EOFError: if recvall() returns None (no more data) before the
        length prefix or the header itself has been read completely.
    """
    len_msg_header_raw = recvall(s, 4)
    if len_msg_header_raw is None:
      # Without this check struct.unpack() fails on None with an error
      # that says nothing about the connection.
      raise EOFError('no data while reading the message header length')
    (len_msg_hdr,) = struct.unpack('<i', len_msg_header_raw)

    mh_raw = recvall(s, len_msg_hdr)
    if mh_raw is None:
      raise EOFError('no data while reading the message header')

    mh = msgheader_pb2.MsgHeader()
    mh.ParseFromString(mh_raw)
    self.cmd = mh.cmd
    self.token = mh.token
    self.status = mh.status
    self.length_protobuf = mh.length_data
105
class Msg:
  """A message is a MsgHeader followed by an optional protobuf payload.

  Sending always uses a status of zero. After receiving, the protobuf
  attribute is an empty string when the header announced no payload.
  """
  def __init__(self):
    """Create an empty message: no protobuf and a zeroed header."""
    self.protobuf = None
    self.header = MsgHeader()

    # Mirror of the header fields, kept for convenient access
    self.cmd = 0
    self.token = 0
    self.status = 0

  def sendMsg(self, s, cmd, token, protobuf=''):
    """Send a message to a socket

    Args:
      s: socket
      cmd: command to send
      token: token to send, will be returned unmodified
      protobuf: optional serialized protobuf to send after the header

    Returns
      nothing
    """
    self.cmd, self.token, self.status = cmd, token, 0
    self.protobuf = protobuf

    hdr = self.header
    hdr.cmd, hdr.token, hdr.status = cmd, token, 0

    payload_len = len(protobuf)
    if payload_len == 0:
      # Any empty payload is stored as the empty string
      self.protobuf = ''
    hdr.length_protobuf = payload_len

    hdr.sendHeader(s)
    # The payload is only written when there is one
    if payload_len > 0:
      sendall(s, self.protobuf)

  def recvMsg(self, s):
    """Receive a message from a socket

    Reads the header, copies cmd, token and status onto this object and
    then reads the payload whose length the header announced. The
    protobuf attribute ends up as whatever recvall() returned, which is
    None when recvall() could not read the full payload.

    Args:
      s: socket

    Returns:
      nothing
    """
    hdr = self.header
    hdr.recvHeader(s)
    self.cmd, self.token, self.status = hdr.cmd, hdr.token, hdr.status

    if hdr.length_protobuf > 0:
      self.protobuf = recvall(s, hdr.length_protobuf)
    else:
      self.protobuf = ''
169
def main(argv):
  """Connect to mock_ril and run an echo test and a radio-state query.

  Before using you'll need to forward the port
  used by mock_ril, 54312 to a port on the PC.
  The following worked for me:

    adb forward tcp:11111 tcp:54312

  Then you can execute this test using:

    tms.py 127.0.0.1 11111

  Args:
    argv: command line; argv[1] is the server address and argv[2] the
      server port

  Returns:
    nothing; exits with a usage message when host or port is missing
  """
  if len(argv) < 3:
    sys.exit('usage: tms.py <host> <port>')

  # print("..." % x) with a single argument produces the same output
  # whether print is a statement or a function.
  host = argv[1]        # server address
  print("host=%s" % host)
  port = int(argv[2])   # server port
  print("port=%d" % port)

  echo_token = 1234567890123

  s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  try:
    s.connect((host, port))

    # Create an object which is serializable to a protobuf
    rrs = ctrl_pb2.CtrlRspRadioState()
    rrs.state = ril_pb2.RADIOSTATE_UNAVAILABLE
    print("rrs.state=%d" % (rrs.state))

    # Serialize
    rrs_ser = rrs.SerializeToString()
    print("len(rrs_ser)=%d" % (len(rrs_ser)))

    # Deserialize, to show the local round trip works
    rrs_new = ctrl_pb2.CtrlRspRadioState()
    rrs_new.ParseFromString(rrs_ser)
    print("rrs_new.state=%d" % (rrs_new.state))

    # Do an echo test: cmd 0 is expected to come back unchanged
    req = Msg()
    req.sendMsg(s, 0, echo_token, rrs_ser)
    resp = Msg()
    resp.recvMsg(s)
    response = ctrl_pb2.CtrlRspRadioState()
    response.ParseFromString(resp.protobuf)

    print("cmd=%d" % (resp.cmd))
    print("token=%d" % (resp.token))
    print("status=%d" % (resp.status))
    print("len(protobuf)=%d" % (len(resp.protobuf)))
    print("response.state=%d" % (response.state))
    # Logical 'and' rather than bitwise '&' on the comparison results
    if (resp.cmd == 0 and resp.token == echo_token and
        resp.status == 0 and response.state == 1):
      print("SUCCESS: echo ok")
    else:
      print("ERROR: expecting cmd=0 token=1234567890123 status=0 state=1")

    # Test CTRL_GET_RADIO_STATE
    req.sendMsg(s, ctrl_pb2.CTRL_CMD_GET_RADIO_STATE, 4)
    resp = Msg()
    resp.recvMsg(s)

    print("cmd=%d" % (resp.cmd))
    print("token=%d" % (resp.token))
    print("status=%d" % (resp.status))
    print("length_protobuf=%d" % (len(resp.protobuf)))

    if resp.cmd == ctrl_pb2.CTRL_CMD_GET_RADIO_STATE:
      response = ctrl_pb2.CtrlRspRadioState()
      response.ParseFromString(resp.protobuf)
      print("SUCCESS: response.state=%d" % (response.state))
    else:
      print("ERROR: expecting resp.cmd == ctrl_pb2.CTRL_CMD_GET_RADIO_STATE")
  finally:
    # Close socket, also when one of the steps above raised
    print("closing socket")
    s.close()
244
245
# Script entry point: run the test only when executed directly.
if __name__ == "__main__":
  main(sys.argv)
248