1 """An implementation of the Web Site Process Bus.
2
3 This module is completely standalone, depending only on the stdlib.
4
5 Web Site Process Bus
6 --------------------
7
A Bus object is used to contain and manage site-wide behavior:
daemonization, HTTP server start/stop, process reload, signal handling,
privilege dropping, PID file management, logging for all of these,
and many more.
12
13 In addition, a Bus object provides a place for each web framework
14 to register code that runs in response to site-wide events (like
15 process start and stop), or which controls or otherwise interacts with
16 the site-wide components mentioned above. For example, a framework which
17 uses file-based templates would add known template filenames to an
18 autoreload component.
19
20 Ideally, a Bus object will be flexible enough to be useful in a variety
21 of invocation scenarios:
22
23 1. The deployer starts a site from the command line via a framework-
24 neutral deployment script; applications from multiple frameworks
25 are mixed in a single site. Command-line arguments and configuration
26 files are used to define site-wide components such as the HTTP server,
27 WSGI component graph, autoreload behavior, signal handling, etc.
28 2. The deployer starts a site via some other process, such as Apache;
29 applications from multiple frameworks are mixed in a single site.
30 Autoreload and signal handling (from Python at least) are disabled.
31 3. The deployer starts a site via a framework-specific mechanism;
32 for example, when running tests, exploring tutorials, or deploying
33 single applications from a single framework. The framework controls
34 which site-wide components are enabled as it sees fit.
35
36 The Bus object in this package uses topic-based publish-subscribe
37 messaging to accomplish all this. A few topic channels are built in
38 ('start', 'stop', 'exit', and 'graceful'). Frameworks and site containers
39 are free to define their own. If a message is sent to a channel that has
40 not been defined or has no listeners, there is no effect.
41
42 In general, there should only ever be a single Bus object per process.
43 Frameworks and site containers share a single Bus object by publishing
44 messages and subscribing listeners.
45
46 The Bus object works as a finite state machine which models the current
47 state of the process. Bus methods move it from one state to another;
48 those methods then publish to subscribed listeners on the channel for
49 the new state.
50
                        O
                        |
                        V
       STOPPING --> STOPPED --> EXITING -> X
          A   A         |
          |    \___     |
          |        \    |
          |         V   V
        STARTED <-- STARTING
60
61 """
62
63 import atexit
64 import os
65 try:
66 set
67 except NameError:
68 from sets import Set as set
69 import sys
70 import threading
71 import time
72 import traceback as _traceback
73 import warnings
74
75
76
79 name = None
81 return "states.%s" % self.name
82
# Module-level enumeration of the states a Bus can be in.  Each name below
# is bound to its own fresh State instance; the names match the nodes of
# the state diagram in the module docstring.
states = _StateEnum()
states.STOPPED = states.State()
states.STARTING = states.State()
states.STARTED = states.State()
states.STOPPING = states.State()
states.EXITING = states.State()
93
94
96 """Process state-machine and messenger for HTTP site deployment.
97
98 All listeners for a given channel are guaranteed to be called even
99 if others at the same channel fail. Each failure is logged, but
100 execution proceeds on to the next listener. The only way to stop all
101 processing from inside a listener is to raise SystemExit and stop the
102 whole server.
103 """
104
105 states = states
106 state = states.STOPPED
107 execv = False
108
110 self.execv = False
111 self.state = states.STOPPED
112 self.listeners = dict(
113 [(channel, set()) for channel
114 in ('start', 'stop', 'exit', 'graceful', 'log')])
115 self._priorities = {}
116
def subscribe(self, channel, callback, priority=None):
    """Register ``callback`` as a listener on ``channel``.

    A channel that does not exist yet is created on demand.  Listeners are
    kept in a set, so subscribing the same callback twice leaves a single
    entry (but does replace its stored priority).

    The priority recorded for the (channel, callback) pair is the explicit
    ``priority`` argument when one is given; otherwise the callback's own
    ``priority`` attribute, falling back to 50.
    """
    registered = self.listeners.setdefault(channel, set())
    registered.add(callback)

    effective = priority
    if effective is None:
        # No explicit priority: let the callback advertise its own.
        effective = getattr(callback, 'priority', 50)
    self._priorities[(channel, callback)] = effective
126
128 """Discard the given callback (if present)."""
129 listeners = self.listeners.get(channel)
130 if listeners and callback in listeners:
131 listeners.discard(callback)
132 del self._priorities[(channel, callback)]
133
def publish(self, channel, *args, **kwargs):
    """Call every listener on ``channel`` and return their outputs.

    Listeners run in ascending priority order (lower numbers first) and
    each one is called with ``*args`` and ``**kwargs``.  A failing listener
    does not stop the remaining ones: the failure is logged (except on the
    'log' channel itself) and, once every listener has run, the most recent
    failure is re-raised.

    Returns a list holding the return value of each listener, in call
    order, or an empty list when the channel is unknown.

    Raises KeyboardInterrupt or SystemExit immediately if a listener raises
    one.  A SystemExit whose code is 0 is changed to code 1 when an earlier
    listener on the same publish call had already failed.
    """
    if channel not in self.listeners:
        return []

    exc = None
    output = []

    # Decorate each listener with a unique index so that sorting is decided
    # by priority alone and never falls through to comparing the callables
    # themselves when two priorities are equal.
    items = [(self._priorities[(channel, listener)], index, listener)
             for index, listener in enumerate(self.listeners[channel])]
    items.sort()
    for priority, index, listener in items:
        try:
            output.append(listener(*args, **kwargs))
        except KeyboardInterrupt:
            raise
        except SystemExit:
            # Fetch the instance through sys.exc_info() (the same idiom
            # used for other failures below) instead of binding it in the
            # except clause, whose syntax differs between interpreters.
            e = sys.exc_info()[1]
            # An earlier listener failed: make sure the exit code does
            # not report success.
            if exc is not None and e.code == 0:
                e.code = 1
            raise
        except:
            exc = sys.exc_info()[1]
            if channel == 'log':
                # self.log() publishes to this very channel, so reporting
                # the failure there would just re-enter the broken path.
                pass
            else:
                self.log("Error in %r listener %r" % (channel, listener),
                         level=40, traceback=True)
    if exc is not None:
        # Raise the remembered exception explicitly; a bare ``raise`` here
        # would sit outside any except clause.
        raise exc
    return output
166
168 """An atexit handler which asserts the Bus is not running."""
169 if self.state != states.EXITING:
170 warnings.warn(
171 "The main thread is exiting, but the Bus is in the %r state; "
172 "shutting it down automatically now. You must either call "
173 "bus.block() after start(), or call bus.exit() before the "
174 "main thread exits." % self.state, RuntimeWarning)
175 self.exit()
176
178 """Start all services."""
179 atexit.register(self._clean_exit)
180
181 self.state = states.STARTING
182 self.log('Bus STARTING')
183 try:
184 self.publish('start')
185 self.state = states.STARTED
186 self.log('Bus STARTED')
187 except (KeyboardInterrupt, SystemExit):
188 raise
189 except:
190 self.log("Shutting down due to error in start listener:",
191 level=40, traceback=True)
192 e_info = sys.exc_info()
193 try:
194 self.exit()
195 except:
196
197 pass
198 raise e_info[0], e_info[1], e_info[2]
199
201 """Stop all services and prepare to exit the process."""
202 try:
203 self.stop()
204
205 self.state = states.EXITING
206 self.log('Bus EXITING')
207 self.publish('exit')
208
209
210 self.log('Bus EXITED')
211 except:
212
213
214
215
216 os._exit(70)
217
219 """Restart the process (may close connections).
220
221 This method does not restart the process from the calling thread;
222 instead, it stops the bus and asks the main thread to call execv.
223 """
224 self.execv = True
225 self.exit()
226
228 """Advise all services to reload."""
229 self.log('Bus graceful')
230 self.publish('graceful')
231
def block(self, interval=0.1):
    """Wait for the EXITING state, KeyboardInterrupt or SystemExit.

    This function is intended to be called only by the main thread.
    After waiting for the EXITING state, it also waits for all threads
    to terminate, and then calls os.execv if self.execv is True. This
    design allows another thread to call bus.restart, yet have the main
    thread perform the actual execv call (required on some platforms).

    ``interval`` is the polling period, in seconds, handed to
    ``self.wait``.  A SystemExit raised while waiting is re-raised after
    the bus has been asked to exit; a KeyboardInterrupt (or IOError) is
    absorbed after shutting the bus down, and the method carries on to
    the thread-joining step.
    """
    try:
        self.wait(states.EXITING, interval=interval)
    except (KeyboardInterrupt, IOError):
        # NOTE(review): IOError is handled exactly like KeyboardInterrupt,
        # presumably because an interrupted sleep can surface as IOError
        # on some platforms -- confirm.
        self.log('Keyboard Interrupt: shutting down bus')
        self.exit()
    except SystemExit:
        self.log('SystemExit raised: shutting down bus')
        self.exit()
        raise

    self.log("Waiting for child threads to terminate...")

    # Feature-detect the threading API the same way the daemon flag is
    # detected below: prefer the snake_case spellings and fall back to the
    # camelCase ones only where the former do not exist.
    current_thread = getattr(threading, 'current_thread', None)
    if current_thread is None:
        current_thread = threading.currentThread
    me = current_thread()

    for t in threading.enumerate():
        if not (t != me):
            # Never join the calling thread itself.
            continue

        is_alive = getattr(t, 'is_alive', None)
        if is_alive is None:
            is_alive = t.isAlive
        if not is_alive():
            continue

        if hasattr(threading.Thread, "daemon"):
            d = t.daemon
        else:
            d = t.isDaemon()
        # Daemon threads are not waited for; only non-daemon threads are
        # joined before a possible re-exec.
        if not d:
            t.join()

    if self.execv:
        self._do_execv()
272
def wait(self, state, interval=0.1):
    """Block until ``self.state`` is (one of) the given state(s).

    ``state`` is either a single state or a tuple/list of acceptable
    states.  The current state is polled, sleeping ``interval`` seconds
    between checks; the call returns at once if the bus is already in an
    acceptable state.
    """
    acceptable = state
    if not isinstance(acceptable, (tuple, list)):
        # A lone state becomes a one-element collection.
        acceptable = [acceptable]

    def _poll():
        while True:
            if self.state in acceptable:
                return
            time.sleep(interval)

    # If psyco has been imported, ask it not to compile the polling loop.
    try:
        psyco = sys.modules['psyco']
        psyco.cannotcompile(_poll)
    except (KeyError, AttributeError):
        # psyco is not loaded, or has no usable cannotcompile().
        pass

    _poll()
296
298 """Re-execute the current process.
299
300 This must be called from the main thread, because certain platforms
301 (OS X) don't allow execv to be called in a child thread very well.
302 """
303 args = sys.argv[:]
304 self.log('Re-spawning %s' % ' '.join(args))
305 args.insert(0, sys.executable)
306 if sys.platform == 'win32':
307 args = ['"%s"' % arg for arg in args]
308
309 os.execv(sys.executable, args)
310
312 """Stop all services."""
313 self.state = states.STOPPING
314 self.log('Bus STOPPING')
315 self.publish('stop')
316 self.state = states.STOPPED
317 self.log('Bus STOPPED')
318
320 """Start 'func' in a new thread T, then start self (and return T)."""
321 if args is None:
322 args = ()
323 if kwargs is None:
324 kwargs = {}
325 args = (func,) + args
326
327 def _callback(func, *a, **kw):
328 self.wait(states.STARTED)
329 func(*a, **kw)
330 t = threading.Thread(target=_callback, args=args, kwargs=kwargs)
331 t.setName('Bus Callback ' + t.getName())
332 t.start()
333
334 self.start()
335
336 return t
337
def log(self, msg="", level=20, traceback=False):
    """Publish ``msg`` and ``level`` on the 'log' channel.

    When ``traceback`` is true, a newline and the formatted traceback of
    the exception currently reported by ``sys.exc_info()`` are appended
    to the message before it is published.
    """
    if traceback:
        details = "".join(_traceback.format_exception(*sys.exc_info()))
        msg += "\n" + details
    self.publish('log', msg, level)
344
# Module-level Bus instance, created when this module is imported.  The
# module docstring recommends a single Bus object per process.
bus = Bus()
346