# SPDX-License-Identifier: GPL-2.0-only
# This file is part of Scapy
# See https://scapy.net/ for more information
# Copyright (C) Nils Weiss <nils@we155.de>


# """ Default imports required for setup of CAN interfaces """

import os
import subprocess
import sys

from platform import python_implementation

from scapy.main import load_layer, load_contrib
from scapy.config import conf
# `warning` is imported explicitly because exit_if_no_isotp_module() calls
# it; it must not depend on a star import happening to provide the name.
from scapy.error import log_runtime, Scapy_Exception, warning
from scapy.consts import LINUX

load_layer("can", globals_dict=globals())
conf.contribs['CAN']['swap-bytes'] = False

# ############################################################################
# """ Define interface names for automotive tests """
# ############################################################################
iface0 = "vcan0"
iface1 = "vcan1"

try:
    _root = os.geteuid() == 0
except AttributeError:
    # os.geteuid does not exist on this platform: treat as "not root"
    _root = False

_not_pypy = "pypy" not in python_implementation().lower()
_socket_can_support = False


def test_and_setup_socket_can(iface_name):
    # type: (str) -> None
    """
    Make sure the virtual CAN interface ``iface_name`` exists and is usable.

    A test frame is sent with ``cansend`` first. If that fails, the ``vcan``
    kernel module is loaded, the interface is created (a failure here is
    only logged, since the interface may already exist) and brought up.
    A final ``cansend`` verifies that frames can be sent.

    :param iface_name: Name of the virtual CAN interface, e.g. "vcan0"
    :raises Exception: If ``modprobe``, ``ip link set ... up`` or the final
                       ``cansend`` returns a non-zero exit status
    """
    if 0 != subprocess.call(["cansend", iface_name, "000#"]):
        # iface_name is not enabled
        if 0 != subprocess.call(["modprobe", "vcan"]):
            raise Exception("modprobe vcan failed")
        if 0 != subprocess.call(
                ["ip", "link", "add", "name", iface_name, "type", "vcan"]):
            log_runtime.debug(
                "add %s failed: Maybe it was already up?" % iface_name)
        if 0 != subprocess.call(
                ["ip", "link", "set", "dev", iface_name, "up"]):
            raise Exception("could not bring up %s" % iface_name)

    if 0 != subprocess.call(["cansend", iface_name, "000#12"]):
        raise Exception("cansend doesn't work")

    sys.__stderr__.write("SocketCAN setup done!\n")


if LINUX and _root and _not_pypy:
    try:
        for _iface_name in (iface0, iface1):
            test_and_setup_socket_can(_iface_name)
        log_runtime.debug("CAN should work now")
        _socket_can_support = True
    except Exception as e:
        # Best effort: without SocketCAN the python-can virtual bus is used
        sys.__stderr__.write("ERROR %s!\n" % e)


sys.__stderr__.write("SocketCAN support: %s\n" % _socket_can_support)


# ############################################################################
# """ Define helper functions for CANSocket creation on all platforms """
# ############################################################################
if _socket_can_support:
    from scapy.contrib.cansocket_native import *  # noqa: F403
    new_can_socket = NativeCANSocket
    new_can_socket0 = lambda: NativeCANSocket(iface0)
    new_can_socket1 = lambda: NativeCANSocket(iface1)
    can_socket_string_list = ["-c", iface0]
    sys.__stderr__.write("Using NativeCANSocket\n")

else:
    from scapy.contrib.cansocket_python_can import *  # noqa: F403
    new_can_socket = lambda iface: PythonCANSocket(bustype='virtual', channel=iface)  # noqa: E501
    new_can_socket0 = lambda: PythonCANSocket(bustype='virtual', channel=iface0, timeout=0.01)  # noqa: E501
    new_can_socket1 = lambda: PythonCANSocket(bustype='virtual', channel=iface1, timeout=0.01)  # noqa: E501
    sys.__stderr__.write("Using PythonCANSocket virtual\n")


# ############################################################################
# """ Test if socket creation functions work """
# ############################################################################
for _iface_name in (iface0, iface1):
    s = new_can_socket(_iface_name)
    s.close()
    # Drop the reference right away so no socket object lingers in globals
    del s


def cleanup_interfaces():
    # type: () -> bool
    """
    Helper function to remove virtual CAN interfaces after test

    The interfaces are only deleted if SocketCAN support was detected;
    otherwise nothing is done.

    :return: True on success
    :raises Exception: If ``ip link delete`` fails for an interface
    """
    if _socket_can_support:
        for iface in (iface0, iface1):
            if 0 != subprocess.call(["ip", "link", "delete", iface]):
                raise Exception("%s could not be deleted" % iface)
    return True


def drain_bus(iface=iface0, assert_empty=True):
    # type: (str, bool) -> None
    """
    Utility function for draining a can interface,
    asserting that no packets are there

    :param iface: Interface name to drain
    :param assert_empty: If true, raise exception in case packets were received
    :raises Scapy_Exception: If ``assert_empty`` is set and packets were
                             received
    """
    with new_can_socket(iface) as s:
        pkts = s.sniff(timeout=0.1)
        if assert_empty and len(pkts) != 0:
            raise Scapy_Exception(
                "Error in drain_bus. Packets found but no packets expected!")


drain_bus(iface0)
drain_bus(iface1)

log_runtime.debug("CAN sockets should work now")

# ############################################################################
# """ Setup and definitions for ISOTP related stuff """
# ############################################################################

# ############################################################################
# function to exit when the can-isotp kernel module is not available
# ############################################################################
ISOTP_KERNEL_MODULE_AVAILABLE = False


def exit_if_no_isotp_module():
    # type: () -> None
    """
    Helper function to exit a test case if ISOTP kernel module is not available

    When ``ISOTP_KERNEL_MODULE_AVAILABLE`` is false, a "TEST SKIPPED" note
    is written to the original stderr, a warning is emitted and the
    interpreter is terminated through ``sys.exit(0)``.
    """
    if not ISOTP_KERNEL_MODULE_AVAILABLE:
        err = "TEST SKIPPED: can-isotp not available\n"
        sys.__stderr__.write(err)
        warning("Can't test ISOTPNativeSocket because "
                "kernel module isn't loaded")
        sys.exit(0)

# ############################################################################
# """ Evaluate if ISOTP kernel module is installed and available """
# ############################################################################
if LINUX and _root and _socket_can_support:
    # Equivalent of the shell pipeline: lsmod | grep '^can_isotp'
    p1 = subprocess.Popen(['lsmod'], stdout=subprocess.PIPE)
    p2 = subprocess.Popen(['grep', '^can_isotp'],
                          stdout=subprocess.PIPE, stdin=p1.stdout)
    # Close our copy of the read end; grep keeps its own handle
    p1.stdout.close()
    # communicate() drains grep's output, waits for it and closes the pipe,
    # so grep is always reaped, even when lsmod itself failed
    _grep_output, _ = p2.communicate()
    if p1.wait() == 0 and p2.returncode == 0 and b"can_isotp" in _grep_output:
        # The module is listed: check that an ISOTP frame can really be sent
        p = subprocess.Popen(["isotpsend", "-s1", "-d0", iface0],
                             stdin=subprocess.PIPE)
        p.communicate(b"01")
        if p.returncode == 0:
            ISOTP_KERNEL_MODULE_AVAILABLE = True

# ############################################################################
# """ Save configuration """
# ############################################################################
conf.contribs['ISOTP'] = \
    {'use-can-isotp-kernel-module': ISOTP_KERNEL_MODULE_AVAILABLE}

# ############################################################################
# """ reload ISOTP kernel module in case configuration changed """
# ############################################################################
import importlib
# The module object is taken from sys.modules because the bare name `scapy`
# is not bound by the `from scapy.x import ...` statements of this file.
_loaded_isotp = sys.modules.get("scapy.contrib.isotp")
if _loaded_isotp is not None:
    importlib.reload(_loaded_isotp)

load_contrib("isotp", globals_dict=globals())

# Sanity check: ISOTPSocket must be the implementation matching the
# configuration saved above.
if ISOTP_KERNEL_MODULE_AVAILABLE:
    if ISOTPSocket is not ISOTPNativeSocket:  # type: ignore
        raise Scapy_Exception("Error in ISOTPSocket import!")
else:
    if ISOTPSocket is not ISOTPSoftSocket:  # type: ignore
        raise Scapy_Exception("Error in ISOTPSocket import!")