• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) Nils Weiss <nils@we155.de>
5
6
7# """ Default imports required for setup of CAN interfaces  """
8
9import os
10import subprocess
11import sys
12
13from platform import python_implementation
14
15from scapy.main import load_layer, load_contrib
16from scapy.config import conf
17from scapy.error import log_runtime, Scapy_Exception
18from scapy.consts import LINUX
19
20load_layer("can", globals_dict=globals())
21conf.contribs['CAN']['swap-bytes'] = False
22
23# ############################################################################
24# """ Define interface names for automotive tests  """
25# ############################################################################
26iface0 = "vcan0"
27iface1 = "vcan1"
28
29try:
30    _root = os.geteuid() == 0
31except AttributeError:
32    _root = False
33
34_not_pypy = "pypy" not in python_implementation().lower()
35_socket_can_support = False
36
37
38def test_and_setup_socket_can(iface_name):
39    # type: (str) -> None
40    if 0 != subprocess.call(("cansend %s 000#" % iface_name).split()):
41        # iface_name is not enabled
42        if 0 != subprocess.call("modprobe vcan".split()):
43            raise Exception("modprobe vcan failed")
44        if 0 != subprocess.call(
45                ("ip link add name %s type vcan" % iface_name).split()):
46            log_runtime.debug(
47                "add %s failed: Maybe it was already up?" % iface_name)
48        if 0 != subprocess.call(
49                ("ip link set dev %s up" % iface_name).split()):
50            raise Exception("could not bring up %s" % iface_name)
51
52    if 0 != subprocess.call(("cansend %s 000#12" % iface_name).split()):
53        raise Exception("cansend doesn't work")
54
55    sys.__stderr__.write("SocketCAN setup done!\n")
56
57
58if LINUX and _root and _not_pypy:
59    try:
60        test_and_setup_socket_can(iface0)
61        test_and_setup_socket_can(iface1)
62        log_runtime.debug("CAN should work now")
63        _socket_can_support = True
64    except Exception as e:
65        sys.__stderr__.write("ERROR %s!\n" % e)
66
67
68sys.__stderr__.write("SocketCAN support: %s\n" % _socket_can_support)
69
70
71# ############################################################################
72# """ Define helper functions for CANSocket creation on all platforms """
73# ############################################################################
74if _socket_can_support:
75    from scapy.contrib.cansocket_native import *  # noqa: F403
76    new_can_socket = NativeCANSocket
77    new_can_socket0 = lambda: NativeCANSocket(iface0)
78    new_can_socket1 = lambda: NativeCANSocket(iface1)
79    can_socket_string_list = ["-c", iface0]
80    sys.__stderr__.write("Using NativeCANSocket\n")
81
82else:
83    from scapy.contrib.cansocket_python_can import *  # noqa: F403
84    new_can_socket = lambda iface: PythonCANSocket(bustype='virtual', channel=iface)  # noqa: E501
85    new_can_socket0 = lambda: PythonCANSocket(bustype='virtual', channel=iface0, timeout=0.01)  # noqa: E501
86    new_can_socket1 = lambda: PythonCANSocket(bustype='virtual', channel=iface1, timeout=0.01)  # noqa: E501
87    sys.__stderr__.write("Using PythonCANSocket virtual\n")
88
89
90# ############################################################################
91# """ Test if socket creation functions work """
92# ############################################################################
93s = new_can_socket(iface0)
94s.close()
95del s
96
97s = new_can_socket(iface1)
98s.close()
99del s
100
101
102def cleanup_interfaces():
103    # type: () -> bool
104    """
105    Helper function to remove virtual CAN interfaces after test
106
107    :return: True on success
108    """
109    if _socket_can_support:
110        if 0 != subprocess.call(["ip", "link", "delete", iface0]):
111            raise Exception("%s could not be deleted" % iface0)
112        if 0 != subprocess.call(["ip", "link", "delete", iface1]):
113            raise Exception("%s could not be deleted" % iface1)
114    return True
115
116
117def drain_bus(iface=iface0, assert_empty=True):
118    # type: (str, bool) -> None
119    """
120    Utility function for draining a can interface,
121    asserting that no packets are there
122
123    :param iface: Interface name to drain
124    :param assert_empty: If true, raise exception in case packets were received
125    """
126    with new_can_socket(iface) as s:
127        pkts = s.sniff(timeout=0.1)
128        if assert_empty and not len(pkts) == 0:
129            raise Scapy_Exception(
130                "Error in drain_bus. Packets found but no packets expected!")
131
132
133drain_bus(iface0)
134drain_bus(iface1)
135
136log_runtime.debug("CAN sockets should work now")
137
138# ############################################################################
139# """ Setup and definitions for ISOTP related stuff """
140# ############################################################################
141
142# ############################################################################
143# function to exit when the can-isotp kernel module is not available
144# ############################################################################
145ISOTP_KERNEL_MODULE_AVAILABLE = False
146
147
148def exit_if_no_isotp_module():
149    # type: () -> None
150    """
151    Helper function to exit a test case if ISOTP kernel module is not available
152    """
153    if not ISOTP_KERNEL_MODULE_AVAILABLE:
154        err = "TEST SKIPPED: can-isotp not available\n"
155        sys.__stderr__.write(err)
156        warning("Can't test ISOTPNativeSocket because "
157                "kernel module isn't loaded")
158        sys.exit(0)
159
160
161# ############################################################################
162# """ Evaluate if ISOTP kernel module is installed and available """
163# ############################################################################
164if LINUX and _root and _socket_can_support:
165    p1 = subprocess.Popen(['lsmod'], stdout=subprocess.PIPE)
166    p2 = subprocess.Popen(['grep', '^can_isotp'],
167                          stdout=subprocess.PIPE, stdin=p1.stdout)
168    p1.stdout.close()
169    if p1.wait() == 0 and p2.wait() == 0 and b"can_isotp" in p2.stdout.read():
170        p = subprocess.Popen(["isotpsend", "-s1", "-d0", iface0],
171                             stdin=subprocess.PIPE)
172        p.communicate(b"01")
173        if p.returncode == 0:
174            ISOTP_KERNEL_MODULE_AVAILABLE = True
175
176# ############################################################################
177# """ Save configuration """
178# ############################################################################
179conf.contribs['ISOTP'] = \
180    {'use-can-isotp-kernel-module': ISOTP_KERNEL_MODULE_AVAILABLE}
181
182# ############################################################################
183# """ reload ISOTP kernel module in case configuration changed """
184# ############################################################################
185import importlib
186if "scapy.contrib.isotp" in sys.modules:
187    importlib.reload(scapy.contrib.isotp)  # type: ignore  # noqa: F405
188
189load_contrib("isotp", globals_dict=globals())
190
191if ISOTP_KERNEL_MODULE_AVAILABLE:
192    if ISOTPSocket is not ISOTPNativeSocket:  # type: ignore
193        raise Scapy_Exception("Error in ISOTPSocket import!")
194else:
195    if ISOTPSocket is not ISOTPSoftSocket:  # type: ignore
196        raise Scapy_Exception("Error in ISOTPSocket import!")
197