1
2
3
4
5
6
7
8
9 """
10 @undocumented: XArgsHolder, _initProject, _allocateUniqueProcessStdOutErr, stringToUnicode
11 """
12
13 import time
14 from pysys import log
15 from pysys.constants import *
16 from pysys.exceptions import *
17 from pysys.process.helper import ProcessWrapper
18
20 """
21 Internal-only, do not use.
22 """
23 try:
24 assert project.APAMA_HOME
25 assert project.APAMA_WORK
26 except Exception:
27 raise Exception("Not running inside an Apama environment. APAMA_HOME and APAMA_WORK not set")
28
29
30 if not getattr(project, "APAMA_BIN_DIR", None):
31 project.APAMA_BIN_DIR = os.path.join(project.APAMA_HOME, 'bin')
32 if not getattr(project, "APAMA_COMMON_JRE", None):
33 project.APAMA_COMMON_JRE = os.getenv('APAMA_COMMON_JRE', project.APAMA_HOME+'/../jvm/jvm/jre')
34 if not getattr(project, "APAMA_LIBRARY_VERSION", None):
35 project.APAMA_LIBRARY_VERSION = os.getenv('APAMA_LIBRARY_VERSION', 'UNKNOWN_APAMA_LIBRARY_VERSION')
36
38 """Abstract parent helper class for Apama server processes.
39
40 @ivar parent: Reference to the PySys testcase instantiating this class instance
41 @type parent: pysys.basetest
42 @ivar port: Port used for starting and interaction with the process
43 @type port: integer
44 @ivar host: Hostname for interaction with a remote process
45 @type host: string
46 @ivar environ: The environment for running the process
47 @type environ: dictionary
48
49 """
50
def __init__(self, parent, name, port=None, host=None):
    """Create an instance of the class.

    If no port is supplied, one is obtained from the parent testcase via
    getNextAvailableTCPPort() and used for starting the process and for all
    operations against it. The host parameter is only used to perform
    operations against a remote process started externally to the PySys
    framework - the class does not support the starting of a remote process.

    Unless the project property shutdownApamaComponentsAfterTest is set to
    something other than 'true', a cleanup function is registered with the
    parent testcase which shuts the component down if its process is still
    running.

    @param parent: Reference to the parent PySys testcase
    @param name: Name of this component, used in display names and log messages
    @param port: The port used for starting and interacting with the process
    @param host: The hostname used for interaction with a remote process (defaults to localhost)
    @raise Exception: If the parent testcase has no project configured

    """
    self.parent = parent
    self.process = None
    self.name = name
    self.log = self.parent.log

    # Validate the project before _initProject dereferences it, so that a
    # missing project is reported with the intended message.
    if not self.parent.project:
        raise Exception("PROJECT is not set - pysys environment not configured correctly")
    _initProject(self.parent.project)

    def cleanup():
        # Best-effort shutdown: a failure here is logged rather than raised
        try:
            if self.process and self.process.running():
                self.shutdown(ignoreExitStatus=True)
        except Exception as e:
            self.parent.log.error('Failed to shutdown %s: %s', self, e)

    if getattr(self.parent.project, 'shutdownApamaComponentsAfterTest', 'true').lower() == 'true':
        parent.addCleanupFunction(cleanup)

    # A falsy port (None, 0, '') is kept as given; only None triggers allocation
    self.port = int(port) if port else port
    self.host = host

    if self.port is None:
        self.port = parent.getNextAvailableTCPPort()
    if self.host is None:
        self.host = "localhost"

    # Take a unicode snapshot of the current environment for child processes
    self.environ = {}
    for key, value in os.environ.items():
        self.environ[stringToUnicode(key)] = stringToUnicode(value)
94
96 """Block until the component declares itself to be ready for processing.
97
98 """
99
100
101 command = os.path.join(self.parent.project.APAMA_BIN_DIR, 'component_management')
102
103
104 stdout, stderr = self.parent.allocateUniqueStdOutErr('waitForComponentUp_%s'%self.name)
105
106
107 arguments = []
108
109 arguments.append("-w")
110 arguments.extend(["-p", "%d" % self.port])
111 arguments.extend(["-n", self.host])
112
113 waitprocess = ProcessWrapper(command, arguments, self.environ, self.parent.output, BACKGROUND, timeout, stdout, stderr, displayName='waitForComponentUp %s'%self)
114 waitprocess.start()
115 try:
116
117 endtime = time.time()+timeout
118 while time.time() < endtime and waitprocess.running():
119
120 if self.process and not self.process.running():
121 self.parent.addOutcome(BLOCKED, '%s failed with exit code %d in waitForComponentUp'%(self.process, self.process.exitStatus), abortOnError=self.parent.defaultAbortOnError)
122 return
123 time.sleep(0.05)
124 if waitprocess.running():
125 self.parent.addOutcome(TIMEDOUT, 'Timed out waiting for %s to come up (after %d secs)'%(self, timeout), abortOnError=self.parent.defaultAbortOnError)
126 elif waitprocess.exitStatus != 0:
127
128 self.parent.addOutcome(BLOCKED, 'component_management failed while waiting for %s to come up, with exit status %d'%
129 (self, waitprocess.exitStatus), abortOnError=self.parent.defaultAbortOnError)
130 else:
131
132
133
134
135 sltime = float(getattr(self.parent.project, 'APAMA_START_COMPONENT_PORT_IN_USE_SLEEP_SECS', '0'))
136 if sltime > 0:
137 time.sleep(sltime)
138
139
140 if self.process and not self.process.running():
141 self.parent.addOutcome(BLOCKED, '%s failed with exit code %d during waitForComponentUp'%(self.process, self.process.exitStatus), abortOnError=self.parent.defaultAbortOnError)
142
143 self.parent.log.info(" Component %s is now running", self)
144 finally:
145 if waitprocess.running(): waitprocess.stop()
146
147
def shutdown(self, message='Shutdown requested by test', timeout=60, **args):
    """Request a clean shutdown of the component.

    When this object holds a local process that has already stopped, the
    request is silently ignored. Otherwise a '--shutdown' request is issued
    through manage() and, if this object holds a local process, the call
    then waits for that process to terminate.

    @param message: The reason passed along with the --shutdown request
    @param timeout: Timeout used both for the management request and for the wait on the local process
    @param args: Additional keyword arguments passed through to manage()

    """
    local = self.process
    alreadyStopped = bool(local) and not local.running()
    if alreadyStopped:
        return

    shutdownArgs = ['--shutdown', message]
    self.manage(arguments=shutdownArgs, timeout=timeout, displayName='shutdown', **args)

    # Only a process held by this object can be waited on
    if self.process:
        self.process.wait(timeout=timeout)
159
161 """ Returns True if this has a local process that is running, or
162 False if it has stopped, or was not started by this object.
163 """
164 return self.process and self.process.running()
165
166 - def __repr__(self): return '%s<%s%s:%d>'%(
167 self.name,
168 ('pid %s on '%self.process.pid) if (self.process and self.process.pid) else '',
169 self.host, self.port
170 )
171
def manage(self, arguments=[], displayName="manage", **xargs):
    """Execute component_management operations against the process.

    The command line is always prefixed with '-p <port>' and, when a host is
    set, '-n <host>', followed by the supplied arguments.

    @param arguments: The list of arguments to be passed to component_management (must not be a string)
    @param displayName: Label for the started process; the component name is appended to it
    @param xargs: Optional startProcess keyword arguments, e.g. timeout, ignoreExitStatus, workingDir
    @return: The result of the parent testcase's startProcess call

    """
    executable = os.path.join(self.parent.project.APAMA_BIN_DIR, 'component_management')

    assert not isinstance(arguments, basestring)

    # Output file names are keyed on the display name as supplied by the caller
    defaultOut, defaultErr = _allocateUniqueProcessStdOutErr(self.parent, displayName)

    holder = XArgsHolder(xargs, stdout=defaultOut, stderr=defaultErr, arguments=arguments, project=self.parent.project)

    commandLine = ["-p", "%d" % self.port]
    if self.host:
        commandLine += ["-n", self.host]
    if holder.arguments:
        commandLine += holder.arguments

    if displayName != 'manage':
        label = '%s <%s>' % (displayName, self.name)
    else:
        # Default label lists the arguments, except for a shutdown request
        label = '%s <%s> [%s]' % (displayName, self.name, ' '.join(holder.arguments))
        if any(flag in holder.arguments for flag in ('-s', '--shutdown')):
            label = 'manage <%s> --shutdown' % self.name

    return self.parent.startProcess(executable, commandLine, self.environ, displayName=label, **holder.kwargs)
204
206 """ Allocate filenames of the form processKey.out[.n] (similarly for .err)
207 for a process that is about to be started, such that the names are not
208 repeated within the specified parent's lifetime.
209
210 Not for customer use, and legacy only - the pysys allocateUniqueStdOutErr method is recommended instead.
211
212 Returns a tuple of (stdout, stderr).
213 """
214
215
216
217 if not hasattr(parent, '_apama_uniqueProcessKeys'): parent._apama_uniqueProcessKeys = {}
218 newval = parent._apama_uniqueProcessKeys.get(processKey, -1)+1
219 parent._apama_uniqueProcessKeys[processKey] = newval
220
221 suffix = '.%d'%newval if newval > 0 else ''
222
223 return (
224 os.path.join(parent.output, processKey+'.out'+suffix),
225 os.path.join(parent.output, processKey+'.err'+suffix),
226 )
227
229 """
230 @deprecated: Internal helper function: do not use, will be removed in a future release.
231
232 Converts a unicode string or a utf-8 bit string into a unicode string.
233
234 """
235 if isinstance(s, unicode):
236 return s
237 else:
238 return unicode(s, "utf8")
239
240
242 """
243
244 @deprecated: Internal helper class: do not use, will be removed in a future release.
245
246 Class to store all supported xargs method arguments used in the helper classes.
247
248 Methods in the Apama helper classes define process related methods with a signature including
249 named parameters for the most commonly used options to the process, and an **xargs parameter to allow
250 passing through of additional supported named parameters, e.g. workingDir, state, timeout, stdout, stderr and
251 arguments. The XArgsHolder class takes the **xargs parameter from a method call (which is treated by
252 python as a dictionary of name value pairs) and default values for the workingDir, state, timeout, stdout, stderr
253 and arguments; these are used to set data attributes to the class instance with the default values. The class then
254 iterates over the **xargs and over-writes the default values if they exist in the parameter. This allows a
255 user of the class to create an instance to hold the additional arguments with default values in the first
256 case, but for these to be replaced if an alternative value is supplied via **xargs, e.g. the user of the method wants
257 to explicitly set the stdout etc.
258
259 The kwargs member contains a dictionary that can be passed through to startProcess using **kwargs,
260 containing every setting except for arguments which should always be handled directly.
261
262 Typical usage would be startProcess(arguments=xargs.arguments, **xargs.kwargs)
263
264 """
265
def __init__(self, xargs, workingDir=None, state=FOREGROUND, timeout=TIMEOUTS['WaitForProcess'], stdout=None, stderr=None, arguments=None, ignoreExitStatus=None, project=None):
    """Create an instance of the XArgsHolder class.

    Each setting is stored both as an attribute of this instance and in the
    kwargs dictionary, except for arguments which is only available as an
    attribute. Any other entries in xargs are copied into kwargs unchanged.

    @param xargs: The variable argument list passed into the method, which override the defaults provided by the other arguments
    @param workingDir: The default value for the working directory of a process
    @param state: The default state of the process (L{pysys.constants.BACKGROUND} | L{pysys.constants.FOREGROUND})
    @param timeout: The default value of the process timeout
    @param stdout: The default value of the process stdout
    @param stderr: The default value of the process stderr
    @param arguments: Extra command line arguments to be passed to the process, as a list; None means no arguments
    @param ignoreExitStatus: Set to True to change default behaviour for this process to prevent
    non-zero exit code being treated as a test failure. The project setting
    defaultApamaIgnoreExitStatus can be used to control this globally.
    @param project: The project consulted for defaultApamaIgnoreExitStatus when ignoreExitStatus is not given
    @raise Exception: If arguments is a string rather than a list

    """
    # Copy so that the caller's dictionary is never modified
    self.kwargs = dict(xargs)

    if ignoreExitStatus is None:
        # Fall back to the project-wide setting, or False if there is none
        ignoreExitStatus = project.defaultApamaIgnoreExitStatus.lower()=='true' if project and hasattr(project, 'defaultApamaIgnoreExitStatus') else False

    # A value supplied in xargs always wins over the default
    defaults = (
        ('workingDir', workingDir),
        ('state', state),
        ('timeout', timeout),
        ('stdout', stdout),
        ('stderr', stderr),
        ('ignoreExitStatus', ignoreExitStatus),
    )
    for key, default in defaults:
        value = xargs.get(key, default)
        setattr(self, key, value)
        self.kwargs[key] = value

    # arguments is deliberately removed from kwargs; callers handle it directly
    self.arguments = self.kwargs.pop('arguments', arguments)

    # The default is None rather than [] so that every instance gets its own
    # list instead of sharing (and possibly mutating) one default list.
    if self.arguments is None:
        self.arguments = []
    if isinstance(self.arguments, basestring):
        raise Exception('arguments must be a list not a string')
302