python (3.12.0)
1 import doctest
2 import unittest
3 from test import support
4 from test.support import threading_helper
5 from itertools import *
6 import weakref
7 from decimal import Decimal
8 from fractions import Fraction
9 import operator
10 import random
11 import copy
12 import pickle
13 from functools import reduce
14 import sys
15 import struct
16 import threading
17 import gc
18 import warnings
19
20 def pickle_deprecated(testfunc):
21 """ Run the test three times.
22 First, verify that a Deprecation Warning is raised.
23 Second, run normally but with DeprecationWarnings temporarily disabled.
24 Third, run with warnings promoted to errors.
25 """
26 def inner(self):
27 with self.assertWarns(DeprecationWarning):
28 testfunc(self)
29 with warnings.catch_warnings():
30 warnings.simplefilter("ignore", category=DeprecationWarning)
31 testfunc(self)
32 with warnings.catch_warnings():
33 warnings.simplefilter("error", category=DeprecationWarning)
34 with self.assertRaises((DeprecationWarning, AssertionError, SystemError)):
35 testfunc(self)
36
37 return inner
38
39 maxsize = support.MAX_Py_ssize_t
40 minsize = -maxsize-1
41
42 def lzip(*args):
43 return list(zip(*args))
44
45 def onearg(x):
46 'Test function of one argument'
47 return 2*x
48
49 def errfunc(*args):
50 'Test function that raises an error'
51 raise ValueError
52
53 def gen3():
54 'Non-restartable source sequence'
55 for i in (0, 1, 2):
56 yield i
57
58 def isEven(x):
59 'Test predicate'
60 return x%2==0
61
62 def isOdd(x):
63 'Test predicate'
64 return x%2==1
65
66 def tupleize(*args):
67 return args
68
69 def irange(n):
70 for i in range(n):
71 yield i
72
73 class ESC[4;38;5;81mStopNow:
74 'Class emulating an empty iterable.'
75 def __iter__(self):
76 return self
77 def __next__(self):
78 raise StopIteration
79
80 def take(n, seq):
81 'Convenience function for partially consuming a long of infinite iterable'
82 return list(islice(seq, n))
83
84 def prod(iterable):
85 return reduce(operator.mul, iterable, 1)
86
87 def fact(n):
88 'Factorial'
89 return prod(range(1, n+1))
90
91 # root level methods for pickling ability
92 def testR(r):
93 return r[0]
94
95 def testR2(r):
96 return r[2]
97
98 def underten(x):
99 return x<10
100
101 picklecopiers = [lambda s, proto=proto: pickle.loads(pickle.dumps(s, proto))
102 for proto in range(pickle.HIGHEST_PROTOCOL + 1)]
103
104 class ESC[4;38;5;81mTestBasicOps(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
105
106 def pickletest(self, protocol, it, stop=4, take=1, compare=None):
107 """Test that an iterator is the same after pickling, also when part-consumed"""
108 def expand(it, i=0):
109 # Recursively expand iterables, within sensible bounds
110 if i > 10:
111 raise RuntimeError("infinite recursion encountered")
112 if isinstance(it, str):
113 return it
114 try:
115 l = list(islice(it, stop))
116 except TypeError:
117 return it # can't expand it
118 return [expand(e, i+1) for e in l]
119
120 # Test the initial copy against the original
121 dump = pickle.dumps(it, protocol)
122 i2 = pickle.loads(dump)
123 self.assertEqual(type(it), type(i2))
124 a, b = expand(it), expand(i2)
125 self.assertEqual(a, b)
126 if compare:
127 c = expand(compare)
128 self.assertEqual(a, c)
129
130 # Take from the copy, and create another copy and compare them.
131 i3 = pickle.loads(dump)
132 took = 0
133 try:
134 for i in range(take):
135 next(i3)
136 took += 1
137 except StopIteration:
138 pass #in case there is less data than 'take'
139 dump = pickle.dumps(i3, protocol)
140 i4 = pickle.loads(dump)
141 a, b = expand(i3), expand(i4)
142 self.assertEqual(a, b)
143 if compare:
144 c = expand(compare[took:])
145 self.assertEqual(a, c);
146
147 @pickle_deprecated
148 def test_accumulate(self):
149 self.assertEqual(list(accumulate(range(10))), # one positional arg
150 [0, 1, 3, 6, 10, 15, 21, 28, 36, 45])
151 self.assertEqual(list(accumulate(iterable=range(10))), # kw arg
152 [0, 1, 3, 6, 10, 15, 21, 28, 36, 45])
153 for typ in int, complex, Decimal, Fraction: # multiple types
154 self.assertEqual(
155 list(accumulate(map(typ, range(10)))),
156 list(map(typ, [0, 1, 3, 6, 10, 15, 21, 28, 36, 45])))
157 self.assertEqual(list(accumulate('abc')), ['a', 'ab', 'abc']) # works with non-numeric
158 self.assertEqual(list(accumulate([])), []) # empty iterable
159 self.assertEqual(list(accumulate([7])), [7]) # iterable of length one
160 self.assertRaises(TypeError, accumulate, range(10), 5, 6) # too many args
161 self.assertRaises(TypeError, accumulate) # too few args
162 self.assertRaises(TypeError, accumulate, x=range(10)) # unexpected kwd arg
163 self.assertRaises(TypeError, list, accumulate([1, []])) # args that don't add
164
165 s = [2, 8, 9, 5, 7, 0, 3, 4, 1, 6]
166 self.assertEqual(list(accumulate(s, min)),
167 [2, 2, 2, 2, 2, 0, 0, 0, 0, 0])
168 self.assertEqual(list(accumulate(s, max)),
169 [2, 8, 9, 9, 9, 9, 9, 9, 9, 9])
170 self.assertEqual(list(accumulate(s, operator.mul)),
171 [2, 16, 144, 720, 5040, 0, 0, 0, 0, 0])
172 with self.assertRaises(TypeError):
173 list(accumulate(s, chr)) # unary-operation
174 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
175 self.pickletest(proto, accumulate(range(10))) # test pickling
176 self.pickletest(proto, accumulate(range(10), initial=7))
177 self.assertEqual(list(accumulate([10, 5, 1], initial=None)), [10, 15, 16])
178 self.assertEqual(list(accumulate([10, 5, 1], initial=100)), [100, 110, 115, 116])
179 self.assertEqual(list(accumulate([], initial=100)), [100])
180 with self.assertRaises(TypeError):
181 list(accumulate([10, 20], 100))
182
183 def test_batched(self):
184 self.assertEqual(list(batched('ABCDEFG', 3)),
185 [('A', 'B', 'C'), ('D', 'E', 'F'), ('G',)])
186 self.assertEqual(list(batched('ABCDEFG', 2)),
187 [('A', 'B'), ('C', 'D'), ('E', 'F'), ('G',)])
188 self.assertEqual(list(batched('ABCDEFG', 1)),
189 [('A',), ('B',), ('C',), ('D',), ('E',), ('F',), ('G',)])
190
191 with self.assertRaises(TypeError): # Too few arguments
192 list(batched('ABCDEFG'))
193 with self.assertRaises(TypeError):
194 list(batched('ABCDEFG', 3, None)) # Too many arguments
195 with self.assertRaises(TypeError):
196 list(batched(None, 3)) # Non-iterable input
197 with self.assertRaises(TypeError):
198 list(batched('ABCDEFG', 'hello')) # n is a string
199 with self.assertRaises(ValueError):
200 list(batched('ABCDEFG', 0)) # n is zero
201 with self.assertRaises(ValueError):
202 list(batched('ABCDEFG', -1)) # n is negative
203
204 data = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ'
205 for n in range(1, 6):
206 for i in range(len(data)):
207 s = data[:i]
208 batches = list(batched(s, n))
209 with self.subTest(s=s, n=n, batches=batches):
210 # Order is preserved and no data is lost
211 self.assertEqual(''.join(chain(*batches)), s)
212 # Each batch is an exact tuple
213 self.assertTrue(all(type(batch) is tuple for batch in batches))
214 # All but the last batch is of size n
215 if batches:
216 last_batch = batches.pop()
217 self.assertTrue(all(len(batch) == n for batch in batches))
218 self.assertTrue(len(last_batch) <= n)
219 batches.append(last_batch)
220
221 def test_chain(self):
222
223 def chain2(*iterables):
224 'Pure python version in the docs'
225 for it in iterables:
226 for element in it:
227 yield element
228
229 for c in (chain, chain2):
230 self.assertEqual(list(c('abc', 'def')), list('abcdef'))
231 self.assertEqual(list(c('abc')), list('abc'))
232 self.assertEqual(list(c('')), [])
233 self.assertEqual(take(4, c('abc', 'def')), list('abcd'))
234 self.assertRaises(TypeError, list,c(2, 3))
235
236 def test_chain_from_iterable(self):
237 self.assertEqual(list(chain.from_iterable(['abc', 'def'])), list('abcdef'))
238 self.assertEqual(list(chain.from_iterable(['abc'])), list('abc'))
239 self.assertEqual(list(chain.from_iterable([''])), [])
240 self.assertEqual(take(4, chain.from_iterable(['abc', 'def'])), list('abcd'))
241 self.assertRaises(TypeError, list, chain.from_iterable([2, 3]))
242 self.assertEqual(list(islice(chain.from_iterable(repeat(range(5))), 2)), [0, 1])
243
244 @pickle_deprecated
245 def test_chain_reducible(self):
246 for oper in [copy.deepcopy] + picklecopiers:
247 it = chain('abc', 'def')
248 self.assertEqual(list(oper(it)), list('abcdef'))
249 self.assertEqual(next(it), 'a')
250 self.assertEqual(list(oper(it)), list('bcdef'))
251
252 self.assertEqual(list(oper(chain(''))), [])
253 self.assertEqual(take(4, oper(chain('abc', 'def'))), list('abcd'))
254 self.assertRaises(TypeError, list, oper(chain(2, 3)))
255 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
256 self.pickletest(proto, chain('abc', 'def'), compare=list('abcdef'))
257
258 @pickle_deprecated
259 def test_chain_setstate(self):
260 self.assertRaises(TypeError, chain().__setstate__, ())
261 self.assertRaises(TypeError, chain().__setstate__, [])
262 self.assertRaises(TypeError, chain().__setstate__, 0)
263 self.assertRaises(TypeError, chain().__setstate__, ([],))
264 self.assertRaises(TypeError, chain().__setstate__, (iter([]), []))
265 it = chain()
266 it.__setstate__((iter(['abc', 'def']),))
267 self.assertEqual(list(it), ['a', 'b', 'c', 'd', 'e', 'f'])
268 it = chain()
269 it.__setstate__((iter(['abc', 'def']), iter(['ghi'])))
270 self.assertEqual(list(it), ['ghi', 'a', 'b', 'c', 'd', 'e', 'f'])
271
272 @pickle_deprecated
273 def test_combinations(self):
274 self.assertRaises(TypeError, combinations, 'abc') # missing r argument
275 self.assertRaises(TypeError, combinations, 'abc', 2, 1) # too many arguments
276 self.assertRaises(TypeError, combinations, None) # pool is not iterable
277 self.assertRaises(ValueError, combinations, 'abc', -2) # r is negative
278
279 for op in [lambda a:a] + picklecopiers:
280 self.assertEqual(list(op(combinations('abc', 32))), []) # r > n
281
282 self.assertEqual(list(op(combinations('ABCD', 2))),
283 [('A','B'), ('A','C'), ('A','D'), ('B','C'), ('B','D'), ('C','D')])
284 testIntermediate = combinations('ABCD', 2)
285 next(testIntermediate)
286 self.assertEqual(list(op(testIntermediate)),
287 [('A','C'), ('A','D'), ('B','C'), ('B','D'), ('C','D')])
288
289 self.assertEqual(list(op(combinations(range(4), 3))),
290 [(0,1,2), (0,1,3), (0,2,3), (1,2,3)])
291 testIntermediate = combinations(range(4), 3)
292 next(testIntermediate)
293 self.assertEqual(list(op(testIntermediate)),
294 [(0,1,3), (0,2,3), (1,2,3)])
295
296 def combinations1(iterable, r):
297 'Pure python version shown in the docs'
298 pool = tuple(iterable)
299 n = len(pool)
300 if r > n:
301 return
302 indices = list(range(r))
303 yield tuple(pool[i] for i in indices)
304 while 1:
305 for i in reversed(range(r)):
306 if indices[i] != i + n - r:
307 break
308 else:
309 return
310 indices[i] += 1
311 for j in range(i+1, r):
312 indices[j] = indices[j-1] + 1
313 yield tuple(pool[i] for i in indices)
314
315 def combinations2(iterable, r):
316 'Pure python version shown in the docs'
317 pool = tuple(iterable)
318 n = len(pool)
319 for indices in permutations(range(n), r):
320 if sorted(indices) == list(indices):
321 yield tuple(pool[i] for i in indices)
322
323 def combinations3(iterable, r):
324 'Pure python version from cwr()'
325 pool = tuple(iterable)
326 n = len(pool)
327 for indices in combinations_with_replacement(range(n), r):
328 if len(set(indices)) == r:
329 yield tuple(pool[i] for i in indices)
330
331 for n in range(7):
332 values = [5*x-12 for x in range(n)]
333 for r in range(n+2):
334 result = list(combinations(values, r))
335 self.assertEqual(len(result), 0 if r>n else fact(n) / fact(r) / fact(n-r)) # right number of combs
336 self.assertEqual(len(result), len(set(result))) # no repeats
337 self.assertEqual(result, sorted(result)) # lexicographic order
338 for c in result:
339 self.assertEqual(len(c), r) # r-length combinations
340 self.assertEqual(len(set(c)), r) # no duplicate elements
341 self.assertEqual(list(c), sorted(c)) # keep original ordering
342 self.assertTrue(all(e in values for e in c)) # elements taken from input iterable
343 self.assertEqual(list(c),
344 [e for e in values if e in c]) # comb is a subsequence of the input iterable
345 self.assertEqual(result, list(combinations1(values, r))) # matches first pure python version
346 self.assertEqual(result, list(combinations2(values, r))) # matches second pure python version
347 self.assertEqual(result, list(combinations3(values, r))) # matches second pure python version
348
349 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
350 self.pickletest(proto, combinations(values, r)) # test pickling
351
352 @support.bigaddrspacetest
353 def test_combinations_overflow(self):
354 with self.assertRaises((OverflowError, MemoryError)):
355 combinations("AA", 2**29)
356
357 # Test implementation detail: tuple re-use
358 @support.impl_detail("tuple reuse is specific to CPython")
359 def test_combinations_tuple_reuse(self):
360 self.assertEqual(len(set(map(id, combinations('abcde', 3)))), 1)
361 self.assertNotEqual(len(set(map(id, list(combinations('abcde', 3))))), 1)
362
363 @pickle_deprecated
364 def test_combinations_with_replacement(self):
365 cwr = combinations_with_replacement
366 self.assertRaises(TypeError, cwr, 'abc') # missing r argument
367 self.assertRaises(TypeError, cwr, 'abc', 2, 1) # too many arguments
368 self.assertRaises(TypeError, cwr, None) # pool is not iterable
369 self.assertRaises(ValueError, cwr, 'abc', -2) # r is negative
370
371 for op in [lambda a:a] + picklecopiers:
372 self.assertEqual(list(op(cwr('ABC', 2))),
373 [('A','A'), ('A','B'), ('A','C'), ('B','B'), ('B','C'), ('C','C')])
374 testIntermediate = cwr('ABC', 2)
375 next(testIntermediate)
376 self.assertEqual(list(op(testIntermediate)),
377 [('A','B'), ('A','C'), ('B','B'), ('B','C'), ('C','C')])
378
379
380 def cwr1(iterable, r):
381 'Pure python version shown in the docs'
382 # number items returned: (n+r-1)! / r! / (n-1)! when n>0
383 pool = tuple(iterable)
384 n = len(pool)
385 if not n and r:
386 return
387 indices = [0] * r
388 yield tuple(pool[i] for i in indices)
389 while 1:
390 for i in reversed(range(r)):
391 if indices[i] != n - 1:
392 break
393 else:
394 return
395 indices[i:] = [indices[i] + 1] * (r - i)
396 yield tuple(pool[i] for i in indices)
397
398 def cwr2(iterable, r):
399 'Pure python version shown in the docs'
400 pool = tuple(iterable)
401 n = len(pool)
402 for indices in product(range(n), repeat=r):
403 if sorted(indices) == list(indices):
404 yield tuple(pool[i] for i in indices)
405
406 def numcombs(n, r):
407 if not n:
408 return 0 if r else 1
409 return fact(n+r-1) / fact(r)/ fact(n-1)
410
411 for n in range(7):
412 values = [5*x-12 for x in range(n)]
413 for r in range(n+2):
414 result = list(cwr(values, r))
415
416 self.assertEqual(len(result), numcombs(n, r)) # right number of combs
417 self.assertEqual(len(result), len(set(result))) # no repeats
418 self.assertEqual(result, sorted(result)) # lexicographic order
419
420 regular_combs = list(combinations(values, r)) # compare to combs without replacement
421 if n == 0 or r <= 1:
422 self.assertEqual(result, regular_combs) # cases that should be identical
423 else:
424 self.assertTrue(set(result) >= set(regular_combs)) # rest should be supersets of regular combs
425
426 for c in result:
427 self.assertEqual(len(c), r) # r-length combinations
428 noruns = [k for k,v in groupby(c)] # combo without consecutive repeats
429 self.assertEqual(len(noruns), len(set(noruns))) # no repeats other than consecutive
430 self.assertEqual(list(c), sorted(c)) # keep original ordering
431 self.assertTrue(all(e in values for e in c)) # elements taken from input iterable
432 self.assertEqual(noruns,
433 [e for e in values if e in c]) # comb is a subsequence of the input iterable
434 self.assertEqual(result, list(cwr1(values, r))) # matches first pure python version
435 self.assertEqual(result, list(cwr2(values, r))) # matches second pure python version
436
437 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
438 self.pickletest(proto, cwr(values,r)) # test pickling
439
440 @support.bigaddrspacetest
441 def test_combinations_with_replacement_overflow(self):
442 with self.assertRaises((OverflowError, MemoryError)):
443 combinations_with_replacement("AA", 2**30)
444
445 # Test implementation detail: tuple re-use
446 @support.impl_detail("tuple reuse is specific to CPython")
447 def test_combinations_with_replacement_tuple_reuse(self):
448 cwr = combinations_with_replacement
449 self.assertEqual(len(set(map(id, cwr('abcde', 3)))), 1)
450 self.assertNotEqual(len(set(map(id, list(cwr('abcde', 3))))), 1)
451
452 @pickle_deprecated
453 def test_permutations(self):
454 self.assertRaises(TypeError, permutations) # too few arguments
455 self.assertRaises(TypeError, permutations, 'abc', 2, 1) # too many arguments
456 self.assertRaises(TypeError, permutations, None) # pool is not iterable
457 self.assertRaises(ValueError, permutations, 'abc', -2) # r is negative
458 self.assertEqual(list(permutations('abc', 32)), []) # r > n
459 self.assertRaises(TypeError, permutations, 'abc', 's') # r is not an int or None
460 self.assertEqual(list(permutations(range(3), 2)),
461 [(0,1), (0,2), (1,0), (1,2), (2,0), (2,1)])
462
463 def permutations1(iterable, r=None):
464 'Pure python version shown in the docs'
465 pool = tuple(iterable)
466 n = len(pool)
467 r = n if r is None else r
468 if r > n:
469 return
470 indices = list(range(n))
471 cycles = list(range(n-r+1, n+1))[::-1]
472 yield tuple(pool[i] for i in indices[:r])
473 while n:
474 for i in reversed(range(r)):
475 cycles[i] -= 1
476 if cycles[i] == 0:
477 indices[i:] = indices[i+1:] + indices[i:i+1]
478 cycles[i] = n - i
479 else:
480 j = cycles[i]
481 indices[i], indices[-j] = indices[-j], indices[i]
482 yield tuple(pool[i] for i in indices[:r])
483 break
484 else:
485 return
486
487 def permutations2(iterable, r=None):
488 'Pure python version shown in the docs'
489 pool = tuple(iterable)
490 n = len(pool)
491 r = n if r is None else r
492 for indices in product(range(n), repeat=r):
493 if len(set(indices)) == r:
494 yield tuple(pool[i] for i in indices)
495
496 for n in range(7):
497 values = [5*x-12 for x in range(n)]
498 for r in range(n+2):
499 result = list(permutations(values, r))
500 self.assertEqual(len(result), 0 if r>n else fact(n) / fact(n-r)) # right number of perms
501 self.assertEqual(len(result), len(set(result))) # no repeats
502 self.assertEqual(result, sorted(result)) # lexicographic order
503 for p in result:
504 self.assertEqual(len(p), r) # r-length permutations
505 self.assertEqual(len(set(p)), r) # no duplicate elements
506 self.assertTrue(all(e in values for e in p)) # elements taken from input iterable
507 self.assertEqual(result, list(permutations1(values, r))) # matches first pure python version
508 self.assertEqual(result, list(permutations2(values, r))) # matches second pure python version
509 if r == n:
510 self.assertEqual(result, list(permutations(values, None))) # test r as None
511 self.assertEqual(result, list(permutations(values))) # test default r
512
513 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
514 self.pickletest(proto, permutations(values, r)) # test pickling
515
516 @support.bigaddrspacetest
517 def test_permutations_overflow(self):
518 with self.assertRaises((OverflowError, MemoryError)):
519 permutations("A", 2**30)
520
521 @support.impl_detail("tuple reuse is specific to CPython")
522 def test_permutations_tuple_reuse(self):
523 self.assertEqual(len(set(map(id, permutations('abcde', 3)))), 1)
524 self.assertNotEqual(len(set(map(id, list(permutations('abcde', 3))))), 1)
525
526 def test_combinatorics(self):
527 # Test relationships between product(), permutations(),
528 # combinations() and combinations_with_replacement().
529
530 for n in range(6):
531 s = 'ABCDEFG'[:n]
532 for r in range(8):
533 prod = list(product(s, repeat=r))
534 cwr = list(combinations_with_replacement(s, r))
535 perm = list(permutations(s, r))
536 comb = list(combinations(s, r))
537
538 # Check size
539 self.assertEqual(len(prod), n**r)
540 self.assertEqual(len(cwr), (fact(n+r-1) / fact(r)/ fact(n-1)) if n else (not r))
541 self.assertEqual(len(perm), 0 if r>n else fact(n) / fact(n-r))
542 self.assertEqual(len(comb), 0 if r>n else fact(n) / fact(r) / fact(n-r))
543
544 # Check lexicographic order without repeated tuples
545 self.assertEqual(prod, sorted(set(prod)))
546 self.assertEqual(cwr, sorted(set(cwr)))
547 self.assertEqual(perm, sorted(set(perm)))
548 self.assertEqual(comb, sorted(set(comb)))
549
550 # Check interrelationships
551 self.assertEqual(cwr, [t for t in prod if sorted(t)==list(t)]) # cwr: prods which are sorted
552 self.assertEqual(perm, [t for t in prod if len(set(t))==r]) # perm: prods with no dups
553 self.assertEqual(comb, [t for t in perm if sorted(t)==list(t)]) # comb: perms that are sorted
554 self.assertEqual(comb, [t for t in cwr if len(set(t))==r]) # comb: cwrs without dups
555 self.assertEqual(comb, list(filter(set(cwr).__contains__, perm))) # comb: perm that is a cwr
556 self.assertEqual(comb, list(filter(set(perm).__contains__, cwr))) # comb: cwr that is a perm
557 self.assertEqual(comb, sorted(set(cwr) & set(perm))) # comb: both a cwr and a perm
558
559 @pickle_deprecated
560 def test_compress(self):
561 self.assertEqual(list(compress(data='ABCDEF', selectors=[1,0,1,0,1,1])), list('ACEF'))
562 self.assertEqual(list(compress('ABCDEF', [1,0,1,0,1,1])), list('ACEF'))
563 self.assertEqual(list(compress('ABCDEF', [0,0,0,0,0,0])), list(''))
564 self.assertEqual(list(compress('ABCDEF', [1,1,1,1,1,1])), list('ABCDEF'))
565 self.assertEqual(list(compress('ABCDEF', [1,0,1])), list('AC'))
566 self.assertEqual(list(compress('ABC', [0,1,1,1,1,1])), list('BC'))
567 n = 10000
568 data = chain.from_iterable(repeat(range(6), n))
569 selectors = chain.from_iterable(repeat((0, 1)))
570 self.assertEqual(list(compress(data, selectors)), [1,3,5] * n)
571 self.assertRaises(TypeError, compress, None, range(6)) # 1st arg not iterable
572 self.assertRaises(TypeError, compress, range(6), None) # 2nd arg not iterable
573 self.assertRaises(TypeError, compress, range(6)) # too few args
574 self.assertRaises(TypeError, compress, range(6), None) # too many args
575
576 # check copy, deepcopy, pickle
577 for op in [lambda a:copy.copy(a), lambda a:copy.deepcopy(a)] + picklecopiers:
578 for data, selectors, result1, result2 in [
579 ('ABCDEF', [1,0,1,0,1,1], 'ACEF', 'CEF'),
580 ('ABCDEF', [0,0,0,0,0,0], '', ''),
581 ('ABCDEF', [1,1,1,1,1,1], 'ABCDEF', 'BCDEF'),
582 ('ABCDEF', [1,0,1], 'AC', 'C'),
583 ('ABC', [0,1,1,1,1,1], 'BC', 'C'),
584 ]:
585
586 self.assertEqual(list(op(compress(data=data, selectors=selectors))), list(result1))
587 self.assertEqual(list(op(compress(data, selectors))), list(result1))
588 testIntermediate = compress(data, selectors)
589 if result1:
590 next(testIntermediate)
591 self.assertEqual(list(op(testIntermediate)), list(result2))
592
593 @pickle_deprecated
594 def test_count(self):
595 self.assertEqual(lzip('abc',count()), [('a', 0), ('b', 1), ('c', 2)])
596 self.assertEqual(lzip('abc',count(3)), [('a', 3), ('b', 4), ('c', 5)])
597 self.assertEqual(take(2, lzip('abc',count(3))), [('a', 3), ('b', 4)])
598 self.assertEqual(take(2, zip('abc',count(-1))), [('a', -1), ('b', 0)])
599 self.assertEqual(take(2, zip('abc',count(-3))), [('a', -3), ('b', -2)])
600 self.assertRaises(TypeError, count, 2, 3, 4)
601 self.assertRaises(TypeError, count, 'a')
602 self.assertEqual(take(10, count(maxsize-5)),
603 list(range(maxsize-5, maxsize+5)))
604 self.assertEqual(take(10, count(-maxsize-5)),
605 list(range(-maxsize-5, -maxsize+5)))
606 self.assertEqual(take(3, count(3.25)), [3.25, 4.25, 5.25])
607 self.assertEqual(take(3, count(3.25-4j)), [3.25-4j, 4.25-4j, 5.25-4j])
608 self.assertEqual(take(3, count(Decimal('1.1'))),
609 [Decimal('1.1'), Decimal('2.1'), Decimal('3.1')])
610 self.assertEqual(take(3, count(Fraction(2, 3))),
611 [Fraction(2, 3), Fraction(5, 3), Fraction(8, 3)])
612 BIGINT = 1<<1000
613 self.assertEqual(take(3, count(BIGINT)), [BIGINT, BIGINT+1, BIGINT+2])
614 c = count(3)
615 self.assertEqual(repr(c), 'count(3)')
616 next(c)
617 self.assertEqual(repr(c), 'count(4)')
618 c = count(-9)
619 self.assertEqual(repr(c), 'count(-9)')
620 next(c)
621 self.assertEqual(next(c), -8)
622 self.assertEqual(repr(count(10.25)), 'count(10.25)')
623 self.assertEqual(repr(count(10.0)), 'count(10.0)')
624 self.assertEqual(type(next(count(10.0))), float)
625 for i in (-sys.maxsize-5, -sys.maxsize+5 ,-10, -1, 0, 10, sys.maxsize-5, sys.maxsize+5):
626 # Test repr
627 r1 = repr(count(i))
628 r2 = 'count(%r)'.__mod__(i)
629 self.assertEqual(r1, r2)
630
631 # check copy, deepcopy, pickle
632 for value in -3, 3, maxsize-5, maxsize+5:
633 c = count(value)
634 self.assertEqual(next(copy.copy(c)), value)
635 self.assertEqual(next(copy.deepcopy(c)), value)
636 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
637 self.pickletest(proto, count(value))
638
639 #check proper internal error handling for large "step' sizes
640 count(1, maxsize+5); sys.exc_info()
641
642 @pickle_deprecated
643 def test_count_with_stride(self):
644 self.assertEqual(lzip('abc',count(2,3)), [('a', 2), ('b', 5), ('c', 8)])
645 self.assertEqual(lzip('abc',count(start=2,step=3)),
646 [('a', 2), ('b', 5), ('c', 8)])
647 self.assertEqual(lzip('abc',count(step=-1)),
648 [('a', 0), ('b', -1), ('c', -2)])
649 self.assertRaises(TypeError, count, 'a', 'b')
650 self.assertEqual(lzip('abc',count(2,0)), [('a', 2), ('b', 2), ('c', 2)])
651 self.assertEqual(lzip('abc',count(2,1)), [('a', 2), ('b', 3), ('c', 4)])
652 self.assertEqual(lzip('abc',count(2,3)), [('a', 2), ('b', 5), ('c', 8)])
653 self.assertEqual(take(20, count(maxsize-15, 3)), take(20, range(maxsize-15, maxsize+100, 3)))
654 self.assertEqual(take(20, count(-maxsize-15, 3)), take(20, range(-maxsize-15,-maxsize+100, 3)))
655 self.assertEqual(take(3, count(10, maxsize+5)),
656 list(range(10, 10+3*(maxsize+5), maxsize+5)))
657 self.assertEqual(take(3, count(2, 1.25)), [2, 3.25, 4.5])
658 self.assertEqual(take(3, count(2, 3.25-4j)), [2, 5.25-4j, 8.5-8j])
659 self.assertEqual(take(3, count(Decimal('1.1'), Decimal('.1'))),
660 [Decimal('1.1'), Decimal('1.2'), Decimal('1.3')])
661 self.assertEqual(take(3, count(Fraction(2,3), Fraction(1,7))),
662 [Fraction(2,3), Fraction(17,21), Fraction(20,21)])
663 BIGINT = 1<<1000
664 self.assertEqual(take(3, count(step=BIGINT)), [0, BIGINT, 2*BIGINT])
665 self.assertEqual(repr(take(3, count(10, 2.5))), repr([10, 12.5, 15.0]))
666 c = count(3, 5)
667 self.assertEqual(repr(c), 'count(3, 5)')
668 next(c)
669 self.assertEqual(repr(c), 'count(8, 5)')
670 c = count(-9, 0)
671 self.assertEqual(repr(c), 'count(-9, 0)')
672 next(c)
673 self.assertEqual(repr(c), 'count(-9, 0)')
674 c = count(-9, -3)
675 self.assertEqual(repr(c), 'count(-9, -3)')
676 next(c)
677 self.assertEqual(repr(c), 'count(-12, -3)')
678 self.assertEqual(repr(c), 'count(-12, -3)')
679 self.assertEqual(repr(count(10.5, 1.25)), 'count(10.5, 1.25)')
680 self.assertEqual(repr(count(10.5, 1)), 'count(10.5)') # suppress step=1 when it's an int
681 self.assertEqual(repr(count(10.5, 1.00)), 'count(10.5, 1.0)') # do show float values lilke 1.0
682 self.assertEqual(repr(count(10, 1.00)), 'count(10, 1.0)')
683 c = count(10, 1.0)
684 self.assertEqual(type(next(c)), int)
685 self.assertEqual(type(next(c)), float)
686 for i in (-sys.maxsize-5, -sys.maxsize+5 ,-10, -1, 0, 10, sys.maxsize-5, sys.maxsize+5):
687 for j in (-sys.maxsize-5, -sys.maxsize+5 ,-10, -1, 0, 1, 10, sys.maxsize-5, sys.maxsize+5):
688 # Test repr
689 r1 = repr(count(i, j))
690 if j == 1:
691 r2 = ('count(%r)' % i)
692 else:
693 r2 = ('count(%r, %r)' % (i, j))
694 self.assertEqual(r1, r2)
695 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
696 self.pickletest(proto, count(i, j))
697
698 def test_cycle(self):
699 self.assertEqual(take(10, cycle('abc')), list('abcabcabca'))
700 self.assertEqual(list(cycle('')), [])
701 self.assertRaises(TypeError, cycle)
702 self.assertRaises(TypeError, cycle, 5)
703 self.assertEqual(list(islice(cycle(gen3()),10)), [0,1,2,0,1,2,0,1,2,0])
704
705 @pickle_deprecated
706 def test_cycle_copy_pickle(self):
707 # check copy, deepcopy, pickle
708 c = cycle('abc')
709 self.assertEqual(next(c), 'a')
710 #simple copy currently not supported, because __reduce__ returns
711 #an internal iterator
712 #self.assertEqual(take(10, copy.copy(c)), list('bcabcabcab'))
713 self.assertEqual(take(10, copy.deepcopy(c)), list('bcabcabcab'))
714 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
715 self.assertEqual(take(10, pickle.loads(pickle.dumps(c, proto))),
716 list('bcabcabcab'))
717 next(c)
718 self.assertEqual(take(10, pickle.loads(pickle.dumps(c, proto))),
719 list('cabcabcabc'))
720 next(c)
721 next(c)
722 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
723 self.pickletest(proto, cycle('abc'))
724
725 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
726 # test with partial consumed input iterable
727 it = iter('abcde')
728 c = cycle(it)
729 _ = [next(c) for i in range(2)] # consume 2 of 5 inputs
730 p = pickle.dumps(c, proto)
731 d = pickle.loads(p) # rebuild the cycle object
732 self.assertEqual(take(20, d), list('cdeabcdeabcdeabcdeab'))
733
734 # test with completely consumed input iterable
735 it = iter('abcde')
736 c = cycle(it)
737 _ = [next(c) for i in range(7)] # consume 7 of 5 inputs
738 p = pickle.dumps(c, proto)
739 d = pickle.loads(p) # rebuild the cycle object
740 self.assertEqual(take(20, d), list('cdeabcdeabcdeabcdeab'))
741
742 @pickle_deprecated
743 def test_cycle_unpickle_compat(self):
744 testcases = [
745 b'citertools\ncycle\n(c__builtin__\niter\n((lI1\naI2\naI3\natRI1\nbtR((lI1\naI0\ntb.',
746 b'citertools\ncycle\n(c__builtin__\niter\n(](K\x01K\x02K\x03etRK\x01btR(]K\x01aK\x00tb.',
747 b'\x80\x02citertools\ncycle\nc__builtin__\niter\n](K\x01K\x02K\x03e\x85RK\x01b\x85R]K\x01aK\x00\x86b.',
748 b'\x80\x03citertools\ncycle\ncbuiltins\niter\n](K\x01K\x02K\x03e\x85RK\x01b\x85R]K\x01aK\x00\x86b.',
749 b'\x80\x04\x95=\x00\x00\x00\x00\x00\x00\x00\x8c\titertools\x8c\x05cycle\x93\x8c\x08builtins\x8c\x04iter\x93](K\x01K\x02K\x03e\x85RK\x01b\x85R]K\x01aK\x00\x86b.',
750
751 b'citertools\ncycle\n(c__builtin__\niter\n((lp0\nI1\naI2\naI3\natRI1\nbtR(g0\nI1\ntb.',
752 b'citertools\ncycle\n(c__builtin__\niter\n(]q\x00(K\x01K\x02K\x03etRK\x01btR(h\x00K\x01tb.',
753 b'\x80\x02citertools\ncycle\nc__builtin__\niter\n]q\x00(K\x01K\x02K\x03e\x85RK\x01b\x85Rh\x00K\x01\x86b.',
754 b'\x80\x03citertools\ncycle\ncbuiltins\niter\n]q\x00(K\x01K\x02K\x03e\x85RK\x01b\x85Rh\x00K\x01\x86b.',
755 b'\x80\x04\x95<\x00\x00\x00\x00\x00\x00\x00\x8c\titertools\x8c\x05cycle\x93\x8c\x08builtins\x8c\x04iter\x93]\x94(K\x01K\x02K\x03e\x85RK\x01b\x85Rh\x00K\x01\x86b.',
756
757 b'citertools\ncycle\n(c__builtin__\niter\n((lI1\naI2\naI3\natRI1\nbtR((lI1\naI00\ntb.',
758 b'citertools\ncycle\n(c__builtin__\niter\n(](K\x01K\x02K\x03etRK\x01btR(]K\x01aI00\ntb.',
759 b'\x80\x02citertools\ncycle\nc__builtin__\niter\n](K\x01K\x02K\x03e\x85RK\x01b\x85R]K\x01a\x89\x86b.',
760 b'\x80\x03citertools\ncycle\ncbuiltins\niter\n](K\x01K\x02K\x03e\x85RK\x01b\x85R]K\x01a\x89\x86b.',
761 b'\x80\x04\x95<\x00\x00\x00\x00\x00\x00\x00\x8c\titertools\x8c\x05cycle\x93\x8c\x08builtins\x8c\x04iter\x93](K\x01K\x02K\x03e\x85RK\x01b\x85R]K\x01a\x89\x86b.',
762
763 b'citertools\ncycle\n(c__builtin__\niter\n((lp0\nI1\naI2\naI3\natRI1\nbtR(g0\nI01\ntb.',
764 b'citertools\ncycle\n(c__builtin__\niter\n(]q\x00(K\x01K\x02K\x03etRK\x01btR(h\x00I01\ntb.',
765 b'\x80\x02citertools\ncycle\nc__builtin__\niter\n]q\x00(K\x01K\x02K\x03e\x85RK\x01b\x85Rh\x00\x88\x86b.',
766 b'\x80\x03citertools\ncycle\ncbuiltins\niter\n]q\x00(K\x01K\x02K\x03e\x85RK\x01b\x85Rh\x00\x88\x86b.',
767 b'\x80\x04\x95;\x00\x00\x00\x00\x00\x00\x00\x8c\titertools\x8c\x05cycle\x93\x8c\x08builtins\x8c\x04iter\x93]\x94(K\x01K\x02K\x03e\x85RK\x01b\x85Rh\x00\x88\x86b.',
768 ]
769 assert len(testcases) == 20
770 for t in testcases:
771 it = pickle.loads(t)
772 self.assertEqual(take(10, it), [2, 3, 1, 2, 3, 1, 2, 3, 1, 2])
773
774 @pickle_deprecated
775 def test_cycle_setstate(self):
776 # Verify both modes for restoring state
777
778 # Mode 0 is efficient. It uses an incompletely consumed input
779 # iterator to build a cycle object and then passes in state with
780 # a list of previously consumed values. There is no data
781 # overlap between the two.
782 c = cycle('defg')
783 c.__setstate__((list('abc'), 0))
784 self.assertEqual(take(20, c), list('defgabcdefgabcdefgab'))
785
786 # Mode 1 is inefficient. It starts with a cycle object built
787 # from an iterator over the remaining elements in a partial
788 # cycle and then passes in state with all of the previously
789 # seen values (this overlaps values included in the iterator).
790 c = cycle('defg')
791 c.__setstate__((list('abcdefg'), 1))
792 self.assertEqual(take(20, c), list('defgabcdefgabcdefgab'))
793
794 # The first argument to setstate needs to be a tuple
795 with self.assertRaises(TypeError):
796 cycle('defg').__setstate__([list('abcdefg'), 0])
797
798 # The first argument in the setstate tuple must be a list
799 with self.assertRaises(TypeError):
800 c = cycle('defg')
801 c.__setstate__((tuple('defg'), 0))
802 take(20, c)
803
804 # The second argument in the setstate tuple must be an int
805 with self.assertRaises(TypeError):
806 cycle('defg').__setstate__((list('abcdefg'), 'x'))
807
808 self.assertRaises(TypeError, cycle('').__setstate__, ())
809 self.assertRaises(TypeError, cycle('').__setstate__, ([],))
810
811 @pickle_deprecated
812 def test_groupby(self):
813 # Check whether it accepts arguments correctly
814 self.assertEqual([], list(groupby([])))
815 self.assertEqual([], list(groupby([], key=id)))
816 self.assertRaises(TypeError, list, groupby('abc', []))
817 self.assertRaises(TypeError, groupby, None)
818 self.assertRaises(TypeError, groupby, 'abc', lambda x:x, 10)
819
820 # Check normal input
821 s = [(0, 10, 20), (0, 11,21), (0,12,21), (1,13,21), (1,14,22),
822 (2,15,22), (3,16,23), (3,17,23)]
823 dup = []
824 for k, g in groupby(s, lambda r:r[0]):
825 for elem in g:
826 self.assertEqual(k, elem[0])
827 dup.append(elem)
828 self.assertEqual(s, dup)
829
830 # Check normal pickled
831 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
832 dup = []
833 for k, g in pickle.loads(pickle.dumps(groupby(s, testR), proto)):
834 for elem in g:
835 self.assertEqual(k, elem[0])
836 dup.append(elem)
837 self.assertEqual(s, dup)
838
839 # Check nested case
840 dup = []
841 for k, g in groupby(s, testR):
842 for ik, ig in groupby(g, testR2):
843 for elem in ig:
844 self.assertEqual(k, elem[0])
845 self.assertEqual(ik, elem[2])
846 dup.append(elem)
847 self.assertEqual(s, dup)
848
849 # Check nested and pickled
850 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
851 dup = []
852 for k, g in pickle.loads(pickle.dumps(groupby(s, testR), proto)):
853 for ik, ig in pickle.loads(pickle.dumps(groupby(g, testR2), proto)):
854 for elem in ig:
855 self.assertEqual(k, elem[0])
856 self.assertEqual(ik, elem[2])
857 dup.append(elem)
858 self.assertEqual(s, dup)
859
860
861 # Check case where inner iterator is not used
862 keys = [k for k, g in groupby(s, testR)]
863 expectedkeys = set([r[0] for r in s])
864 self.assertEqual(set(keys), expectedkeys)
865 self.assertEqual(len(keys), len(expectedkeys))
866
867 # Check case where inner iterator is used after advancing the groupby
868 # iterator
869 s = list(zip('AABBBAAAA', range(9)))
870 it = groupby(s, testR)
871 _, g1 = next(it)
872 _, g2 = next(it)
873 _, g3 = next(it)
874 self.assertEqual(list(g1), [])
875 self.assertEqual(list(g2), [])
876 self.assertEqual(next(g3), ('A', 5))
877 list(it) # exhaust the groupby iterator
878 self.assertEqual(list(g3), [])
879
880 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
881 it = groupby(s, testR)
882 _, g = next(it)
883 next(it)
884 next(it)
885 self.assertEqual(list(pickle.loads(pickle.dumps(g, proto))), [])
886
887 # Exercise pipes and filters style
888 s = 'abracadabra'
889 # sort s | uniq
890 r = [k for k, g in groupby(sorted(s))]
891 self.assertEqual(r, ['a', 'b', 'c', 'd', 'r'])
892 # sort s | uniq -d
893 r = [k for k, g in groupby(sorted(s)) if list(islice(g,1,2))]
894 self.assertEqual(r, ['a', 'b', 'r'])
895 # sort s | uniq -c
896 r = [(len(list(g)), k) for k, g in groupby(sorted(s))]
897 self.assertEqual(r, [(5, 'a'), (2, 'b'), (1, 'c'), (1, 'd'), (2, 'r')])
898 # sort s | uniq -c | sort -rn | head -3
899 r = sorted([(len(list(g)) , k) for k, g in groupby(sorted(s))], reverse=True)[:3]
900 self.assertEqual(r, [(5, 'a'), (2, 'r'), (2, 'b')])
901
902 # iter.__next__ failure
903 class ESC[4;38;5;81mExpectedError(ESC[4;38;5;149mException):
904 pass
905 def delayed_raise(n=0):
906 for i in range(n):
907 yield 'yo'
908 raise ExpectedError
909 def gulp(iterable, keyp=None, func=list):
910 return [func(g) for k, g in groupby(iterable, keyp)]
911
912 # iter.__next__ failure on outer object
913 self.assertRaises(ExpectedError, gulp, delayed_raise(0))
914 # iter.__next__ failure on inner object
915 self.assertRaises(ExpectedError, gulp, delayed_raise(1))
916
917 # __eq__ failure
918 class ESC[4;38;5;81mDummyCmp:
919 def __eq__(self, dst):
920 raise ExpectedError
921 s = [DummyCmp(), DummyCmp(), None]
922
923 # __eq__ failure on outer object
924 self.assertRaises(ExpectedError, gulp, s, func=id)
925 # __eq__ failure on inner object
926 self.assertRaises(ExpectedError, gulp, s)
927
928 # keyfunc failure
929 def keyfunc(obj):
930 if keyfunc.skip > 0:
931 keyfunc.skip -= 1
932 return obj
933 else:
934 raise ExpectedError
935
936 # keyfunc failure on outer object
937 keyfunc.skip = 0
938 self.assertRaises(ExpectedError, gulp, [None], keyfunc)
939 keyfunc.skip = 1
940 self.assertRaises(ExpectedError, gulp, [None, None], keyfunc)
941
942 def test_filter(self):
943 self.assertEqual(list(filter(isEven, range(6))), [0,2,4])
944 self.assertEqual(list(filter(None, [0,1,0,2,0])), [1,2])
945 self.assertEqual(list(filter(bool, [0,1,0,2,0])), [1,2])
946 self.assertEqual(take(4, filter(isEven, count())), [0,2,4,6])
947 self.assertRaises(TypeError, filter)
948 self.assertRaises(TypeError, filter, lambda x:x)
949 self.assertRaises(TypeError, filter, lambda x:x, range(6), 7)
950 self.assertRaises(TypeError, filter, isEven, 3)
951 self.assertRaises(TypeError, next, filter(range(6), range(6)))
952
953 # check copy, deepcopy, pickle
954 ans = [0,2,4]
955
956 c = filter(isEven, range(6))
957 self.assertEqual(list(copy.copy(c)), ans)
958 c = filter(isEven, range(6))
959 self.assertEqual(list(copy.deepcopy(c)), ans)
960 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
961 c = filter(isEven, range(6))
962 self.assertEqual(list(pickle.loads(pickle.dumps(c, proto))), ans)
963 next(c)
964 self.assertEqual(list(pickle.loads(pickle.dumps(c, proto))), ans[1:])
965 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
966 c = filter(isEven, range(6))
967 self.pickletest(proto, c)
968
969 @pickle_deprecated
970 def test_filterfalse(self):
971 self.assertEqual(list(filterfalse(isEven, range(6))), [1,3,5])
972 self.assertEqual(list(filterfalse(None, [0,1,0,2,0])), [0,0,0])
973 self.assertEqual(list(filterfalse(bool, [0,1,0,2,0])), [0,0,0])
974 self.assertEqual(take(4, filterfalse(isEven, count())), [1,3,5,7])
975 self.assertRaises(TypeError, filterfalse)
976 self.assertRaises(TypeError, filterfalse, lambda x:x)
977 self.assertRaises(TypeError, filterfalse, lambda x:x, range(6), 7)
978 self.assertRaises(TypeError, filterfalse, isEven, 3)
979 self.assertRaises(TypeError, next, filterfalse(range(6), range(6)))
980 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
981 self.pickletest(proto, filterfalse(isEven, range(6)))
982
983 def test_zip(self):
984 # XXX This is rather silly now that builtin zip() calls zip()...
985 ans = [(x,y) for x, y in zip('abc',count())]
986 self.assertEqual(ans, [('a', 0), ('b', 1), ('c', 2)])
987 self.assertEqual(list(zip('abc', range(6))), lzip('abc', range(6)))
988 self.assertEqual(list(zip('abcdef', range(3))), lzip('abcdef', range(3)))
989 self.assertEqual(take(3,zip('abcdef', count())), lzip('abcdef', range(3)))
990 self.assertEqual(list(zip('abcdef')), lzip('abcdef'))
991 self.assertEqual(list(zip()), lzip())
992 self.assertRaises(TypeError, zip, 3)
993 self.assertRaises(TypeError, zip, range(3), 3)
994 self.assertEqual([tuple(list(pair)) for pair in zip('abc', 'def')],
995 lzip('abc', 'def'))
996 self.assertEqual([pair for pair in zip('abc', 'def')],
997 lzip('abc', 'def'))
998
999 @support.impl_detail("tuple reuse is specific to CPython")
1000 @pickle_deprecated
1001 def test_zip_tuple_reuse(self):
1002 ids = list(map(id, zip('abc', 'def')))
1003 self.assertEqual(min(ids), max(ids))
1004 ids = list(map(id, list(zip('abc', 'def'))))
1005 self.assertEqual(len(dict.fromkeys(ids)), len(ids))
1006
1007 # check copy, deepcopy, pickle
1008 ans = [(x,y) for x, y in copy.copy(zip('abc',count()))]
1009 self.assertEqual(ans, [('a', 0), ('b', 1), ('c', 2)])
1010
1011 ans = [(x,y) for x, y in copy.deepcopy(zip('abc',count()))]
1012 self.assertEqual(ans, [('a', 0), ('b', 1), ('c', 2)])
1013
1014 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
1015 ans = [(x,y) for x, y in pickle.loads(pickle.dumps(zip('abc',count()), proto))]
1016 self.assertEqual(ans, [('a', 0), ('b', 1), ('c', 2)])
1017
1018 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
1019 testIntermediate = zip('abc',count())
1020 next(testIntermediate)
1021 ans = [(x,y) for x, y in pickle.loads(pickle.dumps(testIntermediate, proto))]
1022 self.assertEqual(ans, [('b', 1), ('c', 2)])
1023
1024 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
1025 self.pickletest(proto, zip('abc', count()))
1026
1027 def test_ziplongest(self):
1028 for args in [
1029 ['abc', range(6)],
1030 [range(6), 'abc'],
1031 [range(1000), range(2000,2100), range(3000,3050)],
1032 [range(1000), range(0), range(3000,3050), range(1200), range(1500)],
1033 [range(1000), range(0), range(3000,3050), range(1200), range(1500), range(0)],
1034 ]:
1035 target = [tuple([arg[i] if i < len(arg) else None for arg in args])
1036 for i in range(max(map(len, args)))]
1037 self.assertEqual(list(zip_longest(*args)), target)
1038 self.assertEqual(list(zip_longest(*args, **{})), target)
1039 target = [tuple((e is None and 'X' or e) for e in t) for t in target] # Replace None fills with 'X'
1040 self.assertEqual(list(zip_longest(*args, **dict(fillvalue='X'))), target)
1041
1042 self.assertEqual(take(3,zip_longest('abcdef', count())), list(zip('abcdef', range(3)))) # take 3 from infinite input
1043
1044 self.assertEqual(list(zip_longest()), list(zip()))
1045 self.assertEqual(list(zip_longest([])), list(zip([])))
1046 self.assertEqual(list(zip_longest('abcdef')), list(zip('abcdef')))
1047
1048 self.assertEqual(list(zip_longest('abc', 'defg', **{})),
1049 list(zip(list('abc')+[None], 'defg'))) # empty keyword dict
1050 self.assertRaises(TypeError, zip_longest, 3)
1051 self.assertRaises(TypeError, zip_longest, range(3), 3)
1052
1053 for stmt in [
1054 "zip_longest('abc', fv=1)",
1055 "zip_longest('abc', fillvalue=1, bogus_keyword=None)",
1056 ]:
1057 try:
1058 eval(stmt, globals(), locals())
1059 except TypeError:
1060 pass
1061 else:
1062 self.fail('Did not raise Type in: ' + stmt)
1063
1064 self.assertEqual([tuple(list(pair)) for pair in zip_longest('abc', 'def')],
1065 list(zip('abc', 'def')))
1066 self.assertEqual([pair for pair in zip_longest('abc', 'def')],
1067 list(zip('abc', 'def')))
1068
1069 @support.impl_detail("tuple reuse is specific to CPython")
1070 def test_zip_longest_tuple_reuse(self):
1071 ids = list(map(id, zip_longest('abc', 'def')))
1072 self.assertEqual(min(ids), max(ids))
1073 ids = list(map(id, list(zip_longest('abc', 'def'))))
1074 self.assertEqual(len(dict.fromkeys(ids)), len(ids))
1075
1076 @pickle_deprecated
1077 def test_zip_longest_pickling(self):
1078 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
1079 self.pickletest(proto, zip_longest("abc", "def"))
1080 self.pickletest(proto, zip_longest("abc", "defgh"))
1081 self.pickletest(proto, zip_longest("abc", "defgh", fillvalue=1))
1082 self.pickletest(proto, zip_longest("", "defgh"))
1083
1084 def test_zip_longest_bad_iterable(self):
1085 exception = TypeError()
1086
1087 class ESC[4;38;5;81mBadIterable:
1088 def __iter__(self):
1089 raise exception
1090
1091 with self.assertRaises(TypeError) as cm:
1092 zip_longest(BadIterable())
1093
1094 self.assertIs(cm.exception, exception)
1095
1096 def test_bug_7244(self):
1097
1098 class ESC[4;38;5;81mRepeater:
1099 # this class is similar to itertools.repeat
1100 def __init__(self, o, t, e):
1101 self.o = o
1102 self.t = int(t)
1103 self.e = e
1104 def __iter__(self): # its iterator is itself
1105 return self
1106 def __next__(self):
1107 if self.t > 0:
1108 self.t -= 1
1109 return self.o
1110 else:
1111 raise self.e
1112
1113 # Formerly this code in would fail in debug mode
1114 # with Undetected Error and Stop Iteration
1115 r1 = Repeater(1, 3, StopIteration)
1116 r2 = Repeater(2, 4, StopIteration)
1117 def run(r1, r2):
1118 result = []
1119 for i, j in zip_longest(r1, r2, fillvalue=0):
1120 with support.captured_output('stdout'):
1121 print((i, j))
1122 result.append((i, j))
1123 return result
1124 self.assertEqual(run(r1, r2), [(1,2), (1,2), (1,2), (0,2)])
1125
1126 # Formerly, the RuntimeError would be lost
1127 # and StopIteration would stop as expected
1128 r1 = Repeater(1, 3, RuntimeError)
1129 r2 = Repeater(2, 4, StopIteration)
1130 it = zip_longest(r1, r2, fillvalue=0)
1131 self.assertEqual(next(it), (1, 2))
1132 self.assertEqual(next(it), (1, 2))
1133 self.assertEqual(next(it), (1, 2))
1134 self.assertRaises(RuntimeError, next, it)
1135
1136 def test_pairwise(self):
1137 self.assertEqual(list(pairwise('')), [])
1138 self.assertEqual(list(pairwise('a')), [])
1139 self.assertEqual(list(pairwise('ab')),
1140 [('a', 'b')]),
1141 self.assertEqual(list(pairwise('abcde')),
1142 [('a', 'b'), ('b', 'c'), ('c', 'd'), ('d', 'e')])
1143 self.assertEqual(list(pairwise(range(10_000))),
1144 list(zip(range(10_000), range(1, 10_000))))
1145
1146 with self.assertRaises(TypeError):
1147 pairwise() # too few arguments
1148 with self.assertRaises(TypeError):
1149 pairwise('abc', 10) # too many arguments
1150 with self.assertRaises(TypeError):
1151 pairwise(iterable='abc') # keyword arguments
1152 with self.assertRaises(TypeError):
1153 pairwise(None) # non-iterable argument
1154
1155 def test_product(self):
1156 for args, result in [
1157 ([], [()]), # zero iterables
1158 (['ab'], [('a',), ('b',)]), # one iterable
1159 ([range(2), range(3)], [(0,0), (0,1), (0,2), (1,0), (1,1), (1,2)]), # two iterables
1160 ([range(0), range(2), range(3)], []), # first iterable with zero length
1161 ([range(2), range(0), range(3)], []), # middle iterable with zero length
1162 ([range(2), range(3), range(0)], []), # last iterable with zero length
1163 ]:
1164 self.assertEqual(list(product(*args)), result)
1165 for r in range(4):
1166 self.assertEqual(list(product(*(args*r))),
1167 list(product(*args, **dict(repeat=r))))
1168 self.assertEqual(len(list(product(*[range(7)]*6))), 7**6)
1169 self.assertRaises(TypeError, product, range(6), None)
1170
1171 def product1(*args, **kwds):
1172 pools = list(map(tuple, args)) * kwds.get('repeat', 1)
1173 n = len(pools)
1174 if n == 0:
1175 yield ()
1176 return
1177 if any(len(pool) == 0 for pool in pools):
1178 return
1179 indices = [0] * n
1180 yield tuple(pool[i] for pool, i in zip(pools, indices))
1181 while 1:
1182 for i in reversed(range(n)): # right to left
1183 if indices[i] == len(pools[i]) - 1:
1184 continue
1185 indices[i] += 1
1186 for j in range(i+1, n):
1187 indices[j] = 0
1188 yield tuple(pool[i] for pool, i in zip(pools, indices))
1189 break
1190 else:
1191 return
1192
1193 def product2(*args, **kwds):
1194 'Pure python version used in docs'
1195 pools = list(map(tuple, args)) * kwds.get('repeat', 1)
1196 result = [[]]
1197 for pool in pools:
1198 result = [x+[y] for x in result for y in pool]
1199 for prod in result:
1200 yield tuple(prod)
1201
1202 argtypes = ['', 'abc', '', range(0), range(4), dict(a=1, b=2, c=3),
1203 set('abcdefg'), range(11), tuple(range(13))]
1204 for i in range(100):
1205 args = [random.choice(argtypes) for j in range(random.randrange(5))]
1206 expected_len = prod(map(len, args))
1207 self.assertEqual(len(list(product(*args))), expected_len)
1208 self.assertEqual(list(product(*args)), list(product1(*args)))
1209 self.assertEqual(list(product(*args)), list(product2(*args)))
1210 args = map(iter, args)
1211 self.assertEqual(len(list(product(*args))), expected_len)
1212
1213 @support.bigaddrspacetest
1214 def test_product_overflow(self):
1215 with self.assertRaises((OverflowError, MemoryError)):
1216 product(*(['ab']*2**5), repeat=2**25)
1217
1218 @support.impl_detail("tuple reuse is specific to CPython")
1219 def test_product_tuple_reuse(self):
1220 self.assertEqual(len(set(map(id, product('abc', 'def')))), 1)
1221 self.assertNotEqual(len(set(map(id, list(product('abc', 'def'))))), 1)
1222
1223 @pickle_deprecated
1224 def test_product_pickling(self):
1225 # check copy, deepcopy, pickle
1226 for args, result in [
1227 ([], [()]), # zero iterables
1228 (['ab'], [('a',), ('b',)]), # one iterable
1229 ([range(2), range(3)], [(0,0), (0,1), (0,2), (1,0), (1,1), (1,2)]), # two iterables
1230 ([range(0), range(2), range(3)], []), # first iterable with zero length
1231 ([range(2), range(0), range(3)], []), # middle iterable with zero length
1232 ([range(2), range(3), range(0)], []), # last iterable with zero length
1233 ]:
1234 self.assertEqual(list(copy.copy(product(*args))), result)
1235 self.assertEqual(list(copy.deepcopy(product(*args))), result)
1236 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
1237 self.pickletest(proto, product(*args))
1238
1239 @pickle_deprecated
1240 def test_product_issue_25021(self):
1241 # test that indices are properly clamped to the length of the tuples
1242 p = product((1, 2),(3,))
1243 p.__setstate__((0, 0x1000)) # will access tuple element 1 if not clamped
1244 self.assertEqual(next(p), (2, 3))
1245 # test that empty tuple in the list will result in an immediate StopIteration
1246 p = product((1, 2), (), (3,))
1247 p.__setstate__((0, 0, 0x1000)) # will access tuple element 1 if not clamped
1248 self.assertRaises(StopIteration, next, p)
1249
1250 @pickle_deprecated
1251 def test_repeat(self):
1252 self.assertEqual(list(repeat(object='a', times=3)), ['a', 'a', 'a'])
1253 self.assertEqual(lzip(range(3),repeat('a')),
1254 [(0, 'a'), (1, 'a'), (2, 'a')])
1255 self.assertEqual(list(repeat('a', 3)), ['a', 'a', 'a'])
1256 self.assertEqual(take(3, repeat('a')), ['a', 'a', 'a'])
1257 self.assertEqual(list(repeat('a', 0)), [])
1258 self.assertEqual(list(repeat('a', -3)), [])
1259 self.assertRaises(TypeError, repeat)
1260 self.assertRaises(TypeError, repeat, None, 3, 4)
1261 self.assertRaises(TypeError, repeat, None, 'a')
1262 r = repeat(1+0j)
1263 self.assertEqual(repr(r), 'repeat((1+0j))')
1264 r = repeat(1+0j, 5)
1265 self.assertEqual(repr(r), 'repeat((1+0j), 5)')
1266 list(r)
1267 self.assertEqual(repr(r), 'repeat((1+0j), 0)')
1268
1269 # check copy, deepcopy, pickle
1270 c = repeat(object='a', times=10)
1271 self.assertEqual(next(c), 'a')
1272 self.assertEqual(take(2, copy.copy(c)), list('a' * 2))
1273 self.assertEqual(take(2, copy.deepcopy(c)), list('a' * 2))
1274 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
1275 self.pickletest(proto, repeat(object='a', times=10))
1276
1277 def test_repeat_with_negative_times(self):
1278 self.assertEqual(repr(repeat('a', -1)), "repeat('a', 0)")
1279 self.assertEqual(repr(repeat('a', -2)), "repeat('a', 0)")
1280 self.assertEqual(repr(repeat('a', times=-1)), "repeat('a', 0)")
1281 self.assertEqual(repr(repeat('a', times=-2)), "repeat('a', 0)")
1282
1283 @pickle_deprecated
1284 def test_map(self):
1285 self.assertEqual(list(map(operator.pow, range(3), range(1,7))),
1286 [0**1, 1**2, 2**3])
1287 self.assertEqual(list(map(tupleize, 'abc', range(5))),
1288 [('a',0),('b',1),('c',2)])
1289 self.assertEqual(list(map(tupleize, 'abc', count())),
1290 [('a',0),('b',1),('c',2)])
1291 self.assertEqual(take(2,map(tupleize, 'abc', count())),
1292 [('a',0),('b',1)])
1293 self.assertEqual(list(map(operator.pow, [])), [])
1294 self.assertRaises(TypeError, map)
1295 self.assertRaises(TypeError, list, map(None, range(3), range(3)))
1296 self.assertRaises(TypeError, map, operator.neg)
1297 self.assertRaises(TypeError, next, map(10, range(5)))
1298 self.assertRaises(ValueError, next, map(errfunc, [4], [5]))
1299 self.assertRaises(TypeError, next, map(onearg, [4], [5]))
1300
1301 # check copy, deepcopy, pickle
1302 ans = [('a',0),('b',1),('c',2)]
1303
1304 c = map(tupleize, 'abc', count())
1305 self.assertEqual(list(copy.copy(c)), ans)
1306
1307 c = map(tupleize, 'abc', count())
1308 self.assertEqual(list(copy.deepcopy(c)), ans)
1309
1310 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
1311 c = map(tupleize, 'abc', count())
1312 self.pickletest(proto, c)
1313
1314 @pickle_deprecated
1315 def test_starmap(self):
1316 self.assertEqual(list(starmap(operator.pow, zip(range(3), range(1,7)))),
1317 [0**1, 1**2, 2**3])
1318 self.assertEqual(take(3, starmap(operator.pow, zip(count(), count(1)))),
1319 [0**1, 1**2, 2**3])
1320 self.assertEqual(list(starmap(operator.pow, [])), [])
1321 self.assertEqual(list(starmap(operator.pow, [iter([4,5])])), [4**5])
1322 self.assertRaises(TypeError, list, starmap(operator.pow, [None]))
1323 self.assertRaises(TypeError, starmap)
1324 self.assertRaises(TypeError, starmap, operator.pow, [(4,5)], 'extra')
1325 self.assertRaises(TypeError, next, starmap(10, [(4,5)]))
1326 self.assertRaises(ValueError, next, starmap(errfunc, [(4,5)]))
1327 self.assertRaises(TypeError, next, starmap(onearg, [(4,5)]))
1328
1329 # check copy, deepcopy, pickle
1330 ans = [0**1, 1**2, 2**3]
1331
1332 c = starmap(operator.pow, zip(range(3), range(1,7)))
1333 self.assertEqual(list(copy.copy(c)), ans)
1334
1335 c = starmap(operator.pow, zip(range(3), range(1,7)))
1336 self.assertEqual(list(copy.deepcopy(c)), ans)
1337
1338 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
1339 c = starmap(operator.pow, zip(range(3), range(1,7)))
1340 self.pickletest(proto, c)
1341
1342 @pickle_deprecated
1343 def test_islice(self):
1344 for args in [ # islice(args) should agree with range(args)
1345 (10, 20, 3),
1346 (10, 3, 20),
1347 (10, 20),
1348 (10, 10),
1349 (10, 3),
1350 (20,)
1351 ]:
1352 self.assertEqual(list(islice(range(100), *args)),
1353 list(range(*args)))
1354
1355 for args, tgtargs in [ # Stop when seqn is exhausted
1356 ((10, 110, 3), ((10, 100, 3))),
1357 ((10, 110), ((10, 100))),
1358 ((110,), (100,))
1359 ]:
1360 self.assertEqual(list(islice(range(100), *args)),
1361 list(range(*tgtargs)))
1362
1363 # Test stop=None
1364 self.assertEqual(list(islice(range(10), None)), list(range(10)))
1365 self.assertEqual(list(islice(range(10), None, None)), list(range(10)))
1366 self.assertEqual(list(islice(range(10), None, None, None)), list(range(10)))
1367 self.assertEqual(list(islice(range(10), 2, None)), list(range(2, 10)))
1368 self.assertEqual(list(islice(range(10), 1, None, 2)), list(range(1, 10, 2)))
1369
1370 # Test number of items consumed SF #1171417
1371 it = iter(range(10))
1372 self.assertEqual(list(islice(it, 3)), list(range(3)))
1373 self.assertEqual(list(it), list(range(3, 10)))
1374
1375 it = iter(range(10))
1376 self.assertEqual(list(islice(it, 3, 3)), [])
1377 self.assertEqual(list(it), list(range(3, 10)))
1378
1379 # Test invalid arguments
1380 ra = range(10)
1381 self.assertRaises(TypeError, islice, ra)
1382 self.assertRaises(TypeError, islice, ra, 1, 2, 3, 4)
1383 self.assertRaises(ValueError, islice, ra, -5, 10, 1)
1384 self.assertRaises(ValueError, islice, ra, 1, -5, -1)
1385 self.assertRaises(ValueError, islice, ra, 1, 10, -1)
1386 self.assertRaises(ValueError, islice, ra, 1, 10, 0)
1387 self.assertRaises(ValueError, islice, ra, 'a')
1388 self.assertRaises(ValueError, islice, ra, 'a', 1)
1389 self.assertRaises(ValueError, islice, ra, 1, 'a')
1390 self.assertRaises(ValueError, islice, ra, 'a', 1, 1)
1391 self.assertRaises(ValueError, islice, ra, 1, 'a', 1)
1392 self.assertEqual(len(list(islice(count(), 1, 10, maxsize))), 1)
1393
1394 # Issue #10323: Less islice in a predictable state
1395 c = count()
1396 self.assertEqual(list(islice(c, 1, 3, 50)), [1])
1397 self.assertEqual(next(c), 3)
1398
1399 # check copy, deepcopy, pickle
1400 for args in [ # islice(args) should agree with range(args)
1401 (10, 20, 3),
1402 (10, 3, 20),
1403 (10, 20),
1404 (10, 3),
1405 (20,)
1406 ]:
1407 self.assertEqual(list(copy.copy(islice(range(100), *args))),
1408 list(range(*args)))
1409 self.assertEqual(list(copy.deepcopy(islice(range(100), *args))),
1410 list(range(*args)))
1411 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
1412 self.pickletest(proto, islice(range(100), *args))
1413
1414 # Issue #21321: check source iterator is not referenced
1415 # from islice() after the latter has been exhausted
1416 it = (x for x in (1, 2))
1417 wr = weakref.ref(it)
1418 it = islice(it, 1)
1419 self.assertIsNotNone(wr())
1420 list(it) # exhaust the iterator
1421 support.gc_collect()
1422 self.assertIsNone(wr())
1423
1424 # Issue #30537: islice can accept integer-like objects as
1425 # arguments
1426 class ESC[4;38;5;81mIntLike(ESC[4;38;5;149mobject):
1427 def __init__(self, val):
1428 self.val = val
1429 def __index__(self):
1430 return self.val
1431 self.assertEqual(list(islice(range(100), IntLike(10))), list(range(10)))
1432 self.assertEqual(list(islice(range(100), IntLike(10), IntLike(50))),
1433 list(range(10, 50)))
1434 self.assertEqual(list(islice(range(100), IntLike(10), IntLike(50), IntLike(5))),
1435 list(range(10,50,5)))
1436
1437 @pickle_deprecated
1438 def test_takewhile(self):
1439 data = [1, 3, 5, 20, 2, 4, 6, 8]
1440 self.assertEqual(list(takewhile(underten, data)), [1, 3, 5])
1441 self.assertEqual(list(takewhile(underten, [])), [])
1442 self.assertRaises(TypeError, takewhile)
1443 self.assertRaises(TypeError, takewhile, operator.pow)
1444 self.assertRaises(TypeError, takewhile, operator.pow, [(4,5)], 'extra')
1445 self.assertRaises(TypeError, next, takewhile(10, [(4,5)]))
1446 self.assertRaises(ValueError, next, takewhile(errfunc, [(4,5)]))
1447 t = takewhile(bool, [1, 1, 1, 0, 0, 0])
1448 self.assertEqual(list(t), [1, 1, 1])
1449 self.assertRaises(StopIteration, next, t)
1450
1451 # check copy, deepcopy, pickle
1452 self.assertEqual(list(copy.copy(takewhile(underten, data))), [1, 3, 5])
1453 self.assertEqual(list(copy.deepcopy(takewhile(underten, data))),
1454 [1, 3, 5])
1455 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
1456 self.pickletest(proto, takewhile(underten, data))
1457
1458 @pickle_deprecated
1459 def test_dropwhile(self):
1460 data = [1, 3, 5, 20, 2, 4, 6, 8]
1461 self.assertEqual(list(dropwhile(underten, data)), [20, 2, 4, 6, 8])
1462 self.assertEqual(list(dropwhile(underten, [])), [])
1463 self.assertRaises(TypeError, dropwhile)
1464 self.assertRaises(TypeError, dropwhile, operator.pow)
1465 self.assertRaises(TypeError, dropwhile, operator.pow, [(4,5)], 'extra')
1466 self.assertRaises(TypeError, next, dropwhile(10, [(4,5)]))
1467 self.assertRaises(ValueError, next, dropwhile(errfunc, [(4,5)]))
1468
1469 # check copy, deepcopy, pickle
1470 self.assertEqual(list(copy.copy(dropwhile(underten, data))), [20, 2, 4, 6, 8])
1471 self.assertEqual(list(copy.deepcopy(dropwhile(underten, data))),
1472 [20, 2, 4, 6, 8])
1473 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
1474 self.pickletest(proto, dropwhile(underten, data))
1475
1476 @pickle_deprecated
1477 def test_tee(self):
1478 n = 200
1479
1480 a, b = tee([]) # test empty iterator
1481 self.assertEqual(list(a), [])
1482 self.assertEqual(list(b), [])
1483
1484 a, b = tee(irange(n)) # test 100% interleaved
1485 self.assertEqual(lzip(a,b), lzip(range(n), range(n)))
1486
1487 a, b = tee(irange(n)) # test 0% interleaved
1488 self.assertEqual(list(a), list(range(n)))
1489 self.assertEqual(list(b), list(range(n)))
1490
1491 a, b = tee(irange(n)) # test dealloc of leading iterator
1492 for i in range(100):
1493 self.assertEqual(next(a), i)
1494 del a
1495 self.assertEqual(list(b), list(range(n)))
1496
1497 a, b = tee(irange(n)) # test dealloc of trailing iterator
1498 for i in range(100):
1499 self.assertEqual(next(a), i)
1500 del b
1501 self.assertEqual(list(a), list(range(100, n)))
1502
1503 for j in range(5): # test randomly interleaved
1504 order = [0]*n + [1]*n
1505 random.shuffle(order)
1506 lists = ([], [])
1507 its = tee(irange(n))
1508 for i in order:
1509 value = next(its[i])
1510 lists[i].append(value)
1511 self.assertEqual(lists[0], list(range(n)))
1512 self.assertEqual(lists[1], list(range(n)))
1513
1514 # test argument format checking
1515 self.assertRaises(TypeError, tee)
1516 self.assertRaises(TypeError, tee, 3)
1517 self.assertRaises(TypeError, tee, [1,2], 'x')
1518 self.assertRaises(TypeError, tee, [1,2], 3, 'x')
1519
1520 # tee object should be instantiable
1521 a, b = tee('abc')
1522 c = type(a)('def')
1523 self.assertEqual(list(c), list('def'))
1524
1525 # test long-lagged and multi-way split
1526 a, b, c = tee(range(2000), 3)
1527 for i in range(100):
1528 self.assertEqual(next(a), i)
1529 self.assertEqual(list(b), list(range(2000)))
1530 self.assertEqual([next(c), next(c)], list(range(2)))
1531 self.assertEqual(list(a), list(range(100,2000)))
1532 self.assertEqual(list(c), list(range(2,2000)))
1533
1534 # test values of n
1535 self.assertRaises(TypeError, tee, 'abc', 'invalid')
1536 self.assertRaises(ValueError, tee, [], -1)
1537 for n in range(5):
1538 result = tee('abc', n)
1539 self.assertEqual(type(result), tuple)
1540 self.assertEqual(len(result), n)
1541 self.assertEqual([list(x) for x in result], [list('abc')]*n)
1542
1543 # tee pass-through to copyable iterator
1544 a, b = tee('abc')
1545 c, d = tee(a)
1546 self.assertTrue(a is c)
1547
1548 # test tee_new
1549 t1, t2 = tee('abc')
1550 tnew = type(t1)
1551 self.assertRaises(TypeError, tnew)
1552 self.assertRaises(TypeError, tnew, 10)
1553 t3 = tnew(t1)
1554 self.assertTrue(list(t1) == list(t2) == list(t3) == list('abc'))
1555
1556 # test that tee objects are weak referencable
1557 a, b = tee(range(10))
1558 p = weakref.proxy(a)
1559 self.assertEqual(getattr(p, '__class__'), type(b))
1560 del a
1561 support.gc_collect() # For PyPy or other GCs.
1562 self.assertRaises(ReferenceError, getattr, p, '__class__')
1563
1564 ans = list('abc')
1565 long_ans = list(range(10000))
1566
1567 # check copy
1568 a, b = tee('abc')
1569 self.assertEqual(list(copy.copy(a)), ans)
1570 self.assertEqual(list(copy.copy(b)), ans)
1571 a, b = tee(list(range(10000)))
1572 self.assertEqual(list(copy.copy(a)), long_ans)
1573 self.assertEqual(list(copy.copy(b)), long_ans)
1574
1575 # check partially consumed copy
1576 a, b = tee('abc')
1577 take(2, a)
1578 take(1, b)
1579 self.assertEqual(list(copy.copy(a)), ans[2:])
1580 self.assertEqual(list(copy.copy(b)), ans[1:])
1581 self.assertEqual(list(a), ans[2:])
1582 self.assertEqual(list(b), ans[1:])
1583 a, b = tee(range(10000))
1584 take(100, a)
1585 take(60, b)
1586 self.assertEqual(list(copy.copy(a)), long_ans[100:])
1587 self.assertEqual(list(copy.copy(b)), long_ans[60:])
1588 self.assertEqual(list(a), long_ans[100:])
1589 self.assertEqual(list(b), long_ans[60:])
1590
1591 # check deepcopy
1592 a, b = tee('abc')
1593 self.assertEqual(list(copy.deepcopy(a)), ans)
1594 self.assertEqual(list(copy.deepcopy(b)), ans)
1595 self.assertEqual(list(a), ans)
1596 self.assertEqual(list(b), ans)
1597 a, b = tee(range(10000))
1598 self.assertEqual(list(copy.deepcopy(a)), long_ans)
1599 self.assertEqual(list(copy.deepcopy(b)), long_ans)
1600 self.assertEqual(list(a), long_ans)
1601 self.assertEqual(list(b), long_ans)
1602
1603 # check partially consumed deepcopy
1604 a, b = tee('abc')
1605 take(2, a)
1606 take(1, b)
1607 self.assertEqual(list(copy.deepcopy(a)), ans[2:])
1608 self.assertEqual(list(copy.deepcopy(b)), ans[1:])
1609 self.assertEqual(list(a), ans[2:])
1610 self.assertEqual(list(b), ans[1:])
1611 a, b = tee(range(10000))
1612 take(100, a)
1613 take(60, b)
1614 self.assertEqual(list(copy.deepcopy(a)), long_ans[100:])
1615 self.assertEqual(list(copy.deepcopy(b)), long_ans[60:])
1616 self.assertEqual(list(a), long_ans[100:])
1617 self.assertEqual(list(b), long_ans[60:])
1618
1619 # check pickle
1620 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
1621 self.pickletest(proto, iter(tee('abc')))
1622 a, b = tee('abc')
1623 self.pickletest(proto, a, compare=ans)
1624 self.pickletest(proto, b, compare=ans)
1625
1626 # Issue 13454: Crash when deleting backward iterator from tee()
1627 def test_tee_del_backward(self):
1628 forward, backward = tee(repeat(None, 20000000))
1629 try:
1630 any(forward) # exhaust the iterator
1631 del backward
1632 except:
1633 del forward, backward
1634 raise
1635
1636 def test_tee_reenter(self):
1637 class ESC[4;38;5;81mI:
1638 first = True
1639 def __iter__(self):
1640 return self
1641 def __next__(self):
1642 first = self.first
1643 self.first = False
1644 if first:
1645 return next(b)
1646
1647 a, b = tee(I())
1648 with self.assertRaisesRegex(RuntimeError, "tee"):
1649 next(a)
1650
1651 @threading_helper.requires_working_threading()
1652 def test_tee_concurrent(self):
1653 start = threading.Event()
1654 finish = threading.Event()
1655 class ESC[4;38;5;81mI:
1656 def __iter__(self):
1657 return self
1658 def __next__(self):
1659 start.set()
1660 finish.wait()
1661
1662 a, b = tee(I())
1663 thread = threading.Thread(target=next, args=[a])
1664 thread.start()
1665 try:
1666 start.wait()
1667 with self.assertRaisesRegex(RuntimeError, "tee"):
1668 next(b)
1669 finally:
1670 finish.set()
1671 thread.join()
1672
1673 def test_StopIteration(self):
1674 self.assertRaises(StopIteration, next, zip())
1675
1676 for f in (chain, cycle, zip, groupby):
1677 self.assertRaises(StopIteration, next, f([]))
1678 self.assertRaises(StopIteration, next, f(StopNow()))
1679
1680 self.assertRaises(StopIteration, next, islice([], None))
1681 self.assertRaises(StopIteration, next, islice(StopNow(), None))
1682
1683 p, q = tee([])
1684 self.assertRaises(StopIteration, next, p)
1685 self.assertRaises(StopIteration, next, q)
1686 p, q = tee(StopNow())
1687 self.assertRaises(StopIteration, next, p)
1688 self.assertRaises(StopIteration, next, q)
1689
1690 self.assertRaises(StopIteration, next, repeat(None, 0))
1691
1692 for f in (filter, filterfalse, map, takewhile, dropwhile, starmap):
1693 self.assertRaises(StopIteration, next, f(lambda x:x, []))
1694 self.assertRaises(StopIteration, next, f(lambda x:x, StopNow()))
1695
1696 @support.cpython_only
1697 def test_combinations_result_gc(self):
1698 # bpo-42536: combinations's tuple-reuse speed trick breaks the GC's
1699 # assumptions about what can be untracked. Make sure we re-track result
1700 # tuples whenever we reuse them.
1701 it = combinations([None, []], 1)
1702 next(it)
1703 gc.collect()
1704 # That GC collection probably untracked the recycled internal result
1705 # tuple, which has the value (None,). Make sure it's re-tracked when
1706 # it's mutated and returned from __next__:
1707 self.assertTrue(gc.is_tracked(next(it)))
1708
1709 @support.cpython_only
1710 def test_combinations_with_replacement_result_gc(self):
1711 # Ditto for combinations_with_replacement.
1712 it = combinations_with_replacement([None, []], 1)
1713 next(it)
1714 gc.collect()
1715 self.assertTrue(gc.is_tracked(next(it)))
1716
1717 @support.cpython_only
1718 def test_permutations_result_gc(self):
1719 # Ditto for permutations.
1720 it = permutations([None, []], 1)
1721 next(it)
1722 gc.collect()
1723 self.assertTrue(gc.is_tracked(next(it)))
1724
1725 @support.cpython_only
1726 def test_product_result_gc(self):
1727 # Ditto for product.
1728 it = product([None, []])
1729 next(it)
1730 gc.collect()
1731 self.assertTrue(gc.is_tracked(next(it)))
1732
1733 @support.cpython_only
1734 def test_zip_longest_result_gc(self):
1735 # Ditto for zip_longest.
1736 it = zip_longest([[]])
1737 gc.collect()
1738 self.assertTrue(gc.is_tracked(next(it)))
1739
1740 @support.cpython_only
1741 def test_immutable_types(self):
1742 from itertools import _grouper, _tee, _tee_dataobject
1743 dataset = (
1744 accumulate,
1745 batched,
1746 chain,
1747 combinations,
1748 combinations_with_replacement,
1749 compress,
1750 count,
1751 cycle,
1752 dropwhile,
1753 filterfalse,
1754 groupby,
1755 _grouper,
1756 islice,
1757 pairwise,
1758 permutations,
1759 product,
1760 repeat,
1761 starmap,
1762 takewhile,
1763 _tee,
1764 _tee_dataobject,
1765 zip_longest,
1766 )
1767 for tp in dataset:
1768 with self.subTest(tp=tp):
1769 with self.assertRaisesRegex(TypeError, "immutable"):
1770 tp.foobar = 1
1771
1772
1773 class ESC[4;38;5;81mTestExamples(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
1774
1775 def test_accumulate(self):
1776 self.assertEqual(list(accumulate([1,2,3,4,5])), [1, 3, 6, 10, 15])
1777
1778 @pickle_deprecated
1779 def test_accumulate_reducible(self):
1780 # check copy, deepcopy, pickle
1781 data = [1, 2, 3, 4, 5]
1782 accumulated = [1, 3, 6, 10, 15]
1783
1784 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
1785 it = accumulate(data)
1786 self.assertEqual(list(pickle.loads(pickle.dumps(it, proto))), accumulated[:])
1787 self.assertEqual(next(it), 1)
1788 self.assertEqual(list(pickle.loads(pickle.dumps(it, proto))), accumulated[1:])
1789 it = accumulate(data)
1790 self.assertEqual(next(it), 1)
1791 self.assertEqual(list(copy.deepcopy(it)), accumulated[1:])
1792 self.assertEqual(list(copy.copy(it)), accumulated[1:])
1793
1794 @pickle_deprecated
1795 def test_accumulate_reducible_none(self):
1796 # Issue #25718: total is None
1797 it = accumulate([None, None, None], operator.is_)
1798 self.assertEqual(next(it), None)
1799 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
1800 it_copy = pickle.loads(pickle.dumps(it, proto))
1801 self.assertEqual(list(it_copy), [True, False])
1802 self.assertEqual(list(copy.deepcopy(it)), [True, False])
1803 self.assertEqual(list(copy.copy(it)), [True, False])
1804
1805 def test_chain(self):
1806 self.assertEqual(''.join(chain('ABC', 'DEF')), 'ABCDEF')
1807
1808 def test_chain_from_iterable(self):
1809 self.assertEqual(''.join(chain.from_iterable(['ABC', 'DEF'])), 'ABCDEF')
1810
1811 def test_combinations(self):
1812 self.assertEqual(list(combinations('ABCD', 2)),
1813 [('A','B'), ('A','C'), ('A','D'), ('B','C'), ('B','D'), ('C','D')])
1814 self.assertEqual(list(combinations(range(4), 3)),
1815 [(0,1,2), (0,1,3), (0,2,3), (1,2,3)])
1816
1817 def test_combinations_with_replacement(self):
1818 self.assertEqual(list(combinations_with_replacement('ABC', 2)),
1819 [('A','A'), ('A','B'), ('A','C'), ('B','B'), ('B','C'), ('C','C')])
1820
1821 def test_compress(self):
1822 self.assertEqual(list(compress('ABCDEF', [1,0,1,0,1,1])), list('ACEF'))
1823
1824 def test_count(self):
1825 self.assertEqual(list(islice(count(10), 5)), [10, 11, 12, 13, 14])
1826
1827 def test_cycle(self):
1828 self.assertEqual(list(islice(cycle('ABCD'), 12)), list('ABCDABCDABCD'))
1829
1830 def test_dropwhile(self):
1831 self.assertEqual(list(dropwhile(lambda x: x<5, [1,4,6,4,1])), [6,4,1])
1832
1833 def test_groupby(self):
1834 self.assertEqual([k for k, g in groupby('AAAABBBCCDAABBB')],
1835 list('ABCDAB'))
1836 self.assertEqual([(list(g)) for k, g in groupby('AAAABBBCCD')],
1837 [list('AAAA'), list('BBB'), list('CC'), list('D')])
1838
1839 def test_filter(self):
1840 self.assertEqual(list(filter(lambda x: x%2, range(10))), [1,3,5,7,9])
1841
1842 def test_filterfalse(self):
1843 self.assertEqual(list(filterfalse(lambda x: x%2, range(10))), [0,2,4,6,8])
1844
1845 def test_map(self):
1846 self.assertEqual(list(map(pow, (2,3,10), (5,2,3))), [32, 9, 1000])
1847
1848 def test_islice(self):
1849 self.assertEqual(list(islice('ABCDEFG', 2)), list('AB'))
1850 self.assertEqual(list(islice('ABCDEFG', 2, 4)), list('CD'))
1851 self.assertEqual(list(islice('ABCDEFG', 2, None)), list('CDEFG'))
1852 self.assertEqual(list(islice('ABCDEFG', 0, None, 2)), list('ACEG'))
1853
1854 def test_zip(self):
1855 self.assertEqual(list(zip('ABCD', 'xy')), [('A', 'x'), ('B', 'y')])
1856
1857 def test_zip_longest(self):
1858 self.assertEqual(list(zip_longest('ABCD', 'xy', fillvalue='-')),
1859 [('A', 'x'), ('B', 'y'), ('C', '-'), ('D', '-')])
1860
1861 def test_permutations(self):
1862 self.assertEqual(list(permutations('ABCD', 2)),
1863 list(map(tuple, 'AB AC AD BA BC BD CA CB CD DA DB DC'.split())))
1864 self.assertEqual(list(permutations(range(3))),
1865 [(0,1,2), (0,2,1), (1,0,2), (1,2,0), (2,0,1), (2,1,0)])
1866
1867 def test_product(self):
1868 self.assertEqual(list(product('ABCD', 'xy')),
1869 list(map(tuple, 'Ax Ay Bx By Cx Cy Dx Dy'.split())))
1870 self.assertEqual(list(product(range(2), repeat=3)),
1871 [(0,0,0), (0,0,1), (0,1,0), (0,1,1),
1872 (1,0,0), (1,0,1), (1,1,0), (1,1,1)])
1873
1874 def test_repeat(self):
1875 self.assertEqual(list(repeat(10, 3)), [10, 10, 10])
1876
1877 def test_stapmap(self):
1878 self.assertEqual(list(starmap(pow, [(2,5), (3,2), (10,3)])),
1879 [32, 9, 1000])
1880
1881 def test_takewhile(self):
1882 self.assertEqual(list(takewhile(lambda x: x<5, [1,4,6,4,1])), [1,4])
1883
1884
1885 class ESC[4;38;5;81mTestPurePythonRoughEquivalents(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
1886
1887 def test_batched_recipe(self):
1888 def batched_recipe(iterable, n):
1889 "Batch data into tuples of length n. The last batch may be shorter."
1890 # batched('ABCDEFG', 3) --> ABC DEF G
1891 if n < 1:
1892 raise ValueError('n must be at least one')
1893 it = iter(iterable)
1894 while batch := tuple(islice(it, n)):
1895 yield batch
1896
1897 for iterable, n in product(
1898 ['', 'a', 'ab', 'abc', 'abcd', 'abcde', 'abcdef', 'abcdefg', None],
1899 [-1, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, None]):
1900 with self.subTest(iterable=iterable, n=n):
1901 try:
1902 e1, r1 = None, list(batched(iterable, n))
1903 except Exception as e:
1904 e1, r1 = type(e), None
1905 try:
1906 e2, r2 = None, list(batched_recipe(iterable, n))
1907 except Exception as e:
1908 e2, r2 = type(e), None
1909 self.assertEqual(r1, r2)
1910 self.assertEqual(e1, e2)
1911
1912 @staticmethod
1913 def islice(iterable, *args):
1914 s = slice(*args)
1915 start, stop, step = s.start or 0, s.stop or sys.maxsize, s.step or 1
1916 it = iter(range(start, stop, step))
1917 try:
1918 nexti = next(it)
1919 except StopIteration:
1920 # Consume *iterable* up to the *start* position.
1921 for i, element in zip(range(start), iterable):
1922 pass
1923 return
1924 try:
1925 for i, element in enumerate(iterable):
1926 if i == nexti:
1927 yield element
1928 nexti = next(it)
1929 except StopIteration:
1930 # Consume to *stop*.
1931 for i, element in zip(range(i + 1, stop), iterable):
1932 pass
1933
1934 def test_islice_recipe(self):
1935 self.assertEqual(list(self.islice('ABCDEFG', 2)), list('AB'))
1936 self.assertEqual(list(self.islice('ABCDEFG', 2, 4)), list('CD'))
1937 self.assertEqual(list(self.islice('ABCDEFG', 2, None)), list('CDEFG'))
1938 self.assertEqual(list(self.islice('ABCDEFG', 0, None, 2)), list('ACEG'))
1939 # Test items consumed.
1940 it = iter(range(10))
1941 self.assertEqual(list(self.islice(it, 3)), list(range(3)))
1942 self.assertEqual(list(it), list(range(3, 10)))
1943 it = iter(range(10))
1944 self.assertEqual(list(self.islice(it, 3, 3)), [])
1945 self.assertEqual(list(it), list(range(3, 10)))
1946 # Test that slice finishes in predictable state.
1947 c = count()
1948 self.assertEqual(list(self.islice(c, 1, 3, 50)), [1])
1949 self.assertEqual(next(c), 3)
1950
1951
1952 class ESC[4;38;5;81mTestGC(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
1953
1954 def makecycle(self, iterator, container):
1955 container.append(iterator)
1956 next(iterator)
1957 del container, iterator
1958
1959 def test_accumulate(self):
1960 a = []
1961 self.makecycle(accumulate([1,2,a,3]), a)
1962
1963 def test_batched(self):
1964 a = []
1965 self.makecycle(batched([1,2,a,3], 2), a)
1966
1967 def test_chain(self):
1968 a = []
1969 self.makecycle(chain(a), a)
1970
1971 def test_chain_from_iterable(self):
1972 a = []
1973 self.makecycle(chain.from_iterable([a]), a)
1974
1975 def test_combinations(self):
1976 a = []
1977 self.makecycle(combinations([1,2,a,3], 3), a)
1978
1979 def test_combinations_with_replacement(self):
1980 a = []
1981 self.makecycle(combinations_with_replacement([1,2,a,3], 3), a)
1982
1983 def test_compress(self):
1984 a = []
1985 self.makecycle(compress('ABCDEF', [1,0,1,0,1,0]), a)
1986
1987 def test_count(self):
1988 a = []
1989 Int = type('Int', (int,), dict(x=a))
1990 self.makecycle(count(Int(0), Int(1)), a)
1991
1992 def test_cycle(self):
1993 a = []
1994 self.makecycle(cycle([a]*2), a)
1995
1996 def test_dropwhile(self):
1997 a = []
1998 self.makecycle(dropwhile(bool, [0, a, a]), a)
1999
2000 def test_groupby(self):
2001 a = []
2002 self.makecycle(groupby([a]*2, lambda x:x), a)
2003
2004 def test_issue2246(self):
2005 # Issue 2246 -- the _grouper iterator was not included in GC
2006 n = 10
2007 keyfunc = lambda x: x
2008 for i, j in groupby(range(n), key=keyfunc):
2009 keyfunc.__dict__.setdefault('x',[]).append(j)
2010
2011 def test_filter(self):
2012 a = []
2013 self.makecycle(filter(lambda x:True, [a]*2), a)
2014
2015 def test_filterfalse(self):
2016 a = []
2017 self.makecycle(filterfalse(lambda x:False, a), a)
2018
2019 def test_zip(self):
2020 a = []
2021 self.makecycle(zip([a]*2, [a]*3), a)
2022
2023 def test_zip_longest(self):
2024 a = []
2025 self.makecycle(zip_longest([a]*2, [a]*3), a)
2026 b = [a, None]
2027 self.makecycle(zip_longest([a]*2, [a]*3, fillvalue=b), a)
2028
2029 def test_map(self):
2030 a = []
2031 self.makecycle(map(lambda x:x, [a]*2), a)
2032
2033 def test_islice(self):
2034 a = []
2035 self.makecycle(islice([a]*2, None), a)
2036
2037 def test_pairwise(self):
2038 a = []
2039 self.makecycle(pairwise([a]*5), a)
2040
2041 def test_permutations(self):
2042 a = []
2043 self.makecycle(permutations([1,2,a,3], 3), a)
2044
2045 def test_product(self):
2046 a = []
2047 self.makecycle(product([1,2,a,3], repeat=3), a)
2048
2049 def test_repeat(self):
2050 a = []
2051 self.makecycle(repeat(a), a)
2052
2053 def test_starmap(self):
2054 a = []
2055 self.makecycle(starmap(lambda *t: t, [(a,a)]*2), a)
2056
2057 def test_takewhile(self):
2058 a = []
2059 self.makecycle(takewhile(bool, [1, 0, a, a]), a)
2060
2061 def R(seqn):
2062 'Regular generator'
2063 for i in seqn:
2064 yield i
2065
2066 class ESC[4;38;5;81mG:
2067 'Sequence using __getitem__'
2068 def __init__(self, seqn):
2069 self.seqn = seqn
2070 def __getitem__(self, i):
2071 return self.seqn[i]
2072
2073 class ESC[4;38;5;81mI:
2074 'Sequence using iterator protocol'
2075 def __init__(self, seqn):
2076 self.seqn = seqn
2077 self.i = 0
2078 def __iter__(self):
2079 return self
2080 def __next__(self):
2081 if self.i >= len(self.seqn): raise StopIteration
2082 v = self.seqn[self.i]
2083 self.i += 1
2084 return v
2085
2086 class ESC[4;38;5;81mIg:
2087 'Sequence using iterator protocol defined with a generator'
2088 def __init__(self, seqn):
2089 self.seqn = seqn
2090 self.i = 0
2091 def __iter__(self):
2092 for val in self.seqn:
2093 yield val
2094
2095 class ESC[4;38;5;81mX:
2096 'Missing __getitem__ and __iter__'
2097 def __init__(self, seqn):
2098 self.seqn = seqn
2099 self.i = 0
2100 def __next__(self):
2101 if self.i >= len(self.seqn): raise StopIteration
2102 v = self.seqn[self.i]
2103 self.i += 1
2104 return v
2105
2106 class ESC[4;38;5;81mN:
2107 'Iterator missing __next__()'
2108 def __init__(self, seqn):
2109 self.seqn = seqn
2110 self.i = 0
2111 def __iter__(self):
2112 return self
2113
2114 class ESC[4;38;5;81mE:
2115 'Test propagation of exceptions'
2116 def __init__(self, seqn):
2117 self.seqn = seqn
2118 self.i = 0
2119 def __iter__(self):
2120 return self
2121 def __next__(self):
2122 3 // 0
2123
2124 class ESC[4;38;5;81mE2:
2125 'Test propagation of exceptions after two iterations'
2126 def __init__(self, seqn):
2127 self.seqn = seqn
2128 self.i = 0
2129 def __iter__(self):
2130 return self
2131 def __next__(self):
2132 if self.i == 2:
2133 raise ZeroDivisionError
2134 v = self.seqn[self.i]
2135 self.i += 1
2136 return v
2137
2138 class ESC[4;38;5;81mS:
2139 'Test immediate stop'
2140 def __init__(self, seqn):
2141 pass
2142 def __iter__(self):
2143 return self
2144 def __next__(self):
2145 raise StopIteration
2146
2147 def L(seqn):
2148 'Test multiple tiers of iterators'
2149 return chain(map(lambda x:x, R(Ig(G(seqn)))))
2150
2151
2152 class ESC[4;38;5;81mTestVariousIteratorArgs(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
2153
2154 def test_accumulate(self):
2155 s = [1,2,3,4,5]
2156 r = [1,3,6,10,15]
2157 n = len(s)
2158 for g in (G, I, Ig, L, R):
2159 self.assertEqual(list(accumulate(g(s))), r)
2160 self.assertEqual(list(accumulate(S(s))), [])
2161 self.assertRaises(TypeError, accumulate, X(s))
2162 self.assertRaises(TypeError, accumulate, N(s))
2163 self.assertRaises(ZeroDivisionError, list, accumulate(E(s)))
2164
2165 def test_batched(self):
2166 s = 'abcde'
2167 r = [('a', 'b'), ('c', 'd'), ('e',)]
2168 n = 2
2169 for g in (G, I, Ig, L, R):
2170 with self.subTest(g=g):
2171 self.assertEqual(list(batched(g(s), n)), r)
2172 self.assertEqual(list(batched(S(s), 2)), [])
2173 self.assertRaises(TypeError, batched, X(s), 2)
2174 self.assertRaises(TypeError, batched, N(s), 2)
2175 self.assertRaises(ZeroDivisionError, list, batched(E(s), 2))
2176 self.assertRaises(ZeroDivisionError, list, batched(E2(s), 4))
2177
2178 def test_chain(self):
2179 for s in ("123", "", range(1000), ('do', 1.2), range(2000,2200,5)):
2180 for g in (G, I, Ig, S, L, R):
2181 self.assertEqual(list(chain(g(s))), list(g(s)))
2182 self.assertEqual(list(chain(g(s), g(s))), list(g(s))+list(g(s)))
2183 self.assertRaises(TypeError, list, chain(X(s)))
2184 self.assertRaises(TypeError, list, chain(N(s)))
2185 self.assertRaises(ZeroDivisionError, list, chain(E(s)))
2186
2187 def test_compress(self):
2188 for s in ("123", "", range(1000), ('do', 1.2), range(2000,2200,5)):
2189 n = len(s)
2190 for g in (G, I, Ig, S, L, R):
2191 self.assertEqual(list(compress(g(s), repeat(1))), list(g(s)))
2192 self.assertRaises(TypeError, compress, X(s), repeat(1))
2193 self.assertRaises(TypeError, compress, N(s), repeat(1))
2194 self.assertRaises(ZeroDivisionError, list, compress(E(s), repeat(1)))
2195
2196 def test_product(self):
2197 for s in ("123", "", range(1000), ('do', 1.2), range(2000,2200,5)):
2198 self.assertRaises(TypeError, product, X(s))
2199 self.assertRaises(TypeError, product, N(s))
2200 self.assertRaises(ZeroDivisionError, product, E(s))
2201
2202 def test_cycle(self):
2203 for s in ("123", "", range(1000), ('do', 1.2), range(2000,2200,5)):
2204 for g in (G, I, Ig, S, L, R):
2205 tgtlen = len(s) * 3
2206 expected = list(g(s))*3
2207 actual = list(islice(cycle(g(s)), tgtlen))
2208 self.assertEqual(actual, expected)
2209 self.assertRaises(TypeError, cycle, X(s))
2210 self.assertRaises(TypeError, cycle, N(s))
2211 self.assertRaises(ZeroDivisionError, list, cycle(E(s)))
2212
2213 def test_groupby(self):
2214 for s in (range(10), range(0), range(1000), (7,11), range(2000,2200,5)):
2215 for g in (G, I, Ig, S, L, R):
2216 self.assertEqual([k for k, sb in groupby(g(s))], list(g(s)))
2217 self.assertRaises(TypeError, groupby, X(s))
2218 self.assertRaises(TypeError, groupby, N(s))
2219 self.assertRaises(ZeroDivisionError, list, groupby(E(s)))
2220
2221 def test_filter(self):
2222 for s in (range(10), range(0), range(1000), (7,11), range(2000,2200,5)):
2223 for g in (G, I, Ig, S, L, R):
2224 self.assertEqual(list(filter(isEven, g(s))),
2225 [x for x in g(s) if isEven(x)])
2226 self.assertRaises(TypeError, filter, isEven, X(s))
2227 self.assertRaises(TypeError, filter, isEven, N(s))
2228 self.assertRaises(ZeroDivisionError, list, filter(isEven, E(s)))
2229
2230 def test_filterfalse(self):
2231 for s in (range(10), range(0), range(1000), (7,11), range(2000,2200,5)):
2232 for g in (G, I, Ig, S, L, R):
2233 self.assertEqual(list(filterfalse(isEven, g(s))),
2234 [x for x in g(s) if isOdd(x)])
2235 self.assertRaises(TypeError, filterfalse, isEven, X(s))
2236 self.assertRaises(TypeError, filterfalse, isEven, N(s))
2237 self.assertRaises(ZeroDivisionError, list, filterfalse(isEven, E(s)))
2238
2239 def test_zip(self):
2240 for s in ("123", "", range(1000), ('do', 1.2), range(2000,2200,5)):
2241 for g in (G, I, Ig, S, L, R):
2242 self.assertEqual(list(zip(g(s))), lzip(g(s)))
2243 self.assertEqual(list(zip(g(s), g(s))), lzip(g(s), g(s)))
2244 self.assertRaises(TypeError, zip, X(s))
2245 self.assertRaises(TypeError, zip, N(s))
2246 self.assertRaises(ZeroDivisionError, list, zip(E(s)))
2247
2248 def test_ziplongest(self):
2249 for s in ("123", "", range(1000), ('do', 1.2), range(2000,2200,5)):
2250 for g in (G, I, Ig, S, L, R):
2251 self.assertEqual(list(zip_longest(g(s))), list(zip(g(s))))
2252 self.assertEqual(list(zip_longest(g(s), g(s))), list(zip(g(s), g(s))))
2253 self.assertRaises(TypeError, zip_longest, X(s))
2254 self.assertRaises(TypeError, zip_longest, N(s))
2255 self.assertRaises(ZeroDivisionError, list, zip_longest(E(s)))
2256
2257 def test_map(self):
2258 for s in (range(10), range(0), range(100), (7,11), range(20,50,5)):
2259 for g in (G, I, Ig, S, L, R):
2260 self.assertEqual(list(map(onearg, g(s))),
2261 [onearg(x) for x in g(s)])
2262 self.assertEqual(list(map(operator.pow, g(s), g(s))),
2263 [x**x for x in g(s)])
2264 self.assertRaises(TypeError, map, onearg, X(s))
2265 self.assertRaises(TypeError, map, onearg, N(s))
2266 self.assertRaises(ZeroDivisionError, list, map(onearg, E(s)))
2267
2268 def test_islice(self):
2269 for s in ("12345", "", range(1000), ('do', 1.2), range(2000,2200,5)):
2270 for g in (G, I, Ig, S, L, R):
2271 self.assertEqual(list(islice(g(s),1,None,2)), list(g(s))[1::2])
2272 self.assertRaises(TypeError, islice, X(s), 10)
2273 self.assertRaises(TypeError, islice, N(s), 10)
2274 self.assertRaises(ZeroDivisionError, list, islice(E(s), 10))
2275
2276 def test_pairwise(self):
2277 for s in ("123", "", range(1000), ('do', 1.2), range(2000,2200,5)):
2278 for g in (G, I, Ig, S, L, R):
2279 seq = list(g(s))
2280 expected = list(zip(seq, seq[1:]))
2281 actual = list(pairwise(g(s)))
2282 self.assertEqual(actual, expected)
2283 self.assertRaises(TypeError, pairwise, X(s))
2284 self.assertRaises(TypeError, pairwise, N(s))
2285 self.assertRaises(ZeroDivisionError, list, pairwise(E(s)))
2286
2287 def test_starmap(self):
2288 for s in (range(10), range(0), range(100), (7,11), range(20,50,5)):
2289 for g in (G, I, Ig, S, L, R):
2290 ss = lzip(s, s)
2291 self.assertEqual(list(starmap(operator.pow, g(ss))),
2292 [x**x for x in g(s)])
2293 self.assertRaises(TypeError, starmap, operator.pow, X(ss))
2294 self.assertRaises(TypeError, starmap, operator.pow, N(ss))
2295 self.assertRaises(ZeroDivisionError, list, starmap(operator.pow, E(ss)))
2296
2297 def test_takewhile(self):
2298 for s in (range(10), range(0), range(1000), (7,11), range(2000,2200,5)):
2299 for g in (G, I, Ig, S, L, R):
2300 tgt = []
2301 for elem in g(s):
2302 if not isEven(elem): break
2303 tgt.append(elem)
2304 self.assertEqual(list(takewhile(isEven, g(s))), tgt)
2305 self.assertRaises(TypeError, takewhile, isEven, X(s))
2306 self.assertRaises(TypeError, takewhile, isEven, N(s))
2307 self.assertRaises(ZeroDivisionError, list, takewhile(isEven, E(s)))
2308
2309 def test_dropwhile(self):
2310 for s in (range(10), range(0), range(1000), (7,11), range(2000,2200,5)):
2311 for g in (G, I, Ig, S, L, R):
2312 tgt = []
2313 for elem in g(s):
2314 if not tgt and isOdd(elem): continue
2315 tgt.append(elem)
2316 self.assertEqual(list(dropwhile(isOdd, g(s))), tgt)
2317 self.assertRaises(TypeError, dropwhile, isOdd, X(s))
2318 self.assertRaises(TypeError, dropwhile, isOdd, N(s))
2319 self.assertRaises(ZeroDivisionError, list, dropwhile(isOdd, E(s)))
2320
2321 def test_tee(self):
2322 for s in ("123", "", range(1000), ('do', 1.2), range(2000,2200,5)):
2323 for g in (G, I, Ig, S, L, R):
2324 it1, it2 = tee(g(s))
2325 self.assertEqual(list(it1), list(g(s)))
2326 self.assertEqual(list(it2), list(g(s)))
2327 self.assertRaises(TypeError, tee, X(s))
2328 self.assertRaises(TypeError, tee, N(s))
2329 self.assertRaises(ZeroDivisionError, list, tee(E(s))[0])
2330
2331 class ESC[4;38;5;81mLengthTransparency(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
2332
2333 def test_repeat(self):
2334 self.assertEqual(operator.length_hint(repeat(None, 50)), 50)
2335 self.assertEqual(operator.length_hint(repeat(None, 0)), 0)
2336 self.assertEqual(operator.length_hint(repeat(None), 12), 12)
2337
2338 def test_repeat_with_negative_times(self):
2339 self.assertEqual(operator.length_hint(repeat(None, -1)), 0)
2340 self.assertEqual(operator.length_hint(repeat(None, -2)), 0)
2341 self.assertEqual(operator.length_hint(repeat(None, times=-1)), 0)
2342 self.assertEqual(operator.length_hint(repeat(None, times=-2)), 0)
2343
2344 class ESC[4;38;5;81mRegressionTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
2345
2346 def test_sf_793826(self):
2347 # Fix Armin Rigo's successful efforts to wreak havoc
2348
2349 def mutatingtuple(tuple1, f, tuple2):
2350 # this builds a tuple t which is a copy of tuple1,
2351 # then calls f(t), then mutates t to be equal to tuple2
2352 # (needs len(tuple1) == len(tuple2)).
2353 def g(value, first=[1]):
2354 if first:
2355 del first[:]
2356 f(next(z))
2357 return value
2358 items = list(tuple2)
2359 items[1:1] = list(tuple1)
2360 gen = map(g, items)
2361 z = zip(*[gen]*len(tuple1))
2362 next(z)
2363
2364 def f(t):
2365 global T
2366 T = t
2367 first[:] = list(T)
2368
2369 first = []
2370 mutatingtuple((1,2,3), f, (4,5,6))
2371 second = list(T)
2372 self.assertEqual(first, second)
2373
2374
2375 def test_sf_950057(self):
2376 # Make sure that chain() and cycle() catch exceptions immediately
2377 # rather than when shifting between input sources
2378
2379 def gen1():
2380 hist.append(0)
2381 yield 1
2382 hist.append(1)
2383 raise AssertionError
2384 hist.append(2)
2385
2386 def gen2(x):
2387 hist.append(3)
2388 yield 2
2389 hist.append(4)
2390
2391 hist = []
2392 self.assertRaises(AssertionError, list, chain(gen1(), gen2(False)))
2393 self.assertEqual(hist, [0,1])
2394
2395 hist = []
2396 self.assertRaises(AssertionError, list, chain(gen1(), gen2(True)))
2397 self.assertEqual(hist, [0,1])
2398
2399 hist = []
2400 self.assertRaises(AssertionError, list, cycle(gen1()))
2401 self.assertEqual(hist, [0,1])
2402
2403 @support.skip_if_pgo_task
2404 @support.requires_resource('cpu')
2405 def test_long_chain_of_empty_iterables(self):
2406 # Make sure itertools.chain doesn't run into recursion limits when
2407 # dealing with long chains of empty iterables. Even with a high
2408 # number this would probably only fail in Py_DEBUG mode.
2409 it = chain.from_iterable(() for unused in range(10000000))
2410 with self.assertRaises(StopIteration):
2411 next(it)
2412
2413 def test_issue30347_1(self):
2414 def f(n):
2415 if n == 5:
2416 list(b)
2417 return n != 6
2418 for (k, b) in groupby(range(10), f):
2419 list(b) # shouldn't crash
2420
2421 def test_issue30347_2(self):
2422 class ESC[4;38;5;81mK:
2423 def __init__(self, v):
2424 pass
2425 def __eq__(self, other):
2426 nonlocal i
2427 i += 1
2428 if i == 1:
2429 next(g, None)
2430 return True
2431 i = 0
2432 g = next(groupby(range(10), K))[1]
2433 for j in range(2):
2434 next(g, None) # shouldn't crash
2435
2436
2437 class ESC[4;38;5;81mSubclassWithKwargsTest(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
2438 def test_keywords_in_subclass(self):
2439 # count is not subclassable...
2440 testcases = [
2441 (repeat, (1, 2), [1, 1]),
2442 (zip, ([1, 2], 'ab'), [(1, 'a'), (2, 'b')]),
2443 (filter, (None, [0, 1]), [1]),
2444 (filterfalse, (None, [0, 1]), [0]),
2445 (chain, ([1, 2], [3, 4]), [1, 2, 3]),
2446 (map, (str, [1, 2]), ['1', '2']),
2447 (starmap, (operator.pow, ((2, 3), (3, 2))), [8, 9]),
2448 (islice, ([1, 2, 3, 4], 1, 3), [2, 3]),
2449 (takewhile, (isEven, [2, 3, 4]), [2]),
2450 (dropwhile, (isEven, [2, 3, 4]), [3, 4]),
2451 (cycle, ([1, 2],), [1, 2, 1]),
2452 (compress, ('ABC', [1, 0, 1]), ['A', 'C']),
2453 ]
2454 for cls, args, result in testcases:
2455 with self.subTest(cls):
2456 class ESC[4;38;5;81msubclass(ESC[4;38;5;149mcls):
2457 pass
2458 u = subclass(*args)
2459 self.assertIs(type(u), subclass)
2460 self.assertEqual(list(islice(u, 0, 3)), result)
2461 with self.assertRaises(TypeError):
2462 subclass(*args, newarg=3)
2463
2464 for cls, args, result in testcases:
2465 # Constructors of repeat, zip, compress accept keyword arguments.
2466 # Their subclasses need overriding __new__ to support new
2467 # keyword arguments.
2468 if cls in [repeat, zip, compress]:
2469 continue
2470 with self.subTest(cls):
2471 class ESC[4;38;5;81msubclass_with_init(ESC[4;38;5;149mcls):
2472 def __init__(self, *args, newarg=None):
2473 self.newarg = newarg
2474 u = subclass_with_init(*args, newarg=3)
2475 self.assertIs(type(u), subclass_with_init)
2476 self.assertEqual(list(islice(u, 0, 3)), result)
2477 self.assertEqual(u.newarg, 3)
2478
2479 for cls, args, result in testcases:
2480 with self.subTest(cls):
2481 class ESC[4;38;5;81msubclass_with_new(ESC[4;38;5;149mcls):
2482 def __new__(cls, *args, newarg=None):
2483 self = super().__new__(cls, *args)
2484 self.newarg = newarg
2485 return self
2486 u = subclass_with_new(*args, newarg=3)
2487 self.assertIs(type(u), subclass_with_new)
2488 self.assertEqual(list(islice(u, 0, 3)), result)
2489 self.assertEqual(u.newarg, 3)
2490
2491
2492 @support.cpython_only
2493 class ESC[4;38;5;81mSizeofTest(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
2494 def setUp(self):
2495 self.ssize_t = struct.calcsize('n')
2496
2497 check_sizeof = support.check_sizeof
2498
2499 def test_product_sizeof(self):
2500 basesize = support.calcobjsize('3Pi')
2501 check = self.check_sizeof
2502 check(product('ab', '12'), basesize + 2 * self.ssize_t)
2503 check(product(*(('abc',) * 10)), basesize + 10 * self.ssize_t)
2504
2505 def test_combinations_sizeof(self):
2506 basesize = support.calcobjsize('3Pni')
2507 check = self.check_sizeof
2508 check(combinations('abcd', 3), basesize + 3 * self.ssize_t)
2509 check(combinations(range(10), 4), basesize + 4 * self.ssize_t)
2510
2511 def test_combinations_with_replacement_sizeof(self):
2512 cwr = combinations_with_replacement
2513 basesize = support.calcobjsize('3Pni')
2514 check = self.check_sizeof
2515 check(cwr('abcd', 3), basesize + 3 * self.ssize_t)
2516 check(cwr(range(10), 4), basesize + 4 * self.ssize_t)
2517
2518 def test_permutations_sizeof(self):
2519 basesize = support.calcobjsize('4Pni')
2520 check = self.check_sizeof
2521 check(permutations('abcd'),
2522 basesize + 4 * self.ssize_t + 4 * self.ssize_t)
2523 check(permutations('abcd', 3),
2524 basesize + 4 * self.ssize_t + 3 * self.ssize_t)
2525 check(permutations('abcde', 3),
2526 basesize + 5 * self.ssize_t + 3 * self.ssize_t)
2527 check(permutations(range(10), 4),
2528 basesize + 10 * self.ssize_t + 4 * self.ssize_t)
2529
2530
2531 def load_tests(loader, tests, pattern):
2532 tests.addTest(doctest.DocTestSuite())
2533 return tests
2534
2535
2536 if __name__ == "__main__":
2537 unittest.main()