1import unittest 2from test import support 3from itertools import * 4import weakref 5from decimal import Decimal 6from fractions import Fraction 7import operator 8import random 9import copy 10import pickle 11from functools import reduce 12import sys 13import struct 14import threading 15import gc 16 17maxsize = support.MAX_Py_ssize_t 18minsize = -maxsize-1 19 20def lzip(*args): 21 return list(zip(*args)) 22 23def onearg(x): 24 'Test function of one argument' 25 return 2*x 26 27def errfunc(*args): 28 'Test function that raises an error' 29 raise ValueError 30 31def gen3(): 32 'Non-restartable source sequence' 33 for i in (0, 1, 2): 34 yield i 35 36def isEven(x): 37 'Test predicate' 38 return x%2==0 39 40def isOdd(x): 41 'Test predicate' 42 return x%2==1 43 44def tupleize(*args): 45 return args 46 47def irange(n): 48 for i in range(n): 49 yield i 50 51class StopNow: 52 'Class emulating an empty iterable.' 53 def __iter__(self): 54 return self 55 def __next__(self): 56 raise StopIteration 57 58def take(n, seq): 59 'Convenience function for partially consuming a long of infinite iterable' 60 return list(islice(seq, n)) 61 62def prod(iterable): 63 return reduce(operator.mul, iterable, 1) 64 65def fact(n): 66 'Factorial' 67 return prod(range(1, n+1)) 68 69# root level methods for pickling ability 70def testR(r): 71 return r[0] 72 73def testR2(r): 74 return r[2] 75 76def underten(x): 77 return x<10 78 79picklecopiers = [lambda s, proto=proto: pickle.loads(pickle.dumps(s, proto)) 80 for proto in range(pickle.HIGHEST_PROTOCOL + 1)] 81 82class TestBasicOps(unittest.TestCase): 83 84 def pickletest(self, protocol, it, stop=4, take=1, compare=None): 85 """Test that an iterator is the same after pickling, also when part-consumed""" 86 def expand(it, i=0): 87 # Recursively expand iterables, within sensible bounds 88 if i > 10: 89 raise RuntimeError("infinite recursion encountered") 90 if isinstance(it, str): 91 return it 92 try: 93 l = list(islice(it, stop)) 94 except TypeError: 95 return it # can't expand it 96 return [expand(e, i+1) for e in l] 97 98 # Test the initial copy against the original 99 dump = pickle.dumps(it, protocol) 100 i2 = pickle.loads(dump) 101 self.assertEqual(type(it), type(i2)) 102 a, b = expand(it), expand(i2) 103 self.assertEqual(a, b) 104 if compare: 105 c = expand(compare) 106 self.assertEqual(a, c) 107 108 # Take from the copy, and create another copy and compare them. 109 i3 = pickle.loads(dump) 110 took = 0 111 try: 112 for i in range(take): 113 next(i3) 114 took += 1 115 except StopIteration: 116 pass #in case there is less data than 'take' 117 dump = pickle.dumps(i3, protocol) 118 i4 = pickle.loads(dump) 119 a, b = expand(i3), expand(i4) 120 self.assertEqual(a, b) 121 if compare: 122 c = expand(compare[took:]) 123 self.assertEqual(a, c); 124 125 def test_accumulate(self): 126 self.assertEqual(list(accumulate(range(10))), # one positional arg 127 [0, 1, 3, 6, 10, 15, 21, 28, 36, 45]) 128 self.assertEqual(list(accumulate(iterable=range(10))), # kw arg 129 [0, 1, 3, 6, 10, 15, 21, 28, 36, 45]) 130 for typ in int, complex, Decimal, Fraction: # multiple types 131 self.assertEqual( 132 list(accumulate(map(typ, range(10)))), 133 list(map(typ, [0, 1, 3, 6, 10, 15, 21, 28, 36, 45]))) 134 self.assertEqual(list(accumulate('abc')), ['a', 'ab', 'abc']) # works with non-numeric 135 self.assertEqual(list(accumulate([])), []) # empty iterable 136 self.assertEqual(list(accumulate([7])), [7]) # iterable of length one 137 self.assertRaises(TypeError, accumulate, range(10), 5, 6) # too many args 138 self.assertRaises(TypeError, accumulate) # too few args 139 self.assertRaises(TypeError, accumulate, x=range(10)) # unexpected kwd arg 140 self.assertRaises(TypeError, list, accumulate([1, []])) # args that don't add 141 142 s = [2, 8, 9, 5, 7, 0, 3, 4, 1, 6] 143 self.assertEqual(list(accumulate(s, min)), 144 [2, 2, 2, 2, 2, 0, 0, 0, 0, 0]) 145 self.assertEqual(list(accumulate(s, max)), 146 [2, 8, 9, 9, 9, 9, 9, 9, 9, 9]) 147 self.assertEqual(list(accumulate(s, operator.mul)), 148 [2, 16, 144, 720, 5040, 0, 0, 0, 0, 0]) 149 with self.assertRaises(TypeError): 150 list(accumulate(s, chr)) # unary-operation 151 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 152 self.pickletest(proto, accumulate(range(10))) # test pickling 153 self.pickletest(proto, accumulate(range(10), initial=7)) 154 self.assertEqual(list(accumulate([10, 5, 1], initial=None)), [10, 15, 16]) 155 self.assertEqual(list(accumulate([10, 5, 1], initial=100)), [100, 110, 115, 116]) 156 self.assertEqual(list(accumulate([], initial=100)), [100]) 157 with self.assertRaises(TypeError): 158 list(accumulate([10, 20], 100)) 159 160 def test_chain(self): 161 162 def chain2(*iterables): 163 'Pure python version in the docs' 164 for it in iterables: 165 for element in it: 166 yield element 167 168 for c in (chain, chain2): 169 self.assertEqual(list(c('abc', 'def')), list('abcdef')) 170 self.assertEqual(list(c('abc')), list('abc')) 171 self.assertEqual(list(c('')), []) 172 self.assertEqual(take(4, c('abc', 'def')), list('abcd')) 173 self.assertRaises(TypeError, list,c(2, 3)) 174 175 def test_chain_from_iterable(self): 176 self.assertEqual(list(chain.from_iterable(['abc', 'def'])), list('abcdef')) 177 self.assertEqual(list(chain.from_iterable(['abc'])), list('abc')) 178 self.assertEqual(list(chain.from_iterable([''])), []) 179 self.assertEqual(take(4, chain.from_iterable(['abc', 'def'])), list('abcd')) 180 self.assertRaises(TypeError, list, chain.from_iterable([2, 3])) 181 182 def test_chain_reducible(self): 183 for oper in [copy.deepcopy] + picklecopiers: 184 it = chain('abc', 'def') 185 self.assertEqual(list(oper(it)), list('abcdef')) 186 self.assertEqual(next(it), 'a') 187 self.assertEqual(list(oper(it)), list('bcdef')) 188 189 self.assertEqual(list(oper(chain(''))), []) 190 self.assertEqual(take(4, oper(chain('abc', 'def'))), list('abcd')) 191 self.assertRaises(TypeError, list, oper(chain(2, 3))) 192 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 193 self.pickletest(proto, chain('abc', 'def'), compare=list('abcdef')) 194 195 def test_chain_setstate(self): 196 self.assertRaises(TypeError, chain().__setstate__, ()) 197 self.assertRaises(TypeError, chain().__setstate__, []) 198 self.assertRaises(TypeError, chain().__setstate__, 0) 199 self.assertRaises(TypeError, chain().__setstate__, ([],)) 200 self.assertRaises(TypeError, chain().__setstate__, (iter([]), [])) 201 it = chain() 202 it.__setstate__((iter(['abc', 'def']),)) 203 self.assertEqual(list(it), ['a', 'b', 'c', 'd', 'e', 'f']) 204 it = chain() 205 it.__setstate__((iter(['abc', 'def']), iter(['ghi']))) 206 self.assertEqual(list(it), ['ghi', 'a', 'b', 'c', 'd', 'e', 'f']) 207 208 def test_combinations(self): 209 self.assertRaises(TypeError, combinations, 'abc') # missing r argument 210 self.assertRaises(TypeError, combinations, 'abc', 2, 1) # too many arguments 211 self.assertRaises(TypeError, combinations, None) # pool is not iterable 212 self.assertRaises(ValueError, combinations, 'abc', -2) # r is negative 213 214 for op in [lambda a:a] + picklecopiers: 215 self.assertEqual(list(op(combinations('abc', 32))), []) # r > n 216 217 self.assertEqual(list(op(combinations('ABCD', 2))), 218 [('A','B'), ('A','C'), ('A','D'), ('B','C'), ('B','D'), ('C','D')]) 219 testIntermediate = combinations('ABCD', 2) 220 next(testIntermediate) 221 self.assertEqual(list(op(testIntermediate)), 222 [('A','C'), ('A','D'), ('B','C'), ('B','D'), ('C','D')]) 223 224 self.assertEqual(list(op(combinations(range(4), 3))), 225 [(0,1,2), (0,1,3), (0,2,3), (1,2,3)]) 226 testIntermediate = combinations(range(4), 3) 227 next(testIntermediate) 228 self.assertEqual(list(op(testIntermediate)), 229 [(0,1,3), (0,2,3), (1,2,3)]) 230 231 232 def combinations1(iterable, r): 233 'Pure python version shown in the docs' 234 pool = tuple(iterable) 235 n = len(pool) 236 if r > n: 237 return 238 indices = list(range(r)) 239 yield tuple(pool[i] for i in indices) 240 while 1: 241 for i in reversed(range(r)): 242 if indices[i] != i + n - r: 243 break 244 else: 245 return 246 indices[i] += 1 247 for j in range(i+1, r): 248 indices[j] = indices[j-1] + 1 249 yield tuple(pool[i] for i in indices) 250 251 def combinations2(iterable, r): 252 'Pure python version shown in the docs' 253 pool = tuple(iterable) 254 n = len(pool) 255 for indices in permutations(range(n), r): 256 if sorted(indices) == list(indices): 257 yield tuple(pool[i] for i in indices) 258 259 def combinations3(iterable, r): 260 'Pure python version from cwr()' 261 pool = tuple(iterable) 262 n = len(pool) 263 for indices in combinations_with_replacement(range(n), r): 264 if len(set(indices)) == r: 265 yield tuple(pool[i] for i in indices) 266 267 for n in range(7): 268 values = [5*x-12 for x in range(n)] 269 for r in range(n+2): 270 result = list(combinations(values, r)) 271 self.assertEqual(len(result), 0 if r>n else fact(n) / fact(r) / fact(n-r)) # right number of combs 272 self.assertEqual(len(result), len(set(result))) # no repeats 273 self.assertEqual(result, sorted(result)) # lexicographic order 274 for c in result: 275 self.assertEqual(len(c), r) # r-length combinations 276 self.assertEqual(len(set(c)), r) # no duplicate elements 277 self.assertEqual(list(c), sorted(c)) # keep original ordering 278 self.assertTrue(all(e in values for e in c)) # elements taken from input iterable 279 self.assertEqual(list(c), 280 [e for e in values if e in c]) # comb is a subsequence of the input iterable 281 self.assertEqual(result, list(combinations1(values, r))) # matches first pure python version 282 self.assertEqual(result, list(combinations2(values, r))) # matches second pure python version 283 self.assertEqual(result, list(combinations3(values, r))) # matches second pure python version 284 285 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 286 self.pickletest(proto, combinations(values, r)) # test pickling 287 288 @support.bigaddrspacetest 289 def test_combinations_overflow(self): 290 with self.assertRaises((OverflowError, MemoryError)): 291 combinations("AA", 2**29) 292 293 # Test implementation detail: tuple re-use 294 @support.impl_detail("tuple reuse is specific to CPython") 295 def test_combinations_tuple_reuse(self): 296 self.assertEqual(len(set(map(id, combinations('abcde', 3)))), 1) 297 self.assertNotEqual(len(set(map(id, list(combinations('abcde', 3))))), 1) 298 299 def test_combinations_with_replacement(self): 300 cwr = combinations_with_replacement 301 self.assertRaises(TypeError, cwr, 'abc') # missing r argument 302 self.assertRaises(TypeError, cwr, 'abc', 2, 1) # too many arguments 303 self.assertRaises(TypeError, cwr, None) # pool is not iterable 304 self.assertRaises(ValueError, cwr, 'abc', -2) # r is negative 305 306 for op in [lambda a:a] + picklecopiers: 307 self.assertEqual(list(op(cwr('ABC', 2))), 308 [('A','A'), ('A','B'), ('A','C'), ('B','B'), ('B','C'), ('C','C')]) 309 testIntermediate = cwr('ABC', 2) 310 next(testIntermediate) 311 self.assertEqual(list(op(testIntermediate)), 312 [('A','B'), ('A','C'), ('B','B'), ('B','C'), ('C','C')]) 313 314 315 def cwr1(iterable, r): 316 'Pure python version shown in the docs' 317 # number items returned: (n+r-1)! / r! / (n-1)! when n>0 318 pool = tuple(iterable) 319 n = len(pool) 320 if not n and r: 321 return 322 indices = [0] * r 323 yield tuple(pool[i] for i in indices) 324 while 1: 325 for i in reversed(range(r)): 326 if indices[i] != n - 1: 327 break 328 else: 329 return 330 indices[i:] = [indices[i] + 1] * (r - i) 331 yield tuple(pool[i] for i in indices) 332 333 def cwr2(iterable, r): 334 'Pure python version shown in the docs' 335 pool = tuple(iterable) 336 n = len(pool) 337 for indices in product(range(n), repeat=r): 338 if sorted(indices) == list(indices): 339 yield tuple(pool[i] for i in indices) 340 341 def numcombs(n, r): 342 if not n: 343 return 0 if r else 1 344 return fact(n+r-1) / fact(r)/ fact(n-1) 345 346 for n in range(7): 347 values = [5*x-12 for x in range(n)] 348 for r in range(n+2): 349 result = list(cwr(values, r)) 350 351 self.assertEqual(len(result), numcombs(n, r)) # right number of combs 352 self.assertEqual(len(result), len(set(result))) # no repeats 353 self.assertEqual(result, sorted(result)) # lexicographic order 354 355 regular_combs = list(combinations(values, r)) # compare to combs without replacement 356 if n == 0 or r <= 1: 357 self.assertEqual(result, regular_combs) # cases that should be identical 358 else: 359 self.assertTrue(set(result) >= set(regular_combs)) # rest should be supersets of regular combs 360 361 for c in result: 362 self.assertEqual(len(c), r) # r-length combinations 363 noruns = [k for k,v in groupby(c)] # combo without consecutive repeats 364 self.assertEqual(len(noruns), len(set(noruns))) # no repeats other than consecutive 365 self.assertEqual(list(c), sorted(c)) # keep original ordering 366 self.assertTrue(all(e in values for e in c)) # elements taken from input iterable 367 self.assertEqual(noruns, 368 [e for e in values if e in c]) # comb is a subsequence of the input iterable 369 self.assertEqual(result, list(cwr1(values, r))) # matches first pure python version 370 self.assertEqual(result, list(cwr2(values, r))) # matches second pure python version 371 372 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 373 self.pickletest(proto, cwr(values,r)) # test pickling 374 375 @support.bigaddrspacetest 376 def test_combinations_with_replacement_overflow(self): 377 with self.assertRaises((OverflowError, MemoryError)): 378 combinations_with_replacement("AA", 2**30) 379 380 # Test implementation detail: tuple re-use 381 @support.impl_detail("tuple reuse is specific to CPython") 382 def test_combinations_with_replacement_tuple_reuse(self): 383 cwr = combinations_with_replacement 384 self.assertEqual(len(set(map(id, cwr('abcde', 3)))), 1) 385 self.assertNotEqual(len(set(map(id, list(cwr('abcde', 3))))), 1) 386 387 def test_permutations(self): 388 self.assertRaises(TypeError, permutations) # too few arguments 389 self.assertRaises(TypeError, permutations, 'abc', 2, 1) # too many arguments 390 self.assertRaises(TypeError, permutations, None) # pool is not iterable 391 self.assertRaises(ValueError, permutations, 'abc', -2) # r is negative 392 self.assertEqual(list(permutations('abc', 32)), []) # r > n 393 self.assertRaises(TypeError, permutations, 'abc', 's') # r is not an int or None 394 self.assertEqual(list(permutations(range(3), 2)), 395 [(0,1), (0,2), (1,0), (1,2), (2,0), (2,1)]) 396 397 def permutations1(iterable, r=None): 398 'Pure python version shown in the docs' 399 pool = tuple(iterable) 400 n = len(pool) 401 r = n if r is None else r 402 if r > n: 403 return 404 indices = list(range(n)) 405 cycles = list(range(n-r+1, n+1))[::-1] 406 yield tuple(pool[i] for i in indices[:r]) 407 while n: 408 for i in reversed(range(r)): 409 cycles[i] -= 1 410 if cycles[i] == 0: 411 indices[i:] = indices[i+1:] + indices[i:i+1] 412 cycles[i] = n - i 413 else: 414 j = cycles[i] 415 indices[i], indices[-j] = indices[-j], indices[i] 416 yield tuple(pool[i] for i in indices[:r]) 417 break 418 else: 419 return 420 421 def permutations2(iterable, r=None): 422 'Pure python version shown in the docs' 423 pool = tuple(iterable) 424 n = len(pool) 425 r = n if r is None else r 426 for indices in product(range(n), repeat=r): 427 if len(set(indices)) == r: 428 yield tuple(pool[i] for i in indices) 429 430 for n in range(7): 431 values = [5*x-12 for x in range(n)] 432 for r in range(n+2): 433 result = list(permutations(values, r)) 434 self.assertEqual(len(result), 0 if r>n else fact(n) / fact(n-r)) # right number of perms 435 self.assertEqual(len(result), len(set(result))) # no repeats 436 self.assertEqual(result, sorted(result)) # lexicographic order 437 for p in result: 438 self.assertEqual(len(p), r) # r-length permutations 439 self.assertEqual(len(set(p)), r) # no duplicate elements 440 self.assertTrue(all(e in values for e in p)) # elements taken from input iterable 441 self.assertEqual(result, list(permutations1(values, r))) # matches first pure python version 442 self.assertEqual(result, list(permutations2(values, r))) # matches second pure python version 443 if r == n: 444 self.assertEqual(result, list(permutations(values, None))) # test r as None 445 self.assertEqual(result, list(permutations(values))) # test default r 446 447 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 448 self.pickletest(proto, permutations(values, r)) # test pickling 449 450 @support.bigaddrspacetest 451 def test_permutations_overflow(self): 452 with self.assertRaises((OverflowError, MemoryError)): 453 permutations("A", 2**30) 454 455 @support.impl_detail("tuple reuse is specific to CPython") 456 def test_permutations_tuple_reuse(self): 457 self.assertEqual(len(set(map(id, permutations('abcde', 3)))), 1) 458 self.assertNotEqual(len(set(map(id, list(permutations('abcde', 3))))), 1) 459 460 def test_combinatorics(self): 461 # Test relationships between product(), permutations(), 462 # combinations() and combinations_with_replacement(). 463 464 for n in range(6): 465 s = 'ABCDEFG'[:n] 466 for r in range(8): 467 prod = list(product(s, repeat=r)) 468 cwr = list(combinations_with_replacement(s, r)) 469 perm = list(permutations(s, r)) 470 comb = list(combinations(s, r)) 471 472 # Check size 473 self.assertEqual(len(prod), n**r) 474 self.assertEqual(len(cwr), (fact(n+r-1) / fact(r)/ fact(n-1)) if n else (not r)) 475 self.assertEqual(len(perm), 0 if r>n else fact(n) / fact(n-r)) 476 self.assertEqual(len(comb), 0 if r>n else fact(n) / fact(r) / fact(n-r)) 477 478 # Check lexicographic order without repeated tuples 479 self.assertEqual(prod, sorted(set(prod))) 480 self.assertEqual(cwr, sorted(set(cwr))) 481 self.assertEqual(perm, sorted(set(perm))) 482 self.assertEqual(comb, sorted(set(comb))) 483 484 # Check interrelationships 485 self.assertEqual(cwr, [t for t in prod if sorted(t)==list(t)]) # cwr: prods which are sorted 486 self.assertEqual(perm, [t for t in prod if len(set(t))==r]) # perm: prods with no dups 487 self.assertEqual(comb, [t for t in perm if sorted(t)==list(t)]) # comb: perms that are sorted 488 self.assertEqual(comb, [t for t in cwr if len(set(t))==r]) # comb: cwrs without dups 489 self.assertEqual(comb, list(filter(set(cwr).__contains__, perm))) # comb: perm that is a cwr 490 self.assertEqual(comb, list(filter(set(perm).__contains__, cwr))) # comb: cwr that is a perm 491 self.assertEqual(comb, sorted(set(cwr) & set(perm))) # comb: both a cwr and a perm 492 493 def test_compress(self): 494 self.assertEqual(list(compress(data='ABCDEF', selectors=[1,0,1,0,1,1])), list('ACEF')) 495 self.assertEqual(list(compress('ABCDEF', [1,0,1,0,1,1])), list('ACEF')) 496 self.assertEqual(list(compress('ABCDEF', [0,0,0,0,0,0])), list('')) 497 self.assertEqual(list(compress('ABCDEF', [1,1,1,1,1,1])), list('ABCDEF')) 498 self.assertEqual(list(compress('ABCDEF', [1,0,1])), list('AC')) 499 self.assertEqual(list(compress('ABC', [0,1,1,1,1,1])), list('BC')) 500 n = 10000 501 data = chain.from_iterable(repeat(range(6), n)) 502 selectors = chain.from_iterable(repeat((0, 1))) 503 self.assertEqual(list(compress(data, selectors)), [1,3,5] * n) 504 self.assertRaises(TypeError, compress, None, range(6)) # 1st arg not iterable 505 self.assertRaises(TypeError, compress, range(6), None) # 2nd arg not iterable 506 self.assertRaises(TypeError, compress, range(6)) # too few args 507 self.assertRaises(TypeError, compress, range(6), None) # too many args 508 509 # check copy, deepcopy, pickle 510 for op in [lambda a:copy.copy(a), lambda a:copy.deepcopy(a)] + picklecopiers: 511 for data, selectors, result1, result2 in [ 512 ('ABCDEF', [1,0,1,0,1,1], 'ACEF', 'CEF'), 513 ('ABCDEF', [0,0,0,0,0,0], '', ''), 514 ('ABCDEF', [1,1,1,1,1,1], 'ABCDEF', 'BCDEF'), 515 ('ABCDEF', [1,0,1], 'AC', 'C'), 516 ('ABC', [0,1,1,1,1,1], 'BC', 'C'), 517 ]: 518 519 self.assertEqual(list(op(compress(data=data, selectors=selectors))), list(result1)) 520 self.assertEqual(list(op(compress(data, selectors))), list(result1)) 521 testIntermediate = compress(data, selectors) 522 if result1: 523 next(testIntermediate) 524 self.assertEqual(list(op(testIntermediate)), list(result2)) 525 526 527 def test_count(self): 528 self.assertEqual(lzip('abc',count()), [('a', 0), ('b', 1), ('c', 2)]) 529 self.assertEqual(lzip('abc',count(3)), [('a', 3), ('b', 4), ('c', 5)]) 530 self.assertEqual(take(2, lzip('abc',count(3))), [('a', 3), ('b', 4)]) 531 self.assertEqual(take(2, zip('abc',count(-1))), [('a', -1), ('b', 0)]) 532 self.assertEqual(take(2, zip('abc',count(-3))), [('a', -3), ('b', -2)]) 533 self.assertRaises(TypeError, count, 2, 3, 4) 534 self.assertRaises(TypeError, count, 'a') 535 self.assertEqual(take(10, count(maxsize-5)), 536 list(range(maxsize-5, maxsize+5))) 537 self.assertEqual(take(10, count(-maxsize-5)), 538 list(range(-maxsize-5, -maxsize+5))) 539 self.assertEqual(take(3, count(3.25)), [3.25, 4.25, 5.25]) 540 self.assertEqual(take(3, count(3.25-4j)), [3.25-4j, 4.25-4j, 5.25-4j]) 541 self.assertEqual(take(3, count(Decimal('1.1'))), 542 [Decimal('1.1'), Decimal('2.1'), Decimal('3.1')]) 543 self.assertEqual(take(3, count(Fraction(2, 3))), 544 [Fraction(2, 3), Fraction(5, 3), Fraction(8, 3)]) 545 BIGINT = 1<<1000 546 self.assertEqual(take(3, count(BIGINT)), [BIGINT, BIGINT+1, BIGINT+2]) 547 c = count(3) 548 self.assertEqual(repr(c), 'count(3)') 549 next(c) 550 self.assertEqual(repr(c), 'count(4)') 551 c = count(-9) 552 self.assertEqual(repr(c), 'count(-9)') 553 next(c) 554 self.assertEqual(next(c), -8) 555 self.assertEqual(repr(count(10.25)), 'count(10.25)') 556 self.assertEqual(repr(count(10.0)), 'count(10.0)') 557 self.assertEqual(type(next(count(10.0))), float) 558 for i in (-sys.maxsize-5, -sys.maxsize+5 ,-10, -1, 0, 10, sys.maxsize-5, sys.maxsize+5): 559 # Test repr 560 r1 = repr(count(i)) 561 r2 = 'count(%r)'.__mod__(i) 562 self.assertEqual(r1, r2) 563 564 # check copy, deepcopy, pickle 565 for value in -3, 3, maxsize-5, maxsize+5: 566 c = count(value) 567 self.assertEqual(next(copy.copy(c)), value) 568 self.assertEqual(next(copy.deepcopy(c)), value) 569 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 570 self.pickletest(proto, count(value)) 571 572 #check proper internal error handling for large "step' sizes 573 count(1, maxsize+5); sys.exc_info() 574 575 def test_count_with_stride(self): 576 self.assertEqual(lzip('abc',count(2,3)), [('a', 2), ('b', 5), ('c', 8)]) 577 self.assertEqual(lzip('abc',count(start=2,step=3)), 578 [('a', 2), ('b', 5), ('c', 8)]) 579 self.assertEqual(lzip('abc',count(step=-1)), 580 [('a', 0), ('b', -1), ('c', -2)]) 581 self.assertRaises(TypeError, count, 'a', 'b') 582 self.assertEqual(lzip('abc',count(2,0)), [('a', 2), ('b', 2), ('c', 2)]) 583 self.assertEqual(lzip('abc',count(2,1)), [('a', 2), ('b', 3), ('c', 4)]) 584 self.assertEqual(lzip('abc',count(2,3)), [('a', 2), ('b', 5), ('c', 8)]) 585 self.assertEqual(take(20, count(maxsize-15, 3)), take(20, range(maxsize-15, maxsize+100, 3))) 586 self.assertEqual(take(20, count(-maxsize-15, 3)), take(20, range(-maxsize-15,-maxsize+100, 3))) 587 self.assertEqual(take(3, count(10, maxsize+5)), 588 list(range(10, 10+3*(maxsize+5), maxsize+5))) 589 self.assertEqual(take(3, count(2, 1.25)), [2, 3.25, 4.5]) 590 self.assertEqual(take(3, count(2, 3.25-4j)), [2, 5.25-4j, 8.5-8j]) 591 self.assertEqual(take(3, count(Decimal('1.1'), Decimal('.1'))), 592 [Decimal('1.1'), Decimal('1.2'), Decimal('1.3')]) 593 self.assertEqual(take(3, count(Fraction(2,3), Fraction(1,7))), 594 [Fraction(2,3), Fraction(17,21), Fraction(20,21)]) 595 BIGINT = 1<<1000 596 self.assertEqual(take(3, count(step=BIGINT)), [0, BIGINT, 2*BIGINT]) 597 self.assertEqual(repr(take(3, count(10, 2.5))), repr([10, 12.5, 15.0])) 598 c = count(3, 5) 599 self.assertEqual(repr(c), 'count(3, 5)') 600 next(c) 601 self.assertEqual(repr(c), 'count(8, 5)') 602 c = count(-9, 0) 603 self.assertEqual(repr(c), 'count(-9, 0)') 604 next(c) 605 self.assertEqual(repr(c), 'count(-9, 0)') 606 c = count(-9, -3) 607 self.assertEqual(repr(c), 'count(-9, -3)') 608 next(c) 609 self.assertEqual(repr(c), 'count(-12, -3)') 610 self.assertEqual(repr(c), 'count(-12, -3)') 611 self.assertEqual(repr(count(10.5, 1.25)), 'count(10.5, 1.25)') 612 self.assertEqual(repr(count(10.5, 1)), 'count(10.5)') # suppress step=1 when it's an int 613 self.assertEqual(repr(count(10.5, 1.00)), 'count(10.5, 1.0)') # do show float values lilke 1.0 614 self.assertEqual(repr(count(10, 1.00)), 'count(10, 1.0)') 615 c = count(10, 1.0) 616 self.assertEqual(type(next(c)), int) 617 self.assertEqual(type(next(c)), float) 618 for i in (-sys.maxsize-5, -sys.maxsize+5 ,-10, -1, 0, 10, sys.maxsize-5, sys.maxsize+5): 619 for j in (-sys.maxsize-5, -sys.maxsize+5 ,-10, -1, 0, 1, 10, sys.maxsize-5, sys.maxsize+5): 620 # Test repr 621 r1 = repr(count(i, j)) 622 if j == 1: 623 r2 = ('count(%r)' % i) 624 else: 625 r2 = ('count(%r, %r)' % (i, j)) 626 self.assertEqual(r1, r2) 627 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 628 self.pickletest(proto, count(i, j)) 629 630 def test_cycle(self): 631 self.assertEqual(take(10, cycle('abc')), list('abcabcabca')) 632 self.assertEqual(list(cycle('')), []) 633 self.assertRaises(TypeError, cycle) 634 self.assertRaises(TypeError, cycle, 5) 635 self.assertEqual(list(islice(cycle(gen3()),10)), [0,1,2,0,1,2,0,1,2,0]) 636 637 # check copy, deepcopy, pickle 638 c = cycle('abc') 639 self.assertEqual(next(c), 'a') 640 #simple copy currently not supported, because __reduce__ returns 641 #an internal iterator 642 #self.assertEqual(take(10, copy.copy(c)), list('bcabcabcab')) 643 self.assertEqual(take(10, copy.deepcopy(c)), list('bcabcabcab')) 644 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 645 self.assertEqual(take(10, pickle.loads(pickle.dumps(c, proto))), 646 list('bcabcabcab')) 647 next(c) 648 self.assertEqual(take(10, pickle.loads(pickle.dumps(c, proto))), 649 list('cabcabcabc')) 650 next(c) 651 next(c) 652 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 653 self.pickletest(proto, cycle('abc')) 654 655 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 656 # test with partial consumed input iterable 657 it = iter('abcde') 658 c = cycle(it) 659 _ = [next(c) for i in range(2)] # consume 2 of 5 inputs 660 p = pickle.dumps(c, proto) 661 d = pickle.loads(p) # rebuild the cycle object 662 self.assertEqual(take(20, d), list('cdeabcdeabcdeabcdeab')) 663 664 # test with completely consumed input iterable 665 it = iter('abcde') 666 c = cycle(it) 667 _ = [next(c) for i in range(7)] # consume 7 of 5 inputs 668 p = pickle.dumps(c, proto) 669 d = pickle.loads(p) # rebuild the cycle object 670 self.assertEqual(take(20, d), list('cdeabcdeabcdeabcdeab')) 671 672 def test_cycle_setstate(self): 673 # Verify both modes for restoring state 674 675 # Mode 0 is efficient. It uses an incompletely consumed input 676 # iterator to build a cycle object and then passes in state with 677 # a list of previously consumed values. There is no data 678 # overlap between the two. 679 c = cycle('defg') 680 c.__setstate__((list('abc'), 0)) 681 self.assertEqual(take(20, c), list('defgabcdefgabcdefgab')) 682 683 # Mode 1 is inefficient. It starts with a cycle object built 684 # from an iterator over the remaining elements in a partial 685 # cycle and then passes in state with all of the previously 686 # seen values (this overlaps values included in the iterator). 687 c = cycle('defg') 688 c.__setstate__((list('abcdefg'), 1)) 689 self.assertEqual(take(20, c), list('defgabcdefgabcdefgab')) 690 691 # The first argument to setstate needs to be a tuple 692 with self.assertRaises(TypeError): 693 cycle('defg').__setstate__([list('abcdefg'), 0]) 694 695 # The first argument in the setstate tuple must be a list 696 with self.assertRaises(TypeError): 697 c = cycle('defg') 698 c.__setstate__((tuple('defg'), 0)) 699 take(20, c) 700 701 # The second argument in the setstate tuple must be an int 702 with self.assertRaises(TypeError): 703 cycle('defg').__setstate__((list('abcdefg'), 'x')) 704 705 self.assertRaises(TypeError, cycle('').__setstate__, ()) 706 self.assertRaises(TypeError, cycle('').__setstate__, ([],)) 707 708 def test_groupby(self): 709 # Check whether it accepts arguments correctly 710 self.assertEqual([], list(groupby([]))) 711 self.assertEqual([], list(groupby([], key=id))) 712 self.assertRaises(TypeError, list, groupby('abc', [])) 713 self.assertRaises(TypeError, groupby, None) 714 self.assertRaises(TypeError, groupby, 'abc', lambda x:x, 10) 715 716 # Check normal input 717 s = [(0, 10, 20), (0, 11,21), (0,12,21), (1,13,21), (1,14,22), 718 (2,15,22), (3,16,23), (3,17,23)] 719 dup = [] 720 for k, g in groupby(s, lambda r:r[0]): 721 for elem in g: 722 self.assertEqual(k, elem[0]) 723 dup.append(elem) 724 self.assertEqual(s, dup) 725 726 # Check normal pickled 727 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 728 dup = [] 729 for k, g in pickle.loads(pickle.dumps(groupby(s, testR), proto)): 730 for elem in g: 731 self.assertEqual(k, elem[0]) 732 dup.append(elem) 733 self.assertEqual(s, dup) 734 735 # Check nested case 736 dup = [] 737 for k, g in groupby(s, testR): 738 for ik, ig in groupby(g, testR2): 739 for elem in ig: 740 self.assertEqual(k, elem[0]) 741 self.assertEqual(ik, elem[2]) 742 dup.append(elem) 743 self.assertEqual(s, dup) 744 745 # Check nested and pickled 746 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 747 dup = [] 748 for k, g in pickle.loads(pickle.dumps(groupby(s, testR), proto)): 749 for ik, ig in pickle.loads(pickle.dumps(groupby(g, testR2), proto)): 750 for elem in ig: 751 self.assertEqual(k, elem[0]) 752 self.assertEqual(ik, elem[2]) 753 dup.append(elem) 754 self.assertEqual(s, dup) 755 756 757 # Check case where inner iterator is not used 758 keys = [k for k, g in groupby(s, testR)] 759 expectedkeys = set([r[0] for r in s]) 760 self.assertEqual(set(keys), expectedkeys) 761 self.assertEqual(len(keys), len(expectedkeys)) 762 763 # Check case where inner iterator is used after advancing the groupby 764 # iterator 765 s = list(zip('AABBBAAAA', range(9))) 766 it = groupby(s, testR) 767 _, g1 = next(it) 768 _, g2 = next(it) 769 _, g3 = next(it) 770 self.assertEqual(list(g1), []) 771 self.assertEqual(list(g2), []) 772 self.assertEqual(next(g3), ('A', 5)) 773 list(it) # exhaust the groupby iterator 774 self.assertEqual(list(g3), []) 775 776 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 777 it = groupby(s, testR) 778 _, g = next(it) 779 next(it) 780 next(it) 781 self.assertEqual(list(pickle.loads(pickle.dumps(g, proto))), []) 782 783 # Exercise pipes and filters style 784 s = 'abracadabra' 785 # sort s | uniq 786 r = [k for k, g in groupby(sorted(s))] 787 self.assertEqual(r, ['a', 'b', 'c', 'd', 'r']) 788 # sort s | uniq -d 789 r = [k for k, g in groupby(sorted(s)) if list(islice(g,1,2))] 790 self.assertEqual(r, ['a', 'b', 'r']) 791 # sort s | uniq -c 792 r = [(len(list(g)), k) for k, g in groupby(sorted(s))] 793 self.assertEqual(r, [(5, 'a'), (2, 'b'), (1, 'c'), (1, 'd'), (2, 'r')]) 794 # sort s | uniq -c | sort -rn | head -3 795 r = sorted([(len(list(g)) , k) for k, g in groupby(sorted(s))], reverse=True)[:3] 796 self.assertEqual(r, [(5, 'a'), (2, 'r'), (2, 'b')]) 797 798 # iter.__next__ failure 799 class ExpectedError(Exception): 800 pass 801 def delayed_raise(n=0): 802 for i in range(n): 803 yield 'yo' 804 raise ExpectedError 805 def gulp(iterable, keyp=None, func=list): 806 return [func(g) for k, g in groupby(iterable, keyp)] 807 808 # iter.__next__ failure on outer object 809 self.assertRaises(ExpectedError, gulp, delayed_raise(0)) 810 # iter.__next__ failure on inner object 811 self.assertRaises(ExpectedError, gulp, delayed_raise(1)) 812 813 # __eq__ failure 814 class DummyCmp: 815 def __eq__(self, dst): 816 raise ExpectedError 817 s = [DummyCmp(), DummyCmp(), None] 818 819 # __eq__ failure on outer object 820 self.assertRaises(ExpectedError, gulp, s, func=id) 821 # __eq__ failure on inner object 822 self.assertRaises(ExpectedError, gulp, s) 823 824 # keyfunc failure 825 def keyfunc(obj): 826 if keyfunc.skip > 0: 827 keyfunc.skip -= 1 828 return obj 829 else: 830 raise ExpectedError 831 832 # keyfunc failure on outer object 833 keyfunc.skip = 0 834 self.assertRaises(ExpectedError, gulp, [None], keyfunc) 835 keyfunc.skip = 1 836 self.assertRaises(ExpectedError, gulp, [None, None], keyfunc) 837 838 def test_filter(self): 839 self.assertEqual(list(filter(isEven, range(6))), [0,2,4]) 840 self.assertEqual(list(filter(None, [0,1,0,2,0])), [1,2]) 841 self.assertEqual(list(filter(bool, [0,1,0,2,0])), [1,2]) 842 self.assertEqual(take(4, filter(isEven, count())), [0,2,4,6]) 843 self.assertRaises(TypeError, filter) 844 self.assertRaises(TypeError, filter, lambda x:x) 845 self.assertRaises(TypeError, filter, lambda x:x, range(6), 7) 846 self.assertRaises(TypeError, filter, isEven, 3) 847 self.assertRaises(TypeError, next, filter(range(6), range(6))) 848 849 # check copy, deepcopy, pickle 850 ans = [0,2,4] 851 852 c = filter(isEven, range(6)) 853 self.assertEqual(list(copy.copy(c)), ans) 854 c = filter(isEven, range(6)) 855 self.assertEqual(list(copy.deepcopy(c)), ans) 856 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 857 c = filter(isEven, range(6)) 858 self.assertEqual(list(pickle.loads(pickle.dumps(c, proto))), ans) 859 next(c) 860 self.assertEqual(list(pickle.loads(pickle.dumps(c, proto))), ans[1:]) 861 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 862 c = filter(isEven, range(6)) 863 self.pickletest(proto, c) 864 865 def test_filterfalse(self): 866 self.assertEqual(list(filterfalse(isEven, range(6))), [1,3,5]) 867 self.assertEqual(list(filterfalse(None, [0,1,0,2,0])), [0,0,0]) 868 self.assertEqual(list(filterfalse(bool, [0,1,0,2,0])), [0,0,0]) 869 self.assertEqual(take(4, filterfalse(isEven, count())), [1,3,5,7]) 870 self.assertRaises(TypeError, filterfalse) 871 self.assertRaises(TypeError, filterfalse, lambda x:x) 872 self.assertRaises(TypeError, filterfalse, lambda x:x, range(6), 7) 873 self.assertRaises(TypeError, filterfalse, isEven, 3) 874 self.assertRaises(TypeError, next, filterfalse(range(6), range(6))) 875 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 876 self.pickletest(proto, filterfalse(isEven, range(6))) 877 878 def test_zip(self): 879 # XXX This is rather silly now that builtin zip() calls zip()... 880 ans = [(x,y) for x, y in zip('abc',count())] 881 self.assertEqual(ans, [('a', 0), ('b', 1), ('c', 2)]) 882 self.assertEqual(list(zip('abc', range(6))), lzip('abc', range(6))) 883 self.assertEqual(list(zip('abcdef', range(3))), lzip('abcdef', range(3))) 884 self.assertEqual(take(3,zip('abcdef', count())), lzip('abcdef', range(3))) 885 self.assertEqual(list(zip('abcdef')), lzip('abcdef')) 886 self.assertEqual(list(zip()), lzip()) 887 self.assertRaises(TypeError, zip, 3) 888 self.assertRaises(TypeError, zip, range(3), 3) 889 self.assertEqual([tuple(list(pair)) for pair in zip('abc', 'def')], 890 lzip('abc', 'def')) 891 self.assertEqual([pair for pair in zip('abc', 'def')], 892 lzip('abc', 'def')) 893 894 @support.impl_detail("tuple reuse is specific to CPython") 895 def test_zip_tuple_reuse(self): 896 ids = list(map(id, zip('abc', 'def'))) 897 self.assertEqual(min(ids), max(ids)) 898 ids = list(map(id, list(zip('abc', 'def')))) 899 self.assertEqual(len(dict.fromkeys(ids)), len(ids)) 900 901 # check copy, deepcopy, pickle 902 ans = [(x,y) for x, y in copy.copy(zip('abc',count()))] 903 self.assertEqual(ans, [('a', 0), ('b', 1), ('c', 2)]) 904 905 ans = [(x,y) for x, y in copy.deepcopy(zip('abc',count()))] 906 self.assertEqual(ans, [('a', 0), ('b', 1), ('c', 2)]) 907 908 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 909 ans = [(x,y) for x, y in pickle.loads(pickle.dumps(zip('abc',count()), proto))] 910 self.assertEqual(ans, [('a', 0), ('b', 1), ('c', 2)]) 911 912 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 913 testIntermediate = zip('abc',count()) 914 next(testIntermediate) 915 ans = [(x,y) for x, y in pickle.loads(pickle.dumps(testIntermediate, proto))] 916 self.assertEqual(ans, [('b', 1), ('c', 2)]) 917 918 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 919 self.pickletest(proto, zip('abc', count())) 920 921 def test_ziplongest(self): 922 for args in [ 923 ['abc', range(6)], 924 [range(6), 'abc'], 925 [range(1000), range(2000,2100), range(3000,3050)], 926 [range(1000), range(0), range(3000,3050), range(1200), range(1500)], 927 [range(1000), range(0), range(3000,3050), range(1200), range(1500), range(0)], 928 ]: 929 target = [tuple([arg[i] if i < len(arg) else None for arg in args]) 930 for i in range(max(map(len, args)))] 931 self.assertEqual(list(zip_longest(*args)), target) 932 self.assertEqual(list(zip_longest(*args, **{})), target) 933 target = [tuple((e is None and 'X' or e) for e in t) for t in target] # Replace None fills with 'X' 934 self.assertEqual(list(zip_longest(*args, **dict(fillvalue='X'))), target) 935 936 self.assertEqual(take(3,zip_longest('abcdef', count())), list(zip('abcdef', range(3)))) # take 3 from infinite input 937 938 self.assertEqual(list(zip_longest()), list(zip())) 939 self.assertEqual(list(zip_longest([])), list(zip([]))) 940 self.assertEqual(list(zip_longest('abcdef')), list(zip('abcdef'))) 941 942 self.assertEqual(list(zip_longest('abc', 'defg', **{})), 943 list(zip(list('abc')+[None], 'defg'))) # empty keyword dict 944 self.assertRaises(TypeError, zip_longest, 3) 945 self.assertRaises(TypeError, zip_longest, range(3), 3) 946 947 for stmt in [ 948 "zip_longest('abc', fv=1)", 949 "zip_longest('abc', fillvalue=1, bogus_keyword=None)", 950 ]: 951 try: 952 eval(stmt, globals(), locals()) 953 except TypeError: 954 pass 955 else: 956 self.fail('Did not raise Type in: ' + stmt) 957 958 self.assertEqual([tuple(list(pair)) for pair in zip_longest('abc', 'def')], 959 list(zip('abc', 'def'))) 960 self.assertEqual([pair for pair in zip_longest('abc', 'def')], 961 list(zip('abc', 'def'))) 962 963 @support.impl_detail("tuple reuse is specific to CPython") 964 def test_zip_longest_tuple_reuse(self): 965 ids = list(map(id, zip_longest('abc', 'def'))) 966 self.assertEqual(min(ids), max(ids)) 967 ids = list(map(id, list(zip_longest('abc', 'def')))) 968 self.assertEqual(len(dict.fromkeys(ids)), len(ids)) 969 970 def test_zip_longest_pickling(self): 971 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 972 self.pickletest(proto, zip_longest("abc", "def")) 973 self.pickletest(proto, zip_longest("abc", "defgh")) 974 self.pickletest(proto, zip_longest("abc", "defgh", fillvalue=1)) 975 self.pickletest(proto, zip_longest("", "defgh")) 976 977 def test_zip_longest_bad_iterable(self): 978 exception = TypeError() 979 980 class BadIterable: 981 def __iter__(self): 982 raise exception 983 984 with self.assertRaises(TypeError) as cm: 985 zip_longest(BadIterable()) 986 987 self.assertIs(cm.exception, exception) 988 989 def test_bug_7244(self): 990 991 class Repeater: 992 # this class is similar to itertools.repeat 993 def __init__(self, o, t, e): 994 self.o = o 995 self.t = int(t) 996 self.e = e 997 def __iter__(self): # its iterator is itself 998 return self 999 def __next__(self): 1000 if self.t > 0: 1001 self.t -= 1 1002 return self.o 1003 else: 1004 raise self.e 1005 1006 # Formerly this code in would fail in debug mode 1007 # with Undetected Error and Stop Iteration 1008 r1 = Repeater(1, 3, StopIteration) 1009 r2 = Repeater(2, 4, StopIteration) 1010 def run(r1, r2): 1011 result = [] 1012 for i, j in zip_longest(r1, r2, fillvalue=0): 1013 with support.captured_output('stdout'): 1014 print((i, j)) 1015 result.append((i, j)) 1016 return result 1017 self.assertEqual(run(r1, r2), [(1,2), (1,2), (1,2), (0,2)]) 1018 1019 # Formerly, the RuntimeError would be lost 1020 # and StopIteration would stop as expected 1021 r1 = Repeater(1, 3, RuntimeError) 1022 r2 = Repeater(2, 4, StopIteration) 1023 it = zip_longest(r1, r2, fillvalue=0) 1024 self.assertEqual(next(it), (1, 2)) 1025 self.assertEqual(next(it), (1, 2)) 1026 self.assertEqual(next(it), (1, 2)) 1027 self.assertRaises(RuntimeError, next, it) 1028 1029 def test_product(self): 1030 for args, result in [ 1031 ([], [()]), # zero iterables 1032 (['ab'], [('a',), ('b',)]), # one iterable 1033 ([range(2), range(3)], [(0,0), (0,1), (0,2), (1,0), (1,1), (1,2)]), # two iterables 1034 ([range(0), range(2), range(3)], []), # first iterable with zero length 1035 ([range(2), range(0), range(3)], []), # middle iterable with zero length 1036 ([range(2), range(3), range(0)], []), # last iterable with zero length 1037 ]: 1038 self.assertEqual(list(product(*args)), result) 1039 for r in range(4): 1040 self.assertEqual(list(product(*(args*r))), 1041 list(product(*args, **dict(repeat=r)))) 1042 self.assertEqual(len(list(product(*[range(7)]*6))), 7**6) 1043 self.assertRaises(TypeError, product, range(6), None) 1044 1045 def product1(*args, **kwds): 1046 pools = list(map(tuple, args)) * kwds.get('repeat', 1) 1047 n = len(pools) 1048 if n == 0: 1049 yield () 1050 return 1051 if any(len(pool) == 0 for pool in pools): 1052 return 1053 indices = [0] * n 1054 yield tuple(pool[i] for pool, i in zip(pools, indices)) 1055 while 1: 1056 for i in reversed(range(n)): # right to left 1057 if indices[i] == len(pools[i]) - 1: 1058 continue 1059 indices[i] += 1 1060 for j in range(i+1, n): 1061 indices[j] = 0 1062 yield tuple(pool[i] for pool, i in zip(pools, indices)) 1063 break 1064 else: 1065 return 1066 1067 def product2(*args, **kwds): 1068 'Pure python version used in docs' 1069 pools = list(map(tuple, args)) * kwds.get('repeat', 1) 1070 result = [[]] 1071 for pool in pools: 1072 result = [x+[y] for x in result for y in pool] 1073 for prod in result: 1074 yield tuple(prod) 1075 1076 argtypes = ['', 'abc', '', range(0), range(4), dict(a=1, b=2, c=3), 1077 set('abcdefg'), range(11), tuple(range(13))] 1078 for i in range(100): 1079 args = [random.choice(argtypes) for j in range(random.randrange(5))] 1080 expected_len = prod(map(len, args)) 1081 self.assertEqual(len(list(product(*args))), expected_len) 1082 self.assertEqual(list(product(*args)), list(product1(*args))) 1083 self.assertEqual(list(product(*args)), list(product2(*args))) 1084 args = map(iter, args) 1085 self.assertEqual(len(list(product(*args))), expected_len) 1086 1087 @support.bigaddrspacetest 1088 def test_product_overflow(self): 1089 with self.assertRaises((OverflowError, MemoryError)): 1090 product(*(['ab']*2**5), repeat=2**25) 1091 1092 @support.impl_detail("tuple reuse is specific to CPython") 1093 def test_product_tuple_reuse(self): 1094 self.assertEqual(len(set(map(id, product('abc', 'def')))), 1) 1095 self.assertNotEqual(len(set(map(id, list(product('abc', 'def'))))), 1) 1096 1097 def test_product_pickling(self): 1098 # check copy, deepcopy, pickle 1099 for args, result in [ 1100 ([], [()]), # zero iterables 1101 (['ab'], [('a',), ('b',)]), # one iterable 1102 ([range(2), range(3)], [(0,0), (0,1), (0,2), (1,0), (1,1), (1,2)]), # two iterables 1103 ([range(0), range(2), range(3)], []), # first iterable with zero length 1104 ([range(2), range(0), range(3)], []), # middle iterable with zero length 1105 ([range(2), range(3), range(0)], []), # last iterable with zero length 1106 ]: 1107 self.assertEqual(list(copy.copy(product(*args))), result) 1108 self.assertEqual(list(copy.deepcopy(product(*args))), result) 1109 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 1110 self.pickletest(proto, product(*args)) 1111 1112 def test_product_issue_25021(self): 1113 # test that indices are properly clamped to the length of the tuples 1114 p = product((1, 2),(3,)) 1115 p.__setstate__((0, 0x1000)) # will access tuple element 1 if not clamped 1116 self.assertEqual(next(p), (2, 3)) 1117 # test that empty tuple in the list will result in an immediate StopIteration 1118 p = product((1, 2), (), (3,)) 1119 p.__setstate__((0, 0, 0x1000)) # will access tuple element 1 if not clamped 1120 self.assertRaises(StopIteration, next, p) 1121 1122 def test_repeat(self): 1123 self.assertEqual(list(repeat(object='a', times=3)), ['a', 'a', 'a']) 1124 self.assertEqual(lzip(range(3),repeat('a')), 1125 [(0, 'a'), (1, 'a'), (2, 'a')]) 1126 self.assertEqual(list(repeat('a', 3)), ['a', 'a', 'a']) 1127 self.assertEqual(take(3, repeat('a')), ['a', 'a', 'a']) 1128 self.assertEqual(list(repeat('a', 0)), []) 1129 self.assertEqual(list(repeat('a', -3)), []) 1130 self.assertRaises(TypeError, repeat) 1131 self.assertRaises(TypeError, repeat, None, 3, 4) 1132 self.assertRaises(TypeError, repeat, None, 'a') 1133 r = repeat(1+0j) 1134 self.assertEqual(repr(r), 'repeat((1+0j))') 1135 r = repeat(1+0j, 5) 1136 self.assertEqual(repr(r), 'repeat((1+0j), 5)') 1137 list(r) 1138 self.assertEqual(repr(r), 'repeat((1+0j), 0)') 1139 1140 # check copy, deepcopy, pickle 1141 c = repeat(object='a', times=10) 1142 self.assertEqual(next(c), 'a') 1143 self.assertEqual(take(2, copy.copy(c)), list('a' * 2)) 1144 self.assertEqual(take(2, copy.deepcopy(c)), list('a' * 2)) 1145 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 1146 self.pickletest(proto, repeat(object='a', times=10)) 1147 1148 def test_repeat_with_negative_times(self): 1149 self.assertEqual(repr(repeat('a', -1)), "repeat('a', 0)") 1150 self.assertEqual(repr(repeat('a', -2)), "repeat('a', 0)") 1151 self.assertEqual(repr(repeat('a', times=-1)), "repeat('a', 0)") 1152 self.assertEqual(repr(repeat('a', times=-2)), "repeat('a', 0)") 1153 1154 def test_map(self): 1155 self.assertEqual(list(map(operator.pow, range(3), range(1,7))), 1156 [0**1, 1**2, 2**3]) 1157 self.assertEqual(list(map(tupleize, 'abc', range(5))), 1158 [('a',0),('b',1),('c',2)]) 1159 self.assertEqual(list(map(tupleize, 'abc', count())), 1160 [('a',0),('b',1),('c',2)]) 1161 self.assertEqual(take(2,map(tupleize, 'abc', count())), 1162 [('a',0),('b',1)]) 1163 self.assertEqual(list(map(operator.pow, [])), []) 1164 self.assertRaises(TypeError, map) 1165 self.assertRaises(TypeError, list, map(None, range(3), range(3))) 1166 self.assertRaises(TypeError, map, operator.neg) 1167 self.assertRaises(TypeError, next, map(10, range(5))) 1168 self.assertRaises(ValueError, next, map(errfunc, [4], [5])) 1169 self.assertRaises(TypeError, next, map(onearg, [4], [5])) 1170 1171 # check copy, deepcopy, pickle 1172 ans = [('a',0),('b',1),('c',2)] 1173 1174 c = map(tupleize, 'abc', count()) 1175 self.assertEqual(list(copy.copy(c)), ans) 1176 1177 c = map(tupleize, 'abc', count()) 1178 self.assertEqual(list(copy.deepcopy(c)), ans) 1179 1180 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 1181 c = map(tupleize, 'abc', count()) 1182 self.pickletest(proto, c) 1183 1184 def test_starmap(self): 1185 self.assertEqual(list(starmap(operator.pow, zip(range(3), range(1,7)))), 1186 [0**1, 1**2, 2**3]) 1187 self.assertEqual(take(3, starmap(operator.pow, zip(count(), count(1)))), 1188 [0**1, 1**2, 2**3]) 1189 self.assertEqual(list(starmap(operator.pow, [])), []) 1190 self.assertEqual(list(starmap(operator.pow, [iter([4,5])])), [4**5]) 1191 self.assertRaises(TypeError, list, starmap(operator.pow, [None])) 1192 self.assertRaises(TypeError, starmap) 1193 self.assertRaises(TypeError, starmap, operator.pow, [(4,5)], 'extra') 1194 self.assertRaises(TypeError, next, starmap(10, [(4,5)])) 1195 self.assertRaises(ValueError, next, starmap(errfunc, [(4,5)])) 1196 self.assertRaises(TypeError, next, starmap(onearg, [(4,5)])) 1197 1198 # check copy, deepcopy, pickle 1199 ans = [0**1, 1**2, 2**3] 1200 1201 c = starmap(operator.pow, zip(range(3), range(1,7))) 1202 self.assertEqual(list(copy.copy(c)), ans) 1203 1204 c = starmap(operator.pow, zip(range(3), range(1,7))) 1205 self.assertEqual(list(copy.deepcopy(c)), ans) 1206 1207 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 1208 c = starmap(operator.pow, zip(range(3), range(1,7))) 1209 self.pickletest(proto, c) 1210 1211 def test_islice(self): 1212 for args in [ # islice(args) should agree with range(args) 1213 (10, 20, 3), 1214 (10, 3, 20), 1215 (10, 20), 1216 (10, 10), 1217 (10, 3), 1218 (20,) 1219 ]: 1220 self.assertEqual(list(islice(range(100), *args)), 1221 list(range(*args))) 1222 1223 for args, tgtargs in [ # Stop when seqn is exhausted 1224 ((10, 110, 3), ((10, 100, 3))), 1225 ((10, 110), ((10, 100))), 1226 ((110,), (100,)) 1227 ]: 1228 self.assertEqual(list(islice(range(100), *args)), 1229 list(range(*tgtargs))) 1230 1231 # Test stop=None 1232 self.assertEqual(list(islice(range(10), None)), list(range(10))) 1233 self.assertEqual(list(islice(range(10), None, None)), list(range(10))) 1234 self.assertEqual(list(islice(range(10), None, None, None)), list(range(10))) 1235 self.assertEqual(list(islice(range(10), 2, None)), list(range(2, 10))) 1236 self.assertEqual(list(islice(range(10), 1, None, 2)), list(range(1, 10, 2))) 1237 1238 # Test number of items consumed SF #1171417 1239 it = iter(range(10)) 1240 self.assertEqual(list(islice(it, 3)), list(range(3))) 1241 self.assertEqual(list(it), list(range(3, 10))) 1242 1243 it = iter(range(10)) 1244 self.assertEqual(list(islice(it, 3, 3)), []) 1245 self.assertEqual(list(it), list(range(3, 10))) 1246 1247 # Test invalid arguments 1248 ra = range(10) 1249 self.assertRaises(TypeError, islice, ra) 1250 self.assertRaises(TypeError, islice, ra, 1, 2, 3, 4) 1251 self.assertRaises(ValueError, islice, ra, -5, 10, 1) 1252 self.assertRaises(ValueError, islice, ra, 1, -5, -1) 1253 self.assertRaises(ValueError, islice, ra, 1, 10, -1) 1254 self.assertRaises(ValueError, islice, ra, 1, 10, 0) 1255 self.assertRaises(ValueError, islice, ra, 'a') 1256 self.assertRaises(ValueError, islice, ra, 'a', 1) 1257 self.assertRaises(ValueError, islice, ra, 1, 'a') 1258 self.assertRaises(ValueError, islice, ra, 'a', 1, 1) 1259 self.assertRaises(ValueError, islice, ra, 1, 'a', 1) 1260 self.assertEqual(len(list(islice(count(), 1, 10, maxsize))), 1) 1261 1262 # Issue #10323: Less islice in a predictable state 1263 c = count() 1264 self.assertEqual(list(islice(c, 1, 3, 50)), [1]) 1265 self.assertEqual(next(c), 3) 1266 1267 # check copy, deepcopy, pickle 1268 for args in [ # islice(args) should agree with range(args) 1269 (10, 20, 3), 1270 (10, 3, 20), 1271 (10, 20), 1272 (10, 3), 1273 (20,) 1274 ]: 1275 self.assertEqual(list(copy.copy(islice(range(100), *args))), 1276 list(range(*args))) 1277 self.assertEqual(list(copy.deepcopy(islice(range(100), *args))), 1278 list(range(*args))) 1279 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 1280 self.pickletest(proto, islice(range(100), *args)) 1281 1282 # Issue #21321: check source iterator is not referenced 1283 # from islice() after the latter has been exhausted 1284 it = (x for x in (1, 2)) 1285 wr = weakref.ref(it) 1286 it = islice(it, 1) 1287 self.assertIsNotNone(wr()) 1288 list(it) # exhaust the iterator 1289 support.gc_collect() 1290 self.assertIsNone(wr()) 1291 1292 # Issue #30537: islice can accept integer-like objects as 1293 # arguments 1294 class IntLike(object): 1295 def __init__(self, val): 1296 self.val = val 1297 def __index__(self): 1298 return self.val 1299 self.assertEqual(list(islice(range(100), IntLike(10))), list(range(10))) 1300 self.assertEqual(list(islice(range(100), IntLike(10), IntLike(50))), 1301 list(range(10, 50))) 1302 self.assertEqual(list(islice(range(100), IntLike(10), IntLike(50), IntLike(5))), 1303 list(range(10,50,5))) 1304 1305 def test_takewhile(self): 1306 data = [1, 3, 5, 20, 2, 4, 6, 8] 1307 self.assertEqual(list(takewhile(underten, data)), [1, 3, 5]) 1308 self.assertEqual(list(takewhile(underten, [])), []) 1309 self.assertRaises(TypeError, takewhile) 1310 self.assertRaises(TypeError, takewhile, operator.pow) 1311 self.assertRaises(TypeError, takewhile, operator.pow, [(4,5)], 'extra') 1312 self.assertRaises(TypeError, next, takewhile(10, [(4,5)])) 1313 self.assertRaises(ValueError, next, takewhile(errfunc, [(4,5)])) 1314 t = takewhile(bool, [1, 1, 1, 0, 0, 0]) 1315 self.assertEqual(list(t), [1, 1, 1]) 1316 self.assertRaises(StopIteration, next, t) 1317 1318 # check copy, deepcopy, pickle 1319 self.assertEqual(list(copy.copy(takewhile(underten, data))), [1, 3, 5]) 1320 self.assertEqual(list(copy.deepcopy(takewhile(underten, data))), 1321 [1, 3, 5]) 1322 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 1323 self.pickletest(proto, takewhile(underten, data)) 1324 1325 def test_dropwhile(self): 1326 data = [1, 3, 5, 20, 2, 4, 6, 8] 1327 self.assertEqual(list(dropwhile(underten, data)), [20, 2, 4, 6, 8]) 1328 self.assertEqual(list(dropwhile(underten, [])), []) 1329 self.assertRaises(TypeError, dropwhile) 1330 self.assertRaises(TypeError, dropwhile, operator.pow) 1331 self.assertRaises(TypeError, dropwhile, operator.pow, [(4,5)], 'extra') 1332 self.assertRaises(TypeError, next, dropwhile(10, [(4,5)])) 1333 self.assertRaises(ValueError, next, dropwhile(errfunc, [(4,5)])) 1334 1335 # check copy, deepcopy, pickle 1336 self.assertEqual(list(copy.copy(dropwhile(underten, data))), [20, 2, 4, 6, 8]) 1337 self.assertEqual(list(copy.deepcopy(dropwhile(underten, data))), 1338 [20, 2, 4, 6, 8]) 1339 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 1340 self.pickletest(proto, dropwhile(underten, data)) 1341 1342 def test_tee(self): 1343 n = 200 1344 1345 a, b = tee([]) # test empty iterator 1346 self.assertEqual(list(a), []) 1347 self.assertEqual(list(b), []) 1348 1349 a, b = tee(irange(n)) # test 100% interleaved 1350 self.assertEqual(lzip(a,b), lzip(range(n), range(n))) 1351 1352 a, b = tee(irange(n)) # test 0% interleaved 1353 self.assertEqual(list(a), list(range(n))) 1354 self.assertEqual(list(b), list(range(n))) 1355 1356 a, b = tee(irange(n)) # test dealloc of leading iterator 1357 for i in range(100): 1358 self.assertEqual(next(a), i) 1359 del a 1360 self.assertEqual(list(b), list(range(n))) 1361 1362 a, b = tee(irange(n)) # test dealloc of trailing iterator 1363 for i in range(100): 1364 self.assertEqual(next(a), i) 1365 del b 1366 self.assertEqual(list(a), list(range(100, n))) 1367 1368 for j in range(5): # test randomly interleaved 1369 order = [0]*n + [1]*n 1370 random.shuffle(order) 1371 lists = ([], []) 1372 its = tee(irange(n)) 1373 for i in order: 1374 value = next(its[i]) 1375 lists[i].append(value) 1376 self.assertEqual(lists[0], list(range(n))) 1377 self.assertEqual(lists[1], list(range(n))) 1378 1379 # test argument format checking 1380 self.assertRaises(TypeError, tee) 1381 self.assertRaises(TypeError, tee, 3) 1382 self.assertRaises(TypeError, tee, [1,2], 'x') 1383 self.assertRaises(TypeError, tee, [1,2], 3, 'x') 1384 1385 # tee object should be instantiable 1386 a, b = tee('abc') 1387 c = type(a)('def') 1388 self.assertEqual(list(c), list('def')) 1389 1390 # test long-lagged and multi-way split 1391 a, b, c = tee(range(2000), 3) 1392 for i in range(100): 1393 self.assertEqual(next(a), i) 1394 self.assertEqual(list(b), list(range(2000))) 1395 self.assertEqual([next(c), next(c)], list(range(2))) 1396 self.assertEqual(list(a), list(range(100,2000))) 1397 self.assertEqual(list(c), list(range(2,2000))) 1398 1399 # test values of n 1400 self.assertRaises(TypeError, tee, 'abc', 'invalid') 1401 self.assertRaises(ValueError, tee, [], -1) 1402 for n in range(5): 1403 result = tee('abc', n) 1404 self.assertEqual(type(result), tuple) 1405 self.assertEqual(len(result), n) 1406 self.assertEqual([list(x) for x in result], [list('abc')]*n) 1407 1408 # tee pass-through to copyable iterator 1409 a, b = tee('abc') 1410 c, d = tee(a) 1411 self.assertTrue(a is c) 1412 1413 # test tee_new 1414 t1, t2 = tee('abc') 1415 tnew = type(t1) 1416 self.assertRaises(TypeError, tnew) 1417 self.assertRaises(TypeError, tnew, 10) 1418 t3 = tnew(t1) 1419 self.assertTrue(list(t1) == list(t2) == list(t3) == list('abc')) 1420 1421 # test that tee objects are weak referencable 1422 a, b = tee(range(10)) 1423 p = weakref.proxy(a) 1424 self.assertEqual(getattr(p, '__class__'), type(b)) 1425 del a 1426 self.assertRaises(ReferenceError, getattr, p, '__class__') 1427 1428 ans = list('abc') 1429 long_ans = list(range(10000)) 1430 1431 # check copy 1432 a, b = tee('abc') 1433 self.assertEqual(list(copy.copy(a)), ans) 1434 self.assertEqual(list(copy.copy(b)), ans) 1435 a, b = tee(list(range(10000))) 1436 self.assertEqual(list(copy.copy(a)), long_ans) 1437 self.assertEqual(list(copy.copy(b)), long_ans) 1438 1439 # check partially consumed copy 1440 a, b = tee('abc') 1441 take(2, a) 1442 take(1, b) 1443 self.assertEqual(list(copy.copy(a)), ans[2:]) 1444 self.assertEqual(list(copy.copy(b)), ans[1:]) 1445 self.assertEqual(list(a), ans[2:]) 1446 self.assertEqual(list(b), ans[1:]) 1447 a, b = tee(range(10000)) 1448 take(100, a) 1449 take(60, b) 1450 self.assertEqual(list(copy.copy(a)), long_ans[100:]) 1451 self.assertEqual(list(copy.copy(b)), long_ans[60:]) 1452 self.assertEqual(list(a), long_ans[100:]) 1453 self.assertEqual(list(b), long_ans[60:]) 1454 1455 # check deepcopy 1456 a, b = tee('abc') 1457 self.assertEqual(list(copy.deepcopy(a)), ans) 1458 self.assertEqual(list(copy.deepcopy(b)), ans) 1459 self.assertEqual(list(a), ans) 1460 self.assertEqual(list(b), ans) 1461 a, b = tee(range(10000)) 1462 self.assertEqual(list(copy.deepcopy(a)), long_ans) 1463 self.assertEqual(list(copy.deepcopy(b)), long_ans) 1464 self.assertEqual(list(a), long_ans) 1465 self.assertEqual(list(b), long_ans) 1466 1467 # check partially consumed deepcopy 1468 a, b = tee('abc') 1469 take(2, a) 1470 take(1, b) 1471 self.assertEqual(list(copy.deepcopy(a)), ans[2:]) 1472 self.assertEqual(list(copy.deepcopy(b)), ans[1:]) 1473 self.assertEqual(list(a), ans[2:]) 1474 self.assertEqual(list(b), ans[1:]) 1475 a, b = tee(range(10000)) 1476 take(100, a) 1477 take(60, b) 1478 self.assertEqual(list(copy.deepcopy(a)), long_ans[100:]) 1479 self.assertEqual(list(copy.deepcopy(b)), long_ans[60:]) 1480 self.assertEqual(list(a), long_ans[100:]) 1481 self.assertEqual(list(b), long_ans[60:]) 1482 1483 # check pickle 1484 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 1485 self.pickletest(proto, iter(tee('abc'))) 1486 a, b = tee('abc') 1487 self.pickletest(proto, a, compare=ans) 1488 self.pickletest(proto, b, compare=ans) 1489 1490 # Issue 13454: Crash when deleting backward iterator from tee() 1491 def test_tee_del_backward(self): 1492 forward, backward = tee(repeat(None, 20000000)) 1493 try: 1494 any(forward) # exhaust the iterator 1495 del backward 1496 except: 1497 del forward, backward 1498 raise 1499 1500 def test_tee_reenter(self): 1501 class I: 1502 first = True 1503 def __iter__(self): 1504 return self 1505 def __next__(self): 1506 first = self.first 1507 self.first = False 1508 if first: 1509 return next(b) 1510 1511 a, b = tee(I()) 1512 with self.assertRaisesRegex(RuntimeError, "tee"): 1513 next(a) 1514 1515 def test_tee_concurrent(self): 1516 start = threading.Event() 1517 finish = threading.Event() 1518 class I: 1519 def __iter__(self): 1520 return self 1521 def __next__(self): 1522 start.set() 1523 finish.wait() 1524 1525 a, b = tee(I()) 1526 thread = threading.Thread(target=next, args=[a]) 1527 thread.start() 1528 try: 1529 start.wait() 1530 with self.assertRaisesRegex(RuntimeError, "tee"): 1531 next(b) 1532 finally: 1533 finish.set() 1534 thread.join() 1535 1536 def test_StopIteration(self): 1537 self.assertRaises(StopIteration, next, zip()) 1538 1539 for f in (chain, cycle, zip, groupby): 1540 self.assertRaises(StopIteration, next, f([])) 1541 self.assertRaises(StopIteration, next, f(StopNow())) 1542 1543 self.assertRaises(StopIteration, next, islice([], None)) 1544 self.assertRaises(StopIteration, next, islice(StopNow(), None)) 1545 1546 p, q = tee([]) 1547 self.assertRaises(StopIteration, next, p) 1548 self.assertRaises(StopIteration, next, q) 1549 p, q = tee(StopNow()) 1550 self.assertRaises(StopIteration, next, p) 1551 self.assertRaises(StopIteration, next, q) 1552 1553 self.assertRaises(StopIteration, next, repeat(None, 0)) 1554 1555 for f in (filter, filterfalse, map, takewhile, dropwhile, starmap): 1556 self.assertRaises(StopIteration, next, f(lambda x:x, [])) 1557 self.assertRaises(StopIteration, next, f(lambda x:x, StopNow())) 1558 1559 @support.cpython_only 1560 def test_combinations_result_gc(self): 1561 # bpo-42536: combinations's tuple-reuse speed trick breaks the GC's 1562 # assumptions about what can be untracked. Make sure we re-track result 1563 # tuples whenever we reuse them. 1564 it = combinations([None, []], 1) 1565 next(it) 1566 gc.collect() 1567 # That GC collection probably untracked the recycled internal result 1568 # tuple, which has the value (None,). Make sure it's re-tracked when 1569 # it's mutated and returned from __next__: 1570 self.assertTrue(gc.is_tracked(next(it))) 1571 1572 @support.cpython_only 1573 def test_combinations_with_replacement_result_gc(self): 1574 # Ditto for combinations_with_replacement. 1575 it = combinations_with_replacement([None, []], 1) 1576 next(it) 1577 gc.collect() 1578 self.assertTrue(gc.is_tracked(next(it))) 1579 1580 @support.cpython_only 1581 def test_permutations_result_gc(self): 1582 # Ditto for permutations. 1583 it = permutations([None, []], 1) 1584 next(it) 1585 gc.collect() 1586 self.assertTrue(gc.is_tracked(next(it))) 1587 1588 @support.cpython_only 1589 def test_product_result_gc(self): 1590 # Ditto for product. 1591 it = product([None, []]) 1592 next(it) 1593 gc.collect() 1594 self.assertTrue(gc.is_tracked(next(it))) 1595 1596 @support.cpython_only 1597 def test_zip_longest_result_gc(self): 1598 # Ditto for zip_longest. 1599 it = zip_longest([[]]) 1600 gc.collect() 1601 self.assertTrue(gc.is_tracked(next(it))) 1602 1603 1604class TestExamples(unittest.TestCase): 1605 1606 def test_accumulate(self): 1607 self.assertEqual(list(accumulate([1,2,3,4,5])), [1, 3, 6, 10, 15]) 1608 1609 def test_accumulate_reducible(self): 1610 # check copy, deepcopy, pickle 1611 data = [1, 2, 3, 4, 5] 1612 accumulated = [1, 3, 6, 10, 15] 1613 1614 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 1615 it = accumulate(data) 1616 self.assertEqual(list(pickle.loads(pickle.dumps(it, proto))), accumulated[:]) 1617 self.assertEqual(next(it), 1) 1618 self.assertEqual(list(pickle.loads(pickle.dumps(it, proto))), accumulated[1:]) 1619 it = accumulate(data) 1620 self.assertEqual(next(it), 1) 1621 self.assertEqual(list(copy.deepcopy(it)), accumulated[1:]) 1622 self.assertEqual(list(copy.copy(it)), accumulated[1:]) 1623 1624 def test_accumulate_reducible_none(self): 1625 # Issue #25718: total is None 1626 it = accumulate([None, None, None], operator.is_) 1627 self.assertEqual(next(it), None) 1628 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 1629 it_copy = pickle.loads(pickle.dumps(it, proto)) 1630 self.assertEqual(list(it_copy), [True, False]) 1631 self.assertEqual(list(copy.deepcopy(it)), [True, False]) 1632 self.assertEqual(list(copy.copy(it)), [True, False]) 1633 1634 def test_chain(self): 1635 self.assertEqual(''.join(chain('ABC', 'DEF')), 'ABCDEF') 1636 1637 def test_chain_from_iterable(self): 1638 self.assertEqual(''.join(chain.from_iterable(['ABC', 'DEF'])), 'ABCDEF') 1639 1640 def test_combinations(self): 1641 self.assertEqual(list(combinations('ABCD', 2)), 1642 [('A','B'), ('A','C'), ('A','D'), ('B','C'), ('B','D'), ('C','D')]) 1643 self.assertEqual(list(combinations(range(4), 3)), 1644 [(0,1,2), (0,1,3), (0,2,3), (1,2,3)]) 1645 1646 def test_combinations_with_replacement(self): 1647 self.assertEqual(list(combinations_with_replacement('ABC', 2)), 1648 [('A','A'), ('A','B'), ('A','C'), ('B','B'), ('B','C'), ('C','C')]) 1649 1650 def test_compress(self): 1651 self.assertEqual(list(compress('ABCDEF', [1,0,1,0,1,1])), list('ACEF')) 1652 1653 def test_count(self): 1654 self.assertEqual(list(islice(count(10), 5)), [10, 11, 12, 13, 14]) 1655 1656 def test_cycle(self): 1657 self.assertEqual(list(islice(cycle('ABCD'), 12)), list('ABCDABCDABCD')) 1658 1659 def test_dropwhile(self): 1660 self.assertEqual(list(dropwhile(lambda x: x<5, [1,4,6,4,1])), [6,4,1]) 1661 1662 def test_groupby(self): 1663 self.assertEqual([k for k, g in groupby('AAAABBBCCDAABBB')], 1664 list('ABCDAB')) 1665 self.assertEqual([(list(g)) for k, g in groupby('AAAABBBCCD')], 1666 [list('AAAA'), list('BBB'), list('CC'), list('D')]) 1667 1668 def test_filter(self): 1669 self.assertEqual(list(filter(lambda x: x%2, range(10))), [1,3,5,7,9]) 1670 1671 def test_filterfalse(self): 1672 self.assertEqual(list(filterfalse(lambda x: x%2, range(10))), [0,2,4,6,8]) 1673 1674 def test_map(self): 1675 self.assertEqual(list(map(pow, (2,3,10), (5,2,3))), [32, 9, 1000]) 1676 1677 def test_islice(self): 1678 self.assertEqual(list(islice('ABCDEFG', 2)), list('AB')) 1679 self.assertEqual(list(islice('ABCDEFG', 2, 4)), list('CD')) 1680 self.assertEqual(list(islice('ABCDEFG', 2, None)), list('CDEFG')) 1681 self.assertEqual(list(islice('ABCDEFG', 0, None, 2)), list('ACEG')) 1682 1683 def test_zip(self): 1684 self.assertEqual(list(zip('ABCD', 'xy')), [('A', 'x'), ('B', 'y')]) 1685 1686 def test_zip_longest(self): 1687 self.assertEqual(list(zip_longest('ABCD', 'xy', fillvalue='-')), 1688 [('A', 'x'), ('B', 'y'), ('C', '-'), ('D', '-')]) 1689 1690 def test_permutations(self): 1691 self.assertEqual(list(permutations('ABCD', 2)), 1692 list(map(tuple, 'AB AC AD BA BC BD CA CB CD DA DB DC'.split()))) 1693 self.assertEqual(list(permutations(range(3))), 1694 [(0,1,2), (0,2,1), (1,0,2), (1,2,0), (2,0,1), (2,1,0)]) 1695 1696 def test_product(self): 1697 self.assertEqual(list(product('ABCD', 'xy')), 1698 list(map(tuple, 'Ax Ay Bx By Cx Cy Dx Dy'.split()))) 1699 self.assertEqual(list(product(range(2), repeat=3)), 1700 [(0,0,0), (0,0,1), (0,1,0), (0,1,1), 1701 (1,0,0), (1,0,1), (1,1,0), (1,1,1)]) 1702 1703 def test_repeat(self): 1704 self.assertEqual(list(repeat(10, 3)), [10, 10, 10]) 1705 1706 def test_stapmap(self): 1707 self.assertEqual(list(starmap(pow, [(2,5), (3,2), (10,3)])), 1708 [32, 9, 1000]) 1709 1710 def test_takewhile(self): 1711 self.assertEqual(list(takewhile(lambda x: x<5, [1,4,6,4,1])), [1,4]) 1712 1713 1714class TestPurePythonRoughEquivalents(unittest.TestCase): 1715 1716 @staticmethod 1717 def islice(iterable, *args): 1718 s = slice(*args) 1719 start, stop, step = s.start or 0, s.stop or sys.maxsize, s.step or 1 1720 it = iter(range(start, stop, step)) 1721 try: 1722 nexti = next(it) 1723 except StopIteration: 1724 # Consume *iterable* up to the *start* position. 1725 for i, element in zip(range(start), iterable): 1726 pass 1727 return 1728 try: 1729 for i, element in enumerate(iterable): 1730 if i == nexti: 1731 yield element 1732 nexti = next(it) 1733 except StopIteration: 1734 # Consume to *stop*. 1735 for i, element in zip(range(i + 1, stop), iterable): 1736 pass 1737 1738 def test_islice_recipe(self): 1739 self.assertEqual(list(self.islice('ABCDEFG', 2)), list('AB')) 1740 self.assertEqual(list(self.islice('ABCDEFG', 2, 4)), list('CD')) 1741 self.assertEqual(list(self.islice('ABCDEFG', 2, None)), list('CDEFG')) 1742 self.assertEqual(list(self.islice('ABCDEFG', 0, None, 2)), list('ACEG')) 1743 # Test items consumed. 1744 it = iter(range(10)) 1745 self.assertEqual(list(self.islice(it, 3)), list(range(3))) 1746 self.assertEqual(list(it), list(range(3, 10))) 1747 it = iter(range(10)) 1748 self.assertEqual(list(self.islice(it, 3, 3)), []) 1749 self.assertEqual(list(it), list(range(3, 10))) 1750 # Test that slice finishes in predictable state. 1751 c = count() 1752 self.assertEqual(list(self.islice(c, 1, 3, 50)), [1]) 1753 self.assertEqual(next(c), 3) 1754 1755 1756class TestGC(unittest.TestCase): 1757 1758 def makecycle(self, iterator, container): 1759 container.append(iterator) 1760 next(iterator) 1761 del container, iterator 1762 1763 def test_accumulate(self): 1764 a = [] 1765 self.makecycle(accumulate([1,2,a,3]), a) 1766 1767 def test_chain(self): 1768 a = [] 1769 self.makecycle(chain(a), a) 1770 1771 def test_chain_from_iterable(self): 1772 a = [] 1773 self.makecycle(chain.from_iterable([a]), a) 1774 1775 def test_combinations(self): 1776 a = [] 1777 self.makecycle(combinations([1,2,a,3], 3), a) 1778 1779 def test_combinations_with_replacement(self): 1780 a = [] 1781 self.makecycle(combinations_with_replacement([1,2,a,3], 3), a) 1782 1783 def test_compress(self): 1784 a = [] 1785 self.makecycle(compress('ABCDEF', [1,0,1,0,1,0]), a) 1786 1787 def test_count(self): 1788 a = [] 1789 Int = type('Int', (int,), dict(x=a)) 1790 self.makecycle(count(Int(0), Int(1)), a) 1791 1792 def test_cycle(self): 1793 a = [] 1794 self.makecycle(cycle([a]*2), a) 1795 1796 def test_dropwhile(self): 1797 a = [] 1798 self.makecycle(dropwhile(bool, [0, a, a]), a) 1799 1800 def test_groupby(self): 1801 a = [] 1802 self.makecycle(groupby([a]*2, lambda x:x), a) 1803 1804 def test_issue2246(self): 1805 # Issue 2246 -- the _grouper iterator was not included in GC 1806 n = 10 1807 keyfunc = lambda x: x 1808 for i, j in groupby(range(n), key=keyfunc): 1809 keyfunc.__dict__.setdefault('x',[]).append(j) 1810 1811 def test_filter(self): 1812 a = [] 1813 self.makecycle(filter(lambda x:True, [a]*2), a) 1814 1815 def test_filterfalse(self): 1816 a = [] 1817 self.makecycle(filterfalse(lambda x:False, a), a) 1818 1819 def test_zip(self): 1820 a = [] 1821 self.makecycle(zip([a]*2, [a]*3), a) 1822 1823 def test_zip_longest(self): 1824 a = [] 1825 self.makecycle(zip_longest([a]*2, [a]*3), a) 1826 b = [a, None] 1827 self.makecycle(zip_longest([a]*2, [a]*3, fillvalue=b), a) 1828 1829 def test_map(self): 1830 a = [] 1831 self.makecycle(map(lambda x:x, [a]*2), a) 1832 1833 def test_islice(self): 1834 a = [] 1835 self.makecycle(islice([a]*2, None), a) 1836 1837 def test_permutations(self): 1838 a = [] 1839 self.makecycle(permutations([1,2,a,3], 3), a) 1840 1841 def test_product(self): 1842 a = [] 1843 self.makecycle(product([1,2,a,3], repeat=3), a) 1844 1845 def test_repeat(self): 1846 a = [] 1847 self.makecycle(repeat(a), a) 1848 1849 def test_starmap(self): 1850 a = [] 1851 self.makecycle(starmap(lambda *t: t, [(a,a)]*2), a) 1852 1853 def test_takewhile(self): 1854 a = [] 1855 self.makecycle(takewhile(bool, [1, 0, a, a]), a) 1856 1857def R(seqn): 1858 'Regular generator' 1859 for i in seqn: 1860 yield i 1861 1862class G: 1863 'Sequence using __getitem__' 1864 def __init__(self, seqn): 1865 self.seqn = seqn 1866 def __getitem__(self, i): 1867 return self.seqn[i] 1868 1869class I: 1870 'Sequence using iterator protocol' 1871 def __init__(self, seqn): 1872 self.seqn = seqn 1873 self.i = 0 1874 def __iter__(self): 1875 return self 1876 def __next__(self): 1877 if self.i >= len(self.seqn): raise StopIteration 1878 v = self.seqn[self.i] 1879 self.i += 1 1880 return v 1881 1882class Ig: 1883 'Sequence using iterator protocol defined with a generator' 1884 def __init__(self, seqn): 1885 self.seqn = seqn 1886 self.i = 0 1887 def __iter__(self): 1888 for val in self.seqn: 1889 yield val 1890 1891class X: 1892 'Missing __getitem__ and __iter__' 1893 def __init__(self, seqn): 1894 self.seqn = seqn 1895 self.i = 0 1896 def __next__(self): 1897 if self.i >= len(self.seqn): raise StopIteration 1898 v = self.seqn[self.i] 1899 self.i += 1 1900 return v 1901 1902class N: 1903 'Iterator missing __next__()' 1904 def __init__(self, seqn): 1905 self.seqn = seqn 1906 self.i = 0 1907 def __iter__(self): 1908 return self 1909 1910class E: 1911 'Test propagation of exceptions' 1912 def __init__(self, seqn): 1913 self.seqn = seqn 1914 self.i = 0 1915 def __iter__(self): 1916 return self 1917 def __next__(self): 1918 3 // 0 1919 1920class S: 1921 'Test immediate stop' 1922 def __init__(self, seqn): 1923 pass 1924 def __iter__(self): 1925 return self 1926 def __next__(self): 1927 raise StopIteration 1928 1929def L(seqn): 1930 'Test multiple tiers of iterators' 1931 return chain(map(lambda x:x, R(Ig(G(seqn))))) 1932 1933 1934class TestVariousIteratorArgs(unittest.TestCase): 1935 1936 def test_accumulate(self): 1937 s = [1,2,3,4,5] 1938 r = [1,3,6,10,15] 1939 n = len(s) 1940 for g in (G, I, Ig, L, R): 1941 self.assertEqual(list(accumulate(g(s))), r) 1942 self.assertEqual(list(accumulate(S(s))), []) 1943 self.assertRaises(TypeError, accumulate, X(s)) 1944 self.assertRaises(TypeError, accumulate, N(s)) 1945 self.assertRaises(ZeroDivisionError, list, accumulate(E(s))) 1946 1947 def test_chain(self): 1948 for s in ("123", "", range(1000), ('do', 1.2), range(2000,2200,5)): 1949 for g in (G, I, Ig, S, L, R): 1950 self.assertEqual(list(chain(g(s))), list(g(s))) 1951 self.assertEqual(list(chain(g(s), g(s))), list(g(s))+list(g(s))) 1952 self.assertRaises(TypeError, list, chain(X(s))) 1953 self.assertRaises(TypeError, list, chain(N(s))) 1954 self.assertRaises(ZeroDivisionError, list, chain(E(s))) 1955 1956 def test_compress(self): 1957 for s in ("123", "", range(1000), ('do', 1.2), range(2000,2200,5)): 1958 n = len(s) 1959 for g in (G, I, Ig, S, L, R): 1960 self.assertEqual(list(compress(g(s), repeat(1))), list(g(s))) 1961 self.assertRaises(TypeError, compress, X(s), repeat(1)) 1962 self.assertRaises(TypeError, compress, N(s), repeat(1)) 1963 self.assertRaises(ZeroDivisionError, list, compress(E(s), repeat(1))) 1964 1965 def test_product(self): 1966 for s in ("123", "", range(1000), ('do', 1.2), range(2000,2200,5)): 1967 self.assertRaises(TypeError, product, X(s)) 1968 self.assertRaises(TypeError, product, N(s)) 1969 self.assertRaises(ZeroDivisionError, product, E(s)) 1970 1971 def test_cycle(self): 1972 for s in ("123", "", range(1000), ('do', 1.2), range(2000,2200,5)): 1973 for g in (G, I, Ig, S, L, R): 1974 tgtlen = len(s) * 3 1975 expected = list(g(s))*3 1976 actual = list(islice(cycle(g(s)), tgtlen)) 1977 self.assertEqual(actual, expected) 1978 self.assertRaises(TypeError, cycle, X(s)) 1979 self.assertRaises(TypeError, cycle, N(s)) 1980 self.assertRaises(ZeroDivisionError, list, cycle(E(s))) 1981 1982 def test_groupby(self): 1983 for s in (range(10), range(0), range(1000), (7,11), range(2000,2200,5)): 1984 for g in (G, I, Ig, S, L, R): 1985 self.assertEqual([k for k, sb in groupby(g(s))], list(g(s))) 1986 self.assertRaises(TypeError, groupby, X(s)) 1987 self.assertRaises(TypeError, groupby, N(s)) 1988 self.assertRaises(ZeroDivisionError, list, groupby(E(s))) 1989 1990 def test_filter(self): 1991 for s in (range(10), range(0), range(1000), (7,11), range(2000,2200,5)): 1992 for g in (G, I, Ig, S, L, R): 1993 self.assertEqual(list(filter(isEven, g(s))), 1994 [x for x in g(s) if isEven(x)]) 1995 self.assertRaises(TypeError, filter, isEven, X(s)) 1996 self.assertRaises(TypeError, filter, isEven, N(s)) 1997 self.assertRaises(ZeroDivisionError, list, filter(isEven, E(s))) 1998 1999 def test_filterfalse(self): 2000 for s in (range(10), range(0), range(1000), (7,11), range(2000,2200,5)): 2001 for g in (G, I, Ig, S, L, R): 2002 self.assertEqual(list(filterfalse(isEven, g(s))), 2003 [x for x in g(s) if isOdd(x)]) 2004 self.assertRaises(TypeError, filterfalse, isEven, X(s)) 2005 self.assertRaises(TypeError, filterfalse, isEven, N(s)) 2006 self.assertRaises(ZeroDivisionError, list, filterfalse(isEven, E(s))) 2007 2008 def test_zip(self): 2009 for s in ("123", "", range(1000), ('do', 1.2), range(2000,2200,5)): 2010 for g in (G, I, Ig, S, L, R): 2011 self.assertEqual(list(zip(g(s))), lzip(g(s))) 2012 self.assertEqual(list(zip(g(s), g(s))), lzip(g(s), g(s))) 2013 self.assertRaises(TypeError, zip, X(s)) 2014 self.assertRaises(TypeError, zip, N(s)) 2015 self.assertRaises(ZeroDivisionError, list, zip(E(s))) 2016 2017 def test_ziplongest(self): 2018 for s in ("123", "", range(1000), ('do', 1.2), range(2000,2200,5)): 2019 for g in (G, I, Ig, S, L, R): 2020 self.assertEqual(list(zip_longest(g(s))), list(zip(g(s)))) 2021 self.assertEqual(list(zip_longest(g(s), g(s))), list(zip(g(s), g(s)))) 2022 self.assertRaises(TypeError, zip_longest, X(s)) 2023 self.assertRaises(TypeError, zip_longest, N(s)) 2024 self.assertRaises(ZeroDivisionError, list, zip_longest(E(s))) 2025 2026 def test_map(self): 2027 for s in (range(10), range(0), range(100), (7,11), range(20,50,5)): 2028 for g in (G, I, Ig, S, L, R): 2029 self.assertEqual(list(map(onearg, g(s))), 2030 [onearg(x) for x in g(s)]) 2031 self.assertEqual(list(map(operator.pow, g(s), g(s))), 2032 [x**x for x in g(s)]) 2033 self.assertRaises(TypeError, map, onearg, X(s)) 2034 self.assertRaises(TypeError, map, onearg, N(s)) 2035 self.assertRaises(ZeroDivisionError, list, map(onearg, E(s))) 2036 2037 def test_islice(self): 2038 for s in ("12345", "", range(1000), ('do', 1.2), range(2000,2200,5)): 2039 for g in (G, I, Ig, S, L, R): 2040 self.assertEqual(list(islice(g(s),1,None,2)), list(g(s))[1::2]) 2041 self.assertRaises(TypeError, islice, X(s), 10) 2042 self.assertRaises(TypeError, islice, N(s), 10) 2043 self.assertRaises(ZeroDivisionError, list, islice(E(s), 10)) 2044 2045 def test_starmap(self): 2046 for s in (range(10), range(0), range(100), (7,11), range(20,50,5)): 2047 for g in (G, I, Ig, S, L, R): 2048 ss = lzip(s, s) 2049 self.assertEqual(list(starmap(operator.pow, g(ss))), 2050 [x**x for x in g(s)]) 2051 self.assertRaises(TypeError, starmap, operator.pow, X(ss)) 2052 self.assertRaises(TypeError, starmap, operator.pow, N(ss)) 2053 self.assertRaises(ZeroDivisionError, list, starmap(operator.pow, E(ss))) 2054 2055 def test_takewhile(self): 2056 for s in (range(10), range(0), range(1000), (7,11), range(2000,2200,5)): 2057 for g in (G, I, Ig, S, L, R): 2058 tgt = [] 2059 for elem in g(s): 2060 if not isEven(elem): break 2061 tgt.append(elem) 2062 self.assertEqual(list(takewhile(isEven, g(s))), tgt) 2063 self.assertRaises(TypeError, takewhile, isEven, X(s)) 2064 self.assertRaises(TypeError, takewhile, isEven, N(s)) 2065 self.assertRaises(ZeroDivisionError, list, takewhile(isEven, E(s))) 2066 2067 def test_dropwhile(self): 2068 for s in (range(10), range(0), range(1000), (7,11), range(2000,2200,5)): 2069 for g in (G, I, Ig, S, L, R): 2070 tgt = [] 2071 for elem in g(s): 2072 if not tgt and isOdd(elem): continue 2073 tgt.append(elem) 2074 self.assertEqual(list(dropwhile(isOdd, g(s))), tgt) 2075 self.assertRaises(TypeError, dropwhile, isOdd, X(s)) 2076 self.assertRaises(TypeError, dropwhile, isOdd, N(s)) 2077 self.assertRaises(ZeroDivisionError, list, dropwhile(isOdd, E(s))) 2078 2079 def test_tee(self): 2080 for s in ("123", "", range(1000), ('do', 1.2), range(2000,2200,5)): 2081 for g in (G, I, Ig, S, L, R): 2082 it1, it2 = tee(g(s)) 2083 self.assertEqual(list(it1), list(g(s))) 2084 self.assertEqual(list(it2), list(g(s))) 2085 self.assertRaises(TypeError, tee, X(s)) 2086 self.assertRaises(TypeError, tee, N(s)) 2087 self.assertRaises(ZeroDivisionError, list, tee(E(s))[0]) 2088 2089class LengthTransparency(unittest.TestCase): 2090 2091 def test_repeat(self): 2092 self.assertEqual(operator.length_hint(repeat(None, 50)), 50) 2093 self.assertEqual(operator.length_hint(repeat(None, 0)), 0) 2094 self.assertEqual(operator.length_hint(repeat(None), 12), 12) 2095 2096 def test_repeat_with_negative_times(self): 2097 self.assertEqual(operator.length_hint(repeat(None, -1)), 0) 2098 self.assertEqual(operator.length_hint(repeat(None, -2)), 0) 2099 self.assertEqual(operator.length_hint(repeat(None, times=-1)), 0) 2100 self.assertEqual(operator.length_hint(repeat(None, times=-2)), 0) 2101 2102class RegressionTests(unittest.TestCase): 2103 2104 def test_sf_793826(self): 2105 # Fix Armin Rigo's successful efforts to wreak havoc 2106 2107 def mutatingtuple(tuple1, f, tuple2): 2108 # this builds a tuple t which is a copy of tuple1, 2109 # then calls f(t), then mutates t to be equal to tuple2 2110 # (needs len(tuple1) == len(tuple2)). 2111 def g(value, first=[1]): 2112 if first: 2113 del first[:] 2114 f(next(z)) 2115 return value 2116 items = list(tuple2) 2117 items[1:1] = list(tuple1) 2118 gen = map(g, items) 2119 z = zip(*[gen]*len(tuple1)) 2120 next(z) 2121 2122 def f(t): 2123 global T 2124 T = t 2125 first[:] = list(T) 2126 2127 first = [] 2128 mutatingtuple((1,2,3), f, (4,5,6)) 2129 second = list(T) 2130 self.assertEqual(first, second) 2131 2132 2133 def test_sf_950057(self): 2134 # Make sure that chain() and cycle() catch exceptions immediately 2135 # rather than when shifting between input sources 2136 2137 def gen1(): 2138 hist.append(0) 2139 yield 1 2140 hist.append(1) 2141 raise AssertionError 2142 hist.append(2) 2143 2144 def gen2(x): 2145 hist.append(3) 2146 yield 2 2147 hist.append(4) 2148 2149 hist = [] 2150 self.assertRaises(AssertionError, list, chain(gen1(), gen2(False))) 2151 self.assertEqual(hist, [0,1]) 2152 2153 hist = [] 2154 self.assertRaises(AssertionError, list, chain(gen1(), gen2(True))) 2155 self.assertEqual(hist, [0,1]) 2156 2157 hist = [] 2158 self.assertRaises(AssertionError, list, cycle(gen1())) 2159 self.assertEqual(hist, [0,1]) 2160 2161 @support.skip_if_pgo_task 2162 def test_long_chain_of_empty_iterables(self): 2163 # Make sure itertools.chain doesn't run into recursion limits when 2164 # dealing with long chains of empty iterables. Even with a high 2165 # number this would probably only fail in Py_DEBUG mode. 2166 it = chain.from_iterable(() for unused in range(10000000)) 2167 with self.assertRaises(StopIteration): 2168 next(it) 2169 2170 def test_issue30347_1(self): 2171 def f(n): 2172 if n == 5: 2173 list(b) 2174 return n != 6 2175 for (k, b) in groupby(range(10), f): 2176 list(b) # shouldn't crash 2177 2178 def test_issue30347_2(self): 2179 class K: 2180 def __init__(self, v): 2181 pass 2182 def __eq__(self, other): 2183 nonlocal i 2184 i += 1 2185 if i == 1: 2186 next(g, None) 2187 return True 2188 i = 0 2189 g = next(groupby(range(10), K))[1] 2190 for j in range(2): 2191 next(g, None) # shouldn't crash 2192 2193 2194class SubclassWithKwargsTest(unittest.TestCase): 2195 def test_keywords_in_subclass(self): 2196 # count is not subclassable... 2197 for cls in (repeat, zip, filter, filterfalse, chain, map, 2198 starmap, islice, takewhile, dropwhile, cycle, compress): 2199 class Subclass(cls): 2200 def __init__(self, newarg=None, *args): 2201 cls.__init__(self, *args) 2202 try: 2203 Subclass(newarg=1) 2204 except TypeError as err: 2205 # we expect type errors because of wrong argument count 2206 self.assertNotIn("keyword arguments", err.args[0]) 2207 2208@support.cpython_only 2209class SizeofTest(unittest.TestCase): 2210 def setUp(self): 2211 self.ssize_t = struct.calcsize('n') 2212 2213 check_sizeof = support.check_sizeof 2214 2215 def test_product_sizeof(self): 2216 basesize = support.calcobjsize('3Pi') 2217 check = self.check_sizeof 2218 check(product('ab', '12'), basesize + 2 * self.ssize_t) 2219 check(product(*(('abc',) * 10)), basesize + 10 * self.ssize_t) 2220 2221 def test_combinations_sizeof(self): 2222 basesize = support.calcobjsize('3Pni') 2223 check = self.check_sizeof 2224 check(combinations('abcd', 3), basesize + 3 * self.ssize_t) 2225 check(combinations(range(10), 4), basesize + 4 * self.ssize_t) 2226 2227 def test_combinations_with_replacement_sizeof(self): 2228 cwr = combinations_with_replacement 2229 basesize = support.calcobjsize('3Pni') 2230 check = self.check_sizeof 2231 check(cwr('abcd', 3), basesize + 3 * self.ssize_t) 2232 check(cwr(range(10), 4), basesize + 4 * self.ssize_t) 2233 2234 def test_permutations_sizeof(self): 2235 basesize = support.calcobjsize('4Pni') 2236 check = self.check_sizeof 2237 check(permutations('abcd'), 2238 basesize + 4 * self.ssize_t + 4 * self.ssize_t) 2239 check(permutations('abcd', 3), 2240 basesize + 4 * self.ssize_t + 3 * self.ssize_t) 2241 check(permutations('abcde', 3), 2242 basesize + 5 * self.ssize_t + 3 * self.ssize_t) 2243 check(permutations(range(10), 4), 2244 basesize + 10 * self.ssize_t + 4 * self.ssize_t) 2245 2246 2247libreftest = """ Doctest for examples in the library reference: libitertools.tex 2248 2249 2250>>> amounts = [120.15, 764.05, 823.14] 2251>>> for checknum, amount in zip(count(1200), amounts): 2252... print('Check %d is for $%.2f' % (checknum, amount)) 2253... 2254Check 1200 is for $120.15 2255Check 1201 is for $764.05 2256Check 1202 is for $823.14 2257 2258>>> import operator 2259>>> for cube in map(operator.pow, range(1,4), repeat(3)): 2260... print(cube) 2261... 22621 22638 226427 2265 2266>>> reportlines = ['EuroPython', 'Roster', '', 'alex', '', 'laura', '', 'martin', '', 'walter', '', 'samuele'] 2267>>> for name in islice(reportlines, 3, None, 2): 2268... print(name.title()) 2269... 2270Alex 2271Laura 2272Martin 2273Walter 2274Samuele 2275 2276>>> from operator import itemgetter 2277>>> d = dict(a=1, b=2, c=1, d=2, e=1, f=2, g=3) 2278>>> di = sorted(sorted(d.items()), key=itemgetter(1)) 2279>>> for k, g in groupby(di, itemgetter(1)): 2280... print(k, list(map(itemgetter(0), g))) 2281... 22821 ['a', 'c', 'e'] 22832 ['b', 'd', 'f'] 22843 ['g'] 2285 2286# Find runs of consecutive numbers using groupby. The key to the solution 2287# is differencing with a range so that consecutive numbers all appear in 2288# same group. 2289>>> data = [ 1, 4,5,6, 10, 15,16,17,18, 22, 25,26,27,28] 2290>>> for k, g in groupby(enumerate(data), lambda t:t[0]-t[1]): 2291... print(list(map(operator.itemgetter(1), g))) 2292... 2293[1] 2294[4, 5, 6] 2295[10] 2296[15, 16, 17, 18] 2297[22] 2298[25, 26, 27, 28] 2299 2300>>> def take(n, iterable): 2301... "Return first n items of the iterable as a list" 2302... return list(islice(iterable, n)) 2303 2304>>> def prepend(value, iterator): 2305... "Prepend a single value in front of an iterator" 2306... # prepend(1, [2, 3, 4]) -> 1 2 3 4 2307... return chain([value], iterator) 2308 2309>>> def enumerate(iterable, start=0): 2310... return zip(count(start), iterable) 2311 2312>>> def tabulate(function, start=0): 2313... "Return function(0), function(1), ..." 2314... return map(function, count(start)) 2315 2316>>> import collections 2317>>> def consume(iterator, n=None): 2318... "Advance the iterator n-steps ahead. If n is None, consume entirely." 2319... # Use functions that consume iterators at C speed. 2320... if n is None: 2321... # feed the entire iterator into a zero-length deque 2322... collections.deque(iterator, maxlen=0) 2323... else: 2324... # advance to the empty slice starting at position n 2325... next(islice(iterator, n, n), None) 2326 2327>>> def nth(iterable, n, default=None): 2328... "Returns the nth item or a default value" 2329... return next(islice(iterable, n, None), default) 2330 2331>>> def all_equal(iterable): 2332... "Returns True if all the elements are equal to each other" 2333... g = groupby(iterable) 2334... return next(g, True) and not next(g, False) 2335 2336>>> def quantify(iterable, pred=bool): 2337... "Count how many times the predicate is true" 2338... return sum(map(pred, iterable)) 2339 2340>>> def pad_none(iterable): 2341... "Returns the sequence elements and then returns None indefinitely" 2342... return chain(iterable, repeat(None)) 2343 2344>>> def ncycles(iterable, n): 2345... "Returns the sequence elements n times" 2346... return chain(*repeat(iterable, n)) 2347 2348>>> def dotproduct(vec1, vec2): 2349... return sum(map(operator.mul, vec1, vec2)) 2350 2351>>> def flatten(listOfLists): 2352... return list(chain.from_iterable(listOfLists)) 2353 2354>>> def repeatfunc(func, times=None, *args): 2355... "Repeat calls to func with specified arguments." 2356... " Example: repeatfunc(random.random)" 2357... if times is None: 2358... return starmap(func, repeat(args)) 2359... else: 2360... return starmap(func, repeat(args, times)) 2361 2362>>> def pairwise(iterable): 2363... "s -> (s0,s1), (s1,s2), (s2, s3), ..." 2364... a, b = tee(iterable) 2365... try: 2366... next(b) 2367... except StopIteration: 2368... pass 2369... return zip(a, b) 2370 2371>>> def grouper(n, iterable, fillvalue=None): 2372... "grouper(3, 'ABCDEFG', 'x') --> ABC DEF Gxx" 2373... args = [iter(iterable)] * n 2374... return zip_longest(*args, fillvalue=fillvalue) 2375 2376>>> def roundrobin(*iterables): 2377... "roundrobin('ABC', 'D', 'EF') --> A D E B F C" 2378... # Recipe credited to George Sakkis 2379... pending = len(iterables) 2380... nexts = cycle(iter(it).__next__ for it in iterables) 2381... while pending: 2382... try: 2383... for next in nexts: 2384... yield next() 2385... except StopIteration: 2386... pending -= 1 2387... nexts = cycle(islice(nexts, pending)) 2388 2389>>> def powerset(iterable): 2390... "powerset([1,2,3]) --> () (1,) (2,) (3,) (1,2) (1,3) (2,3) (1,2,3)" 2391... s = list(iterable) 2392... return chain.from_iterable(combinations(s, r) for r in range(len(s)+1)) 2393 2394>>> def unique_everseen(iterable, key=None): 2395... "List unique elements, preserving order. Remember all elements ever seen." 2396... # unique_everseen('AAAABBBCCDAABBB') --> A B C D 2397... # unique_everseen('ABBCcAD', str.lower) --> A B C D 2398... seen = set() 2399... seen_add = seen.add 2400... if key is None: 2401... for element in iterable: 2402... if element not in seen: 2403... seen_add(element) 2404... yield element 2405... else: 2406... for element in iterable: 2407... k = key(element) 2408... if k not in seen: 2409... seen_add(k) 2410... yield element 2411 2412>>> def unique_justseen(iterable, key=None): 2413... "List unique elements, preserving order. Remember only the element just seen." 2414... # unique_justseen('AAAABBBCCDAABBB') --> A B C D A B 2415... # unique_justseen('ABBCcAD', str.lower) --> A B C A D 2416... return map(next, map(itemgetter(1), groupby(iterable, key))) 2417 2418>>> def first_true(iterable, default=False, pred=None): 2419... '''Returns the first true value in the iterable. 2420... 2421... If no true value is found, returns *default* 2422... 2423... If *pred* is not None, returns the first item 2424... for which pred(item) is true. 2425... 2426... ''' 2427... # first_true([a,b,c], x) --> a or b or c or x 2428... # first_true([a,b], x, f) --> a if f(a) else b if f(b) else x 2429... return next(filter(pred, iterable), default) 2430 2431>>> def nth_combination(iterable, r, index): 2432... 'Equivalent to list(combinations(iterable, r))[index]' 2433... pool = tuple(iterable) 2434... n = len(pool) 2435... if r < 0 or r > n: 2436... raise ValueError 2437... c = 1 2438... k = min(r, n-r) 2439... for i in range(1, k+1): 2440... c = c * (n - k + i) // i 2441... if index < 0: 2442... index += c 2443... if index < 0 or index >= c: 2444... raise IndexError 2445... result = [] 2446... while r: 2447... c, n, r = c*r//n, n-1, r-1 2448... while index >= c: 2449... index -= c 2450... c, n = c*(n-r)//n, n-1 2451... result.append(pool[-1-n]) 2452... return tuple(result) 2453 2454 2455This is not part of the examples but it tests to make sure the definitions 2456perform as purported. 2457 2458>>> take(10, count()) 2459[0, 1, 2, 3, 4, 5, 6, 7, 8, 9] 2460 2461>>> list(prepend(1, [2, 3, 4])) 2462[1, 2, 3, 4] 2463 2464>>> list(enumerate('abc')) 2465[(0, 'a'), (1, 'b'), (2, 'c')] 2466 2467>>> list(islice(tabulate(lambda x: 2*x), 4)) 2468[0, 2, 4, 6] 2469 2470>>> it = iter(range(10)) 2471>>> consume(it, 3) 2472>>> next(it) 24733 2474>>> consume(it) 2475>>> next(it, 'Done') 2476'Done' 2477 2478>>> nth('abcde', 3) 2479'd' 2480 2481>>> nth('abcde', 9) is None 2482True 2483 2484>>> [all_equal(s) for s in ('', 'A', 'AAAA', 'AAAB', 'AAABA')] 2485[True, True, True, False, False] 2486 2487>>> quantify(range(99), lambda x: x%2==0) 248850 2489 2490>>> a = [[1, 2, 3], [4, 5, 6]] 2491>>> flatten(a) 2492[1, 2, 3, 4, 5, 6] 2493 2494>>> list(repeatfunc(pow, 5, 2, 3)) 2495[8, 8, 8, 8, 8] 2496 2497>>> import random 2498>>> take(5, map(int, repeatfunc(random.random))) 2499[0, 0, 0, 0, 0] 2500 2501>>> list(pairwise('abcd')) 2502[('a', 'b'), ('b', 'c'), ('c', 'd')] 2503 2504>>> list(pairwise([])) 2505[] 2506 2507>>> list(pairwise('a')) 2508[] 2509 2510>>> list(islice(pad_none('abc'), 0, 6)) 2511['a', 'b', 'c', None, None, None] 2512 2513>>> list(ncycles('abc', 3)) 2514['a', 'b', 'c', 'a', 'b', 'c', 'a', 'b', 'c'] 2515 2516>>> dotproduct([1,2,3], [4,5,6]) 251732 2518 2519>>> list(grouper(3, 'abcdefg', 'x')) 2520[('a', 'b', 'c'), ('d', 'e', 'f'), ('g', 'x', 'x')] 2521 2522>>> list(roundrobin('abc', 'd', 'ef')) 2523['a', 'd', 'e', 'b', 'f', 'c'] 2524 2525>>> list(powerset([1,2,3])) 2526[(), (1,), (2,), (3,), (1, 2), (1, 3), (2, 3), (1, 2, 3)] 2527 2528>>> all(len(list(powerset(range(n)))) == 2**n for n in range(18)) 2529True 2530 2531>>> list(powerset('abcde')) == sorted(sorted(set(powerset('abcde'))), key=len) 2532True 2533 2534>>> list(unique_everseen('AAAABBBCCDAABBB')) 2535['A', 'B', 'C', 'D'] 2536 2537>>> list(unique_everseen('ABBCcAD', str.lower)) 2538['A', 'B', 'C', 'D'] 2539 2540>>> list(unique_justseen('AAAABBBCCDAABBB')) 2541['A', 'B', 'C', 'D', 'A', 'B'] 2542 2543>>> list(unique_justseen('ABBCcAD', str.lower)) 2544['A', 'B', 'C', 'A', 'D'] 2545 2546>>> first_true('ABC0DEF1', '9', str.isdigit) 2547'0' 2548 2549>>> population = 'ABCDEFGH' 2550>>> for r in range(len(population) + 1): 2551... seq = list(combinations(population, r)) 2552... for i in range(len(seq)): 2553... assert nth_combination(population, r, i) == seq[i] 2554... for i in range(-len(seq), 0): 2555... assert nth_combination(population, r, i) == seq[i] 2556 2557 2558""" 2559 2560__test__ = {'libreftest' : libreftest} 2561 2562def test_main(verbose=None): 2563 test_classes = (TestBasicOps, TestVariousIteratorArgs, TestGC, 2564 RegressionTests, LengthTransparency, 2565 SubclassWithKwargsTest, TestExamples, 2566 TestPurePythonRoughEquivalents, 2567 SizeofTest) 2568 support.run_unittest(*test_classes) 2569 2570 # verify reference counting 2571 if verbose and hasattr(sys, "gettotalrefcount"): 2572 import gc 2573 counts = [None] * 5 2574 for i in range(len(counts)): 2575 support.run_unittest(*test_classes) 2576 gc.collect() 2577 counts[i] = sys.gettotalrefcount() 2578 print(counts) 2579 2580 # doctest the examples in the library reference 2581 support.run_doctest(sys.modules[__name__], verbose) 2582 2583if __name__ == "__main__": 2584 test_main(verbose=True) 2585