# Defines classes that provide synchronization objects.  Note that use of
# this module requires that your Python support threads.
#
#    condition(lock=None)       # a POSIX-like condition-variable object
#    barrier(n)                 # an n-thread barrier
#    event()                    # an event object
#    semaphore(count=1)         # a semaphore object, with initial count `count'
#    mrsw()                     # a multiple-reader single-writer lock
#
# CONDITIONS
#
# A condition object is created via
#   import this_module
#   your_condition_object = this_module.condition(lock=None)
#
# As explained below, a condition object has a lock associated with it,
# used in the protocol to protect condition data.  You can specify a
# lock to use in the constructor, else the constructor will allocate
# an anonymous lock for you.  Specifying a lock explicitly can be useful
# when more than one condition keys off the same set of shared data.
#
# Methods:
#   .acquire()
#      acquire the lock associated with the condition
#   .release()
#      release the lock associated with the condition
#   .wait()
#      block the thread until such time as some other thread does a
#      .signal or .broadcast on the same condition, and release the
#      lock associated with the condition.  The lock associated with
#      the condition MUST be in the acquired state at the time
#      .wait is invoked.
#   .signal()
#      wake up exactly one thread (if any) that previously did a .wait
#      on the condition; that thread will awaken with the lock associated
#      with the condition in the acquired state.  If no threads are
#      .wait'ing, this is a nop.  If more than one thread is .wait'ing on
#      the condition, any of them may be awakened.
#   .broadcast()
#      wake up all threads (if any) that are .wait'ing on the condition;
#      the threads are woken up serially, each with the lock in the
#      acquired state, so should .release() as soon as possible.  If no
#      threads are .wait'ing, this is a nop.
#
#      Note that if a thread does a .wait *while* a signal/broadcast is
#      in progress, it's guaranteed to block until a subsequent
#      signal/broadcast.
#
#      Secret feature:  `broadcast' actually takes an integer argument,
#      and will wake up exactly that many waiting threads (or the total
#      number waiting, if that's less).  Use of this is dubious, though,
#      and probably won't be supported if this form of condition is
#      reimplemented in C.
#
# DIFFERENCES FROM POSIX
#
# + A separate mutex is not needed to guard condition data.  Instead, a
#   condition object can (must) be .acquire'ed and .release'ed directly.
#   This eliminates a common error in using POSIX conditions.
#
# + Because of implementation difficulties, a POSIX `signal' wakes up
#   _at least_ one .wait'ing thread.  Race conditions make it difficult
#   to stop that.  This implementation guarantees to wake up only one,
#   but you probably shouldn't rely on that.
#
# PROTOCOL
#
# Condition objects are used to block threads until "some condition" is
# true.  E.g., a thread may wish to wait until a producer pumps out data
# for it to consume, or a server may wish to wait until someone requests
# its services, or perhaps a whole bunch of threads want to wait until a
# preceding pass over the data is complete.  Early models for conditions
# relied on some other thread figuring out when a blocked thread's
# condition was true, and made the other thread responsible both for
# waking up the blocked thread and guaranteeing that it woke up with all
# data in a correct state.  This proved to be very delicate in practice,
# and gave conditions a bad name in some circles.
#
# The POSIX model addresses these problems by making a thread responsible
# for ensuring that its own state is correct when it wakes, and relies
# on a rigid protocol to make this easy; so long as you stick to the
# protocol, POSIX conditions are easy to "get right":
#
#  A) The thread that's waiting for some arbitrarily-complex condition
#     (ACC) to become true does:
#
#     condition.acquire()
#     while not (code to evaluate the ACC):
#           condition.wait()
#           # That blocks the thread, *and* releases the lock.  When a
#           # condition.signal() happens, it will wake up some thread that
#           # did a .wait, *and* acquire the lock again before .wait
#           # returns.
#           #
#           # Because the lock is acquired at this point, the state used
#           # in evaluating the ACC is frozen, so it's safe to go back &
#           # reevaluate the ACC.
#
#     # At this point, ACC is true, and the thread has the condition
#     # locked.
#     # So code here can safely muck with the shared state that
#     # went into evaluating the ACC -- if it wants to.
#     # When done mucking with the shared state, do
#     condition.release()
#
#  B) Threads that are mucking with shared state that may affect the
#     ACC do:
#
#     condition.acquire()
#     # muck with shared state
#     condition.release()
#     if it's possible that ACC is true now:
#         condition.signal() # or .broadcast()
#
#     Note:  You may prefer to put the "if" clause before the release().
#     That's fine, but do note that anyone waiting on the signal will
#     stay blocked until the release() is done (since acquiring the
#     condition is part of what .wait() does before it returns).
#
# TRICK OF THE TRADE
#
# With simpler forms of conditions, it can be impossible to know when
# a thread that's supposed to do a .wait has actually done it.  But
# because this form of condition releases a lock as _part_ of doing a
# wait, the state of that lock can be used to guarantee it.
#
# E.g., suppose thread A spawns thread B and later wants to wait for B to
# complete:
#
# In A:                             In B:
#
# B_done = condition()              ... do work ...
# B_done.acquire()                  B_done.acquire(); B_done.release()
# spawn B                           B_done.signal()
# ... some time later ...           ... and B exits ...
# B_done.wait()
#
# Because B_done was in the acquire'd state at the time B was spawned,
# B's attempt to acquire B_done can't succeed until A has done its
# B_done.wait() (which releases B_done).  So B's B_done.signal() is
# guaranteed to be seen by the .wait().  Without the lock trick, B
# may signal before A .waits, and then A would wait forever.
#
# BARRIERS
#
# A barrier object is created via
#   import this_module
#   your_barrier = this_module.barrier(num_threads)
#
# Methods:
#   .enter()
#      the thread blocks until num_threads threads in all have done
#      .enter().  Then the num_threads threads that .enter'ed resume,
#      and the barrier resets to capture the next num_threads threads
#      that .enter it.
#
# EVENTS
#
# An event object is created via
#   import this_module
#   your_event = this_module.event()
#
# An event has two states, `posted' and `cleared'.  An event is
# created in the cleared state.
#
# Methods:
#
#   .post()
#      Put the event in the posted state, and resume all threads
#      .wait'ing on the event (if any).
#
#   .clear()
#      Put the event in the cleared state.
#
#   .is_posted()
#      Returns 0 if the event is in the cleared state, or 1 if the event
#      is in the posted state.
#
#   .wait()
#      If the event is in the posted state, returns immediately.
#      If the event is in the cleared state, blocks the calling thread
#      until the event is .post'ed by another thread.
#
# Note that an event, once posted, remains posted until explicitly
# cleared.  Relative to conditions, this is both the strength & weakness
# of events.
# It's a strength because the .post'ing thread doesn't have to
# worry about whether the threads it's trying to communicate with have
# already done a .wait (a condition .signal is seen only by threads that
# do a .wait _prior_ to the .signal; a .signal does not persist).  But
# it's a weakness because .clear'ing an event is error-prone:  it's easy
# to mistakenly .clear an event before all the threads you intended to
# see the event get around to .wait'ing on it.  But so long as you don't
# need to .clear an event, events are easy to use safely.
#
# SEMAPHORES
#
# A semaphore object is created via
#   import this_module
#   your_semaphore = this_module.semaphore(count=1)
#
# A semaphore has an integer count associated with it.  The initial value
# of the count is specified by the optional argument (which defaults to
# 1) passed to the semaphore constructor.
#
# Methods:
#
#   .p()
#      If the semaphore's count is greater than 0, decrements the count
#      by 1 and returns.
#      Else if the semaphore's count is 0, blocks the calling thread
#      until a subsequent .v() increases the count.  When that happens,
#      the count will be decremented by 1 and the calling thread resumed.
#
#   .v()
#      Increments the semaphore's count by 1, and wakes up a thread (if
#      any) blocked by a .p().  It's a (detected) error for a .v() to
#      increase the semaphore's count to a value larger than the initial
#      count.
#
# MULTIPLE-READER SINGLE-WRITER LOCKS
#
# A mrsw lock is created via
#   import this_module
#   your_mrsw_lock = this_module.mrsw()
#
# This kind of lock is often useful with complex shared data structures.
# The object lets any number of "readers" proceed, so long as no thread
# wishes to "write".
# When a (one or more) thread declares its intention
# to "write" (e.g., to update a shared structure), all current readers
# are allowed to finish, and then a writer gets exclusive access; all
# other readers & writers are blocked until the current writer completes.
# Finally, if some thread is waiting to write and another is waiting to
# read, the writer takes precedence.
#
# Methods:
#
#   .read_in()
#      If no thread is writing or waiting to write, returns immediately.
#      Else blocks until no thread is writing or waiting to write.  So
#      long as some thread has completed a .read_in but not a .read_out,
#      writers are blocked.
#
#   .read_out()
#      Use sometime after a .read_in to declare that the thread is done
#      reading.  When all threads complete reading, a writer can proceed.
#
#   .write_in()
#      If no thread is writing (has completed a .write_in, but hasn't yet
#      done a .write_out) or reading (similarly), returns immediately.
#      Else blocks the calling thread, and threads waiting to read, until
#      the current writer completes writing or all the current readers
#      complete reading; if then more than one thread is waiting to
#      write, one of them is allowed to proceed, but which one is not
#      specified.
#
#   .write_out()
#      Use sometime after a .write_in to declare that the thread is done
#      writing.  Then if some other thread is waiting to write, it's
#      allowed to proceed.  Else all threads (if any) waiting to read are
#      allowed to proceed.
#
#   .write_to_read()
#      Use instead of a .write_out to declare that the thread is done
#      writing but wants to continue reading without other writers
#      intervening.  If there are other threads waiting to write, they
#      are allowed to proceed only if the current thread calls
#      .read_out; threads waiting to read are only allowed to proceed
#      if there are no threads waiting to write.  (This is a
#      weakness of the interface!)

try:
    import thread                   # Python 2 name
except ImportError:
    import _thread as thread        # same low-level module under Python 3


class condition:
    """Condition variable built from plain `thread` locks.

    `mutex` is the lock callers take with .acquire()/.release().
    `checkout` is a lock created in the acquired state; waiters block on
    it and broadcast() releases it to let them through one at a time.
    `idlock` guards the bookkeeping counters below.
    """

    def __init__(self, lock=None):
        """Create a condition using `lock`, or a freshly allocated lock.

        Raises TypeError if `lock` is given but lacks .acquire/.release.
        """
        # the lock actually used by .acquire() and .release()
        if lock is None:
            self.mutex = thread.allocate_lock()
        elif hasattr(lock, 'acquire') and hasattr(lock, 'release'):
            self.mutex = lock
        else:
            raise TypeError('condition constructor requires '
                            'a lock argument')

        # lock used to block threads until a signal
        self.checkout = thread.allocate_lock()
        self.checkout.acquire()

        # internal critical-section lock, & the data it protects
        self.idlock = thread.allocate_lock()
        self.id = 0
        self.waiting = 0    # num waiters subject to current release
        self.pending = 0    # num waiters awaiting next signal
        self.torelease = 0  # num waiters to release
        self.releasing = 0  # 1 iff release is in progress

    def acquire(self):
        """Acquire the lock associated with the condition."""
        self.mutex.acquire()

    def release(self):
        """Release the lock associated with the condition."""
        self.mutex.release()

    def wait(self):
        """Release the mutex, block until signalled, then re-acquire it.

        Raises ValueError if the mutex is not locked on entry.
        """
        mutex, checkout, idlock = self.mutex, self.checkout, self.idlock
        if not mutex.locked():
            raise ValueError(
                "condition must be .acquire'd when .wait() invoked")

        # Register as a pending waiter and remember the current
        # generation number; broadcast() bumps self.id when it moves
        # pending waiters into the `waiting` set.
        idlock.acquire()
        myid = self.id
        self.pending = self.pending + 1
        idlock.release()

        mutex.release()

        while 1:
            checkout.acquire(); idlock.acquire()
            if myid < self.id:
                # a broadcast has happened since we registered
                break
            # The release in progress predates our wait: hand the
            # checkout lock back so an eligible waiter can take it.
            checkout.release(); idlock.release()

        # Here we hold both checkout and idlock.
        self.waiting = self.waiting - 1
        self.torelease = self.torelease - 1
        if self.torelease:
            # more waiters to wake: pass the checkout lock along
            checkout.release()
        else:
            # last one out keeps checkout locked and ends the release
            self.releasing = 0
            if self.waiting == self.pending == 0:
                self.id = 0
        idlock.release()
        mutex.acquire()

    def signal(self):
        """Wake up one waiting thread, if any."""
        self.broadcast(1)

    def broadcast(self, num = -1):
        """Wake up waiting threads.

        num == -1 (the default) releases every current waiter; num == 0
        does nothing; a positive num adds that many to the number being
        released, capped at the number waiting.  Raises ValueError for
        num < -1.
        """
        if num < -1:
            raise ValueError('.broadcast called with num %r' % (num,))
        if num == 0:
            return
        self.idlock.acquire()
        if self.pending:
            # promote pending waiters and start a new generation
            self.waiting = self.waiting + self.pending
            self.pending = 0
            self.id = self.id + 1
        if num == -1:
            self.torelease = self.waiting
        else:
            self.torelease = min( self.waiting,
                                  self.torelease + num )
        if self.torelease and not self.releasing:
            self.releasing = 1
            self.checkout.release()
        self.idlock.release()


class barrier:
    """Barrier that releases its threads once `n` of them have entered."""

    def __init__(self, n):
        self.n = n
        self.togo = n               # threads still needed this round
        self.full = condition()

    def enter(self):
        """Block until n threads in all have entered; then reset."""
        full = self.full
        full.acquire()
        self.togo = self.togo - 1
        if self.togo:
            full.wait()
        else:
            # last thread in: re-arm the barrier and wake the others
            self.togo = self.n
            full.broadcast()
        full.release()


class event:
    """Two-state (posted/cleared) event; created in the cleared state."""

    def __init__(self):
        self.state = 0              # 1 iff posted
        self.posted = condition()

    def post(self):
        """Set the posted state and wake every thread waiting on it."""
        self.posted.acquire()
        self.state = 1
        self.posted.broadcast()
        self.posted.release()

    def clear(self):
        """Put the event in the cleared state."""
        self.posted.acquire()
        self.state = 0
        self.posted.release()

    def is_posted(self):
        """Return 1 if the event is posted, else 0."""
        self.posted.acquire()
        answer = self.state
        self.posted.release()
        return answer

    def wait(self):
        """Return at once if posted; otherwise block until a .post()."""
        self.posted.acquire()
        if not self.state:
            self.posted.wait()
        self.posted.release()


class semaphore:
    """Counting semaphore whose count may never exceed its initial value."""

    def __init__(self, count=1):
        """Raises ValueError unless count >= 1."""
        if count <= 0:
            raise ValueError('semaphore count %d; must be >= 1' % count)
        self.count = count
        self.maxcount = count
        self.nonzero = condition()

    def p(self):
        """Decrement the count, blocking while it is 0."""
        self.nonzero.acquire()
        while self.count == 0:
            self.nonzero.wait()
        self.count = self.count - 1
        self.nonzero.release()

    def v(self):
        """Increment the count and wake one thread blocked in .p().

        Raises ValueError if the count is already at its initial value.
        """
        self.nonzero.acquire()
        try:
            # try/finally so the error path does not leave the lock held
            # (which would deadlock every later .p()/.v()).
            if self.count == self.maxcount:
                raise ValueError('.v() tried to raise semaphore count above '
                                 'initial value %r' % self.maxcount)
            self.count = self.count + 1
            self.nonzero.signal()
        finally:
            self.nonzero.release()


class mrsw:
    """Multiple-reader single-writer lock; waiting writers beat readers."""

    def __init__(self):
        # critical-section lock & the data it protects
        self.rwOK = thread.allocate_lock()
        self.nr = 0  # number readers actively reading (not just waiting)
        self.nw = 0  # number writers either waiting to write or writing
        self.writing = 0  # 1 iff some thread is writing

        # conditions (both share rwOK as their mutex)
        self.readOK  = condition(self.rwOK)  # OK to unblock readers
        self.writeOK = condition(self.rwOK)  # OK to unblock writers

    def read_in(self):
        """Become a reader, blocking while any writer is active or waiting."""
        self.rwOK.acquire()
        while self.nw:
            self.readOK.wait()
        self.nr = self.nr + 1
        self.rwOK.release()

    def read_out(self):
        """Stop reading; the last reader out signals one waiting writer.

        Raises ValueError if there is no active reader.
        """
        self.rwOK.acquire()
        try:
            if self.nr <= 0:
                raise ValueError(
                    '.read_out() invoked without an active reader')
            self.nr = self.nr - 1
            if self.nr == 0:
                self.writeOK.signal()
        finally:
            self.rwOK.release()

    def write_in(self):
        """Become the writer, blocking while anyone is reading or writing."""
        self.rwOK.acquire()
        self.nw = self.nw + 1
        while self.writing or self.nr:
            self.writeOK.wait()
        self.writing = 1
        self.rwOK.release()

    def write_out(self):
        """Stop writing; wake another writer if any, else all readers.

        Raises ValueError if there is no active writer.
        """
        self.rwOK.acquire()
        try:
            if not self.writing:
                raise ValueError(
                    '.write_out() invoked without an active writer')
            self.writing = 0
            self.nw = self.nw - 1
            if self.nw:
                self.writeOK.signal()
            else:
                self.readOK.broadcast()
        finally:
            self.rwOK.release()

    def write_to_read(self):
        """Turn the active writer into a reader without letting go.

        Waiting readers are released only if no writer is waiting.
        Raises ValueError if there is no active writer.
        """
        self.rwOK.acquire()
        try:
            if not self.writing:
                raise ValueError(
                    '.write_to_read() invoked without an active writer')
            self.writing = 0
            self.nw = self.nw - 1
            self.nr = self.nr + 1
            if not self.nw:
                self.readOK.broadcast()
        finally:
            self.rwOK.release()

# The rest of the file is a test case, that runs a number of parallelized
# quicksorts in parallel.  If it works, you'll get about 600 lines of
# tracing output, with a line like
#     test passed!  209 threads created in all
# as the last line.  The content and order of preceding lines will
# vary across runs.

def _trace(text):
    """Print one line of tracing output while holding the io lock."""
    io.acquire()
    try:
        print(text)
    finally:
        io.release()

def _new_thread(func, *args):
    """Start func(new_id, *args) in a new thread and list new_id as alive."""
    global TID
    tid.acquire()
    try:
        TID = TID + 1
        new_id = TID
    finally:
        tid.release()
    io.acquire()
    try:
        alive.append(new_id)
        print('starting thread %s -- %s alive' % (new_id, len(alive)))
    finally:
        io.release()
    thread.start_new_thread( func, (new_id,) + args )

def _qsort(tid, a, l, r, finished):
    """Sort a[l:r] in place using one new thread per partition.

    Posts `finished` when done and removes `tid` from `alive`.
    """
    _trace('thread %s qsort %s %s' % (tid, l, r))
    if r-l > 1:
        pivot = a[l]
        j = l+1   # make a[l:j] <= pivot, and a[j:r] > pivot
        for i in range(j, r):
            if a[i] <= pivot:
                a[j], a[i] = a[i], a[j]
                j = j + 1
        a[l], a[j-1] = a[j-1], pivot

        l_subarray_sorted = event()
        r_subarray_sorted = event()
        _new_thread(_qsort, a, l, j-1, l_subarray_sorted)
        _new_thread(_qsort, a, j, r,   r_subarray_sorted)
        l_subarray_sorted.wait()
        r_subarray_sorted.wait()

    io.acquire()
    try:
        print('thread %s qsort done' % (tid,))
        alive.remove(tid)
    finally:
        io.release()
    finished.post()

def _randarray(tid, a, finished):
    """Shuffle `a` in place, then post `finished`."""
    _trace('thread %s randomizing array' % (tid,))
    for i in range(1, len(a)):
        # calls to random are serialized through the wh lock
        wh.acquire()
        try:
            j = randint(0,i)
        finally:
            wh.release()
        a[i], a[j] = a[j], a[i]
    io.acquire()
    try:
        print('thread %s randomizing done' % (tid,))
        alive.remove(tid)
    finally:
        io.release()
    finished.post()

def _check_sort(a):
    """Raise ValueError unless `a` equals [0, 1, ..., len(a)-1]."""
    # list(...) so the comparison also works where range() is not a list
    if a != list(range(len(a))):
        raise ValueError('a not sorted', a)

def _run_one_sort(tid, a, bar, done):
    """Randomize `a`, quicksort it, check it, then post `done`."""
    # for variety, all the threads running this enter a barrier
    # at the end, and post `done' after the barrier exits
    _trace('thread %s randomizing %s' % (tid, a))
    finished = event()
    _new_thread(_randarray, a, finished)
    finished.wait()

    _trace('thread %s sorting %s' % (tid, a))
    finished.clear()
    _new_thread(_qsort, a, 0, len(a), finished)
    finished.wait()
    _check_sort(a)

    _trace('thread %s entering barrier' % (tid,))
    bar.enter()
    _trace('thread %s leaving barrier' % (tid,))
    io.acquire()
    try:
        alive.remove(tid)
    finally:
        io.release()
    bar.enter() # make sure they've all removed themselves from alive
                ##  before 'done' is posted
    bar.enter() # just to be cruel
    done.post()

def test():
    """Run NSORTS threaded sorts and verify every array ends up sorted."""
    global TID, tid, io, wh, randint, alive
    import random
    randint = random.randint

    TID = 0                             # thread ID (1, 2, ...)
    tid = thread.allocate_lock()        # for changing TID
    io  = thread.allocate_lock()        # for printing, and 'alive'
    wh  = thread.allocate_lock()        # for calls to random
    alive = []                          # IDs of active threads

    NSORTS = 5
    arrays = []
    for i in range(NSORTS):
        arrays.append( list(range( (i+1)*10 )) )

    bar = barrier(NSORTS)
    finished = event()
    for i in range(NSORTS):
        _new_thread(_run_one_sort, arrays[i], bar, finished)
    finished.wait()

    print('all threads done, and checking results ...')
    if alive:
        raise ValueError('threads still alive at end', alive)
    for i in range(NSORTS):
        a = arrays[i]
        if len(a) != (i+1)*10:
            raise ValueError('length of array', i, 'screwed up')
        _check_sort(a)

    print('test passed! %s threads created in all' % (TID,))

if __name__ == '__main__':
    test()

# end of module