1 """Site services for use with a Web Site Process Bus."""
2
3 import os
4 import re
5 try:
6 set
7 except NameError:
8 from sets import Set as set
9 import signal as _signal
10 import sys
11 import time
12 import threading
13
14
16 """Plugin base class which auto-subscribes methods for known channels."""
17
20
def subscribe(self):
    """Register this object as a (multi-channel) listener on the bus.

    For every channel name found in ``self.bus.listeners``, look up an
    attribute of the same name on this object and, if there is one,
    subscribe it to that channel.
    """
    # NOTE(review): the ``def`` line is missing from this listing; the name
    # and signature were reconstructed from the docstring and from the
    # ``Daemonizer(bus).subscribe()`` usage documented later -- confirm.
    for channel in self.bus.listeners:
        # Only channels this object actually implements get subscribed.
        method = getattr(self, channel, None)
        if method is not None:
            self.bus.subscribe(channel, method)
28
def unsubscribe(self):
    """Unregister this object as a listener on the bus.

    For every channel name found in ``self.bus.listeners``, look up an
    attribute of the same name on this object and, if there is one,
    unsubscribe it from that channel.
    """
    # NOTE(review): the ``def`` line is missing from this listing; the name
    # and signature were reconstructed from the docstring -- confirm.
    for channel in self.bus.listeners:
        # Only channels this object actually implements get unsubscribed.
        method = getattr(self, channel, None)
        if method is not None:
            self.bus.unsubscribe(channel, method)
36
37
38
40 """Register bus channels (and listeners) for system signals.
41
42 By default, instantiating this object subscribes the following signals
43 and listeners:
44
45 TERM: bus.exit
46 HUP : bus.restart
47 USR1: bus.graceful
48 """
49
50
# Map from signal number to signal name, built from the signal module.
signals = {}
for k, v in vars(_signal).items():
    # Names starting with 'SIG_' (e.g. SIG_DFL, used below as a handler
    # value) are excluded; only 'SIGxxx' names are kept.
    if k.startswith('SIG') and not k.startswith('SIG_'):
        signals[v] = k
# Remove the loop variables so only ``signals`` is left behind.
del k, v
56
66
def subscribe(self):
    """Install a handler for every entry in ``self.handlers``.

    Each (signal, listener) pair is passed to ``self.set_handler``. A
    ValueError from ``set_handler`` is ignored, so the remaining entries
    are still processed.
    """
    # NOTE(review): the ``def`` line is missing from this listing; the name
    # ``subscribe`` is inferred from the class docstring ("instantiating
    # this object subscribes the following signals") -- confirm.
    for sig, func in self.handlers.iteritems():
        try:
            self.set_handler(sig, func)
        except ValueError:
            pass
73
def unsubscribe(self):
    """Restore every signal handler recorded in ``self._previous_handlers``.

    A recorded handler of None is restored as SIG_DFL. Each step is logged
    on the bus; a ValueError from ``signal.signal`` is logged at level 40
    with a traceback instead of being raised.
    """
    # NOTE(review): the ``def`` line is missing from this listing; the name
    # ``unsubscribe`` is inferred as the counterpart of the handler
    # installation above -- confirm.
    for signum, handler in self._previous_handlers.iteritems():
        signame = self.signals[signum]

        if handler is None:
            self.bus.log("Restoring %s handler to SIG_DFL." % signame)
            handler = _signal.SIG_DFL
        else:
            self.bus.log("Restoring %s handler %r." % (signame, handler))

        try:
            # signal.signal returns the handler it replaced, which should
            # be the one this object installed.
            our_handler = _signal.signal(signum, handler)
            if our_handler is None:
                self.bus.log("Restored old %s handler %r, but our "
                             "handler was not registered." %
                             (signame, handler), level=30)
        except ValueError:
            self.bus.log("Unable to restore %s handler %r." %
                         (signame, handler), level=40, traceback=True)
93
def set_handler(self, signal, listener=None):
    """Subscribe a handler for the given signal (number or name).

    If the optional 'listener' argument is provided, it will be
    subscribed as a listener for the given signal's channel.

    If the given signal name or number is not available on the current
    platform, ValueError is raised.

    The handler that was previously installed is remembered in
    ``self._previous_handlers`` under the signal number.
    """
    # NOTE(review): the ``def`` line is missing from this listing; the
    # signature was reconstructed from the ``self.set_handler(sig, func)``
    # call and the names used in the body -- confirm.
    if isinstance(signal, basestring):
        # A name: resolve it to a number through the signal module.
        signum = getattr(_signal, signal, None)
        if signum is None:
            raise ValueError("No such signal: %r" % signal)
        signame = signal
    else:
        # A number: resolve it to a name through the signals map.
        try:
            signame = self.signals[signal]
        except KeyError:
            raise ValueError("No such signal: %r" % signal)
        signum = signal

    prev = _signal.signal(signum, self._handle_signal)
    self._previous_handlers[signum] = prev

    if listener is not None:
        self.bus.log("Listening for %s." % signame)
        self.bus.subscribe(signame, listener)
121
def _handle_signal(self, signum=None, frame=None):
    """Python signal handler (self.set_handler subscribes it for you).

    Logs the signal's name and publishes to the bus channel of that name.
    """
    # NOTE(review): the ``def`` line is missing from this listing; the name
    # comes from ``set_handler``. The (signum, frame) parameters follow the
    # signal-handler calling convention; the None defaults are assumed.
    signame = self.signals[signum]
    self.bus.log("Caught signal %s." % signame)
    self.bus.publish(signame)
127
def handle_SIGHUP(self):
    """React to SIGHUP: exit when attached to a terminal, else restart.

    If standard input is a tty the process is treated as not daemonized
    and the bus is told to exit; otherwise the bus is told to restart.
    """
    # NOTE(review): the ``def`` line is missing from this listing; the name
    # ``handle_SIGHUP`` is a guess based on the log messages. It takes no
    # arguments because ``_handle_signal`` publishes the signal name only.
    if os.isatty(sys.stdin.fileno()):
        # Running in a terminal: a hangup means the terminal went away.
        self.bus.log("SIGHUP caught but not daemonized. Exiting.")
        self.bus.exit()
    else:
        self.bus.log("SIGHUP caught while daemonized. Restarting.")
        self.bus.restart()
136
137
138 try:
139 import pwd, grp
140 except ImportError:
141 pwd, grp = None, None
142
143
145 """Drop privileges. uid/gid arguments not available on Windows.
146
147 Special thanks to Gavin Baker: http://antonym.org/node/100.
148 """
149
150 - def __init__(self, bus, umask=None, uid=None, gid=None):
156
def _set_uid(self, val):
    """Setter for the ``uid`` property.

    None is stored unchanged. If the pwd module is unavailable the value
    is ignored (a warning is logged and None is stored). A string is
    treated as a user name and converted with ``pwd.getpwnam``; any other
    value is stored as given.
    """
    # NOTE(review): the ``def`` line is missing from this listing; the name
    # comes from the ``property(_get_uid, _set_uid, ...)`` line that follows.
    if val is not None:
        if pwd is None:
            self.bus.log("pwd module not available; ignoring uid.",
                         level=30)
            val = None
        elif isinstance(val, basestring):
            # Field 2 of a passwd entry is the numeric user id.
            val = pwd.getpwnam(val)[2]
    self._uid = val
168 uid = property(_get_uid, _set_uid, doc="The uid under which to run.")
169
def _set_gid(self, val):
    """Setter for the ``gid`` property.

    None is stored unchanged. If the grp module is unavailable the value
    is ignored (a warning is logged and None is stored). A string is
    treated as a group name and converted with ``grp.getgrnam``; any other
    value is stored as given.
    """
    # NOTE(review): the ``def`` line is missing from this listing; the name
    # comes from the ``property(_get_gid, _set_gid, ...)`` line that follows.
    if val is not None:
        if grp is None:
            self.bus.log("grp module not available; ignoring gid.",
                         level=30)
            val = None
        elif isinstance(val, basestring):
            # Field 2 of a group entry is the numeric group id.
            val = grp.getgrnam(val)[2]
    self._gid = val
181 gid = property(_get_gid, _set_gid, doc="The gid under which to run.")
182
def _set_umask(self, val):
    """Setter for the ``umask`` property.

    None is stored unchanged. If ``os.umask`` does not exist the value is
    ignored (a warning is logged and None is stored); otherwise the value
    is stored as given.
    """
    # NOTE(review): the ``def`` line is missing from this listing; the name
    # comes from the ``property(_get_umask, _set_umask, ...)`` line below.
    if val is not None:
        try:
            # Attribute access only: checks that the function exists.
            os.umask
        except AttributeError:
            self.bus.log("umask function not available; ignoring umask.",
                         level=30)
            val = None
    self._umask = val
194 umask = property(_get_umask, _set_umask, doc="The umask under which to run.")
195
197
def start(self):
    """Apply the configured gid, uid and umask to the running process.

    The first call does the work and then sets ``self.finalized``; later
    calls only log what is already in effect. The group id is set before
    the user id. A missing uid/gid or umask is logged at level 30.
    """
    # NOTE(review): the ``def`` line is missing from this listing; the name
    # comes from the ``start.priority`` assignment below.

    def current_ids():
        """Return the current (uid, gid) if available."""
        name, group = None, None
        if pwd:
            name = pwd.getpwuid(os.getuid())[0]
        if grp:
            group = grp.getgrgid(os.getgid())[0]
        return name, group

    # uid/gid
    if self.finalized:
        if not (self.uid is None and self.gid is None):
            self.bus.log('Already running as uid: %r gid: %r' %
                         current_ids())
    else:
        if self.uid is None and self.gid is None:
            if pwd or grp:
                self.bus.log('uid/gid not set', level=30)
        else:
            self.bus.log('Started as uid: %r gid: %r' % current_ids())
            # NOTE(review): only setgid/setuid are called here;
            # supplementary groups are left as they are -- confirm that
            # this is intended.
            if self.gid is not None:
                os.setgid(self.gid)
            if self.uid is not None:
                os.setuid(self.uid)
            self.bus.log('Running as uid: %r gid: %r' % current_ids())

    # umask
    if self.finalized:
        if self.umask is not None:
            self.bus.log('umask already set to: %03o' % self.umask)
    else:
        if self.umask is None:
            self.bus.log('umask not set', level=30)
        else:
            old_umask = os.umask(self.umask)
            self.bus.log('umask old: %03o, new: %03o' %
                         (old_umask, self.umask))

    self.finalized = True
start.priority = 77
240
241
243 """Daemonize the running script.
244
245 Use this with a Web Site Process Bus via:
246
247 Daemonizer(bus).subscribe()
248
249 When this component finishes, the process is completely decoupled from
250 the parent environment. Please note that when this component is used,
251 the return code from the parent process will still be 0 if a startup
252 error occurs in the forked children. Errors in the initial daemonizing
253 process still return proper exit codes. Therefore, if you use this
254 plugin to daemonize, don't use the return code as an accurate indicator
255 of whether the process fully started. In fact, that return code only
256 indicates if the process successfully finished the first fork.
257 """
258
def __init__(self, bus, stdin='/dev/null', stdout='/dev/null',
             stderr='/dev/null'):
    """Remember the bus and the files to use as the standard streams.

    bus: passed on to ``SimplePlugin.__init__``.
    stdin, stdout, stderr: paths stored for later use as the process's
        standard streams; each defaults to '/dev/null'.
    """
    SimplePlugin.__init__(self, bus)
    self.stdin = stdin
    self.stdout = stdout
    self.stderr = stderr
    # Set to True once daemonizing has completed.
    self.finalized = False
266
def start(self):
    """Detach the process from its parent by forking twice.

    After the first fork the parent exits and the child calls
    ``os.setsid``; after the second fork the intermediate process exits.
    The surviving process changes directory to "/", sets the umask to 0
    and redirects stdin/stdout/stderr to the configured files. If a fork
    fails, ``sys.exit`` is called with a message naming the failed fork.

    Calling this again once ``self.finalized`` is set only logs a message.
    """
    # NOTE(review): the ``def`` line is missing from this listing; the name
    # comes from the ``start.priority`` assignment below.
    if self.finalized:
        self.bus.log('Already deamonized.')
        # Do not fork a second time; the process is already detached.
        return

    # Forking copies only the calling thread, so warn if there are others.
    if threading.activeCount() != 1:
        self.bus.log('There are %r active threads. '
                     'Daemonizing now may cause strange failures.' %
                     threading.enumerate(), level=30)

    # Finish up with the current stdout/stderr before they are replaced.
    sys.stdout.flush()
    sys.stderr.flush()

    # Do first fork.
    try:
        pid = os.fork()
        if pid == 0:
            # This is the child process. Continue.
            pass
        else:
            # This is the first parent. Exit, now that we've forked.
            self.bus.log('Forking once.')
            os._exit(0)
    except OSError:
        # sys.exc_info() is used so this parses on every Python version.
        exc = sys.exc_info()[1]
        sys.exit("%s: fork #1 failed: (%d) %s\n"
                 % (sys.argv[0], exc.errno, exc.strerror))

    os.setsid()

    # Do second fork.
    try:
        pid = os.fork()
        if pid > 0:
            self.bus.log('Forking twice.')
            os._exit(0)
    except OSError:
        exc = sys.exc_info()[1]
        sys.exit("%s: fork #2 failed: (%d) %s\n"
                 % (sys.argv[0], exc.errno, exc.strerror))

    os.chdir("/")
    os.umask(0)

    si = open(self.stdin, "r")
    so = open(self.stdout, "a+")
    se = open(self.stderr, "a+", 0)

    # Point the existing standard stream descriptors at the new files.
    os.dup2(si.fileno(), sys.stdin.fileno())
    os.dup2(so.fileno(), sys.stdout.fileno())
    os.dup2(se.fileno(), sys.stderr.fileno())

    self.bus.log('Daemonized to PID: %s' % os.getpid())
    self.finalized = True
start.priority = 65
333
334
336 """Maintain a PID file via a WSPBus."""
337
342
def start(self):
    """Write the current process id to ``self.pidfile`` (first call only).

    Once the file has been written ``self.finalized`` is set, and later
    calls only log that the PID was already written.
    """
    # NOTE(review): the ``def`` line is missing from this listing; the name
    # comes from the ``start.priority`` assignment below.
    pid = os.getpid()
    if self.finalized:
        self.bus.log('PID %r already written to %r.' % (pid, self.pidfile))
    else:
        f = open(self.pidfile, "wb")
        try:
            # The file is opened in binary mode, so write ASCII bytes.
            f.write(str(pid).encode('ascii'))
        finally:
            # Close explicitly so the PID is flushed to disk right away.
            f.close()
        self.bus.log('PID %r written to %r.' % (pid, self.pidfile))
        self.finalized = True
start.priority = 70
352
def exit(self):
    """Remove the PID file, ignoring any failure to do so.

    KeyboardInterrupt and SystemExit are re-raised; every other error
    (for example, the file not existing) is deliberately swallowed.
    """
    # NOTE(review): the ``def`` line is missing from this listing; the name
    # ``exit`` is a guess (cleanup when the bus exits) -- confirm.
    try:
        os.remove(self.pidfile)
        self.bus.log('PID file removed: %r.' % self.pidfile)
    except (KeyboardInterrupt, SystemExit):
        raise
    except:
        # Best effort only: a missing or unremovable file is not an error.
        pass
361
362
364 """A subclass of threading._Timer whose run() method repeats."""
365
def run(self):
    """Call ``self.function`` every ``self.interval`` seconds until cancelled.

    Each cycle waits on ``self.finished`` for the interval; once that
    event is set the loop returns without calling the function again.
    """
    # NOTE(review): the ``def`` line is missing from this listing; the name
    # comes from the class docstring ("whose run() method repeats").
    while True:
        self.finished.wait(self.interval)
        if self.finished.isSet():
            return
        self.function(*self.args, **self.kwargs)
372
373
375 """WSPBus listener to periodically run a callback in its own thread.
376
377 bus: a Web Site Process Bus object.
378 callback: the function to call at intervals.
379 frequency: the time in seconds between callback runs.
380 """
381
382 frequency = 60
383
384 - def __init__(self, bus, callback, frequency=60):
389
def start(self):
    """Start our callback in its own perpetual timer thread.

    Nothing happens unless ``self.frequency`` is greater than zero. If a
    thread already exists, that fact is logged and no new one is created.
    The thread is named after this object's class.
    """
    # NOTE(review): the ``def`` line is missing from this listing; the name
    # comes from the ``start.priority`` assignment below.
    if self.frequency > 0:
        threadname = self.__class__.__name__
        if self.thread is None:
            self.thread = PerpetualTimer(self.frequency, self.callback)
            self.thread.setName(threadname)
            self.thread.start()
            self.bus.log("Started monitor thread %r." % threadname)
        else:
            self.bus.log("Monitor thread %r already started." % threadname)
start.priority = 70
402
def stop(self):
    """Stop our callback's perpetual timer thread.

    If there is no thread this only logs a message. Otherwise the thread
    is cancelled and joined, unless it is the calling thread itself, and
    ``self.thread`` is reset to None in either case.
    """
    # NOTE(review): the ``def`` line is missing from this listing; the name
    # comes from the ``self.stop()`` call in the method that follows.
    if self.thread is None:
        self.bus.log("No thread running for %s." % self.__class__.__name__)
    else:
        # A thread cannot join itself, so skip cancel/join in that case.
        if self.thread is not threading.currentThread():
            name = self.thread.getName()
            self.thread.cancel()
            self.thread.join()
            self.bus.log("Stopped thread %r." % name)
        self.thread = None
414
def graceful(self):
    """Stop the callback's perpetual timer thread and restart it."""
    # NOTE(review): the ``def`` line is missing from this listing; the name
    # ``graceful`` is a guess based on the ``graceful = stop`` alias used
    # elsewhere in this file -- confirm.
    self.stop()
    self.start()
419
420
422 """Monitor which re-executes the process when files change."""
423
424 frequency = 1
425 match = '.*'
426
427 - def __init__(self, bus, frequency=1, match='.*'):
432
def start(self):
    """Start our own perpetual timer thread for self.run.

    When no thread exists yet the recorded modification times are
    cleared first; starting is then delegated to ``Monitor.start``.
    """
    # NOTE(review): the ``def`` line is missing from this listing, and so
    # is the indentation; ``Monitor.start`` is assumed to be called
    # whether or not a thread already exists -- confirm.
    if self.thread is None:
        self.mtimes = {}
    Monitor.start(self)
start.priority = 70
439
def run(self):
    """Reload the process if registered files have been modified.

    The watched set is ``self.files`` plus one file for every loaded
    module whose name matches ``self.match``: the loader's ``archive``
    when the loader has one, otherwise the module's ``__file__``. The
    first time a file is seen its mtime is only recorded. On later runs,
    a file that is newer than recorded or can no longer be stat'ed
    cancels the timer thread and triggers ``self.bus.restart()``.
    """
    # NOTE(review): the ``def`` line is missing from this listing; the name
    # comes from the "timer thread for self.run" wording in ``start``.
    sysfiles = set()
    # Iterate a snapshot: an import elsewhere may change sys.modules.
    for k, m in list(sys.modules.items()):
        if re.match(self.match, k):
            if (hasattr(m, '__loader__') and
                    hasattr(m.__loader__, 'archive')):
                # Watch the archive itself; previously this value was
                # immediately overwritten by ``__file__`` below.
                f = m.__loader__.archive
            else:
                f = getattr(m, '__file__', None)
            sysfiles.add(f)

    for filename in sysfiles | self.files:
        if filename:
            if filename.endswith('.pyc'):
                # Watch the .py source rather than the compiled file.
                filename = filename[:-1]

            oldtime = self.mtimes.get(filename, 0)
            if oldtime is None:
                # Recorded earlier as not stat-able. Skip it.
                continue

            try:
                mtime = os.stat(filename).st_mtime
            except OSError:
                # The file does not exist (or never did).
                mtime = None

            if filename not in self.mtimes:
                # First sighting: remember the mtime (possibly None).
                self.mtimes[filename] = mtime
            else:
                if mtime is None or mtime > oldtime:
                    # The file has been deleted or modified.
                    self.bus.log("Restarting because %s changed." % filename)
                    self.thread.cancel()
                    self.bus.log("Stopped thread %r." % self.thread.getName())
                    self.bus.restart()
                    return
478
479
481 """Manager for HTTP request threads.
482
483 If you have control over thread creation and destruction, publish to
484 the 'acquire_thread' and 'release_thread' channels (for each thread).
485 This will register/unregister the current thread and publish to
486 'start_thread' and 'stop_thread' listeners in the bus as needed.
487
488 If threads are created and destroyed by code you do not control
489 (e.g., Apache), then, at the beginning of every HTTP request,
490 publish to 'acquire_thread' only. You should not publish to
491 'release_thread' in this case, since you do not know whether
492 the thread will be re-used or not. The bus will call
493 'stop_thread' listeners for you when it stops.
494 """
495
501
def acquire_thread(self):
    """Run 'start_thread' listeners for the current thread.

    If the current thread has already been seen, any 'start_thread'
    listeners will not be run again.
    """
    # NOTE(review): the ``def`` line is missing from this listing; the name
    # comes from the 'acquire_thread' channel in the class docstring.
    thread_ident = threading._get_ident()
    if thread_ident not in self.threads:
        # A new thread: give it the next index and announce it.
        # NOTE(review): the index is len + 1, and release_thread removes
        # entries, so an index may be handed out again while another
        # thread still holds it -- confirm this is acceptable.
        i = len(self.threads) + 1
        self.threads[thread_ident] = i
        self.bus.publish('start_thread', i)
515
def release_thread(self):
    """Release the current thread and run 'stop_thread' listeners.

    Nothing is published if the current thread was never acquired.
    """
    # NOTE(review): the ``def`` line is missing from this listing; the name
    # comes from the 'release_thread' channel in the class docstring.
    thread_ident = threading._get_ident()
    i = self.threads.pop(thread_ident, None)
    if i is not None:
        self.bus.publish('stop_thread', i)
522
def stop(self):
    """Release all threads and run all 'stop_thread' listeners."""
    # NOTE(review): the ``def`` line is missing from this listing; the name
    # comes from the ``graceful = stop`` alias below.
    # Iterate over a snapshot so a listener that releases a thread while
    # we publish cannot break the loop by changing the dict's size.
    for thread_ident, i in list(self.threads.items()):
        self.bus.publish('stop_thread', i)
    self.threads.clear()
graceful = stop
529