# SPDX-License-Identifier: GPL-2.0-or-later
# This file is part of Scapy
# See https://scapy.net/ for more information
# Copyright (C) Gabriel Potter <gabriel[]potter[]fr>

# flake8: noqa E266
# (We keep comment boxes, it's then one-line comments)

"""
C API calls to Windows DLLs
"""

import ctypes
import ctypes.wintypes
from ctypes import (
    POINTER,
    Structure,
    WINFUNCTYPE,
    byref,
    create_string_buffer,
)
from socket import AddressFamily

from scapy.config import conf
from scapy.consts import WINDOWS_XP
from scapy.data import MTU

# Typing imports
from typing import (
    Any,
    Dict,
    IO,
    List,
    Optional,
    Tuple,
)

# Number of rows declared for the variable-length "Table" arrays of the
# routing structures defined in this module.
ANY_SIZE = 65500  # FIXME quite inefficient :/
NO_ERROR = 0x0

# Short aliases for the C types used by the structure definitions below
CHAR = ctypes.c_char
DWORD = ctypes.wintypes.DWORD
BOOL = ctypes.wintypes.BOOL
BOOLEAN = ctypes.wintypes.BOOLEAN
ULONG = ctypes.wintypes.ULONG
ULONGLONG = ctypes.c_ulonglong
HANDLE = ctypes.wintypes.HANDLE
LPVOID = ctypes.wintypes.LPVOID
LPWSTR = ctypes.wintypes.LPWSTR
VOID = ctypes.c_void_p
INT = ctypes.c_int
UINT = ctypes.wintypes.UINT
UINT8 = ctypes.c_uint8
UINT16 = ctypes.c_uint16
UINT32 = ctypes.c_uint32
UINT64 = ctypes.c_uint64
BYTE = ctypes.c_byte
UCHAR = UBYTE = ctypes.c_ubyte
SHORT = ctypes.c_short
USHORT = ctypes.c_ushort


# UTILS


def _resolve_list(list_obj):
    # type: (Any) -> List[Dict[str, Any]]
    """
    Walk a linked list of ctypes structures and convert it to a list.

    :param list_obj: a ctypes pointer to the first element. Each element
        must expose a ``next`` pointer field.
    :returns: one dict (see ``_struct_to_dict``) per element, in list
        order. A NULL pointer yields an empty list.
    """
    current = list_obj
    _list = []
    # A NULL ctypes pointer is falsy, which terminates the walk
    while current and hasattr(current, "contents"):
        _list.append(_struct_to_dict(current.contents))
        current = current.contents.next
    return _list


def _struct_to_dict(struct_obj):
    # type: (Any) -> Dict[str, Any]
    """
    Convert a ctypes Structure/Union instance into a dict.

    Nested structures and unions are converted recursively, and non-NULL
    pointers to structures that have a ``next`` field are expanded through
    ``_resolve_list``. Every other value is stored as returned by
    ``getattr``. The ``next`` field itself is never included.

    :param struct_obj: the ctypes Structure or Union instance
    :returns: a mapping of field name to converted value
    """
    results = {}  # type: Dict[str, Any]
    for fname, ctype in struct_obj.__class__._fields_:
        val = getattr(struct_obj, fname)
        if fname == "next":
            # Already covered by the trick below
            continue
        if issubclass(ctype, (Structure, ctypes.Union)):
            results[fname] = _struct_to_dict(val)
        elif val and hasattr(val, "contents"):
            # Let's resolve recursive pointers
            if hasattr(val.contents, "next"):
                results[fname] = _resolve_list(val)
            else:
                results[fname] = val
        else:
            results[fname] = val
    return results

##############################
####### WinAPI handles #######
##############################

_winapi_SetConsoleTitle = ctypes.windll.kernel32.SetConsoleTitleW
_winapi_SetConsoleTitle.restype = BOOL
_winapi_SetConsoleTitle.argtypes = [LPWSTR]

def _windows_title(title=None):
    # type: (Optional[str]) -> None
    """
    Updates the terminal title with the default one or with `title`
    if provided.

    Does nothing unless ``conf.interactive`` is set.
    """
    if conf.interactive:
        _winapi_SetConsoleTitle(title or "Scapy v{}".format(conf.version))


SC_HANDLE = HANDLE

class SERVICE_STATUS(Structure):
    """https://docs.microsoft.com/en-us/windows/desktop/api/winsvc/ns-winsvc-_service_status"""  # noqa: E501
    _fields_ = [("dwServiceType", DWORD),
                ("dwCurrentState", DWORD),
                ("dwControlsAccepted", DWORD),
                ("dwWin32ExitCode", DWORD),
                ("dwServiceSpecificExitCode", DWORD),
                ("dwCheckPoint", DWORD),
                ("dwWaitHint", DWORD)]


OpenServiceW = ctypes.windll.Advapi32.OpenServiceW
OpenServiceW.restype = SC_HANDLE
OpenServiceW.argtypes = [SC_HANDLE, LPWSTR, DWORD]

CloseServiceHandle = ctypes.windll.Advapi32.CloseServiceHandle
CloseServiceHandle.restype = BOOL
CloseServiceHandle.argtypes = [SC_HANDLE]

OpenSCManagerW = ctypes.windll.Advapi32.OpenSCManagerW
OpenSCManagerW.restype = SC_HANDLE
OpenSCManagerW.argtypes = [LPWSTR, LPWSTR, DWORD]

QueryServiceStatus = ctypes.windll.Advapi32.QueryServiceStatus
QueryServiceStatus.restype = BOOL
QueryServiceStatus.argtypes = [SC_HANDLE, POINTER(SERVICE_STATUS)]

def get_service_status(service):
    # type: (str) -> Dict[str, int]
    """
    Returns content of QueryServiceStatus for a service

    The return values of the underlying WinAPI calls are not checked.

    :param service: name of the service to query
    :returns: the fields of the SERVICE_STATUS structure, as a dict
    """
    SERVICE_QUERY_STATUS = 0x0004
    schSCManager = OpenSCManagerW(
        None,  # Local machine
        None,  # SERVICES_ACTIVE_DATABASE
        SERVICE_QUERY_STATUS
    )
    # Both handles are released in "finally" clauses so that they do not
    # leak if one of the calls below raises (e.g. ctypes.ArgumentError
    # when `service` is not a str).
    try:
        hService = OpenServiceW(
            schSCManager,
            service,
            SERVICE_QUERY_STATUS
        )
        try:
            status = SERVICE_STATUS()
            QueryServiceStatus(
                hService,
                byref(status)
            )
            return _struct_to_dict(status)
        finally:
            CloseServiceHandle(hService)
    finally:
        CloseServiceHandle(schSCManager)


##############################
###### Define IPHLPAPI  ######
##############################


iphlpapi = ctypes.windll.iphlpapi

##############################
########### Common ###########
##############################


class in_addr(Structure):
    # Raw bytes of an IPv4 address
    _fields_ = [("byte", UBYTE * 4)]


class in6_addr(Structure):
    # Raw bytes of an IPv6 address
    _fields_ = [("byte", UBYTE * 16)]


class sockaddr_in(Structure):
    _fields_ = [("sin_family", SHORT),
                ("sin_port", USHORT),
                ("sin_addr", in_addr),
                ("sin_zero", 8 * CHAR)]


class sockaddr_in6(Structure):
    _fields_ = [("sin6_family", SHORT),
                ("sin6_port", USHORT),
                ("sin6_flowinfo", ULONG),
                ("sin6_addr", in6_addr),
                ("sin6_scope_id", ULONG)]


class SOCKADDR_INET(ctypes.Union):
    # "si_family" overlays the family field that starts both sockaddr
    # variants.
    _fields_ = [("Ipv4", sockaddr_in),
                ("Ipv6", sockaddr_in6),
                ("si_family", USHORT)]


##############################
##### Adapters Addresses #####
##############################


# Our GetAdaptersAddresses implementation is inspired by
# @sphaero 's gist: https://gist.github.com/sphaero/f9da6ebb9a7a6f679157
# published under a MPL 2.0 License (GPLv2 compatible)

# from iptypes.h
MAX_ADAPTER_ADDRESS_LENGTH = 8
MAX_DHCPV6_DUID_LENGTH = 130

GAA_FLAG_INCLUDE_PREFIX = 0x0010
GAA_FLAG_INCLUDE_ALL_INTERFACES = 0x0100
# for now, just use void * for pointers to unused structures
PIP_ADAPTER_WINS_SERVER_ADDRESS_LH = VOID
PIP_ADAPTER_GATEWAY_ADDRESS_LH = VOID
PIP_ADAPTER_DNS_SUFFIX = VOID

IF_OPER_STATUS = UINT
IF_LUID = UINT64

NET_IF_COMPARTMENT_ID = UINT32
GUID = BYTE * 16
NET_IF_NETWORK_GUID = GUID
NET_IF_CONNECTION_TYPE = UINT  # enum
TUNNEL_TYPE = UINT  # enum


class SOCKET_ADDRESS(ctypes.Structure):
    _fields_ = [('address', POINTER(SOCKADDR_INET)),
                ('length', INT)]


class _IP_ADAPTER_ADDRESSES_METRIC(Structure):
    _fields_ = [('length', ULONG),
                ('interface_index', DWORD)]


# The address structures below are linked lists: each class is declared
# empty first so that its own pointer type can be used for the "next" field.
class IP_ADAPTER_UNICAST_ADDRESS(Structure):
    pass


PIP_ADAPTER_UNICAST_ADDRESS = POINTER(IP_ADAPTER_UNICAST_ADDRESS)
if WINDOWS_XP:
    IP_ADAPTER_UNICAST_ADDRESS._fields_ = [
        ("length", ULONG),
        ("flags", DWORD),
        ("next", PIP_ADAPTER_UNICAST_ADDRESS),
        ("address", SOCKET_ADDRESS),
        ("prefix_origin", INT),
        ("suffix_origin", INT),
        ("dad_state", INT),
        ("valid_lifetime", ULONG),
        ("preferred_lifetime", ULONG),
        ("lease_lifetime", ULONG),
    ]
else:
    # Same layout as above, plus a trailing "on_link_prefix_length" field
    IP_ADAPTER_UNICAST_ADDRESS._fields_ = [
        ("length", ULONG),
        ("flags", DWORD),
        ("next", PIP_ADAPTER_UNICAST_ADDRESS),
        ("address", SOCKET_ADDRESS),
        ("prefix_origin", INT),
        ("suffix_origin", INT),
        ("dad_state", INT),
        ("valid_lifetime", ULONG),
        ("preferred_lifetime", ULONG),
        ("lease_lifetime", ULONG),
        ("on_link_prefix_length", UBYTE)
    ]


class IP_ADAPTER_ANYCAST_ADDRESS(Structure):
    pass


PIP_ADAPTER_ANYCAST_ADDRESS = POINTER(IP_ADAPTER_ANYCAST_ADDRESS)
IP_ADAPTER_ANYCAST_ADDRESS._fields_ = [
    ("length", ULONG),
    ("flags", DWORD),
    ("next", PIP_ADAPTER_ANYCAST_ADDRESS),
    ("address", SOCKET_ADDRESS),
]


class IP_ADAPTER_MULTICAST_ADDRESS(Structure):
    pass


PIP_ADAPTER_MULTICAST_ADDRESS = POINTER(IP_ADAPTER_MULTICAST_ADDRESS)
IP_ADAPTER_MULTICAST_ADDRESS._fields_ = [
    ("length", ULONG),
    ("flags", DWORD),
    ("next", PIP_ADAPTER_MULTICAST_ADDRESS),
    ("address", SOCKET_ADDRESS),
]


class IP_ADAPTER_DNS_SERVER_ADDRESS(Structure):
    pass


PIP_ADAPTER_DNS_SERVER_ADDRESS = POINTER(IP_ADAPTER_DNS_SERVER_ADDRESS)
IP_ADAPTER_DNS_SERVER_ADDRESS._fields_ = [
    ("length", ULONG),
    ("flags", DWORD),
    ("next", PIP_ADAPTER_DNS_SERVER_ADDRESS),
    ("address", SOCKET_ADDRESS),
]


class IP_ADAPTER_PREFIX(Structure):
    pass


PIP_ADAPTER_PREFIX = ctypes.POINTER(IP_ADAPTER_PREFIX)
IP_ADAPTER_PREFIX._fields_ = [
    ("alignment", ULONGLONG),
    ("next", PIP_ADAPTER_PREFIX),
    ("address", SOCKET_ADDRESS),
    ("prefix_length", ULONG)
]


class IP_ADAPTER_ADDRESSES(Structure):
    pass


LP_IP_ADAPTER_ADDRESSES = POINTER(IP_ADAPTER_ADDRESSES)

if WINDOWS_XP:
    IP_ADAPTER_ADDRESSES._fields_ = [
        ('length', ULONG),
        ('interface_index', DWORD),
        ('next', LP_IP_ADAPTER_ADDRESSES),
        ('adapter_name', ctypes.c_char_p),
        ('first_unicast_address', PIP_ADAPTER_UNICAST_ADDRESS),
        ('first_anycast_address', PIP_ADAPTER_ANYCAST_ADDRESS),
        ('first_multicast_address', PIP_ADAPTER_MULTICAST_ADDRESS),
        ('first_dns_server_address', PIP_ADAPTER_DNS_SERVER_ADDRESS),
        ('dns_suffix', ctypes.c_wchar_p),
        ('description', ctypes.c_wchar_p),
        ('friendly_name', ctypes.c_wchar_p),
        ('physical_address', BYTE * MAX_ADAPTER_ADDRESS_LENGTH),
        ('physical_address_length', ULONG),
        ('flags', ULONG),
        ('mtu', ULONG),
        ('interface_type', DWORD),
        ('oper_status', IF_OPER_STATUS),
        ('ipv6_interface_index', DWORD),
        ('zone_indices', ULONG * 16),
        ('first_prefix', PIP_ADAPTER_PREFIX),
    ]
else:
    # Same leading fields as the Windows XP layout above, followed by
    # additional fields.
    IP_ADAPTER_ADDRESSES._fields_ = [
        ('length', ULONG),
        ('interface_index', DWORD),
        ('next', LP_IP_ADAPTER_ADDRESSES),
        ('adapter_name', ctypes.c_char_p),
        ('first_unicast_address', PIP_ADAPTER_UNICAST_ADDRESS),
        ('first_anycast_address', PIP_ADAPTER_ANYCAST_ADDRESS),
        ('first_multicast_address', PIP_ADAPTER_MULTICAST_ADDRESS),
        ('first_dns_server_address', PIP_ADAPTER_DNS_SERVER_ADDRESS),
        ('dns_suffix', ctypes.c_wchar_p),
        ('description', ctypes.c_wchar_p),
        ('friendly_name', ctypes.c_wchar_p),
        ('physical_address', BYTE * MAX_ADAPTER_ADDRESS_LENGTH),
        ('physical_address_length', ULONG),
        ('flags', ULONG),
        ('mtu', ULONG),
        ('interface_type', DWORD),
        ('oper_status', IF_OPER_STATUS),
        ('ipv6_interface_index', DWORD),
        ('zone_indices', ULONG * 16),
        ('first_prefix', PIP_ADAPTER_PREFIX),
        ('transmit_link_speed', ULONGLONG),
        ('receive_link_speed', ULONGLONG),
        ('first_wins_server_address', PIP_ADAPTER_WINS_SERVER_ADDRESS_LH),
        ('first_gateway_address', PIP_ADAPTER_GATEWAY_ADDRESS_LH),
        ('ipv4_metric', ULONG),
        ('ipv6_metric', ULONG),
        ('luid', IF_LUID),
        ('dhcpv4_server', SOCKET_ADDRESS),
        ('compartment_id', NET_IF_COMPARTMENT_ID),
        ('network_guid', NET_IF_NETWORK_GUID),
        ('connection_type', NET_IF_CONNECTION_TYPE),
        ('tunnel_type', TUNNEL_TYPE),
        ('dhcpv6_server', SOCKET_ADDRESS),
        ('dhcpv6_client_duid', BYTE * MAX_DHCPV6_DUID_LENGTH),
        ('dhcpv6_client_duid_length', ULONG),
        ('dhcpv6_iaid', ULONG),
        ('first_dns_suffix', PIP_ADAPTER_DNS_SUFFIX)]

# Func

_GetAdaptersAddresses = WINFUNCTYPE(ULONG, ULONG, ULONG,
                                    POINTER(VOID),
                                    LP_IP_ADAPTER_ADDRESSES,
                                    POINTER(ULONG))(
                                        ('GetAdaptersAddresses', iphlpapi))


def GetAdaptersAddresses(AF=AddressFamily.AF_UNSPEC):
    # type: (int) -> List[Dict[str, Any]]
    """
    Return all Windows Adapters addresses from iphlpapi

    :param AF: the address family to query (defaults to AF_UNSPEC)
    :returns: one dict per adapter, as built by ``_resolve_list``
    :raises RuntimeError: if either call to GetAdaptersAddresses returns
        an unexpected status code
    """
    # We get the size first
    size = ULONG()
    flags = ULONG(GAA_FLAG_INCLUDE_PREFIX | GAA_FLAG_INCLUDE_ALL_INTERFACES)
    res = _GetAdaptersAddresses(AF, flags,
                                None, None,
                                byref(size))
    if res != 0x6f:  # BUFFER OVERFLOW -> populate size
        raise RuntimeError("Error getting structure length (%d)" % res)
    # Now let's build our buffer
    pointer_type = POINTER(IP_ADAPTER_ADDRESSES)
    buffer = create_string_buffer(size.value)
    AdapterAddresses = ctypes.cast(buffer, pointer_type)
    # And call GetAdaptersAddresses
    res = _GetAdaptersAddresses(AF, flags,
                                None, AdapterAddresses,
                                byref(size))
    if res != NO_ERROR:
        raise RuntimeError("Error retrieving table (%d)" % res)
    results = _resolve_list(AdapterAddresses)
    del AdapterAddresses
    return results

##############################
####### Routing tables #######
##############################

### V1 ###


class MIB_IPFORWARDROW(Structure):
    _fields_ = [('ForwardDest', DWORD),
                ('ForwardMask', DWORD),
                ('ForwardPolicy', DWORD),
                ('ForwardNextHop', DWORD),
                ('ForwardIfIndex', DWORD),
                ('ForwardType', DWORD),
                ('ForwardProto', DWORD),
                ('ForwardAge', DWORD),
                ('ForwardNextHopAS', DWORD),
                ('ForwardMetric1', DWORD),
                ('ForwardMetric2', DWORD),
                ('ForwardMetric3', DWORD),
                ('ForwardMetric4', DWORD),
                ('ForwardMetric5', DWORD)]


class MIB_IPFORWARDTABLE(Structure):
    # Only the first "NumEntries" rows of "Table" are read by
    # GetIpForwardTable().
    _fields_ = [('NumEntries', DWORD),
                ('Table', MIB_IPFORWARDROW * ANY_SIZE)]


PMIB_IPFORWARDTABLE = POINTER(MIB_IPFORWARDTABLE)

# Func

_GetIpForwardTable = WINFUNCTYPE(DWORD,
                                 PMIB_IPFORWARDTABLE, POINTER(ULONG), BOOL)(
                                     ('GetIpForwardTable', iphlpapi))


def GetIpForwardTable():
    # type: () -> List[Dict[str, Any]]
    """
    Return all Windows routes (IPv4 only) from iphlpapi

    :returns: one dict per MIB_IPFORWARDROW entry
    :raises RuntimeError: if either call to GetIpForwardTable returns an
        unexpected status code
    """
    # We get the size first
    size = ULONG()
    res = _GetIpForwardTable(None, byref(size), False)
    if res != 0x7a:  # ERROR_INSUFFICIENT_BUFFER -> populate size
        raise RuntimeError("Error getting structure length (%d)" % res)
    # Now let's build our buffer
    pointer_type = PMIB_IPFORWARDTABLE
    buffer = create_string_buffer(size.value)
    pIpForwardTable = ctypes.cast(buffer, pointer_type)
    # And call GetIpForwardTable
    res = _GetIpForwardTable(pIpForwardTable, byref(size), True)
    if res != NO_ERROR:
        raise RuntimeError("Error retrieving table (%d)" % res)
    results = []
    for i in range(pIpForwardTable.contents.NumEntries):
        results.append(_struct_to_dict(pIpForwardTable.contents.Table[i]))
    del pIpForwardTable
    return results

### V2 ###


NET_IFINDEX = ULONG
NL_ROUTE_PROTOCOL = INT
NL_ROUTE_ORIGIN = INT


class NET_LUID(Structure):
    _fields_ = [("Value", ULONGLONG)]


class IP_ADDRESS_PREFIX(Structure):
    _fields_ = [("Prefix", SOCKADDR_INET),
                ("PrefixLength", UINT8)]


class MIB_IPFORWARD_ROW2(Structure):
    _fields_ = [("InterfaceLuid", NET_LUID),
                ("InterfaceIndex", NET_IFINDEX),
                ("DestinationPrefix", IP_ADDRESS_PREFIX),
                ("NextHop", SOCKADDR_INET),
                ("SitePrefixLength", UCHAR),
                ("ValidLifetime", ULONG),
                ("PreferredLifetime", ULONG),
                ("Metric", ULONG),
                ("Protocol", NL_ROUTE_PROTOCOL),
                ("Loopback", BOOLEAN),
                ("AutoconfigureAddress", BOOLEAN),
                ("Publish", BOOLEAN),
                ("Immortal", BOOLEAN),
                ("Age", ULONG),
                ("Origin", NL_ROUTE_ORIGIN)]


class MIB_IPFORWARD_TABLE2(Structure):
    # Only the first "NumEntries" rows of "Table" are read by
    # GetIpForwardTable2().
    _fields_ = [("NumEntries", ULONG),
                ("Table", MIB_IPFORWARD_ROW2 * ANY_SIZE)]


PMIB_IPFORWARD_TABLE2 = POINTER(MIB_IPFORWARD_TABLE2)

# Func

if not WINDOWS_XP:
    # GetIpForwardTable2 does not exist under Windows XP
    _GetIpForwardTable2 = WINFUNCTYPE(
        ULONG, USHORT,
        POINTER(PMIB_IPFORWARD_TABLE2))(
            ('GetIpForwardTable2', iphlpapi)
    )
    _FreeMibTable = WINFUNCTYPE(None, PMIB_IPFORWARD_TABLE2)(
        ('FreeMibTable', iphlpapi)
    )


def GetIpForwardTable2(AF=AddressFamily.AF_UNSPEC):
    # type: (AddressFamily) -> List[Dict[str, Any]]
    """
    Return all Windows routes (IPv4/IPv6) from iphlpapi

    :param AF: the address family to query (defaults to AF_UNSPEC)
    :returns: one dict per MIB_IPFORWARD_ROW2 entry
    :raises OSError: on Windows XP, where the API is not available
    :raises RuntimeError: if GetIpForwardTable2 returns an error code
    """
    if WINDOWS_XP:
        raise OSError("Not available on Windows XP !")
    table = PMIB_IPFORWARD_TABLE2()
    res = _GetIpForwardTable2(AF, byref(table))
    if res != NO_ERROR:
        raise RuntimeError("Error retrieving table (%d)" % res)
    # The table was allocated by the API: always hand it back through
    # FreeMibTable, even if the conversion below raises.
    # NOTE(review): array-typed leaf values kept as-is by _struct_to_dict
    # may still reference the table memory released here - confirm that
    # callers copy them before relying on their content.
    try:
        results = []
        for i in range(table.contents.NumEntries):
            results.append(_struct_to_dict(table.contents.Table[i]))
        return results
    finally:
        _FreeMibTable(table)


##############
#### FIFO ####
##############

class _SECURITY_ATTRIBUTES(Structure):
    _fields_ = [("nLength", DWORD),
                ("lpSecurityDescriptor", LPVOID),
                ("bInheritHandle", BOOL)]


LPSECURITY_ATTRIBUTES = POINTER(_SECURITY_ATTRIBUTES)


def _get_win_fifo() -> Tuple[str, Any]:
    """Create a windows fifo and returns the (client file, server fd)

    The pipe is named ``\\\\.\\pipe\\scapy`` followed by 6 random
    characters.

    :raises OSError: if CreateNamedPipeA fails (returns -1)
    """
    from scapy.volatile import RandString
    f = r"\\.\pipe\scapy%s" % str(RandString(6))
    # Zero-filled SECURITY_ATTRIBUTES with only nLength set
    buffer = create_string_buffer(ctypes.sizeof(_SECURITY_ATTRIBUTES))
    sec = ctypes.cast(buffer, LPSECURITY_ATTRIBUTES)
    sec.contents.nLength = ctypes.sizeof(_SECURITY_ATTRIBUTES)
    res = ctypes.windll.kernel32.CreateNamedPipeA(
        create_string_buffer(f.encode()),
        # PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED (values per the
        # Win32 headers)
        0x00000003 | 0x40000000,
        0,  # dwPipeMode
        1, 65536, 65536,  # nMaxInstances, nOutBufferSize, nInBufferSize
        300,  # nDefaultTimeOut
        sec,
    )
    if res == -1:
        raise OSError(ctypes.FormatError())
    return f, res


def _win_fifo_open(fd: Any) -> IO[bytes]:
    """Connect NamedPipe and return a fake open() file

    :param fd: the server handle, as returned by ``_get_win_fifo``
    :returns: an object that only implements ``read`` and ``close``
    """
    ctypes.windll.kernel32.ConnectNamedPipe(fd, None)

    class _opened(IO[bytes]):
        def read(self, x: int = MTU) -> bytes:
            """Read at most `x` bytes from the pipe.

            :raises OSError: if ReadFile fails
            """
            buf = ctypes.create_string_buffer(x)
            # ReadFile only allows lpNumberOfBytesRead to be NULL when
            # lpOverlapped is not NULL. It is also needed to know how much
            # of the buffer was actually filled.
            nread = DWORD(0)
            res = ctypes.windll.kernel32.ReadFile(
                fd,
                buf,
                x,
                byref(nread),
                None,
            )
            if res == 0:
                raise OSError(ctypes.FormatError())
            # Only return what was read, not the NUL padding of the buffer
            return buf.raw[:nread.value]

        def close(self) -> None:
            """Close the pipe handle."""
            # ignore failures
            ctypes.windll.kernel32.CloseHandle(fd)
    return _opened()  # type: ignore