#!/usr/bin/env python3.4
#
# Copyright 2021 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import re

# Matches a reply line that starts with a bracketed timestamp and carries
# a "time=<rtt>" field.
RTT_REGEX = re.compile(r'^\[(?P<timestamp>\S+)\] .*? time=(?P<rtt>\S+)')
# Matches the "<N>% packet loss" fragment of the ping summary line.
LOSS_REGEX = re.compile(r'(?P<loss>\S+)% packet loss')


class PingResult(object):
    """An object that contains the results of running ping command.

    Attributes:
        connected: True if a connection was made. False otherwise.
        packet_loss_percentage: The total percentage of packets lost.
        transmission_times: The list of PingTransmissionTimes containing the
            timestamps gathered for transmitted packets.
        rtts: A list-like object enumerating all round-trip-times of
            transmitted packets.
        timestamps: A list-like object enumerating the beginning timestamps of
            each packet transmission.
        ping_interarrivals: A list-like object enumerating the amount of time
            between the beginning of each subsequent transmission.
        start_time: The raw timestamp of the first parsed reply, or 0 if no
            reply was parsed. All stored timestamps are relative to it.
    """
    def __init__(self, ping_output):
        """Parses the lines of a ping run.

        Args:
            ping_output: A sized iterable (e.g. a list) of output lines.
                Reply lines are only recorded when they match RTT_REGEX,
                i.e. they start with a bracketed timestamp.
        """
        self.packet_loss_percentage = 100
        self.transmission_times = []

        # These views stay in sync with transmission_times; they hold a
        # reference to the same list rather than a copy.
        self.rtts = _ListWrap(self.transmission_times, lambda entry: entry.rtt)
        self.timestamps = _ListWrap(self.transmission_times,
                                    lambda entry: entry.timestamp)
        self.ping_interarrivals = _PingInterarrivals(self.transmission_times)

        self.start_time = 0
        for line in ping_output:
            if 'loss' in line:
                loss_match = LOSS_REGEX.search(line)
                # A line may mention "loss" without being the summary line;
                # only update the percentage when the pattern really matches.
                if loss_match is not None:
                    self.packet_loss_percentage = float(
                        loss_match.group('loss'))
            if 'time=' in line:
                rtt_match = RTT_REGEX.search(line)
                # Replies without a leading "[timestamp]" cannot be placed
                # on the timeline, so they are skipped instead of crashing.
                if rtt_match is None:
                    continue
                timestamp = float(rtt_match.group('timestamp'))
                # The first recorded reply defines the time origin.
                if not self.transmission_times:
                    self.start_time = timestamp
                self.transmission_times.append(
                    PingTransmissionTimes(timestamp - self.start_time,
                                          float(rtt_match.group('rtt'))))
        self.connected = len(
            ping_output) > 1 and self.packet_loss_percentage < 100

    def __getitem__(self, item):
        """Dictionary-style access to a few attributes, by key name.

        Raises:
            ValueError: if item is not one of 'rtt', 'connected' or
                'packet_loss_percentage'.
        """
        if item == 'rtt':
            return self.rtts
        if item == 'connected':
            return self.connected
        if item == 'packet_loss_percentage':
            return self.packet_loss_percentage
        raise ValueError('Invalid key. Please use an attribute instead.')

    def as_dict(self):
        """Returns the parsed results as a dict of plain Python values."""
        return {
            'connected': 1 if self.connected else 0,
            'rtt': list(self.rtts),
            'time_stamp': list(self.timestamps),
            'ping_interarrivals': list(self.ping_interarrivals),
            'packet_loss_percentage': self.packet_loss_percentage
        }


class PingTransmissionTimes(object):
    """A class that holds the timestamps for a packet sent via the ping command.

    Attributes:
        rtt: The round trip time for the packet sent.
        timestamp: The timestamp the packet started its trip.
    """
    def __init__(self, timestamp, rtt):
        self.rtt = rtt
        self.timestamp = timestamp


class _ListWrap(object):
    """A convenient helper class for treating list iterators as native lists.

    Presents func(item) for every item of wrapped_list without copying the
    underlying list, so later appends to wrapped_list are visible here.
    """
    def __init__(self, wrapped_list, func):
        self.__wrapped_list = wrapped_list
        self.__func = func

    def __getitem__(self, key):
        """Returns the mapped item at an index, or a list for a slice."""
        if isinstance(key, slice):
            return [self.__func(item) for item in self.__wrapped_list[key]]
        return self.__func(self.__wrapped_list[key])

    def __iter__(self):
        for item in self.__wrapped_list:
            yield self.__func(item)

    def __len__(self):
        return len(self.__wrapped_list)


class _PingInterarrivals(object):
    """A helper class for treating ping interarrivals as a native list.

    Element i is the difference between the timestamps of entries i + 1
    and i, so the sequence is one shorter than ping_entries.
    """
    def __init__(self, ping_entries):
        self.__ping_entries = ping_entries

    def __getitem__(self, key):
        """Returns the interarrival at an index, or a list for a slice.

        Raises:
            IndexError: if an integer key is out of range.
        """
        length = len(self)
        if isinstance(key, slice):
            return [self[index] for index in range(*key.indices(length))]
        # Normalise negative indices against this sequence's own length;
        # passing them straight to the entry list would pair the wrong
        # entries (e.g. entries[0] with entries[-1]).
        if key < 0:
            key += length
        if key < 0 or key >= length:
            raise IndexError('ping interarrival index out of range')
        return (self.__ping_entries[key + 1].timestamp -
                self.__ping_entries[key].timestamp)

    def __iter__(self):
        for index in range(len(self)):
            yield self[index]

    def __len__(self):
        return max(0, len(self.__ping_entries) - 1)