1 import os
2 import string
3 import unittest
4 import shutil
5 from test.support import reap_children, unix_shell
6 from test.support.os_helper import TESTFN, unlink
7 from test.support.warnings_helper import import_deprecated
8
9 pipes = import_deprecated("pipes")
10
11
12 if os.name != 'posix':
13 raise unittest.SkipTest('pipes module only works on posix')
14
15 if not (unix_shell and os.path.exists(unix_shell)):
16 raise unittest.SkipTest('pipes module requires a shell')
17
18 TESTFN2 = TESTFN + "2"
19
20 # tr a-z A-Z is not portable, so make the ranges explicit
21 s_command = 'tr %s %s' % (string.ascii_lowercase, string.ascii_uppercase)
22
23 class ESC[4;38;5;81mSimplePipeTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
24 def tearDown(self):
25 for f in (TESTFN, TESTFN2):
26 unlink(f)
27
28 def testSimplePipe1(self):
29 if shutil.which('tr') is None:
30 self.skipTest('tr is not available')
31 t = pipes.Template()
32 t.append(s_command, pipes.STDIN_STDOUT)
33 with t.open(TESTFN, 'w') as f:
34 f.write('hello world #1')
35 with open(TESTFN) as f:
36 self.assertEqual(f.read(), 'HELLO WORLD #1')
37
38 def testSimplePipe2(self):
39 if shutil.which('tr') is None:
40 self.skipTest('tr is not available')
41 with open(TESTFN, 'w') as f:
42 f.write('hello world #2')
43 t = pipes.Template()
44 t.append(s_command + ' < $IN > $OUT', pipes.FILEIN_FILEOUT)
45 t.copy(TESTFN, TESTFN2)
46 with open(TESTFN2) as f:
47 self.assertEqual(f.read(), 'HELLO WORLD #2')
48
49 def testSimplePipe3(self):
50 if shutil.which('tr') is None:
51 self.skipTest('tr is not available')
52 with open(TESTFN, 'w') as f:
53 f.write('hello world #2')
54 t = pipes.Template()
55 t.append(s_command + ' < $IN', pipes.FILEIN_STDOUT)
56 f = t.open(TESTFN, 'r')
57 try:
58 self.assertEqual(f.read(), 'HELLO WORLD #2')
59 finally:
60 f.close()
61
62 def testEmptyPipeline1(self):
63 # copy through empty pipe
64 d = 'empty pipeline test COPY'
65 with open(TESTFN, 'w') as f:
66 f.write(d)
67 with open(TESTFN2, 'w') as f:
68 f.write('')
69 t=pipes.Template()
70 t.copy(TESTFN, TESTFN2)
71 with open(TESTFN2) as f:
72 self.assertEqual(f.read(), d)
73
74 def testEmptyPipeline2(self):
75 # read through empty pipe
76 d = 'empty pipeline test READ'
77 with open(TESTFN, 'w') as f:
78 f.write(d)
79 t=pipes.Template()
80 f = t.open(TESTFN, 'r')
81 try:
82 self.assertEqual(f.read(), d)
83 finally:
84 f.close()
85
86 def testEmptyPipeline3(self):
87 # write through empty pipe
88 d = 'empty pipeline test WRITE'
89 t = pipes.Template()
90 with t.open(TESTFN, 'w') as f:
91 f.write(d)
92 with open(TESTFN) as f:
93 self.assertEqual(f.read(), d)
94
95 def testRepr(self):
96 t = pipes.Template()
97 self.assertEqual(repr(t), "<Template instance, steps=[]>")
98 t.append('tr a-z A-Z', pipes.STDIN_STDOUT)
99 self.assertEqual(repr(t),
100 "<Template instance, steps=[('tr a-z A-Z', '--')]>")
101
102 def testSetDebug(self):
103 t = pipes.Template()
104 t.debug(False)
105 self.assertEqual(t.debugging, False)
106 t.debug(True)
107 self.assertEqual(t.debugging, True)
108
109 def testReadOpenSink(self):
110 # check calling open('r') on a pipe ending with
111 # a sink raises ValueError
112 t = pipes.Template()
113 t.append('boguscmd', pipes.SINK)
114 self.assertRaises(ValueError, t.open, 'bogusfile', 'r')
115
116 def testWriteOpenSource(self):
117 # check calling open('w') on a pipe ending with
118 # a source raises ValueError
119 t = pipes.Template()
120 t.prepend('boguscmd', pipes.SOURCE)
121 self.assertRaises(ValueError, t.open, 'bogusfile', 'w')
122
123 def testBadAppendOptions(self):
124 t = pipes.Template()
125
126 # try a non-string command
127 self.assertRaises(TypeError, t.append, 7, pipes.STDIN_STDOUT)
128
129 # try a type that isn't recognized
130 self.assertRaises(ValueError, t.append, 'boguscmd', 'xx')
131
132 # shouldn't be able to append a source
133 self.assertRaises(ValueError, t.append, 'boguscmd', pipes.SOURCE)
134
135 # check appending two sinks
136 t = pipes.Template()
137 t.append('boguscmd', pipes.SINK)
138 self.assertRaises(ValueError, t.append, 'boguscmd', pipes.SINK)
139
140 # command needing file input but with no $IN
141 t = pipes.Template()
142 self.assertRaises(ValueError, t.append, 'boguscmd $OUT',
143 pipes.FILEIN_FILEOUT)
144 t = pipes.Template()
145 self.assertRaises(ValueError, t.append, 'boguscmd',
146 pipes.FILEIN_STDOUT)
147
148 # command needing file output but with no $OUT
149 t = pipes.Template()
150 self.assertRaises(ValueError, t.append, 'boguscmd $IN',
151 pipes.FILEIN_FILEOUT)
152 t = pipes.Template()
153 self.assertRaises(ValueError, t.append, 'boguscmd',
154 pipes.STDIN_FILEOUT)
155
156
157 def testBadPrependOptions(self):
158 t = pipes.Template()
159
160 # try a non-string command
161 self.assertRaises(TypeError, t.prepend, 7, pipes.STDIN_STDOUT)
162
163 # try a type that isn't recognized
164 self.assertRaises(ValueError, t.prepend, 'tr a-z A-Z', 'xx')
165
166 # shouldn't be able to prepend a sink
167 self.assertRaises(ValueError, t.prepend, 'boguscmd', pipes.SINK)
168
169 # check prepending two sources
170 t = pipes.Template()
171 t.prepend('boguscmd', pipes.SOURCE)
172 self.assertRaises(ValueError, t.prepend, 'boguscmd', pipes.SOURCE)
173
174 # command needing file input but with no $IN
175 t = pipes.Template()
176 self.assertRaises(ValueError, t.prepend, 'boguscmd $OUT',
177 pipes.FILEIN_FILEOUT)
178 t = pipes.Template()
179 self.assertRaises(ValueError, t.prepend, 'boguscmd',
180 pipes.FILEIN_STDOUT)
181
182 # command needing file output but with no $OUT
183 t = pipes.Template()
184 self.assertRaises(ValueError, t.prepend, 'boguscmd $IN',
185 pipes.FILEIN_FILEOUT)
186 t = pipes.Template()
187 self.assertRaises(ValueError, t.prepend, 'boguscmd',
188 pipes.STDIN_FILEOUT)
189
190 def testBadOpenMode(self):
191 t = pipes.Template()
192 self.assertRaises(ValueError, t.open, 'bogusfile', 'x')
193
194 def testClone(self):
195 t = pipes.Template()
196 t.append('tr a-z A-Z', pipes.STDIN_STDOUT)
197
198 u = t.clone()
199 self.assertNotEqual(id(t), id(u))
200 self.assertEqual(t.steps, u.steps)
201 self.assertNotEqual(id(t.steps), id(u.steps))
202 self.assertEqual(t.debugging, u.debugging)
203
204
205 def tearDownModule():
206 reap_children()
207
208
209 if __name__ == "__main__":
210 unittest.main()