python (3.11.7)
1 import socket
2 import selectors
3 import threading
4 import contextlib
5
6 from test import support
7 from test.support import socket_helper, warnings_helper
8 import unittest
9
10 support.requires_working_socket(module=True)
11
12 telnetlib = warnings_helper.import_deprecated('telnetlib')
13
14 HOST = socket_helper.HOST
15
16 def server(evt, serv):
17 serv.listen()
18 evt.set()
19 try:
20 conn, addr = serv.accept()
21 conn.close()
22 except TimeoutError:
23 pass
24 finally:
25 serv.close()
26
27 class ESC[4;38;5;81mGeneralTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
28
29 def setUp(self):
30 self.evt = threading.Event()
31 self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
32 self.sock.settimeout(60) # Safety net. Look issue 11812
33 self.port = socket_helper.bind_port(self.sock)
34 self.thread = threading.Thread(target=server, args=(self.evt,self.sock))
35 self.thread.daemon = True
36 self.thread.start()
37 self.evt.wait()
38
39 def tearDown(self):
40 self.thread.join()
41 del self.thread # Clear out any dangling Thread objects.
42
43 def testBasic(self):
44 # connects
45 telnet = telnetlib.Telnet(HOST, self.port)
46 telnet.sock.close()
47
48 def testContextManager(self):
49 with telnetlib.Telnet(HOST, self.port) as tn:
50 self.assertIsNotNone(tn.get_socket())
51 self.assertIsNone(tn.get_socket())
52
53 def testTimeoutDefault(self):
54 self.assertTrue(socket.getdefaulttimeout() is None)
55 socket.setdefaulttimeout(30)
56 try:
57 telnet = telnetlib.Telnet(HOST, self.port)
58 finally:
59 socket.setdefaulttimeout(None)
60 self.assertEqual(telnet.sock.gettimeout(), 30)
61 telnet.sock.close()
62
63 def testTimeoutNone(self):
64 # None, having other default
65 self.assertTrue(socket.getdefaulttimeout() is None)
66 socket.setdefaulttimeout(30)
67 try:
68 telnet = telnetlib.Telnet(HOST, self.port, timeout=None)
69 finally:
70 socket.setdefaulttimeout(None)
71 self.assertTrue(telnet.sock.gettimeout() is None)
72 telnet.sock.close()
73
74 def testTimeoutValue(self):
75 telnet = telnetlib.Telnet(HOST, self.port, timeout=30)
76 self.assertEqual(telnet.sock.gettimeout(), 30)
77 telnet.sock.close()
78
79 def testTimeoutOpen(self):
80 telnet = telnetlib.Telnet()
81 telnet.open(HOST, self.port, timeout=30)
82 self.assertEqual(telnet.sock.gettimeout(), 30)
83 telnet.sock.close()
84
85 def testGetters(self):
86 # Test telnet getter methods
87 telnet = telnetlib.Telnet(HOST, self.port, timeout=30)
88 t_sock = telnet.sock
89 self.assertEqual(telnet.get_socket(), t_sock)
90 self.assertEqual(telnet.fileno(), t_sock.fileno())
91 telnet.sock.close()
92
93 class ESC[4;38;5;81mSocketStub(ESC[4;38;5;149mobject):
94 ''' a socket proxy that re-defines sendall() '''
95 def __init__(self, reads=()):
96 self.reads = list(reads) # Intentionally make a copy.
97 self.writes = []
98 self.block = False
99 def sendall(self, data):
100 self.writes.append(data)
101 def recv(self, size):
102 out = b''
103 while self.reads and len(out) < size:
104 out += self.reads.pop(0)
105 if len(out) > size:
106 self.reads.insert(0, out[size:])
107 out = out[:size]
108 return out
109
110 class ESC[4;38;5;81mTelnetAlike(ESC[4;38;5;149mtelnetlibESC[4;38;5;149m.ESC[4;38;5;149mTelnet):
111 def fileno(self):
112 raise NotImplementedError()
113 def close(self): pass
114 def sock_avail(self):
115 return (not self.sock.block)
116 def msg(self, msg, *args):
117 with support.captured_stdout() as out:
118 telnetlib.Telnet.msg(self, msg, *args)
119 self._messages += out.getvalue()
120 return
121
122 class ESC[4;38;5;81mMockSelector(ESC[4;38;5;149mselectorsESC[4;38;5;149m.ESC[4;38;5;149mBaseSelector):
123
124 def __init__(self):
125 self.keys = {}
126
127 @property
128 def resolution(self):
129 return 1e-3
130
131 def register(self, fileobj, events, data=None):
132 key = selectors.SelectorKey(fileobj, 0, events, data)
133 self.keys[fileobj] = key
134 return key
135
136 def unregister(self, fileobj):
137 return self.keys.pop(fileobj)
138
139 def select(self, timeout=None):
140 block = False
141 for fileobj in self.keys:
142 if isinstance(fileobj, TelnetAlike):
143 block = fileobj.sock.block
144 break
145 if block:
146 return []
147 else:
148 return [(key, key.events) for key in self.keys.values()]
149
150 def get_map(self):
151 return self.keys
152
153
154 @contextlib.contextmanager
155 def test_socket(reads):
156 def new_conn(*ignored):
157 return SocketStub(reads)
158 try:
159 old_conn = socket.create_connection
160 socket.create_connection = new_conn
161 yield None
162 finally:
163 socket.create_connection = old_conn
164 return
165
166 def test_telnet(reads=(), cls=TelnetAlike):
167 ''' return a telnetlib.Telnet object that uses a SocketStub with
168 reads queued up to be read '''
169 for x in reads:
170 assert type(x) is bytes, x
171 with test_socket(reads):
172 telnet = cls('dummy', 0)
173 telnet._messages = '' # debuglevel output
174 return telnet
175
176 class ESC[4;38;5;81mExpectAndReadTestCase(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
177 def setUp(self):
178 self.old_selector = telnetlib._TelnetSelector
179 telnetlib._TelnetSelector = MockSelector
180 def tearDown(self):
181 telnetlib._TelnetSelector = self.old_selector
182
183 class ESC[4;38;5;81mReadTests(ESC[4;38;5;149mExpectAndReadTestCase):
184 def test_read_until(self):
185 """
186 read_until(expected, timeout=None)
187 test the blocking version of read_util
188 """
189 want = [b'xxxmatchyyy']
190 telnet = test_telnet(want)
191 data = telnet.read_until(b'match')
192 self.assertEqual(data, b'xxxmatch', msg=(telnet.cookedq, telnet.rawq, telnet.sock.reads))
193
194 reads = [b'x' * 50, b'match', b'y' * 50]
195 expect = b''.join(reads[:-1])
196 telnet = test_telnet(reads)
197 data = telnet.read_until(b'match')
198 self.assertEqual(data, expect)
199
200
201 def test_read_all(self):
202 """
203 read_all()
204 Read all data until EOF; may block.
205 """
206 reads = [b'x' * 500, b'y' * 500, b'z' * 500]
207 expect = b''.join(reads)
208 telnet = test_telnet(reads)
209 data = telnet.read_all()
210 self.assertEqual(data, expect)
211 return
212
213 def test_read_some(self):
214 """
215 read_some()
216 Read at least one byte or EOF; may block.
217 """
218 # test 'at least one byte'
219 telnet = test_telnet([b'x' * 500])
220 data = telnet.read_some()
221 self.assertTrue(len(data) >= 1)
222 # test EOF
223 telnet = test_telnet()
224 data = telnet.read_some()
225 self.assertEqual(b'', data)
226
227 def _read_eager(self, func_name):
228 """
229 read_*_eager()
230 Read all data available already queued or on the socket,
231 without blocking.
232 """
233 want = b'x' * 100
234 telnet = test_telnet([want])
235 func = getattr(telnet, func_name)
236 telnet.sock.block = True
237 self.assertEqual(b'', func())
238 telnet.sock.block = False
239 data = b''
240 while True:
241 try:
242 data += func()
243 except EOFError:
244 break
245 self.assertEqual(data, want)
246
247 def test_read_eager(self):
248 # read_eager and read_very_eager make the same guarantees
249 # (they behave differently but we only test the guarantees)
250 self._read_eager('read_eager')
251 self._read_eager('read_very_eager')
252 # NB -- we need to test the IAC block which is mentioned in the
253 # docstring but not in the module docs
254
255 def read_very_lazy(self):
256 want = b'x' * 100
257 telnet = test_telnet([want])
258 self.assertEqual(b'', telnet.read_very_lazy())
259 while telnet.sock.reads:
260 telnet.fill_rawq()
261 data = telnet.read_very_lazy()
262 self.assertEqual(want, data)
263 self.assertRaises(EOFError, telnet.read_very_lazy)
264
265 def test_read_lazy(self):
266 want = b'x' * 100
267 telnet = test_telnet([want])
268 self.assertEqual(b'', telnet.read_lazy())
269 data = b''
270 while True:
271 try:
272 read_data = telnet.read_lazy()
273 data += read_data
274 if not read_data:
275 telnet.fill_rawq()
276 except EOFError:
277 break
278 self.assertTrue(want.startswith(data))
279 self.assertEqual(data, want)
280
281 class ESC[4;38;5;81mnego_collector(ESC[4;38;5;149mobject):
282 def __init__(self, sb_getter=None):
283 self.seen = b''
284 self.sb_getter = sb_getter
285 self.sb_seen = b''
286
287 def do_nego(self, sock, cmd, opt):
288 self.seen += cmd + opt
289 if cmd == tl.SE and self.sb_getter:
290 sb_data = self.sb_getter()
291 self.sb_seen += sb_data
292
293 tl = telnetlib
294
295 class ESC[4;38;5;81mWriteTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
296 '''The only thing that write does is replace each tl.IAC for
297 tl.IAC+tl.IAC'''
298
299 def test_write(self):
300 data_sample = [b'data sample without IAC',
301 b'data sample with' + tl.IAC + b' one IAC',
302 b'a few' + tl.IAC + tl.IAC + b' iacs' + tl.IAC,
303 tl.IAC,
304 b'']
305 for data in data_sample:
306 telnet = test_telnet()
307 telnet.write(data)
308 written = b''.join(telnet.sock.writes)
309 self.assertEqual(data.replace(tl.IAC,tl.IAC+tl.IAC), written)
310
311 class ESC[4;38;5;81mOptionTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
312 # RFC 854 commands
313 cmds = [tl.AO, tl.AYT, tl.BRK, tl.EC, tl.EL, tl.GA, tl.IP, tl.NOP]
314
315 def _test_command(self, data):
316 """ helper for testing IAC + cmd """
317 telnet = test_telnet(data)
318 data_len = len(b''.join(data))
319 nego = nego_collector()
320 telnet.set_option_negotiation_callback(nego.do_nego)
321 txt = telnet.read_all()
322 cmd = nego.seen
323 self.assertTrue(len(cmd) > 0) # we expect at least one command
324 self.assertIn(cmd[:1], self.cmds)
325 self.assertEqual(cmd[1:2], tl.NOOPT)
326 self.assertEqual(data_len, len(txt + cmd))
327 nego.sb_getter = None # break the nego => telnet cycle
328
329 def test_IAC_commands(self):
330 for cmd in self.cmds:
331 self._test_command([tl.IAC, cmd])
332 self._test_command([b'x' * 100, tl.IAC, cmd, b'y'*100])
333 self._test_command([b'x' * 10, tl.IAC, cmd, b'y'*10])
334 # all at once
335 self._test_command([tl.IAC + cmd for (cmd) in self.cmds])
336
337 def test_SB_commands(self):
338 # RFC 855, subnegotiations portion
339 send = [tl.IAC + tl.SB + tl.IAC + tl.SE,
340 tl.IAC + tl.SB + tl.IAC + tl.IAC + tl.IAC + tl.SE,
341 tl.IAC + tl.SB + tl.IAC + tl.IAC + b'aa' + tl.IAC + tl.SE,
342 tl.IAC + tl.SB + b'bb' + tl.IAC + tl.IAC + tl.IAC + tl.SE,
343 tl.IAC + tl.SB + b'cc' + tl.IAC + tl.IAC + b'dd' + tl.IAC + tl.SE,
344 ]
345 telnet = test_telnet(send)
346 nego = nego_collector(telnet.read_sb_data)
347 telnet.set_option_negotiation_callback(nego.do_nego)
348 txt = telnet.read_all()
349 self.assertEqual(txt, b'')
350 want_sb_data = tl.IAC + tl.IAC + b'aabb' + tl.IAC + b'cc' + tl.IAC + b'dd'
351 self.assertEqual(nego.sb_seen, want_sb_data)
352 self.assertEqual(b'', telnet.read_sb_data())
353 nego.sb_getter = None # break the nego => telnet cycle
354
355 def test_debuglevel_reads(self):
356 # test all the various places that self.msg(...) is called
357 given_a_expect_b = [
358 # Telnet.fill_rawq
359 (b'a', ": recv b''\n"),
360 # Telnet.process_rawq
361 (tl.IAC + bytes([88]), ": IAC 88 not recognized\n"),
362 (tl.IAC + tl.DO + bytes([1]), ": IAC DO 1\n"),
363 (tl.IAC + tl.DONT + bytes([1]), ": IAC DONT 1\n"),
364 (tl.IAC + tl.WILL + bytes([1]), ": IAC WILL 1\n"),
365 (tl.IAC + tl.WONT + bytes([1]), ": IAC WONT 1\n"),
366 ]
367 for a, b in given_a_expect_b:
368 telnet = test_telnet([a])
369 telnet.set_debuglevel(1)
370 txt = telnet.read_all()
371 self.assertIn(b, telnet._messages)
372 return
373
374 def test_debuglevel_write(self):
375 telnet = test_telnet()
376 telnet.set_debuglevel(1)
377 telnet.write(b'xxx')
378 expected = "send b'xxx'\n"
379 self.assertIn(expected, telnet._messages)
380
381 def test_debug_accepts_str_port(self):
382 # Issue 10695
383 with test_socket([]):
384 telnet = TelnetAlike('dummy', '0')
385 telnet._messages = ''
386 telnet.set_debuglevel(1)
387 telnet.msg('test')
388 self.assertRegex(telnet._messages, r'0.*test')
389
390
391 class ESC[4;38;5;81mExpectTests(ESC[4;38;5;149mExpectAndReadTestCase):
392 def test_expect(self):
393 """
394 expect(expected, [timeout])
395 Read until the expected string has been seen, or a timeout is
396 hit (default is no timeout); may block.
397 """
398 want = [b'x' * 10, b'match', b'y' * 10]
399 telnet = test_telnet(want)
400 (_,_,data) = telnet.expect([b'match'])
401 self.assertEqual(data, b''.join(want[:-1]))
402
403
404 if __name__ == '__main__':
405 unittest.main()