• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) 2007, 2008, 2009 Arnaud Ebalard
5#               2015, 2016, 2017 Maxence Tury
6
7"""
8Stream ciphers.
9"""
10
11from scapy.config import conf
12from scapy.layers.tls.crypto.common import CipherError
13
14if conf.crypto_valid:
15    from cryptography.hazmat.primitives.ciphers import Cipher, algorithms
16    from cryptography.hazmat.backends import default_backend
17    try:
18        # cryptography > 43.0
19        from cryptography.hazmat.decrepit.ciphers import (
20            algorithms as decrepit_algorithms,
21        )
22    except ImportError:
23        decrepit_algorithms = algorithms
24
25
# Registry of stream cipher classes, keyed by cipher name (the class name
# without its leading "Cipher_"); it is filled in by _StreamCipherMetaclass.
_tls_stream_cipher_algs = {}
27
28
class _StreamCipherMetaclass(type):
    """
    Metaclass taking care of the registration of the stream cipher classes.

    Every class created through it, except the one named "_StreamCipher",
    gets a 'name' attribute (its class name without the first seven
    characters, i.e. the "Cipher_" prefix) and is stored under that name
    in _tls_stream_cipher_algs.
    """
    def __new__(cls, ciph_name, bases, dct):
        # The abstract base is created as is: no name, no registration.
        if ciph_name == "_StreamCipher":
            return super().__new__(cls, ciph_name, bases, dct)

        short_name = ciph_name[7:]      # drop the leading "Cipher_"
        dct["name"] = short_name
        new_class = super().__new__(cls, ciph_name, bases, dct)
        _tls_stream_cipher_algs[short_name] = new_class
        return new_class
42
43
class _StreamCipher(metaclass=_StreamCipherMetaclass):
    """
    Base class of the stream ciphers.

    Subclasses are expected to provide 'pc_cls' (the algorithm class which is
    handed to Cipher()) and 'key_len'. They may also define
    'expanded_key_len', which then replaces 'key_len' when the placeholder
    key is built in __init__().
    """
    type = "stream"

    def __init__(self, key=None):
        """
        Build the cipher and its encryption/decryption contexts.

        If no key is provided, an all-zero placeholder key is used and
        ready["key"] is set to False, so that encrypt() and decrypt() raise
        CipherError until a key is assigned.

        Note that we have to keep the encryption/decryption state in unique
        encryptor and decryptor objects. This differs from _BlockCipher.

        In order to do connection state snapshots, we need to be able to
        recreate past cipher contexts. This is why we feed _enc_updated_with
        and _dec_updated_with every time encrypt() or decrypt() is called.
        """
        self.ready = {"key": True}
        if key is None:
            self.ready["key"] = False
            if hasattr(self, "expanded_key_len"):
                tmp_len = self.expanded_key_len
            else:
                tmp_len = self.key_len
            key = b"\0" * tmp_len

        # We bypass our own __setattr__ here: it reads self._cipher, which
        # does not exist yet, and it would flag the key as ready even when
        # it is only the placeholder.
        super().__setattr__("key", key)

        self._cipher = Cipher(self.pc_cls(key),
                              mode=None,
                              backend=default_backend())
        self.encryptor = self._cipher.encryptor()
        self.decryptor = self._cipher.decryptor()
        self._enc_updated_with = b""
        self._dec_updated_with = b""

    def __setattr__(self, name, val):
        """
        We have to keep the encryptor/decryptor for a long time,
        however they have to be updated every time the key is changed.

        Assigning 'key' pushes the new key into the underlying algorithm,
        recreates both contexts, clears the replay logs used by snapshot()
        and marks the key as ready. Any other attribute is set as usual.
        """
        if name == "key":
            if self._cipher is not None:
                self._cipher.algorithm.key = val
                self.encryptor = self._cipher.encryptor()
                self.decryptor = self._cipher.decryptor()
                # The contexts above start from scratch: data processed
                # under the previous key must not be replayed by snapshot(),
                # or the copy would end up at a different keystream position.
                self._enc_updated_with = b""
                self._dec_updated_with = b""
            self.ready["key"] = True
        super().__setattr__(name, val)

    def encrypt(self, data):
        """
        Encrypt data with the long-lived encryptor and return the result.

        Raises CipherError(data) if any entry of self.ready is False.
        """
        if False in self.ready.values():
            raise CipherError(data)
        # keep track of the input so that snapshot() can replay it
        self._enc_updated_with += data
        return self.encryptor.update(data)

    def decrypt(self, data):
        """
        Decrypt data with the long-lived decryptor and return the result.

        Raises CipherError(data) if any entry of self.ready is False.
        """
        if False in self.ready.values():
            raise CipherError(data)
        # keep track of the input so that snapshot() can replay it
        self._dec_updated_with += data
        return self.decryptor.update(data)

    def snapshot(self):
        """
        Return a new instance of the same class in the same cipher state.

        The copy is built from the current key, then fed with everything
        that was recorded by encrypt() and decrypt(), so that its contexts
        catch up with ours. The 'ready' flags are copied as well.
        """
        c = self.__class__(self.key)
        c.ready = self.ready.copy()
        c.encryptor.update(self._enc_updated_with)
        c.decryptor.update(self._dec_updated_with)
        c._enc_updated_with = self._enc_updated_with
        c._dec_updated_with = self._dec_updated_with
        return c
109
110
if conf.crypto_valid:
    class Cipher_RC4_128(_StreamCipher):
        """
        Stream cipher built on the ARC4 algorithm, with a 16-byte key.
        """
        key_len = 16
        pc_cls = decrepit_algorithms.ARC4

    class Cipher_RC4_40(Cipher_RC4_128):
        """
        Variant of Cipher_RC4_128 declaring a 5-byte key length, along with
        a 16-byte expanded key length.
        """
        key_len = 5
        expanded_key_len = 16
119
120
class Cipher_NULL(_StreamCipher):
    """
    Cipher which does not transform anything: encrypt() and decrypt() both
    hand their input back untouched.
    """
    key_len = 0

    def __init__(self, key=None):
        # There is no underlying cipher object for this class.
        self._cipher = None
        self.ready = {"key": True}
        # The key is stored through the parent's __setattr__, which needs
        # both attributes above to be in place already.
        super().__setattr__("key", key)

    def snapshot(self):
        """
        Return a new instance holding the same key and a copy of 'ready'.
        """
        clone = type(self)(self.key)
        clone.ready = dict(self.ready)
        return clone

    def encrypt(self, data):
        """Return data as is."""
        return data

    def decrypt(self, data):
        """Return data as is."""
        return data
140