# Copyright (c) 2014 The Chromium Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. from autotest_lib.client.common_lib import error import os class FrameSender(object): """Context manager for sending management frames.""" _sender_count = 0 def __init__(self, router, frame_type, channel, ssid_prefix=None, num_bss=None, frame_count=None, delay=None, dest_addr=None, probe_resp_footer=None, instance=0): """ @param router: LinuxRouter object router to send frames from. @param frame_type: int management frame type. @param channel: int targeted channel. @param ssid_prefix: string SSID prefix for BSSes in the frames. @param num_bss: int number of BSSes configured for sending frames. @param frame_count: int number of frames to send, frame_count of 0 implies infinite number of frames. @param delay: int delay in between frames in milliseconds. @param dest_addr: MAC address of the destination address (DA). @param probe_resp_footer: footer bytes for probe responses. @param instance: int hostapd instance on router to send frames from. """ if router.board == "panther": raise error.TestNAError('Panther router does not support manual ' 'beacon frame generation') self._router = router self._channel = channel self._frame_type = frame_type self._ssid_prefix = ssid_prefix self._num_bss = num_bss self._frame_count = frame_count self._delay = delay self._dest_addr = dest_addr self._probe_resp_footer = probe_resp_footer self._ap_interface = router.hostapd_instances[instance].interface self._injection_interface = None self._pid = None self._index = FrameSender._sender_count FrameSender._sender_count += 1 def __enter__(self): self._injection_interface = self._router.get_configured_interface( 'monitor', same_phy_as=self._ap_interface) self._pid = self._router.send_management_frame( self._injection_interface, self._frame_type, self._channel, ssid_prefix=self._ssid_prefix, num_bss=self._num_bss, frame_count=self._frame_count, delay=self._delay, dest_addr=self._dest_addr, probe_resp_footer=self._probe_resp_footer) return self def __exit__(self, exception, value, traceback): if self._injection_interface: self._router.release_interface(self._injection_interface) if self._pid: # Kill process and wait for termination. self._router.host.run( 'kill {pid};' ' for i in $(seq 1 10); do' ' kill -0 {pid} || break; sleep 0.2;' ' done'.format(pid=self._pid), ignore_status=True) self._router.host.get_file( os.path.join( self._router.logdir, self._router.MGMT_FRAME_SENDER_LOG_FILE), 'debug/frame_sender_%d.log' % self._index)