1 """Implements (a subset of) Sun XDR -- eXternal Data Representation.
2
3 See: RFC 1014
4
5 """
6
7 import struct
8 from io import BytesIO
9 from functools import wraps
10 import warnings
11
12 warnings._deprecated(__name__, remove=(3, 13))
13
14 __all__ = ["Error", "Packer", "Unpacker", "ConversionError"]
15
16 # exceptions
17 class ESC[4;38;5;81mError(ESC[4;38;5;149mException):
18 """Exception class for this module. Use:
19
20 except xdrlib.Error as var:
21 # var has the Error instance for the exception
22
23 Public ivars:
24 msg -- contains the message
25
26 """
27 def __init__(self, msg):
28 self.msg = msg
29 def __repr__(self):
30 return repr(self.msg)
31 def __str__(self):
32 return str(self.msg)
33
34
35 class ESC[4;38;5;81mConversionError(ESC[4;38;5;149mError):
36 pass
37
38 def raise_conversion_error(function):
39 """ Wrap any raised struct.errors in a ConversionError. """
40
41 @wraps(function)
42 def result(self, value):
43 try:
44 return function(self, value)
45 except struct.error as e:
46 raise ConversionError(e.args[0]) from None
47 return result
48
49
50 class ESC[4;38;5;81mPacker:
51 """Pack various data representations into a buffer."""
52
53 def __init__(self):
54 self.reset()
55
56 def reset(self):
57 self.__buf = BytesIO()
58
59 def get_buffer(self):
60 return self.__buf.getvalue()
61 # backwards compatibility
62 get_buf = get_buffer
63
64 @raise_conversion_error
65 def pack_uint(self, x):
66 self.__buf.write(struct.pack('>L', x))
67
68 @raise_conversion_error
69 def pack_int(self, x):
70 self.__buf.write(struct.pack('>l', x))
71
72 pack_enum = pack_int
73
74 def pack_bool(self, x):
75 if x: self.__buf.write(b'\0\0\0\1')
76 else: self.__buf.write(b'\0\0\0\0')
77
78 def pack_uhyper(self, x):
79 try:
80 self.pack_uint(x>>32 & 0xffffffff)
81 except (TypeError, struct.error) as e:
82 raise ConversionError(e.args[0]) from None
83 try:
84 self.pack_uint(x & 0xffffffff)
85 except (TypeError, struct.error) as e:
86 raise ConversionError(e.args[0]) from None
87
88 pack_hyper = pack_uhyper
89
90 @raise_conversion_error
91 def pack_float(self, x):
92 self.__buf.write(struct.pack('>f', x))
93
94 @raise_conversion_error
95 def pack_double(self, x):
96 self.__buf.write(struct.pack('>d', x))
97
98 def pack_fstring(self, n, s):
99 if n < 0:
100 raise ValueError('fstring size must be nonnegative')
101 data = s[:n]
102 n = ((n+3)//4)*4
103 data = data + (n - len(data)) * b'\0'
104 self.__buf.write(data)
105
106 pack_fopaque = pack_fstring
107
108 def pack_string(self, s):
109 n = len(s)
110 self.pack_uint(n)
111 self.pack_fstring(n, s)
112
113 pack_opaque = pack_string
114 pack_bytes = pack_string
115
116 def pack_list(self, list, pack_item):
117 for item in list:
118 self.pack_uint(1)
119 pack_item(item)
120 self.pack_uint(0)
121
122 def pack_farray(self, n, list, pack_item):
123 if len(list) != n:
124 raise ValueError('wrong array size')
125 for item in list:
126 pack_item(item)
127
128 def pack_array(self, list, pack_item):
129 n = len(list)
130 self.pack_uint(n)
131 self.pack_farray(n, list, pack_item)
132
133
134
135 class ESC[4;38;5;81mUnpacker:
136 """Unpacks various data representations from the given buffer."""
137
138 def __init__(self, data):
139 self.reset(data)
140
141 def reset(self, data):
142 self.__buf = data
143 self.__pos = 0
144
145 def get_position(self):
146 return self.__pos
147
148 def set_position(self, position):
149 self.__pos = position
150
151 def get_buffer(self):
152 return self.__buf
153
154 def done(self):
155 if self.__pos < len(self.__buf):
156 raise Error('unextracted data remains')
157
158 def unpack_uint(self):
159 i = self.__pos
160 self.__pos = j = i+4
161 data = self.__buf[i:j]
162 if len(data) < 4:
163 raise EOFError
164 return struct.unpack('>L', data)[0]
165
166 def unpack_int(self):
167 i = self.__pos
168 self.__pos = j = i+4
169 data = self.__buf[i:j]
170 if len(data) < 4:
171 raise EOFError
172 return struct.unpack('>l', data)[0]
173
174 unpack_enum = unpack_int
175
176 def unpack_bool(self):
177 return bool(self.unpack_int())
178
179 def unpack_uhyper(self):
180 hi = self.unpack_uint()
181 lo = self.unpack_uint()
182 return int(hi)<<32 | lo
183
184 def unpack_hyper(self):
185 x = self.unpack_uhyper()
186 if x >= 0x8000000000000000:
187 x = x - 0x10000000000000000
188 return x
189
190 def unpack_float(self):
191 i = self.__pos
192 self.__pos = j = i+4
193 data = self.__buf[i:j]
194 if len(data) < 4:
195 raise EOFError
196 return struct.unpack('>f', data)[0]
197
198 def unpack_double(self):
199 i = self.__pos
200 self.__pos = j = i+8
201 data = self.__buf[i:j]
202 if len(data) < 8:
203 raise EOFError
204 return struct.unpack('>d', data)[0]
205
206 def unpack_fstring(self, n):
207 if n < 0:
208 raise ValueError('fstring size must be nonnegative')
209 i = self.__pos
210 j = i + (n+3)//4*4
211 if j > len(self.__buf):
212 raise EOFError
213 self.__pos = j
214 return self.__buf[i:i+n]
215
216 unpack_fopaque = unpack_fstring
217
218 def unpack_string(self):
219 n = self.unpack_uint()
220 return self.unpack_fstring(n)
221
222 unpack_opaque = unpack_string
223 unpack_bytes = unpack_string
224
225 def unpack_list(self, unpack_item):
226 list = []
227 while (x := self.unpack_uint()) != 0:
228 if x != 1:
229 raise ConversionError('0 or 1 expected, got %r' % (x,))
230 item = unpack_item()
231 list.append(item)
232 return list
233
234 def unpack_farray(self, n, unpack_item):
235 list = []
236 for i in range(n):
237 list.append(unpack_item())
238 return list
239
240 def unpack_array(self, unpack_item):
241 n = self.unpack_uint()
242 return self.unpack_farray(n, unpack_item)