1# SPDX-License-Identifier: GPL-2.0-only 2# This file is part of Scapy 3# See https://scapy.net/ for more information 4# Copyright (C) 2007, 2008, 2009 Arnaud Ebalard 5# 2015, 2016, 2017 Maxence Tury 6 7""" 8Stream ciphers. 9""" 10 11from scapy.config import conf 12from scapy.layers.tls.crypto.common import CipherError 13 14if conf.crypto_valid: 15 from cryptography.hazmat.primitives.ciphers import Cipher, algorithms 16 from cryptography.hazmat.backends import default_backend 17 try: 18 # cryptography > 43.0 19 from cryptography.hazmat.decrepit.ciphers import ( 20 algorithms as decrepit_algorithms, 21 ) 22 except ImportError: 23 decrepit_algorithms = algorithms 24 25 26_tls_stream_cipher_algs = {} 27 28 29class _StreamCipherMetaclass(type): 30 """ 31 Cipher classes are automatically registered through this metaclass. 32 Furthermore, their name attribute is extracted from their class name. 33 """ 34 def __new__(cls, ciph_name, bases, dct): 35 if ciph_name != "_StreamCipher": 36 dct["name"] = ciph_name[7:] # remove leading "Cipher_" 37 the_class = super(_StreamCipherMetaclass, cls).__new__(cls, ciph_name, 38 bases, dct) 39 if ciph_name != "_StreamCipher": 40 _tls_stream_cipher_algs[ciph_name[7:]] = the_class 41 return the_class 42 43 44class _StreamCipher(metaclass=_StreamCipherMetaclass): 45 type = "stream" 46 47 def __init__(self, key=None): 48 """ 49 Note that we have to keep the encryption/decryption state in unique 50 encryptor and decryptor objects. This differs from _BlockCipher. 51 52 In order to do connection state snapshots, we need to be able to 53 recreate past cipher contexts. This is why we feed _enc_updated_with 54 and _dec_updated_with every time encrypt() or decrypt() is called. 55 """ 56 self.ready = {"key": True} 57 if key is None: 58 self.ready["key"] = False 59 if hasattr(self, "expanded_key_len"): 60 tmp_len = self.expanded_key_len 61 else: 62 tmp_len = self.key_len 63 key = b"\0" * tmp_len 64 65 # we use super() in order to avoid any deadlock with __setattr__ 66 super(_StreamCipher, self).__setattr__("key", key) 67 68 self._cipher = Cipher(self.pc_cls(key), 69 mode=None, 70 backend=default_backend()) 71 self.encryptor = self._cipher.encryptor() 72 self.decryptor = self._cipher.decryptor() 73 self._enc_updated_with = b"" 74 self._dec_updated_with = b"" 75 76 def __setattr__(self, name, val): 77 """ 78 We have to keep the encryptor/decryptor for a long time, 79 however they have to be updated every time the key is changed. 80 """ 81 if name == "key": 82 if self._cipher is not None: 83 self._cipher.algorithm.key = val 84 self.encryptor = self._cipher.encryptor() 85 self.decryptor = self._cipher.decryptor() 86 self.ready["key"] = True 87 super(_StreamCipher, self).__setattr__(name, val) 88 89 def encrypt(self, data): 90 if False in self.ready.values(): 91 raise CipherError(data) 92 self._enc_updated_with += data 93 return self.encryptor.update(data) 94 95 def decrypt(self, data): 96 if False in self.ready.values(): 97 raise CipherError(data) 98 self._dec_updated_with += data 99 return self.decryptor.update(data) 100 101 def snapshot(self): 102 c = self.__class__(self.key) 103 c.ready = self.ready.copy() 104 c.encryptor.update(self._enc_updated_with) 105 c.decryptor.update(self._dec_updated_with) 106 c._enc_updated_with = self._enc_updated_with 107 c._dec_updated_with = self._dec_updated_with 108 return c 109 110 111if conf.crypto_valid: 112 class Cipher_RC4_128(_StreamCipher): 113 pc_cls = decrepit_algorithms.ARC4 114 key_len = 16 115 116 class Cipher_RC4_40(Cipher_RC4_128): 117 expanded_key_len = 16 118 key_len = 5 119 120 121class Cipher_NULL(_StreamCipher): 122 key_len = 0 123 124 def __init__(self, key=None): 125 self.ready = {"key": True} 126 self._cipher = None 127 # we use super() in order to avoid any deadlock with __setattr__ 128 super(Cipher_NULL, self).__setattr__("key", key) 129 130 def snapshot(self): 131 c = self.__class__(self.key) 132 c.ready = self.ready.copy() 133 return c 134 135 def encrypt(self, data): 136 return data 137 138 def decrypt(self, data): 139 return data 140