Package cherrypy :: Package process :: Module wspbus
[hide private]
[frames | no frames]

Source Code for Module cherrypy.process.wspbus

  1  """An implementation of the Web Site Process Bus. 
  2   
  3  This module is completely standalone, depending only on the stdlib. 
  4   
  5  Web Site Process Bus 
  6  -------------------- 
  7   
  8  A Bus object is used to contain and manage site-wide behavior: 
  9  daemonization, HTTP server start/stop, process reload, signal handling, 
 10  drop privileges, PID file management, logging for all of these, 
 11  and many more. 
 12   
 13  In addition, a Bus object provides a place for each web framework 
 14  to register code that runs in response to site-wide events (like 
 15  process start and stop), or which controls or otherwise interacts with 
 16  the site-wide components mentioned above. For example, a framework which 
 17  uses file-based templates would add known template filenames to an 
 18  autoreload component. 
 19   
 20  Ideally, a Bus object will be flexible enough to be useful in a variety 
 21  of invocation scenarios: 
 22   
 23   1. The deployer starts a site from the command line via a framework- 
 24       neutral deployment script; applications from multiple frameworks 
 25       are mixed in a single site. Command-line arguments and configuration 
 26       files are used to define site-wide components such as the HTTP server, 
 27       WSGI component graph, autoreload behavior, signal handling, etc. 
 28   2. The deployer starts a site via some other process, such as Apache; 
 29       applications from multiple frameworks are mixed in a single site. 
 30       Autoreload and signal handling (from Python at least) are disabled. 
 31   3. The deployer starts a site via a framework-specific mechanism; 
 32       for example, when running tests, exploring tutorials, or deploying 
 33       single applications from a single framework. The framework controls 
 34       which site-wide components are enabled as it sees fit. 
 35   
 36  The Bus object in this package uses topic-based publish-subscribe 
 37  messaging to accomplish all this. A few topic channels are built in 
 38  ('start', 'stop', 'exit', and 'graceful'). Frameworks and site containers 
 39  are free to define their own. If a message is sent to a channel that has 
 40  not been defined or has no listeners, there is no effect. 
 41   
 42  In general, there should only ever be a single Bus object per process. 
 43  Frameworks and site containers share a single Bus object by publishing 
 44  messages and subscribing listeners. 
 45   
 46  The Bus object works as a finite state machine which models the current 
 47  state of the process. Bus methods move it from one state to another; 
 48  those methods then publish to subscribed listeners on the channel for 
 49  the new state. 
 50   
 51                          O 
 52                          | 
 53                          V 
 54         STOPPING --> STOPPED --> EXITING -> X 
 55            A   A         | 
 56            |    \___     | 
 57            |        \    | 
 58            |         V   V 
 59          STARTED <-- STARTING 
 60   
 61  """ 
 62   
 63  import atexit 
 64  import os 
 65  try: 
 66      set 
 67  except NameError: 
 68      from sets import Set as set 
 69  import sys 
 70  import threading 
 71  import time 
 72  import traceback as _traceback 
 73  import warnings 
 74   
 75   
 76  # Use a flag to indicate the state of the bus. 
77 -class _StateEnum(object):
78 - class State(object):
79 name = None
80 - def __repr__(self):
81 return "states.%s" % self.name
82
83 - def __setattr__(self, key, value):
84 if isinstance(value, self.State): 85 value.name = key 86 object.__setattr__(self, key, value)
# The states the bus can be in. Attaching each State to the enum also
# gives it its name (see _StateEnum.__setattr__).
states = _StateEnum()
for _state_name in ('STOPPED', 'STARTING', 'STARTED', 'STOPPING', 'EXITING'):
    setattr(states, _state_name, states.State())
# Do not leave the loop variable behind in the module namespace.
del _state_name
class Bus(object):
    """Process state-machine and messenger for HTTP site deployment.

    All listeners for a given channel are guaranteed to be called even
    if others at the same channel fail. Each failure is logged, but
    execution proceeds on to the next listener. The only way to stop all
    processing from inside a listener is to raise SystemExit and stop the
    whole server.
    """

    # Class-level defaults; __init__ sets state and execv again per instance.
    states = states
    state = states.STOPPED
    execv = False

    def __init__(self):
        """Create a stopped bus with the built-in channels and no listeners."""
        self.execv = False
        self.state = states.STOPPED
        self.listeners = dict(
            [(channel, set()) for channel
             in ('start', 'stop', 'exit', 'graceful', 'log')])
        # Maps (channel, callback) -> priority for every subscription.
        self._priorities = {}

    def subscribe(self, channel, callback, priority=None):
        """Add the given callback at the given channel (if not present).

        If ``priority`` is None, the callback's own ``priority`` attribute
        is used, falling back to 50. publish() calls listeners in ascending
        priority order. Subscribing again replaces the stored priority.
        """
        if channel not in self.listeners:
            self.listeners[channel] = set()
        self.listeners[channel].add(callback)

        if priority is None:
            priority = getattr(callback, 'priority', 50)
        self._priorities[(channel, callback)] = priority

    def unsubscribe(self, channel, callback):
        """Discard the given callback (if present)."""
        listeners = self.listeners.get(channel)
        if listeners and callback in listeners:
            listeners.discard(callback)
            del self._priorities[(channel, callback)]

    def publish(self, channel, *args, **kwargs):
        """Return output of all subscribers for the given channel.

        Every listener is called with ``*args, **kwargs`` in ascending
        priority order. An unknown channel yields an empty list.

        KeyboardInterrupt and SystemExit from a listener propagate at once.
        Any other exception is logged (except on the 'log' channel) and the
        remaining listeners still run; afterwards the last such exception
        is re-raised.
        """
        if channel not in self.listeners:
            return []

        exc = None
        output = []

        # Decorate with a unique index so that a priority tie is settled by
        # the index and two listener objects are never compared with each
        # other (arbitrary on Python 2, a TypeError on Python 3).
        items = [(self._priorities[(channel, listener)], index, listener)
                 for index, listener in enumerate(self.listeners[channel])]
        items.sort()
        for priority, index, listener in items:
            try:
                output.append(listener(*args, **kwargs))
            except KeyboardInterrupt:
                raise
            except SystemExit:
                # Fetched via sys.exc_info() so the clause parses on both
                # Python 2 and Python 3.
                e = sys.exc_info()[1]
                # If we have previous errors ensure the exit code is non-zero.
                # A code of None also means exit status 0.
                if exc is not None and (e.code is None or e.code == 0):
                    e.code = 1
                raise
            except:
                exc = sys.exc_info()[1]
                if channel == 'log':
                    # Assume any further messages to 'log' will fail.
                    pass
                else:
                    self.log("Error in %r listener %r" % (channel, listener),
                             level=40, traceback=True)
        if exc is not None:
            # Raise the saved exception explicitly: no exception is being
            # handled here, so a bare "raise" is not reliable.
            # NOTE: on Python 2 this starts a fresh traceback; the original
            # one was already logged above for every channel except 'log'.
            raise exc
        return output

    def _clean_exit(self):
        """An atexit handler which asserts the Bus is not running."""
        if self.state != states.EXITING:
            warnings.warn(
                "The main thread is exiting, but the Bus is in the %r state; "
                "shutting it down automatically now. You must either call "
                "bus.block() after start(), or call bus.exit() before the "
                "main thread exits." % self.state, RuntimeWarning)
            self.exit()

    def _exit_quietly(self):
        """Call exit() and discard anything it raises.

        Kept in its own method so that an exception caught here is handled
        in a separate frame and does not replace the exception the caller
        is about to re-raise with a bare ``raise``.
        """
        try:
            self.exit()
        except:
            # Any stop/exit errors will be logged inside publish().
            pass

    def start(self):
        """Start all services.

        Registers the atexit safety net, moves to STARTING, publishes
        'start' and then moves to STARTED. If a start listener fails, the
        error is logged, the bus is shut down via exit(), and the original
        exception is re-raised. KeyboardInterrupt and SystemExit propagate
        untouched.
        """
        atexit.register(self._clean_exit)

        self.state = states.STARTING
        self.log('Bus STARTING')
        try:
            self.publish('start')
            self.state = states.STARTED
            self.log('Bus STARTED')
        except (KeyboardInterrupt, SystemExit):
            raise
        except:
            self.log("Shutting down due to error in start listener:",
                     level=40, traceback=True)
            self._exit_quietly()
            # Re-raise the start error with its original traceback.
            raise

    def exit(self):
        """Stop all services and prepare to exit the process."""
        try:
            self.stop()

            self.state = states.EXITING
            self.log('Bus EXITING')
            self.publish('exit')
            # This isn't strictly necessary, but it's better than seeing
            # "Waiting for child threads to terminate..." and then nothing.
            self.log('Bus EXITED')
        except:
            # This method is often called asynchronously (whether thread,
            # signal handler, console handler, or atexit handler), so we
            # can't just let exceptions propagate out unhandled.
            # Assume it's been logged and just die.
            os._exit(70)  # EX_SOFTWARE

    def restart(self):
        """Restart the process (may close connections).

        This method does not restart the process from the calling thread;
        instead, it stops the bus and asks the main thread to call execv.
        """
        self.execv = True
        self.exit()

    def graceful(self):
        """Advise all services to reload."""
        self.log('Bus graceful')
        self.publish('graceful')

    def block(self, interval=0.1):
        """Wait for the EXITING state, KeyboardInterrupt or SystemExit.

        This function is intended to be called only by the main thread.
        After waiting for the EXITING state, it also waits for all threads
        to terminate, and then calls os.execv if self.execv is True. This
        design allows another thread to call bus.restart, yet have the main
        thread perform the actual execv call (required on some platforms).

        ``interval`` is the polling period, in seconds, passed to wait().
        """
        try:
            self.wait(states.EXITING, interval=interval)
        except (KeyboardInterrupt, IOError):
            # The time.sleep call might raise
            # "IOError: [Errno 4] Interrupted function call" on KBInt.
            self.log('Keyboard Interrupt: shutting down bus')
            self.exit()
        except SystemExit:
            self.log('SystemExit raised: shutting down bus')
            self.exit()
            raise

        # Waiting for ALL child threads to finish is necessary on OS X.
        # See http://www.cherrypy.org/ticket/581.
        # It's also good to let them all shut down before allowing
        # the main thread to call atexit handlers.
        # See http://www.cherrypy.org/ticket/751.
        self.log("Waiting for child threads to terminate...")
        this_thread = threading.currentThread()
        for t in threading.enumerate():
            # Prefer Thread.is_alive (Python 2.6+); Thread.isAlive was
            # removed in Python 3.9.
            is_alive = getattr(t, 'is_alive', None)
            if is_alive is None:
                is_alive = t.isAlive
            if t != this_thread and is_alive():
                # Note that any dummy (external) threads are always daemonic.
                if hasattr(threading.Thread, "daemon"):
                    # Python 2.6+
                    d = t.daemon
                else:
                    d = t.isDaemon()
                if not d:
                    t.join()

        if self.execv:
            self._do_execv()

    def wait(self, state, interval=0.1):
        """Wait for the given state(s).

        ``state`` may be a single state or a tuple/list of states; the call
        returns once ``self.state`` is one of them, sleeping ``interval``
        seconds between checks.
        """
        # Named "wanted" so the module-level "states" is not shadowed.
        if isinstance(state, (tuple, list)):
            wanted = state
        else:
            wanted = [state]

        def _wait():
            while self.state not in wanted:
                time.sleep(interval)

        # From http://psyco.sourceforge.net/psycoguide/bugs.html:
        # "The compiled machine code does not include the regular polling
        # done by Python, meaning that a KeyboardInterrupt will not be
        # detected before execution comes back to the regular Python
        # interpreter. Your program cannot be interrupted if caught
        # into an infinite Psyco-compiled loop."
        try:
            sys.modules['psyco'].cannotcompile(_wait)
        except (KeyError, AttributeError):
            pass

        _wait()

    def _do_execv(self):
        """Re-execute the current process.

        This must be called from the main thread, because certain platforms
        (OS X) don't allow execv to be called in a child thread very well.
        """
        args = sys.argv[:]
        self.log('Re-spawning %s' % ' '.join(args))
        args.insert(0, sys.executable)
        if sys.platform == 'win32':
            args = ['"%s"' % arg for arg in args]

        os.execv(sys.executable, args)

    def stop(self):
        """Stop all services."""
        self.state = states.STOPPING
        self.log('Bus STOPPING')
        self.publish('stop')
        self.state = states.STOPPED
        self.log('Bus STOPPED')

    def start_with_callback(self, func, args=None, kwargs=None):
        """Start 'func' in a new thread T, then start self (and return T).

        The thread waits for the STARTED state and then calls
        ``func(*args, **kwargs)``. ``args`` may be any sequence.
        """
        if args is None:
            args = ()
        if kwargs is None:
            kwargs = {}
        # tuple() lets callers pass a list as well as a tuple.
        args = (func,) + tuple(args)

        def _callback(func, *a, **kw):
            self.wait(states.STARTED)
            func(*a, **kw)
        t = threading.Thread(target=_callback, args=args, kwargs=kwargs)
        t.setName('Bus Callback ' + t.getName())
        t.start()

        self.start()

        return t

    def log(self, msg="", level=20, traceback=False):
        """Log the given message. Append the last traceback if requested.

        The message and level are published on the 'log' channel.
        NOTE(review): the levels used in this class (20 and 40) look like
        stdlib logging levels -- confirm against the log subscribers.
        """
        if traceback:
            exc = sys.exc_info()
            msg += "\n" + "".join(_traceback.format_exception(*exc))
        self.publish('log', msg, level)
# Module-level Bus instance, created when this module is imported.
bus = Bus()