Package pysys :: Package utils :: Module threadpool
[hide private]
[frames] | no frames]

Source Code for Module pysys.utils.threadpool

  1  #!/usr/bin/env python 
  2  # PySys System Test Framework, Copyright (C) 2006-2016  M.B.Grieve 
  3   
  4  # This library is free software; you can redistribute it and/or 
  5  # modify it under the terms of the GNU Lesser General Public 
  6  # License as published by the Free Software Foundation; either 
  7  # version 2.1 of the License, or (at your option) any later version. 
  8   
  9  # This library is distributed in the hope that it will be useful, 
 10  # but WITHOUT ANY WARRANTY; without even the implied warranty of 
 11  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
 12  # Lesser General Public License for more details. 
 13   
 14  # You should have received a copy of the GNU Lesser General Public 
 15  # License along with this library; if not, write to the Free Software 
 16  # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA 
 17   
 18  # Contact: moraygrieve@users.sourceforge.net 
 19    
 20  # Note that the threadpool implementation is based on that proposed 
 21  # by Christopher Arndt (http://chrisarndt.de/en/software/python/threadpool/) 
 22  # with minor modifications. 
 23    
 24  import sys, time, logging, thread, threading, Queue, traceback 
 25   
 26  from pysys import log 
 27   
 28   
 29  # exceptions 
class NoResultsPending(Exception):
    """Raised when every submitted work request has already been processed."""
33 34
class NoWorkersAvailable(Exception):
    """Raised when requests remain but no worker thread is left to run them."""
38 39 40 # internal module helper functions
def _handle_thread_exception(request, exc_info):
    """Fallback callback used when a work request raises an exception.

    Writes the traceback using ``traceback.print_exception``; nothing is
    returned and the exception is not re-raised.

    @param request: Not used by this handler
    @param exc_info: Exception details in the form returned by ``sys.exc_info()``;
        the items are passed positionally to ``traceback.print_exception``

    """
    traceback.print_exception(*exc_info)
48 49 50
class WorkerThread(threading.Thread):
    """Daemon thread that executes work requests on behalf of a thread pool.

    The thread repeatedly takes a request (a callable plus its arguments) from
    the pool's request queue, runs it, and places a
    ``(request, thread name, outcome)`` tuple on the pool's results queue. It
    keeps doing so until it has been dismissed.

    """

    def __init__(self, requests_queue, results_queue, poll_timeout=5, **kwds):
        """Create the worker and start it running immediately.

        @param requests_queue: Queue the worker takes requests from
        @param results_queue: Queue the worker places results on
        @param poll_timeout: Seconds to wait for a request before re-checking for dismissal
        @param kwds: Extra keyword arguments forwarded to the threading.Thread constructor

        """
        threading.Thread.__init__(self, **kwds)
        log.info("[%s] Creating thread for test execution" % self.getName())
        self.setDaemon(1)
        self._dismissed = threading.Event()
        self._poll_timeout = poll_timeout
        self._results_queue = results_queue
        self._requests_queue = requests_queue
        # The thread is live as soon as the constructor returns.
        self.start()

    def run(self):
        """Process requests from the queue until the worker is dismissed."""
        while not self._dismissed.isSet():
            try:
                request = self._requests_queue.get(True, self._poll_timeout)
            except Queue.Empty:
                # Nothing arrived within the timeout; loop round so that a
                # dismissal is noticed.
                continue

            if self._dismissed.isSet():
                # Dismissed while waiting: hand the request back for another
                # worker rather than running it.
                self._requests_queue.put(request)
                return

            try:
                outcome = request.callable(*request.args, **request.kwds)
                self._results_queue.put((request, self.getName(), outcome))
            except:
                # Catch everything so that a failing request is still reported
                # on the results queue, flagged via request.exception.
                request.exception = True
                self._results_queue.put((request, self.getName(), sys.exc_info()))
            time.sleep(0.1)

    def dismiss(self):
        """Ask the worker to stop; it exits the next time it checks the flag."""
        self._dismissed.set()
103 104 105
class WorkRequest:
    """Describes a single unit of work to be placed on the thread pool request queue.

    Holds the callable and its arguments, the callbacks to be used once the
    request has been run, and the identifier under which the request is tracked.

    """

    def __init__(self, callable_, args=None, kwds=None, requestID=None,
                 callback=None, exc_callback=_handle_thread_exception):
        """Class constructor.

        @param callable_: The callable object or function to execute
        @param args: Positional arguments for the callable (defaults to an empty list)
        @param kwds: Keyword arguments for the callable (defaults to an empty dict)
        @param requestID: Optional hashable ID; its hash is stored as the request ID,
            and when omitted the ID is ``id(self)``
        @param callback: A callback on completion of the request
        @param exc_callback: A callback used when the request throws an exception
        @raise TypeError: If a requestID is supplied that is not hashable

        """
        self.callable = callable_
        self.args = args or []
        self.kwds = kwds or {}
        self.callback = callback
        self.exc_callback = exc_callback
        # Set to True by the worker thread if the callable raises.
        self.exception = False

        if requestID is not None:
            try:
                self.requestID = hash(requestID)
            except TypeError:
                raise TypeError("requestID must be hashable.")
        else:
            self.requestID = id(self)
136 137 138
class ThreadPool:
    """Main pool to manage worker threads processing an internal request queue.

    Requests are submitted with putRequest, executed by the worker threads, and
    their results are collected (and callbacks invoked) by poll or wait.

    """

    def __init__(self, num_workers, q_size=0, resq_size=0, poll_timeout=5):
        """Class constructor.

        @param num_workers: The number of worker threads processing the queue
        @param q_size: The request queue size
        @param resq_size: The response queue size
        @param poll_timeout: The polling timeout of worker threads when getting requests from the queue

        """
        self._requests_queue = Queue.Queue(q_size)
        self._results_queue = Queue.Queue(resq_size)
        self.workers = []
        self.dismissedWorkers = []
        # Requests submitted but not yet reported by poll(), keyed by requestID.
        self.workRequests = {}
        self.createWorkers(num_workers, poll_timeout)

    def createWorkers(self, num_workers, poll_timeout=5):
        """Create additional threads on the workers stack.

        @param num_workers: The number of workers to add to the stack
        @param poll_timeout: The timeout of the threads when waiting for a request on the queue

        """
        for _ in range(num_workers):
            self.workers.append(WorkerThread(self._requests_queue,
                                             self._results_queue,
                                             poll_timeout=poll_timeout))

    def dismissWorkers(self, num_workers, do_join=False):
        """Dismiss worker threads from the workers stack.

        Pops up to num_workers workers off the end of the workers list and asks
        each of them to stop.

        @param num_workers: The number of workers to dismiss
        @param do_join: If True wait for the dismissed threads to terminate before
            returning; otherwise they are remembered for joinAllDismissedWorkers

        """
        dismiss_list = []
        for _ in range(min(num_workers, len(self.workers))):
            worker = self.workers.pop()
            worker.dismiss()
            dismiss_list.append(worker)

        if do_join:
            for worker in dismiss_list:
                worker.join()
        else:
            self.dismissedWorkers.extend(dismiss_list)

    def joinAllDismissedWorkers(self):
        """Join all dismissed workers.

        Blocks until all dismissed worker threads terminate. Use when calling
        dismissWorkers with do_join = False.

        """
        for worker in self.dismissedWorkers:
            worker.join()
        self.dismissedWorkers = []

    def putRequest(self, request, block=True, timeout=0):
        """Place a WorkRequest on the request queue.

        The block and timeout arguments are passed straight through to the
        request queue's put method.

        @param request: The WorkRequest to place on the request queue
        @param block: If True, wait up to timeout seconds for a free slot when the
            request queue is full; if False, do not wait at all
        @param timeout: Seconds to wait for a free slot when block is True. With the
            default of 0 a full request queue raises Queue.Full without waiting
        @raise Queue.Full: If no slot became available on a size-limited request queue

        """
        assert isinstance(request, WorkRequest)
        # A request that has already failed must not be submitted again.
        assert not getattr(request, 'exception', None)
        self._requests_queue.put(request, block, timeout)
        # Only track the request once it is actually on the queue.
        self.workRequests[request.requestID] = request

    def poll(self, block=False):
        """Process results from the results queue, invoking the request callbacks.

        For each result taken off the results queue, the exception callback is
        called when the request failed and has one; otherwise the normal callback
        is called when the request has one. Callbacks are called with the name of
        the worker thread and the result (or exception info). The request is then
        removed from the set of pending requests, even if its callback raises.

        @param block: If True wait for a result to arrive; if False return as soon
            as the results queue is empty
        @raise NoResultsPending: If there are no requests waiting for a result
        @raise NoWorkersAvailable: If block is True and there are no workers left

        """
        while True:
            if not self.workRequests:
                raise NoResultsPending
            elif block and not self.workers:
                raise NoWorkersAvailable

            # Guard only the queue read, so that a Queue.Empty raised from inside
            # a callback is not mistaken for "no more results".
            try:
                request, name, result = self._results_queue.get(block=block)
            except Queue.Empty:
                break

            try:
                use_exc_callback = request.exception and request.exc_callback
                if use_exc_callback:
                    request.exc_callback(name, result)
                if request.callback and not use_exc_callback:
                    request.callback(name, result)
            finally:
                # The result has been consumed and will never arrive again, so
                # stop tracking the request even when a callback raised;
                # otherwise a later wait() would block forever. pop() with a
                # default keeps a missing entry from masking a callback error.
                self.workRequests.pop(request.requestID, None)

    def wait(self):
        """Block until there are no request results pending on the queue."""
        while True:
            try:
                self.poll(True)
            except NoResultsPending:
                break
254