• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3#***************************************************************************
4#                                  _   _ ____  _
5#  Project                     ___| | | |  _ \| |
6#                             / __| | | | |_) | |
7#                            | (__| |_| |  _ <| |___
8#                             \___|\___/|_| \_\_____|
9#
10# Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al.
11#
12# This software is licensed as described in the file COPYING, which
13# you should have received as part of this distribution. The terms
14# are also available at https://curl.se/docs/copyright.html.
15#
16# You may opt to use, copy, modify, merge, publish, distribute and/or sell
17# copies of the Software, and permit persons to whom the Software is
18# furnished to do so, under the terms of the COPYING file.
19#
20# This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
21# KIND, either express or implied.
22#
23# SPDX-License-Identifier: curl
24#
25###########################################################################
26#
27import logging
28import os
29import re
30import socket
31import subprocess
32import sys
33from configparser import ConfigParser, ExtendedInterpolation
34from typing import Optional
35
36from .certs import CertificateSpec, TestCA, Credentials
37from .ports import alloc_ports
38
39
# Module-level logger, named after this module via __name__.
log = logging.getLogger(__name__)
41
42
def init_config_from(conf_path):
    """Load the INI file at *conf_path* into a ConfigParser.

    The parser is created with ExtendedInterpolation, so values may use
    ``${section:option}`` / ``${option}`` references.

    Returns the populated parser, or None when *conf_path* does not name
    an existing regular file.
    """
    # Guard first: nothing to parse when the path is not a file.
    if not os.path.isfile(conf_path):
        return None
    parser = ConfigParser(interpolation=ExtendedInterpolation())
    parser.read(conf_path)
    return parser
49
50
# Directory containing this module, and its parent: the latter is where
# 'config.ini' and the generated 'gen' directory are looked up.
_MODULE_DIR = os.path.dirname(__file__)
TESTS_HTTPD_PATH = os.path.dirname(_MODULE_DIR)

# Two further levels up from TESTS_HTTPD_PATH (presumably the top of the
# source tree); the curl binary under test is expected at 'src/curl' there.
_TESTS_HTTPD_PARENT = os.path.dirname(TESTS_HTTPD_PATH)
TOP_PATH = os.path.dirname(_TESTS_HTTPD_PARENT)
CURL = os.path.join(TOP_PATH, 'src/curl')

# Default configuration; None when 'config.ini' is not present.
DEF_CONFIG = init_config_from(os.path.join(TESTS_HTTPD_PATH, 'config.ini'))
56
57
class EnvConfig:
    """Static description of the test environment, probed at construction.

    Constructing an instance:
      * runs ``<curl> -V`` and records version, os, libraries, features
        and protocols in ``curl_props``,
      * requests ports from ``alloc_ports`` for the names http, https,
        proxy, proxys, caddy and caddys,
      * reads the httpd, apachectl and apxs paths from the ``httpd``
        section of the configuration,
      * reads and probes the optional nghttpx and caddy binaries; either
        is set to None when it is not configured or does not run.

    Raises:
        AssertionError: when ``<curl> -V`` exits with a non-zero code.
    """

    def __init__(self):
        self.tests_dir = TESTS_HTTPD_PATH
        self.gen_dir = os.path.join(self.tests_dir, 'gen')
        self.config = DEF_CONFIG
        # check curl and its features
        self.curl = CURL
        self.curl_protos = []
        p = subprocess.run(args=[self.curl, '-V'],
                           capture_output=True, text=True)
        if p.returncode != 0:
            # Raised explicitly (instead of `assert False`) so the check is
            # not stripped when Python runs with -O.
            raise AssertionError(
                f'{self.curl} -V failed with exit code: {p.returncode}')
        self.curl_props = self._parse_curl_version(p.stdout)

        self.ports = alloc_ports(port_specs={
            'http': socket.SOCK_STREAM,
            'https': socket.SOCK_STREAM,
            'proxy': socket.SOCK_STREAM,
            'proxys': socket.SOCK_STREAM,
            'caddy': socket.SOCK_STREAM,
            'caddys': socket.SOCK_STREAM,
        })
        self.httpd = self.config['httpd']['httpd']
        self.apachectl = self.config['httpd']['apachectl']
        self.apxs = self.config['httpd']['apxs']
        # An empty (or blank) setting means "not available".
        if not self.apxs.strip():
            self.apxs = None
        # Queried lazily from apxs, see the httpd_version property.
        self._httpd_version = None

        self.examples_pem = {
            'key': 'xxx',
            'cert': 'xxx',
        }
        self.htdocs_dir = os.path.join(self.gen_dir, 'htdocs')
        self.tld = 'http.curl.se'
        self.domain1 = f"one.{self.tld}"
        self.domain2 = f"two.{self.tld}"
        self.proxy_domain = f"proxy.{self.tld}"
        self.cert_specs = [
            CertificateSpec(domains=[self.domain1, 'localhost'], key_type='rsa2048'),
            CertificateSpec(domains=[self.domain2], key_type='rsa2048'),
            CertificateSpec(domains=[self.proxy_domain], key_type='rsa2048'),
            CertificateSpec(name="clientsX", sub_specs=[
               CertificateSpec(name="user1", client=True),
            ]),
        ]

        self.nghttpx = self.config['nghttpx']['nghttpx']
        if len(self.nghttpx.strip()) == 0:
            self.nghttpx = None
        self._nghttpx_version = None
        self.nghttpx_with_h3 = False
        if self.nghttpx is not None:
            p = self._run_probe([self.nghttpx, '-v'])
            if p is None:
                # not a working nghttpx
                self.nghttpx = None
            else:
                version_out = p.stdout.strip()
                self._nghttpx_version = re.sub(r'^nghttpx\s*', '', version_out)
                # HTTP/3 support is detected by an ' nghttp3/' entry in the
                # version output.
                self.nghttpx_with_h3 = \
                    re.match(r'.* nghttp3/.*', version_out) is not None
                log.debug(f'nghttpx -v: {p.stdout}')

        self.caddy = self.config['caddy']['caddy']
        self._caddy_version = None
        if len(self.caddy.strip()) == 0:
            self.caddy = None
        if self.caddy is not None:
            p = self._run_probe([self.caddy, 'version'])
            if p is None:
                # not a working caddy; leave the version unset as well
                self.caddy = None
            else:
                # keep only the first space-separated token of the output
                self._caddy_version = re.sub(r' .*', '', p.stdout.strip())

    @staticmethod
    def _parse_curl_version(output: str) -> dict:
        """Extract curl properties from the text printed by ``curl -V``.

        Returns a dict with the keys 'version', 'os', 'features',
        'protocols', 'libs' and 'lib_versions'. Entries keep their
        defaults (None / empty list) when the matching line is absent.
        All list entries are lower-cased.
        """
        props = {
            'version': None,
            'os': None,
            'features': [],
            'protocols': [],
            'libs': [],
            'lib_versions': [],
        }
        for line in output.splitlines(keepends=False):
            if line.startswith('curl '):
                m = re.match(
                    r'^curl (?P<version>\S+) (?P<os>\S+) (?P<libs>.*)$', line)
                if m:
                    props['version'] = m.group('version')
                    props['os'] = m.group('os')
                    # split() without argument: repeated or trailing blanks
                    # must not produce empty entries.
                    props['lib_versions'] = [
                        lib.lower() for lib in m.group('libs').split()
                    ]
                    # 'name/version' -> 'name'
                    props['libs'] = [
                        re.sub(r'/.*', '', lib) for lib in props['lib_versions']
                    ]
            elif line.startswith('Features: '):
                props['features'] = [
                    feat.lower() for feat in line[len('Features: '):].split()
                ]
            elif line.startswith('Protocols: '):
                props['protocols'] = [
                    prot.lower() for prot in line[len('Protocols: '):].split()
                ]
        return props

    @staticmethod
    def _run_probe(args) -> Optional[subprocess.CompletedProcess]:
        """Run an optional tool; return its result, or None if unusable.

        None is returned when the process cannot be started, its output
        cannot be decoded, or it exits with a non-zero code.
        """
        try:
            p = subprocess.run(args=args, capture_output=True, text=True)
        except (OSError, ValueError, subprocess.SubprocessError):
            return None
        return p if p.returncode == 0 else None

    @property
    def httpd_version(self):
        """The version reported by ``apxs -q HTTPD_VERSION``, cached.

        Returns None when apxs is not configured.

        Raises:
            Exception: when apxs exits with a non-zero code.
        """
        if self._httpd_version is None and self.apxs is not None:
            p = subprocess.run(args=[self.apxs, '-q', 'HTTPD_VERSION'],
                               capture_output=True, text=True)
            if p.returncode != 0:
                raise Exception(f'{self.apxs} failed to query HTTPD_VERSION: {p}')
            self._httpd_version = p.stdout.strip()
        return self._httpd_version

    def _versiontuple(self, v):
        """Turn 'X.Y[.Z][-suffix]' into a tuple of ints, dropping the suffix."""
        v = re.sub(r'(\d+\.\d+(\.\d+)?)(-\S+)?', r'\1', v)
        return tuple(map(int, v.split('.')))

    def httpd_is_at_least(self, minv):
        """Return True when the httpd version is >= *minv*.

        Returns False when the httpd version is unknown (no apxs), instead
        of failing while trying to parse None.
        """
        version = self.httpd_version
        if version is None:
            return False
        return self._versiontuple(version) >= self._versiontuple(minv)

    def is_complete(self) -> bool:
        """Return True when httpd, apachectl and apxs all exist as files."""
        return self.get_incomplete_reason() is None

    def get_incomplete_reason(self) -> Optional[str]:
        """Name the first missing httpd tool, or return None if all exist."""
        if not os.path.isfile(self.httpd):
            return f'httpd ({self.httpd}) not found'
        if not os.path.isfile(self.apachectl):
            return f'apachectl ({self.apachectl}) not found'
        if self.apxs is None:
            return "apxs (provided by apache2-dev) not found"
        if not os.path.isfile(self.apxs):
            return f"apxs ({self.apxs}) not found"
        return None

    @property
    def nghttpx_version(self):
        """Version text of nghttpx, or None when nghttpx is unavailable."""
        return self._nghttpx_version

    @property
    def caddy_version(self):
        """Version of caddy, or None when caddy is unavailable."""
        return self._caddy_version
206
207
class Env:
    """Per-test view of the environment, backed by one shared EnvConfig.

    The static methods answer capability questions (which curl features,
    which server versions) from the shared ``CONFIG``. Instances add the
    verbosity taken from pytest and a lazily created test CA.
    """

    # Created once, when this class body is executed; all probing done by
    # EnvConfig() therefore happens at that point.
    CONFIG = EnvConfig()

    @staticmethod
    def setup_incomplete() -> bool:
        """Return True when a required httpd tool is missing."""
        return not Env.CONFIG.is_complete()

    @staticmethod
    def incomplete_reason() -> Optional[str]:
        """Describe the first missing httpd tool, or None if none is."""
        return Env.CONFIG.get_incomplete_reason()

    @staticmethod
    def have_h3_server() -> bool:
        """Return True when the configured nghttpx reports nghttp3."""
        return Env.CONFIG.nghttpx_with_h3

    @staticmethod
    def have_h2_curl() -> bool:
        """Return True when curl lists the 'http2' feature."""
        return Env.curl_has_feature('http2')

    @staticmethod
    def have_h3_curl() -> bool:
        """Return True when curl lists the 'http3' feature."""
        return Env.curl_has_feature('http3')

    @staticmethod
    def curl_uses_lib(libname: str) -> bool:
        """Return True when *libname* (case-insensitive) is among curl's libs."""
        return libname.lower() in Env.CONFIG.curl_props['libs']

    @staticmethod
    def curl_has_feature(feature: str) -> bool:
        """Return True when *feature* (case-insensitive) is a curl feature."""
        return feature.lower() in Env.CONFIG.curl_props['features']

    @staticmethod
    def curl_lib_version(libname: str) -> str:
        """Return the version curl reports for *libname*, or 'unknown'."""
        prefix = f'{libname.lower()}/'
        for lversion in Env.CONFIG.curl_props['lib_versions']:
            if lversion.startswith(prefix):
                return lversion[len(prefix):]
        return 'unknown'

    @staticmethod
    def curl_os() -> Optional[str]:
        """Return the os field of curl's version line, None if not parsed."""
        return Env.CONFIG.curl_props['os']

    @staticmethod
    def curl_version() -> Optional[str]:
        """Return curl's version, None if the version line was not parsed."""
        return Env.CONFIG.curl_props['version']

    @staticmethod
    def have_h3() -> bool:
        """Return True when both curl and the server side support HTTP/3."""
        return Env.have_h3_curl() and Env.have_h3_server()

    @staticmethod
    def httpd_version() -> Optional[str]:
        """Return the httpd version, or None when apxs is not configured."""
        return Env.CONFIG.httpd_version

    @staticmethod
    def nghttpx_version() -> Optional[str]:
        """Return the nghttpx version text, or None when unavailable."""
        return Env.CONFIG.nghttpx_version

    @staticmethod
    def caddy_version() -> Optional[str]:
        """Return the caddy version, or None when unavailable."""
        return Env.CONFIG.caddy_version

    @staticmethod
    def httpd_is_at_least(minv) -> bool:
        """Return True when the httpd version is >= *minv*."""
        return Env.CONFIG.httpd_is_at_least(minv)

    @staticmethod
    def has_caddy() -> bool:
        """Return True when a working caddy binary is configured."""
        return Env.CONFIG.caddy is not None

    def __init__(self, pytestconfig=None):
        """Take the verbosity from *pytestconfig*; 0 when none is given."""
        self._verbose = pytestconfig.option.verbose \
            if pytestconfig is not None else 0
        # Created on the first issue_certs() call.
        self._ca = None

    def issue_certs(self):
        """Create the root CA on first use and issue the configured certs."""
        if self._ca is None:
            ca_dir = os.path.join(self.CONFIG.gen_dir, 'ca')
            self._ca = TestCA.create_root(name=self.CONFIG.tld,
                                          store_dir=ca_dir,
                                          key_type="rsa2048")
        self._ca.issue_certs(self.CONFIG.cert_specs)

    def setup(self):
        """Create the gen and htdocs directories, then issue certificates."""
        os.makedirs(self.gen_dir, exist_ok=True)
        os.makedirs(self.htdocs_dir, exist_ok=True)
        self.issue_certs()

    def get_credentials(self, domain) -> Optional[Credentials]:
        """Return the first credentials the CA has for *domain*, else None.

        None is also returned when no CA exists yet, i.e. before
        issue_certs() or setup() was called.
        """
        if self.ca is None:
            return None
        creds = self.ca.get_credentials_for_name(domain)
        if len(creds) > 0:
            return creds[0]
        return None

    @property
    def verbose(self) -> int:
        return self._verbose

    @property
    def gen_dir(self) -> str:
        return self.CONFIG.gen_dir

    @property
    def ca(self):
        return self._ca

    @property
    def htdocs_dir(self) -> str:
        return self.CONFIG.htdocs_dir

    @property
    def domain1(self) -> str:
        return self.CONFIG.domain1

    @property
    def domain2(self) -> str:
        return self.CONFIG.domain2

    @property
    def proxy_domain(self) -> str:
        return self.CONFIG.proxy_domain

    @property
    def http_port(self) -> int:
        return self.CONFIG.ports['http']

    @property
    def https_port(self) -> int:
        return self.CONFIG.ports['https']

    @property
    def h3_port(self) -> int:
        # HTTP/3 uses the same port number as https.
        return self.https_port

    @property
    def proxy_port(self) -> int:
        return self.CONFIG.ports['proxy']

    @property
    def proxys_port(self) -> int:
        return self.CONFIG.ports['proxys']

    @property
    def caddy(self) -> Optional[str]:
        return self.CONFIG.caddy

    @property
    def caddy_https_port(self) -> int:
        return self.CONFIG.ports['caddys']

    @property
    def caddy_http_port(self) -> int:
        return self.CONFIG.ports['caddy']

    @property
    def curl(self) -> str:
        return self.CONFIG.curl

    @property
    def httpd(self) -> str:
        return self.CONFIG.httpd

    @property
    def apachectl(self) -> str:
        return self.CONFIG.apachectl

    @property
    def apxs(self) -> Optional[str]:
        return self.CONFIG.apxs

    @property
    def nghttpx(self) -> Optional[str]:
        return self.CONFIG.nghttpx

    def authority_for(self, domain: str,
                      alpn_proto: Optional[str] = None) -> str:
        """Return 'domain:port' for *domain* and the given ALPN protocol.

        No protocol, h2 and the http/x.y protocols map to the https port,
        h3 to the h3 port, anything else to the plain http port.
        """
        if alpn_proto is None or \
                alpn_proto in ['h2', 'http/1.1', 'http/1.0', 'http/0.9']:
            return f'{domain}:{self.https_port}'
        if alpn_proto in ['h3']:
            return f'{domain}:{self.h3_port}'
        return f'{domain}:{self.http_port}'

    def make_data_file(self, indir: str, fname: str, fsize: int) -> str:
        """Write a text file of *fsize* characters and return its path.

        The file *fname* in *indir* consists of 1024-character lines, each
        a 9-digit line counter, a dash and a fixed digit pattern. A final
        partial line supplies the remainder when *fsize* is not a multiple
        of 1024.
        """
        fpath = os.path.join(indir, fname)
        s10 = "0123456789"
        # 1013 pattern characters + 9 digit counter + '-' + newline = 1024
        s = (101 * s10) + s10[0:3]
        # Integer arithmetic: float division loses precision on huge sizes.
        full_lines, remain = divmod(int(fsize), 1024)
        with open(fpath, 'w') as fd:
            for i in range(full_lines):
                fd.write(f"{i:09d}-{s}\n")
            if remain != 0:
                # The partial line is numbered full_lines + 1, as before,
                # so generated content stays the same.
                i = full_lines + 1
                fd.write(f"{i:09d}-{s}\n"[0:remain])
        return fpath
405