• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Implements (a subset of) Sun XDR -- eXternal Data Representation.
2
3See: RFC 1014
4
5"""
6
7import struct
8from io import BytesIO
9from functools import wraps
10import warnings
11
12warnings._deprecated(__name__, remove=(3, 13))
13
14__all__ = ["Error", "Packer", "Unpacker", "ConversionError"]
15
16# exceptions
class Error(Exception):
    """Base exception for this module.

    Typical use:

    except xdrlib.Error as var:
        # var is the Error instance that was raised

    Public instance variables:
        msg -- the object passed to the constructor (usually a message)

    """

    def __init__(self, msg):
        # Keep the payload on a named attribute; repr()/str() below are
        # both derived from it.
        self.msg = msg

    def __repr__(self):
        """Return the repr() of the stored message."""
        return repr(self.msg)

    def __str__(self):
        """Return the str() of the stored message."""
        return str(self.msg)
33
34
class ConversionError(Error):
    """Error subclass used to report a value that could not be converted."""
37
def raise_conversion_error(function):
    """Decorate a ``(self, value)`` method so struct.error becomes ConversionError.

    The wrapped method's return value is passed through unchanged.  Any
    struct.error it raises is replaced by a ConversionError carrying the
    first argument of the original exception; the original exception is
    not chained (``from None``).
    """

    @wraps(function)
    def wrapper(self, value):
        try:
            outcome = function(self, value)
        except struct.error as exc:
            raise ConversionError(exc.args[0]) from None
        return outcome

    return wrapper
48
49
class Packer:
    """Accumulate XDR-encoded values in an in-memory byte buffer.

    Each ``pack_*`` method appends the encoding of its argument to the
    buffer; get_buffer() returns everything packed since the last reset().
    """

    def __init__(self):
        self.reset()

    def reset(self):
        """Throw away anything packed so far and start with an empty buffer."""
        self.__buf = BytesIO()

    def get_buffer(self):
        """Return the bytes packed so far."""
        return self.__buf.getvalue()

    # backwards compatibility
    get_buf = get_buffer

    @raise_conversion_error
    def pack_uint(self, x):
        """Append x as a big-endian unsigned 32-bit integer."""
        encoded = struct.pack('>L', x)
        self.__buf.write(encoded)

    @raise_conversion_error
    def pack_int(self, x):
        """Append x as a big-endian signed 32-bit integer."""
        encoded = struct.pack('>l', x)
        self.__buf.write(encoded)

    pack_enum = pack_int

    def pack_bool(self, x):
        """Append a 4-byte word: 1 if x is truthy, otherwise 0."""
        self.__buf.write(b'\0\0\0\1' if x else b'\0\0\0\0')

    def pack_uhyper(self, x):
        """Append x as two 32-bit words, high word first.

        Only the low 64 bits of x are written.  A TypeError or
        struct.error raised while splitting or packing is reported as
        ConversionError.
        """
        try:
            # Same order as the wire format: upper 32 bits, then lower 32.
            self.pack_uint(x >> 32 & 0xffffffff)
            self.pack_uint(x & 0xffffffff)
        except (TypeError, struct.error) as exc:
            raise ConversionError(exc.args[0]) from None

    pack_hyper = pack_uhyper

    @raise_conversion_error
    def pack_float(self, x):
        """Append x as a big-endian 4-byte float."""
        encoded = struct.pack('>f', x)
        self.__buf.write(encoded)

    @raise_conversion_error
    def pack_double(self, x):
        """Append x as a big-endian 8-byte float."""
        encoded = struct.pack('>d', x)
        self.__buf.write(encoded)

    def pack_fstring(self, n, s):
        """Append the first n bytes of s, NUL-padded to a multiple of 4.

        Raises ValueError if n is negative.
        """
        if n < 0:
            raise ValueError('fstring size must be nonnegative')
        payload = s[:n]
        # Round the field width up to the next 4-byte boundary.
        padded_size = ((n + 3) // 4) * 4
        padding = (padded_size - len(payload)) * b'\0'
        self.__buf.write(payload + padding)

    pack_fopaque = pack_fstring

    def pack_string(self, s):
        """Append len(s) as an unsigned int followed by s as a padded fstring."""
        size = len(s)
        self.pack_uint(size)
        self.pack_fstring(size, s)

    pack_opaque = pack_string
    pack_bytes = pack_string

    def pack_list(self, list, pack_item):
        """Append each item preceded by a 1 marker, then a terminating 0.

        pack_item is called with each item and is expected to do the
        actual packing of that item.
        """
        for element in list:
            self.pack_uint(1)
            pack_item(element)
        self.pack_uint(0)

    def pack_farray(self, n, list, pack_item):
        """Pack exactly n items by calling pack_item on each one.

        Raises ValueError if len(list) differs from n.
        """
        if n != len(list):
            raise ValueError('wrong array size')
        for element in list:
            pack_item(element)

    def pack_array(self, list, pack_item):
        """Append len(list) as an unsigned int, then the items as an farray."""
        count = len(list)
        self.pack_uint(count)
        self.pack_farray(count, list, pack_item)
132
133
134
class Unpacker:
    """Unpacks various data representations from the given buffer.

    Values are read sequentially starting at the current position.  The
    fixed-size readers (unpack_uint, unpack_int, unpack_float,
    unpack_double, unpack_uhyper, unpack_hyper, unpack_fstring) raise
    EOFError when the buffer does not hold enough data and, in that case,
    leave the position where it was before the call.
    """

    def __init__(self, data):
        self.reset(data)

    def reset(self, data):
        """Replace the buffer with data and rewind to position 0."""
        self.__buf = data
        self.__pos = 0

    def get_position(self):
        """Return the current read offset into the buffer."""
        return self.__pos

    def set_position(self, position):
        """Set the read offset; no validation is performed."""
        self.__pos = position

    def get_buffer(self):
        """Return the buffer object being unpacked."""
        return self.__buf

    def done(self):
        """Raise Error if the position is still before the end of the buffer."""
        if self.__pos < len(self.__buf):
            raise Error('unextracted data remains')

    def __unpack_fixed(self, fmt, size):
        """Read size bytes at the current position and decode them with fmt.

        The position is advanced only after the length check succeeds, so
        a short read (EOFError) does not move the cursor past the data.
        """
        start = self.__pos
        end = start + size
        chunk = self.__buf[start:end]
        if len(chunk) < size:
            raise EOFError
        self.__pos = end
        return struct.unpack(fmt, chunk)[0]

    def unpack_uint(self):
        """Read a big-endian unsigned 32-bit integer."""
        return self.__unpack_fixed('>L', 4)

    def unpack_int(self):
        """Read a big-endian signed 32-bit integer."""
        return self.__unpack_fixed('>l', 4)

    unpack_enum = unpack_int

    def unpack_bool(self):
        """Read a signed int and return its truth value."""
        return bool(self.unpack_int())

    def unpack_uhyper(self):
        """Read two unsigned 32-bit words (high first) as one 64-bit value."""
        start = self.__pos
        try:
            hi = self.unpack_uint()
            lo = self.unpack_uint()
        except EOFError:
            # Keep the read all-or-nothing: undo a consumed high word.
            self.__pos = start
            raise
        return int(hi)<<32 | lo

    def unpack_hyper(self):
        """Read a 64-bit value and reinterpret it as two's-complement signed."""
        x = self.unpack_uhyper()
        if x >= 0x8000000000000000:
            x = x - 0x10000000000000000
        return x

    def unpack_float(self):
        """Read a big-endian 4-byte float."""
        return self.__unpack_fixed('>f', 4)

    def unpack_double(self):
        """Read a big-endian 8-byte float."""
        return self.__unpack_fixed('>d', 8)

    def unpack_fstring(self, n):
        """Return the next n bytes and skip the padding up to a multiple of 4.

        Raises ValueError if n is negative and EOFError if the padded
        field extends past the end of the buffer.
        """
        if n < 0:
            raise ValueError('fstring size must be nonnegative')
        i = self.__pos
        j = i + (n+3)//4*4
        if j > len(self.__buf):
            raise EOFError
        self.__pos = j
        return self.__buf[i:i+n]

    unpack_fopaque = unpack_fstring

    def unpack_string(self):
        """Read an unsigned length, then that many bytes as an fstring."""
        n = self.unpack_uint()
        return self.unpack_fstring(n)

    unpack_opaque = unpack_string
    unpack_bytes = unpack_string

    def unpack_list(self, unpack_item):
        """Read marker-prefixed items until a 0 marker and return them.

        Each item is preceded by an unsigned int that must be 1; a 0 ends
        the list.  Any other marker raises ConversionError.  unpack_item
        is called with no arguments to read each item.
        """
        items = []
        while True:
            x = self.unpack_uint()
            if x == 0:
                break
            if x != 1:
                raise ConversionError('0 or 1 expected, got %r' % (x,))
            items.append(unpack_item())
        return items

    def unpack_farray(self, n, unpack_item):
        """Call unpack_item() n times and return the results as a list."""
        return [unpack_item() for _ in range(n)]

    def unpack_array(self, unpack_item):
        """Read an unsigned count, then that many items via unpack_item."""
        n = self.unpack_uint()
        return self.unpack_farray(n, unpack_item)
245