#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#***************************************************************************
#                                  _   _ ____  _
#  Project                     ___| | | |  _ \| |
#                             / __| | | | |_) | |
#                            | (__| |_| |  _ <| |___
#                             \___|\___/|_| \_\_____|
#
# Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al.
#
# This software is licensed as described in the file COPYING, which
# you should have received as part of this distribution. The terms
# are also available at https://curl.se/docs/copyright.html.
#
# You may opt to use, copy, modify, merge, publish, distribute and/or sell
# copies of the Software, and permit persons to whom the Software is
# furnished to do so, under the terms of the COPYING file.
#
# This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
# KIND, either express or implied.
#
# SPDX-License-Identifier: curl
#
###########################################################################
#
import logging
import os
import re
import socket
import subprocess
import sys
from configparser import ConfigParser, ExtendedInterpolation
from typing import Optional

from .certs import CertificateSpec, TestCA, Credentials
from .ports import alloc_ports


log = logging.getLogger(__name__)


def init_config_from(conf_path):
    """Load an INI configuration file.

    :param conf_path: path of the configuration file to read.
    :return: a ConfigParser (using ExtendedInterpolation) populated from
             the file, or None when conf_path is not an existing file.
    """
    if os.path.isfile(conf_path):
        config = ConfigParser(interpolation=ExtendedInterpolation())
        config.read(conf_path)
        return config
    return None


# directory two levels above this file; holds config.ini and the 'gen' dir
TESTS_HTTPD_PATH = os.path.dirname(os.path.dirname(__file__))
DEF_CONFIG = init_config_from(os.path.join(TESTS_HTTPD_PATH, 'config.ini'))

# two more levels up; the curl binary under test is expected in src/curl
TOP_PATH = os.path.dirname(os.path.dirname(TESTS_HTTPD_PATH))
CURL = os.path.join(TOP_PATH, 'src/curl')


class EnvConfig:
    """Settings of the test environment, collected once at construction.

    Gathers the properties reported by `curl -V`, allocates the ports,
    reads the server executables from the 'httpd', 'nghttpx' and 'caddy'
    sections of config.ini and probes nghttpx and caddy for their versions.
    """

    def __init__(self):
        self.tests_dir = TESTS_HTTPD_PATH
        self.gen_dir = os.path.join(self.tests_dir, 'gen')
        self.config = DEF_CONFIG
        # check curl and its features
        self.curl = CURL
        self.curl_props = {
            'version': None,
            'os': None,
            'features': [],
            'protocols': [],
            'libs': [],
            'lib_versions': [],
        }
        self.curl_protos = []
        p = subprocess.run(args=[self.curl, '-V'],
                           capture_output=True, text=True)
        if p.returncode != 0:
            # raised explicitly (not via `assert`) so that the check is
            # not stripped when python runs with -O
            raise AssertionError(
                f'{self.curl} -V failed with exit code: {p.returncode}')
        for line in p.stdout.splitlines(keepends=False):
            if line.startswith('curl '):
                m = re.match(r'^curl (?P<version>\S+) (?P<os>\S+) (?P<libs>.*)$', line)
                if m:
                    self.curl_props['version'] = m.group('version')
                    self.curl_props['os'] = m.group('os')
                    # entries as reported by curl, lower-cased
                    self.curl_props['lib_versions'] = [
                        lib.lower() for lib in m.group('libs').split(' ')
                    ]
                    # same entries with everything from the first '/' removed
                    self.curl_props['libs'] = [
                        re.sub(r'/.*', '', lib) for lib in self.curl_props['lib_versions']
                    ]
            if line.startswith('Features: '):
                self.curl_props['features'] = [
                    feat.lower() for feat in line[10:].split(' ')
                ]
            if line.startswith('Protocols: '):
                self.curl_props['protocols'] = [
                    prot.lower() for prot in line[11:].split(' ')
                ]

        self.ports = alloc_ports(port_specs={
            'http': socket.SOCK_STREAM,
            'https': socket.SOCK_STREAM,
            'proxy': socket.SOCK_STREAM,
            'proxys': socket.SOCK_STREAM,
            'caddy': socket.SOCK_STREAM,
            'caddys': socket.SOCK_STREAM,
        })
        self.httpd = self.config['httpd']['httpd']
        self.apachectl = self.config['httpd']['apachectl']
        self.apxs = self.config['httpd']['apxs']
        if not self.apxs:
            # an empty setting means: no apxs available
            self.apxs = None
        self._httpd_version = None

        self.examples_pem = {
            'key': 'xxx',
            'cert': 'xxx',
        }
        self.htdocs_dir = os.path.join(self.gen_dir, 'htdocs')
        self.tld = 'http.curl.se'
        self.domain1 = f"one.{self.tld}"
        self.domain2 = f"two.{self.tld}"
        self.proxy_domain = f"proxy.{self.tld}"
        self.cert_specs = [
            CertificateSpec(domains=[self.domain1, 'localhost'], key_type='rsa2048'),
            CertificateSpec(domains=[self.domain2], key_type='rsa2048'),
            CertificateSpec(domains=[self.proxy_domain], key_type='rsa2048'),
            CertificateSpec(name="clientsX", sub_specs=[
                CertificateSpec(name="user1", client=True),
            ]),
        ]

        self.nghttpx = self.config['nghttpx']['nghttpx']
        if not self.nghttpx.strip():
            self.nghttpx = None
        self._nghttpx_version = None
        self.nghttpx_with_h3 = False
        if self.nghttpx is not None:
            try:
                p = subprocess.run(args=[self.nghttpx, '-v'],
                                   capture_output=True, text=True)
            except OSError as e:
                # configured, but cannot be executed: not a working nghttpx
                log.debug('nghttpx (%s) not runnable: %s', self.nghttpx, e)
                self.nghttpx = None
            else:
                if p.returncode != 0:
                    # not a working nghttpx
                    self.nghttpx = None
                else:
                    self._nghttpx_version = re.sub(r'^nghttpx\s*', '', p.stdout.strip())
                    # h3 is available when the version output names nghttp3
                    self.nghttpx_with_h3 = re.match(r'.* nghttp3/.*', p.stdout.strip()) is not None
                    log.debug('nghttpx -v: %s', p.stdout)

        self.caddy = self.config['caddy']['caddy']
        self._caddy_version = None
        if not self.caddy.strip():
            self.caddy = None
        if self.caddy is not None:
            try:
                p = subprocess.run(args=[self.caddy, 'version'],
                                   capture_output=True, text=True)
            except Exception as e:
                # best effort: any failure to run it means no caddy, but
                # do not swallow KeyboardInterrupt/SystemExit
                log.debug('caddy (%s) not runnable: %s', self.caddy, e)
                self.caddy = None
            else:
                if p.returncode != 0:
                    # not a working caddy
                    self.caddy = None
                else:
                    # keep only the text before the first space
                    self._caddy_version = re.sub(r' .*', '', p.stdout.strip())

    @property
    def httpd_version(self):
        """The version reported by `apxs -q HTTPD_VERSION`.

        Queried on first access and cached afterwards.

        :return: the version string, or None when no apxs is configured.
        :raises Exception: when running apxs gives a non-zero exit code.
        """
        if self._httpd_version is None and self.apxs is not None:
            p = subprocess.run(args=[self.apxs, '-q', 'HTTPD_VERSION'],
                               capture_output=True, text=True)
            if p.returncode != 0:
                raise Exception(f'{self.apxs} failed to query HTTPD_VERSION: {p}')
            self._httpd_version = p.stdout.strip()
        return self._httpd_version

    def _versiontuple(self, v):
        """Convert a version string like '2.4.55-dev' into a tuple of ints.

        A '-suffix' following the numeric part is dropped before the
        string is split at the dots.
        """
        v = re.sub(r'(\d+\.\d+(\.\d+)?)(-\S+)?', r'\1', v)
        return tuple(map(int, v.split('.')))

    def httpd_is_at_least(self, minv):
        """Check if the httpd version is at least `minv`.

        :param minv: minimum version as string, e.g. '2.4.55'.
        :return: True when the httpd version is equal or higher; False
                 otherwise, also when the httpd version is not known.
        """
        version = self.httpd_version
        if version is None:
            # without apxs there is no version to compare against
            return False
        return self._versiontuple(version) >= self._versiontuple(minv)

    def is_complete(self) -> bool:
        """True when httpd, apachectl and apxs all exist as files."""
        return os.path.isfile(self.httpd) and \
            os.path.isfile(self.apachectl) and \
            self.apxs is not None and \
            os.path.isfile(self.apxs)

    def get_incomplete_reason(self) -> Optional[str]:
        """Describe the first missing executable, None if all are found."""
        if not os.path.isfile(self.httpd):
            return f'httpd ({self.httpd}) not found'
        if not os.path.isfile(self.apachectl):
            return f'apachectl ({self.apachectl}) not found'
        if self.apxs is None:
            return "apxs (provided by apache2-dev) not found"
        if not os.path.isfile(self.apxs):
            return f"apxs ({self.apxs}) not found"
        return None

    @property
    def nghttpx_version(self):
        """Output of `nghttpx -v` without the leading 'nghttpx', or None."""
        return self._nghttpx_version

    @property
    def caddy_version(self):
        """First word of the `caddy version` output, or None."""
        return self._caddy_version


class Env:
    """Access to the test environment for test cases.

    The configuration is shared by all instances via the class attribute
    CONFIG, which is created when this class is defined. Static methods
    answer questions about curl and the servers; instances additionally
    carry the verbosity and the CA used to issue certificates.
    """

    CONFIG = EnvConfig()

    @staticmethod
    def setup_incomplete() -> bool:
        """True when the configuration is not complete."""
        return not Env.CONFIG.is_complete()

    @staticmethod
    def incomplete_reason() -> Optional[str]:
        """Why the configuration is incomplete, None if it is complete."""
        return Env.CONFIG.get_incomplete_reason()

    @staticmethod
    def have_h3_server() -> bool:
        """True when the configured nghttpx reports nghttp3."""
        return Env.CONFIG.nghttpx_with_h3

    @staticmethod
    def have_h2_curl() -> bool:
        """True when curl lists the 'http2' feature."""
        return 'http2' in Env.CONFIG.curl_props['features']

    @staticmethod
    def have_h3_curl() -> bool:
        """True when curl lists the 'http3' feature."""
        return 'http3' in Env.CONFIG.curl_props['features']

    @staticmethod
    def curl_uses_lib(libname: str) -> bool:
        """True when curl reports the library, compared case-insensitively."""
        return libname.lower() in Env.CONFIG.curl_props['libs']

    @staticmethod
    def curl_has_feature(feature: str) -> bool:
        """True when curl reports the feature, compared case-insensitively."""
        return feature.lower() in Env.CONFIG.curl_props['features']

    @staticmethod
    def curl_lib_version(libname: str) -> str:
        """The version curl reports for a library.

        :return: the text after '<libname>/' of the first matching entry,
                 or 'unknown' when curl reports no such library.
        """
        prefix = f'{libname.lower()}/'
        for lversion in Env.CONFIG.curl_props['lib_versions']:
            if lversion.startswith(prefix):
                return lversion[len(prefix):]
        return 'unknown'

    @staticmethod
    def curl_os() -> str:
        """The os part of the first line of `curl -V`."""
        return Env.CONFIG.curl_props['os']

    @staticmethod
    def curl_version() -> str:
        """The version part of the first line of `curl -V`."""
        return Env.CONFIG.curl_props['version']

    @staticmethod
    def have_h3() -> bool:
        """True when both curl and the server side support h3."""
        return Env.have_h3_curl() and Env.have_h3_server()

    @staticmethod
    def httpd_version() -> Optional[str]:
        """The httpd version, None when no apxs is configured."""
        return Env.CONFIG.httpd_version

    @staticmethod
    def nghttpx_version() -> Optional[str]:
        """The nghttpx version, None without a working nghttpx."""
        return Env.CONFIG.nghttpx_version

    @staticmethod
    def caddy_version() -> Optional[str]:
        """The caddy version, None without a working caddy."""
        return Env.CONFIG.caddy_version

    @staticmethod
    def httpd_is_at_least(minv) -> bool:
        """True when the httpd version is at least `minv`."""
        return Env.CONFIG.httpd_is_at_least(minv)

    @staticmethod
    def has_caddy() -> bool:
        """True when a working caddy is configured."""
        return Env.CONFIG.caddy is not None

    def __init__(self, pytestconfig=None):
        """
        :param pytestconfig: optional object whose `option.verbose` gives
                             the verbosity; without it, verbosity is 0.
        """
        self._verbose = pytestconfig.option.verbose \
            if pytestconfig is not None else 0
        self._ca = None

    def issue_certs(self):
        """Create the root CA on first use and issue the configured certs."""
        if self._ca is None:
            ca_dir = os.path.join(self.CONFIG.gen_dir, 'ca')
            self._ca = TestCA.create_root(name=self.CONFIG.tld,
                                          store_dir=ca_dir,
                                          key_type="rsa2048")
        self._ca.issue_certs(self.CONFIG.cert_specs)

    def setup(self):
        """Create the gen and htdocs directories and issue certificates."""
        os.makedirs(self.gen_dir, exist_ok=True)
        os.makedirs(self.htdocs_dir, exist_ok=True)
        self.issue_certs()

    def get_credentials(self, domain) -> Optional[Credentials]:
        """The first credentials the CA has for `domain`, None if it has none.

        Uses the CA created by issue_certs(), which needs to run before.
        """
        creds = self.ca.get_credentials_for_name(domain)
        if len(creds) > 0:
            return creds[0]
        return None

    @property
    def verbose(self) -> int:
        return self._verbose

    @property
    def gen_dir(self) -> str:
        return self.CONFIG.gen_dir

    @property
    def ca(self):
        """The CA, None until issue_certs() has been called."""
        return self._ca

    @property
    def htdocs_dir(self) -> str:
        return self.CONFIG.htdocs_dir

    @property
    def domain1(self) -> str:
        return self.CONFIG.domain1

    @property
    def domain2(self) -> str:
        return self.CONFIG.domain2

    @property
    def proxy_domain(self) -> str:
        return self.CONFIG.proxy_domain

    @property
    def http_port(self) -> int:
        return self.CONFIG.ports['http']

    @property
    def https_port(self) -> int:
        return self.CONFIG.ports['https']

    @property
    def h3_port(self) -> int:
        """The h3 port; the same as the https port."""
        return self.https_port

    @property
    def proxy_port(self) -> int:
        return self.CONFIG.ports['proxy']

    @property
    def proxys_port(self) -> int:
        return self.CONFIG.ports['proxys']

    @property
    def caddy(self) -> Optional[str]:
        """The caddy executable, None without a working caddy."""
        return self.CONFIG.caddy

    @property
    def caddy_https_port(self) -> int:
        return self.CONFIG.ports['caddys']

    @property
    def caddy_http_port(self) -> int:
        return self.CONFIG.ports['caddy']

    @property
    def curl(self) -> str:
        return self.CONFIG.curl

    @property
    def httpd(self) -> str:
        return self.CONFIG.httpd

    @property
    def apachectl(self) -> str:
        return self.CONFIG.apachectl

    @property
    def apxs(self) -> Optional[str]:
        """The apxs executable, None when not configured."""
        return self.CONFIG.apxs

    @property
    def nghttpx(self) -> Optional[str]:
        """The nghttpx executable, None without a working nghttpx."""
        return self.CONFIG.nghttpx

    def authority_for(self, domain: str, alpn_proto: Optional[str] = None):
        """The 'domain:port' authority to use for an ALPN protocol.

        :param domain: the host name to use.
        :param alpn_proto: None, 'h2', 'http/1.1', 'http/1.0' and 'http/0.9'
                           select the https port, 'h3' the h3 port and
                           anything else the http port.
        """
        if alpn_proto is None or \
                alpn_proto in ['h2', 'http/1.1', 'http/1.0', 'http/0.9']:
            return f'{domain}:{self.https_port}'
        if alpn_proto in ['h3']:
            return f'{domain}:{self.h3_port}'
        return f'{domain}:{self.http_port}'

    def make_data_file(self, indir: str, fname: str, fsize: int) -> str:
        """Write a text file of `fsize` characters with numbered lines.

        Each full line is 1024 characters long: a 9 digit line number,
        a '-', 1013 digits and the newline. A remainder of less than
        1024 characters is written as a truncated line at the end.

        :param indir: directory to create the file in.
        :param fname: name of the file.
        :param fsize: number of characters to write.
        :return: the path of the file written.
        """
        fpath = os.path.join(indir, fname)
        s10 = "0123456789"
        # 1013 chars, so that number + '-' + s + newline gives 1024
        s = (101 * s10) + s10[0:3]
        full_lines = int(fsize / 1024)
        remain = int(fsize % 1024)
        with open(fpath, 'w') as fd:
            for i in range(full_lines):
                fd.write(f"{i:09d}-{s}\n")
            if remain != 0:
                # the truncated line is numbered one past the next index,
                # kept as is so the generated data does not change
                i = full_lines + 1
                fd.write(f"{i:09d}-{s}\n"[0:remain])
        return fpath