1 import sys
2 import os
3 import unittest
4 from array import array
5 from weakref import proxy
6
7 import io
8 import _pyio as pyio
9
10 from test.support import gc_collect
11 from test.support.os_helper import TESTFN
12 from test.support import os_helper
13 from test.support import warnings_helper
14 from collections import UserList
15
16 class ESC[4;38;5;81mAutoFileTests:
17 # file tests for which a test file is automatically set up
18
19 def setUp(self):
20 self.f = self.open(TESTFN, 'wb')
21
22 def tearDown(self):
23 if self.f:
24 self.f.close()
25 os_helper.unlink(TESTFN)
26
27 def testWeakRefs(self):
28 # verify weak references
29 p = proxy(self.f)
30 p.write(b'teststring')
31 self.assertEqual(self.f.tell(), p.tell())
32 self.f.close()
33 self.f = None
34 gc_collect() # For PyPy or other GCs.
35 self.assertRaises(ReferenceError, getattr, p, 'tell')
36
37 def testAttributes(self):
38 # verify expected attributes exist
39 f = self.f
40 f.name # merely shouldn't blow up
41 f.mode # ditto
42 f.closed # ditto
43
44 def testReadinto(self):
45 # verify readinto
46 self.f.write(b'12')
47 self.f.close()
48 a = array('b', b'x'*10)
49 self.f = self.open(TESTFN, 'rb')
50 n = self.f.readinto(a)
51 self.assertEqual(b'12', a.tobytes()[:n])
52
53 def testReadinto_text(self):
54 # verify readinto refuses text files
55 a = array('b', b'x'*10)
56 self.f.close()
57 self.f = self.open(TESTFN, encoding="utf-8")
58 if hasattr(self.f, "readinto"):
59 self.assertRaises(TypeError, self.f.readinto, a)
60
61 def testWritelinesUserList(self):
62 # verify writelines with instance sequence
63 l = UserList([b'1', b'2'])
64 self.f.writelines(l)
65 self.f.close()
66 self.f = self.open(TESTFN, 'rb')
67 buf = self.f.read()
68 self.assertEqual(buf, b'12')
69
70 def testWritelinesIntegers(self):
71 # verify writelines with integers
72 self.assertRaises(TypeError, self.f.writelines, [1, 2, 3])
73
74 def testWritelinesIntegersUserList(self):
75 # verify writelines with integers in UserList
76 l = UserList([1,2,3])
77 self.assertRaises(TypeError, self.f.writelines, l)
78
79 def testWritelinesNonString(self):
80 # verify writelines with non-string object
81 class ESC[4;38;5;81mNonString:
82 pass
83
84 self.assertRaises(TypeError, self.f.writelines,
85 [NonString(), NonString()])
86
87 def testErrors(self):
88 f = self.f
89 self.assertEqual(f.name, TESTFN)
90 self.assertFalse(f.isatty())
91 self.assertFalse(f.closed)
92
93 if hasattr(f, "readinto"):
94 self.assertRaises((OSError, TypeError), f.readinto, "")
95 f.close()
96 self.assertTrue(f.closed)
97
98 def testMethods(self):
99 methods = [('fileno', ()),
100 ('flush', ()),
101 ('isatty', ()),
102 ('__next__', ()),
103 ('read', ()),
104 ('write', (b"",)),
105 ('readline', ()),
106 ('readlines', ()),
107 ('seek', (0,)),
108 ('tell', ()),
109 ('write', (b"",)),
110 ('writelines', ([],)),
111 ('__iter__', ()),
112 ]
113 methods.append(('truncate', ()))
114
115 # __exit__ should close the file
116 self.f.__exit__(None, None, None)
117 self.assertTrue(self.f.closed)
118
119 for methodname, args in methods:
120 method = getattr(self.f, methodname)
121 # should raise on closed file
122 self.assertRaises(ValueError, method, *args)
123
124 # file is closed, __exit__ shouldn't do anything
125 self.assertEqual(self.f.__exit__(None, None, None), None)
126 # it must also return None if an exception was given
127 try:
128 1/0
129 except:
130 self.assertEqual(self.f.__exit__(*sys.exc_info()), None)
131
132 def testReadWhenWriting(self):
133 self.assertRaises(OSError, self.f.read)
134
135 class ESC[4;38;5;81mCAutoFileTests(ESC[4;38;5;149mAutoFileTests, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
136 open = io.open
137
138 class ESC[4;38;5;81mPyAutoFileTests(ESC[4;38;5;149mAutoFileTests, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
139 open = staticmethod(pyio.open)
140
141
142 class ESC[4;38;5;81mOtherFileTests:
143
144 def tearDown(self):
145 os_helper.unlink(TESTFN)
146
147 def testModeStrings(self):
148 # check invalid mode strings
149 self.open(TESTFN, 'wb').close()
150 for mode in ("", "aU", "wU+", "U+", "+U", "rU+"):
151 try:
152 f = self.open(TESTFN, mode)
153 except ValueError:
154 pass
155 else:
156 f.close()
157 self.fail('%r is an invalid file mode' % mode)
158
159 def testStdin(self):
160 if sys.platform == 'osf1V5':
161 # This causes the interpreter to exit on OSF1 v5.1.
162 self.skipTest(
163 ' sys.stdin.seek(-1) may crash the interpreter on OSF1.'
164 ' Test manually.')
165
166 if not sys.stdin.isatty():
167 # Issue 14853: stdin becomes seekable when redirected to a file
168 self.skipTest('stdin must be a TTY in this test')
169
170 with self.assertRaises((IOError, ValueError)):
171 sys.stdin.seek(-1)
172 with self.assertRaises((IOError, ValueError)):
173 sys.stdin.truncate()
174
175 def testBadModeArgument(self):
176 # verify that we get a sensible error message for bad mode argument
177 bad_mode = "qwerty"
178 try:
179 f = self.open(TESTFN, bad_mode)
180 except ValueError as msg:
181 if msg.args[0] != 0:
182 s = str(msg)
183 if TESTFN in s or bad_mode not in s:
184 self.fail("bad error message for invalid mode: %s" % s)
185 # if msg.args[0] == 0, we're probably on Windows where there may be
186 # no obvious way to discover why open() failed.
187 else:
188 f.close()
189 self.fail("no error for invalid mode: %s" % bad_mode)
190
191 def _checkBufferSize(self, s):
192 try:
193 f = self.open(TESTFN, 'wb', s)
194 f.write(str(s).encode("ascii"))
195 f.close()
196 f.close()
197 f = self.open(TESTFN, 'rb', s)
198 d = int(f.read().decode("ascii"))
199 f.close()
200 f.close()
201 except OSError as msg:
202 self.fail('error setting buffer size %d: %s' % (s, str(msg)))
203 self.assertEqual(d, s)
204
205 def testSetBufferSize(self):
206 # make sure that explicitly setting the buffer size doesn't cause
207 # misbehaviour especially with repeated close() calls
208 for s in (-1, 0, 512):
209 with warnings_helper.check_no_warnings(self,
210 message='line buffering',
211 category=RuntimeWarning):
212 self._checkBufferSize(s)
213
214 # test that attempts to use line buffering in binary mode cause
215 # a warning
216 with self.assertWarnsRegex(RuntimeWarning, 'line buffering'):
217 self._checkBufferSize(1)
218
219 def testTruncateOnWindows(self):
220 # SF bug <http://www.python.org/sf/801631>
221 # "file.truncate fault on windows"
222
223 f = self.open(TESTFN, 'wb')
224
225 try:
226 f.write(b'12345678901') # 11 bytes
227 f.close()
228
229 f = self.open(TESTFN,'rb+')
230 data = f.read(5)
231 if data != b'12345':
232 self.fail("Read on file opened for update failed %r" % data)
233 if f.tell() != 5:
234 self.fail("File pos after read wrong %d" % f.tell())
235
236 f.truncate()
237 if f.tell() != 5:
238 self.fail("File pos after ftruncate wrong %d" % f.tell())
239
240 f.close()
241 size = os.path.getsize(TESTFN)
242 if size != 5:
243 self.fail("File size after ftruncate wrong %d" % size)
244 finally:
245 f.close()
246
247 def testIteration(self):
248 # Test the complex interaction when mixing file-iteration and the
249 # various read* methods.
250 dataoffset = 16384
251 filler = b"ham\n"
252 assert not dataoffset % len(filler), \
253 "dataoffset must be multiple of len(filler)"
254 nchunks = dataoffset // len(filler)
255 testlines = [
256 b"spam, spam and eggs\n",
257 b"eggs, spam, ham and spam\n",
258 b"saussages, spam, spam and eggs\n",
259 b"spam, ham, spam and eggs\n",
260 b"spam, spam, spam, spam, spam, ham, spam\n",
261 b"wonderful spaaaaaam.\n"
262 ]
263 methods = [("readline", ()), ("read", ()), ("readlines", ()),
264 ("readinto", (array("b", b" "*100),))]
265
266 # Prepare the testfile
267 bag = self.open(TESTFN, "wb")
268 bag.write(filler * nchunks)
269 bag.writelines(testlines)
270 bag.close()
271 # Test for appropriate errors mixing read* and iteration
272 for methodname, args in methods:
273 f = self.open(TESTFN, 'rb')
274 self.assertEqual(next(f), filler)
275 meth = getattr(f, methodname)
276 meth(*args) # This simply shouldn't fail
277 f.close()
278
279 # Test to see if harmless (by accident) mixing of read* and
280 # iteration still works. This depends on the size of the internal
281 # iteration buffer (currently 8192,) but we can test it in a
282 # flexible manner. Each line in the bag o' ham is 4 bytes
283 # ("h", "a", "m", "\n"), so 4096 lines of that should get us
284 # exactly on the buffer boundary for any power-of-2 buffersize
285 # between 4 and 16384 (inclusive).
286 f = self.open(TESTFN, 'rb')
287 for i in range(nchunks):
288 next(f)
289 testline = testlines.pop(0)
290 try:
291 line = f.readline()
292 except ValueError:
293 self.fail("readline() after next() with supposedly empty "
294 "iteration-buffer failed anyway")
295 if line != testline:
296 self.fail("readline() after next() with empty buffer "
297 "failed. Got %r, expected %r" % (line, testline))
298 testline = testlines.pop(0)
299 buf = array("b", b"\x00" * len(testline))
300 try:
301 f.readinto(buf)
302 except ValueError:
303 self.fail("readinto() after next() with supposedly empty "
304 "iteration-buffer failed anyway")
305 line = buf.tobytes()
306 if line != testline:
307 self.fail("readinto() after next() with empty buffer "
308 "failed. Got %r, expected %r" % (line, testline))
309
310 testline = testlines.pop(0)
311 try:
312 line = f.read(len(testline))
313 except ValueError:
314 self.fail("read() after next() with supposedly empty "
315 "iteration-buffer failed anyway")
316 if line != testline:
317 self.fail("read() after next() with empty buffer "
318 "failed. Got %r, expected %r" % (line, testline))
319 try:
320 lines = f.readlines()
321 except ValueError:
322 self.fail("readlines() after next() with supposedly empty "
323 "iteration-buffer failed anyway")
324 if lines != testlines:
325 self.fail("readlines() after next() with empty buffer "
326 "failed. Got %r, expected %r" % (line, testline))
327 f.close()
328
329 # Reading after iteration hit EOF shouldn't hurt either
330 f = self.open(TESTFN, 'rb')
331 try:
332 for line in f:
333 pass
334 try:
335 f.readline()
336 f.readinto(buf)
337 f.read()
338 f.readlines()
339 except ValueError:
340 self.fail("read* failed after next() consumed file")
341 finally:
342 f.close()
343
344 class ESC[4;38;5;81mCOtherFileTests(ESC[4;38;5;149mOtherFileTests, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
345 open = io.open
346
347 class ESC[4;38;5;81mPyOtherFileTests(ESC[4;38;5;149mOtherFileTests, ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
348 open = staticmethod(pyio.open)
349
350
351 if __name__ == '__main__':
352 unittest.main()