• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Implements (a subset of) Sun XDR -- eXternal Data Representation.
2
3See: RFC 1014
4
5"""
6
7import struct
8from io import BytesIO
9from functools import wraps
10
11__all__ = ["Error", "Packer", "Unpacker", "ConversionError"]
12
13# exceptions
14class Error(Exception):
15    """Exception class for this module. Use:
16
17    except xdrlib.Error as var:
18        # var has the Error instance for the exception
19
20    Public ivars:
21        msg -- contains the message
22
23    """
24    def __init__(self, msg):
25        self.msg = msg
26    def __repr__(self):
27        return repr(self.msg)
28    def __str__(self):
29        return str(self.msg)
30
31
32class ConversionError(Error):
33    pass
34
35def raise_conversion_error(function):
36    """ Wrap any raised struct.errors in a ConversionError. """
37
38    @wraps(function)
39    def result(self, value):
40        try:
41            return function(self, value)
42        except struct.error as e:
43            raise ConversionError(e.args[0]) from None
44    return result
45
46
47class Packer:
48    """Pack various data representations into a buffer."""
49
50    def __init__(self):
51        self.reset()
52
53    def reset(self):
54        self.__buf = BytesIO()
55
56    def get_buffer(self):
57        return self.__buf.getvalue()
58    # backwards compatibility
59    get_buf = get_buffer
60
61    @raise_conversion_error
62    def pack_uint(self, x):
63        self.__buf.write(struct.pack('>L', x))
64
65    @raise_conversion_error
66    def pack_int(self, x):
67        self.__buf.write(struct.pack('>l', x))
68
69    pack_enum = pack_int
70
71    def pack_bool(self, x):
72        if x: self.__buf.write(b'\0\0\0\1')
73        else: self.__buf.write(b'\0\0\0\0')
74
75    def pack_uhyper(self, x):
76        try:
77            self.pack_uint(x>>32 & 0xffffffff)
78        except (TypeError, struct.error) as e:
79            raise ConversionError(e.args[0]) from None
80        try:
81            self.pack_uint(x & 0xffffffff)
82        except (TypeError, struct.error) as e:
83            raise ConversionError(e.args[0]) from None
84
85    pack_hyper = pack_uhyper
86
87    @raise_conversion_error
88    def pack_float(self, x):
89        self.__buf.write(struct.pack('>f', x))
90
91    @raise_conversion_error
92    def pack_double(self, x):
93        self.__buf.write(struct.pack('>d', x))
94
95    def pack_fstring(self, n, s):
96        if n < 0:
97            raise ValueError('fstring size must be nonnegative')
98        data = s[:n]
99        n = ((n+3)//4)*4
100        data = data + (n - len(data)) * b'\0'
101        self.__buf.write(data)
102
103    pack_fopaque = pack_fstring
104
105    def pack_string(self, s):
106        n = len(s)
107        self.pack_uint(n)
108        self.pack_fstring(n, s)
109
110    pack_opaque = pack_string
111    pack_bytes = pack_string
112
113    def pack_list(self, list, pack_item):
114        for item in list:
115            self.pack_uint(1)
116            pack_item(item)
117        self.pack_uint(0)
118
119    def pack_farray(self, n, list, pack_item):
120        if len(list) != n:
121            raise ValueError('wrong array size')
122        for item in list:
123            pack_item(item)
124
125    def pack_array(self, list, pack_item):
126        n = len(list)
127        self.pack_uint(n)
128        self.pack_farray(n, list, pack_item)
129
130
131
132class Unpacker:
133    """Unpacks various data representations from the given buffer."""
134
135    def __init__(self, data):
136        self.reset(data)
137
138    def reset(self, data):
139        self.__buf = data
140        self.__pos = 0
141
142    def get_position(self):
143        return self.__pos
144
145    def set_position(self, position):
146        self.__pos = position
147
148    def get_buffer(self):
149        return self.__buf
150
151    def done(self):
152        if self.__pos < len(self.__buf):
153            raise Error('unextracted data remains')
154
155    def unpack_uint(self):
156        i = self.__pos
157        self.__pos = j = i+4
158        data = self.__buf[i:j]
159        if len(data) < 4:
160            raise EOFError
161        return struct.unpack('>L', data)[0]
162
163    def unpack_int(self):
164        i = self.__pos
165        self.__pos = j = i+4
166        data = self.__buf[i:j]
167        if len(data) < 4:
168            raise EOFError
169        return struct.unpack('>l', data)[0]
170
171    unpack_enum = unpack_int
172
173    def unpack_bool(self):
174        return bool(self.unpack_int())
175
176    def unpack_uhyper(self):
177        hi = self.unpack_uint()
178        lo = self.unpack_uint()
179        return int(hi)<<32 | lo
180
181    def unpack_hyper(self):
182        x = self.unpack_uhyper()
183        if x >= 0x8000000000000000:
184            x = x - 0x10000000000000000
185        return x
186
187    def unpack_float(self):
188        i = self.__pos
189        self.__pos = j = i+4
190        data = self.__buf[i:j]
191        if len(data) < 4:
192            raise EOFError
193        return struct.unpack('>f', data)[0]
194
195    def unpack_double(self):
196        i = self.__pos
197        self.__pos = j = i+8
198        data = self.__buf[i:j]
199        if len(data) < 8:
200            raise EOFError
201        return struct.unpack('>d', data)[0]
202
203    def unpack_fstring(self, n):
204        if n < 0:
205            raise ValueError('fstring size must be nonnegative')
206        i = self.__pos
207        j = i + (n+3)//4*4
208        if j > len(self.__buf):
209            raise EOFError
210        self.__pos = j
211        return self.__buf[i:i+n]
212
213    unpack_fopaque = unpack_fstring
214
215    def unpack_string(self):
216        n = self.unpack_uint()
217        return self.unpack_fstring(n)
218
219    unpack_opaque = unpack_string
220    unpack_bytes = unpack_string
221
222    def unpack_list(self, unpack_item):
223        list = []
224        while 1:
225            x = self.unpack_uint()
226            if x == 0: break
227            if x != 1:
228                raise ConversionError('0 or 1 expected, got %r' % (x,))
229            item = unpack_item()
230            list.append(item)
231        return list
232
233    def unpack_farray(self, n, unpack_item):
234        list = []
235        for i in range(n):
236            list.append(unpack_item())
237        return list
238
239    def unpack_array(self, unpack_item):
240        n = self.unpack_uint()
241        return self.unpack_farray(n, unpack_item)
242