• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2#
3#   Copyright 2022 - Google
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import json
18import logging
19
20from acts.controllers.amarisoft_lib import amarisoft_constants as const
21from acts.controllers.amarisoft_lib import amarisoft_client
22
23
24class MmeFunctions():
25  """Utilities for Amarisoft's MME Remote API.
26
27  Attributes:
28    remote: An amarisoft client.
29  """
30
31  def __init__(self, remote: amarisoft_client.AmariSoftClient):
32    self.remote = remote
33
34  def pws_write(self, local_id: str, n50: bool = False):
35    """Broadcasts emergency alert message.
36
37    Args:
38      local_id: ID of the message as defined by local identifier in MME
39        configuration file.
40      n50: If True, N50 interface is used, otherwise SBC interface is used. (see TS 23.041)
41    """
42    msg = {}
43    msg['message'] = 'pws_write'
44    msg['local_id'] = local_id
45    msg['nf'] = n50
46    dump_msg = json.dumps(msg)
47    logging.debug('pws_write dump msg = %s', dump_msg)
48    head, body = self.remote.send_message(const.PortNumber.URI_MME, dump_msg)
49    self.remote.verify_response('pws_write', head, body)
50
51  def pws_kill(self, local_id: str, n50: bool = False):
52    """Stops broadcasts emergency alert message.
53
54    Args:
55      local_id: ID of the message as defined by local identifier in MME
56        configuration file.
57      n50: If True, N50 interface is used, otherwise SBC interface is used. (see TS 23.041)
58    """
59    msg = {}
60    msg['message'] = 'pws_kill'
61    msg['local_id'] = local_id
62    msg['nf'] = n50
63    dump_msg = json.dumps(msg)
64    logging.debug('pws_kill dump msg = %s', dump_msg)
65    head, body = self.remote.send_message(const.PortNumber.URI_MME, dump_msg)
66    self.remote.verify_response('pws_kill', head, body)
67
68  def ue_del(self, imsi: str):
69    """Remove UE from the UE database and force disconnect if necessary.
70
71    Args:
72      imsi: IMSI of the UE to delete.
73    """
74    msg = {}
75    msg['message'] = 'ue_del'
76    msg['imsi'] = imsi
77    dump_msg = json.dumps(msg)
78    logging.debug('ue_del dump msg = %s', dump_msg)
79    head, body = self.remote.send_message(const.PortNumber.URI_MME, dump_msg)
80    self.remote.verify_response('ue_del', head, body)
81