/kernel/tests/net/test/ |
D | netlink.py | 72 def _NlAttr(self, nla_type, data): argument 73 datalen = len(data) 77 return NLAttr((nla_len, nla_type)).Pack() + data + padding 104 def _ReadNlAttr(self, data): argument 106 nla, data = cstruct.Read(data, NLAttr) 111 nla_data, data = data[:datalen], data[padded_len:] 113 return nla, nla_data, data 115 def _ParseAttributes(self, command, msg, data, nested=0): argument 134 while data: 135 nla, nla_data, data = self._ReadNlAttr(data) [all …]
|
D | genetlink.py | 60 def _SendCommand(self, family, command, version, data, flags): argument 62 self._SendNlRequest(family, genlmsghdr.Pack() + data, flags) 75 def _DecodeOps(self, data): argument 78 while data: 80 datalen, index, data = data[:2], data[2:4], data[4:] 82 nla, nla_data, data = self._ReadNlAttr(data) 87 nla, nla_data, data = self._ReadNlAttr(data) 101 data = struct.unpack("=H", nla_data)[0] 103 data = nla_data.strip("\x00") 105 data = struct.unpack("=I", nla_data)[0] [all …]
|
D | csocket.py | 153 msg_level, msg_type, data = opt 154 if isinstance(data, int): 155 data = struct.pack("=I", data) 156 elif isinstance(data, ctypes.c_uint32): 157 data = struct.pack("=I", data.value) 158 elif not isinstance(data, str): 160 msg_level, msg_type, type(data))) 162 datalen = len(data) 166 msg_control += data + padding 178 data, buf = buf[:datalen], buf[padlen + datalen:] [all …]
|
D | tcp_metrics.py | 70 data = inet_ntop(AF_INET, nla_data) 72 data = inet_ntop(AF_INET6, nla_data) 74 data = struct.unpack("=Q", nla_data)[0] 76 data = struct.unpack("=I", nla_data)[0] 78 data = struct.unpack("=H", nla_data)[0] 80 data = nla_data 82 data = nla_data.encode("hex") 84 return name, data 86 def MaybeDebugCommand(self, command, unused_flags, data): argument 89 parsed = self._ParseNLMsg(data, genetlink.Genlmsghdr) [all …]
|
D | iproute.py | 313 data = struct.unpack("=I", nla_data)[0] 315 data = struct.unpack("!I", nla_data)[0] 317 data = struct.unpack("=i", nla_data)[0] 319 data = ord(nla_data) 322 data = socket.inet_ntop(msg.family, nla_data) 325 data = nla_data.strip("\x00") 327 data = self._ParseAttributes(-RTA_METRICS, None, nla_data, nested + 1) 329 data = self._ParseAttributes(-IFLA_LINKINFO, None, nla_data, nested + 1) 331 data = self._ParseAttributes(-IFLA_INFO_DATA, None, nla_data) 333 data = RTACacheinfo(nla_data) [all …]
|
D | pf_key.py | 167 def ParseExtension(exttype, data): argument 187 ext, attrs = cstruct.Read(data, struct_type) 189 ext, attrs, = data, "" 290 def ParseExtensions(data): argument 293 while data: 294 ext, data = cstruct.Read(data, SadbExt) 296 extdata, data = data[:datalen], data[datalen:] 306 msg, data = cstruct.Read(received, SadbMsg) 308 extensions, data = data[:extlen], data[extlen:]
|
D | tcp_nuke_addr_test.py | 89 data = "foo" 91 self.assertEquals(len(data), s1.send(data)) 92 self.assertEquals(data, s2.recv(4096))
|
D | sock_diag.py | 131 data = ord(nla_data) 133 data = nla_data.strip("\x00") 135 data = InetDiagMeminfo(nla_data) 138 data = TcpInfo(nla_data) 140 data = SkMeminfo(nla_data) 142 data = struct.unpack("=I", nla_data)[0] 144 data = self.DecodeBytecode(nla_data) 146 data = [] 156 data.append(addr) 158 data = nla_data [all …]
|
D | xfrm.py | 339 def MaybeDebugCommand(self, command, flags, data): argument 359 print "%s %s" % (cmdname, str(self._ParseNLMsg(data, struct_type))) 368 data = cstruct.Read(nla_data, XfrmAlgo)[0] 370 data = cstruct.Read(nla_data, XfrmAlgoAuth)[0] 372 data = cstruct.Read(nla_data, XfrmEncapTmpl)[0] 374 data = cstruct.Read(nla_data, XfrmMark)[0] 376 data = struct.unpack("=I", nla_data)[0] 378 data = cstruct.Read(nla_data, XfrmUserTmpl)[0] 380 data = struct.unpack("=I", nla_data)[0] 382 data = nla_data [all …]
|
D | csocket_test.py | 41 data, addr = csocket.Recvfrom(s, 4096, 0) 42 self.assertEqual("foo", data) 73 data, addr, cmsg = csocket.Recvmsg(s, 4096, 1024, 0) 74 self.assertEqual("foo", data)
|
D | cstruct.py | 161 def _Parse(self, data): argument 162 data = data[:self._length] 163 values = list(struct.unpack(self._format, data)) 273 def Read(data, struct_type): argument 275 return struct_type(data), data[length:]
|
D | xfrm_algorithm_test.py | 242 data = accepted.recv(2048) 243 self.assertEquals("hello request", data) 253 data, peer = sock.recvfrom(2048) 256 self.assertEquals("hello request", data) 285 data = sock_left.recv(2048) 286 self.assertEquals("hello response", data)
|
D | leak_test.py | 45 data, addr = csocket.Recvfrom(s, 4096) 46 self.assertEqual("", data)
|
D | ping6_test.py | 264 def assertValidPingResponse(self, s, data): argument 278 self.assertGreater(len(data), 7, "Not enough data for ping packet") 280 self.assertTrue(data.startswith("\x08\x00"), "Not an IPv4 echo request") 282 self.assertTrue(data.startswith("\x80\x00"), "Not an IPv6 echo request") 306 self.assertEqual(len(data), len(rcvd)) 307 self.assertEqual(data[6:].encode("hex"), rcvd[6:].encode("hex")) 346 data = net_test.IPV4_PING + "foobarbaz" 347 s.send(data) 348 self.assertValidPingResponse(s, data) 404 scapy.ICMPv6EchoRequest(id=port, seq=1, data="foobarbaz")) [all …]
|
D | qtaguid_test.py | 92 data, sockaddr = s.recvfrom(4096) 93 self.assertEqual("foo", data) 107 data, sockaddr = s.recvfrom(4096) 108 self.assertEqual("foo", data) 119 data, sockaddr = s.recvfrom(4096) 120 self.assertEqual("foo", data)
|
D | tcp_test.py | 119 desc, data = packets.ACK(version, myaddr, remoteaddr, establishing_ack, 122 self.ExpectPacketOn(netid, msg + ": expecting %s" % desc, data) 124 desc, fin = packets.FIN(version, remoteaddr, myaddr, data)
|
D | xfrm_tunnel_test.py | 171 data, src = read_sock.recvfrom(4096) 172 self.assertEquals(net_test.UDP_PAYLOAD, data) 665 data, src = read_sock.recvfrom(4096) 667 self.assertEquals(net_test.UDP_PAYLOAD, data) 750 data, src = read_sock.recvfrom(4096) 751 self.assertEquals(net_test.UDP_PAYLOAD, data)
|
D | bpf_test.py | 72 data, retaddr = csocket.Recvfrom(sock, 4096, 0) 73 assert "foo" == data
|
D | multinetwork_test.py | 440 desc, data = packets.ACK(version, myaddr, remoteaddr, establishing_ack, 443 self.ExpectPacketOn(netid, msg + ": expecting %s" % desc, data) 447 ack = packets.ACK(version, remoteaddr, myaddr, data)[1]
|
D | xfrm_test.py | 320 data, src = twisted_socket.recvfrom(4096) 321 self.assertEquals(net_test.UDP_PAYLOAD, data)
|