# # Copyright 2008 Google Inc. # Released under the GPLv2 import threading, Queue class ThreadPool: """ A generic threading class for use in the CLI ThreadPool class takes the function to be executed as an argument and optionally number of threads. It then creates multiple threads for faster execution. """ def __init__(self, function, numthreads=40): assert(numthreads > 0) self.threads = Queue.Queue(0) self.function = function self.numthreads = 0 self.queue = Queue.Queue(0) self._start_threads(numthreads) def wait(self): """ Checks to see if any threads are still working and blocks until worker threads all complete. """ for x in xrange(self.numthreads): self.queue.put('die') # As only spawned threads are allowed to add new ones, # we can safely wait for the thread queue to be empty # (if we're at the last thread and it creates a new one, # it will get queued before it finishes). dead = 0 while True: try: thread = self.threads.get(block=True, timeout=1) if thread.isAlive(): thread.join() dead += 1 except Queue.Empty: assert(dead == self.numthreads) return def queue_work(self, data): """ Takes a list of items and appends them to the work queue. """ [self.queue.put(item) for item in data] def add_one_thread_post_wait(self): # Only a spawned thread (not the main one) # should call this (see wait() for details) self._start_threads(1) self.queue.put('die') def _start_threads(self, nthreads): """ Start up threads to spawn workers. """ self.numthreads += nthreads for i in xrange(nthreads): thread = threading.Thread(target=self._new_worker) thread.setDaemon(True) self.threads.put(thread) thread.start() def _new_worker(self): """ Spawned worker threads. These threads loop until queue is empty.""" while True: # Blocking call data = self.queue.get() if data == 'die': return try: self.function(data) except Exception, full_error: # Put a catch all here. print ('Unexpected failure in the thread calling %s: %s' % (self.function.__name__, full_error))