1# SPDX-License-Identifier: GPL-2.0-only 2# This file is part of Scapy 3# See https://scapy.net/ for more information 4# Copyright (C) Nils Weiss <nils@we155.de> 5 6# scapy.contrib.description = Helper class for tracking Ecu states (Ecu) 7# scapy.contrib.status = loads 8 9import time 10import random 11import copy 12import itertools 13 14from collections import defaultdict 15from types import GeneratorType 16from threading import Lock 17 18from scapy.compat import orb 19from scapy.packet import Raw, Packet 20from scapy.plist import PacketList 21from scapy.sessions import DefaultSession 22from scapy.ansmachine import AnsweringMachine 23from scapy.supersocket import SuperSocket 24from scapy.error import Scapy_Exception 25 26# Typing imports 27from typing import ( 28 Any, 29 Union, 30 Iterable, 31 Callable, 32 List, 33 Optional, 34 Tuple, 35 Type, 36 cast, 37 Dict, 38) 39 40 41__all__ = ["EcuState", "Ecu", "EcuResponse", "EcuSession", 42 "EcuAnsweringMachine"] 43 44 45class EcuState(object): 46 """ 47 Stores the state of an Ecu. The state is defined by a protocol, for 48 example UDS or GMLAN. 49 A EcuState supports comparison and serialization (command()). 50 """ 51 __slots__ = ["__dict__", "__cache__"] 52 53 def __init__(self, **kwargs): 54 # type: (Any) -> None 55 self.__cache__ = None # type: Optional[Tuple[List[EcuState], List[Any]]] # noqa: E501 56 for k, v in kwargs.items(): 57 if isinstance(v, GeneratorType): 58 v = list(v) 59 self.__setitem__(k, v) 60 61 def _expand(self): 62 # type: () -> List[EcuState] 63 values = list(self.__dict__.values()) 64 keys = list(self.__dict__.keys()) 65 if self.__cache__ is None or self.__cache__[1] != values: 66 expanded = list() 67 for x in itertools.product(*[self._flatten(v) for v in values]): 68 kwargs = {} 69 for i, k in enumerate(keys): 70 if x[i] is None: 71 continue 72 kwargs[k] = x[i] 73 expanded.append(EcuState(**kwargs)) 74 self.__cache__ = (expanded, values) 75 return self.__cache__[0] 76 77 @staticmethod 78 def _flatten(x): 79 # type: (Any) -> List[Any] 80 if isinstance(x, (str, bytes)): 81 return [x] 82 elif hasattr(x, "__iter__") and hasattr(x, "__len__") and len(x) == 1: 83 return list(*x) 84 elif not hasattr(x, "__iter__"): 85 return [x] 86 flattened = list() 87 for y in x: 88 if hasattr(x, "__iter__"): 89 flattened += EcuState._flatten(y) 90 else: 91 flattened += [y] 92 return flattened 93 94 def __delitem__(self, key): 95 # type: (str) -> None 96 self.__cache__ = None 97 del self.__dict__[key] 98 99 def __len__(self): 100 # type: () -> int 101 return len(self.__dict__.keys()) 102 103 def __getitem__(self, item): 104 # type: (str) -> Any 105 return self.__dict__[item] 106 107 def __setitem__(self, key, value): 108 # type: (str, Any) -> None 109 self.__cache__ = None 110 self.__dict__[key] = value 111 112 def __repr__(self): 113 # type: () -> str 114 return "".join(str(k) + str(v) for k, v in 115 sorted(self.__dict__.items(), key=lambda t: t[0])) 116 117 def __eq__(self, other): 118 # type: (object) -> bool 119 other = cast(EcuState, other) 120 if len(self.__dict__) != len(other.__dict__): 121 return False 122 try: 123 return all(self.__dict__[k] == other.__dict__[k] 124 for k in self.__dict__.keys()) 125 except KeyError: 126 return False 127 128 def __contains__(self, item): 129 # type: (EcuState) -> bool 130 if not isinstance(item, EcuState): 131 return False 132 return all(s in self._expand() for s in item._expand()) 133 134 def __ne__(self, other): 135 # type: (object) -> bool 136 return not other == self 137 138 def __lt__(self, other): 139 # type: (EcuState) -> bool 140 if self == other: 141 return False 142 143 if len(self) < len(other): 144 return True 145 146 if len(self) > len(other): 147 return False 148 149 common = set(self.__dict__.keys()).intersection( 150 set(other.__dict__.keys())) 151 152 for k in sorted(common): 153 if not isinstance(other.__dict__[k], type(self.__dict__[k])): 154 raise TypeError( 155 "Can't compare %s with %s for the EcuState element %s" % 156 (type(self.__dict__[k]), type(other.__dict__[k]), k)) 157 if self.__dict__[k] < other.__dict__[k]: 158 return True 159 if self.__dict__[k] > other.__dict__[k]: 160 return False 161 162 if len(common) < len(self.__dict__): 163 self_diffs = set(self.__dict__.keys()).difference( 164 set(other.__dict__.keys())) 165 other_diffs = set(other.__dict__.keys()).difference( 166 set(self.__dict__.keys())) 167 168 for s, o in zip(self_diffs, other_diffs): 169 if s < o: 170 return True 171 172 return False 173 174 raise TypeError("EcuStates should be identical. Something bad happen. " 175 "self: %s other: %s" % (self.__dict__, other.__dict__)) 176 177 def __hash__(self): 178 # type: () -> int 179 return hash(repr(self)) 180 181 def reset(self): 182 # type: () -> None 183 self.__cache__ = None 184 keys = list(self.__dict__.keys()) 185 for k in keys: 186 del self.__dict__[k] 187 188 def command(self): 189 # type: () -> str 190 return "EcuState(" + ", ".join( 191 ["%s=%s" % (k, repr(v)) for k, v in sorted( 192 self.__dict__.items(), key=lambda t: t[0])]) + ")" 193 194 @staticmethod 195 def extend_pkt_with_modifier(cls): 196 # type: (Type[Packet]) -> Callable[[Callable[[Packet, Packet, EcuState], None]], None] # noqa: E501 197 """ 198 Decorator to add a function as 'modify_ecu_state' method to a given 199 class. This allows dynamic modifications and additions to a protocol. 200 :param cls: A packet class to be modified 201 :return: Decorator function 202 """ 203 if len(cls.fields_desc) == 0: 204 raise Scapy_Exception("Packets without fields can't be extended.") 205 206 if hasattr(cls, "modify_ecu_state"): 207 raise Scapy_Exception( 208 "Class already extended. Can't override existing method.") 209 210 def decorator_function(f): 211 # type: (Callable[[Packet, Packet, EcuState], None]) -> None 212 setattr(cls, "modify_ecu_state", f) 213 214 return decorator_function 215 216 @staticmethod 217 def is_modifier_pkt(pkt): 218 # type: (Packet) -> bool 219 """ 220 Helper function to determine if a Packet contains a layer that 221 modifies the EcuState. 222 :param pkt: Packet to be analyzed 223 :return: True if pkt contains layer that implements modify_ecu_state 224 """ 225 return any(hasattr(layer, "modify_ecu_state") 226 for layer in pkt.layers()) 227 228 @staticmethod 229 def get_modified_ecu_state(response, request, state, modify_in_place=False): # noqa: E501 230 # type: (Packet, Packet, EcuState, bool) -> EcuState 231 """ 232 Helper function to get a modified EcuState from a Packet and a 233 previous EcuState. An EcuState is always modified after a response 234 Packet is received. In some protocols, the belonging request packet 235 is necessary to determine the precise state of the Ecu 236 237 :param response: Response packet that supports `modify_ecu_state` 238 :param request: Belonging request of the response that modifies Ecu 239 :param state: The previous/current EcuState 240 :param modify_in_place: If True, the given EcuState will be modified 241 :return: The modified EcuState or a modified copy 242 """ 243 if modify_in_place: 244 new_state = state 245 else: 246 new_state = copy.copy(state) 247 248 for layer in response.layers(): 249 if not hasattr(layer, "modify_ecu_state"): 250 continue 251 try: 252 layer.modify_ecu_state(response, request, new_state) 253 except TypeError: 254 layer.modify_ecu_state.im_func(response, request, new_state) 255 return new_state 256 257 258class Ecu(object): 259 """An Ecu object can be used to 260 * track the states of an Ecu. 261 * to log all modification to an Ecu. 262 * to extract supported responses of a real Ecu. 263 264 Example: 265 >>> print("This ecu logs, tracks and creates supported responses") 266 >>> my_virtual_ecu = Ecu() 267 >>> my_virtual_ecu.update(PacketList([...])) 268 >>> my_virtual_ecu.supported_responses 269 >>> print("Another ecu just tracks") 270 >>> my_tracking_ecu = Ecu(logging=False, store_supported_responses=False) 271 >>> my_tracking_ecu.update(PacketList([...])) 272 >>> print("Another ecu just logs all modifications to it") 273 >>> my_logging_ecu = Ecu(verbose=False, store_supported_responses=False) 274 >>> my_logging_ecu.update(PacketList([...])) 275 >>> my_logging_ecu.log 276 >>> print("Another ecu just creates supported responses") 277 >>> my_response_ecu = Ecu(verbose=False, logging=False) 278 >>> my_response_ecu.update(PacketList([...])) 279 >>> my_response_ecu.supported_responses 280 281 Parameters to initialize an Ecu object 282 283 :param logging: Turn logging on or off. Default is on. 284 :param verbose: Turn tracking on or off. Default is on. 285 :param store_supported_responses: Create a list of supported responses if True. 286 :param lookahead: Configuration for lookahead when computing supported responses 287 """ # noqa: E501 288 def __init__(self, logging=True, verbose=True, 289 store_supported_responses=True, lookahead=10): 290 # type: (bool, bool, bool, int) -> None 291 self.state = EcuState() 292 self.verbose = verbose 293 self.logging = logging 294 self.store_supported_responses = store_supported_responses 295 self.lookahead = lookahead 296 self.log = defaultdict(list) # type: Dict[str, List[Any]] 297 self.__supported_responses = list() # type: List[EcuResponse] 298 self.__unanswered_packets = PacketList() 299 300 def reset(self): 301 # type: () -> None 302 """ 303 Resets the internal state to a default EcuState. 304 """ 305 self.state = EcuState(session=1) 306 307 def update(self, p): 308 # type: (Union[Packet, PacketList]) -> None 309 """ 310 Processes a Packet or a list of Packets, according to the chosen 311 configuration. 312 :param p: Packet or list of Packets 313 """ 314 if isinstance(p, PacketList): 315 for pkt in p: 316 self.update(pkt) 317 elif not isinstance(p, Packet): 318 raise TypeError("Provide a Packet object for an update") 319 else: 320 self.__update(p) 321 322 def __update(self, pkt): 323 # type: (Packet) -> None 324 """ 325 Processes a Packet according to the chosen configuration. 326 :param pkt: Packet to be processed 327 """ 328 if self.verbose: 329 print(repr(self), repr(pkt)) 330 if self.logging: 331 self.__update_log(pkt) 332 self.__update_supported_responses(pkt) 333 334 def __update_log(self, pkt): 335 # type: (Packet) -> None 336 """ 337 Checks if a packet or a layer of this packet supports the function 338 `get_log`. If `get_log` is supported, this function will be executed 339 and the returned log information is stored in the intern log of this 340 Ecu object. 341 :param pkt: A Packet to be processed for log information. 342 """ 343 for layer in pkt.layers(): 344 if not hasattr(layer, "get_log"): 345 continue 346 try: 347 log_key, log_value = layer.get_log(pkt) 348 except TypeError: 349 log_key, log_value = layer.get_log.im_func(pkt) 350 351 self.log[log_key].append((pkt.time, log_value)) 352 353 def __update_supported_responses(self, pkt): 354 # type: (Packet) -> None 355 """ 356 Stores a given packet as supported response, if a matching request 357 packet is found in a list of the latest unanswered packets. For 358 performance improvements, this list of unanswered packets only contains 359 a fixed number of packets, defined by the `lookahead` parameter of 360 this Ecu. 361 :param pkt: Packet to be processed. 362 """ 363 self.__unanswered_packets.append(pkt) 364 reduced_plist = self.__unanswered_packets[-self.lookahead:] 365 answered, unanswered = reduced_plist.sr(lookahead=self.lookahead) 366 self.__unanswered_packets = unanswered 367 368 for req, resp in answered: 369 added = False 370 current_state = copy.copy(self.state) 371 EcuState.get_modified_ecu_state(resp, req, self.state, True) 372 373 if not self.store_supported_responses: 374 continue 375 376 for sup_resp in self.__supported_responses: 377 if resp == sup_resp.key_response: 378 if sup_resp.states is not None and \ 379 self.state not in sup_resp.states: 380 sup_resp.states.append(current_state) 381 added = True 382 break 383 384 if added: 385 continue 386 387 ecu_resp = EcuResponse(current_state, responses=resp) 388 if self.verbose: 389 print("[+] ", repr(ecu_resp)) 390 self.__supported_responses.append(ecu_resp) 391 392 @staticmethod 393 def sort_key_func(resp): 394 # type: (EcuResponse) -> Tuple[bool, int, int, int] 395 """ 396 This sorts responses in the following order: 397 1. Positive responses first 398 2. Lower ServiceIDs first 399 3. Less supported states first 400 4. Longer (more specific) responses first 401 :param resp: EcuResponse to be sorted 402 :return: Tuple as sort key 403 """ 404 first_layer = cast(Packet, resp.key_response[0]) # type: ignore 405 service = orb(bytes(first_layer)[0]) 406 return (service == 0x7f, 407 service, 408 0xffffffff - len(resp.states or []), 409 0xffffffff - len(resp.key_response)) 410 411 @property 412 def supported_responses(self): 413 # type: () -> List[EcuResponse] 414 """ 415 Returns a sorted list of supported responses. The sort is done in a way 416 to provide the best possible results, if this list of supported 417 responses is used to simulate an real world Ecu with the 418 EcuAnsweringMachine object. 419 :return: A sorted list of EcuResponse objects 420 """ 421 self.__supported_responses.sort(key=self.sort_key_func) 422 return self.__supported_responses 423 424 @property 425 def unanswered_packets(self): 426 # type: () -> PacketList 427 """ 428 A list of all unanswered packets, which were processed by this Ecu 429 object. 430 :return: PacketList of unanswered packets 431 """ 432 return self.__unanswered_packets 433 434 def __repr__(self): 435 # type: () -> str 436 return repr(self.state) 437 438 @staticmethod 439 def extend_pkt_with_logging(cls): 440 # type: (Type[Packet]) -> Callable[[Callable[[Packet], Tuple[str, Any]]], None] # noqa: E501 441 """ 442 Decorator to add a function as 'get_log' method to a given 443 class. This allows dynamic modifications and additions to a protocol. 444 :param cls: A packet class to be modified 445 :return: Decorator function 446 """ 447 448 def decorator_function(f): 449 # type: (Callable[[Packet], Tuple[str, Any]]) -> None 450 setattr(cls, "get_log", f) 451 452 return decorator_function 453 454 455class EcuSession(DefaultSession): 456 """ 457 Tracks modification to an Ecu object 'on-the-flow'. 458 459 The parameters for the internal Ecu object are obtained from the kwargs 460 dict. 461 462 `logging`: Turn logging on or off. Default is on. 463 `verbose`: Turn tracking on or off. Default is on. 464 `store_supported_responses`: Create a list of supported responses, if True. 465 466 Example: 467 >>> sniff(session=EcuSession) 468 469 """ 470 def __init__(self, *args, **kwargs): 471 # type: (Any, Any) -> None 472 self.ecu = Ecu(logging=kwargs.pop("logging", True), 473 verbose=kwargs.pop("verbose", True), 474 store_supported_responses=kwargs.pop("store_supported_responses", True)) # noqa: E501 475 super(EcuSession, self).__init__(*args, **kwargs) 476 477 def process(self, pkt: Packet) -> Optional[Packet]: 478 if not pkt: 479 return None 480 self.ecu.update(pkt) 481 return pkt 482 483 484class EcuResponse: 485 """Encapsulates responses and the according EcuStates. 486 A list of this objects can be used to configure an EcuAnsweringMachine. 487 This is useful, if you want to clone the behaviour of a real Ecu. 488 489 Example: 490 >>> EcuResponse(EcuState(session=2, security_level=2), responses=UDS()/UDS_RDBIPR(dataIdentifier=2)/Raw(b"deadbeef1")) 491 >>> EcuResponse([EcuState(session=range(2, 5), security_level=2), EcuState(session=3, security_level=5)], responses=UDS()/UDS_RDBIPR(dataIdentifier=9)/Raw(b"deadbeef4")) 492 493 Initialize an EcuResponse capsule 494 495 :param state: EcuState or list of EcuStates in which this response 496 is allowed to be sent. If no state provided, the response 497 packet will always be send. 498 :param responses: A Packet or a list of Packet objects. By default the 499 last packet is asked if it answers an incoming 500 packet. This allows to send for example 501 `requestCorrectlyReceived-ResponsePending` packets. 502 :param answers: Optional argument to provide a custom answer here: 503 `lambda resp, req: return resp.answers(req)` 504 This allows the modification of a response depending 505 on a request. Custom SecurityAccess mechanisms can 506 be implemented in this way or generic NegativeResponse 507 messages which answers to everything can be realized 508 in this way. 509 """ # noqa: E501 510 def __init__(self, state=None, responses=Raw(b"\x7f\x10"), answers=None): 511 # type: (Optional[Union[EcuState, Iterable[EcuState]]], Union[Iterable[Packet], PacketList, Packet], Optional[Callable[[Packet, Packet], bool]]) -> None # noqa: E501 512 if state is None: 513 self.__states = None # type: Optional[List[EcuState]] 514 else: 515 if hasattr(state, "__iter__"): 516 state = cast(List[EcuState], state) 517 self.__states = state 518 else: 519 self.__states = [state] 520 521 if isinstance(responses, PacketList): 522 self.__responses = responses # type: PacketList 523 elif isinstance(responses, Packet): 524 self.__responses = PacketList([responses]) 525 elif hasattr(responses, "__iter__"): 526 responses = cast(List[Packet], responses) 527 self.__responses = PacketList(responses) 528 else: 529 raise TypeError( 530 "Can't handle type %s as response" % type(responses)) 531 532 self.__custom_answers = answers 533 534 @property 535 def states(self): 536 # type: () -> Optional[List[EcuState]] 537 return self.__states 538 539 @property 540 def responses(self): 541 # type: () -> PacketList 542 return self.__responses 543 544 @property 545 def key_response(self): 546 # type: () -> Packet 547 pkt = self.__responses[-1] # type: Packet 548 return pkt 549 550 def supports_state(self, state): 551 # type: (EcuState) -> bool 552 if self.__states is None or len(self.__states) == 0: 553 return True 554 else: 555 return any(s == state or state in s for s in self.__states) 556 557 def answers(self, other): 558 # type: (Packet) -> Union[int, bool] 559 if self.__custom_answers is not None: 560 return self.__custom_answers(self.key_response, other) 561 else: 562 return self.key_response.answers(other) 563 564 def __repr__(self): 565 # type: () -> str 566 return "%s, responses=%s" % \ 567 (repr(self.__states), 568 [resp.summary() for resp in self.__responses]) 569 570 def __eq__(self, other): 571 # type: (object) -> bool 572 other = cast(EcuResponse, other) 573 574 responses_equal = \ 575 len(self.responses) == len(other.responses) and \ 576 all(bytes(x) == bytes(y) for x, y in zip(self.responses, 577 other.responses)) 578 if self.__states is None: 579 return responses_equal 580 else: 581 return any(other.supports_state(s) for s in self.__states) and \ 582 responses_equal 583 584 def __ne__(self, other): 585 # type: (object) -> bool 586 # Python 2.7 compat 587 return not self == other 588 589 def command(self): 590 # type: () -> str 591 if self.__states is not None: 592 return "EcuResponse(%s, responses=%s)" % ( 593 "[" + ", ".join(s.command() for s in self.__states) + "]", 594 "[" + ", ".join(p.command() for p in self.__responses) + "]") 595 else: 596 return "EcuResponse(responses=%s)" % "[" + ", ".join( 597 p.command() for p in self.__responses) + "]" 598 599 __hash__ = None # type: ignore 600 601 602class EcuAnsweringMachine(AnsweringMachine[PacketList]): 603 """AnsweringMachine which emulates the basic behaviour of a real world ECU. 604 Provide a list of ``EcuResponse`` objects to configure the behaviour of a 605 AnsweringMachine. 606 607 Usage: 608 >>> resp = EcuResponse(session=range(0,255), security_level=0, responses=UDS() / UDS_NR(negativeResponseCode=0x7f, requestServiceId=0x10)) 609 >>> sock = ISOTPSocket(can_iface, tx_id=0x700, rx_id=0x600, basecls=UDS) 610 >>> answering_machine = EcuAnsweringMachine(supported_responses=[resp], main_socket=sock, basecls=UDS) 611 >>> sim = threading.Thread(target=answering_machine, kwargs={'count': 4, 'timeout':5}) 612 >>> sim.start() 613 """ # noqa: E501 614 function_name = "EcuAnsweringMachine" 615 sniff_options_list = ["store", "opened_socket", "count", "filter", "prn", 616 "stop_filter", "timeout"] 617 618 def parse_options( 619 self, 620 supported_responses=None, # type: Optional[List[EcuResponse]] 621 main_socket=None, # type: Optional[SuperSocket] 622 broadcast_socket=None, # type: Optional[SuperSocket] 623 basecls=Raw, # type: Type[Packet] 624 timeout=None, # type: Optional[Union[int, float]] 625 initial_ecu_state=None # type: Optional[EcuState] 626 ): 627 # type: (...) -> None 628 """ 629 :param supported_responses: List of ``EcuResponse`` objects to define 630 the behaviour. The default response is 631 ``generalReject``. 632 :param main_socket: Defines the object of the socket to send 633 and receive packets. 634 :param broadcast_socket: Defines the object of the broadcast socket. 635 Listen-only, responds with the main_socket. 636 `None` to disable broadcast capabilities. 637 :param basecls: Provide a basecls of the used protocol 638 :param timeout: Specifies the timeout for sniffing in seconds. 639 """ 640 self._main_socket = main_socket # type: Optional[SuperSocket] 641 self._sockets = [self._main_socket] 642 643 if broadcast_socket is not None: 644 self._sockets.append(broadcast_socket) 645 646 self._initial_ecu_state = initial_ecu_state or EcuState(session=1) 647 self._ecu_state_mutex = Lock() 648 self._ecu_state = copy.copy(self._initial_ecu_state) 649 650 self._basecls = basecls # type: Type[Packet] 651 self._supported_responses = supported_responses 652 653 self.sniff_options["timeout"] = timeout 654 self.sniff_options["opened_socket"] = self._sockets 655 656 @property 657 def state(self): 658 # type: () -> EcuState 659 return self._ecu_state 660 661 def reset_state(self): 662 # type: () -> None 663 with self._ecu_state_mutex: 664 self._ecu_state = copy.copy(self._initial_ecu_state) 665 666 def is_request(self, req): 667 # type: (Packet) -> bool 668 return isinstance(req, self._basecls) 669 670 def make_reply(self, req): 671 # type: (Packet) -> PacketList 672 """ 673 Checks if a given request can be answered by the internal list of 674 EcuResponses. First, it's evaluated if the internal EcuState of this 675 AnsweringMachine is supported by an EcuResponse, next it's evaluated if 676 a request answers the key_response of this EcuResponse object. The 677 first fitting EcuResponse is used. If this EcuResponse modified the 678 EcuState, the internal EcuState of this AnsweringMachine is updated, 679 and the list of response Packets of the selected EcuResponse is 680 returned. If no EcuResponse if found, a PacketList with a generic 681 NegativeResponse is returned. 682 :param req: A request packet 683 :return: A list of response packets 684 """ 685 if self._supported_responses is not None: 686 for resp in self._supported_responses: 687 if not isinstance(resp, EcuResponse): 688 raise TypeError("Unsupported type for response. " 689 "Please use `EcuResponse` objects.") 690 691 with self._ecu_state_mutex: 692 if not resp.supports_state(self._ecu_state): 693 continue 694 695 if not resp.answers(req): 696 continue 697 698 EcuState.get_modified_ecu_state( 699 resp.key_response, req, self._ecu_state, True) 700 701 return resp.responses 702 703 return PacketList([self._basecls( 704 b"\x7f" + bytes(req)[0:1] + b"\x10")]) 705 706 def send_reply(self, reply, send_function=None): 707 # type: (PacketList, Optional[Any]) -> None 708 """ 709 Sends all Packets of a EcuResponse object. This allows to send multiple 710 packets up on a request. If the list contains more than one packet, 711 a random time between each packet is waited until the next packet will 712 be sent. 713 :param reply: List of packets to be sent. 714 """ 715 for p in reply: 716 if len(reply) > 1: 717 time.sleep(random.uniform(0.01, 0.5)) 718 if self._main_socket: 719 self._main_socket.send(p) 720