• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# SPDX-License-Identifier: GPL-2.0-or-later
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) Gabriel Potter <gabriel[]potter[]fr>
5
6# flake8: noqa E266
7# (We keep comment boxes, it's then one-line comments)
8
9"""
10C API calls to Windows DLLs
11"""
12
13import ctypes
14import ctypes.wintypes
15from ctypes import (
16    POINTER,
17    Structure,
18    WINFUNCTYPE,
19    byref,
20    create_string_buffer,
21)
22from socket import AddressFamily
23
24from scapy.config import conf
25from scapy.consts import WINDOWS_XP
26from scapy.data import MTU
27
28# Typing imports
29from typing import (
30    Any,
31    Dict,
32    IO,
33    List,
34    Optional,
35    Tuple,
36)
37
# Placeholder element count used for the variable-length "Table" arrays of
# the routing table structures declared further down in this module.
ANY_SIZE = 65500  # FIXME quite inefficient :/
# Return code the iphlpapi wrappers below compare against to detect success.
NO_ERROR = 0x0

# Short aliases for the ctypes / ctypes.wintypes types used by the structure
# and function prototype declarations of this module.
CHAR = ctypes.c_char
DWORD = ctypes.wintypes.DWORD
BOOL = ctypes.wintypes.BOOL
BOOLEAN = ctypes.wintypes.BOOLEAN
ULONG = ctypes.wintypes.ULONG
ULONGLONG = ctypes.c_ulonglong
HANDLE = ctypes.wintypes.HANDLE
LPVOID = ctypes.wintypes.LPVOID
LPWSTR = ctypes.wintypes.LPWSTR
VOID = ctypes.c_void_p
INT = ctypes.c_int
UINT = ctypes.wintypes.UINT
UINT8 = ctypes.c_uint8
UINT16 = ctypes.c_uint16
UINT32 = ctypes.c_uint32
UINT64 = ctypes.c_uint64
BYTE = ctypes.c_byte
# UCHAR and UBYTE are two names for the same unsigned 8-bit type
UCHAR = UBYTE = ctypes.c_ubyte
SHORT = ctypes.c_short
USHORT = ctypes.c_ushort
61
62
63# UTILS
64
65
def _resolve_list(list_obj):
    # type: (Any) -> List[Dict[str, Any]]
    """
    Flatten a ctypes linked list into a list of dictionaries.

    Starting from the pointer `list_obj`, each node is converted using
    `_struct_to_dict` and the chain is followed through the node's `next`
    field. The walk stops on the first falsy (NULL) pointer or on the
    first object that has no `contents` attribute.
    """
    entries = []
    node = list_obj
    while True:
        if not node or not hasattr(node, "contents"):
            return entries
        entries.append(_struct_to_dict(node.contents))
        node = node.contents.next
74
75
def _struct_to_dict(struct_obj):
    # type: (Any) -> Dict[str, Any]
    """
    Convert a ctypes Structure/Union instance into a dictionary.

    Each entry of the class `_fields_` becomes a key, except `next`
    (the link of a chained list, which `_resolve_list` follows itself):

    - nested Structure / Union fields are converted recursively;
    - non-NULL pointers whose target has a `next` attribute are expanded
      into a list through `_resolve_list`;
    - every other value is stored as returned by ctypes.
    """
    converted = {}  # type: Dict[str, Any]
    for field_name, field_type in type(struct_obj)._fields_:
        if field_name == "next":
            # Handled by _resolve_list when it walks the chain
            continue
        value = getattr(struct_obj, field_name)
        if issubclass(field_type, (Structure, ctypes.Union)):
            value = _struct_to_dict(value)
        elif (value and hasattr(value, "contents") and
                hasattr(value.contents, "next")):
            # A pointer to the head of a chained list: resolve it entirely
            value = _resolve_list(value)
        converted[field_name] = value
    return converted
95
96##############################
97####### WinAPI handles #######
98##############################
99
# kernel32!SetConsoleTitleW: takes one wide string, returns a BOOL
_winapi_SetConsoleTitle = ctypes.windll.kernel32.SetConsoleTitleW
_winapi_SetConsoleTitle.restype = BOOL
_winapi_SetConsoleTitle.argtypes = [LPWSTR]
103
def _windows_title(title=None):
    # type: (Optional[str]) -> None
    """
    Set the title of the console window.

    `title` is used when it is provided (and not empty), otherwise the
    default "Scapy v<version>" is used. Nothing happens unless
    `conf.interactive` is set.
    """
    if not conf.interactive:
        return
    if not title:
        title = "Scapy v{}".format(conf.version)
    _winapi_SetConsoleTitle(title)
112
113
# Handle type used by the service control manager bindings below
SC_HANDLE = HANDLE

class SERVICE_STATUS(Structure):
    """https://docs.microsoft.com/en-us/windows/desktop/api/winsvc/ns-winsvc-_service_status"""  # noqa: E501
    # Seven consecutive DWORDs; the order defines the memory layout and
    # must not be changed.
    _fields_ = [("dwServiceType", DWORD),
                ("dwCurrentState", DWORD),
                ("dwControlsAccepted", DWORD),
                ("dwWin32ExitCode", DWORD),
                ("dwServiceSpecificExitCode", DWORD),
                ("dwCheckPoint", DWORD),
                ("dwWaitHint", DWORD)]
125
126
# Advapi32 service control functions used by get_service_status().
# restype / argtypes are declared so that ctypes converts handles and
# wide strings explicitly.

# (manager handle, service name, desired access) -> service handle
OpenServiceW = ctypes.windll.Advapi32.OpenServiceW
OpenServiceW.restype = SC_HANDLE
OpenServiceW.argtypes = [SC_HANDLE, LPWSTR, DWORD]

# (handle) -> BOOL
CloseServiceHandle = ctypes.windll.Advapi32.CloseServiceHandle
CloseServiceHandle.restype = BOOL
CloseServiceHandle.argtypes = [SC_HANDLE]

# (machine name, database name, desired access) -> manager handle
OpenSCManagerW = ctypes.windll.Advapi32.OpenSCManagerW
OpenSCManagerW.restype = SC_HANDLE
OpenSCManagerW.argtypes = [LPWSTR, LPWSTR, DWORD]

# (service handle, pointer to the SERVICE_STATUS to fill) -> BOOL
QueryServiceStatus = ctypes.windll.Advapi32.QueryServiceStatus
QueryServiceStatus.restype = BOOL
QueryServiceStatus.argtypes = [SC_HANDLE, POINTER(SERVICE_STATUS)]
142
def get_service_status(service):
    # type: (str) -> Dict[str, int]
    """
    Returns content of QueryServiceStatus for a service

    :param service: name of the service to query on the local machine.
    :returns: the SERVICE_STATUS fields as a dictionary. This is a
        best-effort call: when the service control manager or the service
        cannot be opened, the fields of a freshly created (zero-filled)
        SERVICE_STATUS are returned rather than raising.
    """
    SERVICE_QUERY_STATUS = 0x0004
    status = SERVICE_STATUS()
    schSCManager = OpenSCManagerW(
        None,  # Local machine
        None,  # SERVICES_ACTIVE_DATABASE
        SERVICE_QUERY_STATUS
    )
    if not schSCManager:
        # No manager handle: nothing to query and nothing to close
        return _struct_to_dict(status)
    try:
        # Kept in its own variable: `service` stays the service name
        schService = OpenServiceW(
            schSCManager,
            service,
            SERVICE_QUERY_STATUS
        )
        if schService:
            try:
                QueryServiceStatus(
                    schService,
                    status
                )
            finally:
                CloseServiceHandle(schService)
        return _struct_to_dict(status)
    finally:
        # Always release the manager handle, even if something raised
        CloseServiceHandle(schSCManager)
166
167
168##############################
169###### Define IPHLPAPI  ######
170##############################
171
172
# Handle on iphlpapi.dll, from which the function prototypes below are bound
iphlpapi = ctypes.windll.iphlpapi
174
175##############################
176########### Common ###########
177##############################
178
179
class in_addr(Structure):
    """Address made of 4 unsigned bytes (used as `sockaddr_in.sin_addr`)."""
    _fields_ = [("byte", UBYTE * 4)]
182
183
class in6_addr(Structure):
    """Address made of 16 unsigned bytes (used as `sockaddr_in6.sin6_addr`)."""
    _fields_ = [("byte", UBYTE * 16)]
186
187
class sockaddr_in(Structure):
    """Socket address holding a family, a port and an `in_addr`."""
    # Field order defines the memory layout: do not reorder
    _fields_ = [("sin_family", SHORT),
                ("sin_port", USHORT),
                ("sin_addr", in_addr),
                ("sin_zero", 8 * CHAR)]
193
194
class sockaddr_in6(Structure):
    """Socket address holding a family, a port and an `in6_addr`."""
    # Field order defines the memory layout: do not reorder
    _fields_ = [("sin6_family", SHORT),
                ("sin6_port", USHORT),
                ("sin6_flowinfo", ULONG),
                ("sin6_addr", in6_addr),
                ("sin6_scope_id", ULONG)]
201
202
class SOCKADDR_INET(ctypes.Union):
    """
    Union of `sockaddr_in`, `sockaddr_in6` and a bare family value.

    All members share the same memory, so `_struct_to_dict` reports the
    three views of it side by side.
    """
    _fields_ = [("Ipv4", sockaddr_in),
                ("Ipv6", sockaddr_in6),
                ("si_family", USHORT)]
207
208
209##############################
210##### Adapters Addresses #####
211##############################
212
213
214# Our GetAdaptersAddresses implementation is inspired by
215# @sphaero 's gist: https://gist.github.com/sphaero/f9da6ebb9a7a6f679157
216# published under a MPL 2.0 License (GPLv2 compatible)
217
# from iptypes.h
# Sizes of the fixed-length byte arrays of IP_ADAPTER_ADDRESSES
MAX_ADAPTER_ADDRESS_LENGTH = 8
MAX_DHCPV6_DUID_LENGTH = 130

# Flags OR-ed together and passed to GetAdaptersAddresses
GAA_FLAG_INCLUDE_PREFIX = 0x0010
GAA_FLAG_INCLUDE_ALL_INTERFACES = 0x0100
# for now, just use void * for pointers to unused structures
PIP_ADAPTER_WINS_SERVER_ADDRESS_LH = VOID
PIP_ADAPTER_GATEWAY_ADDRESS_LH = VOID
PIP_ADAPTER_DNS_SUFFIX = VOID

# Scalar types of the IP_ADAPTER_ADDRESSES fields
IF_OPER_STATUS = UINT
IF_LUID = UINT64

NET_IF_COMPARTMENT_ID = UINT32
# A GUID is only handled as an opaque block of 16 bytes here
GUID = BYTE * 16
NET_IF_NETWORK_GUID = GUID
NET_IF_CONNECTION_TYPE = UINT  # enum
TUNNEL_TYPE = UINT  # enum
237
238
class SOCKET_ADDRESS(ctypes.Structure):
    """Pointer to a `SOCKADDR_INET` together with a length value."""
    _fields_ = [('address', POINTER(SOCKADDR_INET)),
                ('length', INT)]
242
243
class _IP_ADAPTER_ADDRESSES_METRIC(Structure):
    """
    Structure made of a length and an interface index.

    NOTE(review): not referenced anywhere in the part of the file visible
    here - confirm whether it is still needed.
    """
    _fields_ = [('length', ULONG),
                ('interface_index', DWORD)]
247
248
class IP_ADAPTER_UNICAST_ADDRESS(Structure):
    """
    Node of the chained list of unicast addresses of an adapter.

    Declared empty first because the structure points to itself through
    `next`: the fields are attached once the pointer type exists.
    """


PIP_ADAPTER_UNICAST_ADDRESS = POINTER(IP_ADAPTER_UNICAST_ADDRESS)
# The layout is the same on every Windows version, except that
# "on_link_prefix_length" is appended when not running on Windows XP.
IP_ADAPTER_UNICAST_ADDRESS._fields_ = [
    ("length", ULONG),
    ("flags", DWORD),
    ("next", PIP_ADAPTER_UNICAST_ADDRESS),
    ("address", SOCKET_ADDRESS),
    ("prefix_origin", INT),
    ("suffix_origin", INT),
    ("dad_state", INT),
    ("valid_lifetime", ULONG),
    ("preferred_lifetime", ULONG),
    ("lease_lifetime", ULONG),
] + ([] if WINDOWS_XP else [
    ("on_link_prefix_length", UBYTE),
])
281
282
class IP_ADAPTER_ANYCAST_ADDRESS(Structure):
    # Node of a chained list of anycast addresses. Declared empty first
    # because the structure points to itself through "next".
    pass


PIP_ADAPTER_ANYCAST_ADDRESS = POINTER(IP_ADAPTER_ANYCAST_ADDRESS)
# Field order defines the memory layout: do not reorder
IP_ADAPTER_ANYCAST_ADDRESS._fields_ = [
    ("length", ULONG),
    ("flags", DWORD),
    ("next", PIP_ADAPTER_ANYCAST_ADDRESS),
    ("address", SOCKET_ADDRESS),
]
294
295
class IP_ADAPTER_MULTICAST_ADDRESS(Structure):
    # Node of a chained list of multicast addresses. Declared empty first
    # because the structure points to itself through "next".
    pass


PIP_ADAPTER_MULTICAST_ADDRESS = POINTER(IP_ADAPTER_MULTICAST_ADDRESS)
# Field order defines the memory layout: do not reorder
IP_ADAPTER_MULTICAST_ADDRESS._fields_ = [
    ("length", ULONG),
    ("flags", DWORD),
    ("next", PIP_ADAPTER_MULTICAST_ADDRESS),
    ("address", SOCKET_ADDRESS),
]
307
308
class IP_ADAPTER_DNS_SERVER_ADDRESS(Structure):
    # Node of a chained list of DNS server addresses. Declared empty first
    # because the structure points to itself through "next".
    pass


PIP_ADAPTER_DNS_SERVER_ADDRESS = POINTER(IP_ADAPTER_DNS_SERVER_ADDRESS)
# Field order defines the memory layout: do not reorder
IP_ADAPTER_DNS_SERVER_ADDRESS._fields_ = [
    ("length", ULONG),
    ("flags", DWORD),
    ("next", PIP_ADAPTER_DNS_SERVER_ADDRESS),
    ("address", SOCKET_ADDRESS),
]
320
321
class IP_ADAPTER_PREFIX(Structure):
    # Node of a chained list of address prefixes. Declared empty first
    # because the structure points to itself through "next".
    pass


PIP_ADAPTER_PREFIX = ctypes.POINTER(IP_ADAPTER_PREFIX)
# Field order defines the memory layout: do not reorder
IP_ADAPTER_PREFIX._fields_ = [
    ("alignment", ULONGLONG),
    ("next", PIP_ADAPTER_PREFIX),
    ("address", SOCKET_ADDRESS),
    ("prefix_length", ULONG)
]
333
334
class IP_ADAPTER_ADDRESSES(Structure):
    """
    Node of the chained list of adapters filled by GetAdaptersAddresses.

    Declared empty first because the structure points to itself through
    `next`: the fields are attached once the pointer type exists.
    """


LP_IP_ADAPTER_ADDRESSES = POINTER(IP_ADAPTER_ADDRESSES)

# The first part of the layout is common to every Windows version; the
# second part is only appended when not running on Windows XP.
IP_ADAPTER_ADDRESSES._fields_ = [
    ('length', ULONG),
    ('interface_index', DWORD),
    ('next', LP_IP_ADAPTER_ADDRESSES),
    ('adapter_name', ctypes.c_char_p),
    ('first_unicast_address', PIP_ADAPTER_UNICAST_ADDRESS),
    ('first_anycast_address', PIP_ADAPTER_ANYCAST_ADDRESS),
    ('first_multicast_address', PIP_ADAPTER_MULTICAST_ADDRESS),
    ('first_dns_server_address', PIP_ADAPTER_DNS_SERVER_ADDRESS),
    ('dns_suffix', ctypes.c_wchar_p),
    ('description', ctypes.c_wchar_p),
    ('friendly_name', ctypes.c_wchar_p),
    ('physical_address', BYTE * MAX_ADAPTER_ADDRESS_LENGTH),
    ('physical_address_length', ULONG),
    ('flags', ULONG),
    ('mtu', ULONG),
    ('interface_type', DWORD),
    ('oper_status', IF_OPER_STATUS),
    ('ipv6_interface_index', DWORD),
    ('zone_indices', ULONG * 16),
    ('first_prefix', PIP_ADAPTER_PREFIX),
] + ([] if WINDOWS_XP else [
    ('transmit_link_speed', ULONGLONG),
    ('receive_link_speed', ULONGLONG),
    ('first_wins_server_address', PIP_ADAPTER_WINS_SERVER_ADDRESS_LH),
    ('first_gateway_address', PIP_ADAPTER_GATEWAY_ADDRESS_LH),
    ('ipv4_metric', ULONG),
    ('ipv6_metric', ULONG),
    ('luid', IF_LUID),
    ('dhcpv4_server', SOCKET_ADDRESS),
    ('compartment_id', NET_IF_COMPARTMENT_ID),
    ('network_guid', NET_IF_NETWORK_GUID),
    ('connection_type', NET_IF_CONNECTION_TYPE),
    ('tunnel_type', TUNNEL_TYPE),
    ('dhcpv6_server', SOCKET_ADDRESS),
    ('dhcpv6_client_duid', BYTE * MAX_DHCPV6_DUID_LENGTH),
    ('dhcpv6_client_duid_length', ULONG),
    ('dhcpv6_iaid', ULONG),
    ('first_dns_suffix', PIP_ADAPTER_DNS_SUFFIX),
])
403
404# Func
405
# Prototype of iphlpapi!GetAdaptersAddresses:
# (ULONG, ULONG, void **, IP_ADAPTER_ADDRESSES *, ULONG *) -> ULONG
# It is called below as (family, flags, None, buffer, pointer to the size).
_GetAdaptersAddresses = WINFUNCTYPE(ULONG, ULONG, ULONG,
                                    POINTER(VOID),
                                    LP_IP_ADAPTER_ADDRESSES,
                                    POINTER(ULONG))(
                                        ('GetAdaptersAddresses', iphlpapi))
411
412
def GetAdaptersAddresses(AF=AddressFamily.AF_UNSPEC):
    # type: (int) -> List[Dict[str, Any]]
    """
    Return all Windows Adapters addresses from iphlpapi

    :param AF: address family to retrieve (defaults to AF_UNSPEC).
    :returns: one dictionary per adapter, as built by `_resolve_list`.
    :raises RuntimeError: if the required size cannot be obtained or if
        the retrieval itself fails.
    """
    ERROR_BUFFER_OVERFLOW = 0x6f
    MAX_TRIES = 3
    # We get the size first
    size = ULONG()
    flags = ULONG(GAA_FLAG_INCLUDE_PREFIX | GAA_FLAG_INCLUDE_ALL_INTERFACES)
    res = _GetAdaptersAddresses(AF, flags,
                                None, None,
                                byref(size))
    if res != ERROR_BUFFER_OVERFLOW:  # BUFFER OVERFLOW -> populate size
        raise RuntimeError("Error getting structure length (%d)" % res)
    # Now let's build our buffer and call GetAdaptersAddresses.
    # The adapter list may grow between the size query and the retrieval:
    # in that case the call reports a buffer overflow again and updates
    # `size`, so we retry a few times with the new size.
    for _ in range(MAX_TRIES):
        buffer = create_string_buffer(size.value)
        AdapterAddresses = ctypes.cast(buffer, LP_IP_ADAPTER_ADDRESSES)
        res = _GetAdaptersAddresses(AF, flags,
                                    None, AdapterAddresses,
                                    byref(size))
        if res != ERROR_BUFFER_OVERFLOW:
            break
    if res != NO_ERROR:
        raise RuntimeError("Error retrieving table (%d)" % res)
    return _resolve_list(AdapterAddresses)
437
438##############################
439####### Routing tables #######
440##############################
441
442### V1 ###
443
444
class MIB_IPFORWARDROW(Structure):
    """One row (route) of the table filled by GetIpForwardTable."""
    # Fourteen consecutive DWORDs; the order defines the memory layout
    _fields_ = [('ForwardDest', DWORD),
                ('ForwardMask', DWORD),
                ('ForwardPolicy', DWORD),
                ('ForwardNextHop', DWORD),
                ('ForwardIfIndex', DWORD),
                ('ForwardType', DWORD),
                ('ForwardProto', DWORD),
                ('ForwardAge', DWORD),
                ('ForwardNextHopAS', DWORD),
                ('ForwardMetric1', DWORD),
                ('ForwardMetric2', DWORD),
                ('ForwardMetric3', DWORD),
                ('ForwardMetric4', DWORD),
                ('ForwardMetric5', DWORD)]
460
461
class MIB_IPFORWARDTABLE(Structure):
    """
    Table filled by GetIpForwardTable: an entry count followed by rows.

    `Table` is declared with ANY_SIZE elements; GetIpForwardTable only
    reads the first `NumEntries` of them.
    """
    _fields_ = [('NumEntries', DWORD),
                ('Table', MIB_IPFORWARDROW * ANY_SIZE)]
465
466
PMIB_IPFORWARDTABLE = POINTER(MIB_IPFORWARDTABLE)

# Func

# Prototype of iphlpapi!GetIpForwardTable:
# (MIB_IPFORWARDTABLE *, ULONG *, BOOL) -> DWORD
_GetIpForwardTable = WINFUNCTYPE(DWORD,
                                 PMIB_IPFORWARDTABLE, POINTER(ULONG), BOOL)(
                                     ('GetIpForwardTable', iphlpapi))
474
475
def GetIpForwardTable():
    # type: () -> List[Dict[str, Any]]
    """
    Return all Windows routes (IPv4 only) from iphlpapi

    :returns: one dictionary per row of the table.
    :raises RuntimeError: if the required size cannot be obtained or if
        the retrieval itself fails.
    """
    ERROR_INSUFFICIENT_BUFFER = 0x7a
    MAX_TRIES = 3
    # We get the size first
    size = ULONG()
    res = _GetIpForwardTable(None, byref(size), False)
    if res != ERROR_INSUFFICIENT_BUFFER:  # -> populate size
        raise RuntimeError("Error getting structure length (%d)" % res)
    # Now let's build our buffer and call GetIpForwardTable.
    # Routes may be added between the size query and the retrieval: in
    # that case the call reports an insufficient buffer again and updates
    # `size`, so we retry a few times with the new size.
    for _ in range(MAX_TRIES):
        buffer = create_string_buffer(size.value)
        pIpForwardTable = ctypes.cast(buffer, PMIB_IPFORWARDTABLE)
        res = _GetIpForwardTable(pIpForwardTable, byref(size), True)
        if res != ERROR_INSUFFICIENT_BUFFER:
            break
    if res != NO_ERROR:
        raise RuntimeError("Error retrieving table (%d)" % res)
    table = pIpForwardTable.contents
    return [_struct_to_dict(table.Table[i])
            for i in range(table.NumEntries)]
497
498### V2 ###
499
500
# Scalar types used by the fields of MIB_IPFORWARD_ROW2
NET_IFINDEX = ULONG
NL_ROUTE_PROTOCOL = INT
NL_ROUTE_ORIGIN = INT
504
505
class NET_LUID(Structure):
    """Interface identifier, handled here as a single 64-bit value."""
    _fields_ = [("Value", ULONGLONG)]
508
509
class IP_ADDRESS_PREFIX(Structure):
    """An address (`SOCKADDR_INET`) together with a prefix length."""
    _fields_ = [("Prefix", SOCKADDR_INET),
                ("PrefixLength", UINT8)]
513
514
class MIB_IPFORWARD_ROW2(Structure):
    """One row (route) of the table filled by GetIpForwardTable2."""
    # Field order defines the memory layout: do not reorder
    _fields_ = [("InterfaceLuid", NET_LUID),
                ("InterfaceIndex", NET_IFINDEX),
                ("DestinationPrefix", IP_ADDRESS_PREFIX),
                ("NextHop", SOCKADDR_INET),
                ("SitePrefixLength", UCHAR),
                ("ValidLifetime", ULONG),
                ("PreferredLifetime", ULONG),
                ("Metric", ULONG),
                ("Protocol", NL_ROUTE_PROTOCOL),
                ("Loopback", BOOLEAN),
                ("AutoconfigureAddress", BOOLEAN),
                ("Publish", BOOLEAN),
                ("Immortal", BOOLEAN),
                ("Age", ULONG),
                ("Origin", NL_ROUTE_ORIGIN)]
531
532
class MIB_IPFORWARD_TABLE2(Structure):
    """
    Table filled by GetIpForwardTable2: an entry count followed by rows.

    `Table` is declared with ANY_SIZE elements; GetIpForwardTable2 only
    reads the first `NumEntries` of them.
    """
    _fields_ = [("NumEntries", ULONG),
                ("Table", MIB_IPFORWARD_ROW2 * ANY_SIZE)]
536
537
PMIB_IPFORWARD_TABLE2 = POINTER(MIB_IPFORWARD_TABLE2)

# Func

if not WINDOWS_XP:
    # GetIpForwardTable2 does not exist under Windows XP
    # Prototype: (USHORT, MIB_IPFORWARD_TABLE2 **) -> ULONG
    _GetIpForwardTable2 = WINFUNCTYPE(
        ULONG, USHORT,
        POINTER(PMIB_IPFORWARD_TABLE2))(
            ('GetIpForwardTable2', iphlpapi)
    )
    # Prototype: (MIB_IPFORWARD_TABLE2 *) -> None
    # Called by GetIpForwardTable2() on the table it received.
    _FreeMibTable = WINFUNCTYPE(None, PMIB_IPFORWARD_TABLE2)(
        ('FreeMibTable', iphlpapi)
    )
552
553
def GetIpForwardTable2(AF=AddressFamily.AF_UNSPEC):
    # type: (AddressFamily) -> List[Dict[str, Any]]
    """
    Return all Windows routes (IPv4/IPv6) from iphlpapi

    :param AF: address family to retrieve (defaults to AF_UNSPEC).
    :returns: one dictionary per row of the table.
    :raises OSError: on Windows XP, where the API does not exist.
    :raises RuntimeError: if the table cannot be retrieved.
    """
    if WINDOWS_XP:
        raise OSError("Not available on Windows XP !")
    table = PMIB_IPFORWARD_TABLE2()
    res = _GetIpForwardTable2(AF, byref(table))
    if res != NO_ERROR:
        raise RuntimeError("Error retrieving table (%d)" % res)
    try:
        entries = table.contents
        # Each row is copied into memory owned by Python before being
        # converted: the dictionaries keep ctypes arrays (address bytes)
        # that share the memory of the structure they were read from,
        # and the table itself is released right below.
        return [
            _struct_to_dict(
                MIB_IPFORWARD_ROW2.from_buffer_copy(entries.Table[i])
            )
            for i in range(entries.NumEntries)
        ]
    finally:
        # Release the table even if the conversion raised
        _FreeMibTable(table)
568
569
570##############
571#### FIFO ####
572##############
573
class _SECURITY_ATTRIBUTES(Structure):
    """Security attributes block passed to CreateNamedPipeA."""
    # nLength is set to sizeof(_SECURITY_ATTRIBUTES) by _get_win_fifo();
    # the other fields are left zeroed there.
    _fields_ = [("nLength", DWORD),
                ("lpSecurityDescriptor", LPVOID),
                ("bInheritHandle", BOOL)]


LPSECURITY_ATTRIBUTES = POINTER(_SECURITY_ATTRIBUTES)
581
582
def _get_win_fifo() -> Tuple[str, Any]:
    """Create a Windows named pipe with a random name.

    :returns: a tuple (pipe path for the client, server side pipe handle)
    :raises OSError: if CreateNamedPipeA returns -1
    """
    from scapy.volatile import RandString
    pipe_path = r"\\.\pipe\scapy%s" % str(RandString(6))
    # Zeroed security attributes, with only the structure size filled in
    attrs_size = ctypes.sizeof(_SECURITY_ATTRIBUTES)
    raw_attrs = create_string_buffer(attrs_size)
    attrs = ctypes.cast(raw_attrs, LPSECURITY_ATTRIBUTES)
    attrs.contents.nLength = attrs_size
    # PIPE_ACCESS_DUPLEX (0x3) | FILE_FLAG_OVERLAPPED (0x40000000)
    open_mode = 0x00000003 | 0x40000000
    handle = ctypes.windll.kernel32.CreateNamedPipeA(
        create_string_buffer(pipe_path.encode()),  # lpName
        open_mode,  # dwOpenMode
        0,  # dwPipeMode
        1,  # nMaxInstances
        65536,  # nOutBufferSize
        65536,  # nInBufferSize
        300,  # nDefaultTimeOut
        attrs,  # lpSecurityAttributes
    )
    if handle == -1:
        raise OSError(ctypes.FormatError())
    return pipe_path, handle
602
603
def _win_fifo_open(fd: Any) -> IO[bytes]:
    """Connect NamedPipe and return a fake open() file

    :param fd: the server side pipe handle, as returned by _get_win_fifo()
    :returns: an object providing only `read()` and `close()` on top of
        that handle.
    """
    ctypes.windll.kernel32.ConnectNamedPipe(fd, None)

    class _opened(IO[bytes]):
        def read(self, x: int = MTU) -> bytes:
            """Read at most `x` bytes from the pipe.

            :raises OSError: if ReadFile reports a failure
            """
            buf = ctypes.create_string_buffer(x)
            # ReadFile stores the number of bytes it actually read here
            nread = DWORD(0)
            res = ctypes.windll.kernel32.ReadFile(
                fd,
                buf,
                x,
                byref(nread),
                None,
            )
            if res == 0:
                raise OSError(ctypes.FormatError())
            # Only return what was read, not the zero padded buffer
            return buf.raw[:nread.value]

        def close(self) -> None:
            """Close the pipe handle."""
            # ignore failures
            ctypes.windll.kernel32.CloseHandle(fd)
    return _opened()  # type: ignore
626