# barrier2.py
#
# Demo: N worker threads and one master thread are kept in lock-step by a
# reusable barrier that is built on a hand-rolled condition variable.
#
# The code below uses only spellings that are valid on both Python 2 and
# Python 3 (call-style raise, repr(), no print statement).
import sys
import time
import threading

try:
    import thread
except ImportError:
    # Python 3 renamed the low-level lock module to "_thread".
    import _thread as thread


def _say(text):
    """Write text plus a newline to stdout using one write() call.

    Each message is formatted in full before it is written, so a message is
    never assembled from several separate print operations that other
    threads could slip output in between.
    """
    sys.stdout.write(text + "\n")


class condition:
    """Condition variable built from three low-level locks.

    Provides control for barrier.  Callers bracket their critical section
    with .acquire() / .release() and may call .wait() while holding it;
    .signal() and .broadcast() mark waiters for release.
    """

    def __init__(self):
        # the lock actually used by .acquire() and .release()
        self.mutex = thread.allocate_lock()

        # lock used to block threads until a signal; created held, so the
        # first waiter blocks until broadcast() releases it
        self.checkout = thread.allocate_lock()
        self.checkout.acquire()

        # internal critical-section lock, & the data it protects
        self.idlock = thread.allocate_lock()
        self.id = 0
        self.waiting = 0    # num waiters subject to current release
        self.pending = 0    # num waiters awaiting next signal
        self.torelease = 0  # num waiters to release
        self.releasing = 0  # 1 iff release is in progress

    def acquire(self):
        """Acquire the condition's mutex."""
        self.mutex.acquire()

    def release(self):
        """Release the condition's mutex."""
        self.mutex.release()

    def wait(self):
        """Release the mutex, block until signalled, then re-acquire it.

        Raises ValueError if the mutex is not locked on entry.  The check
        uses .locked(), so it only verifies that the mutex is held by
        someone, not that the caller is the holder.
        """
        mutex, checkout, idlock = self.mutex, self.checkout, self.idlock
        if not mutex.locked():
            raise ValueError(
                "condition must be .acquire'd when .wait() invoked")

        # Register as a pending waiter and remember the current id; a later
        # broadcast() that absorbs pending waiters increments self.id.
        idlock.acquire()
        myid = self.id
        self.pending = self.pending + 1
        idlock.release()

        mutex.release()

        # Take the checkout token; if self.id has not moved past myid the
        # release in progress is not for this waiter, so hand the token back
        # and try again.
        while 1:
            checkout.acquire()
            idlock.acquire()
            if myid < self.id:
                break
            checkout.release()
            idlock.release()

        # Here both checkout and idlock are held by this thread.
        self.waiting = self.waiting - 1
        self.torelease = self.torelease - 1
        if self.torelease:
            # more waiters are due: pass the checkout token on
            checkout.release()
        else:
            # last one out keeps checkout held and ends the release phase
            self.releasing = 0
            if self.waiting == self.pending == 0:
                self.id = 0
        idlock.release()
        mutex.acquire()

    def signal(self):
        """Mark one waiter for release; same as broadcast(1)."""
        self.broadcast(1)

    def broadcast(self, num=-1):
        """Mark waiters for release.

        num == -1 (the default) releases every current waiter, num == 0 is
        a no-op, and a positive num adds that many to the release count,
        capped at the number of waiters.  Raises ValueError if num < -1.
        """
        if num < -1:
            raise ValueError('.broadcast called with num ' + repr(num))
        if num == 0:
            return
        self.idlock.acquire()
        if self.pending:
            # move pending waiters into the releasable set and bump the id
            # so their "myid < self.id" test in wait() can succeed
            self.waiting = self.waiting + self.pending
            self.pending = 0
            self.id = self.id + 1
        if num == -1:
            self.torelease = self.waiting
        else:
            self.torelease = min(self.waiting, self.torelease + num)
        if self.torelease and not self.releasing:
            # start a release phase by making the checkout token available
            self.releasing = 1
            self.checkout.release()
        self.idlock.release()


class barrier:
    """Reusable barrier for n threads, built on condition."""

    def __init__(self, n):
        self.n = n                # number of participants
        self.togo = n             # arrivals still needed in this round
        self.full = condition()

    def wait(self):
        """Block until n threads have called wait(), then let them all go.

        The last arrival resets the count for the next round before it
        broadcasts, which is what makes the barrier reusable.
        """
        full = self.full
        full.acquire()
        self.togo = self.togo - 1
        if self.togo:
            full.wait()
        else:
            self.togo = self.n
            full.broadcast()
        full.release()


# setup for master control
N = 4
Njob = 2
_say("barrier2.py running, N= %s , Njob= %s" % (N, Njob))
bar = barrier(N + 1)  # N workers and one master


class Worker(threading.Thread):
    """Worker thread that steps through Njob jobs in lock-step with master."""

    # worker initialization
    def __init__(self, myid):
        self.myid = myid
        threading.Thread.__init__(self)
        _say("worker myid= %s  init, started" % self.myid)

    def run(self):  # worker running
        for job in range(Njob):
            _say("worker myid= %s , job= %s\n running, at first barrier"
                 % (self.myid, job))
            bar.wait()  # 1a master runs
            bar.wait()  # 1b master completed
            _say("worker myid= %s , job= %s\n running, at second barrier"
                 % (self.myid, job))
            bar.wait()  # 2a master runs
            bar.wait()  # 2b master completed
        # end job loop
        _say("worker myid= %s  ending" % self.myid)
        bar.wait()  # 3a master waits for all tasks to end, join


# master control
for i in range(1, N + 1):  # start N threads
    Worker(i).start()

for job in range(Njob):
    bar.wait()  # 1a
    _say("master at first barrier, job  %s" % job)
    bar.wait()  # 1b
    bar.wait()  # 2a
    _say("master at second barrier, job  %s" % job)
    bar.wait()  # 2b
# end job loop - must be exactly number of iterations as worker

bar.wait()  # 3a
_say("master ending")