1import unittest 2from test import support 3from itertools import * 4import weakref 5from decimal import Decimal 6from fractions import Fraction 7import operator 8import random 9import copy 10import pickle 11from functools import reduce 12import sys 13import struct 14import threading 15import gc 16 17maxsize = support.MAX_Py_ssize_t 18minsize = -maxsize-1 19 20def lzip(*args): 21 return list(zip(*args)) 22 23def onearg(x): 24 'Test function of one argument' 25 return 2*x 26 27def errfunc(*args): 28 'Test function that raises an error' 29 raise ValueError 30 31def gen3(): 32 'Non-restartable source sequence' 33 for i in (0, 1, 2): 34 yield i 35 36def isEven(x): 37 'Test predicate' 38 return x%2==0 39 40def isOdd(x): 41 'Test predicate' 42 return x%2==1 43 44def tupleize(*args): 45 return args 46 47def irange(n): 48 for i in range(n): 49 yield i 50 51class StopNow: 52 'Class emulating an empty iterable.' 53 def __iter__(self): 54 return self 55 def __next__(self): 56 raise StopIteration 57 58def take(n, seq): 59 'Convenience function for partially consuming a long of infinite iterable' 60 return list(islice(seq, n)) 61 62def prod(iterable): 63 return reduce(operator.mul, iterable, 1) 64 65def fact(n): 66 'Factorial' 67 return prod(range(1, n+1)) 68 69# root level methods for pickling ability 70def testR(r): 71 return r[0] 72 73def testR2(r): 74 return r[2] 75 76def underten(x): 77 return x<10 78 79picklecopiers = [lambda s, proto=proto: pickle.loads(pickle.dumps(s, proto)) 80 for proto in range(pickle.HIGHEST_PROTOCOL + 1)] 81 82class TestBasicOps(unittest.TestCase): 83 84 def pickletest(self, protocol, it, stop=4, take=1, compare=None): 85 """Test that an iterator is the same after pickling, also when part-consumed""" 86 def expand(it, i=0): 87 # Recursively expand iterables, within sensible bounds 88 if i > 10: 89 raise RuntimeError("infinite recursion encountered") 90 if isinstance(it, str): 91 return it 92 try: 93 l = list(islice(it, stop)) 94 except TypeError: 95 return it # can't expand it 96 return [expand(e, i+1) for e in l] 97 98 # Test the initial copy against the original 99 dump = pickle.dumps(it, protocol) 100 i2 = pickle.loads(dump) 101 self.assertEqual(type(it), type(i2)) 102 a, b = expand(it), expand(i2) 103 self.assertEqual(a, b) 104 if compare: 105 c = expand(compare) 106 self.assertEqual(a, c) 107 108 # Take from the copy, and create another copy and compare them. 109 i3 = pickle.loads(dump) 110 took = 0 111 try: 112 for i in range(take): 113 next(i3) 114 took += 1 115 except StopIteration: 116 pass #in case there is less data than 'take' 117 dump = pickle.dumps(i3, protocol) 118 i4 = pickle.loads(dump) 119 a, b = expand(i3), expand(i4) 120 self.assertEqual(a, b) 121 if compare: 122 c = expand(compare[took:]) 123 self.assertEqual(a, c); 124 125 def test_accumulate(self): 126 self.assertEqual(list(accumulate(range(10))), # one positional arg 127 [0, 1, 3, 6, 10, 15, 21, 28, 36, 45]) 128 self.assertEqual(list(accumulate(iterable=range(10))), # kw arg 129 [0, 1, 3, 6, 10, 15, 21, 28, 36, 45]) 130 for typ in int, complex, Decimal, Fraction: # multiple types 131 self.assertEqual( 132 list(accumulate(map(typ, range(10)))), 133 list(map(typ, [0, 1, 3, 6, 10, 15, 21, 28, 36, 45]))) 134 self.assertEqual(list(accumulate('abc')), ['a', 'ab', 'abc']) # works with non-numeric 135 self.assertEqual(list(accumulate([])), []) # empty iterable 136 self.assertEqual(list(accumulate([7])), [7]) # iterable of length one 137 self.assertRaises(TypeError, accumulate, range(10), 5, 6) # too many args 138 self.assertRaises(TypeError, accumulate) # too few args 139 self.assertRaises(TypeError, accumulate, x=range(10)) # unexpected kwd arg 140 self.assertRaises(TypeError, list, accumulate([1, []])) # args that don't add 141 142 s = [2, 8, 9, 5, 7, 0, 3, 4, 1, 6] 143 self.assertEqual(list(accumulate(s, min)), 144 [2, 2, 2, 2, 2, 0, 0, 0, 0, 0]) 145 self.assertEqual(list(accumulate(s, max)), 146 [2, 8, 9, 9, 9, 9, 9, 9, 9, 9]) 147 self.assertEqual(list(accumulate(s, operator.mul)), 148 [2, 16, 144, 720, 5040, 0, 0, 0, 0, 0]) 149 with self.assertRaises(TypeError): 150 list(accumulate(s, chr)) # unary-operation 151 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 152 self.pickletest(proto, accumulate(range(10))) # test pickling 153 self.pickletest(proto, accumulate(range(10), initial=7)) 154 self.assertEqual(list(accumulate([10, 5, 1], initial=None)), [10, 15, 16]) 155 self.assertEqual(list(accumulate([10, 5, 1], initial=100)), [100, 110, 115, 116]) 156 self.assertEqual(list(accumulate([], initial=100)), [100]) 157 with self.assertRaises(TypeError): 158 list(accumulate([10, 20], 100)) 159 160 def test_chain(self): 161 162 def chain2(*iterables): 163 'Pure python version in the docs' 164 for it in iterables: 165 for element in it: 166 yield element 167 168 for c in (chain, chain2): 169 self.assertEqual(list(c('abc', 'def')), list('abcdef')) 170 self.assertEqual(list(c('abc')), list('abc')) 171 self.assertEqual(list(c('')), []) 172 self.assertEqual(take(4, c('abc', 'def')), list('abcd')) 173 self.assertRaises(TypeError, list,c(2, 3)) 174 175 def test_chain_from_iterable(self): 176 self.assertEqual(list(chain.from_iterable(['abc', 'def'])), list('abcdef')) 177 self.assertEqual(list(chain.from_iterable(['abc'])), list('abc')) 178 self.assertEqual(list(chain.from_iterable([''])), []) 179 self.assertEqual(take(4, chain.from_iterable(['abc', 'def'])), list('abcd')) 180 self.assertRaises(TypeError, list, chain.from_iterable([2, 3])) 181 182 def test_chain_reducible(self): 183 for oper in [copy.deepcopy] + picklecopiers: 184 it = chain('abc', 'def') 185 self.assertEqual(list(oper(it)), list('abcdef')) 186 self.assertEqual(next(it), 'a') 187 self.assertEqual(list(oper(it)), list('bcdef')) 188 189 self.assertEqual(list(oper(chain(''))), []) 190 self.assertEqual(take(4, oper(chain('abc', 'def'))), list('abcd')) 191 self.assertRaises(TypeError, list, oper(chain(2, 3))) 192 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 193 self.pickletest(proto, chain('abc', 'def'), compare=list('abcdef')) 194 195 def test_chain_setstate(self): 196 self.assertRaises(TypeError, chain().__setstate__, ()) 197 self.assertRaises(TypeError, chain().__setstate__, []) 198 self.assertRaises(TypeError, chain().__setstate__, 0) 199 self.assertRaises(TypeError, chain().__setstate__, ([],)) 200 self.assertRaises(TypeError, chain().__setstate__, (iter([]), [])) 201 it = chain() 202 it.__setstate__((iter(['abc', 'def']),)) 203 self.assertEqual(list(it), ['a', 'b', 'c', 'd', 'e', 'f']) 204 it = chain() 205 it.__setstate__((iter(['abc', 'def']), iter(['ghi']))) 206 self.assertEqual(list(it), ['ghi', 'a', 'b', 'c', 'd', 'e', 'f']) 207 208 def test_combinations(self): 209 self.assertRaises(TypeError, combinations, 'abc') # missing r argument 210 self.assertRaises(TypeError, combinations, 'abc', 2, 1) # too many arguments 211 self.assertRaises(TypeError, combinations, None) # pool is not iterable 212 self.assertRaises(ValueError, combinations, 'abc', -2) # r is negative 213 214 for op in [lambda a:a] + picklecopiers: 215 self.assertEqual(list(op(combinations('abc', 32))), []) # r > n 216 217 self.assertEqual(list(op(combinations('ABCD', 2))), 218 [('A','B'), ('A','C'), ('A','D'), ('B','C'), ('B','D'), ('C','D')]) 219 testIntermediate = combinations('ABCD', 2) 220 next(testIntermediate) 221 self.assertEqual(list(op(testIntermediate)), 222 [('A','C'), ('A','D'), ('B','C'), ('B','D'), ('C','D')]) 223 224 self.assertEqual(list(op(combinations(range(4), 3))), 225 [(0,1,2), (0,1,3), (0,2,3), (1,2,3)]) 226 testIntermediate = combinations(range(4), 3) 227 next(testIntermediate) 228 self.assertEqual(list(op(testIntermediate)), 229 [(0,1,3), (0,2,3), (1,2,3)]) 230 231 232 def combinations1(iterable, r): 233 'Pure python version shown in the docs' 234 pool = tuple(iterable) 235 n = len(pool) 236 if r > n: 237 return 238 indices = list(range(r)) 239 yield tuple(pool[i] for i in indices) 240 while 1: 241 for i in reversed(range(r)): 242 if indices[i] != i + n - r: 243 break 244 else: 245 return 246 indices[i] += 1 247 for j in range(i+1, r): 248 indices[j] = indices[j-1] + 1 249 yield tuple(pool[i] for i in indices) 250 251 def combinations2(iterable, r): 252 'Pure python version shown in the docs' 253 pool = tuple(iterable) 254 n = len(pool) 255 for indices in permutations(range(n), r): 256 if sorted(indices) == list(indices): 257 yield tuple(pool[i] for i in indices) 258 259 def combinations3(iterable, r): 260 'Pure python version from cwr()' 261 pool = tuple(iterable) 262 n = len(pool) 263 for indices in combinations_with_replacement(range(n), r): 264 if len(set(indices)) == r: 265 yield tuple(pool[i] for i in indices) 266 267 for n in range(7): 268 values = [5*x-12 for x in range(n)] 269 for r in range(n+2): 270 result = list(combinations(values, r)) 271 self.assertEqual(len(result), 0 if r>n else fact(n) / fact(r) / fact(n-r)) # right number of combs 272 self.assertEqual(len(result), len(set(result))) # no repeats 273 self.assertEqual(result, sorted(result)) # lexicographic order 274 for c in result: 275 self.assertEqual(len(c), r) # r-length combinations 276 self.assertEqual(len(set(c)), r) # no duplicate elements 277 self.assertEqual(list(c), sorted(c)) # keep original ordering 278 self.assertTrue(all(e in values for e in c)) # elements taken from input iterable 279 self.assertEqual(list(c), 280 [e for e in values if e in c]) # comb is a subsequence of the input iterable 281 self.assertEqual(result, list(combinations1(values, r))) # matches first pure python version 282 self.assertEqual(result, list(combinations2(values, r))) # matches second pure python version 283 self.assertEqual(result, list(combinations3(values, r))) # matches second pure python version 284 285 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 286 self.pickletest(proto, combinations(values, r)) # test pickling 287 288 @support.bigaddrspacetest 289 def test_combinations_overflow(self): 290 with self.assertRaises((OverflowError, MemoryError)): 291 combinations("AA", 2**29) 292 293 # Test implementation detail: tuple re-use 294 @support.impl_detail("tuple reuse is specific to CPython") 295 def test_combinations_tuple_reuse(self): 296 self.assertEqual(len(set(map(id, combinations('abcde', 3)))), 1) 297 self.assertNotEqual(len(set(map(id, list(combinations('abcde', 3))))), 1) 298 299 def test_combinations_with_replacement(self): 300 cwr = combinations_with_replacement 301 self.assertRaises(TypeError, cwr, 'abc') # missing r argument 302 self.assertRaises(TypeError, cwr, 'abc', 2, 1) # too many arguments 303 self.assertRaises(TypeError, cwr, None) # pool is not iterable 304 self.assertRaises(ValueError, cwr, 'abc', -2) # r is negative 305 306 for op in [lambda a:a] + picklecopiers: 307 self.assertEqual(list(op(cwr('ABC', 2))), 308 [('A','A'), ('A','B'), ('A','C'), ('B','B'), ('B','C'), ('C','C')]) 309 testIntermediate = cwr('ABC', 2) 310 next(testIntermediate) 311 self.assertEqual(list(op(testIntermediate)), 312 [('A','B'), ('A','C'), ('B','B'), ('B','C'), ('C','C')]) 313 314 315 def cwr1(iterable, r): 316 'Pure python version shown in the docs' 317 # number items returned: (n+r-1)! / r! / (n-1)! when n>0 318 pool = tuple(iterable) 319 n = len(pool) 320 if not n and r: 321 return 322 indices = [0] * r 323 yield tuple(pool[i] for i in indices) 324 while 1: 325 for i in reversed(range(r)): 326 if indices[i] != n - 1: 327 break 328 else: 329 return 330 indices[i:] = [indices[i] + 1] * (r - i) 331 yield tuple(pool[i] for i in indices) 332 333 def cwr2(iterable, r): 334 'Pure python version shown in the docs' 335 pool = tuple(iterable) 336 n = len(pool) 337 for indices in product(range(n), repeat=r): 338 if sorted(indices) == list(indices): 339 yield tuple(pool[i] for i in indices) 340 341 def numcombs(n, r): 342 if not n: 343 return 0 if r else 1 344 return fact(n+r-1) / fact(r)/ fact(n-1) 345 346 for n in range(7): 347 values = [5*x-12 for x in range(n)] 348 for r in range(n+2): 349 result = list(cwr(values, r)) 350 351 self.assertEqual(len(result), numcombs(n, r)) # right number of combs 352 self.assertEqual(len(result), len(set(result))) # no repeats 353 self.assertEqual(result, sorted(result)) # lexicographic order 354 355 regular_combs = list(combinations(values, r)) # compare to combs without replacement 356 if n == 0 or r <= 1: 357 self.assertEqual(result, regular_combs) # cases that should be identical 358 else: 359 self.assertTrue(set(result) >= set(regular_combs)) # rest should be supersets of regular combs 360 361 for c in result: 362 self.assertEqual(len(c), r) # r-length combinations 363 noruns = [k for k,v in groupby(c)] # combo without consecutive repeats 364 self.assertEqual(len(noruns), len(set(noruns))) # no repeats other than consecutive 365 self.assertEqual(list(c), sorted(c)) # keep original ordering 366 self.assertTrue(all(e in values for e in c)) # elements taken from input iterable 367 self.assertEqual(noruns, 368 [e for e in values if e in c]) # comb is a subsequence of the input iterable 369 self.assertEqual(result, list(cwr1(values, r))) # matches first pure python version 370 self.assertEqual(result, list(cwr2(values, r))) # matches second pure python version 371 372 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 373 self.pickletest(proto, cwr(values,r)) # test pickling 374 375 @support.bigaddrspacetest 376 def test_combinations_with_replacement_overflow(self): 377 with self.assertRaises((OverflowError, MemoryError)): 378 combinations_with_replacement("AA", 2**30) 379 380 # Test implementation detail: tuple re-use 381 @support.impl_detail("tuple reuse is specific to CPython") 382 def test_combinations_with_replacement_tuple_reuse(self): 383 cwr = combinations_with_replacement 384 self.assertEqual(len(set(map(id, cwr('abcde', 3)))), 1) 385 self.assertNotEqual(len(set(map(id, list(cwr('abcde', 3))))), 1) 386 387 def test_permutations(self): 388 self.assertRaises(TypeError, permutations) # too few arguments 389 self.assertRaises(TypeError, permutations, 'abc', 2, 1) # too many arguments 390 self.assertRaises(TypeError, permutations, None) # pool is not iterable 391 self.assertRaises(ValueError, permutations, 'abc', -2) # r is negative 392 self.assertEqual(list(permutations('abc', 32)), []) # r > n 393 self.assertRaises(TypeError, permutations, 'abc', 's') # r is not an int or None 394 self.assertEqual(list(permutations(range(3), 2)), 395 [(0,1), (0,2), (1,0), (1,2), (2,0), (2,1)]) 396 397 def permutations1(iterable, r=None): 398 'Pure python version shown in the docs' 399 pool = tuple(iterable) 400 n = len(pool) 401 r = n if r is None else r 402 if r > n: 403 return 404 indices = list(range(n)) 405 cycles = list(range(n-r+1, n+1))[::-1] 406 yield tuple(pool[i] for i in indices[:r]) 407 while n: 408 for i in reversed(range(r)): 409 cycles[i] -= 1 410 if cycles[i] == 0: 411 indices[i:] = indices[i+1:] + indices[i:i+1] 412 cycles[i] = n - i 413 else: 414 j = cycles[i] 415 indices[i], indices[-j] = indices[-j], indices[i] 416 yield tuple(pool[i] for i in indices[:r]) 417 break 418 else: 419 return 420 421 def permutations2(iterable, r=None): 422 'Pure python version shown in the docs' 423 pool = tuple(iterable) 424 n = len(pool) 425 r = n if r is None else r 426 for indices in product(range(n), repeat=r): 427 if len(set(indices)) == r: 428 yield tuple(pool[i] for i in indices) 429 430 for n in range(7): 431 values = [5*x-12 for x in range(n)] 432 for r in range(n+2): 433 result = list(permutations(values, r)) 434 self.assertEqual(len(result), 0 if r>n else fact(n) / fact(n-r)) # right number of perms 435 self.assertEqual(len(result), len(set(result))) # no repeats 436 self.assertEqual(result, sorted(result)) # lexicographic order 437 for p in result: 438 self.assertEqual(len(p), r) # r-length permutations 439 self.assertEqual(len(set(p)), r) # no duplicate elements 440 self.assertTrue(all(e in values for e in p)) # elements taken from input iterable 441 self.assertEqual(result, list(permutations1(values, r))) # matches first pure python version 442 self.assertEqual(result, list(permutations2(values, r))) # matches second pure python version 443 if r == n: 444 self.assertEqual(result, list(permutations(values, None))) # test r as None 445 self.assertEqual(result, list(permutations(values))) # test default r 446 447 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 448 self.pickletest(proto, permutations(values, r)) # test pickling 449 450 @support.bigaddrspacetest 451 def test_permutations_overflow(self): 452 with self.assertRaises((OverflowError, MemoryError)): 453 permutations("A", 2**30) 454 455 @support.impl_detail("tuple reuse is specific to CPython") 456 def test_permutations_tuple_reuse(self): 457 self.assertEqual(len(set(map(id, permutations('abcde', 3)))), 1) 458 self.assertNotEqual(len(set(map(id, list(permutations('abcde', 3))))), 1) 459 460 def test_combinatorics(self): 461 # Test relationships between product(), permutations(), 462 # combinations() and combinations_with_replacement(). 463 464 for n in range(6): 465 s = 'ABCDEFG'[:n] 466 for r in range(8): 467 prod = list(product(s, repeat=r)) 468 cwr = list(combinations_with_replacement(s, r)) 469 perm = list(permutations(s, r)) 470 comb = list(combinations(s, r)) 471 472 # Check size 473 self.assertEqual(len(prod), n**r) 474 self.assertEqual(len(cwr), (fact(n+r-1) / fact(r)/ fact(n-1)) if n else (not r)) 475 self.assertEqual(len(perm), 0 if r>n else fact(n) / fact(n-r)) 476 self.assertEqual(len(comb), 0 if r>n else fact(n) / fact(r) / fact(n-r)) 477 478 # Check lexicographic order without repeated tuples 479 self.assertEqual(prod, sorted(set(prod))) 480 self.assertEqual(cwr, sorted(set(cwr))) 481 self.assertEqual(perm, sorted(set(perm))) 482 self.assertEqual(comb, sorted(set(comb))) 483 484 # Check interrelationships 485 self.assertEqual(cwr, [t for t in prod if sorted(t)==list(t)]) # cwr: prods which are sorted 486 self.assertEqual(perm, [t for t in prod if len(set(t))==r]) # perm: prods with no dups 487 self.assertEqual(comb, [t for t in perm if sorted(t)==list(t)]) # comb: perms that are sorted 488 self.assertEqual(comb, [t for t in cwr if len(set(t))==r]) # comb: cwrs without dups 489 self.assertEqual(comb, list(filter(set(cwr).__contains__, perm))) # comb: perm that is a cwr 490 self.assertEqual(comb, list(filter(set(perm).__contains__, cwr))) # comb: cwr that is a perm 491 self.assertEqual(comb, sorted(set(cwr) & set(perm))) # comb: both a cwr and a perm 492 493 def test_compress(self): 494 self.assertEqual(list(compress(data='ABCDEF', selectors=[1,0,1,0,1,1])), list('ACEF')) 495 self.assertEqual(list(compress('ABCDEF', [1,0,1,0,1,1])), list('ACEF')) 496 self.assertEqual(list(compress('ABCDEF', [0,0,0,0,0,0])), list('')) 497 self.assertEqual(list(compress('ABCDEF', [1,1,1,1,1,1])), list('ABCDEF')) 498 self.assertEqual(list(compress('ABCDEF', [1,0,1])), list('AC')) 499 self.assertEqual(list(compress('ABC', [0,1,1,1,1,1])), list('BC')) 500 n = 10000 501 data = chain.from_iterable(repeat(range(6), n)) 502 selectors = chain.from_iterable(repeat((0, 1))) 503 self.assertEqual(list(compress(data, selectors)), [1,3,5] * n) 504 self.assertRaises(TypeError, compress, None, range(6)) # 1st arg not iterable 505 self.assertRaises(TypeError, compress, range(6), None) # 2nd arg not iterable 506 self.assertRaises(TypeError, compress, range(6)) # too few args 507 self.assertRaises(TypeError, compress, range(6), None) # too many args 508 509 # check copy, deepcopy, pickle 510 for op in [lambda a:copy.copy(a), lambda a:copy.deepcopy(a)] + picklecopiers: 511 for data, selectors, result1, result2 in [ 512 ('ABCDEF', [1,0,1,0,1,1], 'ACEF', 'CEF'), 513 ('ABCDEF', [0,0,0,0,0,0], '', ''), 514 ('ABCDEF', [1,1,1,1,1,1], 'ABCDEF', 'BCDEF'), 515 ('ABCDEF', [1,0,1], 'AC', 'C'), 516 ('ABC', [0,1,1,1,1,1], 'BC', 'C'), 517 ]: 518 519 self.assertEqual(list(op(compress(data=data, selectors=selectors))), list(result1)) 520 self.assertEqual(list(op(compress(data, selectors))), list(result1)) 521 testIntermediate = compress(data, selectors) 522 if result1: 523 next(testIntermediate) 524 self.assertEqual(list(op(testIntermediate)), list(result2)) 525 526 527 def test_count(self): 528 self.assertEqual(lzip('abc',count()), [('a', 0), ('b', 1), ('c', 2)]) 529 self.assertEqual(lzip('abc',count(3)), [('a', 3), ('b', 4), ('c', 5)]) 530 self.assertEqual(take(2, lzip('abc',count(3))), [('a', 3), ('b', 4)]) 531 self.assertEqual(take(2, zip('abc',count(-1))), [('a', -1), ('b', 0)]) 532 self.assertEqual(take(2, zip('abc',count(-3))), [('a', -3), ('b', -2)]) 533 self.assertRaises(TypeError, count, 2, 3, 4) 534 self.assertRaises(TypeError, count, 'a') 535 self.assertEqual(take(10, count(maxsize-5)), 536 list(range(maxsize-5, maxsize+5))) 537 self.assertEqual(take(10, count(-maxsize-5)), 538 list(range(-maxsize-5, -maxsize+5))) 539 self.assertEqual(take(3, count(3.25)), [3.25, 4.25, 5.25]) 540 self.assertEqual(take(3, count(3.25-4j)), [3.25-4j, 4.25-4j, 5.25-4j]) 541 self.assertEqual(take(3, count(Decimal('1.1'))), 542 [Decimal('1.1'), Decimal('2.1'), Decimal('3.1')]) 543 self.assertEqual(take(3, count(Fraction(2, 3))), 544 [Fraction(2, 3), Fraction(5, 3), Fraction(8, 3)]) 545 BIGINT = 1<<1000 546 self.assertEqual(take(3, count(BIGINT)), [BIGINT, BIGINT+1, BIGINT+2]) 547 c = count(3) 548 self.assertEqual(repr(c), 'count(3)') 549 next(c) 550 self.assertEqual(repr(c), 'count(4)') 551 c = count(-9) 552 self.assertEqual(repr(c), 'count(-9)') 553 next(c) 554 self.assertEqual(next(c), -8) 555 self.assertEqual(repr(count(10.25)), 'count(10.25)') 556 self.assertEqual(repr(count(10.0)), 'count(10.0)') 557 self.assertEqual(type(next(count(10.0))), float) 558 for i in (-sys.maxsize-5, -sys.maxsize+5 ,-10, -1, 0, 10, sys.maxsize-5, sys.maxsize+5): 559 # Test repr 560 r1 = repr(count(i)) 561 r2 = 'count(%r)'.__mod__(i) 562 self.assertEqual(r1, r2) 563 564 # check copy, deepcopy, pickle 565 for value in -3, 3, maxsize-5, maxsize+5: 566 c = count(value) 567 self.assertEqual(next(copy.copy(c)), value) 568 self.assertEqual(next(copy.deepcopy(c)), value) 569 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 570 self.pickletest(proto, count(value)) 571 572 #check proper internal error handling for large "step' sizes 573 count(1, maxsize+5); sys.exc_info() 574 575 def test_count_with_stride(self): 576 self.assertEqual(lzip('abc',count(2,3)), [('a', 2), ('b', 5), ('c', 8)]) 577 self.assertEqual(lzip('abc',count(start=2,step=3)), 578 [('a', 2), ('b', 5), ('c', 8)]) 579 self.assertEqual(lzip('abc',count(step=-1)), 580 [('a', 0), ('b', -1), ('c', -2)]) 581 self.assertRaises(TypeError, count, 'a', 'b') 582 self.assertEqual(lzip('abc',count(2,0)), [('a', 2), ('b', 2), ('c', 2)]) 583 self.assertEqual(lzip('abc',count(2,1)), [('a', 2), ('b', 3), ('c', 4)]) 584 self.assertEqual(lzip('abc',count(2,3)), [('a', 2), ('b', 5), ('c', 8)]) 585 self.assertEqual(take(20, count(maxsize-15, 3)), take(20, range(maxsize-15, maxsize+100, 3))) 586 self.assertEqual(take(20, count(-maxsize-15, 3)), take(20, range(-maxsize-15,-maxsize+100, 3))) 587 self.assertEqual(take(3, count(10, maxsize+5)), 588 list(range(10, 10+3*(maxsize+5), maxsize+5))) 589 self.assertEqual(take(3, count(2, 1.25)), [2, 3.25, 4.5]) 590 self.assertEqual(take(3, count(2, 3.25-4j)), [2, 5.25-4j, 8.5-8j]) 591 self.assertEqual(take(3, count(Decimal('1.1'), Decimal('.1'))), 592 [Decimal('1.1'), Decimal('1.2'), Decimal('1.3')]) 593 self.assertEqual(take(3, count(Fraction(2,3), Fraction(1,7))), 594 [Fraction(2,3), Fraction(17,21), Fraction(20,21)]) 595 BIGINT = 1<<1000 596 self.assertEqual(take(3, count(step=BIGINT)), [0, BIGINT, 2*BIGINT]) 597 self.assertEqual(repr(take(3, count(10, 2.5))), repr([10, 12.5, 15.0])) 598 c = count(3, 5) 599 self.assertEqual(repr(c), 'count(3, 5)') 600 next(c) 601 self.assertEqual(repr(c), 'count(8, 5)') 602 c = count(-9, 0) 603 self.assertEqual(repr(c), 'count(-9, 0)') 604 next(c) 605 self.assertEqual(repr(c), 'count(-9, 0)') 606 c = count(-9, -3) 607 self.assertEqual(repr(c), 'count(-9, -3)') 608 next(c) 609 self.assertEqual(repr(c), 'count(-12, -3)') 610 self.assertEqual(repr(c), 'count(-12, -3)') 611 self.assertEqual(repr(count(10.5, 1.25)), 'count(10.5, 1.25)') 612 self.assertEqual(repr(count(10.5, 1)), 'count(10.5)') # suppress step=1 when it's an int 613 self.assertEqual(repr(count(10.5, 1.00)), 'count(10.5, 1.0)') # do show float values lilke 1.0 614 self.assertEqual(repr(count(10, 1.00)), 'count(10, 1.0)') 615 c = count(10, 1.0) 616 self.assertEqual(type(next(c)), int) 617 self.assertEqual(type(next(c)), float) 618 for i in (-sys.maxsize-5, -sys.maxsize+5 ,-10, -1, 0, 10, sys.maxsize-5, sys.maxsize+5): 619 for j in (-sys.maxsize-5, -sys.maxsize+5 ,-10, -1, 0, 1, 10, sys.maxsize-5, sys.maxsize+5): 620 # Test repr 621 r1 = repr(count(i, j)) 622 if j == 1: 623 r2 = ('count(%r)' % i) 624 else: 625 r2 = ('count(%r, %r)' % (i, j)) 626 self.assertEqual(r1, r2) 627 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 628 self.pickletest(proto, count(i, j)) 629 630 def test_cycle(self): 631 self.assertEqual(take(10, cycle('abc')), list('abcabcabca')) 632 self.assertEqual(list(cycle('')), []) 633 self.assertRaises(TypeError, cycle) 634 self.assertRaises(TypeError, cycle, 5) 635 self.assertEqual(list(islice(cycle(gen3()),10)), [0,1,2,0,1,2,0,1,2,0]) 636 637 # check copy, deepcopy, pickle 638 c = cycle('abc') 639 self.assertEqual(next(c), 'a') 640 #simple copy currently not supported, because __reduce__ returns 641 #an internal iterator 642 #self.assertEqual(take(10, copy.copy(c)), list('bcabcabcab')) 643 self.assertEqual(take(10, copy.deepcopy(c)), list('bcabcabcab')) 644 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 645 self.assertEqual(take(10, pickle.loads(pickle.dumps(c, proto))), 646 list('bcabcabcab')) 647 next(c) 648 self.assertEqual(take(10, pickle.loads(pickle.dumps(c, proto))), 649 list('cabcabcabc')) 650 next(c) 651 next(c) 652 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 653 self.pickletest(proto, cycle('abc')) 654 655 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 656 # test with partial consumed input iterable 657 it = iter('abcde') 658 c = cycle(it) 659 _ = [next(c) for i in range(2)] # consume 2 of 5 inputs 660 p = pickle.dumps(c, proto) 661 d = pickle.loads(p) # rebuild the cycle object 662 self.assertEqual(take(20, d), list('cdeabcdeabcdeabcdeab')) 663 664 # test with completely consumed input iterable 665 it = iter('abcde') 666 c = cycle(it) 667 _ = [next(c) for i in range(7)] # consume 7 of 5 inputs 668 p = pickle.dumps(c, proto) 669 d = pickle.loads(p) # rebuild the cycle object 670 self.assertEqual(take(20, d), list('cdeabcdeabcdeabcdeab')) 671 672 def test_cycle_setstate(self): 673 # Verify both modes for restoring state 674 675 # Mode 0 is efficient. It uses an incompletely consumed input 676 # iterator to build a cycle object and then passes in state with 677 # a list of previously consumed values. There is no data 678 # overlap between the two. 679 c = cycle('defg') 680 c.__setstate__((list('abc'), 0)) 681 self.assertEqual(take(20, c), list('defgabcdefgabcdefgab')) 682 683 # Mode 1 is inefficient. It starts with a cycle object built 684 # from an iterator over the remaining elements in a partial 685 # cycle and then passes in state with all of the previously 686 # seen values (this overlaps values included in the iterator). 687 c = cycle('defg') 688 c.__setstate__((list('abcdefg'), 1)) 689 self.assertEqual(take(20, c), list('defgabcdefgabcdefgab')) 690 691 # The first argument to setstate needs to be a tuple 692 with self.assertRaises(TypeError): 693 cycle('defg').__setstate__([list('abcdefg'), 0]) 694 695 # The first argument in the setstate tuple must be a list 696 with self.assertRaises(TypeError): 697 c = cycle('defg') 698 c.__setstate__((tuple('defg'), 0)) 699 take(20, c) 700 701 # The second argument in the setstate tuple must be an int 702 with self.assertRaises(TypeError): 703 cycle('defg').__setstate__((list('abcdefg'), 'x')) 704 705 self.assertRaises(TypeError, cycle('').__setstate__, ()) 706 self.assertRaises(TypeError, cycle('').__setstate__, ([],)) 707 708 def test_groupby(self): 709 # Check whether it accepts arguments correctly 710 self.assertEqual([], list(groupby([]))) 711 self.assertEqual([], list(groupby([], key=id))) 712 self.assertRaises(TypeError, list, groupby('abc', [])) 713 self.assertRaises(TypeError, groupby, None) 714 self.assertRaises(TypeError, groupby, 'abc', lambda x:x, 10) 715 716 # Check normal input 717 s = [(0, 10, 20), (0, 11,21), (0,12,21), (1,13,21), (1,14,22), 718 (2,15,22), (3,16,23), (3,17,23)] 719 dup = [] 720 for k, g in groupby(s, lambda r:r[0]): 721 for elem in g: 722 self.assertEqual(k, elem[0]) 723 dup.append(elem) 724 self.assertEqual(s, dup) 725 726 # Check normal pickled 727 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 728 dup = [] 729 for k, g in pickle.loads(pickle.dumps(groupby(s, testR), proto)): 730 for elem in g: 731 self.assertEqual(k, elem[0]) 732 dup.append(elem) 733 self.assertEqual(s, dup) 734 735 # Check nested case 736 dup = [] 737 for k, g in groupby(s, testR): 738 for ik, ig in groupby(g, testR2): 739 for elem in ig: 740 self.assertEqual(k, elem[0]) 741 self.assertEqual(ik, elem[2]) 742 dup.append(elem) 743 self.assertEqual(s, dup) 744 745 # Check nested and pickled 746 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 747 dup = [] 748 for k, g in pickle.loads(pickle.dumps(groupby(s, testR), proto)): 749 for ik, ig in pickle.loads(pickle.dumps(groupby(g, testR2), proto)): 750 for elem in ig: 751 self.assertEqual(k, elem[0]) 752 self.assertEqual(ik, elem[2]) 753 dup.append(elem) 754 self.assertEqual(s, dup) 755 756 757 # Check case where inner iterator is not used 758 keys = [k for k, g in groupby(s, testR)] 759 expectedkeys = set([r[0] for r in s]) 760 self.assertEqual(set(keys), expectedkeys) 761 self.assertEqual(len(keys), len(expectedkeys)) 762 763 # Check case where inner iterator is used after advancing the groupby 764 # iterator 765 s = list(zip('AABBBAAAA', range(9))) 766 it = groupby(s, testR) 767 _, g1 = next(it) 768 _, g2 = next(it) 769 _, g3 = next(it) 770 self.assertEqual(list(g1), []) 771 self.assertEqual(list(g2), []) 772 self.assertEqual(next(g3), ('A', 5)) 773 list(it) # exhaust the groupby iterator 774 self.assertEqual(list(g3), []) 775 776 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 777 it = groupby(s, testR) 778 _, g = next(it) 779 next(it) 780 next(it) 781 self.assertEqual(list(pickle.loads(pickle.dumps(g, proto))), []) 782 783 # Exercise pipes and filters style 784 s = 'abracadabra' 785 # sort s | uniq 786 r = [k for k, g in groupby(sorted(s))] 787 self.assertEqual(r, ['a', 'b', 'c', 'd', 'r']) 788 # sort s | uniq -d 789 r = [k for k, g in groupby(sorted(s)) if list(islice(g,1,2))] 790 self.assertEqual(r, ['a', 'b', 'r']) 791 # sort s | uniq -c 792 r = [(len(list(g)), k) for k, g in groupby(sorted(s))] 793 self.assertEqual(r, [(5, 'a'), (2, 'b'), (1, 'c'), (1, 'd'), (2, 'r')]) 794 # sort s | uniq -c | sort -rn | head -3 795 r = sorted([(len(list(g)) , k) for k, g in groupby(sorted(s))], reverse=True)[:3] 796 self.assertEqual(r, [(5, 'a'), (2, 'r'), (2, 'b')]) 797 798 # iter.__next__ failure 799 class ExpectedError(Exception): 800 pass 801 def delayed_raise(n=0): 802 for i in range(n): 803 yield 'yo' 804 raise ExpectedError 805 def gulp(iterable, keyp=None, func=list): 806 return [func(g) for k, g in groupby(iterable, keyp)] 807 808 # iter.__next__ failure on outer object 809 self.assertRaises(ExpectedError, gulp, delayed_raise(0)) 810 # iter.__next__ failure on inner object 811 self.assertRaises(ExpectedError, gulp, delayed_raise(1)) 812 813 # __eq__ failure 814 class DummyCmp: 815 def __eq__(self, dst): 816 raise ExpectedError 817 s = [DummyCmp(), DummyCmp(), None] 818 819 # __eq__ failure on outer object 820 self.assertRaises(ExpectedError, gulp, s, func=id) 821 # __eq__ failure on inner object 822 self.assertRaises(ExpectedError, gulp, s) 823 824 # keyfunc failure 825 def keyfunc(obj): 826 if keyfunc.skip > 0: 827 keyfunc.skip -= 1 828 return obj 829 else: 830 raise ExpectedError 831 832 # keyfunc failure on outer object 833 keyfunc.skip = 0 834 self.assertRaises(ExpectedError, gulp, [None], keyfunc) 835 keyfunc.skip = 1 836 self.assertRaises(ExpectedError, gulp, [None, None], keyfunc) 837 838 def test_filter(self): 839 self.assertEqual(list(filter(isEven, range(6))), [0,2,4]) 840 self.assertEqual(list(filter(None, [0,1,0,2,0])), [1,2]) 841 self.assertEqual(list(filter(bool, [0,1,0,2,0])), [1,2]) 842 self.assertEqual(take(4, filter(isEven, count())), [0,2,4,6]) 843 self.assertRaises(TypeError, filter) 844 self.assertRaises(TypeError, filter, lambda x:x) 845 self.assertRaises(TypeError, filter, lambda x:x, range(6), 7) 846 self.assertRaises(TypeError, filter, isEven, 3) 847 self.assertRaises(TypeError, next, filter(range(6), range(6))) 848 849 # check copy, deepcopy, pickle 850 ans = [0,2,4] 851 852 c = filter(isEven, range(6)) 853 self.assertEqual(list(copy.copy(c)), ans) 854 c = filter(isEven, range(6)) 855 self.assertEqual(list(copy.deepcopy(c)), ans) 856 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 857 c = filter(isEven, range(6)) 858 self.assertEqual(list(pickle.loads(pickle.dumps(c, proto))), ans) 859 next(c) 860 self.assertEqual(list(pickle.loads(pickle.dumps(c, proto))), ans[1:]) 861 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 862 c = filter(isEven, range(6)) 863 self.pickletest(proto, c) 864 865 def test_filterfalse(self): 866 self.assertEqual(list(filterfalse(isEven, range(6))), [1,3,5]) 867 self.assertEqual(list(filterfalse(None, [0,1,0,2,0])), [0,0,0]) 868 self.assertEqual(list(filterfalse(bool, [0,1,0,2,0])), [0,0,0]) 869 self.assertEqual(take(4, filterfalse(isEven, count())), [1,3,5,7]) 870 self.assertRaises(TypeError, filterfalse) 871 self.assertRaises(TypeError, filterfalse, lambda x:x) 872 self.assertRaises(TypeError, filterfalse, lambda x:x, range(6), 7) 873 self.assertRaises(TypeError, filterfalse, isEven, 3) 874 self.assertRaises(TypeError, next, filterfalse(range(6), range(6))) 875 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 876 self.pickletest(proto, filterfalse(isEven, range(6))) 877 878 def test_zip(self): 879 # XXX This is rather silly now that builtin zip() calls zip()... 880 ans = [(x,y) for x, y in zip('abc',count())] 881 self.assertEqual(ans, [('a', 0), ('b', 1), ('c', 2)]) 882 self.assertEqual(list(zip('abc', range(6))), lzip('abc', range(6))) 883 self.assertEqual(list(zip('abcdef', range(3))), lzip('abcdef', range(3))) 884 self.assertEqual(take(3,zip('abcdef', count())), lzip('abcdef', range(3))) 885 self.assertEqual(list(zip('abcdef')), lzip('abcdef')) 886 self.assertEqual(list(zip()), lzip()) 887 self.assertRaises(TypeError, zip, 3) 888 self.assertRaises(TypeError, zip, range(3), 3) 889 self.assertEqual([tuple(list(pair)) for pair in zip('abc', 'def')], 890 lzip('abc', 'def')) 891 self.assertEqual([pair for pair in zip('abc', 'def')], 892 lzip('abc', 'def')) 893 894 @support.impl_detail("tuple reuse is specific to CPython") 895 def test_zip_tuple_reuse(self): 896 ids = list(map(id, zip('abc', 'def'))) 897 self.assertEqual(min(ids), max(ids)) 898 ids = list(map(id, list(zip('abc', 'def')))) 899 self.assertEqual(len(dict.fromkeys(ids)), len(ids)) 900 901 # check copy, deepcopy, pickle 902 ans = [(x,y) for x, y in copy.copy(zip('abc',count()))] 903 self.assertEqual(ans, [('a', 0), ('b', 1), ('c', 2)]) 904 905 ans = [(x,y) for x, y in copy.deepcopy(zip('abc',count()))] 906 self.assertEqual(ans, [('a', 0), ('b', 1), ('c', 2)]) 907 908 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 909 ans = [(x,y) for x, y in pickle.loads(pickle.dumps(zip('abc',count()), proto))] 910 self.assertEqual(ans, [('a', 0), ('b', 1), ('c', 2)]) 911 912 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 913 testIntermediate = zip('abc',count()) 914 next(testIntermediate) 915 ans = [(x,y) for x, y in pickle.loads(pickle.dumps(testIntermediate, proto))] 916 self.assertEqual(ans, [('b', 1), ('c', 2)]) 917 918 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 919 self.pickletest(proto, zip('abc', count())) 920 921 def test_ziplongest(self): 922 for args in [ 923 ['abc', range(6)], 924 [range(6), 'abc'], 925 [range(1000), range(2000,2100), range(3000,3050)], 926 [range(1000), range(0), range(3000,3050), range(1200), range(1500)], 927 [range(1000), range(0), range(3000,3050), range(1200), range(1500), range(0)], 928 ]: 929 target = [tuple([arg[i] if i < len(arg) else None for arg in args]) 930 for i in range(max(map(len, args)))] 931 self.assertEqual(list(zip_longest(*args)), target) 932 self.assertEqual(list(zip_longest(*args, **{})), target) 933 target = [tuple((e is None and 'X' or e) for e in t) for t in target] # Replace None fills with 'X' 934 self.assertEqual(list(zip_longest(*args, **dict(fillvalue='X'))), target) 935 936 self.assertEqual(take(3,zip_longest('abcdef', count())), list(zip('abcdef', range(3)))) # take 3 from infinite input 937 938 self.assertEqual(list(zip_longest()), list(zip())) 939 self.assertEqual(list(zip_longest([])), list(zip([]))) 940 self.assertEqual(list(zip_longest('abcdef')), list(zip('abcdef'))) 941 942 self.assertEqual(list(zip_longest('abc', 'defg', **{})), 943 list(zip(list('abc')+[None], 'defg'))) # empty keyword dict 944 self.assertRaises(TypeError, zip_longest, 3) 945 self.assertRaises(TypeError, zip_longest, range(3), 3) 946 947 for stmt in [ 948 "zip_longest('abc', fv=1)", 949 "zip_longest('abc', fillvalue=1, bogus_keyword=None)", 950 ]: 951 try: 952 eval(stmt, globals(), locals()) 953 except TypeError: 954 pass 955 else: 956 self.fail('Did not raise Type in: ' + stmt) 957 958 self.assertEqual([tuple(list(pair)) for pair in zip_longest('abc', 'def')], 959 list(zip('abc', 'def'))) 960 self.assertEqual([pair for pair in zip_longest('abc', 'def')], 961 list(zip('abc', 'def'))) 962 963 @support.impl_detail("tuple reuse is specific to CPython") 964 def test_zip_longest_tuple_reuse(self): 965 ids = list(map(id, zip_longest('abc', 'def'))) 966 self.assertEqual(min(ids), max(ids)) 967 ids = list(map(id, list(zip_longest('abc', 'def')))) 968 self.assertEqual(len(dict.fromkeys(ids)), len(ids)) 969 970 def test_zip_longest_pickling(self): 971 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 972 self.pickletest(proto, zip_longest("abc", "def")) 973 self.pickletest(proto, zip_longest("abc", "defgh")) 974 self.pickletest(proto, zip_longest("abc", "defgh", fillvalue=1)) 975 self.pickletest(proto, zip_longest("", "defgh")) 976 977 def test_zip_longest_bad_iterable(self): 978 exception = TypeError() 979 980 class BadIterable: 981 def __iter__(self): 982 raise exception 983 984 with self.assertRaises(TypeError) as cm: 985 zip_longest(BadIterable()) 986 987 self.assertIs(cm.exception, exception) 988 989 def test_bug_7244(self): 990 991 class Repeater: 992 # this class is similar to itertools.repeat 993 def __init__(self, o, t, e): 994 self.o = o 995 self.t = int(t) 996 self.e = e 997 def __iter__(self): # its iterator is itself 998 return self 999 def __next__(self): 1000 if self.t > 0: 1001 self.t -= 1 1002 return self.o 1003 else: 1004 raise self.e 1005 1006 # Formerly this code in would fail in debug mode 1007 # with Undetected Error and Stop Iteration 1008 r1 = Repeater(1, 3, StopIteration) 1009 r2 = Repeater(2, 4, StopIteration) 1010 def run(r1, r2): 1011 result = [] 1012 for i, j in zip_longest(r1, r2, fillvalue=0): 1013 with support.captured_output('stdout'): 1014 print((i, j)) 1015 result.append((i, j)) 1016 return result 1017 self.assertEqual(run(r1, r2), [(1,2), (1,2), (1,2), (0,2)]) 1018 1019 # Formerly, the RuntimeError would be lost 1020 # and StopIteration would stop as expected 1021 r1 = Repeater(1, 3, RuntimeError) 1022 r2 = Repeater(2, 4, StopIteration) 1023 it = zip_longest(r1, r2, fillvalue=0) 1024 self.assertEqual(next(it), (1, 2)) 1025 self.assertEqual(next(it), (1, 2)) 1026 self.assertEqual(next(it), (1, 2)) 1027 self.assertRaises(RuntimeError, next, it) 1028 1029 def test_pairwise(self): 1030 self.assertEqual(list(pairwise('')), []) 1031 self.assertEqual(list(pairwise('a')), []) 1032 self.assertEqual(list(pairwise('ab')), 1033 [('a', 'b')]), 1034 self.assertEqual(list(pairwise('abcde')), 1035 [('a', 'b'), ('b', 'c'), ('c', 'd'), ('d', 'e')]) 1036 self.assertEqual(list(pairwise(range(10_000))), 1037 list(zip(range(10_000), range(1, 10_000)))) 1038 1039 with self.assertRaises(TypeError): 1040 pairwise() # too few arguments 1041 with self.assertRaises(TypeError): 1042 pairwise('abc', 10) # too many arguments 1043 with self.assertRaises(TypeError): 1044 pairwise(iterable='abc') # keyword arguments 1045 with self.assertRaises(TypeError): 1046 pairwise(None) # non-iterable argument 1047 1048 def test_product(self): 1049 for args, result in [ 1050 ([], [()]), # zero iterables 1051 (['ab'], [('a',), ('b',)]), # one iterable 1052 ([range(2), range(3)], [(0,0), (0,1), (0,2), (1,0), (1,1), (1,2)]), # two iterables 1053 ([range(0), range(2), range(3)], []), # first iterable with zero length 1054 ([range(2), range(0), range(3)], []), # middle iterable with zero length 1055 ([range(2), range(3), range(0)], []), # last iterable with zero length 1056 ]: 1057 self.assertEqual(list(product(*args)), result) 1058 for r in range(4): 1059 self.assertEqual(list(product(*(args*r))), 1060 list(product(*args, **dict(repeat=r)))) 1061 self.assertEqual(len(list(product(*[range(7)]*6))), 7**6) 1062 self.assertRaises(TypeError, product, range(6), None) 1063 1064 def product1(*args, **kwds): 1065 pools = list(map(tuple, args)) * kwds.get('repeat', 1) 1066 n = len(pools) 1067 if n == 0: 1068 yield () 1069 return 1070 if any(len(pool) == 0 for pool in pools): 1071 return 1072 indices = [0] * n 1073 yield tuple(pool[i] for pool, i in zip(pools, indices)) 1074 while 1: 1075 for i in reversed(range(n)): # right to left 1076 if indices[i] == len(pools[i]) - 1: 1077 continue 1078 indices[i] += 1 1079 for j in range(i+1, n): 1080 indices[j] = 0 1081 yield tuple(pool[i] for pool, i in zip(pools, indices)) 1082 break 1083 else: 1084 return 1085 1086 def product2(*args, **kwds): 1087 'Pure python version used in docs' 1088 pools = list(map(tuple, args)) * kwds.get('repeat', 1) 1089 result = [[]] 1090 for pool in pools: 1091 result = [x+[y] for x in result for y in pool] 1092 for prod in result: 1093 yield tuple(prod) 1094 1095 argtypes = ['', 'abc', '', range(0), range(4), dict(a=1, b=2, c=3), 1096 set('abcdefg'), range(11), tuple(range(13))] 1097 for i in range(100): 1098 args = [random.choice(argtypes) for j in range(random.randrange(5))] 1099 expected_len = prod(map(len, args)) 1100 self.assertEqual(len(list(product(*args))), expected_len) 1101 self.assertEqual(list(product(*args)), list(product1(*args))) 1102 self.assertEqual(list(product(*args)), list(product2(*args))) 1103 args = map(iter, args) 1104 self.assertEqual(len(list(product(*args))), expected_len) 1105 1106 @support.bigaddrspacetest 1107 def test_product_overflow(self): 1108 with self.assertRaises((OverflowError, MemoryError)): 1109 product(*(['ab']*2**5), repeat=2**25) 1110 1111 @support.impl_detail("tuple reuse is specific to CPython") 1112 def test_product_tuple_reuse(self): 1113 self.assertEqual(len(set(map(id, product('abc', 'def')))), 1) 1114 self.assertNotEqual(len(set(map(id, list(product('abc', 'def'))))), 1) 1115 1116 def test_product_pickling(self): 1117 # check copy, deepcopy, pickle 1118 for args, result in [ 1119 ([], [()]), # zero iterables 1120 (['ab'], [('a',), ('b',)]), # one iterable 1121 ([range(2), range(3)], [(0,0), (0,1), (0,2), (1,0), (1,1), (1,2)]), # two iterables 1122 ([range(0), range(2), range(3)], []), # first iterable with zero length 1123 ([range(2), range(0), range(3)], []), # middle iterable with zero length 1124 ([range(2), range(3), range(0)], []), # last iterable with zero length 1125 ]: 1126 self.assertEqual(list(copy.copy(product(*args))), result) 1127 self.assertEqual(list(copy.deepcopy(product(*args))), result) 1128 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 1129 self.pickletest(proto, product(*args)) 1130 1131 def test_product_issue_25021(self): 1132 # test that indices are properly clamped to the length of the tuples 1133 p = product((1, 2),(3,)) 1134 p.__setstate__((0, 0x1000)) # will access tuple element 1 if not clamped 1135 self.assertEqual(next(p), (2, 3)) 1136 # test that empty tuple in the list will result in an immediate StopIteration 1137 p = product((1, 2), (), (3,)) 1138 p.__setstate__((0, 0, 0x1000)) # will access tuple element 1 if not clamped 1139 self.assertRaises(StopIteration, next, p) 1140 1141 def test_repeat(self): 1142 self.assertEqual(list(repeat(object='a', times=3)), ['a', 'a', 'a']) 1143 self.assertEqual(lzip(range(3),repeat('a')), 1144 [(0, 'a'), (1, 'a'), (2, 'a')]) 1145 self.assertEqual(list(repeat('a', 3)), ['a', 'a', 'a']) 1146 self.assertEqual(take(3, repeat('a')), ['a', 'a', 'a']) 1147 self.assertEqual(list(repeat('a', 0)), []) 1148 self.assertEqual(list(repeat('a', -3)), []) 1149 self.assertRaises(TypeError, repeat) 1150 self.assertRaises(TypeError, repeat, None, 3, 4) 1151 self.assertRaises(TypeError, repeat, None, 'a') 1152 r = repeat(1+0j) 1153 self.assertEqual(repr(r), 'repeat((1+0j))') 1154 r = repeat(1+0j, 5) 1155 self.assertEqual(repr(r), 'repeat((1+0j), 5)') 1156 list(r) 1157 self.assertEqual(repr(r), 'repeat((1+0j), 0)') 1158 1159 # check copy, deepcopy, pickle 1160 c = repeat(object='a', times=10) 1161 self.assertEqual(next(c), 'a') 1162 self.assertEqual(take(2, copy.copy(c)), list('a' * 2)) 1163 self.assertEqual(take(2, copy.deepcopy(c)), list('a' * 2)) 1164 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 1165 self.pickletest(proto, repeat(object='a', times=10)) 1166 1167 def test_repeat_with_negative_times(self): 1168 self.assertEqual(repr(repeat('a', -1)), "repeat('a', 0)") 1169 self.assertEqual(repr(repeat('a', -2)), "repeat('a', 0)") 1170 self.assertEqual(repr(repeat('a', times=-1)), "repeat('a', 0)") 1171 self.assertEqual(repr(repeat('a', times=-2)), "repeat('a', 0)") 1172 1173 def test_map(self): 1174 self.assertEqual(list(map(operator.pow, range(3), range(1,7))), 1175 [0**1, 1**2, 2**3]) 1176 self.assertEqual(list(map(tupleize, 'abc', range(5))), 1177 [('a',0),('b',1),('c',2)]) 1178 self.assertEqual(list(map(tupleize, 'abc', count())), 1179 [('a',0),('b',1),('c',2)]) 1180 self.assertEqual(take(2,map(tupleize, 'abc', count())), 1181 [('a',0),('b',1)]) 1182 self.assertEqual(list(map(operator.pow, [])), []) 1183 self.assertRaises(TypeError, map) 1184 self.assertRaises(TypeError, list, map(None, range(3), range(3))) 1185 self.assertRaises(TypeError, map, operator.neg) 1186 self.assertRaises(TypeError, next, map(10, range(5))) 1187 self.assertRaises(ValueError, next, map(errfunc, [4], [5])) 1188 self.assertRaises(TypeError, next, map(onearg, [4], [5])) 1189 1190 # check copy, deepcopy, pickle 1191 ans = [('a',0),('b',1),('c',2)] 1192 1193 c = map(tupleize, 'abc', count()) 1194 self.assertEqual(list(copy.copy(c)), ans) 1195 1196 c = map(tupleize, 'abc', count()) 1197 self.assertEqual(list(copy.deepcopy(c)), ans) 1198 1199 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 1200 c = map(tupleize, 'abc', count()) 1201 self.pickletest(proto, c) 1202 1203 def test_starmap(self): 1204 self.assertEqual(list(starmap(operator.pow, zip(range(3), range(1,7)))), 1205 [0**1, 1**2, 2**3]) 1206 self.assertEqual(take(3, starmap(operator.pow, zip(count(), count(1)))), 1207 [0**1, 1**2, 2**3]) 1208 self.assertEqual(list(starmap(operator.pow, [])), []) 1209 self.assertEqual(list(starmap(operator.pow, [iter([4,5])])), [4**5]) 1210 self.assertRaises(TypeError, list, starmap(operator.pow, [None])) 1211 self.assertRaises(TypeError, starmap) 1212 self.assertRaises(TypeError, starmap, operator.pow, [(4,5)], 'extra') 1213 self.assertRaises(TypeError, next, starmap(10, [(4,5)])) 1214 self.assertRaises(ValueError, next, starmap(errfunc, [(4,5)])) 1215 self.assertRaises(TypeError, next, starmap(onearg, [(4,5)])) 1216 1217 # check copy, deepcopy, pickle 1218 ans = [0**1, 1**2, 2**3] 1219 1220 c = starmap(operator.pow, zip(range(3), range(1,7))) 1221 self.assertEqual(list(copy.copy(c)), ans) 1222 1223 c = starmap(operator.pow, zip(range(3), range(1,7))) 1224 self.assertEqual(list(copy.deepcopy(c)), ans) 1225 1226 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 1227 c = starmap(operator.pow, zip(range(3), range(1,7))) 1228 self.pickletest(proto, c) 1229 1230 def test_islice(self): 1231 for args in [ # islice(args) should agree with range(args) 1232 (10, 20, 3), 1233 (10, 3, 20), 1234 (10, 20), 1235 (10, 10), 1236 (10, 3), 1237 (20,) 1238 ]: 1239 self.assertEqual(list(islice(range(100), *args)), 1240 list(range(*args))) 1241 1242 for args, tgtargs in [ # Stop when seqn is exhausted 1243 ((10, 110, 3), ((10, 100, 3))), 1244 ((10, 110), ((10, 100))), 1245 ((110,), (100,)) 1246 ]: 1247 self.assertEqual(list(islice(range(100), *args)), 1248 list(range(*tgtargs))) 1249 1250 # Test stop=None 1251 self.assertEqual(list(islice(range(10), None)), list(range(10))) 1252 self.assertEqual(list(islice(range(10), None, None)), list(range(10))) 1253 self.assertEqual(list(islice(range(10), None, None, None)), list(range(10))) 1254 self.assertEqual(list(islice(range(10), 2, None)), list(range(2, 10))) 1255 self.assertEqual(list(islice(range(10), 1, None, 2)), list(range(1, 10, 2))) 1256 1257 # Test number of items consumed SF #1171417 1258 it = iter(range(10)) 1259 self.assertEqual(list(islice(it, 3)), list(range(3))) 1260 self.assertEqual(list(it), list(range(3, 10))) 1261 1262 it = iter(range(10)) 1263 self.assertEqual(list(islice(it, 3, 3)), []) 1264 self.assertEqual(list(it), list(range(3, 10))) 1265 1266 # Test invalid arguments 1267 ra = range(10) 1268 self.assertRaises(TypeError, islice, ra) 1269 self.assertRaises(TypeError, islice, ra, 1, 2, 3, 4) 1270 self.assertRaises(ValueError, islice, ra, -5, 10, 1) 1271 self.assertRaises(ValueError, islice, ra, 1, -5, -1) 1272 self.assertRaises(ValueError, islice, ra, 1, 10, -1) 1273 self.assertRaises(ValueError, islice, ra, 1, 10, 0) 1274 self.assertRaises(ValueError, islice, ra, 'a') 1275 self.assertRaises(ValueError, islice, ra, 'a', 1) 1276 self.assertRaises(ValueError, islice, ra, 1, 'a') 1277 self.assertRaises(ValueError, islice, ra, 'a', 1, 1) 1278 self.assertRaises(ValueError, islice, ra, 1, 'a', 1) 1279 self.assertEqual(len(list(islice(count(), 1, 10, maxsize))), 1) 1280 1281 # Issue #10323: Less islice in a predictable state 1282 c = count() 1283 self.assertEqual(list(islice(c, 1, 3, 50)), [1]) 1284 self.assertEqual(next(c), 3) 1285 1286 # check copy, deepcopy, pickle 1287 for args in [ # islice(args) should agree with range(args) 1288 (10, 20, 3), 1289 (10, 3, 20), 1290 (10, 20), 1291 (10, 3), 1292 (20,) 1293 ]: 1294 self.assertEqual(list(copy.copy(islice(range(100), *args))), 1295 list(range(*args))) 1296 self.assertEqual(list(copy.deepcopy(islice(range(100), *args))), 1297 list(range(*args))) 1298 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 1299 self.pickletest(proto, islice(range(100), *args)) 1300 1301 # Issue #21321: check source iterator is not referenced 1302 # from islice() after the latter has been exhausted 1303 it = (x for x in (1, 2)) 1304 wr = weakref.ref(it) 1305 it = islice(it, 1) 1306 self.assertIsNotNone(wr()) 1307 list(it) # exhaust the iterator 1308 support.gc_collect() 1309 self.assertIsNone(wr()) 1310 1311 # Issue #30537: islice can accept integer-like objects as 1312 # arguments 1313 class IntLike(object): 1314 def __init__(self, val): 1315 self.val = val 1316 def __index__(self): 1317 return self.val 1318 self.assertEqual(list(islice(range(100), IntLike(10))), list(range(10))) 1319 self.assertEqual(list(islice(range(100), IntLike(10), IntLike(50))), 1320 list(range(10, 50))) 1321 self.assertEqual(list(islice(range(100), IntLike(10), IntLike(50), IntLike(5))), 1322 list(range(10,50,5))) 1323 1324 def test_takewhile(self): 1325 data = [1, 3, 5, 20, 2, 4, 6, 8] 1326 self.assertEqual(list(takewhile(underten, data)), [1, 3, 5]) 1327 self.assertEqual(list(takewhile(underten, [])), []) 1328 self.assertRaises(TypeError, takewhile) 1329 self.assertRaises(TypeError, takewhile, operator.pow) 1330 self.assertRaises(TypeError, takewhile, operator.pow, [(4,5)], 'extra') 1331 self.assertRaises(TypeError, next, takewhile(10, [(4,5)])) 1332 self.assertRaises(ValueError, next, takewhile(errfunc, [(4,5)])) 1333 t = takewhile(bool, [1, 1, 1, 0, 0, 0]) 1334 self.assertEqual(list(t), [1, 1, 1]) 1335 self.assertRaises(StopIteration, next, t) 1336 1337 # check copy, deepcopy, pickle 1338 self.assertEqual(list(copy.copy(takewhile(underten, data))), [1, 3, 5]) 1339 self.assertEqual(list(copy.deepcopy(takewhile(underten, data))), 1340 [1, 3, 5]) 1341 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 1342 self.pickletest(proto, takewhile(underten, data)) 1343 1344 def test_dropwhile(self): 1345 data = [1, 3, 5, 20, 2, 4, 6, 8] 1346 self.assertEqual(list(dropwhile(underten, data)), [20, 2, 4, 6, 8]) 1347 self.assertEqual(list(dropwhile(underten, [])), []) 1348 self.assertRaises(TypeError, dropwhile) 1349 self.assertRaises(TypeError, dropwhile, operator.pow) 1350 self.assertRaises(TypeError, dropwhile, operator.pow, [(4,5)], 'extra') 1351 self.assertRaises(TypeError, next, dropwhile(10, [(4,5)])) 1352 self.assertRaises(ValueError, next, dropwhile(errfunc, [(4,5)])) 1353 1354 # check copy, deepcopy, pickle 1355 self.assertEqual(list(copy.copy(dropwhile(underten, data))), [20, 2, 4, 6, 8]) 1356 self.assertEqual(list(copy.deepcopy(dropwhile(underten, data))), 1357 [20, 2, 4, 6, 8]) 1358 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 1359 self.pickletest(proto, dropwhile(underten, data)) 1360 1361 def test_tee(self): 1362 n = 200 1363 1364 a, b = tee([]) # test empty iterator 1365 self.assertEqual(list(a), []) 1366 self.assertEqual(list(b), []) 1367 1368 a, b = tee(irange(n)) # test 100% interleaved 1369 self.assertEqual(lzip(a,b), lzip(range(n), range(n))) 1370 1371 a, b = tee(irange(n)) # test 0% interleaved 1372 self.assertEqual(list(a), list(range(n))) 1373 self.assertEqual(list(b), list(range(n))) 1374 1375 a, b = tee(irange(n)) # test dealloc of leading iterator 1376 for i in range(100): 1377 self.assertEqual(next(a), i) 1378 del a 1379 self.assertEqual(list(b), list(range(n))) 1380 1381 a, b = tee(irange(n)) # test dealloc of trailing iterator 1382 for i in range(100): 1383 self.assertEqual(next(a), i) 1384 del b 1385 self.assertEqual(list(a), list(range(100, n))) 1386 1387 for j in range(5): # test randomly interleaved 1388 order = [0]*n + [1]*n 1389 random.shuffle(order) 1390 lists = ([], []) 1391 its = tee(irange(n)) 1392 for i in order: 1393 value = next(its[i]) 1394 lists[i].append(value) 1395 self.assertEqual(lists[0], list(range(n))) 1396 self.assertEqual(lists[1], list(range(n))) 1397 1398 # test argument format checking 1399 self.assertRaises(TypeError, tee) 1400 self.assertRaises(TypeError, tee, 3) 1401 self.assertRaises(TypeError, tee, [1,2], 'x') 1402 self.assertRaises(TypeError, tee, [1,2], 3, 'x') 1403 1404 # tee object should be instantiable 1405 a, b = tee('abc') 1406 c = type(a)('def') 1407 self.assertEqual(list(c), list('def')) 1408 1409 # test long-lagged and multi-way split 1410 a, b, c = tee(range(2000), 3) 1411 for i in range(100): 1412 self.assertEqual(next(a), i) 1413 self.assertEqual(list(b), list(range(2000))) 1414 self.assertEqual([next(c), next(c)], list(range(2))) 1415 self.assertEqual(list(a), list(range(100,2000))) 1416 self.assertEqual(list(c), list(range(2,2000))) 1417 1418 # test values of n 1419 self.assertRaises(TypeError, tee, 'abc', 'invalid') 1420 self.assertRaises(ValueError, tee, [], -1) 1421 for n in range(5): 1422 result = tee('abc', n) 1423 self.assertEqual(type(result), tuple) 1424 self.assertEqual(len(result), n) 1425 self.assertEqual([list(x) for x in result], [list('abc')]*n) 1426 1427 # tee pass-through to copyable iterator 1428 a, b = tee('abc') 1429 c, d = tee(a) 1430 self.assertTrue(a is c) 1431 1432 # test tee_new 1433 t1, t2 = tee('abc') 1434 tnew = type(t1) 1435 self.assertRaises(TypeError, tnew) 1436 self.assertRaises(TypeError, tnew, 10) 1437 t3 = tnew(t1) 1438 self.assertTrue(list(t1) == list(t2) == list(t3) == list('abc')) 1439 1440 # test that tee objects are weak referencable 1441 a, b = tee(range(10)) 1442 p = weakref.proxy(a) 1443 self.assertEqual(getattr(p, '__class__'), type(b)) 1444 del a 1445 support.gc_collect() # For PyPy or other GCs. 1446 self.assertRaises(ReferenceError, getattr, p, '__class__') 1447 1448 ans = list('abc') 1449 long_ans = list(range(10000)) 1450 1451 # check copy 1452 a, b = tee('abc') 1453 self.assertEqual(list(copy.copy(a)), ans) 1454 self.assertEqual(list(copy.copy(b)), ans) 1455 a, b = tee(list(range(10000))) 1456 self.assertEqual(list(copy.copy(a)), long_ans) 1457 self.assertEqual(list(copy.copy(b)), long_ans) 1458 1459 # check partially consumed copy 1460 a, b = tee('abc') 1461 take(2, a) 1462 take(1, b) 1463 self.assertEqual(list(copy.copy(a)), ans[2:]) 1464 self.assertEqual(list(copy.copy(b)), ans[1:]) 1465 self.assertEqual(list(a), ans[2:]) 1466 self.assertEqual(list(b), ans[1:]) 1467 a, b = tee(range(10000)) 1468 take(100, a) 1469 take(60, b) 1470 self.assertEqual(list(copy.copy(a)), long_ans[100:]) 1471 self.assertEqual(list(copy.copy(b)), long_ans[60:]) 1472 self.assertEqual(list(a), long_ans[100:]) 1473 self.assertEqual(list(b), long_ans[60:]) 1474 1475 # check deepcopy 1476 a, b = tee('abc') 1477 self.assertEqual(list(copy.deepcopy(a)), ans) 1478 self.assertEqual(list(copy.deepcopy(b)), ans) 1479 self.assertEqual(list(a), ans) 1480 self.assertEqual(list(b), ans) 1481 a, b = tee(range(10000)) 1482 self.assertEqual(list(copy.deepcopy(a)), long_ans) 1483 self.assertEqual(list(copy.deepcopy(b)), long_ans) 1484 self.assertEqual(list(a), long_ans) 1485 self.assertEqual(list(b), long_ans) 1486 1487 # check partially consumed deepcopy 1488 a, b = tee('abc') 1489 take(2, a) 1490 take(1, b) 1491 self.assertEqual(list(copy.deepcopy(a)), ans[2:]) 1492 self.assertEqual(list(copy.deepcopy(b)), ans[1:]) 1493 self.assertEqual(list(a), ans[2:]) 1494 self.assertEqual(list(b), ans[1:]) 1495 a, b = tee(range(10000)) 1496 take(100, a) 1497 take(60, b) 1498 self.assertEqual(list(copy.deepcopy(a)), long_ans[100:]) 1499 self.assertEqual(list(copy.deepcopy(b)), long_ans[60:]) 1500 self.assertEqual(list(a), long_ans[100:]) 1501 self.assertEqual(list(b), long_ans[60:]) 1502 1503 # check pickle 1504 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 1505 self.pickletest(proto, iter(tee('abc'))) 1506 a, b = tee('abc') 1507 self.pickletest(proto, a, compare=ans) 1508 self.pickletest(proto, b, compare=ans) 1509 1510 # Issue 13454: Crash when deleting backward iterator from tee() 1511 def test_tee_del_backward(self): 1512 forward, backward = tee(repeat(None, 20000000)) 1513 try: 1514 any(forward) # exhaust the iterator 1515 del backward 1516 except: 1517 del forward, backward 1518 raise 1519 1520 def test_tee_reenter(self): 1521 class I: 1522 first = True 1523 def __iter__(self): 1524 return self 1525 def __next__(self): 1526 first = self.first 1527 self.first = False 1528 if first: 1529 return next(b) 1530 1531 a, b = tee(I()) 1532 with self.assertRaisesRegex(RuntimeError, "tee"): 1533 next(a) 1534 1535 def test_tee_concurrent(self): 1536 start = threading.Event() 1537 finish = threading.Event() 1538 class I: 1539 def __iter__(self): 1540 return self 1541 def __next__(self): 1542 start.set() 1543 finish.wait() 1544 1545 a, b = tee(I()) 1546 thread = threading.Thread(target=next, args=[a]) 1547 thread.start() 1548 try: 1549 start.wait() 1550 with self.assertRaisesRegex(RuntimeError, "tee"): 1551 next(b) 1552 finally: 1553 finish.set() 1554 thread.join() 1555 1556 def test_StopIteration(self): 1557 self.assertRaises(StopIteration, next, zip()) 1558 1559 for f in (chain, cycle, zip, groupby): 1560 self.assertRaises(StopIteration, next, f([])) 1561 self.assertRaises(StopIteration, next, f(StopNow())) 1562 1563 self.assertRaises(StopIteration, next, islice([], None)) 1564 self.assertRaises(StopIteration, next, islice(StopNow(), None)) 1565 1566 p, q = tee([]) 1567 self.assertRaises(StopIteration, next, p) 1568 self.assertRaises(StopIteration, next, q) 1569 p, q = tee(StopNow()) 1570 self.assertRaises(StopIteration, next, p) 1571 self.assertRaises(StopIteration, next, q) 1572 1573 self.assertRaises(StopIteration, next, repeat(None, 0)) 1574 1575 for f in (filter, filterfalse, map, takewhile, dropwhile, starmap): 1576 self.assertRaises(StopIteration, next, f(lambda x:x, [])) 1577 self.assertRaises(StopIteration, next, f(lambda x:x, StopNow())) 1578 1579 @support.cpython_only 1580 def test_combinations_result_gc(self): 1581 # bpo-42536: combinations's tuple-reuse speed trick breaks the GC's 1582 # assumptions about what can be untracked. Make sure we re-track result 1583 # tuples whenever we reuse them. 1584 it = combinations([None, []], 1) 1585 next(it) 1586 gc.collect() 1587 # That GC collection probably untracked the recycled internal result 1588 # tuple, which has the value (None,). Make sure it's re-tracked when 1589 # it's mutated and returned from __next__: 1590 self.assertTrue(gc.is_tracked(next(it))) 1591 1592 @support.cpython_only 1593 def test_combinations_with_replacement_result_gc(self): 1594 # Ditto for combinations_with_replacement. 1595 it = combinations_with_replacement([None, []], 1) 1596 next(it) 1597 gc.collect() 1598 self.assertTrue(gc.is_tracked(next(it))) 1599 1600 @support.cpython_only 1601 def test_permutations_result_gc(self): 1602 # Ditto for permutations. 1603 it = permutations([None, []], 1) 1604 next(it) 1605 gc.collect() 1606 self.assertTrue(gc.is_tracked(next(it))) 1607 1608 @support.cpython_only 1609 def test_product_result_gc(self): 1610 # Ditto for product. 1611 it = product([None, []]) 1612 next(it) 1613 gc.collect() 1614 self.assertTrue(gc.is_tracked(next(it))) 1615 1616 @support.cpython_only 1617 def test_zip_longest_result_gc(self): 1618 # Ditto for zip_longest. 1619 it = zip_longest([[]]) 1620 gc.collect() 1621 self.assertTrue(gc.is_tracked(next(it))) 1622 1623 1624class TestExamples(unittest.TestCase): 1625 1626 def test_accumulate(self): 1627 self.assertEqual(list(accumulate([1,2,3,4,5])), [1, 3, 6, 10, 15]) 1628 1629 def test_accumulate_reducible(self): 1630 # check copy, deepcopy, pickle 1631 data = [1, 2, 3, 4, 5] 1632 accumulated = [1, 3, 6, 10, 15] 1633 1634 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 1635 it = accumulate(data) 1636 self.assertEqual(list(pickle.loads(pickle.dumps(it, proto))), accumulated[:]) 1637 self.assertEqual(next(it), 1) 1638 self.assertEqual(list(pickle.loads(pickle.dumps(it, proto))), accumulated[1:]) 1639 it = accumulate(data) 1640 self.assertEqual(next(it), 1) 1641 self.assertEqual(list(copy.deepcopy(it)), accumulated[1:]) 1642 self.assertEqual(list(copy.copy(it)), accumulated[1:]) 1643 1644 def test_accumulate_reducible_none(self): 1645 # Issue #25718: total is None 1646 it = accumulate([None, None, None], operator.is_) 1647 self.assertEqual(next(it), None) 1648 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 1649 it_copy = pickle.loads(pickle.dumps(it, proto)) 1650 self.assertEqual(list(it_copy), [True, False]) 1651 self.assertEqual(list(copy.deepcopy(it)), [True, False]) 1652 self.assertEqual(list(copy.copy(it)), [True, False]) 1653 1654 def test_chain(self): 1655 self.assertEqual(''.join(chain('ABC', 'DEF')), 'ABCDEF') 1656 1657 def test_chain_from_iterable(self): 1658 self.assertEqual(''.join(chain.from_iterable(['ABC', 'DEF'])), 'ABCDEF') 1659 1660 def test_combinations(self): 1661 self.assertEqual(list(combinations('ABCD', 2)), 1662 [('A','B'), ('A','C'), ('A','D'), ('B','C'), ('B','D'), ('C','D')]) 1663 self.assertEqual(list(combinations(range(4), 3)), 1664 [(0,1,2), (0,1,3), (0,2,3), (1,2,3)]) 1665 1666 def test_combinations_with_replacement(self): 1667 self.assertEqual(list(combinations_with_replacement('ABC', 2)), 1668 [('A','A'), ('A','B'), ('A','C'), ('B','B'), ('B','C'), ('C','C')]) 1669 1670 def test_compress(self): 1671 self.assertEqual(list(compress('ABCDEF', [1,0,1,0,1,1])), list('ACEF')) 1672 1673 def test_count(self): 1674 self.assertEqual(list(islice(count(10), 5)), [10, 11, 12, 13, 14]) 1675 1676 def test_cycle(self): 1677 self.assertEqual(list(islice(cycle('ABCD'), 12)), list('ABCDABCDABCD')) 1678 1679 def test_dropwhile(self): 1680 self.assertEqual(list(dropwhile(lambda x: x<5, [1,4,6,4,1])), [6,4,1]) 1681 1682 def test_groupby(self): 1683 self.assertEqual([k for k, g in groupby('AAAABBBCCDAABBB')], 1684 list('ABCDAB')) 1685 self.assertEqual([(list(g)) for k, g in groupby('AAAABBBCCD')], 1686 [list('AAAA'), list('BBB'), list('CC'), list('D')]) 1687 1688 def test_filter(self): 1689 self.assertEqual(list(filter(lambda x: x%2, range(10))), [1,3,5,7,9]) 1690 1691 def test_filterfalse(self): 1692 self.assertEqual(list(filterfalse(lambda x: x%2, range(10))), [0,2,4,6,8]) 1693 1694 def test_map(self): 1695 self.assertEqual(list(map(pow, (2,3,10), (5,2,3))), [32, 9, 1000]) 1696 1697 def test_islice(self): 1698 self.assertEqual(list(islice('ABCDEFG', 2)), list('AB')) 1699 self.assertEqual(list(islice('ABCDEFG', 2, 4)), list('CD')) 1700 self.assertEqual(list(islice('ABCDEFG', 2, None)), list('CDEFG')) 1701 self.assertEqual(list(islice('ABCDEFG', 0, None, 2)), list('ACEG')) 1702 1703 def test_zip(self): 1704 self.assertEqual(list(zip('ABCD', 'xy')), [('A', 'x'), ('B', 'y')]) 1705 1706 def test_zip_longest(self): 1707 self.assertEqual(list(zip_longest('ABCD', 'xy', fillvalue='-')), 1708 [('A', 'x'), ('B', 'y'), ('C', '-'), ('D', '-')]) 1709 1710 def test_permutations(self): 1711 self.assertEqual(list(permutations('ABCD', 2)), 1712 list(map(tuple, 'AB AC AD BA BC BD CA CB CD DA DB DC'.split()))) 1713 self.assertEqual(list(permutations(range(3))), 1714 [(0,1,2), (0,2,1), (1,0,2), (1,2,0), (2,0,1), (2,1,0)]) 1715 1716 def test_product(self): 1717 self.assertEqual(list(product('ABCD', 'xy')), 1718 list(map(tuple, 'Ax Ay Bx By Cx Cy Dx Dy'.split()))) 1719 self.assertEqual(list(product(range(2), repeat=3)), 1720 [(0,0,0), (0,0,1), (0,1,0), (0,1,1), 1721 (1,0,0), (1,0,1), (1,1,0), (1,1,1)]) 1722 1723 def test_repeat(self): 1724 self.assertEqual(list(repeat(10, 3)), [10, 10, 10]) 1725 1726 def test_stapmap(self): 1727 self.assertEqual(list(starmap(pow, [(2,5), (3,2), (10,3)])), 1728 [32, 9, 1000]) 1729 1730 def test_takewhile(self): 1731 self.assertEqual(list(takewhile(lambda x: x<5, [1,4,6,4,1])), [1,4]) 1732 1733 1734class TestPurePythonRoughEquivalents(unittest.TestCase): 1735 1736 @staticmethod 1737 def islice(iterable, *args): 1738 s = slice(*args) 1739 start, stop, step = s.start or 0, s.stop or sys.maxsize, s.step or 1 1740 it = iter(range(start, stop, step)) 1741 try: 1742 nexti = next(it) 1743 except StopIteration: 1744 # Consume *iterable* up to the *start* position. 1745 for i, element in zip(range(start), iterable): 1746 pass 1747 return 1748 try: 1749 for i, element in enumerate(iterable): 1750 if i == nexti: 1751 yield element 1752 nexti = next(it) 1753 except StopIteration: 1754 # Consume to *stop*. 1755 for i, element in zip(range(i + 1, stop), iterable): 1756 pass 1757 1758 def test_islice_recipe(self): 1759 self.assertEqual(list(self.islice('ABCDEFG', 2)), list('AB')) 1760 self.assertEqual(list(self.islice('ABCDEFG', 2, 4)), list('CD')) 1761 self.assertEqual(list(self.islice('ABCDEFG', 2, None)), list('CDEFG')) 1762 self.assertEqual(list(self.islice('ABCDEFG', 0, None, 2)), list('ACEG')) 1763 # Test items consumed. 1764 it = iter(range(10)) 1765 self.assertEqual(list(self.islice(it, 3)), list(range(3))) 1766 self.assertEqual(list(it), list(range(3, 10))) 1767 it = iter(range(10)) 1768 self.assertEqual(list(self.islice(it, 3, 3)), []) 1769 self.assertEqual(list(it), list(range(3, 10))) 1770 # Test that slice finishes in predictable state. 1771 c = count() 1772 self.assertEqual(list(self.islice(c, 1, 3, 50)), [1]) 1773 self.assertEqual(next(c), 3) 1774 1775 1776class TestGC(unittest.TestCase): 1777 1778 def makecycle(self, iterator, container): 1779 container.append(iterator) 1780 next(iterator) 1781 del container, iterator 1782 1783 def test_accumulate(self): 1784 a = [] 1785 self.makecycle(accumulate([1,2,a,3]), a) 1786 1787 def test_chain(self): 1788 a = [] 1789 self.makecycle(chain(a), a) 1790 1791 def test_chain_from_iterable(self): 1792 a = [] 1793 self.makecycle(chain.from_iterable([a]), a) 1794 1795 def test_combinations(self): 1796 a = [] 1797 self.makecycle(combinations([1,2,a,3], 3), a) 1798 1799 def test_combinations_with_replacement(self): 1800 a = [] 1801 self.makecycle(combinations_with_replacement([1,2,a,3], 3), a) 1802 1803 def test_compress(self): 1804 a = [] 1805 self.makecycle(compress('ABCDEF', [1,0,1,0,1,0]), a) 1806 1807 def test_count(self): 1808 a = [] 1809 Int = type('Int', (int,), dict(x=a)) 1810 self.makecycle(count(Int(0), Int(1)), a) 1811 1812 def test_cycle(self): 1813 a = [] 1814 self.makecycle(cycle([a]*2), a) 1815 1816 def test_dropwhile(self): 1817 a = [] 1818 self.makecycle(dropwhile(bool, [0, a, a]), a) 1819 1820 def test_groupby(self): 1821 a = [] 1822 self.makecycle(groupby([a]*2, lambda x:x), a) 1823 1824 def test_issue2246(self): 1825 # Issue 2246 -- the _grouper iterator was not included in GC 1826 n = 10 1827 keyfunc = lambda x: x 1828 for i, j in groupby(range(n), key=keyfunc): 1829 keyfunc.__dict__.setdefault('x',[]).append(j) 1830 1831 def test_filter(self): 1832 a = [] 1833 self.makecycle(filter(lambda x:True, [a]*2), a) 1834 1835 def test_filterfalse(self): 1836 a = [] 1837 self.makecycle(filterfalse(lambda x:False, a), a) 1838 1839 def test_zip(self): 1840 a = [] 1841 self.makecycle(zip([a]*2, [a]*3), a) 1842 1843 def test_zip_longest(self): 1844 a = [] 1845 self.makecycle(zip_longest([a]*2, [a]*3), a) 1846 b = [a, None] 1847 self.makecycle(zip_longest([a]*2, [a]*3, fillvalue=b), a) 1848 1849 def test_map(self): 1850 a = [] 1851 self.makecycle(map(lambda x:x, [a]*2), a) 1852 1853 def test_islice(self): 1854 a = [] 1855 self.makecycle(islice([a]*2, None), a) 1856 1857 def test_pairwise(self): 1858 a = [] 1859 self.makecycle(pairwise([a]*5), a) 1860 1861 def test_permutations(self): 1862 a = [] 1863 self.makecycle(permutations([1,2,a,3], 3), a) 1864 1865 def test_product(self): 1866 a = [] 1867 self.makecycle(product([1,2,a,3], repeat=3), a) 1868 1869 def test_repeat(self): 1870 a = [] 1871 self.makecycle(repeat(a), a) 1872 1873 def test_starmap(self): 1874 a = [] 1875 self.makecycle(starmap(lambda *t: t, [(a,a)]*2), a) 1876 1877 def test_takewhile(self): 1878 a = [] 1879 self.makecycle(takewhile(bool, [1, 0, a, a]), a) 1880 1881def R(seqn): 1882 'Regular generator' 1883 for i in seqn: 1884 yield i 1885 1886class G: 1887 'Sequence using __getitem__' 1888 def __init__(self, seqn): 1889 self.seqn = seqn 1890 def __getitem__(self, i): 1891 return self.seqn[i] 1892 1893class I: 1894 'Sequence using iterator protocol' 1895 def __init__(self, seqn): 1896 self.seqn = seqn 1897 self.i = 0 1898 def __iter__(self): 1899 return self 1900 def __next__(self): 1901 if self.i >= len(self.seqn): raise StopIteration 1902 v = self.seqn[self.i] 1903 self.i += 1 1904 return v 1905 1906class Ig: 1907 'Sequence using iterator protocol defined with a generator' 1908 def __init__(self, seqn): 1909 self.seqn = seqn 1910 self.i = 0 1911 def __iter__(self): 1912 for val in self.seqn: 1913 yield val 1914 1915class X: 1916 'Missing __getitem__ and __iter__' 1917 def __init__(self, seqn): 1918 self.seqn = seqn 1919 self.i = 0 1920 def __next__(self): 1921 if self.i >= len(self.seqn): raise StopIteration 1922 v = self.seqn[self.i] 1923 self.i += 1 1924 return v 1925 1926class N: 1927 'Iterator missing __next__()' 1928 def __init__(self, seqn): 1929 self.seqn = seqn 1930 self.i = 0 1931 def __iter__(self): 1932 return self 1933 1934class E: 1935 'Test propagation of exceptions' 1936 def __init__(self, seqn): 1937 self.seqn = seqn 1938 self.i = 0 1939 def __iter__(self): 1940 return self 1941 def __next__(self): 1942 3 // 0 1943 1944class S: 1945 'Test immediate stop' 1946 def __init__(self, seqn): 1947 pass 1948 def __iter__(self): 1949 return self 1950 def __next__(self): 1951 raise StopIteration 1952 1953def L(seqn): 1954 'Test multiple tiers of iterators' 1955 return chain(map(lambda x:x, R(Ig(G(seqn))))) 1956 1957 1958class TestVariousIteratorArgs(unittest.TestCase): 1959 1960 def test_accumulate(self): 1961 s = [1,2,3,4,5] 1962 r = [1,3,6,10,15] 1963 n = len(s) 1964 for g in (G, I, Ig, L, R): 1965 self.assertEqual(list(accumulate(g(s))), r) 1966 self.assertEqual(list(accumulate(S(s))), []) 1967 self.assertRaises(TypeError, accumulate, X(s)) 1968 self.assertRaises(TypeError, accumulate, N(s)) 1969 self.assertRaises(ZeroDivisionError, list, accumulate(E(s))) 1970 1971 def test_chain(self): 1972 for s in ("123", "", range(1000), ('do', 1.2), range(2000,2200,5)): 1973 for g in (G, I, Ig, S, L, R): 1974 self.assertEqual(list(chain(g(s))), list(g(s))) 1975 self.assertEqual(list(chain(g(s), g(s))), list(g(s))+list(g(s))) 1976 self.assertRaises(TypeError, list, chain(X(s))) 1977 self.assertRaises(TypeError, list, chain(N(s))) 1978 self.assertRaises(ZeroDivisionError, list, chain(E(s))) 1979 1980 def test_compress(self): 1981 for s in ("123", "", range(1000), ('do', 1.2), range(2000,2200,5)): 1982 n = len(s) 1983 for g in (G, I, Ig, S, L, R): 1984 self.assertEqual(list(compress(g(s), repeat(1))), list(g(s))) 1985 self.assertRaises(TypeError, compress, X(s), repeat(1)) 1986 self.assertRaises(TypeError, compress, N(s), repeat(1)) 1987 self.assertRaises(ZeroDivisionError, list, compress(E(s), repeat(1))) 1988 1989 def test_product(self): 1990 for s in ("123", "", range(1000), ('do', 1.2), range(2000,2200,5)): 1991 self.assertRaises(TypeError, product, X(s)) 1992 self.assertRaises(TypeError, product, N(s)) 1993 self.assertRaises(ZeroDivisionError, product, E(s)) 1994 1995 def test_cycle(self): 1996 for s in ("123", "", range(1000), ('do', 1.2), range(2000,2200,5)): 1997 for g in (G, I, Ig, S, L, R): 1998 tgtlen = len(s) * 3 1999 expected = list(g(s))*3 2000 actual = list(islice(cycle(g(s)), tgtlen)) 2001 self.assertEqual(actual, expected) 2002 self.assertRaises(TypeError, cycle, X(s)) 2003 self.assertRaises(TypeError, cycle, N(s)) 2004 self.assertRaises(ZeroDivisionError, list, cycle(E(s))) 2005 2006 def test_groupby(self): 2007 for s in (range(10), range(0), range(1000), (7,11), range(2000,2200,5)): 2008 for g in (G, I, Ig, S, L, R): 2009 self.assertEqual([k for k, sb in groupby(g(s))], list(g(s))) 2010 self.assertRaises(TypeError, groupby, X(s)) 2011 self.assertRaises(TypeError, groupby, N(s)) 2012 self.assertRaises(ZeroDivisionError, list, groupby(E(s))) 2013 2014 def test_filter(self): 2015 for s in (range(10), range(0), range(1000), (7,11), range(2000,2200,5)): 2016 for g in (G, I, Ig, S, L, R): 2017 self.assertEqual(list(filter(isEven, g(s))), 2018 [x for x in g(s) if isEven(x)]) 2019 self.assertRaises(TypeError, filter, isEven, X(s)) 2020 self.assertRaises(TypeError, filter, isEven, N(s)) 2021 self.assertRaises(ZeroDivisionError, list, filter(isEven, E(s))) 2022 2023 def test_filterfalse(self): 2024 for s in (range(10), range(0), range(1000), (7,11), range(2000,2200,5)): 2025 for g in (G, I, Ig, S, L, R): 2026 self.assertEqual(list(filterfalse(isEven, g(s))), 2027 [x for x in g(s) if isOdd(x)]) 2028 self.assertRaises(TypeError, filterfalse, isEven, X(s)) 2029 self.assertRaises(TypeError, filterfalse, isEven, N(s)) 2030 self.assertRaises(ZeroDivisionError, list, filterfalse(isEven, E(s))) 2031 2032 def test_zip(self): 2033 for s in ("123", "", range(1000), ('do', 1.2), range(2000,2200,5)): 2034 for g in (G, I, Ig, S, L, R): 2035 self.assertEqual(list(zip(g(s))), lzip(g(s))) 2036 self.assertEqual(list(zip(g(s), g(s))), lzip(g(s), g(s))) 2037 self.assertRaises(TypeError, zip, X(s)) 2038 self.assertRaises(TypeError, zip, N(s)) 2039 self.assertRaises(ZeroDivisionError, list, zip(E(s))) 2040 2041 def test_ziplongest(self): 2042 for s in ("123", "", range(1000), ('do', 1.2), range(2000,2200,5)): 2043 for g in (G, I, Ig, S, L, R): 2044 self.assertEqual(list(zip_longest(g(s))), list(zip(g(s)))) 2045 self.assertEqual(list(zip_longest(g(s), g(s))), list(zip(g(s), g(s)))) 2046 self.assertRaises(TypeError, zip_longest, X(s)) 2047 self.assertRaises(TypeError, zip_longest, N(s)) 2048 self.assertRaises(ZeroDivisionError, list, zip_longest(E(s))) 2049 2050 def test_map(self): 2051 for s in (range(10), range(0), range(100), (7,11), range(20,50,5)): 2052 for g in (G, I, Ig, S, L, R): 2053 self.assertEqual(list(map(onearg, g(s))), 2054 [onearg(x) for x in g(s)]) 2055 self.assertEqual(list(map(operator.pow, g(s), g(s))), 2056 [x**x for x in g(s)]) 2057 self.assertRaises(TypeError, map, onearg, X(s)) 2058 self.assertRaises(TypeError, map, onearg, N(s)) 2059 self.assertRaises(ZeroDivisionError, list, map(onearg, E(s))) 2060 2061 def test_islice(self): 2062 for s in ("12345", "", range(1000), ('do', 1.2), range(2000,2200,5)): 2063 for g in (G, I, Ig, S, L, R): 2064 self.assertEqual(list(islice(g(s),1,None,2)), list(g(s))[1::2]) 2065 self.assertRaises(TypeError, islice, X(s), 10) 2066 self.assertRaises(TypeError, islice, N(s), 10) 2067 self.assertRaises(ZeroDivisionError, list, islice(E(s), 10)) 2068 2069 def test_pairwise(self): 2070 for s in ("123", "", range(1000), ('do', 1.2), range(2000,2200,5)): 2071 for g in (G, I, Ig, S, L, R): 2072 seq = list(g(s)) 2073 expected = list(zip(seq, seq[1:])) 2074 actual = list(pairwise(g(s))) 2075 self.assertEqual(actual, expected) 2076 self.assertRaises(TypeError, pairwise, X(s)) 2077 self.assertRaises(TypeError, pairwise, N(s)) 2078 self.assertRaises(ZeroDivisionError, list, pairwise(E(s))) 2079 2080 def test_starmap(self): 2081 for s in (range(10), range(0), range(100), (7,11), range(20,50,5)): 2082 for g in (G, I, Ig, S, L, R): 2083 ss = lzip(s, s) 2084 self.assertEqual(list(starmap(operator.pow, g(ss))), 2085 [x**x for x in g(s)]) 2086 self.assertRaises(TypeError, starmap, operator.pow, X(ss)) 2087 self.assertRaises(TypeError, starmap, operator.pow, N(ss)) 2088 self.assertRaises(ZeroDivisionError, list, starmap(operator.pow, E(ss))) 2089 2090 def test_takewhile(self): 2091 for s in (range(10), range(0), range(1000), (7,11), range(2000,2200,5)): 2092 for g in (G, I, Ig, S, L, R): 2093 tgt = [] 2094 for elem in g(s): 2095 if not isEven(elem): break 2096 tgt.append(elem) 2097 self.assertEqual(list(takewhile(isEven, g(s))), tgt) 2098 self.assertRaises(TypeError, takewhile, isEven, X(s)) 2099 self.assertRaises(TypeError, takewhile, isEven, N(s)) 2100 self.assertRaises(ZeroDivisionError, list, takewhile(isEven, E(s))) 2101 2102 def test_dropwhile(self): 2103 for s in (range(10), range(0), range(1000), (7,11), range(2000,2200,5)): 2104 for g in (G, I, Ig, S, L, R): 2105 tgt = [] 2106 for elem in g(s): 2107 if not tgt and isOdd(elem): continue 2108 tgt.append(elem) 2109 self.assertEqual(list(dropwhile(isOdd, g(s))), tgt) 2110 self.assertRaises(TypeError, dropwhile, isOdd, X(s)) 2111 self.assertRaises(TypeError, dropwhile, isOdd, N(s)) 2112 self.assertRaises(ZeroDivisionError, list, dropwhile(isOdd, E(s))) 2113 2114 def test_tee(self): 2115 for s in ("123", "", range(1000), ('do', 1.2), range(2000,2200,5)): 2116 for g in (G, I, Ig, S, L, R): 2117 it1, it2 = tee(g(s)) 2118 self.assertEqual(list(it1), list(g(s))) 2119 self.assertEqual(list(it2), list(g(s))) 2120 self.assertRaises(TypeError, tee, X(s)) 2121 self.assertRaises(TypeError, tee, N(s)) 2122 self.assertRaises(ZeroDivisionError, list, tee(E(s))[0]) 2123 2124class LengthTransparency(unittest.TestCase): 2125 2126 def test_repeat(self): 2127 self.assertEqual(operator.length_hint(repeat(None, 50)), 50) 2128 self.assertEqual(operator.length_hint(repeat(None, 0)), 0) 2129 self.assertEqual(operator.length_hint(repeat(None), 12), 12) 2130 2131 def test_repeat_with_negative_times(self): 2132 self.assertEqual(operator.length_hint(repeat(None, -1)), 0) 2133 self.assertEqual(operator.length_hint(repeat(None, -2)), 0) 2134 self.assertEqual(operator.length_hint(repeat(None, times=-1)), 0) 2135 self.assertEqual(operator.length_hint(repeat(None, times=-2)), 0) 2136 2137class RegressionTests(unittest.TestCase): 2138 2139 def test_sf_793826(self): 2140 # Fix Armin Rigo's successful efforts to wreak havoc 2141 2142 def mutatingtuple(tuple1, f, tuple2): 2143 # this builds a tuple t which is a copy of tuple1, 2144 # then calls f(t), then mutates t to be equal to tuple2 2145 # (needs len(tuple1) == len(tuple2)). 2146 def g(value, first=[1]): 2147 if first: 2148 del first[:] 2149 f(next(z)) 2150 return value 2151 items = list(tuple2) 2152 items[1:1] = list(tuple1) 2153 gen = map(g, items) 2154 z = zip(*[gen]*len(tuple1)) 2155 next(z) 2156 2157 def f(t): 2158 global T 2159 T = t 2160 first[:] = list(T) 2161 2162 first = [] 2163 mutatingtuple((1,2,3), f, (4,5,6)) 2164 second = list(T) 2165 self.assertEqual(first, second) 2166 2167 2168 def test_sf_950057(self): 2169 # Make sure that chain() and cycle() catch exceptions immediately 2170 # rather than when shifting between input sources 2171 2172 def gen1(): 2173 hist.append(0) 2174 yield 1 2175 hist.append(1) 2176 raise AssertionError 2177 hist.append(2) 2178 2179 def gen2(x): 2180 hist.append(3) 2181 yield 2 2182 hist.append(4) 2183 2184 hist = [] 2185 self.assertRaises(AssertionError, list, chain(gen1(), gen2(False))) 2186 self.assertEqual(hist, [0,1]) 2187 2188 hist = [] 2189 self.assertRaises(AssertionError, list, chain(gen1(), gen2(True))) 2190 self.assertEqual(hist, [0,1]) 2191 2192 hist = [] 2193 self.assertRaises(AssertionError, list, cycle(gen1())) 2194 self.assertEqual(hist, [0,1]) 2195 2196 @support.skip_if_pgo_task 2197 def test_long_chain_of_empty_iterables(self): 2198 # Make sure itertools.chain doesn't run into recursion limits when 2199 # dealing with long chains of empty iterables. Even with a high 2200 # number this would probably only fail in Py_DEBUG mode. 2201 it = chain.from_iterable(() for unused in range(10000000)) 2202 with self.assertRaises(StopIteration): 2203 next(it) 2204 2205 def test_issue30347_1(self): 2206 def f(n): 2207 if n == 5: 2208 list(b) 2209 return n != 6 2210 for (k, b) in groupby(range(10), f): 2211 list(b) # shouldn't crash 2212 2213 def test_issue30347_2(self): 2214 class K: 2215 def __init__(self, v): 2216 pass 2217 def __eq__(self, other): 2218 nonlocal i 2219 i += 1 2220 if i == 1: 2221 next(g, None) 2222 return True 2223 i = 0 2224 g = next(groupby(range(10), K))[1] 2225 for j in range(2): 2226 next(g, None) # shouldn't crash 2227 2228 2229class SubclassWithKwargsTest(unittest.TestCase): 2230 def test_keywords_in_subclass(self): 2231 # count is not subclassable... 2232 for cls in (repeat, zip, filter, filterfalse, chain, map, 2233 starmap, islice, takewhile, dropwhile, cycle, compress): 2234 class Subclass(cls): 2235 def __init__(self, newarg=None, *args): 2236 cls.__init__(self, *args) 2237 try: 2238 Subclass(newarg=1) 2239 except TypeError as err: 2240 # we expect type errors because of wrong argument count 2241 self.assertNotIn("keyword arguments", err.args[0]) 2242 2243@support.cpython_only 2244class SizeofTest(unittest.TestCase): 2245 def setUp(self): 2246 self.ssize_t = struct.calcsize('n') 2247 2248 check_sizeof = support.check_sizeof 2249 2250 def test_product_sizeof(self): 2251 basesize = support.calcobjsize('3Pi') 2252 check = self.check_sizeof 2253 check(product('ab', '12'), basesize + 2 * self.ssize_t) 2254 check(product(*(('abc',) * 10)), basesize + 10 * self.ssize_t) 2255 2256 def test_combinations_sizeof(self): 2257 basesize = support.calcobjsize('3Pni') 2258 check = self.check_sizeof 2259 check(combinations('abcd', 3), basesize + 3 * self.ssize_t) 2260 check(combinations(range(10), 4), basesize + 4 * self.ssize_t) 2261 2262 def test_combinations_with_replacement_sizeof(self): 2263 cwr = combinations_with_replacement 2264 basesize = support.calcobjsize('3Pni') 2265 check = self.check_sizeof 2266 check(cwr('abcd', 3), basesize + 3 * self.ssize_t) 2267 check(cwr(range(10), 4), basesize + 4 * self.ssize_t) 2268 2269 def test_permutations_sizeof(self): 2270 basesize = support.calcobjsize('4Pni') 2271 check = self.check_sizeof 2272 check(permutations('abcd'), 2273 basesize + 4 * self.ssize_t + 4 * self.ssize_t) 2274 check(permutations('abcd', 3), 2275 basesize + 4 * self.ssize_t + 3 * self.ssize_t) 2276 check(permutations('abcde', 3), 2277 basesize + 5 * self.ssize_t + 3 * self.ssize_t) 2278 check(permutations(range(10), 4), 2279 basesize + 10 * self.ssize_t + 4 * self.ssize_t) 2280 2281 2282libreftest = """ Doctest for examples in the library reference: libitertools.tex 2283 2284 2285>>> amounts = [120.15, 764.05, 823.14] 2286>>> for checknum, amount in zip(count(1200), amounts): 2287... print('Check %d is for $%.2f' % (checknum, amount)) 2288... 2289Check 1200 is for $120.15 2290Check 1201 is for $764.05 2291Check 1202 is for $823.14 2292 2293>>> import operator 2294>>> for cube in map(operator.pow, range(1,4), repeat(3)): 2295... print(cube) 2296... 22971 22988 229927 2300 2301>>> reportlines = ['EuroPython', 'Roster', '', 'alex', '', 'laura', '', 'martin', '', 'walter', '', 'samuele'] 2302>>> for name in islice(reportlines, 3, None, 2): 2303... print(name.title()) 2304... 2305Alex 2306Laura 2307Martin 2308Walter 2309Samuele 2310 2311>>> from operator import itemgetter 2312>>> d = dict(a=1, b=2, c=1, d=2, e=1, f=2, g=3) 2313>>> di = sorted(sorted(d.items()), key=itemgetter(1)) 2314>>> for k, g in groupby(di, itemgetter(1)): 2315... print(k, list(map(itemgetter(0), g))) 2316... 23171 ['a', 'c', 'e'] 23182 ['b', 'd', 'f'] 23193 ['g'] 2320 2321# Find runs of consecutive numbers using groupby. The key to the solution 2322# is differencing with a range so that consecutive numbers all appear in 2323# same group. 2324>>> data = [ 1, 4,5,6, 10, 15,16,17,18, 22, 25,26,27,28] 2325>>> for k, g in groupby(enumerate(data), lambda t:t[0]-t[1]): 2326... print(list(map(operator.itemgetter(1), g))) 2327... 2328[1] 2329[4, 5, 6] 2330[10] 2331[15, 16, 17, 18] 2332[22] 2333[25, 26, 27, 28] 2334 2335>>> def take(n, iterable): 2336... "Return first n items of the iterable as a list" 2337... return list(islice(iterable, n)) 2338 2339>>> def prepend(value, iterator): 2340... "Prepend a single value in front of an iterator" 2341... # prepend(1, [2, 3, 4]) -> 1 2 3 4 2342... return chain([value], iterator) 2343 2344>>> def enumerate(iterable, start=0): 2345... return zip(count(start), iterable) 2346 2347>>> def tabulate(function, start=0): 2348... "Return function(0), function(1), ..." 2349... return map(function, count(start)) 2350 2351>>> import collections 2352>>> def consume(iterator, n=None): 2353... "Advance the iterator n-steps ahead. If n is None, consume entirely." 2354... # Use functions that consume iterators at C speed. 2355... if n is None: 2356... # feed the entire iterator into a zero-length deque 2357... collections.deque(iterator, maxlen=0) 2358... else: 2359... # advance to the empty slice starting at position n 2360... next(islice(iterator, n, n), None) 2361 2362>>> def nth(iterable, n, default=None): 2363... "Returns the nth item or a default value" 2364... return next(islice(iterable, n, None), default) 2365 2366>>> def all_equal(iterable): 2367... "Returns True if all the elements are equal to each other" 2368... g = groupby(iterable) 2369... return next(g, True) and not next(g, False) 2370 2371>>> def quantify(iterable, pred=bool): 2372... "Count how many times the predicate is true" 2373... return sum(map(pred, iterable)) 2374 2375>>> def pad_none(iterable): 2376... "Returns the sequence elements and then returns None indefinitely" 2377... return chain(iterable, repeat(None)) 2378 2379>>> def ncycles(iterable, n): 2380... "Returns the sequence elements n times" 2381... return chain(*repeat(iterable, n)) 2382 2383>>> def dotproduct(vec1, vec2): 2384... return sum(map(operator.mul, vec1, vec2)) 2385 2386>>> def flatten(listOfLists): 2387... return list(chain.from_iterable(listOfLists)) 2388 2389>>> def repeatfunc(func, times=None, *args): 2390... "Repeat calls to func with specified arguments." 2391... " Example: repeatfunc(random.random)" 2392... if times is None: 2393... return starmap(func, repeat(args)) 2394... else: 2395... return starmap(func, repeat(args, times)) 2396 2397>>> def triplewise(iterable): 2398... "Return overlapping triplets from an iterable" 2399... # pairwise('ABCDEFG') -> ABC BCD CDE DEF EFG 2400... for (a, _), (b, c) in pairwise(pairwise(iterable)): 2401... yield a, b, c 2402 2403>>> import collections 2404>>> def sliding_window(iterable, n): 2405... # sliding_window('ABCDEFG', 4) -> ABCD BCDE CDEF DEFG 2406... it = iter(iterable) 2407... window = collections.deque(islice(it, n), maxlen=n) 2408... if len(window) == n: 2409... yield tuple(window) 2410... for x in it: 2411... window.append(x) 2412... yield tuple(window) 2413 2414>>> def grouper(n, iterable, fillvalue=None): 2415... "grouper(3, 'ABCDEFG', 'x') --> ABC DEF Gxx" 2416... args = [iter(iterable)] * n 2417... return zip_longest(*args, fillvalue=fillvalue) 2418 2419>>> def roundrobin(*iterables): 2420... "roundrobin('ABC', 'D', 'EF') --> A D E B F C" 2421... # Recipe credited to George Sakkis 2422... pending = len(iterables) 2423... nexts = cycle(iter(it).__next__ for it in iterables) 2424... while pending: 2425... try: 2426... for next in nexts: 2427... yield next() 2428... except StopIteration: 2429... pending -= 1 2430... nexts = cycle(islice(nexts, pending)) 2431 2432>>> def partition(pred, iterable): 2433... "Use a predicate to partition entries into false entries and true entries" 2434... # partition(is_odd, range(10)) --> 0 2 4 6 8 and 1 3 5 7 9 2435... t1, t2 = tee(iterable) 2436... return filterfalse(pred, t1), filter(pred, t2) 2437 2438>>> def before_and_after(predicate, it): 2439... ''' Variant of takewhile() that allows complete 2440... access to the remainder of the iterator. 2441... 2442... >>> all_upper, remainder = before_and_after(str.isupper, 'ABCdEfGhI') 2443... >>> str.join('', all_upper) 2444... 'ABC' 2445... >>> str.join('', remainder) 2446... 'dEfGhI' 2447... 2448... Note that the first iterator must be fully 2449... consumed before the second iterator can 2450... generate valid results. 2451... ''' 2452... it = iter(it) 2453... transition = [] 2454... def true_iterator(): 2455... for elem in it: 2456... if predicate(elem): 2457... yield elem 2458... else: 2459... transition.append(elem) 2460... return 2461... def remainder_iterator(): 2462... yield from transition 2463... yield from it 2464... return true_iterator(), remainder_iterator() 2465 2466>>> def powerset(iterable): 2467... "powerset([1,2,3]) --> () (1,) (2,) (3,) (1,2) (1,3) (2,3) (1,2,3)" 2468... s = list(iterable) 2469... return chain.from_iterable(combinations(s, r) for r in range(len(s)+1)) 2470 2471>>> def unique_everseen(iterable, key=None): 2472... "List unique elements, preserving order. Remember all elements ever seen." 2473... # unique_everseen('AAAABBBCCDAABBB') --> A B C D 2474... # unique_everseen('ABBCcAD', str.lower) --> A B C D 2475... seen = set() 2476... seen_add = seen.add 2477... if key is None: 2478... for element in iterable: 2479... if element not in seen: 2480... seen_add(element) 2481... yield element 2482... else: 2483... for element in iterable: 2484... k = key(element) 2485... if k not in seen: 2486... seen_add(k) 2487... yield element 2488 2489>>> def unique_justseen(iterable, key=None): 2490... "List unique elements, preserving order. Remember only the element just seen." 2491... # unique_justseen('AAAABBBCCDAABBB') --> A B C D A B 2492... # unique_justseen('ABBCcAD', str.lower) --> A B C A D 2493... return map(next, map(itemgetter(1), groupby(iterable, key))) 2494 2495>>> def first_true(iterable, default=False, pred=None): 2496... '''Returns the first true value in the iterable. 2497... 2498... If no true value is found, returns *default* 2499... 2500... If *pred* is not None, returns the first item 2501... for which pred(item) is true. 2502... 2503... ''' 2504... # first_true([a,b,c], x) --> a or b or c or x 2505... # first_true([a,b], x, f) --> a if f(a) else b if f(b) else x 2506... return next(filter(pred, iterable), default) 2507 2508>>> def nth_combination(iterable, r, index): 2509... 'Equivalent to list(combinations(iterable, r))[index]' 2510... pool = tuple(iterable) 2511... n = len(pool) 2512... if r < 0 or r > n: 2513... raise ValueError 2514... c = 1 2515... k = min(r, n-r) 2516... for i in range(1, k+1): 2517... c = c * (n - k + i) // i 2518... if index < 0: 2519... index += c 2520... if index < 0 or index >= c: 2521... raise IndexError 2522... result = [] 2523... while r: 2524... c, n, r = c*r//n, n-1, r-1 2525... while index >= c: 2526... index -= c 2527... c, n = c*(n-r)//n, n-1 2528... result.append(pool[-1-n]) 2529... return tuple(result) 2530 2531 2532This is not part of the examples but it tests to make sure the definitions 2533perform as purported. 2534 2535>>> take(10, count()) 2536[0, 1, 2, 3, 4, 5, 6, 7, 8, 9] 2537 2538>>> list(prepend(1, [2, 3, 4])) 2539[1, 2, 3, 4] 2540 2541>>> list(enumerate('abc')) 2542[(0, 'a'), (1, 'b'), (2, 'c')] 2543 2544>>> list(islice(tabulate(lambda x: 2*x), 4)) 2545[0, 2, 4, 6] 2546 2547>>> it = iter(range(10)) 2548>>> consume(it, 3) 2549>>> next(it) 25503 2551>>> consume(it) 2552>>> next(it, 'Done') 2553'Done' 2554 2555>>> nth('abcde', 3) 2556'd' 2557 2558>>> nth('abcde', 9) is None 2559True 2560 2561>>> [all_equal(s) for s in ('', 'A', 'AAAA', 'AAAB', 'AAABA')] 2562[True, True, True, False, False] 2563 2564>>> quantify(range(99), lambda x: x%2==0) 256550 2566 2567>>> a = [[1, 2, 3], [4, 5, 6]] 2568>>> flatten(a) 2569[1, 2, 3, 4, 5, 6] 2570 2571>>> list(repeatfunc(pow, 5, 2, 3)) 2572[8, 8, 8, 8, 8] 2573 2574>>> import random 2575>>> take(5, map(int, repeatfunc(random.random))) 2576[0, 0, 0, 0, 0] 2577 2578>>> list(islice(pad_none('abc'), 0, 6)) 2579['a', 'b', 'c', None, None, None] 2580 2581>>> list(ncycles('abc', 3)) 2582['a', 'b', 'c', 'a', 'b', 'c', 'a', 'b', 'c'] 2583 2584>>> dotproduct([1,2,3], [4,5,6]) 258532 2586 2587>>> list(grouper(3, 'abcdefg', 'x')) 2588[('a', 'b', 'c'), ('d', 'e', 'f'), ('g', 'x', 'x')] 2589 2590>>> list(triplewise('ABCDEFG')) 2591[('A', 'B', 'C'), ('B', 'C', 'D'), ('C', 'D', 'E'), ('D', 'E', 'F'), ('E', 'F', 'G')] 2592 2593>>> list(sliding_window('ABCDEFG', 4)) 2594[('A', 'B', 'C', 'D'), ('B', 'C', 'D', 'E'), ('C', 'D', 'E', 'F'), ('D', 'E', 'F', 'G')] 2595 2596>>> list(roundrobin('abc', 'd', 'ef')) 2597['a', 'd', 'e', 'b', 'f', 'c'] 2598 2599>>> def is_odd(x): 2600... return x % 2 == 1 2601 2602>>> evens, odds = partition(is_odd, range(10)) 2603>>> list(evens) 2604[0, 2, 4, 6, 8] 2605>>> list(odds) 2606[1, 3, 5, 7, 9] 2607 2608>>> it = iter('ABCdEfGhI') 2609>>> all_upper, remainder = before_and_after(str.isupper, it) 2610>>> ''.join(all_upper) 2611'ABC' 2612>>> ''.join(remainder) 2613'dEfGhI' 2614 2615>>> list(powerset([1,2,3])) 2616[(), (1,), (2,), (3,), (1, 2), (1, 3), (2, 3), (1, 2, 3)] 2617 2618>>> all(len(list(powerset(range(n)))) == 2**n for n in range(18)) 2619True 2620 2621>>> list(powerset('abcde')) == sorted(sorted(set(powerset('abcde'))), key=len) 2622True 2623 2624>>> list(unique_everseen('AAAABBBCCDAABBB')) 2625['A', 'B', 'C', 'D'] 2626 2627>>> list(unique_everseen('ABBCcAD', str.lower)) 2628['A', 'B', 'C', 'D'] 2629 2630>>> list(unique_justseen('AAAABBBCCDAABBB')) 2631['A', 'B', 'C', 'D', 'A', 'B'] 2632 2633>>> list(unique_justseen('ABBCcAD', str.lower)) 2634['A', 'B', 'C', 'A', 'D'] 2635 2636>>> first_true('ABC0DEF1', '9', str.isdigit) 2637'0' 2638 2639>>> population = 'ABCDEFGH' 2640>>> for r in range(len(population) + 1): 2641... seq = list(combinations(population, r)) 2642... for i in range(len(seq)): 2643... assert nth_combination(population, r, i) == seq[i] 2644... for i in range(-len(seq), 0): 2645... assert nth_combination(population, r, i) == seq[i] 2646 2647 2648""" 2649 2650__test__ = {'libreftest' : libreftest} 2651 2652def test_main(verbose=None): 2653 test_classes = (TestBasicOps, TestVariousIteratorArgs, TestGC, 2654 RegressionTests, LengthTransparency, 2655 SubclassWithKwargsTest, TestExamples, 2656 TestPurePythonRoughEquivalents, 2657 SizeofTest) 2658 support.run_unittest(*test_classes) 2659 2660 # verify reference counting 2661 if verbose and hasattr(sys, "gettotalrefcount"): 2662 import gc 2663 counts = [None] * 5 2664 for i in range(len(counts)): 2665 support.run_unittest(*test_classes) 2666 gc.collect() 2667 counts[i] = sys.gettotalrefcount() 2668 print(counts) 2669 2670 # doctest the examples in the library reference 2671 support.run_doctest(sys.modules[__name__], verbose) 2672 2673if __name__ == "__main__": 2674 test_main(verbose=True) 2675