#!/usr/bin/python # # Copyright 2017 The Android Open Source Project # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Generic netlink interface to TCP metrics.""" from socket import * # pylint: disable=wildcard-import import struct import cstruct import genetlink import net_test import netlink ### TCP metrics constants. See include/uapi/linux/tcp_metrics.h. # Family name and version TCP_METRICS_GENL_NAME = "tcp_metrics" TCP_METRICS_GENL_VERSION = 1 # Message types. TCP_METRICS_CMD_GET = 1 TCP_METRICS_CMD_DEL = 2 # Attributes. TCP_METRICS_ATTR_UNSPEC = 0 TCP_METRICS_ATTR_ADDR_IPV4 = 1 TCP_METRICS_ATTR_ADDR_IPV6 = 2 TCP_METRICS_ATTR_AGE = 3 TCP_METRICS_ATTR_TW_TSVAL = 4 TCP_METRICS_ATTR_TW_TS_STAMP = 5 TCP_METRICS_ATTR_VALS = 6 TCP_METRICS_ATTR_FOPEN_MSS = 7 TCP_METRICS_ATTR_FOPEN_SYN_DROPS = 8 TCP_METRICS_ATTR_FOPEN_SYN_DROP_TS = 9 TCP_METRICS_ATTR_FOPEN_COOKIE = 10 TCP_METRICS_ATTR_SADDR_IPV4 = 11 TCP_METRICS_ATTR_SADDR_IPV6 = 12 TCP_METRICS_ATTR_PAD = 13 class TcpMetrics(genetlink.GenericNetlink): NL_DEBUG = ["ALL"] def __init__(self): super(TcpMetrics, self).__init__() # Generic netlink family IDs are dynamically assigned. Find ours. ctrl = genetlink.GenericNetlinkControl() self.family = ctrl.GetFamily(TCP_METRICS_GENL_NAME) def _Decode(self, command, msg, nla_type, nla_data): """Decodes TCP metrics netlink attributes to human-readable format.""" name = self._GetConstantName(__name__, nla_type, "TCP_METRICS_ATTR_") if name in ["TCP_METRICS_ATTR_ADDR_IPV4", "TCP_METRICS_ATTR_SADDR_IPV4"]: data = inet_ntop(AF_INET, nla_data) elif name in ["TCP_METRICS_ATTR_ADDR_IPV6", "TCP_METRICS_ATTR_SADDR_IPV6"]: data = inet_ntop(AF_INET6, nla_data) elif name in ["TCP_METRICS_ATTR_AGE"]: data = struct.unpack("=Q", nla_data)[0] elif name in ["TCP_METRICS_ATTR_TW_TSVAL", "TCP_METRICS_ATTR_TW_TS_STAMP"]: data = struct.unpack("=I", nla_data)[0] elif name == "TCP_METRICS_ATTR_FOPEN_MSS": data = struct.unpack("=H", nla_data)[0] elif name == "TCP_METRICS_ATTR_FOPEN_COOKIE": data = nla_data else: data = nla_data.encode("hex") return name, data def MaybeDebugCommand(self, command, unused_flags, data): if "ALL" not in self.NL_DEBUG and command not in self.NL_DEBUG: return parsed = self._ParseNLMsg(data, genetlink.Genlmsghdr) def _NlAttrSaddr(self, address): if ":" not in address: family = AF_INET nla_type = TCP_METRICS_ATTR_SADDR_IPV4 else: family = AF_INET6 nla_type = TCP_METRICS_ATTR_SADDR_IPV6 return self._NlAttrIPAddress(nla_type, family, address) def _NlAttrTcpMetricsAddr(self, address, is_source): version = net_test.GetAddressVersion(address) family = net_test.GetAddressFamily(version) if version == 5: address = address.replace("::ffff:", "") nla_name = "TCP_METRICS_ATTR_%s_IPV%d" % ( "SADDR" if is_source else "ADDR", version) nla_type = globals()[nla_name] return self._NlAttrIPAddress(nla_type, family, address) def _NlAttrAddr(self, address): return self._NlAttrTcpMetricsAddr(address, False) def _NlAttrSaddr(self, address): return self._NlAttrTcpMetricsAddr(address, True) def DumpMetrics(self): """Dumps all TCP metrics.""" return self._Dump(self.family, TCP_METRICS_CMD_GET, 1) def GetMetrics(self, saddr, daddr): """Returns TCP metrics for the specified src/dst pair.""" data = self._NlAttrSaddr(saddr) + self._NlAttrAddr(daddr) self._SendCommand(self.family, TCP_METRICS_CMD_GET, 1, data, netlink.NLM_F_REQUEST) hdr, attrs = self._GetMsg(genetlink.Genlmsghdr) return attrs def DelMetrics(self, saddr, daddr): """Deletes TCP metrics for the specified src/dst pair.""" data = self._NlAttrSaddr(saddr) + self._NlAttrAddr(daddr) self._SendCommand(self.family, TCP_METRICS_CMD_DEL, 1, data, netlink.NLM_F_REQUEST) if __name__ == "__main__": t = TcpMetrics() print(t.DumpMetrics())