1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24 import sys, time, logging, thread, threading, Queue, traceback
25
26 from pysys import log
27
28
29
31 """All work requests have been processed."""
32 pass
33
34
36 """No worker threads available to process remaining requests."""
37 pass
38
39
40
42 """Default exception handler callback function.
43
44 This just prints the exception info via ``traceback.print_exception``.
45
46 """
47 traceback.print_exception(*exc_info)
48
49
50
52 """Thread to perform work requests managed by the thread pool object.
53
54 The thread polls the thread safe queue of the thread pool instance to retrieve
55 work requests in the form of a callable reference with parameters. On completion
56 of a work request the thread places the results on another thread safe queue of the
57 thread pool and waits to get a new request.
58
59 """
60
def __init__(self, requests_queue, results_queue, poll_timeout=5, **kwds):
    """Initialise the worker, attach it to the pool's queues and start it.

    The thread is flagged as a daemon and is started from inside the
    constructor, so no separate call to start() is needed by the caller.

    @param requests_queue: Reference to the threadpool's request queue
    @param results_queue: Reference to the threadpool's results queue
    @param poll_timeout: The timeout when trying to obtain a request from the request queue
    @param kwds: Variable arguments to be passed to the threading.Thread constructor

    """
    threading.Thread.__init__(self, **kwds)
    thread_name = self.getName()
    log.info("[%s] Creating thread for test execution" % thread_name)

    # The daemon flag has to be set before the thread is started.
    self.setDaemon(1)

    # Queues shared with the owning pool and the polling interval used
    # when waiting for a request.
    self._poll_timeout = poll_timeout
    self._results_queue = results_queue
    self._requests_queue = requests_queue

    # Event inspected by the polling loop to find out when to stop.
    self._dismissed = threading.Event()

    self.start()
78
80 """Start running the worker thread."""
81 while True:
82 if self._dismissed.isSet():
83 break
84 try:
85 request = self._requests_queue.get(True, self._poll_timeout)
86 except Queue.Empty:
87 continue
88 else:
89 if self._dismissed.isSet():
90 self._requests_queue.put(request)
91 break
92 try:
93 result = request.callable(*request.args, **request.kwds)
94 self._results_queue.put((request, self.getName(), result))
95 except:
96 request.exception = True
97 self._results_queue.put((request, self.getName(), sys.exc_info()))
98 time.sleep(0.1)
99
101 """Stop running of the worker thread."""
102 self._dismissed.set()
103
104
105
107 """Holds the details of a request placed on the thread pool request queue.
108
109 """
110
113 """Class constructor.
114
115 @param callable_: The callable object or function
116 @param args: The argument list to the callable object or function
117 @param kwds: The keyword arguments to the callable object or function
118 @param requestID: An ID for the request
119 @param callback: A callback on completion of the request
120 @param exc_callback: A callback when the request throws an exception
121
122 """
123 if requestID is None:
124 self.requestID = id(self)
125 else:
126 try:
127 self.requestID = hash(requestID)
128 except TypeError:
129 raise TypeError("requestID must be hashable.")
130 self.exception = False
131 self.callback = callback
132 self.exc_callback = exc_callback
133 self.callable = callable_
134 self.args = args or []
135 self.kwds = kwds or {}
136
137
138
140 """Main pool to manage worker threads processing an internal request queue.
141
142 """
143
def __init__(self, num_workers, q_size=0, resq_size=0, poll_timeout=5):
    """Build the request and result queues and create the initial workers.

    @param num_workers: The number of worker threads processing the queue
    @param q_size: The request queue size (passed to Queue.Queue as its maximum size)
    @param resq_size: The response queue size (passed to Queue.Queue as its maximum size)
    @param poll_timeout: The polling timeout of worker threads when getting requests from the queue

    """
    # Bookkeeping: pending requests keyed by request ID, the active
    # workers and the workers dismissed without being joined.
    self.workRequests = {}
    self.dismissedWorkers = []
    self.workers = []

    # Both queues must exist before any worker is created, as each
    # worker is handed a reference to them.
    self._results_queue = Queue.Queue(resq_size)
    self._requests_queue = Queue.Queue(q_size)

    self.createWorkers(num_workers, poll_timeout)
159
160
162 """Create additional threads on the workers stack.
163
164 @param num_workers: The number of workers to add to the stack
165 @param poll_timeout: The timeout of the threads when waiting for a request on the queue
166
167 """
168 for i in range(num_workers):
169 self.workers.append(WorkerThread(self._requests_queue,
170 self._results_queue, poll_timeout=poll_timeout))
171
172
174 """Dismiss worker threads from the workers stack.
175
176 Stops up to num_workers workers by popping them off the end of the
177 workers list.
178
179 @param num_workers: The number of workers to dismiss
180 @param do_join: If True wait for all threads to terminate before returning from the call
181
182 """
183 dismiss_list = []
184 for i in range(min(num_workers, len(self.workers))):
185 worker = self.workers.pop()
186 worker.dismiss()
187 dismiss_list.append(worker)
188
189 if do_join:
190 for worker in dismiss_list:
191 worker.join()
192 else:
193 self.dismissedWorkers.extend(dismiss_list)
194
195
197 """Join all dismissed workers.
198
199 Blocks until all dismissed worker threads terminate. Use after calling dismissWorkers
200 with do_join = False.
201
202 """
203 for worker in self.dismissedWorkers:
204 worker.join()
205 self.dismissedWorkers = []
206
207
def putRequest(self, request, block=True, timeout=0):
    """Queue a WorkRequest for processing and record it as pending.

    The request is put on the request queue, with block and timeout
    forwarded unchanged to the queue's put call, and is then stored in
    workRequests keyed by its requestID.

    NOTE(review): with the standard Queue.put semantics the timeout only
    matters when block is true, so the default of 0 presumably makes a put
    on a full bounded queue fail straight away rather than wait - confirm
    that this is intended.

    @param request: The WorkRequest to place on the request queue; it must not already be flagged with an exception
    @param block: Forwarded as the block argument of the queue's put call
    @param timeout: Forwarded as the timeout argument of the queue's put call

    """
    assert isinstance(request, WorkRequest)
    # A request already marked as having raised must not be queued again.
    assert not getattr(request, 'exception', None)

    request_queue = self._requests_queue
    request_queue.put(request, block=block, timeout=timeout)

    # Registered only once the put has succeeded, so a failed put leaves
    # no pending entry behind.
    pending = self.workRequests
    pending[request.requestID] = request
220
221
def poll(self, block=False):
    """Take results off the results queue and dispatch them to callbacks.

    Each entry read from the results queue is a (request, thread name,
    result) tuple. If the request is flagged as having raised an exception
    and has an exception callback, that callback is called with the thread
    name and the result; otherwise the request's normal callback is called
    if it has one. The request is then removed from the pending work
    requests, even when the callback itself raises.

    With block set to False the method returns as soon as the results
    queue is empty. With block set to True each read waits for a result,
    so the loop is only left through one of the exceptions below.

    @param block: If True wait for each result, otherwise return once the results queue is empty
    @raise NoResultsPending: If there are no outstanding work requests
    @raise NoWorkersAvailable: If block is True and the pool has no workers

    """
    while True:
        if not self.workRequests:
            raise NoResultsPending
        elif block and not self.workers:
            raise NoWorkersAvailable

        # Only the queue read is guarded: a Queue.Empty raised from inside
        # a user callback must propagate rather than be mistaken for an
        # empty results queue.
        try:
            request, name, result = self._results_queue.get(block=block)
        except Queue.Empty:
            break

        try:
            if request.exception and request.exc_callback:
                request.exc_callback(name, result)
            elif request.callback:
                request.callback(name, result)
        finally:
            # The result has been consumed, so the request is no longer
            # pending whatever the callback did; leaving it registered
            # would make a later blocking poll wait for a result that
            # never arrives.
            self.workRequests.pop(request.requestID, None)
245
246
248 """Block until there are no request results pending on the queue."""
249 while 1:
250 try:
251 self.poll(True)
252 except NoResultsPending:
253 break
254