import sys
import warnings

from six import PY2, text_type

from cryptography.hazmat.bindings.openssl.binding import Binding


binding = Binding()
binding.init_static_locks()
ffi = binding.ffi
lib = binding.lib


# This is a special CFFI allocator that does not bother to zero its memory
# after allocation. This has vastly better performance on large allocations and
# so should be used whenever we don't need the memory zeroed out.
no_zero_allocator = ffi.new_allocator(should_clear_after_alloc=False)


def text(charp):
    """
    Get a native string type representation of the given CFFI ``char*``
    object.

    :param charp: A C-style string represented using CFFI.

    :return: :class:`str`. An empty string is returned when *charp* is falsy
        (for example a NULL pointer).
    """
    if not charp:
        return ""
    return native(ffi.string(charp))


def exception_from_error_queue(exception_type):
    """
    Convert an OpenSSL library failure into a Python exception.

    When a call to the native OpenSSL library fails, this is usually signalled
    by the return value, and an error code is stored in an error queue
    associated with the current thread. The err library provides functions to
    obtain these error codes and textual error messages.

    :param exception_type: The exception class to raise. It is instantiated
        with a single argument: a list of ``(library, function, reason)``
        string tuples, one per queued error, in the order they were
        retrieved.

    :raise exception_type: Always; this function never returns normally.
    """
    errors = []

    # ERR_get_error() yields 0 once the queue has been fully drained.
    while True:
        error = lib.ERR_get_error()
        if error == 0:
            break
        errors.append(
            (
                text(lib.ERR_lib_error_string(error)),
                text(lib.ERR_func_error_string(error)),
                text(lib.ERR_reason_error_string(error)),
            )
        )

    raise exception_type(errors)


def make_assert(error):
    """
    Create an assert function that uses :func:`exception_from_error_queue` to
    raise an exception wrapped by *error*.

    :param error: The exception class handed to
        :func:`exception_from_error_queue` when the assertion fails.

    :return: A one-argument callable ``openssl_assert(ok)``.
    """

    def openssl_assert(ok):
        """
        If *ok* is not True, retrieve the error from OpenSSL and raise it.

        The check is an identity comparison against ``True``: any other
        value, including truthy ones such as ``1``, is treated as a failure.
        """
        if ok is not True:
            exception_from_error_queue(error)

    return openssl_assert


def native(s):
    """
    Convert :py:class:`bytes` or :py:class:`unicode` to the native
    :py:class:`str` type, using UTF-8 encoding if conversion is necessary.

    :param s: An instance of :py:class:`bytes` or :py:class:`unicode`.

    :return: :py:class:`str`

    :raise UnicodeError: The input string is not UTF-8 decodeable.

    :raise TypeError: The input is neither :py:class:`bytes` nor
        :py:class:`unicode`.
    """
    if not isinstance(s, (bytes, text_type)):
        # Wrap the operand in a 1-tuple: a bare ``% s`` would unpack a tuple
        # argument as the format arguments and report the wrong value (or
        # fail with an unrelated formatting error).
        raise TypeError("%r is neither bytes nor unicode" % (s,))
    if PY2:
        if isinstance(s, text_type):
            return s.encode("utf-8")
    else:
        if isinstance(s, bytes):
            return s.decode("utf-8")
    return s


def path_string(s):
    """
    Convert a Python string to a :py:class:`bytes` string identifying the same
    path and which can be passed into an OpenSSL API accepting a filename.

    :param s: An instance of :py:class:`bytes` or :py:class:`unicode`.

    :return: An instance of :py:class:`bytes`.

    :raise TypeError: The input is neither :py:class:`bytes` nor
        :py:class:`unicode`.
    """
    if isinstance(s, bytes):
        return s
    elif isinstance(s, text_type):
        return s.encode(sys.getfilesystemencoding())
    else:
        raise TypeError("Path must be represented as bytes or unicode string")


if PY2:

    def byte_string(s):
        """
        Return *s* unchanged (Python 2 variant).
        """
        return s


else:

    def byte_string(s):
        """
        Return *s* encoded to :py:class:`bytes` with the ``charmap`` codec
        (Python 3 variant).
        """
        return s.encode("charmap")


# A marker object to observe whether some optional arguments are passed any
# value or not.
UNSPECIFIED = object()

_TEXT_WARNING = (
    text_type.__name__ + " for {0} is no longer accepted, use bytes"
)


def text_to_bytes_and_warn(label, obj):
    """
    If ``obj`` is text, emit a warning that it should be bytes instead and try
    to convert it to bytes automatically.

    :param str label: The name of the parameter from which ``obj`` was taken
        (so a developer can easily find the source of the problem and correct
        it).

    :param obj: The value to check.

    :return: If ``obj`` is the text string type, a ``bytes`` object giving the
        UTF-8 encoding of that text is returned.  Otherwise, ``obj`` itself is
        returned.
    """
    if isinstance(obj, text_type):
        # stacklevel=3 attributes the warning to the caller of the function
        # that invoked this helper, rather than to this helper or its direct
        # caller.
        warnings.warn(
            _TEXT_WARNING.format(label),
            category=DeprecationWarning,
            stacklevel=3,
        )
        return obj.encode("utf-8")
    return obj


# Module-level alias for the CFFI ``ffi.from_buffer`` callable.
from_buffer = ffi.from_buffer