(root)/
Python-3.12.0/
Lib/
test/
test_pipes.py
       1  import os
       2  import string
       3  import unittest
       4  import shutil
       5  from test.support import reap_children, unix_shell
       6  from test.support.os_helper import TESTFN, unlink
       7  from test.support.warnings_helper import import_deprecated
       8  
       9  pipes = import_deprecated("pipes")
      10  
      11  
      12  if os.name != 'posix':
      13      raise unittest.SkipTest('pipes module only works on posix')
      14  
      15  if not (unix_shell and os.path.exists(unix_shell)):
      16      raise unittest.SkipTest('pipes module requires a shell')
      17  
      18  TESTFN2 = TESTFN + "2"
      19  
      20  # tr a-z A-Z is not portable, so make the ranges explicit
      21  s_command = 'tr %s %s' % (string.ascii_lowercase, string.ascii_uppercase)
      22  
      23  class ESC[4;38;5;81mSimplePipeTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
      24      def tearDown(self):
      25          for f in (TESTFN, TESTFN2):
      26              unlink(f)
      27  
      28      def testSimplePipe1(self):
      29          if shutil.which('tr') is None:
      30              self.skipTest('tr is not available')
      31          t = pipes.Template()
      32          t.append(s_command, pipes.STDIN_STDOUT)
      33          with t.open(TESTFN, 'w') as f:
      34              f.write('hello world #1')
      35          with open(TESTFN) as f:
      36              self.assertEqual(f.read(), 'HELLO WORLD #1')
      37  
      38      def testSimplePipe2(self):
      39          if shutil.which('tr') is None:
      40              self.skipTest('tr is not available')
      41          with open(TESTFN, 'w') as f:
      42              f.write('hello world #2')
      43          t = pipes.Template()
      44          t.append(s_command + ' < $IN > $OUT', pipes.FILEIN_FILEOUT)
      45          t.copy(TESTFN, TESTFN2)
      46          with open(TESTFN2) as f:
      47              self.assertEqual(f.read(), 'HELLO WORLD #2')
      48  
      49      def testSimplePipe3(self):
      50          if shutil.which('tr') is None:
      51              self.skipTest('tr is not available')
      52          with open(TESTFN, 'w') as f:
      53              f.write('hello world #2')
      54          t = pipes.Template()
      55          t.append(s_command + ' < $IN', pipes.FILEIN_STDOUT)
      56          f = t.open(TESTFN, 'r')
      57          try:
      58              self.assertEqual(f.read(), 'HELLO WORLD #2')
      59          finally:
      60              f.close()
      61  
      62      def testEmptyPipeline1(self):
      63          # copy through empty pipe
      64          d = 'empty pipeline test COPY'
      65          with open(TESTFN, 'w') as f:
      66              f.write(d)
      67          with open(TESTFN2, 'w') as f:
      68              f.write('')
      69          t=pipes.Template()
      70          t.copy(TESTFN, TESTFN2)
      71          with open(TESTFN2) as f:
      72              self.assertEqual(f.read(), d)
      73  
      74      def testEmptyPipeline2(self):
      75          # read through empty pipe
      76          d = 'empty pipeline test READ'
      77          with open(TESTFN, 'w') as f:
      78              f.write(d)
      79          t=pipes.Template()
      80          f = t.open(TESTFN, 'r')
      81          try:
      82              self.assertEqual(f.read(), d)
      83          finally:
      84              f.close()
      85  
      86      def testEmptyPipeline3(self):
      87          # write through empty pipe
      88          d = 'empty pipeline test WRITE'
      89          t = pipes.Template()
      90          with t.open(TESTFN, 'w') as f:
      91              f.write(d)
      92          with open(TESTFN) as f:
      93              self.assertEqual(f.read(), d)
      94  
      95      def testRepr(self):
      96          t = pipes.Template()
      97          self.assertEqual(repr(t), "<Template instance, steps=[]>")
      98          t.append('tr a-z A-Z', pipes.STDIN_STDOUT)
      99          self.assertEqual(repr(t),
     100                      "<Template instance, steps=[('tr a-z A-Z', '--')]>")
     101  
     102      def testSetDebug(self):
     103          t = pipes.Template()
     104          t.debug(False)
     105          self.assertEqual(t.debugging, False)
     106          t.debug(True)
     107          self.assertEqual(t.debugging, True)
     108  
     109      def testReadOpenSink(self):
     110          # check calling open('r') on a pipe ending with
     111          # a sink raises ValueError
     112          t = pipes.Template()
     113          t.append('boguscmd', pipes.SINK)
     114          self.assertRaises(ValueError, t.open, 'bogusfile', 'r')
     115  
     116      def testWriteOpenSource(self):
     117          # check calling open('w') on a pipe ending with
     118          # a source raises ValueError
     119          t = pipes.Template()
     120          t.prepend('boguscmd', pipes.SOURCE)
     121          self.assertRaises(ValueError, t.open, 'bogusfile', 'w')
     122  
     123      def testBadAppendOptions(self):
     124          t = pipes.Template()
     125  
     126          # try a non-string command
     127          self.assertRaises(TypeError, t.append, 7, pipes.STDIN_STDOUT)
     128  
     129          # try a type that isn't recognized
     130          self.assertRaises(ValueError, t.append, 'boguscmd', 'xx')
     131  
     132          # shouldn't be able to append a source
     133          self.assertRaises(ValueError, t.append, 'boguscmd', pipes.SOURCE)
     134  
     135          # check appending two sinks
     136          t = pipes.Template()
     137          t.append('boguscmd', pipes.SINK)
     138          self.assertRaises(ValueError, t.append, 'boguscmd', pipes.SINK)
     139  
     140          # command needing file input but with no $IN
     141          t = pipes.Template()
     142          self.assertRaises(ValueError, t.append, 'boguscmd $OUT',
     143                             pipes.FILEIN_FILEOUT)
     144          t = pipes.Template()
     145          self.assertRaises(ValueError, t.append, 'boguscmd',
     146                             pipes.FILEIN_STDOUT)
     147  
     148          # command needing file output but with no $OUT
     149          t = pipes.Template()
     150          self.assertRaises(ValueError, t.append, 'boguscmd $IN',
     151                             pipes.FILEIN_FILEOUT)
     152          t = pipes.Template()
     153          self.assertRaises(ValueError, t.append, 'boguscmd',
     154                             pipes.STDIN_FILEOUT)
     155  
     156  
     157      def testBadPrependOptions(self):
     158          t = pipes.Template()
     159  
     160          # try a non-string command
     161          self.assertRaises(TypeError, t.prepend, 7, pipes.STDIN_STDOUT)
     162  
     163          # try a type that isn't recognized
     164          self.assertRaises(ValueError, t.prepend, 'tr a-z A-Z', 'xx')
     165  
     166          # shouldn't be able to prepend a sink
     167          self.assertRaises(ValueError, t.prepend, 'boguscmd', pipes.SINK)
     168  
     169          # check prepending two sources
     170          t = pipes.Template()
     171          t.prepend('boguscmd', pipes.SOURCE)
     172          self.assertRaises(ValueError, t.prepend, 'boguscmd', pipes.SOURCE)
     173  
     174          # command needing file input but with no $IN
     175          t = pipes.Template()
     176          self.assertRaises(ValueError, t.prepend, 'boguscmd $OUT',
     177                             pipes.FILEIN_FILEOUT)
     178          t = pipes.Template()
     179          self.assertRaises(ValueError, t.prepend, 'boguscmd',
     180                             pipes.FILEIN_STDOUT)
     181  
     182          # command needing file output but with no $OUT
     183          t = pipes.Template()
     184          self.assertRaises(ValueError, t.prepend, 'boguscmd $IN',
     185                             pipes.FILEIN_FILEOUT)
     186          t = pipes.Template()
     187          self.assertRaises(ValueError, t.prepend, 'boguscmd',
     188                             pipes.STDIN_FILEOUT)
     189  
     190      def testBadOpenMode(self):
     191          t = pipes.Template()
     192          self.assertRaises(ValueError, t.open, 'bogusfile', 'x')
     193  
     194      def testClone(self):
     195          t = pipes.Template()
     196          t.append('tr a-z A-Z', pipes.STDIN_STDOUT)
     197  
     198          u = t.clone()
     199          self.assertNotEqual(id(t), id(u))
     200          self.assertEqual(t.steps, u.steps)
     201          self.assertNotEqual(id(t.steps), id(u.steps))
     202          self.assertEqual(t.debugging, u.debugging)
     203  
     204  
def tearDownModule():
    # Module-level unittest fixture: calls reap_children() from test.support
    # once the tests in this module are done.
    reap_children()
     207  
     208  
# Run the tests when this file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()