• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python
2
3## This file is part of Scapy
4## This program is published under a GPLv2 license
5
6"""
7TLS server used in unit tests.
8
9When some expected_data is provided, a TLS client (e.g. openssl s_client)
10should send some application data after the handshake. If this data matches our
11expected_data, then we leave with exit code 0. Else we leave with exit code 1.
12If no expected_data was provided and the handshake was ok, we exit with 0.
13"""
14
15from ast import literal_eval
16import os
17import sys
18from contextlib import contextmanager
19from io import BytesIO, StringIO
20
21from scapy.modules import six
22
23basedir = os.path.abspath(os.path.join(os.path.dirname(__file__),
24                                       os.path.pardir, os.path.pardir))
25sys.path = [basedir] + sys.path
26
27from scapy.layers.tls.automaton_srv import TLSServerAutomaton
28
29
@contextmanager
def captured_output():
    """Temporarily replace sys.stdout and sys.stderr with in-memory buffers.

    Yields a (stdout, stderr) pair of the replacement buffers. Both are
    StringIO objects when six.PY3 is true and BytesIO objects otherwise.
    The streams that were installed on entry are put back on exit, whether
    or not the managed block raised.
    """
    buffer_cls = StringIO if six.PY3 else BytesIO
    fake_stdout = buffer_cls()
    fake_stderr = buffer_cls()
    saved_stdout = sys.stdout
    saved_stderr = sys.stderr
    try:
        sys.stdout = fake_stdout
        sys.stderr = fake_stderr
        yield sys.stdout, sys.stderr
    finally:
        # Always restore the real streams so later output is not swallowed.
        sys.stdout = saved_stdout
        sys.stderr = saved_stderr
39
def _eval_received_chunk(chunk):
    """Return the Python literal found at the start of `chunk`, or None.

    The whole chunk is tried first. If that fails, only its first
    non-blank line is tried: a repr() never contains a raw newline, so
    anything on following lines is unrelated output.
    """
    candidates = [chunk]
    stripped = chunk.strip()
    if stripped:
        candidates.append(stripped.splitlines()[0])
    for candidate in candidates:
        try:
            return literal_eval(candidate)
        except (ValueError, SyntaxError):
            continue
    return None


def check_output_for_data(out, err, expected_data):
    """Check captured output for a received line equal to expected_data.

    :param out: buffer exposing getvalue() with the captured stdout
    :param err: buffer exposing getvalue() with the captured stderr
    :param expected_data: the line to look for; a false value disables
        the search
    :return: (False, stderr content) if anything was written to stderr;
        (True, None) if no expected_data was given; otherwise
        (True, output) when a line of some '> Received: ' payload equals
        expected_data and (False, output) when none does, where output is
        the stripped stdout content.
    """
    errored = err.getvalue()
    if errored:
        return (False, errored)
    output = out.getvalue().strip()
    if not expected_data:
        return (True, None)
    for chunk in output.split('> Received: ')[1:]:
        payload = _eval_received_chunk(chunk)
        try:
            received_lines = payload.split(b'\n')
        except (AttributeError, TypeError):
            # Unparsable entry, or a literal that cannot be split on a
            # bytes newline: skip it instead of aborting the whole check.
            continue
        if expected_data in received_lines:
            return (True, output)
    return (False, output)
53
def run_tls_test_server(expected_data, q):
    """Run a TLS server automaton once and exit according to its output.

    While stdout/stderr are captured, a TLSServerAutomaton is built with
    the certificate and key from test/tls/pki, True is put on q, and the
    automaton is run. The captured output is then checked against
    expected_data, the resulting text (or None) is put on q, and the
    process exits with status 0 on success and 1 otherwise.

    :param expected_data: data expected from the client, passed through
        to check_output_for_data
    :param q: object exposing put(); presumably a queue shared with the
        caller -- confirm against the caller
    """
    with captured_output() as (out, err):
        # Build the server with the test PKI material
        pki_dir = os.path.join(basedir, 'test', 'tls', 'pki')
        cert_path = os.path.join(pki_dir, 'srv_cert.pem')
        key_path = os.path.join(pki_dir, 'srv_key.pem')
        server = TLSServerAutomaton(mycert=cert_path, mykey=key_path)
        # Signal the other side that the automaton has been created
        q.put(True)
        # Serve
        server.run()
        # Evaluate what was captured
        correct, out_e = check_output_for_data(out, err, expected_data)
    # Hand the captured text back before leaving
    q.put(out_e)
    sys.exit(0 if correct else 1)
73